Direct style re-implementation of CPS stream in
Streamly.Internal.Data.StreamK. The symbol or suffix `D`

in this
module denotes the Direct style. GHC is able to INLINE and fuse direct
style better, providing better performance than CPS implementation.

import qualified Streamly.Internal.Data.Stream as D

## The stream type

A stream consists of a step function that generates the next step given a current state, and the current state.

##### Instances

## CrossStream type wrapper

data CrossStream m a Source #

A newtype wrapper for the `Stream`

type with a cross product style monad
instance.

A `Monad`

bind behaves like a `for`

loop:

`>>>`

Stream.fold Fold.toList $ Stream.unCross $ do x <- Stream.mkCross $ Stream.fromList [1,2] -- Perform the following actions for each x in the stream return x :} [1,2]`:{`

Nested monad binds behave like nested `for`

loops:

`>>>`

Stream.fold Fold.toList $ Stream.unCross $ do x <- Stream.mkCross $ Stream.fromList [1,2] y <- Stream.mkCross $ Stream.fromList [3,4] -- Perform the following actions for each x, for each y return (x, y) :} [(1,3),(1,4),(2,3),(2,4)]`:{`

##### Instances

unCross :: CrossStream m a -> Stream m a Source #

mkCross :: Stream m a -> CrossStream m a Source #

## Conversion to StreamK

fromStreamK :: Applicative m => StreamK m a -> Stream m a Source #

Convert a CPS encoded StreamK to direct style step encoded StreamD

toStreamK :: Monad m => Stream m a -> StreamK m a Source #

Convert a direct style step encoded StreamD to a CPS encoded StreamK

## From Unfold

unfold :: Applicative m => Unfold m a b -> a -> Stream m b Source #

Convert an `Unfold`

into a stream by supplying it an input seed.

`>>>`

`s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")`

`>>>`

hello hello hello`Stream.fold Fold.drain s`

## Construction

### Primitives

nilM :: Applicative m => m b -> Stream m a Source #

A stream that terminates without producing any output, but produces a side effect.

`>>>`

"nil" []`Stream.fold Fold.toList (Stream.nilM (print "nil"))`

*Pre-release*

consM :: Applicative m => m a -> Stream m a -> Stream m a infixr 5 Source #

Like `cons`

but fuses an effect instead of a pure value.

### From Values

fromPure :: Applicative m => a -> Stream m a Source #

Create a singleton stream from a pure value.

`>>>`

`fromPure a = a `Stream.cons` Stream.nil`

`>>>`

`fromPure = pure`

`>>>`

`fromPure = Stream.fromEffect . pure`

fromEffect :: Applicative m => m a -> Stream m a Source #

Create a singleton stream from a monadic action.

`>>>`

`fromEffect m = m `Stream.consM` Stream.nil`

`>>>`

`fromEffect = Stream.sequence . Stream.fromPure`

`>>>`

hello`Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")`

### From Containers

fromList :: Applicative m => [a] -> Stream m a Source #

Construct a stream from a list of pure values.

## Elimination

### Primitives

uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns
`Nothing`

. If the stream is non-empty, returns `Just (a, ma)`

, where `a`

is
the head of the stream and `ma`

its tail.

Properties:

`>>>`

`Nothing <- Stream.uncons Stream.nil`

`>>>`

`Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)`

This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of `uncons`

, however,
this is generally less efficient than specific folds because it takes apart
the stream one element at a time, therefore, does not take adavantage of
stream fusion.

`foldBreak`

is a more general way of consuming a stream piecemeal.

`>>>`

uncons xs = do r <- Stream.foldBreak Fold.one xs return $ case r of (Nothing, _) -> Nothing (Just h, t) -> Just (h, t) :}`:{`

### Strict Left Folds

fold :: Monad m => Fold m a b -> Stream m a -> m b Source #

Fold a stream using the supplied left `Fold`

and reducing the resulting
expression strictly at each step. The behavior is similar to `foldl'`

. A
`Fold`

can terminate early without consuming the full stream. See the
documentation of individual `Fold`

s for termination behavior.

Definitions:

`>>>`

`fold f = fmap fst . Stream.foldBreak f`

`>>>`

`fold f = Stream.parse (Parser.fromFold f)`

Example:

`>>>`

5050`Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)`

foldAddLazy :: Monad m => Fold m a b -> Stream m a -> Fold m a b Source #

Append a stream to a fold lazily to build an accumulator incrementally.

Example, to continue folding a list of streams on the same sum fold:

`>>>`

`streams = [Stream.fromList [1..5], Stream.fromList [6..10]]`

`>>>`

`f = Prelude.foldl Stream.foldAddLazy Fold.sum streams`

`>>>`

55`Stream.fold f Stream.nil`

foldAdd :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b) Source #

`>>>`

`foldAdd = flip Fold.addStream`

foldEither :: Monad m => Fold m a b -> Stream m a -> m (Either (Fold m a b) (b, Stream m a)) Source #

Fold resulting in either breaking the stream or continuation of the fold. Instead of supplying the input stream in one go we can run the fold multiple times, each time supplying the next segment of the input stream. If the fold has not yet finished it returns a fold that can be run again otherwise it returns the fold result and the residual stream.

*Internal*

### Lazy Right Folds

foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #

Right associative/lazy pull fold. `foldrM build final stream`

constructs
an output structure using the step function `build`

. `build`

is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls `build`

again thus
lazily consuming the input `stream`

until either the output expression built
by `build`

is free of the "tail" or the input is exhausted in which case
`final`

is used as the terminating case for the output structure. For more
details see the description in the previous section.

Example, determine if any element is `odd`

in a stream:

`>>>`

`s = Stream.fromList (2:4:5:undefined)`

`>>>`

`step x xs = if odd x then return True else xs`

`>>>`

True`Stream.foldrM step (return False) s`

foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad `m`

is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.

`>>>`

`foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)`

Note: This is similar to Fold.foldr' (the right fold via left fold), but could be more efficient.

foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

### Specific Folds

drain :: Monad m => Stream m a -> m () Source #

Definitions:

`>>>`

`drain = Stream.fold Fold.drain`

`>>>`

`drain = Stream.foldrM (\_ xs -> xs) (return ())`

Run a stream, discarding the results.

toList :: Monad m => Stream m a -> m [a] Source #

Definitions:

`>>>`

`toList = Stream.foldr (:) []`

`>>>`

`toList = Stream.fold Fold.toList`

Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. `Identity`

). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.

*Warning!* working on large lists accumulated as buffers in memory could be
very inefficient, consider using Streamly.Data.Array instead.

Note that this could a bit more efficient compared to ```
Stream.fold
Fold.toList
```

, and it can fuse with pure list consumers.

## Mapping

mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #

`>>>`

`mapM f = Stream.sequence . fmap f`

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

`>>>`

`s = Stream.fromList ["a", "b", "c"]`

`>>>`

abc`Stream.fold Fold.drain $ Stream.mapM putStr s`

## Stateful Filters

take :: Applicative m => Int -> Stream m a -> Stream m a Source #

Take first `n`

elements from the stream and discard the rest.

takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

End the stream as soon as the predicate fails on an element.

takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as `takeWhile`

but with a monadic predicate.

## Combining Two Streams

### Zipping

zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #

Like `zipWith`

but using a monadic zipping function.

zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.`zipWith`

otherwise.

Stream `a`

is evaluated first, followed by stream `b`

, the resulting
elements `a`

and `b`

are then zipped using the supplied zip function and the
result `c`

is yielded to the consumer.

If stream `a`

or stream `b`

ends, the zipped stream ends. If stream `b`

ends
first, the element `a`

from previous evaluation of stream `a`

is discarded.

`>>>`

`s1 = Stream.fromList [1,2,3]`

`>>>`

`s2 = Stream.fromList [4,5,6]`

`>>>`

[5,7,9]`Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2`

### Cross Product

crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b Source #

Apply a stream of functions to a stream of values and flatten the results.

Note that the second stream is evaluated multiple times.

`>>>`

`crossApply = Stream.crossWith id`

crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

Definition:

`>>>`

`crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2`

Note that the second stream is evaluated multiple times.

cross :: Monad m => Stream m a -> Stream m b -> Stream m (a, b) Source #

Given a `Stream m a`

and `Stream m b`

generate a stream with all possible
combinations of the tuple `(a, b)`

.

Definition:

`>>>`

`cross = Stream.crossWith (,)`

The second stream is evaluated multiple times. If that is not desired it can
be cached in an `Array`

and then generated from the array before
calling this function. Caching may also improve performance if the stream is
expensive to evaluate.

See `cross`

for a much faster fused
alternative.

Time: O(m x n)

*Pre-release*

## Unfold Many

data ConcatMapUState o i Source #

unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

`unfoldMany unfold stream`

uses `unfold`

to map the input stream elements
to streams and then flattens the generated streams into a single output
stream.

Like `concatMap`

but uses an `Unfold`

for stream generation. Unlike
`concatMap`

this can fuse the `Unfold`

code with the inner loop and
therefore provide many times better performance.

## Concat

concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

`>>>`

`concatMap f = Stream.concatMapM (return . f)`

`>>>`

`concatMap f = Stream.concat . fmap f`

`>>>`

`concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)`

See `unfoldMany`

for a fusible alternative.

concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #

Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike `concatMap`

, it can produce an
effect at the beginning of each iteration of the inner loop.

See `unfoldMany`

for a fusible alternative.

concat :: Monad m => Stream m (Stream m a) -> Stream m a Source #

Flatten a stream of streams to a single stream.

`>>>`

`concat = Stream.concatMap id`

*Pre-release*

## Unfold Iterate

unfoldIterateDfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #

Same as `concatIterateDfs`

but more efficient due to stream fusion.

Example, list a directory tree using DFS:

`>>>`

`f = Unfold.either Dir.eitherReaderPaths Unfold.nil`

`>>>`

`input = Stream.fromPure (Left ".")`

`>>>`

`ls = Stream.unfoldIterateDfs f input`

*Pre-release*

unfoldIterateBfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #

Like `unfoldIterateDfs`

but uses breadth first style traversal.

*Pre-release*

unfoldIterateBfsRev :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #

Like `unfoldIterateBfs`

but processes the children in reverse order,
therefore, may be slightly faster.

*Pre-release*

## Concat Iterate

concatIterateScan :: Monad m => (b -> a -> m b) -> (b -> m (Maybe (b, Stream m a))) -> b -> Stream m a Source #

Generate a stream from an initial state, scan and concat the stream, generate a stream again from the final state of the previous scan and repeat the process.

concatIterateDfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #

Traverse the stream in depth first style (DFS). Map each element in the input stream to a stream and flatten, recursively map the resulting elements as well to a stream and flatten until no more streams are generated.

Example, list a directory tree using DFS:

`>>>`

`f = either (Just . Dir.readEitherPaths) (const Nothing)`

`>>>`

`input = Stream.fromPure (Left ".")`

`>>>`

`ls = Stream.concatIterateDfs f input`

This is equivalent to using `concatIterateWith StreamK.append`

.

*Pre-release*

concatIterateBfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #

Similar to `concatIterateDfs`

except that it traverses the stream in
breadth first style (BFS). First, all the elements in the input stream are
emitted, and then their traversals are emitted.

Example, list a directory tree using BFS:

`>>>`

`f = either (Just . Dir.readEitherPaths) (const Nothing)`

`>>>`

`input = Stream.fromPure (Left ".")`

`>>>`

`ls = Stream.concatIterateBfs f input`

*Pre-release*

concatIterateBfsRev :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #

Same as `concatIterateBfs`

except that the traversal of the last
element on a level is emitted first and then going backwards up to the first
element (reversed ordering). This may be slightly faster than
`concatIterateBfs`

.

## Fold Many

data FoldMany s fs b a Source #

FoldManyStart s | |

FoldManyFirst fs s | |

FoldManyLoop s fs | |

FoldManyYield b (FoldMany s fs b a) | |

FoldManyDone |

data FoldManyPost s fs b a Source #

FoldManyPostStart s | |

FoldManyPostLoop s fs | |

FoldManyPostYield b (FoldManyPost s fs b a) | |

FoldManyPostDone |

foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Apply a `Fold`

repeatedly on a stream and emit the results in the output
stream.

Definition:

`>>>`

`foldMany f = Stream.parseMany (Parser.fromFold f)`

Example, empty stream:

`>>>`

`f = Fold.take 2 Fold.sum`

`>>>`

`fmany = Stream.fold Fold.toList . Stream.foldMany f`

`>>>`

[]`fmany $ Stream.fromList []`

Example, last fold empty:

`>>>`

[3,7]`fmany $ Stream.fromList [1..4]`

Example, last fold non-empty:

`>>>`

[3,7,5]`fmany $ Stream.fromList [1..5]`

Note that using a closed fold e.g. `Fold.take 0`

, would result in an
infinite stream on a non-empty input stream.

foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Like `foldMany`

but evaluates the fold even if the fold did not receive
any input, therefore, always results in a non-empty output even on an empty
stream (default result of the fold).

Example, empty stream:

`>>>`

`f = Fold.take 2 Fold.sum`

`>>>`

`fmany = Stream.fold Fold.toList . Stream.foldManyPost f`

`>>>`

[0]`fmany $ Stream.fromList []`

Example, last fold empty:

`>>>`

[3,7,0]`fmany $ Stream.fromList [1..4]`

Example, last fold non-empty:

`>>>`

[3,7,5]`fmany $ Stream.fromList [1..5]`

Note that using a closed fold e.g. `Fold.take 0`

, would result in an
infinite stream without consuming the input.

*Pre-release*

groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b Source #

Group the input stream into groups of `n`

elements each and then fold each
group using the provided fold function.

groupsOf n f = foldMany (FL.take n f)

`>>>`

[3,7,11,15,19]`Stream.toList $ Stream.groupsOf 2 Fold.sum (Stream.enumerateFromTo 1 10)`

This can be considered as an n-fold version of `take`

where we apply
`take`

repeatedly on the leftover stream until the stream exhausts.

refoldIterateM :: Monad m => Refold m b a b -> m b -> Stream m a -> Stream m b Source #

Like `foldIterateM`

but using the `Refold`

type instead. This could be
much more efficient due to stream fusion.

*Internal*

## Fold Iterate

reduceIterateBfs :: Monad m => (a -> a -> m a) -> Stream m a -> m (Maybe a) Source #

Binary BFS style reduce, folds a level entirely using the supplied fold function, collecting the outputs as next level of the tree, then repeats the same process on the next level. The last elements of a previously folded level are folded first.

foldIterateBfs :: Fold m a (Either a a) -> Stream m a -> m (Maybe a) Source #

N-Ary BFS style iterative fold, if the input stream finished before the fold then it returns Left otherwise Right. If the fold returns Left we terminate.

*Unimplemented*

## Splitting

indexOnSuffix :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int) Source #

Like `splitOnSuffix`

but generates a stream of (index, len) tuples marking
the places where the predicate matches in the stream.

*Pre-release*

## Multi-stream folds

eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool Source #

Compare two streams for equality

cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering Source #

Compare two streams lexicographically.

## Deprecated

sliceOnSuffix :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int) Source #

Deprecated: Please use indexOnSuffix instead.

## Primitives

nil :: Applicative m => Stream m a Source #

A stream that terminates without producing any output or side effect.

`>>>`

[]`Stream.toList Stream.nil`

nilM :: Applicative m => m b -> Stream m a Source #

A stream that terminates without producing any output, but produces a side effect.

`>>>`

"nil" []`Stream.fold Fold.toList (Stream.nilM (print "nil"))`

*Pre-release*

cons :: Applicative m => a -> Stream m a -> Stream m a infixr 5 Source #

WARNING! O(n^2) time complexity wrt number of elements. Use the O(n)
complexity StreamK.`cons`

unless you want to
statically fuse just a few elements.

Fuse a pure value at the head of an existing stream::

`>>>`

`s = 1 `Stream.cons` Stream.fromList [2,3]`

`>>>`

[1,2,3]`Stream.toList s`

Definition:

`>>>`

`cons x xs = return x `Stream.consM` xs`

consM :: Applicative m => m a -> Stream m a -> Stream m a infixr 5 Source #

Like `cons`

but fuses an effect instead of a pure value.

## From `Unfold`

unfold :: Applicative m => Unfold m a b -> a -> Stream m b Source #

Convert an `Unfold`

into a stream by supplying it an input seed.

`>>>`

`s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")`

`>>>`

hello hello hello`Stream.fold Fold.drain s`

## Unfolding

unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a Source #

Build a stream by unfolding a *pure* step function `step`

starting from a
seed `s`

. The step function returns the next element in the stream and the
next seed value. When it is done it returns `Nothing`

and the stream ends.
For example,

`>>>`

let f b = if b > 2 then Nothing else Just (b, b + 1) in Stream.toList $ Stream.unfoldr f 0 :} [0,1,2]`:{`

unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a Source #

Build a stream by unfolding a *monadic* step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns `Nothing`

and the stream ends. For
example,

`>>>`

let f b = if b > 2 then return Nothing else return (Just (b, b + 1)) in Stream.toList $ Stream.unfoldrM f 0 :} [0,1,2]`:{`

## From Values

fromPure :: Applicative m => a -> Stream m a Source #

Create a singleton stream from a pure value.

`>>>`

`fromPure a = a `Stream.cons` Stream.nil`

`>>>`

`fromPure = pure`

`>>>`

`fromPure = Stream.fromEffect . pure`

fromEffect :: Applicative m => m a -> Stream m a Source #

Create a singleton stream from a monadic action.

`>>>`

`fromEffect m = m `Stream.consM` Stream.nil`

`>>>`

`fromEffect = Stream.sequence . Stream.fromPure`

`>>>`

hello`Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")`

repeat :: Monad m => a -> Stream m a Source #

Generate an infinite stream by repeating a pure value.

`>>>`

`repeat x = Stream.repeatM (pure x)`

repeatM :: Monad m => m a -> Stream m a Source #

`>>>`

`repeatM = Stream.sequence . Stream.repeat`

Generate a stream by repeatedly executing a monadic action forever.

`>>>`

repeatAction = Stream.repeatM (threadDelay 1000000 >> print 1) & Stream.take 10 & Stream.fold Fold.drain :}`:{`

replicate :: Monad m => Int -> a -> Stream m a Source #

`>>>`

`replicate n = Stream.take n . Stream.repeat`

`>>>`

`replicate n x = Stream.replicateM n (pure x)`

Generate a stream of length `n`

by repeating a value `n`

times.

replicateM :: Monad m => Int -> m a -> Stream m a Source #

`>>>`

`replicateM n = Stream.sequence . Stream.replicate n`

Generate a stream by performing a monadic action `n`

times.

## Enumeration

### Enumerating `Num`

Types

enumerateFromStepNum :: (Monad m, Num a) => a -> a -> Stream m a Source #

For floating point numbers if the increment is less than the precision then it just gets lost. Therefore we cannot always increment it correctly by just repeated addition. 9007199254740992 + 1 + 1 :: Double => 9.007199254740992e15 9007199254740992 + 2 :: Double => 9.007199254740994e15

Instead we accumulate the increment counter and compute the increment every time before adding it to the starting number.

This works for Integrals as well as floating point numbers, but enumerateFromStepIntegral is faster for integrals.

### Enumerating `Bounded`

`Enum`

Types

enumerateTo :: (Monad m, Bounded a, Enumerable a) => a -> Stream m a Source #

enumerateFromBounded :: (Monad m, Enumerable a, Bounded a) => a -> Stream m a Source #

`>>>`

`enumerateFromBounded from = Stream.enumerateFromTo from maxBound`

`enumerateFrom`

for `Bounded`

`Enum`

types.

### Enumerating `Enum`

Types not larger than `Int`

enumerateFromToSmall :: (Monad m, Enum a) => a -> a -> Stream m a Source #

`enumerateFromTo`

for `Enum`

types not larger than `Int`

.

enumerateFromThenToSmall :: (Monad m, Enum a) => a -> a -> a -> Stream m a Source #

`enumerateFromThenTo`

for `Enum`

types not larger than `Int`

.

enumerateFromThenSmallBounded :: (Monad m, Enumerable a, Bounded a) => a -> a -> Stream m a Source #

`enumerateFromThen`

for `Enum`

types not larger than `Int`

.

Note: We convert the `Enum`

to `Int`

and enumerate the `Int`

. If a
type is bounded but does not have a `Bounded`

instance then we can go on
enumerating it beyond the legal values of the type, resulting in the failure
of `toEnum`

when converting back to `Enum`

. Therefore we require a `Bounded`

instance for this function to be safely used.

### Enumerating `Bounded`

`Integral`

Types

enumerateFromThenIntegral :: (Monad m, Integral a, Bounded a) => a -> a -> Stream m a Source #

Enumerate an `Integral`

type in steps. ```
enumerateFromThenIntegral from
then
```

generates a stream whose first element is `from`

, the second element
is `then`

and the successive elements are in increments of `then - from`

.
The stream is bounded by the size of the `Integral`

type.

`>>>`

[0,2,4,6]`Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) 2`

`>>>`

[0,-2,-4,-6]`Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) (-2)`

### Enumerating `Integral`

Types

enumerateFromToIntegral :: (Monad m, Integral a) => a -> a -> Stream m a Source #

Enumerate an `Integral`

type up to a given limit.
`enumerateFromToIntegral from to`

generates a finite stream whose first
element is `from`

and successive elements are in increments of `1`

up to
`to`

.

`>>>`

[0,1,2,3,4]`Stream.toList $ Stream.enumerateFromToIntegral 0 4`

enumerateFromThenToIntegral :: (Monad m, Integral a) => a -> a -> a -> Stream m a Source #

Enumerate an `Integral`

type in steps up to a given limit.
`enumerateFromThenToIntegral from then to`

generates a finite stream whose
first element is `from`

, the second element is `then`

and the successive
elements are in increments of `then - from`

up to `to`

.

`>>>`

[0,2,4,6]`Stream.toList $ Stream.enumerateFromThenToIntegral 0 2 6`

`>>>`

[0,-2,-4,-6]`Stream.toList $ Stream.enumerateFromThenToIntegral 0 (-2) (-6)`

### Enumerating unbounded `Integral`

Types

enumerateFromStepIntegral :: (Integral a, Monad m) => a -> a -> Stream m a Source #

`enumerateFromStepIntegral from step`

generates an infinite stream whose
first element is `from`

and the successive elements are in increments of
`step`

.

CAUTION: This function is not safe for finite integral types. It does not check for overflow, underflow or bounds.

`>>>`

[0,2,4,6]`Stream.toList $ Stream.take 4 $ Stream.enumerateFromStepIntegral 0 2`

`>>>`

[0,-2,-4]`Stream.toList $ Stream.take 3 $ Stream.enumerateFromStepIntegral 0 (-2)`

### Enumerating `Fractional`

Types

enumerateFromFractional :: (Monad m, Fractional a) => a -> Stream m a Source #

Numerically stable enumeration from a `Fractional`

number in steps of size
`1`

. `enumerateFromFractional from`

generates a stream whose first element
is `from`

and the successive elements are in increments of `1`

. No overflow
or underflow checks are performed.

This is the equivalent to `enumFrom`

for `Fractional`

types. For example:

`>>>`

[1.1,2.1,3.1,4.1]`Stream.toList $ Stream.take 4 $ Stream.enumerateFromFractional 1.1`

enumerateFromToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> Stream m a Source #

Numerically stable enumeration from a `Fractional`

number to a given
limit. `enumerateFromToFractional from to`

generates a finite stream whose
first element is `from`

and successive elements are in increments of `1`

up
to `to`

.

This is the equivalent of `enumFromTo`

for `Fractional`

types. For
example:

`>>>`

[1.1,2.1,3.1,4.1]`Stream.toList $ Stream.enumerateFromToFractional 1.1 4`

`>>>`

[1.1,2.1,3.1,4.1,5.1]`Stream.toList $ Stream.enumerateFromToFractional 1.1 4.6`

Notice that the last element is equal to the specified `to`

value after
rounding to the nearest integer.

enumerateFromThenFractional :: (Monad m, Fractional a) => a -> a -> Stream m a Source #

Numerically stable enumeration from a `Fractional`

number in steps.
`enumerateFromThenFractional from then`

generates a stream whose first
element is `from`

, the second element is `then`

and the successive elements
are in increments of `then - from`

. No overflow or underflow checks are
performed.

This is the equivalent of `enumFromThen`

for `Fractional`

types. For
example:

`>>>`

[1.1,2.1,3.1,4.1]`Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenFractional 1.1 2.1`

`>>>`

[1.1,-2.1,-5.300000000000001,-8.500000000000002]`Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenFractional 1.1 (-2.1)`

enumerateFromThenToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> a -> Stream m a Source #

Numerically stable enumeration from a `Fractional`

number in steps up to a
given limit. `enumerateFromThenToFractional from then to`

generates a
finite stream whose first element is `from`

, the second element is `then`

and the successive elements are in increments of `then - from`

up to `to`

.

This is the equivalent of `enumFromThenTo`

for `Fractional`

types. For
example:

`>>>`

[0.1,2.0,3.9,5.799999999999999]`Stream.toList $ Stream.enumerateFromThenToFractional 0.1 2 6`

`>>>`

[0.1,-2.0,-4.1000000000000005,-6.200000000000001]`Stream.toList $ Stream.enumerateFromThenToFractional 0.1 (-2) (-6)`

### Enumerable Type Class

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type
class are equivalent to those in the `Enum`

type class, except that these
generate a stream instead of a list. Use the functions in
Streamly.Internal.Data.Stream.Enumeration module to define new instances.

enumerateFrom :: Monad m => a -> Stream m a Source #

`enumerateFrom from`

generates a stream starting with the element
`from`

, enumerating up to `maxBound`

when the type is `Bounded`

or
generating an infinite stream when the type is not `Bounded`

.

`>>>`

[0,1,2,3]`Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)`

For `Fractional`

types, enumeration is numerically stable. However, no
overflow or underflow checks are performed.

`>>>`

[1.1,2.1,3.1,4.1]`Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1`

enumerateFromTo :: Monad m => a -> a -> Stream m a Source #

Generate a finite stream starting with the element `from`

, enumerating
the type up to the value `to`

. If `to`

is smaller than `from`

then an
empty stream is returned.

`>>>`

[0,1,2,3,4]`Stream.toList $ Stream.enumerateFromTo 0 4`

For `Fractional`

types, the last element is equal to the specified `to`

value after rounding to the nearest integral value.

`>>>`

[1.1,2.1,3.1,4.1]`Stream.toList $ Stream.enumerateFromTo 1.1 4`

`>>>`

[1.1,2.1,3.1,4.1,5.1]`Stream.toList $ Stream.enumerateFromTo 1.1 4.6`

enumerateFromThen :: Monad m => a -> a -> Stream m a Source #

`enumerateFromThen from then`

generates a stream whose first element
is `from`

, the second element is `then`

and the successive elements are
in increments of `then - from`

. Enumeration can occur downwards or
upwards depending on whether `then`

comes before or after `from`

. For
`Bounded`

types the stream ends when `maxBound`

is reached, for
unbounded types it keeps enumerating infinitely.

`>>>`

[0,2,4,6]`Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2`

`>>>`

[0,-2,-4,-6]`Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)`

enumerateFromThenTo :: Monad m => a -> a -> a -> Stream m a Source #

`enumerateFromThenTo from then to`

generates a finite stream whose
first element is `from`

, the second element is `then`

and the successive
elements are in increments of `then - from`

up to `to`

. Enumeration can
occur downwards or upwards depending on whether `then`

comes before or
after `from`

.

`>>>`

[0,2,4,6]`Stream.toList $ Stream.enumerateFromThenTo 0 2 6`

`>>>`

[0,-2,-4,-6]`Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)`

##### Instances

## Time Enumeration

times :: MonadIO m => Stream m (AbsTime, RelTime64) Source #

`times`

returns a stream of time value tuples with clock of 10 ms
granularity. The first component of the tuple is an absolute time reference
(epoch) denoting the start of the stream and the second component is a time
relative to the reference.

`>>>`

`f = Fold.drainMapM (\x -> print x >> threadDelay 1000000)`

`>>>`

(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))`Stream.fold f $ Stream.take 3 $ Stream.times`

Note: This API is not safe on 32-bit machines.

*Pre-release*

timesWith :: MonadIO m => Double -> Stream m (AbsTime, RelTime64) Source #

`timesWith g`

returns a stream of time value tuples. The first component
of the tuple is an absolute time reference (epoch) denoting the start of the
stream and the second component is a time relative to the reference.

The argument `g`

specifies the granularity of the relative time in seconds.
A lower granularity clock gives higher precision but is more expensive in
terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

`>>>`

`import Control.Concurrent (threadDelay)`

`>>>`

`f = Fold.drainMapM (\x -> print x >> threadDelay 1000000)`

`>>>`

(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))`Stream.fold f $ Stream.take 3 $ Stream.timesWith 0.01`

Note: This API is not safe on 32-bit machines.

*Pre-release*

absTimes :: MonadIO m => Stream m AbsTime Source #

`absTimes`

returns a stream of absolute timestamps using a clock of 10 ms
granularity.

`>>>`

`f = Fold.drainMapM print`

`>>>`

AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...})`Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes`

Note: This API is not safe on 32-bit machines.

*Pre-release*

absTimesWith :: MonadIO m => Double -> Stream m AbsTime Source #

`absTimesWith g`

returns a stream of absolute timestamps using a clock of
granularity `g`

specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
as 1 ms.

`>>>`

`f = Fold.drainMapM print`

`>>>`

AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...})`Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimesWith 0.01`

Note: This API is not safe on 32-bit machines.

*Pre-release*

relTimes :: MonadIO m => Stream m RelTime64 Source #

`relTimes`

returns a stream of relative time values starting from 0,
using a clock of granularity 10 ms.

`>>>`

`f = Fold.drainMapM print`

`>>>`

RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...)`Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes`

Note: This API is not safe on 32-bit machines.

*Pre-release*

relTimesWith :: MonadIO m => Double -> Stream m RelTime64 Source #

`relTimesWith g`

returns a stream of relative time values starting from 0,
using a clock of granularity `g`

specified in seconds. A low granularity
clock is more expensive in terms of CPU usage. Any granularity lower than 1
ms is treated as 1 ms.

`>>>`

`f = Fold.drainMapM print`

`>>>`

RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...)`Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01`

Note: This API is not safe on 32-bit machines.

*Pre-release*

durations :: Double -> t m RelTime64 Source #

`durations g`

returns a stream of relative time values measuring the time
elapsed since the immediate predecessor element of the stream was generated.
The first element of the stream is always 0. `durations`

uses a clock of
granularity `g`

specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. The minimum granularity is 1 millisecond.
Durations lower than 1 ms will be 0.

Note: This API is not safe on 32-bit machines.

*Unimplemented*

timeout :: AbsTime -> t m () Source #

Generate a singleton event at or after the specified absolute time. Note that this is different from a threadDelay, a threadDelay starts from the time when the action is evaluated, whereas if we use AbsTime based timeout it will immediately expire if the action is evaluated too late.

*Unimplemented*

## From Generators

Generate a monadic stream from a seed.

## Iteration

iterate :: Monad m => (a -> a) -> a -> Stream m a Source #

Generate an infinite stream with `x`

as the first element and each
successive element derived by applying the function `f`

on the previous
element.

`>>>`

[1,2,3,4,5]`Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1`

iterateM :: Monad m => (a -> m a) -> m a -> Stream m a Source #

Generate an infinite stream with the first element generated by the action
`m`

and each successive element derived by applying the monadic function `f`

on the previous element.

`>>>`

Stream.iterateM (\x -> print x >> return (x + 1)) (return 0) & Stream.take 3 & Stream.toList :} 0 1 [0,1,2]`:{`

## From Containers

Transform an input structure into a stream.

fromList :: Applicative m => [a] -> Stream m a Source #

Construct a stream from a list of pure values.

fromFoldable :: (Monad m, Foldable f) => f a -> Stream m a Source #

`>>>`

`fromFoldable = Prelude.foldr Stream.cons Stream.nil`

Construct a stream from a `Foldable`

containing pure values:

/WARNING: O(n^2), suitable only for a small number of elements in the stream/

fromFoldableM :: (Monad m, Foldable f) => f (m a) -> Stream m a Source #

`>>>`

`fromFoldableM = Prelude.foldr Stream.consM Stream.nil`

Construct a stream from a `Foldable`

containing pure values:

/WARNING: O(n^2), suitable only for a small number of elements in the stream/

## From Pointers

fromByteStr# :: Monad m => Addr# -> Stream m Word8 Source #

Read bytes from an immutable `Addr#`

until a 0 byte is encountered, the 0
byte is not included in the stream.

`>>>`

`:set -XMagicHash`

`>>>`

`fromByteStr# addr = Stream.takeWhile (/= 0) $ Stream.fromPtr $ Ptr addr`

*Unsafe:* The caller is responsible for safe addressing.

Note that this is completely safe when reading from Haskell string literals because they are guaranteed to be NULL terminated:

`>>>`

[1,2,3]`Stream.toList $ Stream.fromByteStr# "\1\2\3\0"#`

## Conversions

fromStreamK :: Applicative m => StreamK m a -> Stream m a Source #

Convert a CPS encoded StreamK to direct style step encoded StreamD

toStreamK :: Monad m => Stream m a -> StreamK m a Source #

Convert a direct style step encoded StreamD to a CPS encoded StreamK

## Running a `Fold`

fold :: Monad m => Fold m a b -> Stream m a -> m b Source #

Fold a stream using the supplied left `Fold`

and reducing the resulting
expression strictly at each step. The behavior is similar to `foldl'`

. A
`Fold`

can terminate early without consuming the full stream. See the
documentation of individual `Fold`

s for termination behavior.

Definitions:

`>>>`

`fold f = fmap fst . Stream.foldBreak f`

`>>>`

`fold f = Stream.parse (Parser.fromFold f)`

Example:

`>>>`

5050`Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)`

parse :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b) Source #

Parse a stream using the supplied `Parser`

.

Parsers (See Streamly.Internal.Data.Parser) are more powerful folds that add backtracking and error functionality to terminating folds. Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:

`>>>`

Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")`Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil`

Note: `parse p`

is not the same as `head . parseMany p`

on an empty stream.

parseD :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b) Source #

Run a `Parse`

over a stream.

parseBreak :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b, Stream m a) Source #

Parse a stream using the supplied `Parser`

.

parseBreakD :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b, Stream m a) Source #

Run a `Parse`

over a stream and return rest of the Stream.

## Stream Deconstruction

uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns
`Nothing`

. If the stream is non-empty, returns `Just (a, ma)`

, where `a`

is
the head of the stream and `ma`

its tail.

Properties:

`>>>`

`Nothing <- Stream.uncons Stream.nil`

`>>>`

`Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)`

This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of `uncons`

, however,
this is generally less efficient than specific folds because it takes apart
the stream one element at a time, therefore, does not take adavantage of
stream fusion.

`foldBreak`

is a more general way of consuming a stream piecemeal.

`>>>`

uncons xs = do r <- Stream.foldBreak Fold.one xs return $ case r of (Nothing, _) -> Nothing (Just h, t) -> Just (h, t) :}`:{`

## Right Folds

foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #

Right associative/lazy pull fold. `foldrM build final stream`

constructs
an output structure using the step function `build`

. `build`

is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls `build`

again thus
lazily consuming the input `stream`

until either the output expression built
by `build`

is free of the "tail" or the input is exhausted in which case
`final`

is used as the terminating case for the output structure. For more
details see the description in the previous section.

Example, determine if any element is `odd`

in a stream:

`>>>`

`s = Stream.fromList (2:4:5:undefined)`

`>>>`

`step x xs = if odd x then return True else xs`

`>>>`

True`Stream.foldrM step (return False) s`

foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad `m`

is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.

`>>>`

`foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)`

Note: This is similar to Fold.foldr' (the right fold via left fold), but could be more efficient.

## Left Folds

## Specific Fold Functions

drain :: Monad m => Stream m a -> m () Source #

Definitions:

`>>>`

`drain = Stream.fold Fold.drain`

`>>>`

`drain = Stream.foldrM (\_ xs -> xs) (return ())`

Run a stream, discarding the results.

mapM_ :: Monad m => (a -> m b) -> Stream m a -> m () Source #

Execute a monadic action for each element of the `Stream`

## To containers

toList :: Monad m => Stream m a -> m [a] Source #

Definitions:

`>>>`

`toList = Stream.foldr (:) []`

`>>>`

`toList = Stream.fold Fold.toList`

Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. `Identity`

). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.

*Warning!* working on large lists accumulated as buffers in memory could be
very inefficient, consider using Streamly.Data.Array instead.

Note that this could a bit more efficient compared to ```
Stream.fold
Fold.toList
```

, and it can fuse with pure list consumers.

## Multi-Stream Folds

### Comparisons

These should probably be expressed using zipping operations.

eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool Source #

Compare two streams for equality

cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering Source #

Compare two streams lexicographically.

### Substreams

These should probably be expressed using parsers.

isPrefixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #

Returns `True`

if the first stream is the same as or a prefix of the
second. A stream is a prefix of itself.

`>>>`

True`Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: Stream IO Char)`

isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a, Unbox a) => Stream m a -> Stream m a -> m Bool Source #

isSuffixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #

Returns `True`

if the first stream is a suffix of the second. A stream is
considered a suffix of itself.

`>>>`

True`Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: Stream IO Char)`

Space: `O(n)`

, buffers entire input stream and the suffix.

*Pre-release*

*Suboptimal* - Help wanted.

isSuffixOfUnbox :: (MonadIO m, Eq a, Unbox a) => Stream m a -> Stream m a -> m Bool Source #

Much faster than `isSuffixOf`

.

isSubsequenceOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #

Returns `True`

if all the elements of the first stream occur, in order, in
the second stream. The elements do not have to occur consecutively. A stream
is a subsequence of itself.

`>>>`

True`Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: Stream IO Char)`

stripPrefix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #

`stripPrefix prefix input`

strips the `prefix`

stream from the `input`

stream if it is a prefix of input. Returns `Nothing`

if the input does not
start with the given prefix, stripped input otherwise. Returns `Just nil`

when the prefix is the same as the input stream.

Space: `O(1)`

stripSuffix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #

Drops the given suffix from a stream. Returns `Nothing`

if the stream does
not end with the given suffix. Returns `Just nil`

when the suffix is the
same as the stream.

It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.

Space: `O(n)`

, buffers the entire input stream as well as the suffix

*Pre-release*

stripSuffixUnbox :: (MonadIO m, Eq a, Unbox a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #

Much faster than `stripSuffix`

.

:: Monad m | |

=> m c | before |

-> (c -> m d) | after, on normal stop |

-> (c -> e -> Stream m b -> m (Stream m b)) | on exception |

-> (forall s. m s -> m (Either e s)) | try (exception handling) |

-> (c -> Stream m b) | stream generator |

-> Stream m b |

Like `gbracket`

but with following differences:

- alloc action
`m c`

runs with async exceptions enabled - cleanup action
`c -> m d`

won't run if the stream is garbage collected after partial evaluation.

*Inhibits stream fusion*

*Pre-release*

:: MonadIO m | |

=> IO c | before |

-> (c -> IO d1) | on normal stop |

-> (c -> e -> Stream m b -> IO (Stream m b)) | on exception |

-> (c -> IO d2) | on GC without normal stop or exception |

-> (forall s. m s -> m (Either e s)) | try (exception handling) |

-> (c -> Stream m b) | stream generator |

-> Stream m b |

Run the alloc action `m c`

with async exceptions disabled but keeping
blocking operations interruptible (see `mask`

). Use the
output `c`

as input to `c -> Stream m b`

to generate an output stream. When
generating the stream use the supplied `try`

operation ```
forall s. m s -> m
(Either e s)
```

to catch synchronous exceptions. If an exception occurs run
the exception handler `c -> e -> Stream m b -> m (Stream m b)`

. Note that
`gbracket`

does not rethrow the exception, it has to be done by the
exception handler if desired.

The cleanup action `c -> m d`

, runs whenever the stream ends normally, due
to a sync or async exception or if it gets garbage collected after a partial
lazy evaluation. See `bracket`

for the semantics of the cleanup action.

`gbracket`

can express all other exception handling combinators.

*Inhibits stream fusion*

*Pre-release*

before :: Monad m => m b -> Stream m a -> Stream m a Source #

Run the action `m b`

before the stream yields its first element.

Same as the following but more efficient due to fusion:

`>>>`

`before action xs = Stream.nilM action <> xs`

`>>>`

`before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)`

afterUnsafe :: Monad m => m b -> Stream m a -> Stream m a Source #

Like `after`

, with following differences:

- action
`m b`

won't run if the stream is garbage collected after partial evaluation. - Monad
`m`

does not require any other constraints. - has slightly better performance than
`after`

.

Same as the following, but with stream fusion:

`>>>`

`afterUnsafe action xs = xs <> Stream.nilM action`

*Pre-release*

afterIO :: MonadIO m => IO b -> Stream m a -> Stream m a Source #

Run the action `IO b`

whenever the stream is evaluated to completion, or
if it is garbage collected after a partial lazy evaluation.

The semantics of the action `IO b`

are similar to the semantics of cleanup
action in `bracketIO`

.

*See also afterUnsafe*

bracketUnsafe :: MonadCatch m => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #

Like `bracket`

but with following differences:

- alloc action
`m b`

runs with async exceptions enabled - cleanup action
`b -> m c`

won't run if the stream is garbage collected after partial evaluation. - has slightly better performance than
`bracketIO`

.

*Inhibits stream fusion*

*Pre-release*

bracketIO3 :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> IO d) -> (b -> IO e) -> (b -> Stream m a) -> Stream m a Source #

Like `bracketIO`

but can use 3 separate cleanup actions depending on the
mode of termination:

- When the stream stops normally
- When the stream is garbage collected
- When the stream encounters an exception

`bracketIO3 before onStop onGC onException action`

runs `action`

using the
result of `before`

. If the stream stops, `onStop`

action is executed, if the
stream is abandoned `onGC`

is executed, if the stream encounters an
exception `onException`

is executed.

The exception is not caught, it is rethrown.

*Inhibits stream fusion*

*Pre-release*

bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a Source #

Run the alloc action `IO b`

with async exceptions disabled but keeping
blocking operations interruptible (see `mask`

). Use the
output `b`

of the IO action as input to the function `b -> Stream m a`

to
generate an output stream.

`b`

is usually a resource under the IO monad, e.g. a file handle, that
requires a cleanup after use. The cleanup action `b -> IO c`

, runs whenever
(1) the stream ends normally, (2) due to a sync or async exception or, (3)
if it gets garbage collected after a partial lazy evaluation. The exception
is not caught, it is rethrown.

`bracketIO`

only guarantees that the cleanup action runs, and it runs with
async exceptions enabled. The action must ensure that it can successfully
cleanup the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run. An example where GC based cleanup happens is when a stream is being folded but the fold terminates without draining the entire stream or if the consumer of the stream encounters an exception.

Observes exceptions only in the stream generation, and not in stream consumers.

*See also: bracketUnsafe*

*Inhibits stream fusion*

onException :: MonadCatch m => m b -> Stream m a -> Stream m a Source #

Run the action `m b`

if the stream evaluation is aborted due to an
exception. The exception is not caught, simply rethrown.

Observes exceptions only in the stream generation, and not in stream consumers.

*Inhibits stream fusion*

finallyUnsafe :: MonadCatch m => m b -> Stream m a -> Stream m a Source #

Like `finally`

with following differences:

- action
`m b`

won't run if the stream is garbage collected after partial evaluation. - has slightly better performance than
`finallyIO`

.

*Inhibits stream fusion*

*Pre-release*

finallyIO :: (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a Source #

Run the action `IO b`

whenever the stream stream stops normally, aborts
due to an exception or if it is garbage collected after a partial lazy
evaluation.

The semantics of running the action `IO b`

are similar to the cleanup action
semantics described in `bracketIO`

.

`>>>`

`finallyIO release = Stream.bracketIO (return ()) (const release)`

*See also finallyUnsafe*

*Inhibits stream fusion*

ghandle :: (MonadCatch m, Exception e) => (e -> Stream m a -> m (Stream m a)) -> Stream m a -> Stream m a Source #

Like `handle`

but the exception handler is also provided with the stream
that generated the exception as input. The exception handler can thus
re-evaluate the stream to retry the action that failed. The exception
handler can again call `ghandle`

on it to retry the action multiple times.

This is highly experimental. In a stream of actions we can map the stream with a retry combinator to retry each action on failure.

*Inhibits stream fusion*

*Pre-release*

handle :: (MonadCatch m, Exception e) => (e -> m (Stream m a)) -> Stream m a -> Stream m a Source #

When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument. The exception is caught and handled unless the handler decides to rethrow it. Note that exception handling is not applied to the stream returned by the exception handler.

Observes exceptions only in the stream generation, and not in stream consumers.

*Inhibits stream fusion*

## Generalize Inner Monad

morphInner :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a Source #

Transform the inner monad of a stream using a natural transformation.

Example, generalize the inner monad from Identity to any other:

`>>>`

`generalizeInner = Stream.morphInner (return . runIdentity)`

Also known as hoist.

generalizeInner :: Monad m => Stream Identity a -> Stream m a Source #

Generalize the inner monad of the stream from `Identity`

to any monad.

Definition:

`>>>`

`generalizeInner = Stream.morphInner (return . runIdentity)`

## Transform Inner Monad

liftInnerWith :: Monad (t m) => (forall b. m b -> t m b) -> Stream m a -> Stream (t m) a Source #

Lift the inner monad `m`

of a stream `Stream m a`

to `t m`

using the
supplied lift function.

runInnerWith :: Monad m => (forall b. t m b -> m b) -> Stream (t m) a -> Stream m a Source #

Evaluate the inner monad of a stream using the supplied runner function.

runInnerWithState :: Monad m => (forall b. s -> t m b -> m (b, s)) -> m s -> Stream (t m) a -> Stream m (s, a) Source #

Evaluate the inner monad of a stream using the supplied stateful runner function and the initial state. The state returned by an invocation of the runner is supplied as input state to the next invocation.

foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b Source #

Lazy left fold to a transformer monad.

foldrT :: (Monad m, Monad (t m), MonadTrans t) => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b Source #

Right fold to a transformer monad. This is the most general right fold
function. `foldrS`

is a special case of `foldrT`

, however `foldrS`

implementation can be more efficient:

`>>>`

`foldrS = Stream.foldrT`

`>>>`

`step f x xs = lift $ f x (runIdentityT xs)`

`>>>`

`foldrM f z s = runIdentityT $ Stream.foldrT (step f) (lift z) s`

`foldrT`

can be used to translate streamly streams to other transformer
monads e.g. to a different streaming type.

*Pre-release*

## Transform Inner Monad

liftInner :: (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a Source #

Lift the inner monad `m`

of `Stream m a`

to `t m`

where `t`

is a monad
transformer.

runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a Source #

Evaluate the inner monad of a stream as `ReaderT`

.

usingReaderT :: Monad m => m r -> (Stream (ReaderT r m) a -> Stream (ReaderT r m) a) -> Stream m a -> Stream m a Source #

Run a stream transformation using a given environment.

evalStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m a Source #

Evaluate the inner monad of a stream as `StateT`

.

`>>>`

`evalStateT s = fmap snd . Stream.runStateT s`

runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a) Source #

Evaluate the inner monad of a stream as `StateT`

and emit the resulting
state and value pair after each step.

usingStateT :: Monad m => m s -> (Stream (StateT s m) a -> Stream (StateT s m) a) -> Stream m a -> Stream m a Source #

Run a stateful (StateT) stream transformation using a given state.

`>>>`

`usingStateT s f = Stream.evalStateT s . f . Stream.liftInner`

See also: `scan`

## Generate

Combining streams to generate streams.

### Combine Two Streams

Functions ending in the shape:

`t m a -> t m a -> t m a`

.

#### Appending

Append a stream after another. A special case of concatMap or unfoldMany.

data AppendState s1 s2 Source #

AppendFirst s1 | |

AppendSecond s2 |

append :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.`append`

otherwise.

Fuses two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

`>>>`

`s1 = Stream.fromList [1,2]`

`>>>`

`s2 = Stream.fromList [3,4]`

`>>>`

[1,2,3,4]`Stream.fold Fold.toList $ s1 `Stream.append` s2`

#### Interleaving

Interleave elements from two streams alternately. A special case of unfoldInterleave.

data InterleaveState s1 s2 Source #

InterleaveFirst s1 s2 | |

InterleaveSecond s1 s2 | |

InterleaveSecondOnly s2 | |

InterleaveFirstOnly s1 |

interleave :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.`interleave`

otherwise.

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

interleaveMin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Like `interleave`

but stops interleaving as soon as any of the two streams
stops.

interleaveFst :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.

`>>>`

`:set -XOverloadedStrings`

`>>>`

`import Data.Functor.Identity (Identity)`

`>>>`

fromList "a,b,c"`Stream.interleaveFst "abc" ",,,," :: Stream Identity Char`

`>>>`

fromList "a,bc"`Stream.interleaveFst "abc" "," :: Stream Identity Char`

`interleaveFst`

is a dual of `interleaveFstSuffix`

.

Do not use dynamically.

*Pre-release*

interleaveFstSuffix :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.

`>>>`

`:set -XOverloadedStrings`

`>>>`

`import Data.Functor.Identity (Identity)`

`>>>`

fromList "a,b,c,"`Stream.interleaveFstSuffix "abc" ",,,," :: Stream Identity Char`

`>>>`

fromList "a,bc"`Stream.interleaveFstSuffix "abc" "," :: Stream Identity Char`

`interleaveFstSuffix`

is a dual of `interleaveFst`

.

Do not use dynamically.

*Pre-release*

#### Scheduling

Execute streams alternately irrespective of whether they generate
elements or not. Note `interleave`

would execute a stream until it
yields an element. A special case of unfoldRoundRobin.

roundRobin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Schedule the execution of two streams in a fair round-robin manner,
executing each stream once, alternately. Execution of a stream may not
necessarily result in an output, a stream may choose to `Skip`

producing an
element until later giving the other stream a chance to run. Therefore, this
combinator fairly interleaves the execution of two streams rather than
fairly interleaving the output of the two streams. This can be useful in
co-operative multitasking without using explicit threads. This can be used
as an alternative to `async`

.

Do not use dynamically.

*Pre-release*

#### Zipping

Zip corresponding elements of two streams.

zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.`zipWith`

otherwise.

Stream `a`

is evaluated first, followed by stream `b`

, the resulting
elements `a`

and `b`

are then zipped using the supplied zip function and the
result `c`

is yielded to the consumer.

If stream `a`

or stream `b`

ends, the zipped stream ends. If stream `b`

ends
first, the element `a`

from previous evaluation of stream `a`

is discarded.

`>>>`

`s1 = Stream.fromList [1,2,3]`

`>>>`

`s2 = Stream.fromList [4,5,6]`

`>>>`

[5,7,9]`Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2`

zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #

Like `zipWith`

but using a monadic zipping function.

#### Merging

Interleave elements from two streams based on a condition.

mergeBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.`mergeBy`

otherwise.

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

`>>>`

`s1 = Stream.fromList [1,3,5]`

`>>>`

`s2 = Stream.fromList [2,4,6,8]`

`>>>`

[1,2,3,4,5,6,8]`Stream.fold Fold.toList $ Stream.mergeBy compare s1 s2`

mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like `mergeBy`

but with a monadic comparison function.

Example, to merge two streams randomly:

> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT > Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2]) [2,1,2,2,2,1,1,1]

Example, merge two streams in a proportion of 2:1:

`>>>`

do let s1 = Stream.fromList [1,1,1,1,1,1] s2 = Stream.fromList [2,2,2] let proportionately m n = do ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT] return $ \_ _ -> do r <- readIORef ref writeIORef ref $ Prelude.tail r return $ Prelude.head r f <- proportionately 2 1 xs <- Stream.fold Fold.toList $ Stream.mergeByM f s1 s2 print xs :} [1,1,2,1,1,2,1,1,2]`:{`

mergeMinBy :: (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like `mergeByM`

but stops merging as soon as any of the two streams stops.

*Unimplemented*

mergeFstBy :: (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like `mergeByM`

but stops merging as soon as the first stream stops.

*Unimplemented*

### Combine N Streams

Functions generally ending in these shapes:

concat: f (t m a) -> t m a concatMap: (a -> t m b) -> t m a -> t m b unfoldMany: Unfold m a b -> t m a -> t m b

#### ConcatMap

Generate streams by mapping a stream generator on each element of an input stream, append the resulting streams and flatten.

concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

`>>>`

`concatMap f = Stream.concatMapM (return . f)`

`>>>`

`concatMap f = Stream.concat . fmap f`

`>>>`

`concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)`

See `unfoldMany`

for a fusible alternative.

concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #

Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike `concatMap`

, it can produce an
effect at the beginning of each iteration of the inner loop.

See `unfoldMany`

for a fusible alternative.

#### ConcatUnfold

Generate streams by using an unfold on each element of an input stream, append the resulting streams and flatten. A special case of gintercalate.

unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

`unfoldMany unfold stream`

uses `unfold`

to map the input stream elements
to streams and then flattens the generated streams into a single output
stream.

Like `concatMap`

but uses an `Unfold`

for stream generation. Unlike
`concatMap`

this can fuse the `Unfold`

code with the inner loop and
therefore provide many times better performance.

data ConcatUnfoldInterleaveState o i Source #

ConcatUnfoldInterleaveOuter o [i] | |

ConcatUnfoldInterleaveInner o [i] | |

ConcatUnfoldInterleaveInnerL [i] [i] | |

ConcatUnfoldInterleaveInnerR [i] [i] |

unfoldInterleave :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

This does not pair streams like mergeMapWith, instead, it goes through each stream one by one and yields one element from each stream. After it goes to the last stream it reverses the traversal to come back to the first stream yielding elements from each stream on its way back to the first stream and so on.

`>>>`

`lists = Stream.fromList [[1,1],[2,2],[3,3],[4,4],[5,5]]`

`>>>`

`interleaved = Stream.unfoldInterleave Unfold.fromList lists`

`>>>`

[1,2,3,4,5,5,4,3,2,1]`Stream.fold Fold.toList interleaved`

Note that this is order of magnitude more efficient than "mergeMapWith interleave" because of fusion.

unfoldRoundRobin :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

`unfoldInterleave`

switches to the next stream whenever a value from a
stream is yielded, it does not switch on a `Skip`

. So if a stream keeps
skipping for long time other streams won't get a chance to run.
`unfoldRoundRobin`

switches on Skip as well. So it basically schedules each
stream fairly irrespective of whether it produces a value or not.

#### Interpose

Like unfoldMany but intersperses an effect between the streams. A special case of gintercalate.

interpose :: Monad m => c -> Unfold m b c -> Stream m b -> Stream m c Source #

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

`>>>`

`unwords = Stream.interpose ' '`

*Pre-release*

interposeSuffix :: Monad m => c -> Unfold m b c -> Stream m b -> Stream m c Source #

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

`>>>`

`unlines = Stream.interposeSuffix '\n'`

*Pre-release*

#### Intercalate

Like unfoldMany but intersperses streams from another source between the streams from the first source.

gintercalate :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

`interleaveFst`

followed by unfold and concat.

*Pre-release*

gintercalateSuffix :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

`interleaveFstSuffix`

followed by unfold and concat.

*Pre-release*

intercalate :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #

`intersperse`

followed by unfold and concat.

`>>>`

`intercalate u a = Stream.unfoldMany u . Stream.intersperse a`

`>>>`

`intersperse = Stream.intercalate Unfold.identity`

`>>>`

`unwords = Stream.intercalate Unfold.fromList " "`

`>>>`

`input = Stream.fromList ["abc", "def", "ghi"]`

`>>>`

"abc def ghi"`Stream.fold Fold.toList $ Stream.intercalate Unfold.fromList " " input`

intercalateSuffix :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #

`intersperseMSuffix`

followed by unfold and concat.

`>>>`

`intercalateSuffix u a = Stream.unfoldMany u . Stream.intersperseMSuffix a`

`>>>`

`intersperseMSuffix = Stream.intercalateSuffix Unfold.identity`

`>>>`

`unlines = Stream.intercalateSuffix Unfold.fromList "\n"`

`>>>`

`input = Stream.fromList ["abc", "def", "ghi"]`

`>>>`

"abc\ndef\nghi\n"`Stream.fold Fold.toList $ Stream.intercalateSuffix Unfold.fromList "\n" input`

## Eliminate

Folding and Parsing chunks of streams to eliminate nested streams. Functions generally ending in these shapes:

f (Fold m a b) -> t m a -> t m b f (Parser a m b) -> t m a -> t m b

### Folding

Apply folds on a stream.

foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Apply a `Fold`

repeatedly on a stream and emit the results in the output
stream.

Definition:

`>>>`

`foldMany f = Stream.parseMany (Parser.fromFold f)`

Example, empty stream:

`>>>`

`f = Fold.take 2 Fold.sum`

`>>>`

`fmany = Stream.fold Fold.toList . Stream.foldMany f`

`>>>`

[]`fmany $ Stream.fromList []`

Example, last fold empty:

`>>>`

[3,7]`fmany $ Stream.fromList [1..4]`

Example, last fold non-empty:

`>>>`

[3,7,5]`fmany $ Stream.fromList [1..5]`

Note that using a closed fold e.g. `Fold.take 0`

, would result in an
infinite stream on a non-empty input stream.

foldSequence :: Stream m (Fold m a b) -> Stream m a -> Stream m b Source #

Apply a stream of folds to an input stream and emit the results in the output stream.

*Unimplemented*

foldIterateM :: Monad m => (b -> m (Fold m a b)) -> m b -> Stream m a -> Stream m b Source #

Iterate a fold generator on a stream. The initial value `b`

is used to
generate the first fold, the fold is applied on the stream and the result of
the fold is used to generate the next fold and so on.

`>>>`

`import Data.Monoid (Sum(..))`

`>>>`

`f x = return (Fold.take 2 (Fold.sconcat x))`

`>>>`

`s = fmap Sum $ Stream.fromList [1..10]`

`>>>`

[3,10,21,36,55,55]`Stream.fold Fold.toList $ fmap getSum $ Stream.foldIterateM f (pure 0) s`

This is the streaming equivalent of monad like sequenced application of folds where next fold is dependent on the previous fold.

*Pre-release*

refoldIterateM :: Monad m => Refold m b a b -> m b -> Stream m a -> Stream m b Source #

Like `foldIterateM`

but using the `Refold`

type instead. This could be
much more efficient due to stream fusion.

*Internal*

### Parsing

Parsing is opposite to flattening. `parseMany`

is dual to concatMap or
unfoldMany. concatMap generates a stream from single values in a
stream and flattens, parseMany does the opposite of flattening by
splitting the stream and then folds each such split to single value in
the output stream.

parseMany :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #

Apply a `Parser`

repeatedly on a stream and emit the parsed values in the
output stream.

Example:

`>>>`

`s = Stream.fromList [1..10]`

`>>>`

`parser = Parser.takeBetween 0 2 Fold.sum`

`>>>`

[Right 3,Right 7,Right 11,Right 15,Right 19]`Stream.fold Fold.toList $ Stream.parseMany parser s`

This is the streaming equivalent of the `many`

parse
combinator.

Known Issues: When the parser fails there is no way to get the remaining stream.

parseManyD :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #

parseSequence :: Stream m (Parser a m b) -> Stream m a -> Stream m b Source #

Apply a stream of parsers to an input stream and emit the results in the output stream.

*Unimplemented*

parseManyTill :: Parser a m b -> Parser a m x -> Stream m a -> Stream m b Source #

`parseManyTill collect test stream`

tries the parser `test`

on the input,
if `test`

fails it backtracks and tries `collect`

, after `collect`

succeeds
`test`

is tried again and so on. The parser stops when `test`

succeeds. The
output of `test`

is discarded and the output of `collect`

is emitted in the
output stream. The parser fails if `collect`

fails.

*Unimplemented*

parseIterate :: Monad m => (b -> Parser a m b) -> b -> Stream m a -> Stream m (Either ParseError b) Source #

Iterate a parser generating function on a stream. The initial value `b`

is
used to generate the first parser, the parser is applied on the stream and
the result is used to generate the next parser and so on.

`>>>`

`import Data.Monoid (Sum(..))`

`>>>`

`s = Stream.fromList [1..10]`

`>>>`

[3,10,21,36,55,55]`Stream.fold Fold.toList $ fmap getSum $ Stream.catRights $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) (Sum 0) $ fmap Sum s`

This is the streaming equivalent of monad like sequenced application of parsers where next parser is dependent on the previous parser.

*Pre-release*

parseIterateD :: Monad m => (b -> Parser a m b) -> b -> Stream m a -> Stream m (Either ParseError b) Source #

### Grouping

Group segments of a stream and fold. Special case of parsing.

groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b Source #

Group the input stream into groups of `n`

elements each and then fold each
group using the provided fold function.

groupsOf n f = foldMany (FL.take n f)

`>>>`

[3,7,11,15,19]`Stream.toList $ Stream.groupsOf 2 Fold.sum (Stream.enumerateFromTo 1 10)`

This can be considered as an n-fold version of `take`

where we apply
`take`

repeatedly on the leftover stream until the stream exhausts.

groupsBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use groupsWhile instead. Please note the change in the argument order of the comparison function.

groupsWhile :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

The argument order of the comparison function in `groupsWhile`

is
different than that of `groupsBy`

.

In `groupsBy`

the comparison function takes the next element as the first
argument and the previous element as the second argument. In `groupsWhile`

the first argument is the previous element and second argument is the next
element.

### Splitting

A special case of parsing.

wordsBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split the stream after stripping leading, trailing, and repeated separators
as per the fold supplied.
Therefore, `".a..b."`

with `.`

as the separator would be parsed as
`["a","b"]`

. In other words, its like parsing words from whitespace
separated text.

splitOnSeq :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #

splitOnSuffixSeq :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b Source #

splitOnSuffixSeqAny :: [Array a] -> Fold m a b -> Stream m a -> Stream m b Source #

Split post any one of the given patterns.

*Unimplemented*

splitOnPrefix :: (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split on a prefixed separator element, dropping the separator. The
supplied `Fold`

is applied on the split segments.

```
> splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
> splitOnPrefix' (==
````.`

) ".a.b"
["a","b"]

An empty stream results in an empty output stream:
```
> splitOnPrefix' (==
```

`.`

) ""
[]

An empty segment consisting of only a prefix is folded to the default output of the fold:

> splitOnPrefix' (==`.`

) "." [""] > splitOnPrefix' (==`.`

) ".a.b." ["a","b",""] > splitOnPrefix' (==`.`

) ".a..b" ["a","","b"]

A prefix is optional at the beginning of the stream:

> splitOnPrefix' (==`.`

) "a" ["a"] > splitOnPrefix' (==`.`

) "a.b" ["a","b"]

`splitOnPrefix`

is an inverse of `intercalatePrefix`

with a single element:

Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id

*Unimplemented*

splitOnAny :: [Array a] -> Fold m a b -> Stream m a -> Stream m b Source #

Split on any one of the given patterns.

*Unimplemented*

## Transform (Nested Containers)

Opposite to compact in ArrayStream

splitInnerBy :: Monad m => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #

Performs infix separator style splitting.

splitInnerBySuffix :: Monad m => (f a -> Bool) -> (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #

Performs infix separator style splitting.

intersectBySorted :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

## Reduce By Streams

dropPrefix :: Stream m a -> Stream m a -> Stream m a Source #

Drop prefix from the input stream if present.

Space: `O(1)`

*Unimplemented*

dropInfix :: Stream m a -> Stream m a -> Stream m a Source #

Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.

Space: `O(n)`

where n is the length of the infix.

*Unimplemented*

dropSuffix :: Stream m a -> Stream m a -> Stream m a Source #

Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.

Space: `O(n)`

where n is the length of the suffix.

*Unimplemented*

## Piping

Pass through a `Pipe`

.

transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b Source #

Use a `Pipe`

to transform a stream.

*Pre-release*

## Mapping

Stateless one-to-one maps.

mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #

`>>>`

`mapM f = Stream.sequence . fmap f`

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

`>>>`

`s = Stream.fromList ["a", "b", "c"]`

`>>>`

abc`Stream.fold Fold.drain $ Stream.mapM putStr s`

sequence :: Monad m => Stream m (m a) -> Stream m a Source #

`>>>`

`sequence = Stream.mapM id`

Replace the elements of a stream of monadic actions with the outputs of those actions.

`>>>`

`s = Stream.fromList [putStr "a", putStr "b", putStrLn "c"]`

`>>>`

abc`Stream.fold Fold.drain $ Stream.sequence s`

## Mapping Effects

tap :: Monad m => Fold m a b -> Stream m a -> Stream m a Source #

Tap the data flowing through a stream into a `Fold`

. For example, you may
add a tap to log the contents flowing through the stream. The fold is used
only for effects, its result is discarded.

Fold m a b | -----stream m a ---------------stream m a-----

`>>>`

`s = Stream.enumerateFromTo 1 2`

`>>>`

1 2`Stream.fold Fold.drain $ Stream.tap (Fold.drainMapM print) s`

Compare with `trace`

.

trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

`>>>`

`s = Stream.enumerateFromTo 1 2`

`>>>`

1 2`Stream.fold Fold.drain $ Stream.trace print s`

Compare with `tap`

.

trace_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Perform a side effect before yielding each element of the stream and discard the results.

`>>>`

`s = Stream.enumerateFromTo 1 2`

`>>>`

"got here" "got here"`Stream.fold Fold.drain $ Stream.trace_ (print "got here") s`

Same as `intersperseMPrefix_`

but always serial.

See also: `trace`

*Pre-release*

## Folding

foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

foldlS :: Monad m => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

## Scanning By `Fold`

postscan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Postscan a stream using the given monadic fold.

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

`>>>`

`import Data.Maybe (fromJust)`

`>>>`

`let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)`

`>>>`

`s = Stream.enumerateFromTo 1.0 100.0`

`>>>`

Stream.fold Fold.toList $ fmap (fromJust . fst) $ Stream.takeWhile (\(_,x) -> x <= 10) $ Stream.postscan (Fold.tee Fold.latest avg) s :} [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]`:{`

scan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Strict left scan. Scan a stream using the given monadic fold.

`>>>`

`s = Stream.fromList [1..10]`

`>>>`

[0,1,3,6]`Stream.fold Fold.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum s`

See also: `usingStateT`

scanMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Like `scan`

but restarts scanning afresh when the scanning fold
terminates.

## Splitting

splitOn :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split on an infixed separator element, dropping the separator. The
supplied `Fold`

is applied on the split segments. Splits the stream on
separator elements determined by the supplied predicate, separator is
considered as infixed between two segments:

`>>>`

`splitOn' p xs = Stream.fold Fold.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)`

`>>>`

["a","b"]`splitOn' (== '.') "a.b"`

An empty stream is folded to the default value of the fold:

`>>>`

[""]`splitOn' (== '.') ""`

If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:

`>>>`

["",""]`splitOn' (== '.') "."`

`>>>`

["","a"]`splitOn' (== '.') ".a"`

`>>>`

["a",""]`splitOn' (== '.') "a."`

`>>>`

["a","","b"]`splitOn' (== '.') "a..b"`

splitOn is an inverse of intercalating single element:

Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id

## Scanning

Left scans. Stateful, mostly one-to-one maps.

scanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b Source #

Like `scanl'`

but with a monadic step function and a monadic seed.

scanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b Source #

`scanlMAfter' accumulate initial done stream`

is like `scanlM'`

except
that it provides an additional `done`

function to be applied on the
accumulator when the stream stops. The result of `done`

is also emitted in
the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

*Pre-release*

scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b Source #

Strict left scan. Like `map`

, `scanl'`

too is a one to one transformation,
however it adds an extra element.

`>>>`

[0,1,3,6,10]`Stream.toList $ Stream.scanl' (+) 0 $ Stream.fromList [1,2,3,4]`

`>>>`

[[],[1],[2,1],[3,2,1],[4,3,2,1]]`Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]`

The output of `scanl'`

is the initial value of the accumulator followed by
all the intermediate steps and the final result of `foldl'`

.

By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.

Consider the following monolithic example, computing the sum and the product
of the elements in a stream in one go using a `foldl'`

:

`>>>`

(10,24)`Stream.fold (Fold.foldl' (\(s, p) x -> (s + x, p * x)) (0,1)) $ Stream.fromList [1,2,3,4]`

Using `scanl'`

we can make it modular by computing the sum in the first
stage and passing it down to the next stage for computing the product:

`>>>`

Stream.fold (Fold.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1)) $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1) $ Stream.fromList [1,2,3,4] :} (10,24)`:{`

IMPORTANT: `scanl'`

evaluates the accumulator to WHNF. To avoid building
lazy expressions inside the accumulator, it is recommended that a strict
data structure is used for accumulator.

`>>>`

`scanl' step z = Stream.scan (Fold.foldl' step z)`

`>>>`

`scanl' f z xs = Stream.scanlM' (\a b -> return (f a b)) (return z) xs`

See also: `usingStateT`

scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a Source #

Like `scanl1'`

but with a monadic step function.

scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a Source #

Like `scanl'`

but for a non-empty stream. The first element of the stream
is used as the initial value of the accumulator. Does nothing if the stream
is empty.

`>>>`

[1,3,6,10]`Stream.toList $ Stream.scanl1' (+) $ Stream.fromList [1,2,3,4]`

postscanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b Source #

postscanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b Source #

## Filtering

Produce a subset of the stream.

with :: Monad m => (Stream m a -> Stream m (s, a)) -> (((s, a) -> b) -> Stream m (s, a) -> Stream m (s, a)) -> ((s, a) -> b) -> Stream m a -> Stream m a Source #

Modify a `Stream m a -> Stream m a`

stream transformation that accepts a
predicate `(a -> b)`

to accept `((s, a) -> b)`

instead, provided a
transformation `Stream m a -> Stream m (s, a)`

. Convenient to filter with
index or time.

`>>>`

`filterWithIndex = Stream.with Stream.indexed Stream.filter`

*Pre-release*

scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b Source #

Use a filtering fold on a stream.

`>>>`

`scanMaybe f = Stream.catMaybes . Stream.postscan f`

filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

Include only those elements that pass a predicate.

`>>>`

`filter p = Stream.filterM (return . p)`

`>>>`

`filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)`

`>>>`

`filter p = Stream.scanMaybe (Fold.filtering p)`

filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as `filter`

but with a monadic predicate.

`>>>`

`f p x = p x >>= \r -> return $ if r then Just x else Nothing`

`>>>`

`filterM p = Stream.mapMaybeM (f p)`

deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a Source #

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

`>>>`

`input = Stream.fromList [1,3,3,5]`

`>>>`

[1,3,5]`Stream.fold Fold.toList $ Stream.deleteBy (==) 3 input`

uniqBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Stream m a Source #

Drop repeated elements that are adjacent to each other using the supplied comparison function.

`>>>`

`uniq = Stream.uniqBy (==)`

To strip duplicate path separators:

`>>>`

`input = Stream.fromList "//a//b"`

`>>>`

`f x y = x == '/' && y == '/'`

`>>>`

"/a/b"`Stream.fold Fold.toList $ Stream.uniqBy f input`

Space: `O(1)`

*Pre-release*

uniq :: (Eq a, Monad m) => Stream m a -> Stream m a Source #

Drop repeated elements that are adjacent to each other.

`>>>`

`uniq = Stream.uniqBy (==)`

prune :: (a -> Bool) -> Stream m a -> Stream m a Source #

Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.

> prune p = Stream.dropWhileAround p $ Stream.uniqBy (x y -> p x && p y)

> Stream.prune isSpace (Stream.fromList " hello world! ") "hello world!"

Space: `O(1)`

*Unimplemented*

## Trimming

Produce a subset of the stream trimmed at ends.

take :: Applicative m => Int -> Stream m a -> Stream m a Source #

Take first `n`

elements from the stream and discard the rest.

takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

End the stream as soon as the predicate fails on an element.

takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as `takeWhile`

but with a monadic predicate.

takeWhileLast :: (a -> Bool) -> Stream m a -> Stream m a Source #

Take all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number elements taken.

*Unimplemented*

takeWhileAround :: (a -> Bool) -> Stream m a -> Stream m a Source #

Like `takeWhile`

and `takeWhileLast`

combined.

O(n) space, where n is the number elements taken from the end.

*Unimplemented*

drop :: Monad m => Int -> Stream m a -> Stream m a Source #

Discard first `n`

elements from the stream and take the rest.

dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as `dropWhile`

but with a monadic predicate.

dropLast :: Int -> Stream m a -> Stream m a Source #

Drop `n`

elements at the end of the stream.

O(n) space, where n is the number elements dropped.

*Unimplemented*

dropWhileLast :: (a -> Bool) -> Stream m a -> Stream m a Source #

Drop all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number elements dropped.

*Unimplemented*

dropWhileAround :: (a -> Bool) -> Stream m a -> Stream m a Source #

Like `dropWhile`

and `dropWhileLast`

combined.

O(n) space, where n is the number elements dropped from the end.

*Unimplemented*

## Inserting Elements

Produce a superset of the stream.

insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a Source #

`insertBy cmp elem stream`

inserts `elem`

before the first element in
`stream`

that is less than `elem`

when compared using `cmp`

.

`>>>`

`insertBy cmp x = Stream.mergeBy cmp (Stream.fromPure x)`

`>>>`

`input = Stream.fromList [1,3,5]`

`>>>`

[1,2,3,5]`Stream.fold Fold.toList $ Stream.insertBy compare 2 input`

intersperse :: Monad m => a -> Stream m a -> Stream m a Source #

Insert a pure value between successive elements of a stream.

`>>>`

`input = Stream.fromList "hello"`

`>>>`

"h,e,l,l,o"`Stream.fold Fold.toList $ Stream.intersperse ',' input`

intersperseM :: Monad m => m a -> Stream m a -> Stream m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

`>>>`

`input = Stream.fromList "hello"`

`>>>`

h.,e.,l.,l.,o"h,e,l,l,o"`Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') input`

Be careful about the order of effects. In the above example we used trace after the intersperse, if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o".

`>>>`

he.l.l.o."h,e,l,l,o"`Stream.fold Fold.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar input`

intersperseMWith :: Int -> m a -> Stream m a -> Stream m a Source #

Intersperse a monadic action into the input stream after every `n`

elements.

> input = Stream.fromList "hello" > Stream.fold Fold.toList $ Stream.intersperseMWith 2 (return ',') input

"he,ll,o"

*Unimplemented*

intersperseMSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a Source #

Insert an effect and its output after consuming an element of a stream.

`>>>`

`input = Stream.fromList "hello"`

`>>>`

h.,e.,l.,l.,o.,"h,e,l,l,o,"`Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseMSuffix (putChar '.' >> return ',') input`

*Pre-release*

intersperseMSuffixWith :: forall m a. Monad m => Int -> m a -> Stream m a -> Stream m a Source #

Like `intersperseMSuffix`

but intersperses an effectful action into the
input stream after every `n`

elements and after the last element.

`>>>`

`input = Stream.fromList "hello"`

`>>>`

"he,ll,o,"`Stream.fold Fold.toList $ Stream.intersperseMSuffixWith 2 (return ',') input`

*Pre-release*

## Inserting Side Effects

intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Insert a side effect before consuming an element of a stream except the first one.

`>>>`

`input = Stream.fromList "hello"`

`>>>`

h.e.l.l.o`Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') input`

*Pre-release*

intersperseMSuffix_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Insert a side effect after consuming an element of a stream.

`>>>`

`input = Stream.fromList "hello"`

`>>>`

"hello"`Stream.fold Fold.toList $ Stream.intersperseMSuffix_ (threadDelay 1000000) input`

*Pre-release*

intersperseMPrefix_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Insert a side effect before consuming an element of a stream.

Definition:

`>>>`

`intersperseMPrefix_ m = Stream.mapM (\x -> void m >> return x)`

`>>>`

`input = Stream.fromList "hello"`

`>>>`

.h.e.l.l.o"hello"`Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseMPrefix_ (putChar '.' >> return ',') input`

Same as `trace_`

.

*Pre-release*

delay :: MonadIO m => Double -> Stream m a -> Stream m a Source #

Introduce a delay of specified seconds between elements of the stream.

Definition:

`>>>`

`sleep n = liftIO $ threadDelay $ round $ n * 1000000`

`>>>`

`delay = Stream.intersperseM_ . sleep`

Example:

`>>>`

`input = Stream.enumerateFromTo 1 3`

`>>>`

1 2 3`Stream.fold (Fold.drainMapM print) $ Stream.delay 1 input`

delayPre :: MonadIO m => Double -> Stream m a -> Stream m a Source #

Introduce a delay of specified seconds before consuming an element of a stream.

Definition:

`>>>`

`sleep n = liftIO $ threadDelay $ round $ n * 1000000`

`>>>`

`delayPre = Stream.intersperseMPrefix_. sleep`

Example:

`>>>`

`input = Stream.enumerateFromTo 1 3`

`>>>`

1 2 3`Stream.fold (Fold.drainMapM print) $ Stream.delayPre 1 input`

*Pre-release*

delayPost :: MonadIO m => Double -> Stream m a -> Stream m a Source #

Introduce a delay of specified seconds after consuming an element of a stream.

Definition:

`>>>`

`sleep n = liftIO $ threadDelay $ round $ n * 1000000`

`>>>`

`delayPost = Stream.intersperseMSuffix_ . sleep`

Example:

`>>>`

`input = Stream.enumerateFromTo 1 3`

`>>>`

1 2 3`Stream.fold (Fold.drainMapM print) $ Stream.delayPost 1 input`

*Pre-release*

## Reordering

Produce strictly the same set but reordered.

reverse :: Monad m => Stream m a -> Stream m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

Definition:

`>>>`

`reverse m = Stream.concatEffect $ Stream.fold Fold.toListRev m >>= return . Stream.fromList`

reassembleBy :: Fold m a b -> (a -> a -> Int) -> Stream m a -> Stream m b Source #

Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.

*Unimplemented*

## Position Indexing

indexed :: Monad m => Stream m a -> Stream m (Int, a) Source #

`>>>`

`f = Fold.foldl' (\(i, _) x -> (i + 1, x)) (-1,undefined)`

`>>>`

`indexed = Stream.postscan f`

`>>>`

`indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)`

`>>>`

`indexedR n = fmap (\(i, a) -> (n - i, a)) . indexed`

Pair each element in a stream with its index, starting from index 0.

`>>>`

[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]`Stream.fold Fold.toList $ Stream.indexed $ Stream.fromList "hello"`

indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a) Source #

`>>>`

`f n = Fold.foldl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)`

`>>>`

`indexedR n = Stream.postscan (f n)`

`>>>`

`s n = Stream.enumerateFromThen n (n - 1)`

`>>>`

`indexedR n = Stream.zipWith (,) (s n)`

Pair each element in a stream with its index, starting from the
given index `n`

and counting down.

`>>>`

[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]`Stream.fold Fold.toList $ Stream.indexedR 10 $ Stream.fromList "hello"`

## Time Indexing

timestampWith :: MonadIO m => Double -> Stream m a -> Stream m (AbsTime, a) Source #

Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.

`>>>`

[(AbsTime (TimeSpec {sec = ..., nsec = ...}),1),(AbsTime (TimeSpec {sec = ..., nsec = ...}),2),(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)]`Stream.fold Fold.toList $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3`

*Pre-release*

timeIndexWith :: MonadIO m => Double -> Stream m a -> Stream m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.

`>>>`

[(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]`Stream.fold Fold.toList $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3`

*Pre-release*

timeIndexed :: MonadIO m => Stream m a -> Stream m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.

`>>>`

[(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]`Stream.fold Fold.toList $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3`

*Pre-release*

## Searching

findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

`>>>`

`findIndices p = Stream.scanMaybe (Fold.findIndices p)`

elemIndices :: (Monad m, Eq a) => a -> Stream m a -> Stream m Int Source #

Find all the indices where the value of the element in the stream is equal to the given value.

`>>>`

`elemIndices a = Stream.findIndices (== a)`

## Rolling map

Map using the previous element.

rollingMap :: Monad m => (Maybe a -> a -> b) -> Stream m a -> Stream m b Source #

Apply a function on every two successive elements of a stream. The first
argument of the map function is the previous element and the second argument
is the current element. When the current element is the first element, the
previous element is `Nothing`

.

*Pre-release*

rollingMapM :: Monad m => (Maybe a -> a -> m b) -> Stream m a -> Stream m b Source #

Like `rollingMap`

but with an effectful map function.

*Pre-release*

rollingMap2 :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b Source #

Like `rollingMap`

but requires at least two elements in the stream,
returns an empty stream otherwise.

This is the stream equivalent of the list idiom `zipWith f xs (tail xs)`

.

*Pre-release*

## Maybe Streams

mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b Source #

Like `mapMaybe`

but maps a monadic function.

Equivalent to:

`>>>`

`mapMaybeM f = Stream.catMaybes . Stream.mapM f`

`>>>`

`mapM f = Stream.mapMaybeM (\x -> Just <$> f x)`

## Either Streams

catEithers :: Monad m => Stream m (Either a a) -> Stream m a Source #

Remove the either wrapper and flatten both lefts and as well as rights in the output stream.

`>>>`

`catEithers = fmap (either id id)`

*Pre-release*

## Transformation

### Sampling

Value agnostic filtering.

strideFromThen :: Monad m => Int -> Int -> Stream m a -> Stream m a Source #

`strideFromthen offset stride`

takes the element at `offset`

index and
then every element at strides of `stride`

.

`>>>`

[2,5,8]`Stream.fold Fold.toList $ Stream.strideFromThen 2 3 $ Stream.enumerateFromTo 0 10`

## Nesting

### Set like operations

These are not exactly set operations because streams are not necessarily sets, they may have duplicated elements. These operations are generic i.e. they work on streams of unconstrained types, therefore, they have quadratic performance characterstics. For better performance using Set structures see the Streamly.Internal.Data.Stream.Container module.

filterInStreamGenericBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a Source #

`filterInStreamGenericBy`

retains only those elements in the second stream that
are present in the first stream.

`>>>`

[2,1,1]`Stream.fold Fold.toList $ Stream.filterInStreamGenericBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])`

`>>>`

[1,2,2]`Stream.fold Fold.toList $ Stream.filterInStreamGenericBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])`

Similar to the list intersectBy operation but with the stream argument order flipped.

The first stream must be finite and must not block. Second stream is processed only after the first stream is fully realized.

Space: O(n) where `n`

is the number of elements in the second stream.

Time: O(m x n) where `m`

is the number of elements in the first stream and
`n`

is the number of elements in the second stream.

*Pre-release*

deleteInStreamGenericBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a Source #

Delete all elements of the first stream from the seconds stream. If an element occurs multiple times in the first stream as many occurrences of it are deleted from the second stream.

`>>>`

[2]`Stream.fold Fold.toList $ Stream.deleteInStreamGenericBy (==) (Stream.fromList [1,2,3]) (Stream.fromList [1,2,2])`

The following laws hold:

deleteInStreamGenericBy (==) s1 (s1 `append` s2) === s2 deleteInStreamGenericBy (==) s1 (s1 `interleave` s2) === s2

Same as the list `//`

operation but with argument order flipped.

The first stream must be finite and must not block. Second stream is processed only after the first stream is fully realized.

Space: O(m) where `m`

is the number of elements in the first stream.

Time: O(m x n) where `m`

is the number of elements in the first stream and
`n`

is the number of elements in the second stream.

*Pre-release*

unionWithStreamGenericBy :: MonadIO m => (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a Source #

This essentially appends to the second stream all the occurrences of elements in the first stream that are not already present in the second stream.

Equivalent to the following except that `s2`

is evaluated only once:

`>>>`

`unionWithStreamGenericBy eq s1 s2 = s2 `Stream.append` (Stream.deleteInStreamGenericBy eq s2 s1)`

Example:

`>>>`

[1,2,2,4,3]`Stream.fold Fold.toList $ Stream.unionWithStreamGenericBy (==) (Stream.fromList [1,1,2,3]) (Stream.fromList [1,2,2,4])`

Space: O(n)

Time: O(m x n)

*Pre-release*

### Set like operations on sorted streams

filterInStreamAscBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like `filterInStreamGenericBy`

but assumes that the input streams are sorted in
ascending order. To use it on streams sorted in descending order pass an
inverted comparison function returning GT for less than and LT for greater
than.

Space: O(1)

Time: O(m+n)

*Pre-release*

deleteInStreamAscBy :: (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

A more efficient `deleteInStreamGenericBy`

for streams sorted in ascending order.

Space: O(1)

*Unimplemented*

### Join operations

joinInnerGeneric :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b) Source #

Like `cross`

but emits only those tuples where `a == b`

using the
supplied equality predicate.

Definition:

`>>>`

`joinInnerGeneric eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ Stream.cross s1 s2`

You should almost always prefer `joinInnerOrd`

over `joinInnerGeneric`

if
possible. `joinInnerOrd`

is an order of magnitude faster but may take more
space for caching the second stream.

See `joinInnerGeneric`

for a much faster fused
alternative.

Time: O(m x n)

*Pre-release*

## Joins on sorted stream

joinInnerAscBy :: (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b) Source #

A more efficient `joinInner`

for sorted streams.

Space: O(1)

Time: O(m + n)

*Unimplemented*

joinLeftAscBy :: (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b) Source #

A more efficient `joinLeft`

for sorted streams.

Space: O(1)

Time: O(m + n)

*Unimplemented*

joinOuterAscBy :: (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (Maybe a, Maybe b) Source #

A more efficient `joinOuter`

for sorted streams.

Space: O(1)

Time: O(m + n)

*Unimplemented*

nub :: (Monad m, Ord a) => Stream m a -> Stream m a Source #

The memory used is proportional to the number of unique elements in the stream. If we want to limit the memory we can just use "take" to limit the uniq elements in the stream.

## Joins for unconstrained types

joinLeftGeneric :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, Maybe b) Source #

Like `joinInner`

but emit `(a, Just b)`

, and additionally, for those `a`

's
that are not equal to any `b`

emit `(a, Nothing)`

.

The second stream is evaluated multiple times. If the stream is a
consume-once stream then the caller should cache it in an `Array`

before calling this function. Caching may also improve performance if the
stream is expensive to evaluate.

`>>>`

`joinRightGeneric eq = flip (Stream.joinLeftGeneric eq)`

Space: O(n) assuming the second stream is cached in memory.

Time: O(m x n)

*Unimplemented*

joinOuterGeneric :: MonadIO m => (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (Maybe a, Maybe b) Source #

Like `joinLeft`

but emits a `(Just a, Just b)`

. Like `joinLeft`

, for those
`a`

's that are not equal to any `b`

emit `(Just a, Nothing)`

, but
additionally, for those `b`

's that are not equal to any `a`

emit ```
(Nothing,
Just b)
```

.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m x n)

*Pre-release*

## Joins with Ord constraint

joinInner :: (Monad m, Ord k) => Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b) Source #

Like `joinInner`

but uses a `Map`

for efficiency.

If the input streams have duplicate keys, the behavior is undefined.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m + n)

*Pre-release*