Stream Consumers

We can classify stream consumers in the following categories in order of increasing complexity and power:

Accumulators

These are the simplest folds that never fail and never terminate, they accumulate the input values forever and can always accept new inputs (never terminate) and always have a valid result value. A sum operation is an example of an accumulator. Traditional Haskell left folds like foldl are accumulators.

We can distribute an input stream to two or more accumulators using a tee style composition. Accumulators cannot be applied on a stream one after the other, which we call a serial append style composition of folds. This is because accumulators never terminate, since the first accumulator in a series will never terminate, the next one will never get to run.

Terminating Folds

Terminating folds are accumulators that can terminate. Once a fold terminates it no longer accepts any more inputs. Terminating folds can be used in a serial append style composition where one fold can be applied after the other on an input stream. We can apply a terminating fold repeatedly on an input stream, splitting the stream and consuming it in fragments. Terminating folds never fail, therefore, they do not need backtracking.

The take operation is an example of a terminating fold It terminates after consuming n items. Coupled with an accumulator (e.g. sum) it can be used to split and process the stream into chunks of fixed size.

Terminating Folds with Leftovers

The next upgrade after terminating folds is terminating folds with leftover inputs. Consider the example of takeWhile operation, it needs to inspect an element for termination decision. However, it does not consume the element on which it terminates. To implement takeWhile a terminating fold will have to implement a way to return unconsumed input to the fold driver.

Single element leftover case is the most common and its easy to implement it in terminating folds using a Done1 constructor in the Step type which indicates that the last element was not consumed by the fold. The following additional operations can be implemented as terminating folds if we do that.

takeWhile
groupBy
wordBy

However, it creates several complications. The many combinator requires a Partial1 (Partial with leftover) to handle a Done1 from the top level fold, for efficient implementation. If the collecting fold in "many" returns a Partial1 or Done1 then what to do with all the elements that have been consumed?

Similarly, in distribute, if one fold consumes a value and others say its a leftover then what do we do? Folds like "many" require the leftover to be fed to it again. So in a distribute operation those folds which gave a leftover will have to be fed the leftover while the folds that consumed will have to be fed the next input. This is very complicated to implement. We have the same issue in backtracking parsers being used in a distribute operation.

To avoid these issues we want to enforce by typing that the collecting folds can never return a leftover. So we need a fold type without Done1 or Partial1. This leads us to design folds to never return a leftover and the use cases of single leftover are transferred to parsers where we have general backtracking mechanism and single leftover is just a special case of backtracking.

This means: takeWhile, groupBy, wordBy would be implemented as parsers. "take 0" can implemented as a fold if we make initial return Step type. "takeInterval" can be implemented without Done1.

Parsers

The next upgrade after terminating folds with a leftover are parsers. Parsers are terminating folds that can fail and backtrack. Parsers can be composed using an alternative style composition where they can backtrack and apply another parser if one parser fails. satisfy is a simple example of a parser, it would succeed if the condition is satisfied and it would fail otherwise, on failure an alternative parser can be used on the same input.

Types for Stream Consumers

In streamly, there is no separate type for accumulators. Terminating folds are a superset of accumulators and to avoid too many types we represent both using the same type, Fold.

We do not club the leftovers functionality with terminating folds because of the reasons explained earlier. Instead combinators that require leftovers are implemented as the Parser type. This is a sweet spot to balance ease of use, type safety and performance. Using separate Accumulator and terminating fold types would encode more information in types but it would make ease of use, implementation, maintenance effort worse. Combining Accumulator, terminating folds and Parser into a single Parser type would make ease of use even better but type safety and performance worse.

One of the design requirements that we have placed for better ease of use and code reuse is that Parser type should be a strict superset of the Fold type i.e. it can do everything that a Fold can do and more. Therefore, folds can be easily upgraded to parsers and we can use parser combinators on folds as well when needed.

Fold Design

A fold is represented by a collection of "initial", "step" and "extract" functions. The "initial" action generates the initial state of the fold. The state is internal to the fold and maintains the accumulated output. The "step" function is invoked using the current state and the next input value and results in a Partial or Done. A Partial returns the next intermediate state of the fold, a Done indicates that the fold has terminated and returns the final value of the accumulator.

Every Partial indicates that a new accumulated output is available. The accumulated output can be extracted from the state at any point using "extract". "extract" can never fail. A fold returns a valid output even without any input i.e. even if you call "extract" on "initial" state it provides an output. This is not true for parsers.

In general, "extract" is used in two cases:

  • When the fold is used as a scan extract is called on the intermediate state every time it is yielded by the fold, the resulting value is yielded as a stream.
  • When the fold is used as a regular fold, extract is called once when we are done feeding input to the fold.

Alternate Designs

An alternate and simpler design would be to return the intermediate output via Partial along with the state, instead of using "extract" on the yielded state and remove the extract function altogether.

This may even facilitate more efficient implementation. Extract from the intermediate state after each yield may be more costly compared to the fold step itself yielding the output. The fold may have more efficient ways to retrieve the output rather than stuffing it in the state and using extract on the state.

However, removing extract altogether may lead to less optimal code in some cases because the driver of the fold needs to thread around the intermediate output to return it if the stream stops before the fold could Done. When using this approach, the parseMany (FL.take filesize) benchmark shows a 2x worse performance even after ensuring everything fuses. So we keep the "extract" approach to ensure better perf in all cases.

But we could still yield both state and the output in Partial, the output can be used for the scan use case, instead of using extract. Extract would then be used only for the case when the stream stops before the fold completes.

Accumulators and Terminating Folds

Folds in this module can be classified in two categories viz. accumulators and terminating folds. Accumulators do not have a terminating condition, they run forever and consume the entire stream, for example the length fold. Terminating folds have a terminating condition and can terminate without consuming the entire stream, for example, the head fold.

Monoids

Monoids allow generalized, modular folding. The accumulators in this module can be expressed using mconcat and a suitable Monoid. Instead of writing folds we can write Monoids and turn them into folds.

Performance Notes

Prelude module provides fold functions to directly fold streams e.g. Streamly.Prelude/sum serves the same purpose as Fold/sum. However, the functions in Streamly.Prelude cannot be efficiently combined together e.g. we cannot drive the input stream through sum and length fold functions simultaneously. Using the Fold type we can efficiently split the stream across multiple folds because it allows the compiler to perform stream fusion optimizations.

Types

data Step s b Source #

Represents the result of the step of a Fold. Partial returns an intermediate state of the fold, the fold step can be called again with the state or the driver can use extract on the state to get the result out. Done returns the final result and the fold cannot be driven further.

Pre-release

Constructors

Partial !s 
Done !b 
Instances
Instances details
Bifunctor Step Source #

first maps over Partial and second maps over Done.

Instance details

Defined in Streamly.Internal.Data.Fold.Type

Methods

bimap :: (a -> b) -> (c -> d) -> Step a c -> Step b d #

first :: (a -> b) -> Step a c -> Step b c #

second :: (b -> c) -> Step a b -> Step a c #

Functor (Step s) Source #

fmap maps over Done.

fmap = second
Instance details

Defined in Streamly.Internal.Data.Fold.Type

Methods

fmap :: (a -> b) -> Step s a -> Step s b #

(<$) :: a -> Step s b -> Step s a #

data Fold m a b Source #

The type Fold m a b having constructor Fold step initial extract represents a fold over an input stream of values of type a to a final value of type b in Monad m.

The fold uses an intermediate state s as accumulator, the type s is internal to the specific fold definition. The initial value of the fold state s is returned by initial. The step function consumes an input and either returns the final result b if the fold is done or the next intermediate state (see Step). At any point the fold driver can extract the result from the intermediate state using the extract function.

NOTE: The constructor is not yet exposed via exposed modules, smart constructors are provided to create folds. If you think you need the constructor of this type please consider using the smart constructors in Streamly.Internal.Data.Fold instead.

since 0.8.0 (type changed)

Since: 0.7.0

Constructors

forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b)

Fold step initial extract

Instances
Instances details
Functor m => Functor (Fold m a) Source #

Maps a function on the output of the fold (the type b).

Instance details

Defined in Streamly.Internal.Data.Fold.Type

Methods

fmap :: (a0 -> b) -> Fold m a a0 -> Fold m a b #

(<$) :: a0 -> Fold m a b -> Fold m a a0 #

Constructors

foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b Source #

Make a fold from a left fold style pure step function and initial value of the accumulator.

If your Fold returns only Partial (i.e. never returns a Done) then you can use foldl'* constructors.

A fold with an extract function can be expressed using fmap:

mkfoldlx :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b
mkfoldlx step initial extract = fmap extract (foldl' step initial)

See also: Streamly.Prelude.foldl'

Since: 0.8.0

foldlM' :: Monad m => (b -> a -> m b) -> m b -> Fold m a b Source #

Make a fold from a left fold style monadic step function and initial value of the accumulator.

A fold with an extract function can be expressed using rmapM:

mkFoldlxM :: Functor m => (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
mkFoldlxM step initial extract = rmapM extract (foldlM' step initial)

See also: Streamly.Prelude.foldlM'

Since: 0.8.0

foldl1' :: Monad m => (a -> a -> a) -> Fold m a (Maybe a) Source #

Make a strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

See also: Streamly.Prelude.foldl1'

Pre-release

foldr :: Monad m => (a -> b -> b) -> b -> Fold m a b Source #

Make a fold using a right fold style step function and a terminal value. It performs a strict right fold via a left fold using function composition. Note that this is strict fold, it can only be useful for constructing strict structures in memory. For reductions this will be very inefficient.

For example,

toList = foldr (:) []

See also: foldr

Since: 0.8.0

foldrM :: Monad m => (a -> b -> m b) -> m b -> Fold m a b Source #

Like foldr but with a monadic step function.

For example,

toList = foldrM (\a xs -> return $ a : xs) (return [])

See also: foldrM

Pre-release

mkFold :: Monad m => (s -> a -> Step s b) -> Step s b -> (s -> b) -> Fold m a b Source #

Make a terminating fold using a pure step function, a pure initial state and a pure state extraction function.

Pre-release

mkFold_ :: Monad m => (b -> a -> Step b b) -> Step b b -> Fold m a b Source #

Similar to mkFold but the final state extracted is identical to the intermediate state.

mkFold_ step initial = mkFold step initial id

Pre-release

mkFoldM :: (s -> a -> m (Step s b)) -> m (Step s b) -> (s -> m b) -> Fold m a b Source #

Make a terminating fold with an effectful step function and initial state, and a state extraction function.

mkFoldM = Fold

We can just use Fold but it is provided for completeness.

Pre-release

mkFoldM_ :: Monad m => (b -> a -> m (Step b b)) -> m (Step b b) -> Fold m a b Source #

Similar to mkFoldM but the final state extracted is identical to the intermediate state.

mkFoldM_ step initial = mkFoldM step initial return

Pre-release

Folds

fromPure :: Applicative m => b -> Fold m a b Source #

A fold that always yields a pure value without consuming any input.

Pre-release

fromEffect :: Applicative m => m b -> Fold m a b Source #

A fold that always yields the result of an effectful action without consuming any input.

Pre-release

drain :: Monad m => Fold m a () Source #

A fold that drains all its input, running the effects and discarding the results.

drain = drainBy (const (return ()))

Since: 0.7.0

toList :: Monad m => Fold m a [a] Source #

Folds the input stream to a list.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array.Foreign instead.

toList = foldr (:) []

Since: 0.7.0

Combinators

Mapping output

rmapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c Source #

Map a monadic function on the output of a fold.

Since: 0.8.0

Mapping Input

map :: (a -> b) -> Fold m b r -> Fold m a r Source #

Internal

lmap :: (a -> b) -> Fold m b r -> Fold m a r Source #

lmap f fold maps the function f on the input of the fold.

>>> Stream.fold (Fold.lmap (\x -> x * x) Fold.sum) (Stream.enumerateFromTo 1 100)
338350
lmap = Fold.lmapM return

Since: 0.8.0

lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r Source #

lmapM f fold maps the monadic function f on the input of the fold.

Since: 0.8.0

Filtering

filter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r Source #

Include only those elements that pass a predicate.

>>> Stream.fold (Fold.filter (> 5) Fold.sum) $ Stream.fromList [1..10]
40
filter f = Fold.filterM (return . f)

Since: 0.8.0

filterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r Source #

Like filter but with a monadic predicate.

Since: 0.8.0

catMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b Source #

Modify a fold to receive a Maybe input, the Just values are unwrapped and sent to the original fold, Nothing values are discarded.

Since: 0.8.0

Trimming

take :: Monad m => Int -> Fold m a b -> Fold m a b Source #

Take at most n input elements and fold them using the supplied fold. A negative count is treated as 0.

>>> Stream.fold (Fold.take 2 Fold.toList) $ Stream.fromList [1..10]
[1,2]

Since: 0.8.0

takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b Source #

takeInterval n fold uses fold to fold the input items arriving within a window of first n seconds.

>>> Stream.fold (Fold.takeInterval 1.0 Fold.toList) $ Stream.delay 0.1 $ Stream.fromList [1..]
[1,2,3,4,5,6,7,8,9,10,11]

Stops when fold stops or when the timeout occurs. Note that the fold needs an input after the timeout to stop. For example, if no input is pushed to the fold until one hour after the timeout had occurred, then the fold will be done only after consuming that input.

Pre-release

Serial Append

serialWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c Source #

Sequential fold application. Apply two folds sequentially to an input stream. The input is provided to the first fold, when it is done - the remaining input is provided to the second fold. When the second fold is done or if the input stream is over, the outputs of the two folds are combined using the supplied function.

>>> f = Fold.serialWith (,) (Fold.take 8 Fold.toList) (Fold.takeEndBy (== '\n') Fold.toList)
>>> Stream.fold f $ Stream.fromList "header: hello\n"
("header: ","hello\n")

Note: This is dual to appending streams using serial.

Note: this implementation allows for stream fusion but has quadratic time complexity, because each composition adds a new branch that each subsequent fold's input element has to traverse, therefore, it cannot scale to a large number of compositions. After around 100 compositions the performance starts dipping rapidly compared to a CPS style implementation.

Time: O(n^2) where n is the number of compositions.

Since: 0.8.0

serial_ :: Fold m x a -> Fold m x b -> Fold m x b Source #

Same as applicative *>. Run two folds serially one after the other discarding the result of the first.

Unimplemented

Parallel Distribution

data GenericRunner sL sR bL bR Source #

Constructors

RunBoth !sL !sR 
RunLeft !sL !bR 
RunRight !bL !sR 

teeWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c Source #

teeWith k f1 f2 distributes its input to both f1 and f2 until both of them terminate and combines their output using k.

>>> avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>> Stream.fold avg $ Stream.fromList [1.0..100.0]
50.5
teeWith k f1 f2 = fmap (uncurry k) ((Fold.tee f1 f2)

For applicative composition using this combinator see Streamly.Internal.Data.Fold.Tee.

See also: Streamly.Internal.Data.Fold.Tee

Since: 0.8.0

teeWithFst :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d Source #

Like teeWith but terminates as soon as the first fold terminates.

Unimplemented

teeWithMin :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d Source #

Like teeWith but terminates as soon as any one of the two folds terminates.

Unimplemented

Parallel Alternative

shortest :: Fold m x a -> Fold m x a -> Fold m x a Source #

Shortest alternative. Apply both folds in parallel but choose the result from the one which consumed least input i.e. take the shortest succeeding fold.

Unimplemented

longest :: Fold m x a -> Fold m x a -> Fold m x a Source #

Longest alternative. Apply both folds in parallel but choose the result from the one which consumed more input i.e. take the longest succeeding fold.

Unimplemented

Splitting

data ManyState s1 s2 Source #

many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c Source #

Collect zero or more applications of a fold. many split collect applies the split fold repeatedly on the input stream and accumulates zero or more fold results using collect.

>>> two = Fold.take 2 Fold.toList
>>> twos = Fold.many two Fold.toList
>>> Stream.fold twos $ Stream.fromList [1..10]
[[1,2],[3,4],[5,6],[7,8],[9,10]]

Stops when collect stops.

See also: concatMap, foldMany

Since: 0.8.0

manyPost :: Monad m => Fold m a b -> Fold m b c -> Fold m a c Source #

Like many, but inner fold emits an output at the end even if no input is received.

Internal

See also: concatMap, foldMany

chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c Source #

chunksOf n split collect repeatedly applies the split fold to chunks of n items in the input stream and supplies the result to the collect fold.

>>> twos = Fold.chunksOf 2 Fold.toList Fold.toList
>>> Stream.fold twos $ Stream.fromList [1..10]
[[1,2],[3,4],[5,6],[7,8],[9,10]]
chunksOf n split = many (take n split)

Stops when collect stops.

Since: 0.8.0

intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c Source #

Group the input stream into windows of n second each using the first fold and then fold the resulting groups using the second fold.

>>> intervals = Fold.intervalsOf 0.5 Fold.toList Fold.toList
>>> Stream.fold intervals $ Stream.delay 0.2 $ Stream.fromList [1..10]
[[1,2,3,4],[5,6,7],[8,9,10]]
intervalsOf n split = many (takeInterval n split)

Pre-release

Nesting

concatMap :: Monad m => (b -> Fold m a c) -> Fold m a b -> Fold m a c Source #

Map a Fold returning function on the result of a Fold and run the returned fold. This operation can be used to express data dependencies between fold operations.

Let's say the first element in the stream is a count of the following elements that we have to add, then:

>>> import Data.Maybe (fromJust)
>>> count = fmap fromJust Fold.head
>>> total n = Fold.take n Fold.sum
>>> Stream.fold (Fold.concatMap total count) $ Stream.fromList [10,9..1]
45

Time: O(n^2) where n is the number of compositions.

See also: foldIterateM

Since: 0.8.0

Running Partially

duplicate :: Monad m => Fold m a b -> Fold m a (Fold m a b) Source #

Modify the fold such that it returns a new Fold instead of the output. If the fold was already done the returned fold would always yield the result. If the fold was partial, the returned fold starts from where we left i.e. it uses the last accumulator value as the initial value of the accumulator. Thus we can resume the fold later and feed it more input.

>>> :{
do
 more <- Stream.fold (Fold.duplicate Fold.sum) (Stream.enumerateFromTo 1 10)
 evenMore <- Stream.fold (Fold.duplicate more) (Stream.enumerateFromTo 11 20)
 Stream.fold evenMore (Stream.enumerateFromTo 21 30)
:}
465

Pre-release

initialize :: Monad m => Fold m a b -> m (Fold m a b) Source #

Run the initialization effect of a fold. The returned fold would use the value returned by this effect as its initial value.

Pre-release

runStep :: Monad m => Fold m a b -> a -> m (Fold m a b) Source #

Run one step of a fold and store the accumulator as an initial value in the returned fold.

Pre-release

Fold2

data Fold2 m c a b Source #

Experimental type to provide a side input to the fold for generating the initial state. For example, if we have to fold chunks of a stream and write each chunk to a different file, then we can generate the file name using a monadic action. This is a generalized version of Fold.

Internal

Constructors

forall s. Fold2 (s -> a -> m s) (c -> m s) (s -> m b)

Fold step inject extract

simplify :: Functor m => Fold2 m c a b -> c -> Fold m a b Source #

Convert more general type Fold2 into a simpler type Fold

Internal

chunksOf2 :: Monad m => Int -> Fold m a b -> Fold2 m x b c -> Fold2 m x a c Source #

Internal

streamly-0.8.0Streamly.Internal.Data.Fold.Type