Streams represented as chains of function calls using Continuation Passing Style (CPS), suitable for dynamically composing a potentially large number of streams.

Unlike the statically fused operations in Streamly.Data.Stream, StreamK
operations are less efficient, involving a function call overhead per
element, but they exhibit linear O(n) time complexity with respect to the
number of stream compositions. Therefore, they are suitable for dynamically
composing streams, e.g. appending potentially infinite streams in recursive
loops. While fused streams can be used to efficiently process elements as
small as a single byte, CPS streams are typically used on bigger chunks of
data to avoid the larger per-element overhead. For more details see the
*Stream vs StreamK* section in the Streamly.Data.Stream module.

In addition to the combinators in this module, you can use operations from
Streamly.Data.Stream for StreamK as well by converting StreamK to Stream
(`toStream`), and vice-versa (`fromStream`). Please refer to
Streamly.Internal.Data.StreamK for more functions that have not yet been
released.

For documentation see the corresponding combinators in Streamly.Data.Stream. Documentation has been omitted in this module unless there is a difference worth mentioning or if the combinator does not exist in Streamly.Data.Stream.

## Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

```haskell
>>> :m
>>> import Control.Concurrent (threadDelay)
>>> import Data.Function (fix, (&))
>>> import Data.Semigroup (cycle1)
>>> effect n = print n >> return n
>>> import Streamly.Data.StreamK (StreamK)
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.FileSystem.Dir as Dir
```

For APIs that have not been released yet.

```haskell
>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir
```

## Overview

Continuation passing style (CPS) stream implementation. The `K` in `StreamK`
stands for Kontinuation.

StreamK streams can be constructed like lists, except that they use `nil`
instead of `[]` and `cons` instead of `:`.

`cons` adds a pure value at the head of the stream:

```haskell
>>> import Streamly.Data.StreamK (StreamK, cons, consM, nil)
>>> stream = 1 `cons` 2 `cons` nil :: StreamK IO Int
```

You can use operations from Streamly.Data.Stream for StreamK as well by
converting StreamK to Stream (`toStream`), and vice-versa (`fromStream`).

```haskell
>>> Stream.fold Fold.toList $ StreamK.toStream stream -- IO [Int]
[1,2]
```

`consM` adds an effect at the head of the stream:

```haskell
>>> stream = effect 1 `consM` effect 2 `consM` nil
>>> Stream.fold Fold.toList $ StreamK.toStream stream
1
2
[1,2]
```

## Type

##### Instances

## Construction

### Primitives

Primitives to construct a stream from pure values or monadic actions. All other stream construction and generation combinators described later can be expressed in terms of these primitives. However, the special versions provided in this module can be much more efficient in some cases. Users can create custom combinators using these primitives.

nil :: StreamK m a Source #

A stream that terminates without producing any output or side effect.

```haskell
>>> Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]
```

nilM :: Applicative m => m b -> StreamK m a Source #

A stream that terminates without producing any output, but produces a side effect.

```haskell
>>> Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil"
[]
```

*Pre-release*

cons :: a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add a pure value at the head of an existing stream:

```haskell
>>> s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>> Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]
```

It can be used efficiently with `foldr`:

```haskell
>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
```

Same as the following but more efficient:

```haskell
>>> cons x xs = return x `StreamK.consM` xs
```

consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add an effectful value at the head of an existing stream:

```haskell
>>> s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>> Stream.fold Fold.drain (StreamK.toStream s)
hello
world
```

It can be used efficiently with `foldr`:

```haskell
>>> fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil
```

Same as the following but more efficient:

```haskell
>>> consM x xs = StreamK.fromEffect x `StreamK.append` xs
```

### From Values

fromEffect :: Monad m => m a -> StreamK m a Source #
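`fromEffect` lifts a single monadic action into a singleton stream. A small ghci sketch, assuming the Setup imports above (`effect` is the helper defined in Setup):

```haskell
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.fromEffect (effect 1)
1
[1]
```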

### From Stream

Please note that the `Stream` type does not observe any exceptions from the
consumer of the stream whereas `StreamK` does.

### From Containers

fromFoldable :: Foldable f => f a -> StreamK m a Source #

Construct a stream from a `Foldable` containing pure values:

```haskell
>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
```
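For instance, an ordinary list is `Foldable`, so it converts directly; a sketch assuming the Setup imports above:

```haskell
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.fromFoldable [1,2,3]
[1,2,3]
```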

## Elimination

### Primitives

### Parsing

parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b) Source #

Run a `ParserK` over a `StreamK`. Please use `parseChunks` where possible,
for better performance.

parseBreak :: forall m a b. Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #

Similar to `parseBreakChunks` but works on singular elements.

parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #

## Transformation

## Combining Two Streams

Unlike the operations in Streamly.Data.Stream, these operations can be used
to dynamically compose a large number of streams, e.g. using the
`concatMapWith` and `mergeMapWith` operations. They have linear O(n) time
complexity with respect to the number of streams being composed.

### Appending

### Interleaving

interleave :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Note: When joining many streams in a left associative manner, earlier
streams get exponentially higher priority than the ones joining later.
Because of this exponentially high weighting of the left streams, it can be
used with `concatMapWith` even on a large number of streams.
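A minimal sketch of interleaving two streams, assuming the Setup imports above:

```haskell
>>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
>>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.interleave s1 s2
[1,3,2,4]
```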

### Merging

mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Merging of `n` streams can be performed by combining the streams pairwise
using `mergeMapWith` to give O(n * log n) time complexity. If used with
`concatMapWith` it will have O(n^2) performance.
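For example, two sorted streams can be merged into a single sorted stream; a sketch assuming the Setup imports above:

```haskell
>>> s1 = StreamK.fromStream $ Stream.fromList [1,3,5]
>>> s2 = StreamK.fromStream $ Stream.fromList [2,4,6]
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.mergeBy compare s1 s2
[1,2,3,4,5,6]
```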

### Zipping

zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Zipping of `n` streams can be performed by combining the streams pairwise
using `mergeMapWith` with O(n * log n) time complexity. If used with
`concatMapWith` it will have O(n^2) performance.
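A minimal zipping sketch, assuming the Setup imports above:

```haskell
>>> s1 = StreamK.fromStream $ Stream.fromList [1,2,3]
>>> s2 = StreamK.fromStream $ Stream.fromList [4,5,6]
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.zipWith (+) s1 s2
[5,7,9]
```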

### Cross Product

crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Definition:

```haskell
>>> crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2
```

Note that the second stream is evaluated multiple times.
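A sketch of the cross product of two small streams, assuming the Setup imports above:

```haskell
>>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
>>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.crossWith (,) s1 s2
[(1,3),(1,4),(2,3),(2,4)]
```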

## Stream of streams

Some useful idioms:

```haskell
>>> concatFoldableWith f = Prelude.foldr f StreamK.nil
>>> concatMapFoldableWith f g = Prelude.foldr (f . g) StreamK.nil
>>> concatForFoldableWith f xs g = Prelude.foldr (f . g) StreamK.nil xs
```

concatMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Perform a `concatMap` using a specified concat strategy. The first argument
specifies a merge or concat function that is used to merge the streams
generated by the map function.
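For example, using `StreamK.append` as the concat strategy; a sketch assuming the Setup imports above, where the hypothetical `f` maps each element to a two-element stream:

```haskell
>>> f x = x `StreamK.cons` StreamK.fromPure (x * 10)
>>> s = StreamK.fromStream $ Stream.fromList [1,2,3]
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.concatMapWith StreamK.append f s
[1,10,2,20,3,30]
```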

mergeMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Combine streams in pairs using a binary combinator, the resulting streams are then combined again in pairs recursively until we get to a single combined stream. The composition would thus form a binary tree.

For example, you can sort a stream using merge sort like this:

```haskell
>>> s = StreamK.fromStream $ Stream.fromList [5,1,7,9,2]
>>> generate = StreamK.fromPure
>>> combine = StreamK.mergeBy compare
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.mergeMapWith combine generate s
[1,2,5,7,9]
```

Note that if the stream length is not a power of 2, the binary tree composed by mergeMapWith would not be balanced, which may or may not be important depending on what you are trying to achieve.

*Caution: the stream of streams must be finite*

*Pre-release*

## Buffered Operations

sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a Source #

Sort the input stream using a supplied comparison function.

Sorting can be achieved simply by:

```haskell
>>> sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure
```

However, this combinator uses a parser to first split the input stream into down and up sorted segments and then merges them to optimize sorting when pre-sorted sequences exist in the input stream.

*O(n) space*
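A minimal sorting sketch, assuming the Setup imports above:

```haskell
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.sortBy compare $ StreamK.fromStream $ Stream.fromList [3,1,2]
[1,2,3]
```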

## Exceptions

Please note that the `Stream` type does not observe any exceptions from the
consumer of the stream whereas `StreamK` does.

handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a Source #

Like `handle` from Streamly.Data.Stream, but with one significant
difference: this function observes exceptions from the consumer of the
stream as well.

You can also convert `StreamK` to `Stream` and use exception handling from
the `Stream` module:

```haskell
>>> handle f s = StreamK.fromStream $ Stream.handle (\e -> StreamK.toStream (f e)) (StreamK.toStream s)
```

## Resource Management

Please note that the `Stream` type does not observe any exceptions from the
consumer of the stream whereas `StreamK` does.

bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a Source #

Like `bracketIO` from Streamly.Data.Stream, but with one significant
difference: this function observes exceptions from the consumer of the
stream as well. Therefore, it cleans up the resource promptly when the
consumer encounters an exception.
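A minimal direct usage sketch, assuming the Setup imports above; here the "resource" is trivial and just prints messages on acquire and release:

```haskell
>>> s = StreamK.bracketIO (putStrLn "acquire") (\_ -> putStrLn "release") (\_ -> StreamK.fromPure 1)
>>> Stream.fold Fold.drain (StreamK.toStream s)
acquire
release
```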

You can also convert `StreamK` to `Stream` and use resource handling from
the `Stream` module:

```haskell
>>> bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)
```