Loading...

Streamly.Data.StreamK

Streams represented as chains of function calls using Continuation Passing Style (CPS), suitable for dynamically and recursively composing potentially large number of streams. The K in StreamK stands for Kontinuation.

Unlike the statically fused operations in Streamly.Data.Stream, StreamK operations are less efficient, involving a function call overhead for each element, but they exhibit linear O(n) time complexity wrt to the number of stream compositions. Therefore, they are suitable for dynamically composing streams e.g. appending potentially infinite streams in recursive loops. While fused streams can be used efficiently to process elements as small as a single byte, CPS streams are typically used on bigger chunks of data to avoid the larger overhead per element.

In addition to the combinators in this module, you can use operations from Streamly.Data.Stream for StreamK as well by converting StreamK to Stream (toStream), and vice-versa (fromStream). Please refer to Streamly.Internal.Data.StreamK for more functions that have not yet been released.

For documentation see the corresponding combinators in Streamly.Data.Stream. Documentation has been omitted in this module unless there is a difference worth mentioning or if the combinator does not exist in Streamly.Data.Stream.

Overview

StreamK can be constructed like lists, except that they use nil instead of '[]' and cons instead of :.

>>> import Streamly.Data.StreamK (StreamK, cons, consM, nil)

cons constructs a stream from pure values:

>>> stream = 1 `cons` 2 `cons` nil :: StreamK IO Int

Operations from Streamly.Data.Stream can be used for StreamK as well by converting StreamK to Stream (toStream), and vice-versa (fromStream).

>>> Stream.fold Fold.toList $ StreamK.toStream stream -- IO [Int]
[1,2]

Stream can also be constructed from effects not just pure values:

>>> effect n = print n >> return n
>>> stream = effect 1 `consM` effect 2 `consM` nil
>>> Stream.fold Fold.toList $ StreamK.toStream stream
1
2
[1,2]

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :m
>>> import Control.Concurrent (threadDelay)
>>> import Data.Function (fix, (&))
>>> import Data.Semigroup (cycle1)
>>> effect n = print n >> return n
>>> import Streamly.Data.StreamK (StreamK)
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.FileSystem.DirIO as Dir

For APIs that have not been released yet.

>>> import qualified Streamly.Internal.FileSystem.Path as Path
>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> import qualified Streamly.Internal.FileSystem.DirIO as Dir

Type

data StreamK m a Source #

Instances
Instances details
(Foldable m, Monad m) => Foldable (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => StreamK m m0 -> m0 Source #

foldMap :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 Source #

foldMap' :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 Source #

foldr :: (a -> b -> b) -> b -> StreamK m a -> b Source #

foldr' :: (a -> b -> b) -> b -> StreamK m a -> b Source #

foldl :: (b -> a -> b) -> b -> StreamK m a -> b Source #

foldl' :: (b -> a -> b) -> b -> StreamK m a -> b Source #

foldr1 :: (a -> a -> a) -> StreamK m a -> a Source #

foldl1 :: (a -> a -> a) -> StreamK m a -> a Source #

toList :: StreamK m a -> [a] Source #

null :: StreamK m a -> Bool Source #

length :: StreamK m a -> Int Source #

elem :: Eq a => a -> StreamK m a -> Bool Source #

maximum :: Ord a => StreamK m a -> a Source #

minimum :: Ord a => StreamK m a -> a Source #

sum :: Num a => StreamK m a -> a Source #

product :: Num a => StreamK m a -> a Source #

Traversable (StreamK Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> StreamK Identity a -> f (StreamK Identity b) Source #

sequenceA :: Applicative f => StreamK Identity (f a) -> f (StreamK Identity a) Source #

mapM :: Monad m => (a -> m b) -> StreamK Identity a -> m (StreamK Identity b) Source #

sequence :: Monad m => StreamK Identity (m a) -> m (StreamK Identity a) Source #

Monad m => Functor (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> StreamK m a -> StreamK m b Source #

(<$) :: a -> StreamK m b -> StreamK m a Source #

a ~ Char => IsString (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

mempty :: StreamK m a Source #

mappend :: StreamK m a -> StreamK m a -> StreamK m a Source #

mconcat :: [StreamK m a] -> StreamK m a Source #

Semigroup (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(<>) :: StreamK m a -> StreamK m a -> StreamK m a Source #

sconcat :: NonEmpty (StreamK m a) -> StreamK m a Source #

stimes :: Integral b => b -> StreamK m a -> StreamK m a Source #

IsList (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (StreamK Identity a) Source #

Read a => Read (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) = a

CrossStreamK

data CrossStreamK m a Source #

A newtype wrapper for the StreamK type adding a cross product style monad instance.

A Monad bind behaves like a for loop:

>>> :{
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do
    x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2]
    -- Perform the following actions for each x in the stream
    return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do
    x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2]
    y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [3,4]
    -- Perform the following actions for each x, for each y
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Instances
Instances details
MonadTrans CrossStreamK Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

lift :: Monad m => m a -> CrossStreamK m a Source #

MonadIO m => MonadIO (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

liftIO :: IO a -> CrossStreamK m a Source #

(Foldable m, Monad m) => Foldable (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => CrossStreamK m m0 -> m0 Source #

foldMap :: Monoid m0 => (a -> m0) -> CrossStreamK m a -> m0 Source #

foldMap' :: Monoid m0 => (a -> m0) -> CrossStreamK m a -> m0 Source #

foldr :: (a -> b -> b) -> b -> CrossStreamK m a -> b Source #

foldr' :: (a -> b -> b) -> b -> CrossStreamK m a -> b Source #

foldl :: (b -> a -> b) -> b -> CrossStreamK m a -> b Source #

foldl' :: (b -> a -> b) -> b -> CrossStreamK m a -> b Source #

foldr1 :: (a -> a -> a) -> CrossStreamK m a -> a Source #

foldl1 :: (a -> a -> a) -> CrossStreamK m a -> a Source #

toList :: CrossStreamK m a -> [a] Source #

null :: CrossStreamK m a -> Bool Source #

length :: CrossStreamK m a -> Int Source #

elem :: Eq a => a -> CrossStreamK m a -> Bool Source #

maximum :: Ord a => CrossStreamK m a -> a Source #

minimum :: Ord a => CrossStreamK m a -> a Source #

sum :: Num a => CrossStreamK m a -> a Source #

product :: Num a => CrossStreamK m a -> a Source #

Traversable (CrossStreamK Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monad m => Applicative (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

pure :: a -> CrossStreamK m a Source #

(<*>) :: CrossStreamK m (a -> b) -> CrossStreamK m a -> CrossStreamK m b Source #

liftA2 :: (a -> b -> c) -> CrossStreamK m a -> CrossStreamK m b -> CrossStreamK m c Source #

(*>) :: CrossStreamK m a -> CrossStreamK m b -> CrossStreamK m b Source #

(<*) :: CrossStreamK m a -> CrossStreamK m b -> CrossStreamK m a Source #

Monad m => Functor (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> CrossStreamK m a -> CrossStreamK m b Source #

(<$) :: a -> CrossStreamK m b -> CrossStreamK m a Source #

Monad m => Monad (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(>>=) :: CrossStreamK m a -> (a -> CrossStreamK m b) -> CrossStreamK m b Source #

(>>) :: CrossStreamK m a -> CrossStreamK m b -> CrossStreamK m b Source #

return :: a -> CrossStreamK m a Source #

MonadThrow m => MonadThrow (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

throwM :: Exception e => e -> CrossStreamK m a Source #

a ~ Char => IsString (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (CrossStreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Semigroup (CrossStreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

IsList (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (CrossStreamK Identity a) Source #

Read a => Read (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

unCross :: CrossStreamK m a -> StreamK m a Source #

Unwrap the StreamK type from CrossStreamK newtype.

This is a type level operation with no runtime overhead.

mkCross :: StreamK m a -> CrossStreamK m a Source #

Wrap the StreamK type in a CrossStreamK newtype to enable cross product style applicative and monad instances.

This is a type level operation with no runtime overhead.

Construction

Primitives

Primitives to construct a stream from pure values or monadic actions. All other stream construction and generation combinators described later can be expressed in terms of these primitives. However, the special versions provided in this module can be much more efficient in some cases. Users can create custom combinators using these primitives.

nil :: StreamK m a Source #

A stream that terminates without producing any output or side effect.

>>> Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]

nilM :: Applicative m => m b -> StreamK m a Source #

A stream that terminates without producing any output, but produces a side effect.

>>> Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil"
[]

Pre-release

cons :: a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add a pure value at the head of an existing stream:

>>> s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>> Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]

Unlike Streamly.Data.Stream cons StreamK cons can be used recursively:

>>> repeat x = let xs = StreamK.cons x xs in xs
>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil

cons is same as the following but more efficient:

>>> cons x xs = return x `StreamK.consM` xs

consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add an effectful value at the head of an existing stream::

>>> s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>> Stream.fold Fold.drain (StreamK.toStream s)
hello
world

It can be used efficiently with foldr:

>>> fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil

Same as the following but more efficient:

>>> consM x xs = StreamK.fromEffect x `StreamK.append` xs

From Values

fromPure :: a -> StreamK m a Source #

fromEffect :: Monad m => m a -> StreamK m a Source #

From Stream

Please note that Stream type does not observe any exceptions from the consumer of the stream whereas StreamK does.

fromStream :: Monad m => Stream m a -> StreamK m a Source #

Convert a fused Stream to StreamK.

For example:

>>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
>>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ StreamK.toStream $ s1 `StreamK.append` s2
[1,2,3,4]

toStream :: Applicative m => StreamK m a -> Stream m a Source #

Convert a StreamK to a fused Stream.

From Containers

fromFoldable :: Foldable f => f a -> StreamK m a Source #

>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil

Construct a stream from a Foldable containing pure values:

Elimination

Primitives

uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a)) Source #

drain :: Monad m => StreamK m a -> m () Source #

Parsing

parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b) Source #

Run a ParserK over a StreamK. Please use parseChunks where possible, for better performance.

parseBreak :: forall m a b. Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #

Similar to parseBreak but works on singular elements.

parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Run a ParserK over a chunked StreamK and return the parse result and the remaining Stream.

parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #

Transformation

mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b Source #

dropWhile :: (a -> Bool) -> StreamK m a -> StreamK m a Source #

take :: Int -> StreamK m a -> StreamK m a Source #

filter :: (a -> Bool) -> StreamK m a -> StreamK m a Source #

Combining Two Streams

Unlike the operations in Streamly.Data.Stream, these operations can be used to dynamically compose large number of streams e.g. using the concatMapWith and mergeMapWith operations. They have a linear O(n) time complexity wrt to the number of streams being composed.

Appending

append :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Unlike Streamly.Data.Stream append StreamK append can be used recursively:

>>> cycle xs = let ys = xs `StreamK.append` ys in ys

Interleaving

interleave :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Note: When joining many streams in a left associative manner earlier streams will get exponential priority than the ones joining later. Because of exponentially high weighting of left streams it can be used with concatMapWith even on a large number of streams.

Merging

mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Merging of n streams can be performed by combining the streams pair wise using mergeMapWith to give O(n * log n) time complexity. If used with concatMapWith it will have O(n^2) performance.

mergeByM :: Monad m => (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Zipping

zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Zipping of n streams can be performed by combining the streams pair wise using mergeMapWith with O(n * log n) time complexity. If used with concatMapWith it will have O(n^2) performance.

zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Cross Product

crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Definition:

>>> crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2

Note that the second stream is evaluated multiple times.

Stream of streams

Some useful idioms:

>>> concatFoldableWith f = Prelude.foldr f StreamK.nil
>>> concatMapFoldableWith f g = Prelude.foldr (f . g) StreamK.nil
>>> concatForFoldableWith f xs g = Prelude.foldr (f . g) StreamK.nil xs

concatEffect :: Monad m => m (StreamK m a) -> StreamK m a Source #

concatMap :: (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

concatMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function.

For example, interleaving n streams in a left biased manner:

>>> fromList = StreamK.fromStream . Stream.fromList
>>> toList = Stream.toList . StreamK.toStream
>>> lists = fromList [[1,5],[2,6],[3,7],[4,8]]
>>> toList $ StreamK.concatMapWith StreamK.interleave fromList lists
[1,2,5,3,6,4,7,8]

For a fair interleaving example see mergeMapWith.

mergeMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Combine streams in pairs using a binary combinator, the resulting streams are then combined again in pairs recursively until we get to a single combined stream. The composition would thus form a binary tree.

For example, sorting a stream using merge sort:

>>> fromList = StreamK.fromStream . Stream.fromList
>>> toList = Stream.toList . StreamK.toStream
>>> generate = StreamK.fromPure
>>> combine = StreamK.mergeBy compare
>>> toList $ StreamK.mergeMapWith combine generate (fromList [5,1,7,9,2])
[1,2,5,7,9]

Interleaving n streams in a balanced manner:

>>> lists = fromList [[1,4,7],[2,5,8],[3,6,9]]
>>> toList $ StreamK.mergeMapWith StreamK.interleave fromList lists
[1,3,2,6,4,9,5,7,8]

See unfoldEachInterleave for a much faster fused version of the above example.

Note that if the stream length is not a power of 2, the binary tree composed by mergeMapWith is not balanced, which may or may not be important depending on what you are trying to achieve. This also explains the order of the output in the interleaving example above.

Caution: the stream of streams must be finite

Pre-release

Buffered Operations

sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a Source #

Sort the input stream using a supplied comparison function.

Sorting can be achieved by simply:

>>> sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure

However, this combinator uses a parser to first split the input stream into down and up sorted segments and then merges them to optimize sorting when pre-sorted sequences exist in the input stream.

O(n) space

Exceptions

Please note that Stream type does not observe any exceptions from the consumer of the stream whereas StreamK does.

handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a Source #

Like Streamly.Data.Stream.handle but with one significant difference, this function observes exceptions from the consumer of the stream as well.

You can also convert StreamK to Stream and use exception handling from Stream module:

>>> handle f s = StreamK.fromStream $ Stream.handle (\e -> StreamK.toStream (f e)) (StreamK.toStream s)

Resource Management

Please note that Stream type does not observe any exceptions from the consumer of the stream whereas StreamK does.

bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a Source #

Like Streamly.Data.Stream.bracketIO but with one significant difference, this function observes exceptions from the consumer of the stream as well. Therefore, it cleans up the resource promptly when the consumer encounters an exception.

You can also convert StreamK to Stream and use resource handling from Stream module:

>>> bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)