Loading...

Streamly.Internal.Data.Stream

Direct style re-implementation of CPS stream in Streamly.Internal.Data.StreamK. GHC is able to INLINE and fuse direct style better, providing better performance than CPS implementation.

import qualified Streamly.Internal.Data.Stream as Stream

Type

data Step s a Source #

A stream is a succession of Steps. A Yield produces a single value and the next state of the stream. Stop indicates there are no more values in the stream.

Constructors

Yield a s 
Skip s 
Stop 
Instances
Instances details
Functor (Step s) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Step

Methods

fmap :: (a -> b) -> Step s a -> Step s b Source #

(<$) :: a -> Step s b -> Step s a Source #

data Stream m a Source #

A stream consists of a step function that generates the next step given a current state, and the current state.

Constructors

forall s. UnStream (State StreamK m a -> s -> m (Step s a)) s 

Bundled Patterns

pattern Stream :: (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a 
Instances
Instances details
(Foldable m, Monad m) => Foldable (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

fold :: Monoid m0 => Stream m m0 -> m0 Source #

foldMap :: Monoid m0 => (a -> m0) -> Stream m a -> m0 Source #

foldMap' :: Monoid m0 => (a -> m0) -> Stream m a -> m0 Source #

foldr :: (a -> b -> b) -> b -> Stream m a -> b Source #

foldr' :: (a -> b -> b) -> b -> Stream m a -> b Source #

foldl :: (b -> a -> b) -> b -> Stream m a -> b Source #

foldl' :: (b -> a -> b) -> b -> Stream m a -> b Source #

foldr1 :: (a -> a -> a) -> Stream m a -> a Source #

foldl1 :: (a -> a -> a) -> Stream m a -> a Source #

toList :: Stream m a -> [a] Source #

null :: Stream m a -> Bool Source #

length :: Stream m a -> Int Source #

elem :: Eq a => a -> Stream m a -> Bool Source #

maximum :: Ord a => Stream m a -> a Source #

minimum :: Ord a => Stream m a -> a Source #

sum :: Num a => Stream m a -> a Source #

product :: Num a => Stream m a -> a Source #

Monad m => Functor (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

fmap :: (a -> b) -> Stream m a -> Stream m b Source #

(<$) :: a -> Stream m b -> Stream m a Source #

a ~ Char => IsString (Stream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

IsList (Stream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Associated Types

type Item (Stream Identity a) Source #

Read a => Read (Stream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Show a => Show (Stream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Eq a => Eq (Stream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Ord a => Ord (Stream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

type Item (Stream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

type Item (Stream Identity a) = a

CrossStream

data CrossStream m a Source #

A newtype wrapper for the Stream type with a cross product style monad instance.

A Monad bind behaves like a for loop:

>>> :{
Stream.fold Fold.toList $ Stream.unCross $ do
    x <- Stream.mkCross $ Stream.fromList [1,2]
    -- Perform the following actions for each x in the stream
    return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
Stream.fold Fold.toList $ Stream.unCross $ do
    x <- Stream.mkCross $ Stream.fromList [1,2]
    y <- Stream.mkCross $ Stream.fromList [3,4]
    -- Perform the following actions for each x, for each y
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Instances
Instances details
MonadTrans CrossStream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

lift :: Monad m => m a -> CrossStream m a Source #

MonadIO m => MonadIO (CrossStream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

liftIO :: IO a -> CrossStream m a Source #

(Foldable m, Monad m) => Foldable (CrossStream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

fold :: Monoid m0 => CrossStream m m0 -> m0 Source #

foldMap :: Monoid m0 => (a -> m0) -> CrossStream m a -> m0 Source #

foldMap' :: Monoid m0 => (a -> m0) -> CrossStream m a -> m0 Source #

foldr :: (a -> b -> b) -> b -> CrossStream m a -> b Source #

foldr' :: (a -> b -> b) -> b -> CrossStream m a -> b Source #

foldl :: (b -> a -> b) -> b -> CrossStream m a -> b Source #

foldl' :: (b -> a -> b) -> b -> CrossStream m a -> b Source #

foldr1 :: (a -> a -> a) -> CrossStream m a -> a Source #

foldl1 :: (a -> a -> a) -> CrossStream m a -> a Source #

toList :: CrossStream m a -> [a] Source #

null :: CrossStream m a -> Bool Source #

length :: CrossStream m a -> Int Source #

elem :: Eq a => a -> CrossStream m a -> Bool Source #

maximum :: Ord a => CrossStream m a -> a Source #

minimum :: Ord a => CrossStream m a -> a Source #

sum :: Num a => CrossStream m a -> a Source #

product :: Num a => CrossStream m a -> a Source #

Monad m => Applicative (CrossStream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

pure :: a -> CrossStream m a Source #

(<*>) :: CrossStream m (a -> b) -> CrossStream m a -> CrossStream m b Source #

liftA2 :: (a -> b -> c) -> CrossStream m a -> CrossStream m b -> CrossStream m c Source #

(*>) :: CrossStream m a -> CrossStream m b -> CrossStream m b Source #

(<*) :: CrossStream m a -> CrossStream m b -> CrossStream m a Source #

Monad m => Functor (CrossStream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

fmap :: (a -> b) -> CrossStream m a -> CrossStream m b Source #

(<$) :: a -> CrossStream m b -> CrossStream m a Source #

Monad m => Monad (CrossStream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

(>>=) :: CrossStream m a -> (a -> CrossStream m b) -> CrossStream m b Source #

(>>) :: CrossStream m a -> CrossStream m b -> CrossStream m b Source #

return :: a -> CrossStream m a Source #

MonadThrow m => MonadThrow (CrossStream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

throwM :: Exception e => e -> CrossStream m a Source #

a ~ Char => IsString (CrossStream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

IsList (CrossStream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Associated Types

type Item (CrossStream Identity a) Source #

Read a => Read (CrossStream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Show a => Show (CrossStream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Eq a => Eq (CrossStream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Ord a => Ord (CrossStream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

type Item (CrossStream Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

To StreamK

fromStreamK :: Applicative m => StreamK m a -> Stream m a Source #

Convert a CPS encoded StreamK to direct style step encoded StreamD

toStreamK :: Monad m => Stream m a -> StreamK m a Source #

Convert a direct style step encoded StreamD to a CPS encoded StreamK

From Unfold

unfold :: Applicative m => Unfold m a b -> a -> Stream m b Source #

Convert an Unfold into a stream by supplying it an input seed.

>>> s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")
>>> Stream.fold Fold.drain s
hello
hello
hello

Construction

Primitives

nilM :: Applicative m => m b -> Stream m a Source #

A stream that terminates without producing any output, but produces a side effect.

>>> nilM action = Stream.before action Stream.nil
>>> Stream.fold Fold.toList (Stream.nilM (print "nil"))
"nil"
[]

Pre-release

consM :: Applicative m => m a -> Stream m a -> Stream m a infixr 5 Source #

Like cons but fuses an effect instead of a pure value.

From Values

fromPure :: Applicative m => a -> Stream m a Source #

Create a singleton stream from a pure value.

>>> fromPure a = a `Stream.cons` Stream.nil
>>> fromPure = pure
>>> fromPure = Stream.fromEffect . pure

fromEffect :: Applicative m => m a -> Stream m a Source #

Create a singleton stream from a monadic action.

>>> fromEffect m = m `Stream.consM` Stream.nil
>>> fromEffect = Stream.sequence . Stream.fromPure
>>> Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")
hello

From Containers

fromList :: Applicative m => [a] -> Stream m a Source #

Construct a stream from a list of pure values.

Elimination

Primitives

uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

Properties:

>>> Nothing <- Stream.uncons Stream.nil
>>> Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)

This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of uncons, however, this is generally less efficient than specific folds because it takes apart the stream one element at a time, therefore, does not take adavantage of stream fusion.

foldBreak is a more general way of consuming a stream piecemeal.

>>> :{
uncons xs = do
    r <- Stream.foldBreak Fold.one xs
    return $ case r of
        (Nothing, _) -> Nothing
        (Just h, t) -> Just (h, t)
:}

Strict Left Folds

fold :: Monad m => Fold m a b -> Stream m a -> m b Source #

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

Definitions:

>>> fold f = fmap fst . Stream.foldBreak f
>>> fold f = Stream.parse (Parser.fromFold f)

Example:

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

foldBreak :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a) Source #

Like fold but also returns the remaining stream. The resulting stream would be nil if the stream finished before the fold.

foldAddLazy :: Monad m => Fold m a b -> Stream m a -> Fold m a b Source #

Append a stream to a fold lazily to build an accumulator incrementally.

Example, to continue folding a list of streams on the same sum fold:

>>> streams = [Stream.fromList [1..5], Stream.fromList [6..10]]
>>> f = Prelude.foldl Stream.foldAddLazy Fold.sum streams
>>> Stream.fold f Stream.nil
55

foldAdd :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b) Source #

>>> foldAdd = flip Fold.addStream

foldEither :: Monad m => Fold m a b -> Stream m a -> m (Either (Fold m a b) (b, Stream m a)) Source #

Fold resulting in either breaking the stream or continuation of the fold. Instead of supplying the input stream in one go we can run the fold multiple times, each time supplying the next segment of the input stream. If the fold has not yet finished it returns a fold that can be run again otherwise it returns the fold result and the residual stream.

Internal

foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b Source #

foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b Source #

foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b Source #

foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b Source #

Lazy Right Folds

foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #

Right associative/lazy pull fold. foldrM build final stream constructs an output structure using the step function build. build is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated it calls build again thus lazily consuming the input stream until either the output expression built by build is free of the "tail" or the input is exhausted in which case final is used as the terminating case for the output structure. For more details see the description in the previous section.

Example, determine if any element is odd in a stream:

>>> s = Stream.fromList (2:4:5:undefined)
>>> step x xs = if odd x then return True else xs
>>> Stream.foldrM step (return False) s
True
>>> import Control.Monad (join)
>>> foldrM f z = join . Stream.foldr f z

foldrMx :: Monad m => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b Source #

foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

>>> foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)

Note: This is similar to Fold.foldr' (the right fold via left fold), but could be more efficient.

foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

Specific Folds

drain :: Monad m => Stream m a -> m () Source #

Definitions:

>>> drain = Stream.fold Fold.drain
>>> drain = Stream.foldrM (\_ xs -> xs) (return ())

Run a stream, discarding the results.

head :: Monad m => Stream m a -> m (Maybe a) Source #

headElse :: Monad m => a -> Stream m a -> m a Source #

toList :: Monad m => Stream m a -> m [a] Source #

Definitions:

>>> toList = Stream.foldr (:) []
>>> toList = Stream.fold Fold.toList

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array instead.

Note that this could a bit more efficient compared to Stream.fold Fold.toList, and it can fuse with pure list consumers.

Mapping

map :: Monad m => (a -> b) -> Stream m a -> Stream m b Source #

mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #

>>> mapM f = Stream.sequence . fmap f

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

>>> s = Stream.fromList ["a", "b", "c"]
>>> Stream.fold Fold.drain $ Stream.mapM putStr s
abc

Stateful Filters

take :: Applicative m => Int -> Stream m a -> Stream m a Source #

Take first n elements from the stream and discard the rest.

takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

End the stream as soon as the predicate fails on an element.

takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as takeWhile but with a monadic predicate.

takeEndBy_ :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

>>> takeEndBy_ f = Stream.takeWhile (not . f)

takeEndBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

takeEndByM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Combining Two Streams

Zipping

Zip corresponding elements of two streams.

zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #

Like zipWith but using a monadic zipping function.

zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for statically fusing a small number of streams. Use the O(n) complexity StreamK.zipWith otherwise.

Stream a is evaluated first, followed by stream b, the resulting elements a and b are then zipped using the supplied zip function and the result c is yielded to the consumer.

If stream a or stream b ends, the zipped stream ends. If stream b ends first, the element a from previous evaluation of stream a is discarded.

>>> s1 = Stream.fromList [1,2,3]
>>> s2 = Stream.fromList [4,5,6]
>>> Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2
[5,7,9]

Cross Product

crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b Source #

Apply a stream of functions to a stream of values and flatten the results.

Note that the second stream is evaluated multiple times.

>>> crossApply = Stream.crossWith id

crossApplyFst :: Functor f => Stream f a -> Stream f b -> Stream f a Source #

crossApplySnd :: Functor f => Stream f a -> Stream f b -> Stream f b Source #

crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

Definition:

>>> crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2

Note that the second stream is evaluated multiple times.

cross :: Monad m => Stream m a -> Stream m b -> Stream m (a, b) Source #

Given a Stream m a and Stream m b generate a stream with all possible combinations of the tuple (a, b).

Definition:

>>> cross = Stream.crossWith (,)

The second stream is evaluated multiple times. If that is not desired it can be cached in an Array and then generated from the array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

See cross for a much faster fused alternative.

Time: O(m x n)

Pre-release

loop :: Monad m => Stream m b -> Stream m a -> Stream m (a, b) Source #

Loop the supplied stream (first argument) around each element of the input stream (second argument) generating tuples. This is an argument flipped version of cross.

loopBy :: Monad m => Unfold m x b -> x -> Stream m a -> Stream m (a, b) Source #

Loop by unfold. Unfold a value into a stream and nest it with the input stream. This is much faster than loop due to stream fusion.

Unfold Many

unfoldEach :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

unfoldEach unfold stream uses unfold to map the input stream elements to streams and then flattens the generated streams into a single output stream.

Like concatMap but uses an Unfold for stream generation. Unlike concatMap this can fuse the Unfold code with the inner loop and therefore provide many times better performance.

Concat

Generate streams by mapping a stream generator on each element of an input stream, append the resulting streams and flatten.

concatEffect :: Monad m => m (Stream m a) -> Stream m a Source #

Flatten a stream generated by an effect i.e. concat the effect monad with the stream monad.

>>> concatEffect = Stream.concat . Stream.fromEffect
>>> concatEffect eff = Stream.concatMapM (\() -> eff) (Stream.fromPure ())

See also: concat, sequence

concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concat . fmap f
>>> concatMap f = Stream.unfoldEach (Unfold.lmap f Unfold.fromStream)

See unfoldEach for a fusible alternative.

concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

See unfoldEach for a fusible alternative.

concat :: Monad m => Stream m (Stream m a) -> Stream m a Source #

Flatten a stream of streams to a single stream.

>>> concat = Stream.concatMap id

Pre-release

Unfold Iterate

unfoldIterateDfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #

Same as concatIterateDfs but more efficient due to stream fusion.

Example, list a directory tree using DFS:

>>> f = Unfold.either Dir.eitherReaderPaths Unfold.nil
>>> input = Stream.fromEffect (Left <$> Path.fromString ".")
>>> ls = Stream.unfoldIterateDfs f input

Pre-release

unfoldIterateBfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #

Like unfoldIterateDfs but uses breadth first style traversal.

Pre-release

unfoldIterateBfsRev :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #

Like unfoldIterateBfs but processes the children in reverse order, therefore, may be slightly faster.

Pre-release

Concat Iterate

concatIterateScan :: Monad m => (b -> a -> m b) -> (b -> m (Maybe (b, Stream m a))) -> b -> Stream m a Source #

Generate a stream from an initial state, scan and concat the stream, generate a stream again from the final state of the previous scan and repeat the process.

concatIterateDfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #

Traverse the stream in depth first style (DFS). Map each element in the input stream to a stream and flatten, recursively map the resulting elements as well to a stream and flatten until no more streams are generated.

Example, list a directory tree using DFS:

>>> f = either (Just . Dir.readEitherPaths) (const Nothing)
>>> input = Stream.fromEffect (Left <$> Path.fromString ".")
>>> ls = Stream.concatIterateDfs f input

This is equivalent to using concatIterateWith StreamK.append.

Pre-release

concatIterateBfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #

Similar to concatIterateDfs except that it traverses the stream in breadth first style (BFS). First, all the elements in the input stream are emitted, and then their traversals are emitted.

Example, list a directory tree using BFS:

>>> f = either (Just . Dir.readEitherPaths) (const Nothing)
>>> input = Stream.fromEffect (Left <$> Path.fromString ".")
>>> ls = Stream.concatIterateBfs f input

Pre-release

concatIterateBfsRev :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #

Same as concatIterateBfs except that the traversal of the last element on a level is emitted first and then going backwards up to the first element (reversed ordering). This may be slightly faster than concatIterateBfs.

Fold Many

data FoldMany s fs b a Source #

Constructors

FoldManyStart s 
FoldManyFirst fs s 
FoldManyLoop s fs 
FoldManyYield b (FoldMany s fs b a) 
FoldManyDone 

foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Apply a terminating Fold repeatedly on a stream and emit the results in the output stream. If the last fold is empty, it's result is not emitted. This means if the input stream is empty the result is also an empty stream. See foldManyPost for an alternate behavior which always results in a non-empty stream even if the input stream is empty.

Definition:

>>> foldMany f = Stream.parseMany (Parser.fromFold f)

Example, empty stream, omits the empty fold value:

>>> f = Fold.take 2 Fold.toList
>>> fmany = Stream.fold Fold.toList . Stream.foldMany f
>>> fmany $ Stream.fromList []
[]

Example, omits the last empty fold value:

>>> fmany $ Stream.fromList [1..4]
[[1,2],[3,4]]

Example, last fold non-empty:

>>> fmany $ Stream.fromList [1..5]
[[1,2],[3,4],[5]]

Note that using a closed fold e.g. Fold.take 0, would result in an infinite stream on a non-empty input stream.

foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Like foldMany but evaluates the fold even if the fold did not receive any input, therefore, always results in a non-empty output even on an empty stream (default result of the fold).

Example, empty stream, compare with foldMany:

>>> f = Fold.take 2 Fold.toList
>>> fmany = Stream.fold Fold.toList . Stream.foldManyPost f
>>> fmany $ Stream.fromList []
[[]]

Example, last empty fold is included, compare with foldMany:

>>> fmany $ Stream.fromList [1..4]
[[1,2],[3,4],[]]

Example, last fold non-empty, same as foldMany:

>>> fmany $ Stream.fromList [1..5]
[[1,2],[3,4],[5]]

Pre-release

foldManySepBy :: Fold m a b -> Fold m a b -> Stream m a -> Stream m b Source #

Apply fold f1 infix separated by fold f2.

Unimplemented

groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b Source #

Group the input stream into groups of n elements each and then fold each group using the provided fold function.

Definition:

>>> groupsOf n f = Stream.foldMany (Fold.take n f)

Usage:

>>> Stream.toList $ Stream.groupsOf 2 Fold.toList (Stream.enumerateFromTo 1 10)
[[1,2],[3,4],[5,6],[7,8],[9,10]]

This can be considered as an n-fold version of take where we apply take repeatedly on the leftover stream until the stream exhausts.

refoldMany :: Monad m => Refold m x a b -> m x -> Stream m a -> Stream m b Source #

Like foldMany but for the Refold type. The supplied action is used as the initial value for each refold.

Internal

refoldIterateM :: Monad m => Refold m b a b -> m b -> Stream m a -> Stream m b Source #

Like foldIterateM but using the Refold type instead. This could be much more efficient due to stream fusion.

Internal

Fold Iterate

reduceIterateBfs :: Monad m => (a -> a -> m a) -> Stream m a -> m (Maybe a) Source #

Binary BFS style reduce, folds a level entirely using the supplied fold function, collecting the outputs as next level of the tree, then repeats the same process on the next level. The last elements of a previously folded level are folded first.

foldIterateBfs :: Fold m a (Either a a) -> Stream m a -> m (Maybe a) Source #

N-Ary BFS style iterative fold, if the input stream finished before the fold then it returns Left otherwise Right. If the fold returns Left we terminate.

Unimplemented

Splitting

indexEndBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int) Source #

Like splitEndBy but generates a stream of (index, len) tuples marking the places where the predicate matches in the stream.

>>> Stream.toList $ Stream.indexEndBy (== '/') $ Stream.fromList "/home/harendra"
[(0,1),(1,5),(6,8)]

Pre-release

indexEndBy_ :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int) Source #

Like splitEndBy_ but generates a stream of (index, len) tuples marking the places where the predicate matches in the stream.

>>> Stream.toList $ Stream.indexEndBy_ (== '/') $ Stream.fromList "/home/harendra"
[(0,0),(1,4),(6,8)]

Pre-release

Multi-stream folds

These should probably be expressed using zipping operations.

eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool Source #

Compare two streams for equality

cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering Source #

Compare two streams lexicographically.

Deprecated

sliceOnSuffix :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int) Source #

Deprecated: Please use indexEndBy_ instead.

unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use unfoldEach instead.

unfoldEach unfold stream uses unfold to map the input stream elements to streams and then flattens the generated streams into a single output stream.

Like concatMap but uses an Unfold for stream generation. Unlike concatMap this can fuse the Unfold code with the inner loop and therefore provide many times better performance.

indexOnSuffix :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int) Source #

Deprecated: Please use indexEndBy_ instead.

Like splitEndBy_ but generates a stream of (index, len) tuples marking the places where the predicate matches in the stream.

>>> Stream.toList $ Stream.indexEndBy_ (== '/') $ Stream.fromList "/home/harendra"
[(0,0),(1,4),(6,8)]

Pre-release

Primitives

nil :: Applicative m => Stream m a Source #

A stream that terminates without producing any output or side effect.

>>> Stream.toList Stream.nil
[]

cons :: Applicative m => a -> Stream m a -> Stream m a infixr 5 Source #

WARNING! O(n^2) time complexity wrt number of elements. Use the O(n) complexity StreamK.cons unless you want to statically fuse just a few elements.

Fuse a pure value at the head of an existing stream::

>>> s = 1 `Stream.cons` Stream.fromList [2,3]
>>> Stream.toList s
[1,2,3]

Definition:

>>> cons x xs = return x `Stream.consM` xs

Unfolding

unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a Source #

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then Nothing
        else Just (b, b + 1)
in Stream.toList $ Stream.unfoldr f 0
:}
[0,1,2]

unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a Source #

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then return Nothing
        else return (Just (b, b + 1))
in Stream.toList $ Stream.unfoldrM f 0
:}
[0,1,2]

From Values

repeat :: Monad m => a -> Stream m a Source #

Generate an infinite stream by repeating a pure value.

>>> repeat = Stream.iterate id
>>> repeat x = Stream.repeatM (pure x)

repeatM :: Monad m => m a -> Stream m a Source #

>>> repeatM act = Stream.iterateM (const act) act
>>> repeatM = Stream.sequence . Stream.repeat

Generate a stream by repeatedly executing a monadic action forever.

>>> :{
repeatAction =
       Stream.repeatM (threadDelay 1000000 >> print 1)
     & Stream.take 10
     & Stream.fold Fold.drain
:}

replicate :: Monad m => Int -> a -> Stream m a Source #

>>> replicate n = Stream.take n . Stream.repeat
>>> replicate n x = Stream.replicateM n (pure x)

Generate a stream of length n by repeating a value n times.

replicateM :: Monad m => Int -> m a -> Stream m a Source #

>>> replicateM n = Stream.sequence . Stream.replicate n

Generate a stream by performing a monadic action n times.

Enumeration

Enumerating Num Types

enumerateFromStepNum :: (Monad m, Num a) => a -> a -> Stream m a Source #

For floating point numbers if the increment is less than the precision then it just gets lost. Therefore we cannot always increment it correctly by just repeated addition. 9007199254740992 + 1 + 1 :: Double => 9.007199254740992e15 9007199254740992 + 2 :: Double => 9.007199254740994e15

Instead we accumulate the increment counter and compute the increment every time before adding it to the starting number.

This works for Integrals as well as floating point numbers, but enumerateFromStepIntegral is faster for integrals.

enumerateFromNum :: (Monad m, Num a) => a -> Stream m a Source #

enumerateFromThenNum :: (Monad m, Num a) => a -> a -> Stream m a Source #

Enumerating Bounded Enum Types

enumerate :: (Monad m, Bounded a, Enumerable a) => Stream m a Source #

enumerate = enumerateFrom minBound

Enumerate a Bounded type from its minBound to maxBound

enumerateTo :: (Monad m, Bounded a, Enumerable a) => a -> Stream m a Source #

>>> enumerateTo = Stream.enumerateFromTo minBound

Enumerate a Bounded type from its minBound to specified value.

enumerateFromBounded :: (Monad m, Enumerable a, Bounded a) => a -> Stream m a Source #

>>> enumerateFromBounded from = Stream.enumerateFromTo from maxBound

enumerateFrom for Bounded Enum types.

Enumerating Enum Types not larger than Int

enumerateFromToSmall :: (Monad m, Enum a) => a -> a -> Stream m a Source #

enumerateFromTo for Enum types not larger than Int.

enumerateFromThenToSmall :: (Monad m, Enum a) => a -> a -> a -> Stream m a Source #

enumerateFromThenTo for Enum types not larger than Int.

enumerateFromThenSmallBounded :: (Monad m, Enumerable a, Bounded a) => a -> a -> Stream m a Source #

enumerateFromThen for Enum types not larger than Int.

Note: We convert the Enum to Int and enumerate the Int. If a type is bounded but does not have a Bounded instance then we can go on enumerating it beyond the legal values of the type, resulting in the failure of toEnum when converting back to Enum. Therefore we require a Bounded instance for this function to be safely used.

Enumerating Bounded Integral Types

enumerateFromIntegral :: (Monad m, Integral a, Bounded a) => a -> Stream m a Source #

Enumerate an Integral type. enumerateFromIntegral from generates a stream whose first element is from and the successive elements are in increments of 1. The stream is bounded by the size of the Integral type.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromIntegral (0 :: Int)
[0,1,2,3]

enumerateFromThenIntegral :: (Monad m, Integral a, Bounded a) => a -> a -> Stream m a Source #

Enumerate an Integral type in steps. enumerateFromThenIntegral from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. The stream is bounded by the size of the Integral type.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) 2
[0,2,4,6]
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) (-2)
[0,-2,-4,-6]

Enumerating Integral Types

enumerateFromToIntegral :: (Monad m, Integral a) => a -> a -> Stream m a Source #

Enumerate an Integral type up to a given limit. enumerateFromToIntegral from to generates a finite stream whose first element is from and successive elements are in increments of 1 up to to.

>>> Stream.toList $ Stream.enumerateFromToIntegral 0 4
[0,1,2,3,4]

enumerateFromThenToIntegral :: (Monad m, Integral a) => a -> a -> a -> Stream m a Source #

Enumerate an Integral type in steps up to a given limit. enumerateFromThenToIntegral from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to.

>>> Stream.toList $ Stream.enumerateFromThenToIntegral 0 2 6
[0,2,4,6]
>>> Stream.toList $ Stream.enumerateFromThenToIntegral 0 (-2) (-6)
[0,-2,-4,-6]

Enumerating unbounded Integral Types

enumerateFromStepIntegral :: (Integral a, Monad m) => a -> a -> Stream m a Source #

enumerateFromStepIntegral from step generates an infinite stream whose first element is from and the successive elements are in increments of step.

CAUTION: This function is not safe for finite integral types. It does not check for overflow, underflow or bounds.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromStepIntegral 0 2
[0,2,4,6]
>>> Stream.toList $ Stream.take 3 $ Stream.enumerateFromStepIntegral 0 (-2)
[0,-2,-4]

Enumerating Fractional Types

enumerateFromFractional :: (Monad m, Fractional a) => a -> Stream m a Source #

Numerically stable enumeration from a Fractional number in steps of size 1. enumerateFromFractional from generates a stream whose first element is from and the successive elements are in increments of 1. No overflow or underflow checks are performed.

This is the equivalent to enumFrom for Fractional types. For example:

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromFractional 1.1
[1.1,2.1,3.1,4.1]

enumerateFromToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> Stream m a Source #

Numerically stable enumeration from a Fractional number to a given limit. enumerateFromToFractional from to generates a finite stream whose first element is from and successive elements are in increments of 1 up to to.

This is the equivalent of enumFromTo for Fractional types. For example:

>>> Stream.toList $ Stream.enumerateFromToFractional 1.1 4
[1.1,2.1,3.1,4.1]
>>> Stream.toList $ Stream.enumerateFromToFractional 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

Notice that the last element is equal to the specified to value after rounding to the nearest integer.

enumerateFromThenFractional :: (Monad m, Fractional a) => a -> a -> Stream m a Source #

Numerically stable enumeration from a Fractional number in steps. enumerateFromThenFractional from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. No overflow or underflow checks are performed.

This is the equivalent of enumFromThen for Fractional types. For example:

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenFractional 1.1 2.1
[1.1,2.1,3.1,4.1]
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenFractional 1.1 (-2.1)
[1.1,-2.1,-5.300000000000001,-8.500000000000002]

enumerateFromThenToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> a -> Stream m a Source #

Numerically stable enumeration from a Fractional number in steps up to a given limit. enumerateFromThenToFractional from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to.

This is the equivalent of enumFromThenTo for Fractional types. For example:

>>> Stream.toList $ Stream.enumerateFromThenToFractional 0.1 2 6
[0.1,2.0,3.9,5.799999999999999]
>>> Stream.toList $ Stream.enumerateFromThenToFractional 0.1 (-2) (-6)
[0.1,-2.0,-4.1000000000000005,-6.200000000000001]

Enumerable Type Class

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the Enum type class, except that these generate a stream instead of a list. Use the functions in Streamly.Internal.Data.Stream.Enumeration module to define new instances.

Methods

enumerateFrom :: Monad m => a -> Stream m a Source #

enumerateFrom from generates a stream starting with the element from, enumerating up to maxBound when the type is Bounded or generating an infinite stream when the type is not Bounded.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]

For Fractional types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]

enumerateFromTo :: Monad m => a -> a -> Stream m a Source #

Generate a finite stream starting with the element from, enumerating the type up to the value to. If to is smaller than from then an empty stream is returned.

>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]

For Fractional types, the last element is equal to the specified to value after rounding to the nearest integral value.

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]
>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

enumerateFromThen :: Monad m => a -> a -> Stream m a Source #

enumerateFromThen from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For Bounded types the stream ends when maxBound is reached, for unbounded types it keeps enumerating infinitely.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]

enumerateFromThenTo :: Monad m => a -> a -> a -> Stream m a Source #

enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]
>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]
Instances
Instances details
Enumerable Int16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Int16 -> Stream m Int16 Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int16 -> Int16 -> Stream m Int16 Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int16 -> Int16 -> Stream m Int16 Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int16 -> Int16 -> Int16 -> Stream m Int16 Source #

Enumerable Int32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Int32 -> Stream m Int32 Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int32 -> Int32 -> Stream m Int32 Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int32 -> Int32 -> Stream m Int32 Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int32 -> Int32 -> Int32 -> Stream m Int32 Source #

Enumerable Int64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Int64 -> Stream m Int64 Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int64 -> Int64 -> Stream m Int64 Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int64 -> Int64 -> Stream m Int64 Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int64 -> Int64 -> Int64 -> Stream m Int64 Source #

Enumerable Int8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Int8 -> Stream m Int8 Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int8 -> Int8 -> Stream m Int8 Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int8 -> Int8 -> Stream m Int8 Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int8 -> Int8 -> Int8 -> Stream m Int8 Source #

Enumerable Word16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Word16 -> Stream m Word16 Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word16 -> Word16 -> Stream m Word16 Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word16 -> Word16 -> Stream m Word16 Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word16 -> Word16 -> Word16 -> Stream m Word16 Source #

Enumerable Word32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Word32 -> Stream m Word32 Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word32 -> Word32 -> Stream m Word32 Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word32 -> Word32 -> Stream m Word32 Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word32 -> Word32 -> Word32 -> Stream m Word32 Source #

Enumerable Word64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Word64 -> Stream m Word64 Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word64 -> Word64 -> Stream m Word64 Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word64 -> Word64 -> Stream m Word64 Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word64 -> Word64 -> Word64 -> Stream m Word64 Source #

Enumerable Word8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Word8 -> Stream m Word8 Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word8 -> Word8 -> Stream m Word8 Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word8 -> Word8 -> Stream m Word8 Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word8 -> Word8 -> Word8 -> Stream m Word8 Source #

Enumerable Ordering Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Ordering -> Stream m Ordering Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Ordering -> Ordering -> Stream m Ordering Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Ordering -> Ordering -> Stream m Ordering Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Ordering -> Ordering -> Ordering -> Stream m Ordering Source #

Enumerable Integer Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Integer -> Stream m Integer Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Integer -> Integer -> Stream m Integer Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Integer -> Integer -> Stream m Integer Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Integer -> Integer -> Integer -> Stream m Integer Source #

Enumerable Natural Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Natural -> Stream m Natural Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Natural -> Natural -> Stream m Natural Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Natural -> Natural -> Stream m Natural Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Natural -> Natural -> Natural -> Stream m Natural Source #

Enumerable () Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => () -> Stream m () Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => () -> () -> Stream m () Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => () -> () -> Stream m () Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => () -> () -> () -> Stream m () Source #

Enumerable Bool Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Bool -> Stream m Bool Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Bool -> Bool -> Stream m Bool Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Bool -> Bool -> Stream m Bool Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Bool -> Bool -> Bool -> Stream m Bool Source #

Enumerable Char Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Char -> Stream m Char Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Char -> Char -> Stream m Char Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Char -> Char -> Stream m Char Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Char -> Char -> Char -> Stream m Char Source #

Enumerable Double Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Double -> Stream m Double Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Double -> Double -> Stream m Double Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Double -> Double -> Stream m Double Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Double -> Double -> Double -> Stream m Double Source #

Enumerable Float Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Float -> Stream m Float Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Float -> Float -> Stream m Float Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Float -> Float -> Stream m Float Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Float -> Float -> Float -> Stream m Float Source #

Enumerable Int Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Int -> Stream m Int Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int -> Int -> Stream m Int Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int -> Int -> Stream m Int Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int -> Int -> Int -> Stream m Int Source #

Enumerable Word Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Word -> Stream m Word Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word -> Word -> Stream m Word Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word -> Word -> Stream m Word Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word -> Word -> Word -> Stream m Word Source #

Enumerable a => Enumerable (Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Identity a -> Stream m (Identity a) Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Identity a -> Identity a -> Stream m (Identity a) Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Identity a -> Identity a -> Stream m (Identity a) Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Identity a -> Identity a -> Identity a -> Stream m (Identity a) Source #

Integral a => Enumerable (Ratio a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Ratio a -> Stream m (Ratio a) Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Ratio a -> Ratio a -> Stream m (Ratio a) Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Ratio a -> Ratio a -> Stream m (Ratio a) Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Ratio a -> Ratio a -> Ratio a -> Stream m (Ratio a) Source #

HasResolution a => Enumerable (Fixed a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Generate

Methods

enumerateFrom :: forall (m :: Type -> Type). Monad m => Fixed a -> Stream m (Fixed a) Source #

enumerateFromTo :: forall (m :: Type -> Type). Monad m => Fixed a -> Fixed a -> Stream m (Fixed a) Source #

enumerateFromThen :: forall (m :: Type -> Type). Monad m => Fixed a -> Fixed a -> Stream m (Fixed a) Source #

enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Fixed a -> Fixed a -> Fixed a -> Stream m (Fixed a) Source #

Time Enumeration

times :: MonadIO m => Stream m (AbsTime, RelTime64) Source #

times returns a stream of time value tuples with clock of 10 ms granularity. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.

>>> f = Fold.drainMapM (\x -> print x >> threadDelay 1000000)
>>> Stream.fold f $ Stream.take 3 $ Stream.times
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines.

Pre-release

timesWith :: MonadIO m => Double -> Stream m (AbsTime, RelTime64) Source #

timesWith g returns a stream of time value tuples. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.

The argument g specifies the granularity of the relative time in seconds. A lower granularity clock gives higher precision but is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> import Control.Concurrent (threadDelay)
>>> f = Fold.drainMapM (\x -> print x >> threadDelay 1000000)
>>> Stream.fold f $ Stream.take 3 $ Stream.timesWith 0.01
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines.

Pre-release

absTimes :: MonadIO m => Stream m AbsTime Source #

absTimes returns a stream of absolute timestamps using a clock of 10 ms granularity.

>>> f = Fold.drainMapM print
>>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

absTimesWith :: MonadIO m => Double -> Stream m AbsTime Source #

absTimesWith g returns a stream of absolute timestamps using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> f = Fold.drainMapM print
>>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

relTimes :: MonadIO m => Stream m RelTime64 Source #

relTimes returns a stream of relative time values starting from 0, using a clock of granularity 10 ms.

>>> f = Fold.drainMapM print
>>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

relTimesWith :: MonadIO m => Double -> Stream m RelTime64 Source #

relTimesWith g returns a stream of relative time values starting from 0, using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> f = Fold.drainMapM print
>>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

durations :: Double -> t m RelTime64 Source #

durations g returns a stream of relative time values measuring the time elapsed since the immediate predecessor element of the stream was generated. The first element of the stream is always 0. durations uses a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. The minimum granularity is 1 millisecond. Durations lower than 1 ms will be 0.

Note: This API is not safe on 32-bit machines.

Unimplemented

timeout :: AbsTime -> t m () Source #

Generate a singleton event at or after the specified absolute time. Note that this is different from a threadDelay, a threadDelay starts from the time when the action is evaluated, whereas if we use AbsTime based timeout it will immediately expire if the action is evaluated too late.

Unimplemented

From Generators

Generate a monadic stream from a seed.

fromIndices :: Monad m => (Int -> a) -> Stream m a Source #

fromIndicesM :: Monad m => (Int -> m a) -> Stream m a Source #

generate :: Monad m => Int -> (Int -> a) -> Stream m a Source #

generateM :: Monad m => Int -> (Int -> m a) -> Stream m a Source #

Iteration

iterate :: Monad m => (a -> a) -> a -> Stream m a Source #

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

>>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]

iterateM :: Monad m => (a -> m a) -> m a -> Stream m a Source #

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

>>> :{
Stream.iterateM (\x -> print x >> return (x + 1)) (return 0)
    & Stream.take 3
    & Stream.toList
:}
0
1
[0,1,2]

From Containers

Transform an input structure into a stream.

fromListM :: Monad m => [m a] -> Stream m a Source #

Convert a list of monadic actions to a Stream

fromFoldable :: (Monad m, Foldable f) => f a -> Stream m a Source #

>>> fromFoldable = Prelude.foldr Stream.cons Stream.nil

Construct a stream from a Foldable containing pure values:

/WARNING: O(n^2), suitable only for a small number of elements in the stream/

fromFoldableM :: (Monad m, Foldable f) => f (m a) -> Stream m a Source #

>>> fromFoldableM = Prelude.foldr Stream.consM Stream.nil

Construct a stream from a Foldable containing pure values:

/WARNING: O(n^2), suitable only for a small number of elements in the stream/

From Pointers

fromPtr :: forall m a. (Monad m, Storable a) => Ptr a -> Stream m a Source #

Keep reading Storable elements from an immutable Ptr onwards.

Unsafe: The caller is responsible for safe addressing.

Pre-release

fromPtrN :: (Monad m, Storable a) => Int -> Ptr a -> Stream m a Source #

Take n Storable elements starting from an immutable Ptr onwards.

>>> fromPtrN n = Stream.take n . Stream.fromPtr

Unsafe: The caller is responsible for safe addressing.

Pre-release

fromCString# :: Monad m => Addr# -> Stream m Word8 Source #

Read bytes from an immutable Addr# until a 0 byte is encountered, the 0 byte is not included in the stream.

>>> :set -XMagicHash
>>> fromCString# addr = Stream.takeWhile (/= 0) $ Stream.fromPtr $ (Ptr addr :: Ptr Word8)

Unsafe: The caller is responsible for safe addressing.

Note that this is completely safe when reading from Haskell string literals because they are guaranteed to be NULL terminated:

>>> Stream.toList $ Stream.fromCString# "\1\2\3\0"#
[1,2,3]

fromW16CString# :: Monad m => Addr# -> Stream m Word16 Source #

Read Word16 from an immutable Addr# until a 0 Word16 is encountered, the 0 Word16 is not included in the stream.

>>> :set -XMagicHash
>>> fromW16CString# addr = Stream.takeWhile (/= 0) $ Stream.fromPtr $ (Ptr addr :: Ptr Word16)

Unsafe: The caller is responsible for safe addressing.

Deprecated

fromByteStr# :: Monad m => Addr# -> Stream m Word8 Source #

Deprecated: Please use fromCString# instead

Running a Parser

parse :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b) Source #

Parse a stream using the supplied Parser.

Parsers (See Streamly.Internal.Data.Parser) are more powerful folds that add backtracking and error functionality to terminating folds. Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:

>>> Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")

Note: parse p is not the same as head . parseMany p on an empty stream.

parseD :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b) Source #

Run a Parse over a stream.

parseBreak :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b, Stream m a) Source #

Parse a stream using the supplied Parser.

parseBreakD :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b, Stream m a) Source #

Run a Parse over a stream and return rest of the Stream.

Deconstruction

uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

Properties:

>>> Nothing <- Stream.uncons Stream.nil
>>> Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)

This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of uncons, however, this is generally less efficient than specific folds because it takes apart the stream one element at a time, therefore, does not take adavantage of stream fusion.

foldBreak is a more general way of consuming a stream piecemeal.

>>> :{
uncons xs = do
    r <- Stream.foldBreak Fold.one xs
    return $ case r of
        (Nothing, _) -> Nothing
        (Just h, t) -> Just (h, t)
:}

Right Folds

foldr1 :: Monad m => (a -> a -> a) -> Stream m a -> m (Maybe a) Source #

Specific Fold Functions

mapM_ :: Monad m => (a -> m b) -> Stream m a -> m () Source #

Execute a monadic action for each element of the Stream

null :: Monad m => Stream m a -> m Bool Source #

init :: Monad m => Stream m a -> m (Maybe (Stream m a)) Source #

tail :: Monad m => Stream m a -> m (Maybe (Stream m a)) Source #

Same as:

>>> tail = fmap (fmap snd) . Stream.uncons

Does not fuse, has the same performance as the StreamK version.

last :: Monad m => Stream m a -> m (Maybe a) Source #

elem :: (Monad m, Eq a) => a -> Stream m a -> m Bool Source #

notElem :: (Monad m, Eq a) => a -> Stream m a -> m Bool Source #

all :: Monad m => (a -> Bool) -> Stream m a -> m Bool Source #

any :: Monad m => (a -> Bool) -> Stream m a -> m Bool Source #

maximum :: (Monad m, Ord a) => Stream m a -> m (Maybe a) Source #

maximumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a) Source #

minimum :: (Monad m, Ord a) => Stream m a -> m (Maybe a) Source #

minimumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a) Source #

lookup :: (Monad m, Eq a) => a -> Stream m (a, b) -> m (Maybe b) Source #

findM :: Monad m => (a -> m Bool) -> Stream m a -> m (Maybe a) Source #

find :: Monad m => (a -> Bool) -> Stream m a -> m (Maybe a) Source #

(!!) :: Monad m => Stream m a -> Int -> m (Maybe a) Source #

the :: (Eq a, Monad m) => Stream m a -> m (Maybe a) Source #

To containers

toListRev :: Monad m => Stream m a -> m [a] Source #

Multi-Stream Folds

These should probably be expressed using parsers.

isPrefixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #

Returns True if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.

>>> Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: Stream IO Char)
True

isInfixOf :: (MonadIO m, Eq a, Enum a, Unbox a) => Stream m a -> Stream m a -> m Bool Source #

Returns True if the first stream is an infix of the second. A stream is considered an infix of itself.

>>> s = Stream.fromList "hello" :: Stream IO Char
>>> Stream.isInfixOf s s
True

Space: O(n) worst case where n is the length of the infix.

Pre-release

Requires Storable constraint

isSuffixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #

Returns True if the first stream is a suffix of the second. A stream is considered a suffix of itself.

>>> Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: Stream IO Char)
True

Space: O(n), buffers entire input stream and the suffix.

Pre-release

Suboptimal - Help wanted.

isSuffixOfUnbox :: (MonadIO m, Eq a, Unbox a) => Stream m a -> Stream m a -> m Bool Source #

Much faster than isSuffixOf.

isSubsequenceOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #

Returns True if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.

>>> Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: Stream IO Char)
True

stripPrefix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #

stripPrefix prefix input strips the prefix stream from the input stream if it is a prefix of input. Returns Nothing if the input does not start with the given prefix, stripped input otherwise. Returns Just nil when the prefix is the same as the input stream.

Space: O(1)

stripSuffix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #

Drops the given suffix from a stream. Returns Nothing if the stream does not end with the given suffix. Returns Just nil when the suffix is the same as the stream.

It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.

Space: O(n), buffers the entire input stream as well as the suffix

Pre-release

stripSuffixUnbox :: (MonadIO m, Eq a, Unbox a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #

Much faster than stripSuffix.

Resources

before :: Monad m => m b -> Stream m a -> Stream m a Source #

Run the action m b before the stream yields its first element.

Same as the following but more efficient due to fusion:

>>> before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)

afterIO :: MonadIO m => IO b -> Stream m a -> Stream m a Source #

Run the action IO b whenever the stream is evaluated to completion, or if it is garbage collected after a partial lazy evaluation.

The semantics of the action IO b are similar to the semantics of cleanup action in bracketIO.

See also afterUnsafe

afterUnsafe :: Monad m => m b -> Stream m a -> Stream m a Source #

Like after, with following differences:

  • action m b won't run if the stream is garbage collected after partial evaluation.
  • Monad m does not require any other constraints.
  • has slightly better performance than after.

Same as the following, but with stream fusion:

>>> afterUnsafe action xs = xs <> Stream.nilM action

Pre-release

finallyIO :: (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a Source #

Run the action IO b whenever the stream stream stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.

The semantics of running the action IO b are similar to the cleanup action semantics described in bracketIO.

>>> finallyIO release = Stream.bracketIO (return ()) (const release)

See also finallyUnsafe

Inhibits stream fusion

finallyUnsafe :: MonadCatch m => m b -> Stream m a -> Stream m a Source #

Like finally with following differences:

  • action m b won't run if the stream is garbage collected after partial evaluation.
  • has slightly better performance than finallyIO.

Inhibits stream fusion

Pre-release

gbracket_ Source #

Arguments

:: Monad m 
=> m c

before

-> (c -> m d)

after, on normal stop

-> (c -> e -> Stream m b -> m (Stream m b))

on exception

-> (forall s. m s -> m (Either e s))

try (exception handling)

-> (c -> Stream m b)

stream generator

-> Stream m b 

Like gbracket but with following differences:

  • alloc action m c runs with async exceptions enabled
  • cleanup action c -> m d won't run if the stream is garbage collected after partial evaluation.

Inhibits stream fusion

Pre-release

gbracket Source #

Arguments

:: MonadIO m 
=> IO c

before

-> (c -> IO d1)

on normal stop

-> (c -> e -> Stream m b -> IO (Stream m b))

on exception

-> (c -> IO d2)

on GC without normal stop or exception

-> (forall s. m s -> m (Either e s))

try (exception handling)

-> (c -> Stream m b)

stream generator

-> Stream m b 

Run the alloc action m c with async exceptions disabled but keeping blocking operations interruptible (see mask). Use the output c as input to c -> Stream m b to generate an output stream. When generating the stream use the supplied try operation forall s. m s -> m (Either e s) to catch synchronous exceptions. If an exception occurs run the exception handler c -> e -> Stream m b -> m (Stream m b). Note that gbracket does not rethrow the exception, it has to be done by the exception handler if desired.

The cleanup action c -> m d, runs whenever the stream ends normally, due to a sync or async exception or if it gets garbage collected after a partial lazy evaluation. See bracket for the semantics of the cleanup action.

gbracket can express all other exception handling combinators.

Inhibits stream fusion

Pre-release

bracketUnsafe :: MonadCatch m => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #

Like bracket but with following differences:

  • alloc action m b runs with async exceptions enabled
  • cleanup action b -> m c won't run if the stream is garbage collected after partial evaluation.
  • has slightly better performance than bracketIO.

Inhibits stream fusion

Pre-release

bracketIO3 :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> IO d) -> (b -> IO e) -> (b -> Stream m a) -> Stream m a Source #

Like bracketIO but can use 3 separate cleanup actions depending on the mode of termination:

  1. When the stream stops normally
  2. When the stream is garbage collected
  3. When the stream encounters an exception

bracketIO3 before onStop onGC onException action runs action using the result of before. If the stream stops, onStop action is executed, if the stream is abandoned onGC is executed, if the stream encounters an exception onException is executed.

The exception is not caught, it is rethrown.

Inhibits stream fusion

Pre-release

bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a Source #

Run the alloc action IO b with async exceptions disabled but keeping blocking operations interruptible (see mask). Use the output b of the IO action as input to the function b -> Stream m a to generate an output stream.

b is usually a resource under the IO monad, e.g. a file handle, that requires a cleanup after use. The cleanup action b -> IO c, runs whenever (1) the stream ends normally, (2) due to a sync or async exception or, (3) if it gets garbage collected after a partial lazy evaluation. The exception is not caught, it is rethrown.

bracketIO only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully cleanup the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run. An example where GC based cleanup happens is when a stream is being folded but the fold terminates without draining the entire stream or if the consumer of the stream encounters an exception.

Observes exceptions only in the stream generation, and not in stream consumers.

See also: bracketUnsafe

Inhibits stream fusion

Exceptions

onException :: MonadCatch m => m b -> Stream m a -> Stream m a Source #

Run the action m b if the stream evaluation is aborted due to an exception. The exception is not caught, simply rethrown.

Observes exceptions only in the stream generation, and not in stream consumers.

Inhibits stream fusion

ghandle :: (MonadCatch m, Exception e) => (e -> Stream m a -> m (Stream m a)) -> Stream m a -> Stream m a Source #

Like handle but the exception handler is also provided with the stream that generated the exception as input. The exception handler can thus re-evaluate the stream to retry the action that failed. The exception handler can again call ghandle on it to retry the action multiple times.

This is highly experimental. In a stream of actions we can map the stream with a retry combinator to retry each action on failure.

Inhibits stream fusion

Pre-release

handle :: (MonadCatch m, Exception e) => (e -> m (Stream m a)) -> Stream m a -> Stream m a Source #

When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument. The exception is caught and handled unless the handler decides to rethrow it. Note that exception handling is not applied to the stream returned by the exception handler.

Observes exceptions only in the stream generation, and not in stream consumers.

Inhibits stream fusion

Generalize Inner Monad

morphInner :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a Source #

Transform the inner monad of a stream using a natural transformation.

Example, generalize the inner monad from Identity to any other:

>>> generalizeInner = Stream.morphInner (return . runIdentity)

Also known as hoist.

generalizeInner :: Monad m => Stream Identity a -> Stream m a Source #

Generalize the inner monad of the stream from Identity to any monad.

Definition:

>>> generalizeInner = Stream.morphInner (return . runIdentity)

Transform Inner Monad

liftInnerWith :: Monad (t m) => (forall b. m b -> t m b) -> Stream m a -> Stream (t m) a Source #

Lift the inner monad m of a stream Stream m a to t m using the supplied lift function.

runInnerWith :: Monad m => (forall b. t m b -> m b) -> Stream (t m) a -> Stream m a Source #

Evaluate the inner monad of a stream using the supplied runner function.

runInnerWithState :: Monad m => (forall b. s -> t m b -> m (b, s)) -> m s -> Stream (t m) a -> Stream m (s, a) Source #

Evaluate the inner monad of a stream using the supplied stateful runner function and the initial state. The state returned by an invocation of the runner is supplied as input state to the next invocation.

Fold to Transformer Monad

foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b Source #

Lazy left fold to a transformer monad.

foldrT :: (Monad m, Monad (t m), MonadTrans t) => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b Source #

Right fold to a transformer monad. This is the most general right fold function. foldrS is a special case of foldrT, however foldrS implementation can be more efficient:

>>> foldrS = Stream.foldrT
>>> step f x xs = lift $ f x (runIdentityT xs)
>>> foldrM f z s = runIdentityT $ Stream.foldrT (step f) (lift z) s

foldrT can be used to translate streamly streams to other transformer monads e.g. to a different streaming type.

Pre-release

Inner Monad Operations

liftInner :: (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a Source #

Lift the inner monad m of Stream m a to t m where t is a monad transformer.

runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a Source #

Evaluate the inner monad of a stream as ReaderT.

usingReaderT :: Monad m => m r -> (Stream (ReaderT r m) a -> Stream (ReaderT r m) a) -> Stream m a -> Stream m a Source #

Run a stream transformation using a given environment.

withReaderT :: Monad m => (r2 -> r1) -> Stream (ReaderT r1 m) a -> Stream (ReaderT r2 m) a Source #

Modify the environment of the underlying ReaderT monad.

localReaderT :: Monad m => (r -> r) -> Stream (ReaderT r m) a -> Stream (ReaderT r m) a Source #

Modify the environment of the underlying ReaderT monad.

evalStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m a Source #

Evaluate the inner monad of a stream as StateT.

>>> evalStateT s = fmap snd . Stream.runStateT s

runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a) Source #

Evaluate the inner monad of a stream as StateT and emit the resulting state and value pair after each step.

usingStateT :: Monad m => m s -> (Stream (StateT s m) a -> Stream (StateT s m) b) -> Stream m a -> Stream m b Source #

Run a stateful (StateT) stream transformation using a given state.

>>> usingStateT s f = Stream.evalStateT s . f . Stream.liftInner

See also: scan

Generate

Combining streams to generate streams.

Combine Two Streams

Functions ending in the shape:

Stream m a -> Stream m a -> Stream m a.

Appending

Append a stream after another. A special case of concatMap or unfoldEach Note, appending more than two streams is called concat which could be called appendMany or appendAll in append terminology and is equivalent to concatMap id. Append is equivalent to mergeBy fst.

data AppendState s1 s2 Source #

Constructors

AppendFirst s1 
AppendSecond s2 

append :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for statically fusing a small number of streams. Use the O(n) complexity StreamK.append otherwise.

Fuses two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> s1 = Stream.fromList [1,2]
>>> s2 = Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ s1 `Stream.append` s2
[1,2,3,4]

Interleaving

Interleave elements from two streams alternately. A special case of unfoldEachInterleave. Interleave is equivalent to mergeBy with a round robin merge function.

interleave :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for statically fusing a small number of streams. Use the O(n) complexity StreamK.interleave otherwise.

Interleaves two streams, yielding one element from each stream alternately, starting from the first stream. When one stream is exhausted, all the remaining elements of the other stream are emitted in the output stream.

Both the streams are completely exhausted.

(a b c) (. . .) => a . b . c .
(a b c) (. .  ) => a . b . c
(a b  ) (. . .) => a . b .  .

Examples:

>>> f x y = Stream.toList $ Stream.interleave (Stream.fromList x) (Stream.fromList y)
>>> f "abc" "..."
"a.b.c."
>>> f "abc" ".."
"a.b.c"
>>> f "ab" "..."
"a.b.."

interleaveEndBy' :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Interleave the two streams such that the elements of the second stream are ended by the elements of the first stream. If one of the streams is exhausted then interleaving stops.

(. . .) (a b c) => a . b . c .
(. .  ) (a b c) => a . b .      -- c is discarded
(. . .) (a b  ) => a . b .      -- . is discarded

Examples:

>>> f x y = Stream.toList $ Stream.interleaveEndBy' (Stream.fromList x) (Stream.fromList y)
>>> f "..." "abc"
"a.b.c."
>>> f ".." "abc"
"a.b."
>>> f "..." "ab"
"a.b."

Definition:

>>> interleaveEndBy' s1 s2 = Stream.unfoldEach Unfold.fromTuple $ Stream.zipWith (,) s2 s1

Similarly, we can defined interleaveBeginBy' as:

>>> interleaveBeginBy' = flip interleaveEndBy'

interleaveSepBy' :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Interleave the two streams such that the elements of the first stream are infixed between the elements of the second stream. If one of the streams is exhausted then interleaving stops.

(. . .) (a b c) => a . b . c    -- additional . is discarded
(. .  ) (a b c) => a . b . c
(.    ) (a b c) => a . b        -- c is discarded
>>> f x y = Stream.toList $ Stream.interleaveSepBy' (Stream.fromList x) (Stream.fromList y)
>>> f "..." "abc"
"a.b.c"
>>> f ".." "abc"
"a.b.c"
>>> f "." "abc"
"a.b"

interleaveBeginBy :: Stream m a -> Stream m a -> Stream m a Source #

Interleave the two streams such that the elements of the second stream are prefixed by the elements of the first stream. Interleaving stops when and only when the second stream is exhausted. Shortfall of the prefix stream is ignored and excess is discarded.

(. . .) (a b c) => . a . b . c
(. . .) (a b  ) => . a . b      -- additional . is discarded
(. .  ) (a b c) => . a . b c    -- missing . is ignored

Unimplemented

interleaveEndBy :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Like interleaveEndBy' but interleaving stops when and only when the second stream is exhausted. Shortfall of the suffix stream is ignored and excess is discarded.

(. . .) (a b c) => a . b . c .
(. .  ) (a b c) => a . b . c    -- missing . is ignored
(. . .) (a b  ) => a . b .      -- additional . is discarded
>>> f x y = Stream.toList $ Stream.interleaveEndBy (Stream.fromList x) (Stream.fromList y)
>>> f "..." "abc"
"a.b.c."
>>> f ".." "abc"
"a.b.c"
>>> f "..." "ab"
"a.b."

interleaveSepBy :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Like interleaveSepBy' but interleaving stops when and only when the second stream is exhausted. Shortfall of the infix stream is ignored and excess is discarded.

(. . .) (a b c) => a . b . c    -- additional . is discarded
(. .  ) (a b c) => a . b . c
(.    ) (a b c) => a . b c      -- missing . is ignored

Examples:

>>> f x y = Stream.toList $ Stream.interleaveSepBy (Stream.fromList x) (Stream.fromList y)
>>> f "..." "abc"
"a.b.c"
>>> f ".." "abc"
"a.b.c"
>>> f "." "abc"
"a.bc"

Scheduling

Execute streams alternately irrespective of whether they generate elements or not. Note interleave would execute a stream until it yields an element. A special case of unfoldEachRoundRobin.

roundRobin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output, a stream may choose to Skip producing an element until later giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of two streams rather than fairly interleaving the output of the two streams. This can be useful in co-operative multitasking without using explicit threads. This can be used as an alternative to async.

Do not use dynamically.

Pre-release

Merging

Interleave elements from two streams based on a condition.

mergeBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for statically fusing a small number of streams. Use the O(n) complexity StreamK.mergeBy otherwise.

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

>>> s1 = Stream.fromList [1,3,5]
>>> s2 = Stream.fromList [2,4,6,8]
>>> Stream.fold Fold.toList $ Stream.mergeBy compare s1 s2
[1,2,3,4,5,6,8]

mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeBy but with a monadic comparison function.

Example, to merge two streams randomly:

> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]

Example, merge two streams in a proportion of 2:1:

>>> :set -fno-warn-unrecognised-warning-flags
>>> :set -fno-warn-x-partial
>>> :{
do
 let s1 = Stream.fromList [1,1,1,1,1,1]
     s2 = Stream.fromList [2,2,2]
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.fold Fold.toList $ Stream.mergeByM f s1 s2
 print xs
:}
[1,1,2,1,1,2,1,1,2]

mergeMinBy :: (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeByM but stops merging as soon as any of the two streams stops.

Unimplemented

mergeFstBy :: (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeByM but stops merging as soon as the first stream stops.

Unimplemented

Combine N Streams

Functions generally ending in these shapes:

concat: f (Stream m a) -> Stream m a
concatMap: (a -> Stream m b) -> Stream m a -> Stream m b
unfoldEach: Unfold m a b -> Stream m a -> Stream m b

unfoldEach

Generate streams by using an unfold on each element of an input stream, append the resulting streams and flatten. A special case of intercalate.

unfoldEachFoldBy :: Fold m b c -> Unfold m a b -> Stream m a -> Stream m c Source #

Stream must be finite. Unfolds each element of the input stream to generate streams. After generating one element from each stream fold those using the supplied fold and emit the result in the output stream. Continue doing this until the streams are exhausted.

Unimplemented

unfoldEachInterleave :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

Like unfoldEach but interleaves the resulting streams instead of appending them. Unfolds each element in the input stream to a stream and then interleave the resulting streams.

>>> lists = Stream.fromList [[1,4,7],[2,5,8],[3,6,9]]
>>> Stream.toList $ Stream.unfoldEachInterleave Unfold.fromList lists
[1,2,3,4,5,6,7,8,9]

This is similar to mergeMapWith using interleave but an order of magnitude more efficient due to fusion.

See also mergeMapWith.

unfoldEachInterleaveRev :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

Like unfoldEachInterleave but reverses the traversal direction after reaching the last stream. This could be little bit more efficient if the order of traversal is not important.

>>> lists = Stream.fromList [[1,4,7],[2,5,8],[3,6,9]]
>>> Stream.toList $ Stream.unfoldEachInterleaveRev Unfold.fromList lists
[1,2,3,6,5,4,7,8,9]

unfoldEachRoundRobin :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

unfoldEachInterleave switches to the next stream whenever a value from a stream is yielded, it does not switch on a Skip. So if a stream keeps skipping for long time other streams won't get a chance to run. unfoldEachRoundRobin switches on Skip as well. So it basically schedules each stream fairly irrespective of whether it produces a value or not.

unfoldEach joined by elements

Like unfoldEach but intersperses an element between the streams after unfolding. A special case of intercalate.

unfoldEachSepBy :: Monad m => c -> Unfold m b c -> Stream m b -> Stream m c Source #

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

Definition:

>>> unfoldEachSepBy x = Stream.unfoldEachSepByM (return x)
>>> unfoldEachSepBy x = Stream.intercalateSepBy Unfold.identity (Stream.repeat x)

Usage:

>>> unwords = Stream.unfoldEachSepBy ' '

Pre-release

unfoldEachSepByM :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c Source #

Monadic variant of unfoldEachSepBy.

Definition:

>>> unfoldEachSepByM x = Stream.intercalateSepBy Unfold.identity (Stream.repeatM x)

unfoldEachEndBy :: Monad m => c -> Unfold m b c -> Stream m b -> Stream m c Source #

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

Definition:

>>> unfoldEachEndBy x = Stream.intercalateEndBy Unfold.identity (Stream.repeat x)

Usage:

>>> unlines = Stream.unfoldEachEndBy '\n'

Pre-release

unfoldEachEndByM :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c Source #

Monadic variant of unfoldEachEndBy.

Definition:

>>> unfoldEachEndByM x = Stream.intercalateEndBy Unfold.identity (Stream.repeatM x)

unfoldEach joined by sequences

Like unfoldEach but intersperses a sequence between the unfolded streams before unfolding. A special case of intercalate.

unfoldEachSepBySeq :: Monad m => b -> Unfold m b c -> Stream m b -> Stream m c Source #

Unfold each element of the stream, separate the successive unfolds by a sequence generated by unfolding the supplied value.

Definition:

>>> unfoldEachSepBySeq a u = Stream.unfoldEach u . Stream.intersperse a
>>> unfoldEachSepBySeq a u = Stream.intercalateSepBy u (Stream.repeat a) u

Idioms:

>>> intersperse x = Stream.unfoldEachSepBySeq x Unfold.identity
>>> unwords = Stream.unfoldEachSepBySeq " " Unfold.fromList

Usage:

>>> input = Stream.fromList ["abc", "def", "ghi"]
>>> Stream.toList $ Stream.unfoldEachSepBySeq " " Unfold.fromList input
"abc def ghi"

unfoldEachEndBySeq :: Monad m => b -> Unfold m b c -> Stream m b -> Stream m c Source #

Unfold each element of the stream, end each unfold by a sequence generated by unfolding the supplied value.

Definition:

>>> unfoldEachEndBySeq a u = Stream.unfoldEach u . Stream.intersperseEndByM a
>>> unfoldEachEndBySeq a u = Stream.intercalateEndBy u (Stream.repeat a) u

Idioms:

>>> intersperseEndByM x = Stream.unfoldEachEndBySeq x Unfold.identity
>>> unlines = Stream.unfoldEachEndBySeq "\n" Unfold.fromList

Usage:

>>> input = Stream.fromList ["abc", "def", "ghi"]
>>> Stream.toList $ Stream.unfoldEachEndBySeq "\n" Unfold.fromList input
"abc\ndef\nghi\n"

unfoldEach joined by streams

Like unfoldEach but intersperses streams between the unfolded streams.

intercalateSepBy :: Monad m => Unfold m b c -> Stream m b -> Unfold m a c -> Stream m a -> Stream m c Source #

The first stream Stream m b is turned into a stream of streams by unfolding each element using the first unfold, similarly Stream m a is also turned into a stream of streams. The second stream of streams is interspersed with the streams from the first stream in an infix manner and then the resulting stream is flattened.

You can think of this as interleaveSepBy on the stream of streams followed by concat. Same as the following but more efficient:

>>> intercalateSepBy u1 s1 u2 s2 = Stream.concat $ Stream.interleaveSepBy (fmap (Stream.unfold u1) s1) (fmap (Stream.unfold u2) s2)

If the separator stream consists of nil streams then it becomes equivalent to unfoldEach:

>>> unfoldEach = Stream.intercalateSepBy (Unfold.nilM (const (return ()))) (Stream.repeat ())

Pre-release

intercalateEndBy :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

See intercalateSepBy for detailed documentation.

You can think of this as interleaveEndBy on the stream of streams followed by concat. Same as the following but more efficient:

>>> intercalateEndBy u1 s1 u2 s2 = Stream.concat $ Stream.interleaveEndBy (fmap (Stream.unfold u1) s1) (fmap (Stream.unfold u2) s2)

Pre-release

Eliminate

Folding and Parsing chunks of streams to eliminate nested streams. Functions generally ending in these shapes:

f (Fold m a b) -> t m a -> t m b
f (Parser a m b) -> t m a -> t m b

Folding

Apply folds on a stream.

foldSequence :: Stream m (Fold m a b) -> Stream m a -> Stream m b Source #

Apply a stream of folds to an input stream and emit the results in the output stream.

Unimplemented

foldIterateM :: Monad m => (b -> m (Fold m a b)) -> m b -> Stream m a -> Stream m b Source #

Iterate a fold generator on a stream. The initial value b is used to generate the first fold, the fold is applied on the stream and the result of the fold is used to generate the next fold and so on.

Usage:

>>> import Data.Monoid (Sum(..))
>>> f x = return (Fold.take 2 (Fold.sconcat x))
>>> s = fmap Sum $ Stream.fromList [1..10]
>>> Stream.fold Fold.toList $ fmap getSum $ Stream.foldIterateM f (pure 0) s
[3,10,21,36,55,55]

This is the streaming equivalent of monad like sequenced application of folds where next fold is dependent on the previous fold.

Pre-release

Parsing

Parsing is opposite to flattening. parseMany is dual to concatMap or unfoldEach concatMap generates a stream from single values in a stream and flattens, parseMany does the opposite of flattening by splitting the stream and then folds each such split to single value in the output stream.

parseMany :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #

Apply a Parser repeatedly on a stream and emit the parsed values in the output stream.

Usage:

>>> s = Stream.fromList [1..10]
>>> parser = Parser.takeBetween 0 2 Fold.sum
>>> Stream.toList $ Stream.parseMany parser s
[Right 3,Right 7,Right 11,Right 15,Right 19]

This is the streaming equivalent of the many parse combinator.

Known Issues: When the parser fails there is no way to get the remaining stream.

parseSequence :: Stream m (Parser a m b) -> Stream m a -> Stream m b Source #

Apply a stream of parsers to an input stream and emit the results in the output stream.

Unimplemented

parseManyTill :: Parser a m b -> Parser a m x -> Stream m a -> Stream m b Source #

parseManyTill collect test stream tries the parser test on the input, if test fails it backtracks and tries collect, after collect succeeds test is tried again and so on. The parser stops when test succeeds. The output of test is discarded and the output of collect is emitted in the output stream. The parser fails if collect fails.

Unimplemented

parseIterate :: Monad m => (b -> Parser a m b) -> b -> Stream m a -> Stream m (Either ParseError b) Source #

Iterate a parser generating function on a stream. The initial value b is used to generate the first parser, the parser is applied on the stream and the result is used to generate the next parser and so on.

Example:

>>> import Data.Monoid (Sum(..))
>>> s = Stream.fromList [1..10]
>>> Stream.toList $ fmap getSum $ Stream.catRights $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) (Sum 0) $ fmap Sum s
[3,10,21,36,55,55]

This is the streaming equivalent of monad like sequenced application of parsers where next parser is dependent on the previous parser.

Pre-release

Grouping

Group segments of a stream and fold. Special case of parsing.

groupsWhile :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Keep collecting items in a group as long as the comparison function returns true. The comparison function is cmp old new where old is the first item in the group and new is the incoming item being tested for membership of the group. The collected items are folded by the supplied fold.

Definition:

>>> groupsWhile cmp f = Stream.parseMany (Parser.groupBy cmp f)

groupsRollingBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Definition:

>>> groupsRollingBy cmp f = Stream.parseMany (Parser.groupByRolling cmp f)

Splitting

A special case of parsing.

takeEndBySeq :: forall m a. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Stream m a -> Stream m a Source #

Take the stream until the supplied sequence is encountered. Take the sequence as well and stop.

Usage:

>>> f pat xs = Stream.toList $ Stream.takeEndBySeq (Array.fromList pat) $ Stream.fromList xs
>>> f "fgh" "abcdefghijk"
"abcdefgh"
>>> f "lmn" "abcdefghijk"
"abcdefghijk"
>>> f "" "abcdefghijk"
""

takeEndBySeq_ :: forall m a. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Stream m a -> Stream m a Source #

Take the stream until the supplied sequence is encountered. Do not take the sequence.

Usage:

>>> f pat xs = Stream.toList $ Stream.takeEndBySeq_ (Array.fromList pat) $ Stream.fromList xs
>>> f "fgh" "abcdefghijk"
"abcde"
>>> f "lmn" "abcdefghijk"
"abcdefghijk"
>>> f "" "abcdefghijk"
""

wordsBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split the stream after stripping leading, trailing, and repeated separators determined by the predicate supplied. The tokens after splitting are collected by the supplied fold. In other words, the tokens are parsed in the same way as words are parsed from whitespace separated text.

>>> f x = Stream.toList $ Stream.wordsBy (== '.') Fold.toList $ Stream.fromList x
>>> f "a.b"
["a","b"]
>>> f "a..b"
["a","b"]
>>> f ".a..b."
["a","b"]

splitSepBySeq_ :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #

Like splitSepBy_ but splits the stream on a sequence of elements rather than a single element. Parses a sequence of tokens separated by an infixed separator e.g. a;b;c is parsed as a, b, c. If the pattern is empty then each element is a match, thus the fold is finalized on each element.

>>> splitSepBy p xs = Stream.fold Fold.toList $ Stream.splitSepBySeq_ (Array.fromList p) Fold.toList (Stream.fromList xs)
>>> splitSepBy "" ""
[]
>>> splitSepBy "" "a...b"
["a",".",".",".","b"]
>>> splitSepBy ".." ""
[]
>>> splitSepBy ".." "a...b"
["a",".b"]
>>> splitSepBy ".." "abc"
["abc"]
>>> splitSepBy ".." ".."
["",""]
>>> splitSepBy "." ".a"
["","a"]
>>> splitSepBy "." "a."
["a",""]

Uses Rabin-Karp algorithm for substring search.

splitEndBySeq :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #

Parses a sequence of tokens suffixed by a separator e.g. a;b;c; is parsed as a;, b;, c;. If the pattern is empty the input stream is returned as it is.

Equivalent to the following:

>>> splitEndBySeq pat f = Stream.foldMany (Fold.takeEndBySeq pat f)

Usage:

>>> f p = Stream.splitEndBySeq (Array.fromList p) Fold.toList
>>> splitEndBy p xs = Stream.fold Fold.toList $ f p (Stream.fromList xs)
>>> splitEndBy "" ""
[]
>>> splitEndBy "" "a...b"
["a",".",".",".","b"]
>>> splitEndBy ".." ""
[]
>>> splitEndBy ".." "a...b"
["a..",".b"]
>>> splitEndBy ".." "abc"
["abc"]
>>> splitEndBy ".." ".."
[".."]
>>> splitEndBy "." ".a"
[".","a"]
>>> splitEndBy "." "a."
["a."]

Uses Rabin-Karp algorithm for substring search.

splitEndBySeq_ :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #

Like splitEndBySeq but drops the separators and returns only the tokens.

Equivalent to the following:

>>> splitEndBySeq_ pat f = Stream.foldMany (Fold.takeEndBySeq_ pat f)

Usage:

>>> f p = Stream.splitEndBySeq_ (Array.fromList p) Fold.toList
>>> splitEndBy_ p xs = Stream.fold Fold.toList $ f p (Stream.fromList xs)
>>> splitEndBy_ "" ""
[]
>>> splitEndBy_ "" "a...b"
["a",".",".",".","b"]
>>> splitEndBy_ ".." ""
[]
>>> splitEndBy_ ".." "a...b"
["a",".b"]
>>> splitEndBy_ ".." "abc"
["abc"]
>>> splitEndBy_ ".." ".."
[""]
>>> splitEndBy_ "." ".a"
["","a"]
>>> splitEndBy_ "." "a."
["a"]

Uses Rabin-Karp algorithm for substring search.

splitOnSuffixSeq :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) => Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b Source #

splitOnSuffixSeq withSep pat fld input splits the input using pat as a suffixed separator, the resulting split segments are fed to the fold fld. If withSep is True then the separator sequence is also suffixed with the split segments.

Internal

splitBeginBy_ :: (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split on a prefixed separator element, dropping the separator. The supplied Fold is applied on the split segments.

> splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
> splitOnPrefix' (== .) ".a.b"
["a","b"]

An empty stream results in an empty output stream: > splitOnPrefix' (== .) "" []

An empty segment consisting of only a prefix is folded to the default output of the fold:

> splitOnPrefix' (== .) "."
[""]

> splitOnPrefix' (== .) ".a.b."
["a","b",""]

> splitOnPrefix' (== .) ".a..b"
["a","","b"]

A prefix is optional at the beginning of the stream:

> splitOnPrefix' (== .) "a"
["a"]

> splitOnPrefix' (== .) "a.b"
["a","b"]

splitOnPrefix is an inverse of intercalatePrefix with a single element:

Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id

Unimplemented

splitEndBySeqOneOf :: [Array a] -> Fold m a b -> Stream m a -> Stream m b Source #

Split post any one of the given patterns.

Unimplemented

splitSepBySeqOneOf :: [Array a] -> Fold m a b -> Stream m a -> Stream m b Source #

Split on any one of the given patterns.

Unimplemented

Transform (Nested Containers)

Opposite to compact in ArrayStream

splitInnerBy :: Monad m => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #

Performs infix separator style splitting.

splitInnerBySuffix :: Monad m => (f a -> Bool) -> (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #

Performs infix separator style splitting.

Reduce By Streams

dropPrefix :: Stream m a -> Stream m a -> Stream m a Source #

Drop prefix from the input stream if present.

Space: O(1)

See also stripPrefix.

Unimplemented

dropInfix :: Stream m a -> Stream m a -> Stream m a Source #

Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.

Space: O(n) where n is the length of the infix.

See also stripInfix.

Unimplemented

dropSuffix :: Stream m a -> Stream m a -> Stream m a Source #

Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.

Space: O(n) where n is the length of the suffix.

See also stripSuffix.

Unimplemented

Deprecated

interpose :: Monad m => c -> Unfold m b c -> Stream m b -> Stream m c Source #

Deprecated: Please use unfoldEachSepBy instead.

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

Definition:

>>> unfoldEachSepBy x = Stream.unfoldEachSepByM (return x)
>>> unfoldEachSepBy x = Stream.intercalateSepBy Unfold.identity (Stream.repeat x)

Usage:

>>> unwords = Stream.unfoldEachSepBy ' '

Pre-release

interposeM :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c Source #

Deprecated: Please use unfoldEachSepByM instead.

Monadic variant of unfoldEachSepBy.

Definition:

>>> unfoldEachSepByM x = Stream.intercalateSepBy Unfold.identity (Stream.repeatM x)

interposeSuffix :: Monad m => c -> Unfold m b c -> Stream m b -> Stream m c Source #

Deprecated: Please use unfoldEachEndBy instead.

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

Definition:

>>> unfoldEachEndBy x = Stream.intercalateEndBy Unfold.identity (Stream.repeat x)

Usage:

>>> unlines = Stream.unfoldEachEndBy '\n'

Pre-release

interposeSuffixM :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c Source #

Deprecated: Please use unfoldEachEndByM instead.

Monadic variant of unfoldEachEndBy.

Definition:

>>> unfoldEachEndByM x = Stream.intercalateEndBy Unfold.identity (Stream.repeatM x)

gintercalate :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

Deprecated: Please use intercalateSepBy instead.

>>> gintercalate u1 s1 u2 s2 = Stream.intercalateSepBy u2 s2 u1 s1

gintercalateSuffix :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

Deprecated: Please use intercalateEndBy instead. Note the change in argument order.

>>> gintercalateSuffix u1 s1 u2 s2 = Stream.intercalateEndBy u2 s2 u1 s1

intercalate :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #

Deprecated: Please use unfoldEachSepBySeq instead.

intercalateSuffix :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #

Deprecated: Please use unfoldEachEndBySeq instead.

unfoldInterleave :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use unfoldEachInterleaveRev instead.

Like unfoldEachInterleave but reverses the traversal direction after reaching the last stream. This could be little bit more efficient if the order of traversal is not important.

>>> lists = Stream.fromList [[1,4,7],[2,5,8],[3,6,9]]
>>> Stream.toList $ Stream.unfoldEachInterleaveRev Unfold.fromList lists
[1,2,3,6,5,4,7,8,9]

unfoldRoundRobin :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use unfoldEachRoundRobin instead.

unfoldEachInterleave switches to the next stream whenever a value from a stream is yielded, it does not switch on a Skip. So if a stream keeps skipping for long time other streams won't get a chance to run. unfoldEachRoundRobin switches on Skip as well. So it basically schedules each stream fairly irrespective of whether it produces a value or not.

interleaveMin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Deprecated: Please use flip interleaveEndBy' instead.

Like interleave but stops interleaving as soon as any of the two streams stops. The suffix Min in the name determines the stop behavior.

This is the same as interleaveEndBy' but it might emit an additional element at the end.

interleaveFst :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Deprecated: Please use flip interleaveSepBy instead.

interleaveFstSuffix :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Deprecated: Please use flip interleaveEndBy instead.

parseManyD :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #

Deprecated: Please use parseMany instead.

parseIterateD :: Monad m => (b -> Parser a m b) -> b -> Stream m a -> Stream m (Either ParseError b) Source #

Deprecated: Please use parseIterate instead.

groupsBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use groupsWhile instead. Please note the change in the argument order of the comparison function.

The argument order of the comparison function in groupsWhile is different than that of groupsBy.

In groupsBy the comparison function takes the next element as the first argument and the previous element as the second argument. In groupsWhile the first argument is the previous element and second argument is the next element.

splitOnSeq :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use splitSepBySeq_ instead.

Like splitSepBy_ but splits the stream on a sequence of elements rather than a single element. Parses a sequence of tokens separated by an infixed separator e.g. a;b;c is parsed as a, b, c. If the pattern is empty then each element is a match, thus the fold is finalized on each element.

>>> splitSepBy p xs = Stream.fold Fold.toList $ Stream.splitSepBySeq_ (Array.fromList p) Fold.toList (Stream.fromList xs)
>>> splitSepBy "" ""
[]
>>> splitSepBy "" "a...b"
["a",".",".",".","b"]
>>> splitSepBy ".." ""
[]
>>> splitSepBy ".." "a...b"
["a",".b"]
>>> splitSepBy ".." "abc"
["abc"]
>>> splitSepBy ".." ".."
["",""]
>>> splitSepBy "." ".a"
["","a"]
>>> splitSepBy "." "a."
["a",""]

Uses Rabin-Karp algorithm for substring search.

Mapping

Stateless one-to-one maps.

sequence :: Monad m => Stream m (m a) -> Stream m a Source #

>>> sequence = Stream.mapM id

Replace the elements of a stream of monadic actions with the outputs of those actions.

>>> s = Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
>>> Stream.fold Fold.drain $ Stream.sequence s
abc

Mapping Effects

tap :: Monad m => Fold m a b -> Stream m a -> Stream m a Source #

Tap the data flowing through a stream into a Fold. For example, you may add a tap to log the contents flowing through the stream. The fold is used only for effects, its result is discarded.

                  Fold m a b
                      |
-----stream m a ---------------stream m a-----

>>> s = Stream.enumerateFromTo 1 2
>>> Stream.fold Fold.drain $ Stream.tap (Fold.drainMapM print) s
1
2

Compare with trace.

tapOffsetEvery :: Monad m => Int -> Int -> Fold m a b -> Stream m a -> Stream m a Source #

trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

>>> s = Stream.enumerateFromTo 1 2
>>> Stream.fold Fold.drain $ Stream.trace print s
1
2

Compare with tap.

trace_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Perform a side effect before yielding each element of the stream and discard the results.

>>> s = Stream.enumerateFromTo 1 2
>>> Stream.fold Fold.drain $ Stream.trace_ (print "got here") s
"got here"
"got here"

Same as intersperseMPrefix_ but always serial.

See also: trace

Pre-release

Folding

foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

foldlS :: Monad m => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

Composable Scans

postscanl :: Monad m => Scanl m a b -> Stream m a -> Stream m b Source #

Postscan a stream using the given fold. A postscan omits the initial (default) value of the accumulator and includes the final value.

>>> Stream.toList $ Stream.postscanl Scanl.latest (Stream.fromList [])
[]

Compare with scan which includes the initial value as well:

>>> Stream.toList $ Stream.scanl Scanl.latest (Stream.fromList [])
[Nothing]

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

>>> import Data.Maybe (fromJust)
>>> let avg = Scanl.teeWith (/) Scanl.sum (fmap fromIntegral Scanl.length)
>>> s = Stream.enumerateFromTo 1.0 100.0
>>> :{
 Stream.fold Fold.toList
  $ fmap (fromJust . fst)
  $ Stream.takeWhile (\(_,x) -> x <= 10)
  $ Stream.postscanl (Scanl.tee Scanl.latest avg) s
:}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]

scanl :: Monad m => Scanl m a b -> Stream m a -> Stream m b Source #

Strict left scan. Scan a stream using the given fold. Scan includes the initial (default) value of the accumulator as well as the final value. Compare with postscan which omits the initial value.

>>> s = Stream.fromList [1..10]
>>> Stream.fold Fold.toList $ Stream.takeWhile (< 10) $ Stream.scanl Scanl.sum s
[0,1,3,6]

See also: usingStateT

scanlMany :: Monad m => Scanl m a b -> Stream m a -> Stream m b Source #

Like scanl but restarts scanning afresh when the scanning fold terminates.

scanr :: Monad m => Scanr m a b -> Stream m a -> Stream m b Source #

Use a lazy right Scanr to transform a stream.

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

>>> import Data.Maybe (fromJust)
>>> let avg = Scanr.teeWith (/) Scanr.sum (fmap fromIntegral Scanr.length)
>>> s = Stream.enumerateFromTo 1.0 100.0
>>> :{
 Stream.fold Fold.toList
  $ fmap fst
  $ Stream.takeWhile (\(_,x) -> x <= 10)
  $ Stream.scanr (Scanr.tee Scanr.identity avg) s
:}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]

pipe :: Monad m => Pipe m a b -> Stream m a -> Stream m b Source #

Use a Pipe to transform a stream.

Splitting

splitSepBy_ :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split on an infixed separator element, dropping the separator. The supplied Fold is applied on the split segments. Splits the stream on separator elements determined by the supplied predicate, separator is considered as infixed between two segments:

Definition:

Usage:

>>> splitOn p xs = Stream.fold Fold.toList $ Stream.splitSepBy_ p Fold.toList (Stream.fromList xs)
>>> splitOn (== '.') "a.b"
["a","b"]

Splitting an empty stream results in an empty stream i.e. zero splits:

>>> splitOn (== '.') ""
[]

If the stream does not contain the separator then it results in a single split:

>>> splitOn (== '.') "abc"
["abc"]

If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:

>>> splitOn (== '.') "."
["",""]
>>> splitOn (== '.') ".a"
["","a"]
>>> splitOn (== '.') "a."
["a",""]
>>> splitOn (== '.') "a..b"
["a","","b"]

splitSepBy_ is an inverse of unfoldEachSepBy:

Stream.unfoldEachSepBy '.' Unfold.fromList . Stream.splitSepBy_ (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitSepBy_ (== '.') Fold.toList . Stream.unfoldEachSepBy '.' Unfold.fromList === id

Ad-hoc Scans

Left scans. Stateful, mostly one-to-one maps.

scanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b Source #

Like scanl' but with a monadic step function and a monadic seed.

scanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b Source #

scanlMAfter' accumulate initial done stream is like scanlM' except that it provides an additional done function to be applied on the accumulator when the stream stops. The result of done is also emitted in the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

Pre-release

scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b Source #

Strict left scan. Like map, scanl' too is a one to one transformation, however it adds an extra element.

>>> Stream.toList $ Stream.scanl' (+) 0 $ Stream.fromList [1,2,3,4]
[0,1,3,6,10]
>>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
[[],[1],[2,1],[3,2,1],[4,3,2,1]]

The output of scanl' is the initial value of the accumulator followed by all the intermediate steps and the final result of foldl'.

By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.

Consider the following monolithic example, computing the sum and the product of the elements in a stream in one go using a foldl':

>>> Stream.fold (Fold.foldl' (\(s, p) x -> (s + x, p * x)) (0,1)) $ Stream.fromList [1,2,3,4]
(10,24)

Using scanl' we can make it modular by computing the sum in the first stage and passing it down to the next stage for computing the product:

>>> :{
  Stream.fold (Fold.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1))
  $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1)
  $ Stream.fromList [1,2,3,4]
:}
(10,24)

IMPORTANT: scanl' evaluates the accumulator to WHNF. To avoid building lazy expressions inside the accumulator, it is recommended that a strict data structure is used for accumulator.

>>> scanl' step z = Stream.scanl (Scanl.mkScanl step z)
>>> scanl' f z xs = Stream.scanlM' (\a b -> return (f a b)) (return z) xs

See also: usingStateT

scanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b Source #

scanlBy :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b Source #

scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a Source #

Like scanl1' but with a monadic step function.

scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a Source #

Like scanl' but for a non-empty stream. The first element of the stream is used as the initial value of the accumulator. Does nothing if the stream is empty.

>>> Stream.toList $ Stream.scanl1' (+) $ Stream.fromList [1,2,3,4]
[1,3,6,10]

scanl1M :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a Source #

scanl1 :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a Source #

prescanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b Source #

prescanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b Source #

postscanlBy :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a Source #

postscanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b Source #

postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a Source #

postscanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b Source #

postscanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b Source #

postscanlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b Source #

postscanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b Source #

scanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b Source #

scanlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b Source #

Filtering

Produce a subset of the stream.

with :: Monad m => (Stream m a -> Stream m (s, a)) -> (((s, a) -> b) -> Stream m (s, a) -> Stream m (s, a)) -> ((s, a) -> b) -> Stream m a -> Stream m a Source #

Modify a Stream m a -> Stream m a stream transformation that accepts a predicate (a -> b) to accept ((s, a) -> b) instead, provided a transformation Stream m a -> Stream m (s, a). Convenient to filter with index or time.

>>> filterWithIndex = Stream.with Stream.indexed Stream.filter

Pre-release

postscanlMaybe :: Monad m => Scanl m a (Maybe b) -> Stream m a -> Stream m b Source #

Use a filtering scan on a stream.

>>> postscanlMaybe f = Stream.catMaybes . Stream.postscanl f

filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

Include only those elements that pass a predicate.

>>> filter p = Stream.filterM (return . p)
>>> filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)
>>> filter p = Stream.postscanlMaybe (Scanl.filtering p)

filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as filter but with a monadic predicate.

>>> f p x = p x >>= \r -> return $ if r then Just x else Nothing
>>> filterM p = Stream.mapMaybeM (f p)

deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a Source #

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

>>> input = Stream.fromList [1,3,3,5]
>>> Stream.fold Fold.toList $ Stream.deleteBy (==) 3 input
[1,3,5]

uniqBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Stream m a Source #

Drop repeated elements that are adjacent to each other using the supplied comparison function.

>>> uniq = Stream.uniqBy (==)

To strip duplicate path separators:

>>> input = Stream.fromList "//a//b"
>>> f x y = x == '/' && y == '/'
>>> Stream.fold Fold.toList $ Stream.uniqBy f input
"/a/b"

Space: O(1)

Pre-release

uniq :: (Eq a, Monad m) => Stream m a -> Stream m a Source #

Drop repeated elements that are adjacent to each other.

>>> uniq = Stream.uniqBy (==)

prune :: (a -> Bool) -> Stream m a -> Stream m a Source #

Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.

> prune p = Stream.dropWhileAround p $ Stream.uniqBy (x y -> p x && p y)
> Stream.prune isSpace (Stream.fromList "  hello      world!   ")
"hello world!"

Space: O(1)

Unimplemented

repeated :: Stream m a -> Stream m a Source #

Emit only repeated elements, once.

Unimplemented

Sampling

Value agnostic filtering.

sampleFromThen :: Monad m => Int -> Int -> Stream m a -> Stream m a Source #

sampleFromThen offset stride takes the element at offset index and then every element at strides of stride.

>>> Stream.fold Fold.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
[2,5,8]

Trimming

Produce a subset of the stream trimmed at ends.

initNonEmpty :: Monad m => Stream m a -> Stream m a Source #

init for non-empty streams, fails for empty stream case.

tailNonEmpty :: Monad m => Stream m a -> Stream m a Source #

tail for non-empty streams, fails for empty stream case.

See also tail for a non-partial version of this function..

drop :: Monad m => Int -> Stream m a -> Stream m a Source #

Discard first n elements from the stream and take the rest.

dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as dropWhile but with a monadic predicate.

Trimming from end

RingArray array based or buffering operations.

takeWhileLast :: (a -> Bool) -> Stream m a -> Stream m a Source #

Take all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number elements taken.

Unimplemented

takeWhileAround :: (a -> Bool) -> Stream m a -> Stream m a Source #

Like takeWhile and takeWhileLast combined.

O(n) space, where n is the number elements taken from the end.

Unimplemented

dropLast :: Int -> Stream m a -> Stream m a Source #

Drop n elements at the end of the stream.

O(n) space, where n is the number elements dropped.

Unimplemented

dropWhileLast :: (a -> Bool) -> Stream m a -> Stream m a Source #

Drop all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number elements dropped.

Unimplemented

dropWhileAround :: (a -> Bool) -> Stream m a -> Stream m a Source #

Like dropWhile and dropWhileLast combined.

O(n) space, where n is the number elements dropped from the end.

Unimplemented

Inserting Elements

Produce a superset of the stream. Value agnostic insertion.

intersperse :: Monad m => a -> Stream m a -> Stream m a Source #

Insert a pure value between successive elements of a stream. It does nothing if stream has less than two elements.

Definition:

>>> intersperse x = Stream.intersperseM (return x)
>>> intersperse x = Stream.unfoldEachSepBy x Unfold.identity
>>> intersperse x = Stream.unfoldEachSepBySeq x Unfold.identity
>>> intersperse x = Stream.interleaveSepBy (Stream.repeat x)

Example:

>>> f x y = Stream.toList $ Stream.intersperse x $ Stream.fromList y
>>> f ',' "abc"
"a,b,c"
>>> f ',' "a"
"a"

intersperseM :: Monad m => m a -> Stream m a -> Stream m a Source #

Effectful variant of intersperse. Insert an effect and its output between successive elements of a stream. It does nothing if stream has less than two elements.

Definition:

>>> intersperseM x = Stream.interleaveSepBy (Stream.repeatM x)

intersperseEveryM :: Int -> m a -> Stream m a -> Stream m a Source #

Intersperse a monadic action into the input stream after every n elements.

Definition:

> intersperseEveryM n x = Stream.interleaveEverySepBy n (Stream.repeatM x)

Idioms:

>>> intersperseM = Stream.intersperseEveryM 1
>>> intersperse x = Stream.intersperseEveryM 1 (return x)

Usage:

> input = Stream.fromList "hello"
> Stream.toList $ Stream.intersperseEveryM 2 (return ',') input

"he,ll,o"

Unimplemented

intersperseEndByM :: forall m a. Monad m => m a -> Stream m a -> Stream m a Source #

Insert an effect and its output after every element of a stream.

Definition:

>>> intersperseEndByM x = Stream.interleaveEndBy (Stream.repeatM x)

Usage:

>>> f x y = Stream.toList $ Stream.intersperseEndByM (pure x) $ Stream.fromList y
>>> f ',' "abc"
"a,b,c,"
>>> f ',' "a"
"a,"

Pre-release

intersperseEndByEveryM :: forall m a. Monad m => Int -> m a -> Stream m a -> Stream m a Source #

Like intersperseEndByM but intersperses an effectful action into the input stream after every n elements and also after the last element.

Example:

>>> input = Stream.fromList "hello"
>>> Stream.toList $ Stream.intersperseEndByEveryM 2 (return ',') input
"he,ll,o,"
>>> f n x y = Stream.toList $ Stream.intersperseEndByEveryM n (pure x) $ Stream.fromList y
>>> f 2 ',' "abcdef"
"ab,cd,ef,"
>>> f 2 ',' "abcdefg"
"ab,cd,ef,g,"
>>> f 2 ',' "a"
"a,"

Pre-release

insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a Source #

insertBy cmp elem stream inserts elem before the first element in stream that is less than elem when compared using cmp.

>>> insertBy cmp x = Stream.mergeBy cmp (Stream.fromPure x)
>>> input = Stream.fromList [1,3,5]
>>> Stream.fold Fold.toList $ Stream.insertBy compare 2 input
[1,2,3,5]

Inserting Side Effects

intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Perform a side effect between two successive elements of a stream. It does nothing if the stream has less than two elements.

>>> f x y = Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseM_ x $ Stream.fromList y
>>> f (putChar '.') "abc"
a.b.c
>>> f (putChar '.') "a"
a

Pre-release

intersperseEndByM_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Insert an effect after every element of a stream.

Example:

>>> f x y = Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseEndByM_ x $ Stream.fromList y
>>> f (putChar '.') "abc"
a.b.c.
>>> f (putChar '.') "a"
a.

Pre-release

intersperseBeginByM_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Insert a side effect before every element of a stream.

Definition:

>>> intersperseBeginByM_ = Stream.trace_
>>> intersperseBeginByM_ m = Stream.mapM (\x -> void m >> return x)

Usage:

>>> f x y = Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseBeginByM_ x $ Stream.fromList y
>>> f (putChar '.') "abc"
.a.b.c

Same as trace_.

Pre-release

delay :: MonadIO m => Double -> Stream m a -> Stream m a Source #

Introduce a delay of specified seconds between elements of the stream.

Definition:

>>> sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>> delay = Stream.intersperseM_ . sleep

Example:

>>> input = Stream.enumerateFromTo 1 3
>>> Stream.fold (Fold.drainMapM print) $ Stream.delay 1 input
1
2
3

delayPre :: MonadIO m => Double -> Stream m a -> Stream m a Source #

Introduce a delay of specified seconds before consuming an element of a stream.

Definition:

>>> sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>> delayPre = Stream.intersperseBeginByM_ . sleep

Example:

>>> input = Stream.enumerateFromTo 1 3
>>> Stream.fold (Fold.drainMapM print) $ Stream.delayPre 1 input
1
2
3

Pre-release

delayPost :: MonadIO m => Double -> Stream m a -> Stream m a Source #

Introduce a delay of specified seconds after consuming an element of a stream.

Definition:

>>> sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>> delayPost = Stream.intersperseEndByM_ . sleep

Example:

>>> input = Stream.enumerateFromTo 1 3
>>> Stream.fold (Fold.drainMapM print) $ Stream.delayPost 1 input
1
2
3

Pre-release

Reordering

Produce strictly the same set but reordered.

reverse :: Monad m => Stream m a -> Stream m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

Definition:

>>> reverse m = Stream.concatEffect $ Stream.fold Fold.toListRev m >>= return . Stream.fromList

reverseUnbox :: (MonadIO m, Unbox a) => Stream m a -> Stream m a Source #

Like reverse but several times faster, requires an Unbox instance.

O(n) space

Pre-release

reassembleBy :: Fold m a b -> (a -> a -> Int) -> Stream m a -> Stream m b Source #

Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.

Unimplemented

Position Indexing

indexed :: Monad m => Stream m a -> Stream m (Int, a) Source #

>>> f = Scanl.mkScanl (\(i, _) x -> (i + 1, x)) (-1,undefined)
>>> indexed = Stream.postscanl f
>>> indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
>>> indexedR n = fmap (\(i, a) -> (n - i, a)) . indexed

Pair each element in a stream with its index, starting from index 0.

>>> Stream.fold Fold.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]

indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a) Source #

>>> f n = Scanl.mkScanl (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
>>> indexedR n = Stream.postscanl (f n)
>>> s n = Stream.enumerateFromThen n (n - 1)
>>> indexedR n = Stream.zipWith (,) (s n)

Pair each element in a stream with its index, starting from the given index n and counting down.

>>> Stream.fold Fold.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]

Time Indexing

timestampWith :: MonadIO m => Double -> Stream m a -> Stream m (AbsTime, a) Source #

Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.

>>> Stream.fold Fold.toList $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
[(AbsTime (TimeSpec {sec = ..., nsec = ...}),1),(AbsTime (TimeSpec {sec = ..., nsec = ...}),2),(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)]

Pre-release

timeIndexWith :: MonadIO m => Double -> Stream m a -> Stream m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.

>>> Stream.fold Fold.toList $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
[(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]

Pre-release

timeIndexed :: MonadIO m => Stream m a -> Stream m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.

>>> Stream.fold Fold.toList $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
[(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]

Pre-release

Searching

findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

>>> findIndices p = Stream.postscanlMaybe (Scanl.findIndices p)

elemIndices :: (Monad m, Eq a) => a -> Stream m a -> Stream m Int Source #

Find all the indices where the value of the element in the stream is equal to the given value.

>>> elemIndices a = Stream.findIndices (== a)

Rolling map

Map using the previous element.

rollingMap :: Monad m => (Maybe a -> a -> b) -> Stream m a -> Stream m b Source #

Apply a function on every two successive elements of a stream. The first argument of the map function is the previous element and the second argument is the current element. When the current element is the first element, the previous element is Nothing.

Pre-release

rollingMapM :: Monad m => (Maybe a -> a -> m b) -> Stream m a -> Stream m b Source #

Like rollingMap but with an effectful map function.

Pre-release

rollingMap2 :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b Source #

Like rollingMap but requires at least two elements in the stream, returns an empty stream otherwise.

This is the stream equivalent of the list idiom zipWith f xs (tail xs).

Pre-release

Maybe Streams

mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b Source #

Map a Maybe returning function to a stream, filter out the Nothing elements, and return a stream of values extracted from Just.

Equivalent to:

>>> mapMaybe f = Stream.catMaybes . fmap f

mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b Source #

Like mapMaybe but maps a monadic function.

Equivalent to:

>>> mapMaybeM f = Stream.catMaybes . Stream.mapM f
>>> mapM f = Stream.mapMaybeM (\x -> Just <$> f x)

catMaybes :: Monad m => Stream m (Maybe a) -> Stream m a Source #

In a stream of Maybes, discard Nothings and unwrap Justs.

>>> catMaybes = Stream.mapMaybe id
>>> catMaybes = fmap fromJust . Stream.filter isJust

Pre-release

Either Streams

catLefts :: Monad m => Stream m (Either a b) -> Stream m a Source #

Discard Rights and unwrap Lefts in an Either stream.

>>> catLefts = fmap (fromLeft undefined) . Stream.filter isLeft

Pre-release

catRights :: Monad m => Stream m (Either a b) -> Stream m b Source #

Discard Lefts and unwrap Rights in an Either stream.

>>> catRights = fmap (fromRight undefined) . Stream.filter isRight

Pre-release

catEithers :: Monad m => Stream m (Either a a) -> Stream m a Source #

Remove the either wrapper and flatten both lefts and as well as rights in the output stream.

>>> catEithers = fmap (either id id)

Pre-release

Deprecated

postscan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use postscanl instead

scan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use scanl instead

scanMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use scanlMany instead

scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b Source #

Deprecated: Use postscanlMaybe instead

intersperseMSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a Source #

Deprecated: Please use intersperseEndByM instead.

Insert an effect and its output after every element of a stream.

Definition:

>>> intersperseEndByM x = Stream.interleaveEndBy (Stream.repeatM x)

Usage:

>>> f x y = Stream.toList $ Stream.intersperseEndByM (pure x) $ Stream.fromList y
>>> f ',' "abc"
"a,b,c,"
>>> f ',' "a"
"a,"

Pre-release

intersperseMSuffixWith :: forall m a. Monad m => Int -> m a -> Stream m a -> Stream m a Source #

Deprecated: Please use intersperseEndByEveryM instead.

Like intersperseEndByM but intersperses an effectful action into the input stream after every n elements and also after the last element.

Example:

>>> input = Stream.fromList "hello"
>>> Stream.toList $ Stream.intersperseEndByEveryM 2 (return ',') input
"he,ll,o,"
>>> f n x y = Stream.toList $ Stream.intersperseEndByEveryM n (pure x) $ Stream.fromList y
>>> f 2 ',' "abcdef"
"ab,cd,ef,"
>>> f 2 ',' "abcdefg"
"ab,cd,ef,g,"
>>> f 2 ',' "a"
"a,"

Pre-release

intersperseMSuffix_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Deprecated: Please use intersperseEndByM_ instead.

Insert an effect after every element of a stream.

Example:

>>> f x y = Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseEndByM_ x $ Stream.fromList y
>>> f (putChar '.') "abc"
a.b.c.
>>> f (putChar '.') "a"
a.

Pre-release

intersperseMPrefix_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Deprecated: Please use intersperseBeginByM_ instead.

Insert a side effect before every element of a stream.

Definition:

>>> intersperseBeginByM_ = Stream.trace_
>>> intersperseBeginByM_ m = Stream.mapM (\x -> void m >> return x)

Usage:

>>> f x y = Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseBeginByM_ x $ Stream.fromList y
>>> f (putChar '.') "abc"
.a.b.c

Same as trace_.

Pre-release

strideFromThen :: Monad m => Int -> Int -> Stream m a -> Stream m a Source #

Deprecated: Please use sampleFromThen instead.

sampleFromThen offset stride takes the element at offset index and then every element at strides of stride.

>>> Stream.fold Fold.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
[2,5,8]

splitOn :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use splitSepBy_ instead. Note the difference in behavior on splitting empty stream.

Straight Joins

These are set-like operations but not exactly set operations because streams are not necessarily sets, they may have duplicated elements. These operations are generic i.e. they work on streams of unconstrained types, therefore, they have quadratic performance characterstics. For better performance using Set or Map structures see the Streamly.Internal.Data.Stream.Container module.

intersectBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a Source #

intersectBy returns a subsequence of the first stream which intersects with the second stream. Note that this is not a commutative operation unlike a set intersection, because of duplicate elements in the stream the order of the streams matters. This is similar to intersectBy. Note that intersectBy is a special case of innerJoin.

>>> f s1 s2 = Stream.fold Fold.toList $ Stream.intersectBy (==) (Stream.fromList s1) (Stream.fromList s2)
>>> f [1,3,4,4,5] [2,3,4,5,5]
[3,4,4,5]

First stream can be infinite, the second stream must be finite and must be capable of multiple evaluations.

Space: O(n) where n is the number of elements in the second stream.

Time: O(m x n) where m is the number of elements in the first stream and n is the number of elements in the second stream.

Pre-release

deleteFirstsBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a Source #

Returns a subsequence of the first stream, deleting first occurrences of those elements that are present in the second stream. Note that this is not a commutative operation. This is similar to the deleteFirstsBy.

>>> f xs ys = Stream.fold Fold.toList $ Stream.deleteFirstsBy (==) (Stream.fromList xs) (Stream.fromList ys)
>>> f [1,2,2,3,3,5] [1,2,2,3,4]
[3,5]

The following holds:

deleteFirstsBy (==) (Stream.ordNub s2 `append` s1) s2 === s1
deleteFirstsBy (==) (Stream.ordNub s2 `interleave` s1) s2 === s1

First stream can be infinite, second stream must be finite.

Space: O(m) where m is the number of elements in the first stream.

Time: O(m x n) where m is the number of elements in the first stream and n is the number of elements in the second stream.

Pre-release

unionBy :: MonadIO m => (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a Source #

Returns the first stream appended with those unique elements from the second stream that are not already present in the first stream. Note that this is not a commutative operation unlike a set union, argument order matters. The behavior is similar to unionBy.

Equivalent to the following except that s2 is evaluated only once:

>>> unionBy eq s1 s2 = s1 `Stream.append` Stream.deleteFirstsBy eq s1 (Stream.ordNub s2)

Example:

>>> f s1 s2 = Stream.fold Fold.toList $ Stream.unionBy (==) (Stream.fromList s1) (Stream.fromList s2)
>>> f [1,2,2,4] [1,1,2,3,3]
[1,2,2,4,3]

First stream can be infinite, but second stream must be finite. Note that if the first stream is infinite the union means just the first stream. Thus union is useful only when both streams are finite. See sortedUnionBy where union can work on infinite streams if they are sorted.

Space: O(n)

Time: O(m x n)

Pre-release

sortedIntersectBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like intersectBy but assumes that the input streams are sorted in ascending order. To use it on streams sorted in descending order pass an inverted comparison function returning GT for less than and LT for greater than.

Both streams can be infinite.

Space: O(1)

Time: O(m+n)

Pre-release

sortedDeleteFirstsBy :: (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

A more efficient deleteFirstsBy for streams sorted in ascending order.

Both streams can be infinite.

Space: O(1)

Unimplemented

sortedUnionBy :: (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

A more efficient unionBy for sorted streams.

Note that the behavior is different from unionBy. In unionBy we append the unique elements from second stream only after exhausting the first one whereas in sorted streams we can determine unique elements early even when we are going through the first stream. Thus the result is an interleaving of the two streams, merging those elements from the second stream that are not present in the first.

Space: O(1)

Both streams can be infinite.

Unimplemented

Cross Joins

innerJoin :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b) Source #

Like cross but emits only those tuples where a == b using the supplied equality predicate. This is essentially a cross intersection of two streams.

Definition:

>>> innerJoin eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ Stream.cross s1 s2

The second (inner) stream must be finite. Moreover, it must be either pure or capable of multiple evaluations. If not then the caller should cache it in an Array, if the type does not have an Unbox instance then use the Generic Array. Convert the array to stream before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

If you care about performance this function should be your last choice among all inner joins. innerJoin is a much faster fused alternative. innerSortedJoin is a faster alternative when streams are sorted. innerOrdJoin is an order of magnitude faster alternative when the type has an Ord instance.

Note: Conceptually, this is a commutative operation. Result includes all the elements from the left and the right stream. The order of streams can be changed without affecting results, except for the ordering within the tuple.

Time: O(m x n)

Pre-release

innerSortedJoin :: (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b) Source #

A more efficient innerJoin for sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

leftSortedJoin :: (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b) Source #

A more efficient leftJoin for sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

outerSortedJoin :: (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (Maybe a, Maybe b) Source #

A more efficient outerJoin for sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

Deduplication

ordNub :: (Monad m, Ord a) => Stream m a -> Stream m a Source #

nub specialized to Ord types for better performance. Returns a subsequence of the stream removing any duplicate elements.

The memory used is proportional to the number of unique elements in the stream. One way to limit the memory is to use take on the resulting stream to limit the unique elements in the stream.

Joins

leftJoin :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, Maybe b) Source #

Like innerJoin but emits (a, Just b) whenever a and b are equal, for those a's that are not equal to any b emits (a, Nothing).

This is a generalization of innerJoin to include all elements from the left stream and not just those which have an equal in the right stream. This is not a commutative operation, the order of the stream arguments matters.

All the caveats mentioned in innerJoin apply here as well. Right join is not provided because it is just a flipped left join:

>>> rightJoin eq = flip (Stream.leftJoin eq)

Space: O(n) assuming the second stream is cached in memory.

Time: O(m x n)

Unimplemented

outerJoin :: MonadIO m => (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (Maybe a, Maybe b) Source #

Like leftJoin but emits a (Just a, Just b). Like leftJoin, for those a's that are not equal to any b emit (Just a, Nothing), but additionally, for those b's that are not equal to any a emit (Nothing, Just b).

This is a generalization of left join to include all the elements from the right stream as well, in other words it is a combination of left and right joins. This is a commutative operation. The order of stream arguments can be changed without affecting results, except for the ordering of elements in the resulting tuple.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m x n)

Pre-release

Ord Joins

innerOrdJoin :: (Monad m, Ord k) => Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b) Source #

innerJoin specialized to Ord types for better performance.

If the input streams have duplicate keys, the behavior is undefined.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m + n)

Pre-release

leftOrdJoin :: (Ord k, Monad m) => Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, Maybe b) Source #

leftJoin specialized to Ord types for better performance.

Space: O(n)

Time: O(m + n)

Pre-release

outerOrdJoin :: (Ord k, MonadIO m) => Stream m (k, a) -> Stream m (k, b) -> Stream m (k, Maybe a, Maybe b) Source #

outerJoin specialized to Ord types for better performance.

Space: O(m + n)

Time: O(m + n)

Pre-release