Expand a stream by combining two or more streams or by combining streams with unfolds.

## Binary Combinators (Linear)

Functions ending in the shape:

`t m a -> t m a -> t m a`.

The functions in this section have linear (flat) n-ary combining characteristics. When combined `n` times (e.g. ``a `serial` b `serial` c ...``), the resulting expression has `O(n)` complexity, instead of the `O(n^2)` of the pair wise combinators described in the next section. These functions can be used efficiently with `concatMapWith` and similar combinators that combine streams in a linear fashion (contrast with `concatPairsWith`, which combines streams as a binary tree).

serial :: IsStream t => t m a -> t m a -> t m a infixr 6

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

```
>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]
```

This operation can be used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6

Appends two streams. Both streams may be evaluated concurrently, but the outputs are used in the same order as the corresponding actions in the original streams. Side effects happen in the order in which the streams are evaluated:

```
>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]
```

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

```
>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]
```

With 2 threads, only two streams can be scheduled at a time; when one of those finishes, the third one gets scheduled:

```
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]
```

Only streams are scheduled for ahead evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream, they will be evaluated concurrently. It may not make much sense to combine serial streams using `ahead`.

`ahead` can be safely used to fold an infinite lazy container of streams.

Since: 0.3.0 (Streamly)

Since: 0.8.0

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6

Merges two streams. Both streams may be evaluated concurrently, and outputs from both are used as they arrive:

```
>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]
```

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

```
>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]
```

With 2 threads, only two streams can be scheduled at a time; when one of those finishes, the third one gets scheduled:

```
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]
```

With a single thread, it becomes serial:

```
>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]
```

Only streams are scheduled for async evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream, they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

```
>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]
```

If total threads are 2, the third stream is scheduled only after one of the first two has finished:

```
>>> stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]
...
[1,1,3,2,3,2]
```

Thus `async` goes deep in the first few streams rather than going wide across all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, `async` can be safely used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6

For singleton streams, `wAsync` is the same as `async`. See `async` for singleton stream behavior. For multi-element streams, while `async` is left biased i.e. it tries to evaluate the left side stream as much as possible, `wAsync` tries to schedule them both fairly. In other words, `async` goes deep while `wAsync` goes wide. However, outputs are always used as they arrive.

With a single thread, `async` starts behaving like `serial` while `wAsync` starts behaving like `wSerial`.

```
>>> import Streamly.Prelude (wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]

>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]
```

With two threads available, and combining three streams:

```
>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]

>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]
```

This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.

Note that `WSerialT` and single threaded `WAsyncT` both interleave streams but the exact scheduling is slightly different in both cases.

Since: 0.2.0 (Streamly)

Since: 0.8.0

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6

Like `async` except that the execution is much more strict. There is no limit on the number of threads. While `async` may not schedule a stream if there is no demand from the consumer, `parallel` always evaluates both the streams immediately. The only limit that applies to `parallel` is `maxBuffer`. Evaluation may block if the output buffer becomes full.

```
>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]
```

`parallel` guarantees that all the streams are scheduled for execution immediately. We can therefore, for example, start timers inside the streams and rely on the fact that all timers were started at the same time.

Unlike `async` this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.

Since: 0.2.0 (Streamly)

Since: 0.8.0

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a

Like `parallel` but stops the output as soon as the first stream stops.

Pre-release

parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a

Like `parallel` but stops the output as soon as any of the two streams stops.

Pre-release

## Binary Combinators (Pair Wise)

Like the functions in the section above these functions also combine two streams into a single stream but when used `n` times linearly they exhibit O(n^2) complexity. They are best combined in a binary tree fashion using `concatPairsWith` giving a `n * log n` complexity. Avoid using these with `concatMapWith` when combining a large or infinite number of streams.

### Append

append :: (IsStream t, Monad m) => t m b -> t m b -> t m b

Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.

IMPORTANT NOTE: This could be 100x faster than `serial/<>` for appending a few (say 100) streams because it can fuse via stream fusion. However, it does not scale for a large number of streams (say 1000s) and becomes quadratically slow. Therefore use this for custom appending of a few streams, but use `concatMap` or `concatMapWith serial` for appending `n` streams or infinite containers of streams.

Pre-release

### wSerial

`wSerial` is a CPS based stream interleaving function. It can be used with `concatMapWith` as well; however, the interleaving behavior of `n` streams would be asymmetric, giving exponentially more weight to streams that come earlier in the composition.
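The asymmetry can be seen in a small list-based model. This is only a sketch: `interleaveL` and `interleaveAllL` are hypothetical names standing in for `wSerial` on plain lists, not streamly's implementation, and the right-nested `foldr` is assumed to mirror how a right-associative combinator is composed over many streams.

```haskell
-- Hypothetical list model of two-way interleaving (not streamly code).
interleaveL :: [a] -> [a] -> [a]
interleaveL [] ys = ys
interleaveL (x:xs) ys = x : interleaveL ys xs

-- Right-nested composition: s1 `interleaveL` (s2 `interleaveL` (s3 ...)).
interleaveAllL :: [[a]] -> [a]
interleaveAllL = foldr interleaveL []

-- With three equally long inputs, the first input supplies every second
-- element while the other two share the remaining slots.
example :: String
example = take 8 (interleaveAllL ["aaaa", "bbbb", "cccc"])
-- example == "abacabac"
```

In this model the first list gets half of the output slots and each further list gets half of what remains, which is why a tree-shaped combination is preferable when fairness across many streams matters.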

wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

```
>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]
```

Note, for singleton streams `wSerial` and `serial` are identical.

Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wSerialFst :: IsStream t => t m a -> t m a -> t m a

Like `wSerial` but stops interleaving as soon as the first stream stops.

Since: 0.7.0

wSerialMin :: IsStream t => t m a -> t m a -> t m a

Like `wSerial` but stops interleaving as soon as any of the two streams stops.

Since: 0.7.0

### Interleave

`interleave` is like `wSerial` but uses a direct style implementation instead of CPS. It is faster than `wSerial` due to stream fusion but is less efficient when used with `concatMapWith` for a large number of streams.

interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.

```
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleave "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,,,"

>>> Stream.interleave "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,cd"
```

`interleave` is dual to `interleaveMin`, it can be called `interleaveMax`.

Do not use at scale in concatMapWith.

Pre-release

interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.

```
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveMin "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,"

>>> Stream.interleaveMin "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,c"
```

`interleaveMin` is dual to `interleave`.

Do not use at scale in concatMapWith.

Pre-release

interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.

```
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveSuffix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c,"

>>> Stream.interleaveSuffix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"
```

`interleaveSuffix` is a dual of `interleaveInfix`.

Do not use at scale in concatMapWith.

Pre-release

interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.

```
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveInfix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c"

>>> Stream.interleaveInfix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"
```

`interleaveInfix` is a dual of `interleaveSuffix`.
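The difference between the four interleaving combinators above is easiest to see side by side. The following is a minimal sketch on plain lists; the names `interleaveL`, `interleaveMinL`, `interleaveSuffixL` and `interleaveInfixL` are hypothetical and model only the documented output order, not streamly's fused implementation.

```haskell
-- Hypothetical list models of the documented behaviours (not streamly code).

-- Alternate until both inputs are exhausted.
interleaveL :: [a] -> [a] -> [a]
interleaveL [] ys = ys
interleaveL (x:xs) ys = x : interleaveL ys xs

-- Alternate, stopping as soon as the input whose turn it is has run out.
interleaveMinL :: [a] -> [a] -> [a]
interleaveMinL [] _ = []
interleaveMinL (x:xs) ys = x : interleaveMinL ys xs

-- Follow each element of the first input with one from the second,
-- stopping when the first input ends.
interleaveSuffixL :: [a] -> [a] -> [a]
interleaveSuffixL [] _ = []
interleaveSuffixL xs [] = xs
interleaveSuffixL (x:xs) (y:ys) = x : y : interleaveSuffixL xs ys

-- Place elements of the second input only between elements of the first.
interleaveInfixL :: [a] -> [a] -> [a]
interleaveInfixL [] _ = []
interleaveInfixL [x] _ = [x]
interleaveInfixL xs [] = xs
interleaveInfixL (x:xs) (y:ys) = x : y : interleaveInfixL xs ys
```

On the inputs used in the examples above, these models reproduce the documented results, e.g. `interleaveSuffixL "abc" ",,,,"` gives `"a,b,c,"` while `interleaveInfixL "abc" ",,,,"` gives `"a,b,c"`.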

Do not use at scale in concatMapWith.

Pre-release

### Round Robin

roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b

Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output; a stream may choose to `Skip` producing an element until later, giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of two streams rather than fairly interleaving the output of the two streams. This can be useful for co-operative multitasking without using explicit threads, and can be used as an alternative to `async`.

Do not use at scale in concatMapWith.

Pre-release

### Zip

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c

Zip two streams serially using a pure zipping function.

```
> S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
[5,7,9]
```

Since: 0.1.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c

Like `zipWith` but using a monadic zipping function.

Since: 0.4.0

zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c

Like `zipWith` but zips concurrently i.e. both the streams being zipped are generated concurrently.

Since: 0.1.0

zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c

Like `zipWithM` but zips concurrently i.e. both the streams being zipped are generated concurrently.

Since: 0.4.0

### Merge

mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a

Merge two streams using a comparison function. The head elements of both streams are compared and the smaller of the two is emitted; if both elements are equal, the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

```
>>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]
```
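The merging rule described above can be sketched on plain lists. `mergeByL` is a hypothetical name for a list model that follows the stated rule (emit the smaller head, prefer the first input on ties); it is not streamly's implementation.

```haskell
-- Hypothetical list model of the documented merge rule (not streamly code).
mergeByL :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
mergeByL _ [] ys = ys
mergeByL _ xs [] = xs
mergeByL cmp (x:xs) (y:ys) =
  case cmp x y of
    -- The second head is strictly smaller: emit it first.
    GT -> y : mergeByL cmp (x:xs) ys
    -- Smaller or equal: the first input wins ties.
    _  -> x : mergeByL cmp xs (y:ys)

merged :: [Int]
merged = mergeByL compare [1, 3, 5] [2, 4, 6, 8]
-- merged == [1,2,3,4,5,6,8]
```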

Since: 0.6.0

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a

Like `mergeBy` but with a monadic comparison function.

Merge two streams randomly:

```
> randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]
```

Merge two streams in a proportion of 2:1:

```
>>> :{
do
 let proportionately m n = do
      -- Endless schedule: m picks from the first stream, then n from the second.
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
         -- Return the next scheduled ordering and advance the schedule.
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2])
 print xs
:}
[1,1,2,1,1,2,1,1,2]
```

Since: 0.6.0

mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a

Like `mergeBy` but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a

Like `mergeByM` but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

## Combine Streams and Unfolds

Expand a stream by repeatedly using an unfold and merging the resulting streams. Functions generally ending in the shape:

`Unfold m a b -> t m a -> t m b`

### Append Many (Unfold)

Unfold and flatten streams.

unfoldMany :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b

Like `concatMap` but uses an `Unfold` for stream generation. Unlike `concatMap` this can fuse the `Unfold` code with the inner loop and therefore provide many times better performance.

Since: 0.8.0

unfoldManyInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b

Like `unfoldMany` but interleaves the streams in the same way as `interleave` behaves instead of appending them.

Pre-release

unfoldManyRoundRobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b

Like `unfoldMany` but executes the streams in the same way as `roundrobin`.

Pre-release

### Interpose

Insert effects between streams. Like unfoldMany but intersperses an effect between the streams. A special case of gintercalate.

interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

`unwords = S.interpose ' '`

Pre-release

interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

`unlines = S.interposeSuffix '\n'`

Pre-release

### Intercalate

Insert Streams between Streams. Like unfoldMany but intersperses streams from another source between the streams from the first source.

intercalate :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c

`intersperse` followed by unfold and concat.

```
intercalate unf a str = unfoldMany unf $ intersperse a str
intersperse = intercalate (Unfold.function id)
unwords = intercalate Unfold.fromList " "
```

```
>>> Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"]
"abc def ghi"
```

Since: 0.8.0

intercalateSuffix :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c

`intersperseSuffix` followed by unfold and concat.

```
intercalateSuffix unf a str = unfoldMany unf $ intersperseSuffix a str
intersperseSuffix = intercalateSuffix (Unfold.function id)
unlines = intercalateSuffix Unfold.fromList "\n"
```

```
>>> Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]
"abc\ndef\nghi\n"
```
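The "intersperse, then unfold and concat" reading of `intercalate` and `intercalateSuffix` can be sketched with plain lists, where a list stands in both for the stream and for the result of unfolding an element. `intercalateL` and `intercalateSuffixL` are hypothetical names for this model; it only illustrates the shape of the result and is not streamly code.

```haskell
import Data.List (intersperse)

-- Hypothetical list models (not streamly code).

-- Put the separator between the elements, then flatten.
intercalateL :: [c] -> [[c]] -> [c]
intercalateL sep = concat . intersperse sep

-- Put the separator after every element, then flatten.
intercalateSuffixL :: [c] -> [[c]] -> [c]
intercalateSuffixL sep = concatMap (++ sep)

wordsJoined :: String
wordsJoined = intercalateL " " ["abc", "def", "ghi"]
-- wordsJoined == "abc def ghi"

linesJoined :: String
linesJoined = intercalateSuffixL "\n" ["abc", "def", "ghi"]
-- linesJoined == "abc\ndef\nghi\n"
```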

Since: 0.8.0

gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c

`interleaveInfix` followed by unfold and concat.

Pre-release

gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c

`interleaveSuffix` followed by unfold and concat.

Pre-release

## Append Many (concatMap)

Map and serially append streams. `concatMapM` is a generalization of the binary append operation to append many streams.

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike `concatMap`, it can produce an effect at the beginning of each iteration of the inner loop.

Since: 0.6.0

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

```
concatMap f = concatMapM (return . f)
concatMap = concatMapWith serial
concatMap f = concat . map f
concatMap f = unfoldMany (UF.lmap f UF.fromStream)
```

Since: 0.6.0

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

```
concatM = concat . fromEffect
concatM = concat . lift    -- requires (MonadTrans t)
concatM = join . lift      -- requires (MonadTrans t, Monad (t m))
```

See also: `concat`, `sequence`

Internal

concat :: (IsStream t, Monad m) => t m (t m a) -> t m a

Flatten a stream of streams to a single stream.

```
concat = concatMap id
```

Pre-release

## Flatten Containers

Flatten `Foldable` containers using the binary stream merging operations.

concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a

A variant of `fold` that allows you to fold a `Foldable` container of streams using the specified stream sum operation.

`concatFoldableWith async $ map return [1..3]`

Equivalent to:

```
concatFoldableWith f = Prelude.foldr f S.nil
concatFoldableWith f = S.concatMapFoldableWith f id
```

Since: 0.8.0 (Renamed foldWith to concatFoldableWith)

Since: 0.1.0 (Streamly)

concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b

A variant of `foldMap` that allows you to map a monadic streaming action on a `Foldable` container and then fold it using the specified stream merge operation.

`concatMapFoldableWith async return [1..3]`

Equivalent to:

```
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
```

Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)

Since: 0.1.0 (Streamly)

concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b

Like `concatMapFoldableWith` but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to:

```
concatForFoldableWith f xs g = Prelude.foldr (f . g) S.nil xs
concatForFoldableWith = flip S.concatMapFoldableWith
```

Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)

Since: 0.1.0 (Streamly)

## ConcatMapWith

Map and flatten a stream like `concatMap` but using a custom binary stream merging combinator instead of just appending the streams. The merging occurs sequentially. It works efficiently for merge operations like `serial`, `async` and `ahead`, where one stream is consumed before the next, and for `wAsync` or `parallel`, where all streams are consumed simultaneously anyway.

However, in cases where the merging consumes streams in a round robin fashion, a pair wise merging using `concatPairsWith` would be more efficient. These cases include operations like `mergeBy` or `zipWith`.

concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b

`concatMapWith mixer generator stream` is a two dimensional looping combinator. The `generator` function is used to generate streams from the elements in the input `stream` and the `mixer` function is used to merge those streams.

Note we can merge streams concurrently by using a concurrent merge function.
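The "two dimensional loop" can be modelled on plain lists as a right fold that generates one list per input element and merges it into the rest with the mixer. `concatMapWithL` and `interleaveL` below are hypothetical names for this sketch, which mirrors the `foldr (f . g)` equivalence given for `concatMapFoldableWith` and is not streamly's implementation.

```haskell
-- Hypothetical list model of concatMapWith (not streamly code):
-- generate one list per input element and fold them with the mixer.
concatMapWithL :: ([b] -> [b] -> [b]) -> (a -> [b]) -> [a] -> [b]
concatMapWithL mixer generator = foldr (mixer . generator) []

-- A simple alternating mixer, standing in for an interleaving combinator.
interleaveL :: [a] -> [a] -> [a]
interleaveL [] ys = ys
interleaveL (x:xs) ys = x : interleaveL ys xs

-- With an appending mixer the result is the same as concatMap.
appended :: [Int]
appended = concatMapWithL (++) (replicate 2) [1, 2, 3]
-- appended == [1,1,2,2,3,3]

-- With an interleaving mixer the generated lists are mixed instead.
mixed :: [Int]
mixed = concatMapWithL interleaveL (replicate 2) [1, 2, 3]
-- mixed == [1,2,1,3,2,3]
```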

Since: 0.7.0

Since: 0.8.0 (signature change)

bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b

concatSmapMWith :: (IsStream t, Monad m) => (t m b -> t m b -> t m b) -> (s -> a -> m (s, t m b)) -> m s -> t m a -> t m b

Like `concatMapWith` but carries a state which can be used to share information across multiple steps of concat.

```
concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial
```

Pre-release

## ConcatPairsWith

See the notes about suitable merge functions in the `concatMapWith` section.

concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b

Combine streams in pairs using a binary stream combinator, then combine the resulting streams in pairs recursively until we get to a single combined stream.

For example, you can sort a stream using merge sort like this:

```
>>> Stream.toList $ Stream.concatPairsWith (Stream.mergeBy compare) Stream.fromPure $ Stream.fromList [5,1,7,9,2]
[1,2,5,7,9]
```

Caution: the stream of streams must be finite
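The pairwise, tree-shaped combination can be sketched on a list of lists. `pairUp`, `concatPairsL` and `mergeL` are hypothetical names for this model; it only illustrates the order in which inputs are combined and why the number of inputs must be finite, and is not streamly's implementation.

```haskell
-- Hypothetical list model of pairwise combination (not streamly code).

-- One pass: combine adjacent pairs, leaving a trailing odd element as is.
pairUp :: ([a] -> [a] -> [a]) -> [[a]] -> [[a]]
pairUp f (x:y:rest) = f x y : pairUp f rest
pairUp _ xs = xs

-- Repeat passes until a single list remains. Each pass needs the whole
-- list of inputs, so the number of inputs has to be finite.
concatPairsL :: ([a] -> [a] -> [a]) -> [[a]] -> [a]
concatPairsL _ [] = []
concatPairsL _ [x] = x
concatPairsL f xss = concatPairsL f (pairUp f xss)

-- Ordered merge of two sorted lists, used here as the binary combinator.
mergeL :: Ord a => [a] -> [a] -> [a]
mergeL [] ys = ys
mergeL xs [] = xs
mergeL (x:xs) (y:ys)
  | y < x = y : mergeL (x:xs) ys
  | otherwise = x : mergeL xs (y:ys)

-- Merge sort: start from singleton lists and merge them as a binary tree.
sorted :: [Int]
sorted = concatPairsL mergeL (map (: []) [5, 1, 7, 9, 2])
-- sorted == [1,2,5,7,9]
```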

Pre-release

## IterateMap

Map and flatten Trees of Streams

iterateMapWith :: IsStream t => (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a

Like `iterateM` but iterates after mapping a stream generator on the output.

Yield an input element in the output stream, map a stream generator on it and then do the same on the resulting stream. This can be used for a depth first traversal of a tree like structure.
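As a rough illustration of the depth first behavior, here is a list model in which appending plays the role of the combining function. `iterateMapL` and `children` are hypothetical names; the sketch assumes a serial (append) combinator and is not streamly's implementation.

```haskell
-- Hypothetical list model of iterating with an appending combinator
-- (not streamly code): yield each element, then traverse what it generates.
iterateMapL :: (a -> [a]) -> [a] -> [a]
iterateMapL generator =
  concatMap (\x -> x : iterateMapL generator (generator x))

-- A small implicit binary tree: node n has children 2n and 2n+1 while n < 4.
children :: Int -> [Int]
children n
  | n < 4 = [2 * n, 2 * n + 1]
  | otherwise = []

-- Each node is emitted before its subtree, i.e. a pre-order traversal.
preorder :: [Int]
preorder = iterateMapL children [1]
-- preorder == [1,2,4,5,3,6,7]
```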

Note that `iterateM` is a special case of `iterateMapWith`:

```
iterateM f = iterateMapWith serial (fromEffect . f) . fromEffect
```

It can be used to traverse a tree structure. For example, to list a directory tree:

```
Stream.iterateMapWith Stream.serial
    (either Dir.toEither (const nil))
    (fromPure (Left "tmp"))
```

Pre-release

iterateSmapMWith :: (IsStream t, Monad m) => (t m a -> t m a -> t m a) -> (b -> a -> m (b, t m a)) -> m b -> t m a -> t m a

Like `iterateMap` but carries a state in the stream generation function. This can be used to traverse graph like structures; the visited nodes can be remembered in the state to avoid cycles.

Note that a combination of `iterateMap` and `usingState` can also be used to traverse graphs. However, this function provides a more localized state instead of using a global state.

See also: `mfix`

Pre-release

iterateMapLeftsWith :: IsStream t => (t m (Either a b) -> t m (Either a b) -> t m (Either a b)) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b)

In an `Either` stream iterate on `Left`s. This is a special case of `iterateMapWith`:

```
iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil))
```

To traverse a directory tree:

```
iterateMapLeftsWith serial Dir.toEither (fromPure (Left "tmp"))
```

Pre-release

## Deprecated

concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b