Fast, composable stream consumers with ability to terminate, supporting stream fusion.
Using Folds
This module provides elementary folds and fold combinators that can be used
to consume a stream of data and reduce it to a final value, or transform it
in a stateful manner using scans. A data stream can be reduced into a stream
of folded data elements by folding segments of the stream. Fold combinators
can be used to compose multiple folds in parallel or to create a pipeline of
folds such that the next fold consumes the result of the previous fold. To
run these folds on a stream see fold, scan, postscan, scanMaybe, foldMany and
other operations accepting the Fold type as an argument in
Streamly.Data.Stream.
Reducing a Stream
A Fold is a consumer of a stream of values. A fold driver (such as fold)
initializes the fold accumulator, runs the fold step function in a loop,
processing the input stream one element at a time and accumulating the
result. The loop continues until the fold terminates, at which point the
accumulated result is returned.
For example, a sum Fold represents a stream consumer that adds the values
in the input stream:
>>>
Stream.fold Fold.sum $ Stream.fromList [1..100]
5050
Conceptually, a Fold is a data type that mimics a strict left fold (foldl).
The above example is similar to a left fold using (+) as the step and 0 as
the initial value of the accumulator:
>>>
Data.List.foldl' (+) 0 [1..100]
5050
Folds have an early termination capability, e.g. the one fold terminates
after consuming one element:
>>>
Stream.fold Fold.one $ Stream.fromList [1..]
Just 1
The above example is similar to the following right fold:
>>>
Prelude.foldr (\x _ -> Just x) Nothing [1..]
Just 1
Folds can be combined together using combinators. For example, to create a
fold that sums the first two elements in a stream:
>>>
sumTwo = Fold.take 2 Fold.sum
>>>
Stream.fold sumTwo $ Stream.fromList [1..100]
3
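The driver loop and early termination described in this section can be pictured with a small toy model over plain lists. This is only a sketch using base; the names ToyFold, Step, runToy, toySum, toyOne and toyTake are hypothetical and are not Streamly's definitions, which are monadic and more elaborate.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Toy model only; not Streamly's actual definitions.

-- | Result of feeding one input: continue with a new state, or stop.
data Step s b = Partial s | Done b

-- | A consumer with a hidden state: step function, initial step, extractor.
data ToyFold a b = forall s. ToyFold (s -> a -> Step s b) (Step s b) (s -> b)

-- | Driver: loop over the input until the fold says Done or the input ends.
runToy :: ToyFold a b -> [a] -> b
runToy (ToyFold step initial extract) = go initial
  where
    go (Done b) _ = b
    go (Partial s) [] = extract s
    go (Partial s) (x : xs) = s `seq` go (step s x) xs

-- | Never terminates on its own; adds up all inputs.
toySum :: Num a => ToyFold a a
toySum = ToyFold (\s x -> Partial (s + x)) (Partial 0) id

-- | Terminates after the first input.
toyOne :: ToyFold a (Maybe a)
toyOne = ToyFold (\_ x -> Done (Just x)) (Partial ()) (const Nothing)

-- | Limit a fold to at most n inputs.
toyTake :: Int -> ToyFold a b -> ToyFold a b
toyTake n (ToyFold step initial extract) = ToyFold step' initial' extract'
  where
    initial' = limit 0 initial
    step' (i, s) x = limit (i + 1) (step s x)
    extract' (_, s) = extract s
    -- Stop as soon as n inputs have been consumed.
    limit _ (Done b) = Done b
    limit i (Partial s)
      | i >= n = Done (extract s)
      | otherwise = Partial (i, s)

main :: IO ()
main = do
  print (runToy toySum [1 .. 100 :: Int])             -- 5050
  print (runToy toyOne [1 :: Int ..])                 -- Just 1
  print (runToy (toyTake 2 toySum) [1 .. 100 :: Int]) -- 3
```

Because the driver stops at the first Done, toyOne returns even on an infinite list, which is the behaviour the one example relies on.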
Parallel Composition
Folds can be combined to run in parallel on the same input. For example, to compute the average of numbers in a stream without going through the stream twice:
>>>
avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>>
Stream.fold avg $ Stream.fromList [1.0..100.0]
50.5
Folds can be combined so as to partition the input stream over multiple folds. For example, to count even and odd numbers in a stream:
>>>
split n = if even n then Left n else Right n
>>>
stream = fmap split $ Stream.fromList [1..100]
>>>
countEven = fmap (("Even " ++) . show) Fold.length
>>>
countOdd = fmap (("Odd " ++) . show) Fold.length
>>>
f = Fold.partition countEven countOdd
>>>
Stream.fold f stream
("Even 50","Odd 50")
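As a rough mental model of the two compositions above, a tee pairs the states of both folds and steps them together, while a partition steps only the fold selected by the input. The sketch below uses base only; ToyFold, Pair, toyTeeWith and toyPartition are hypothetical names, and the model ignores monadic effects and early termination.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Data.List (foldl')

-- Toy model only; not Streamly's actual definitions.

-- | A non-terminating consumer: step function, initial state, extractor.
data ToyFold a b = forall s. ToyFold (s -> a -> s) s (s -> b)

-- | Strict pair so that both accumulators are forced at each step.
data Pair x y = Pair !x !y

-- | Drive a fold over a list with a strict left fold.
runToy :: ToyFold a b -> [a] -> b
runToy (ToyFold step initial extract) xs = extract (foldl' step initial xs)

-- | Feed every input to both folds and combine the two results.
toyTeeWith :: (b -> c -> d) -> ToyFold a b -> ToyFold a c -> ToyFold a d
toyTeeWith k (ToyFold stepL initL extractL) (ToyFold stepR initR extractR) =
  ToyFold step (Pair initL initR) extract
  where
    step (Pair l r) x = Pair (stepL l x) (stepR r x)
    extract (Pair l r) = k (extractL l) (extractR r)

-- | Route Left inputs to the first fold and Right inputs to the second.
toyPartition :: ToyFold a b -> ToyFold c d -> ToyFold (Either a c) (b, d)
toyPartition (ToyFold stepL initL extractL) (ToyFold stepR initR extractR) =
  ToyFold step (Pair initL initR) extract
  where
    step (Pair l r) (Left a) = Pair (stepL l a) r
    step (Pair l r) (Right c) = Pair l (stepR r c)
    extract (Pair l r) = (extractL l, extractR r)

toySum :: Num a => ToyFold a a
toySum = ToyFold (+) 0 id

toyLength :: ToyFold a Int
toyLength = ToyFold (\n _ -> n + 1) 0 id

-- | Average in a single pass over the input.
toyAvg :: ToyFold Double Double
toyAvg = toyTeeWith (\s n -> s / fromIntegral n) toySum toyLength

main :: IO ()
main = do
  print (runToy toyAvg [1 .. 100]) -- 50.5
  let tag n = if even n then Left n else Right n
  print (runToy (toyPartition toyLength toyLength) (map tag [1 .. 100 :: Int]))
  -- (50,50)
```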
Sequential Composition
Terminating folds can be combined to parse the stream serially such that the first fold consumes the input until it terminates and the second fold consumes the rest of the input until it terminates:
>>>
f = Fold.splitWith (,) (Fold.take 8 Fold.toList) (Fold.takeEndBy (== '\n') Fold.toList)
>>>
Stream.fold f $ Stream.fromList "header: hello\n"
("header: ","hello\n")
Splitting a Stream
A Fold can be applied repeatedly on a stream to transform it to a stream
of fold results. To split a stream on newlines:
>>>
f = Fold.takeEndBy (== '\n') Fold.toList
>>>
Stream.fold Fold.toList $ Stream.foldMany f $ Stream.fromList "Hello there!\nHow are you\n"
["Hello there!\n","How are you\n"]
Similarly, we can split the input of a fold too:
>>>
Stream.fold (Fold.many f Fold.toList) $ Stream.fromList "Hello there!\nHow are you\n"
["Hello there!\n","How are you\n"]
Folds vs. Streams
We can often use streams or folds to achieve the same goal. However, streams
are more efficient in composition of producers (e.g. append or mergeBy)
whereas folds are more efficient in composition of consumers (e.g.
splitWith, partition or teeWith).
Streams are producers; transformations on streams happen on the output side:
>>>
:{
f stream =
      Stream.filter odd stream
    & fmap (+1)
    & Stream.fold Fold.sum
:}
>>>
f $ Stream.fromList [1..100 :: Int]
2550
Folds are stream consumers with an input stream and an output value; stream transformations on folds happen on the input side:
>>>
:{
f =
       Fold.filter odd
     $ Fold.lmap (+1)
     $ Fold.sum
:}
>>>
Stream.fold f $ Stream.fromList [1..100 :: Int]
2550
Notice the similarity in the definition of f in both cases. The only
difference is the composition by & vs $ and the use of lmap vs fmap; the
difference is due to output vs input side transformations.
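To make the input side vs output side distinction concrete, here is a minimal sketch over plain lists using base only. ToyFold, toyLmap, toyFilter and toyRmap are hypothetical names for a simplified, pure model; they are not Streamly's definitions.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Data.List (foldl')

-- Toy model only; not Streamly's actual definitions.
data ToyFold a b = forall s. ToyFold (s -> a -> s) s (s -> b)

runToy :: ToyFold a b -> [a] -> b
runToy (ToyFold step initial extract) = extract . foldl' step initial

toySum :: Num a => ToyFold a a
toySum = ToyFold (+) 0 id

-- | Input side: transform each element before the fold sees it.
toyLmap :: (a -> b) -> ToyFold b r -> ToyFold a r
toyLmap f (ToyFold step initial extract) =
  ToyFold (\s a -> step s (f a)) initial extract

-- | Input side: skip elements that fail the predicate.
toyFilter :: (a -> Bool) -> ToyFold a r -> ToyFold a r
toyFilter p (ToyFold step initial extract) =
  ToyFold (\s a -> if p a then step s a else s) initial extract

-- | Output side: transform the final result.
toyRmap :: (b -> c) -> ToyFold a b -> ToyFold a c
toyRmap g (ToyFold step initial extract) = ToyFold step initial (g . extract)

-- | Consumer pipeline; the outermost combinator sees the input first.
viaFold :: [Int] -> Int
viaFold = runToy (toyFilter odd $ toyLmap (+ 1) toySum)

-- | Producer pipeline over a plain list, for comparison.
viaList :: [Int] -> Int
viaList = sum . map (+ 1) . filter odd

main :: IO ()
main = do
  print (viaFold [1 .. 100])                          -- 2550
  print (viaList [1 .. 100])                          -- 2550
  print (runToy (toyRmap show toySum) [1 .. 100 :: Int]) -- "5050"
```

In this model the input type changes only through toyLmap, and the output type changes only through toyRmap, which mirrors the contravariant and covariant roles of the two type parameters.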
Fusion Limitations
Folds support stream fusion for generating loops comparable to the speed of C. However, fusion has some limitations. For it to work, the folds must be inlined, they must be statically known rather than generated dynamically, and they should not be passed recursively.
Another limitation is quadratic complexity, which causes a slowdown when
too many nested compositions are used. In particular, the performance of the
Applicative instance and splitting operations (e.g. splitWith) degrades
quadratically (O(n^2)) when combined n times; roughly 8 or fewer sequenced
operations are fine. For these cases folds can be converted to parsers and
then used as ParserK.
Experimental APIs
Please refer to Streamly.Internal.Data.Fold for more functions that have not yet been released.
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>>
:m
>>>
:set -XFlexibleContexts
>>>
import Control.Monad (void)
>>>
import qualified Data.Foldable as Foldable
>>>
import Data.Function ((&))
>>>
import Data.Functor.Identity (Identity, runIdentity)
>>>
import Data.IORef (newIORef, readIORef, writeIORef)
>>>
import Data.Maybe (fromJust, isJust)
>>>
import Data.Monoid (Endo(..), Last(..), Sum(..))
>>>
import Streamly.Data.Array (Array)
>>>
import Streamly.Data.Fold (Fold, Tee(..))
>>>
import Streamly.Data.Stream (Stream)
>>>
import qualified Streamly.Data.Array as Array
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.MutArray as MutArray
>>>
import qualified Streamly.Data.Parser as Parser
>>>
import qualified Streamly.Data.Stream as Stream
>>>
import qualified Streamly.Data.StreamK as StreamK
>>>
import qualified Streamly.Data.Unfold as Unfold
For APIs that have not been released yet.
>>>
import qualified Streamly.Internal.Data.Fold as Fold
Running A Fold
drive :: Monad m => Stream m a -> Fold m a b -> m b
Drive a fold using the supplied Stream, reducing the resulting
expression strictly at each step.
Definition:
>>>
drive = flip Stream.fold
Example:
>>>
Fold.drive (Stream.enumerateFromTo 1 100) Fold.sum
5050
Fold Type
The type Fold m a b represents a consumer of an input stream of values
of type a, returning a final value of type b in Monad m. The constructor of
a fold is Fold step initial extract final.
The fold uses an internal state of type s. The initial value of the state
s is created by initial. This function is called once and only once
before the fold starts consuming input. Any resource allocation can be done
in this function.
The step function is called on each input; it consumes an input and
returns the next intermediate state (see Step) or the final result b if
the fold terminates.
If the fold is used as a scan, the extract function is used by the scan
driver to map the current state s of the fold to the fold result. Thus
extract can be called multiple times. In some folds, where scanning does
not make sense, this function is left unimplemented; such folds cannot be
used as scans.
Before a fold terminates, final is called once and only once (unless the
fold terminated in initial itself). Any resources allocated by initial
can be released in final. In folds that do not require any cleanup,
extract and final are typically the same.
When implementing fold combinators, care should be taken to clean up any
state of the argument folds held by the fold by calling the respective
final at all exit points of the fold. Also, final should not be called
more than once. Note that if a fold terminates by the Done constructor,
there is no state to clean up.
NOTE: The constructor is not yet released; smart constructors are provided to create folds.
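The difference between how a fold driver and a scan driver use extract can be sketched with a simplified pure model. This is an illustration under stated assumptions: ToyFold, foldToy, postscanToy and toyMean are hypothetical names, and the model omits the monad, early termination and the separate final function.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Toy model only; not Streamly's actual definitions.
data ToyFold a b = forall s. ToyFold (s -> a -> s) s (s -> b)

-- | Fold driver: extract is called once, after all input is consumed.
foldToy :: ToyFold a b -> [a] -> b
foldToy (ToyFold step initial extract) = go initial
  where
    go s [] = extract s
    go s (x : xs) = let s' = step s x in s' `seq` go s' xs

-- | Scan driver: extract is called after every step to expose the
-- intermediate result (a postscan, so the initial value is not emitted).
postscanToy :: ToyFold a b -> [a] -> [b]
postscanToy (ToyFold step initial extract) = go initial
  where
    go _ [] = []
    go s (x : xs) = let s' = step s x in s' `seq` (extract s' : go s' xs)

-- | Running mean: the state is (sum, count); extract divides them.
toyMean :: ToyFold Double Double
toyMean = ToyFold step (0, 0 :: Int) extract
  where
    step (s, n) x = (s + x, n + 1)
    extract (s, n) = if n == 0 then 0 else s / fromIntegral n

main :: IO ()
main = do
  print (foldToy toyMean [1, 2, 3, 4])     -- 2.5
  print (postscanToy toyMean [1, 2, 3, 4]) -- [1.0,1.5,2.0,2.5]
```

Since the scan driver keeps using the state after each extract, extract must not release resources in this picture; that job is what the text above assigns to final.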
Instances
Monad m => Applicative (Fold m a)
Defined in Streamly.Internal.Data.Fold.Type
Functor m => Functor (Fold m a)
Maps a function on the output of the fold (the type b).
Tee is a newtype wrapper over the Fold type providing distributing
Applicative, Semigroup, Monoid, Num, Floating and Fractional instances.
The input received by the composed Tee is replicated and distributed to
the constituent folds of the Tee.
For example, to compute the average of numbers in a stream without going through the stream twice:
>>>
avg = (/) <$> (Tee Fold.sum) <*> (Tee $ fmap fromIntegral Fold.length)
>>>
Stream.fold (unTee avg) $ Stream.fromList [1.0..100.0]
50.5
Similarly, the Semigroup and Monoid instances of Tee distribute the
input to both the folds and combine the outputs using Monoid or Semigroup
instances of the output types:
>>>
import Data.Monoid (Sum(..))
>>>
t = Tee Fold.one <> Tee Fold.latest
>>>
Stream.fold (unTee t) (fmap Sum $ Stream.enumerateFromTo 1.0 100.0)
Just (Sum {getSum = 101.0})
The Num, Floating, and Fractional instances work in the same way.
Instances
Monad m => Applicative (Tee m a)
Defined in Streamly.Internal.Data.Fold.Tee
Functor m => Functor (Tee m a)
(Monoid b, Monad m) => Monoid (Tee m a b)
(Semigroup b, Monad m) => Semigroup (Tee m a b)
(Monad m, Floating b) => Floating (Tee m a b)
Defined in Streamly.Internal.Data.Fold.Tee
exp :: Tee m a b -> Tee m a b
log :: Tee m a b -> Tee m a b
sqrt :: Tee m a b -> Tee m a b
(**) :: Tee m a b -> Tee m a b -> Tee m a b
logBase :: Tee m a b -> Tee m a b -> Tee m a b
sin :: Tee m a b -> Tee m a b
cos :: Tee m a b -> Tee m a b
tan :: Tee m a b -> Tee m a b
asin :: Tee m a b -> Tee m a b
acos :: Tee m a b -> Tee m a b
atan :: Tee m a b -> Tee m a b
sinh :: Tee m a b -> Tee m a b
cosh :: Tee m a b -> Tee m a b
tanh :: Tee m a b -> Tee m a b
asinh :: Tee m a b -> Tee m a b
acosh :: Tee m a b -> Tee m a b
atanh :: Tee m a b -> Tee m a b
log1p :: Tee m a b -> Tee m a b
expm1 :: Tee m a b -> Tee m a b
(Monad m, Num b) => Num (Tee m a b)
Defined in Streamly.Internal.Data.Fold.Tee
(+) :: Tee m a b -> Tee m a b -> Tee m a b
(-) :: Tee m a b -> Tee m a b -> Tee m a b
(*) :: Tee m a b -> Tee m a b -> Tee m a b
negate :: Tee m a b -> Tee m a b
abs :: Tee m a b -> Tee m a b
signum :: Tee m a b -> Tee m a b
fromInteger :: Integer -> Tee m a b
(Monad m, Fractional b) => Fractional (Tee m a b)
Constructors
foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b
Make a fold from a left fold style pure step function and initial value of the accumulator.
If your Fold returns only Partial (i.e. never returns a Done) then you
can use foldl'* constructors.
A fold with an extract function can be expressed using fmap:
mkfoldlx :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b
mkfoldlx step initial extract = fmap extract (foldl' step initial)
foldlM' :: Monad m => (b -> a -> m b) -> m b -> Fold m a b
Make a fold from a left fold style monadic step function and initial value of the accumulator.
A fold with an extract function can be expressed using rmapM:
mkFoldlxM :: Functor m => (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
mkFoldlxM step initial extract = rmapM extract (foldlM' step initial)
foldl1' :: Monad m => (a -> a -> a) -> Fold m a (Maybe a)
Make a strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.
Prerelease
foldlM1' :: Monad m => (a -> a -> m a) -> Fold m a (Maybe a)
Like foldl1' but with a monadic step function.
Prerelease
foldr' :: Monad m => (a -> b -> b) -> b -> Fold m a b
Make a fold using a right fold style step function and a terminal value. It performs a strict right fold via a left fold using function composition. Note that a strict right fold can only be useful for constructing strict structures in memory. For reductions this will be very inefficient.
Definitions:
>>>
foldr' f z = fmap (flip appEndo z) $ Fold.foldMap (Endo . f)
>>>
foldr' f z = fmap ($ z) $ Fold.foldl' (\g x -> g . f x) id
Example:
>>>
Stream.fold (Fold.foldr' (:) []) $ Stream.enumerateFromTo 1 5
[1,2,3,4,5]
Folds
Accumulators
Folds that never terminate; these folds are much like strict left
folds. mconcat is the fundamental accumulator. All other accumulators
can be expressed in terms of mconcat using a suitable Monoid. Instead
of writing folds we could write Monoids and turn them into folds.
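The idea of expressing accumulators through a Monoid can be tried out on plain lists with base alone. In this sketch foldMapStrict is a hypothetical helper standing in for the role that Fold.foldMap plays in the definitions in this module; the wrappers Sum and Last come from Data.Monoid.

```haskell
import Data.List (foldl')
import Data.Monoid (Last (..), Sum (..))

-- | Strict left fold that maps each input into a monoid and appends it.
foldMapStrict :: Monoid m => (a -> m) -> [a] -> m
foldMapStrict f = foldl' (\acc x -> acc <> f x) mempty

-- | Sum, via the Sum monoid.
sumVia :: Num a => [a] -> a
sumVia = getSum . foldMapStrict Sum

-- | Length, by mapping every element to Sum 1.
lengthVia :: [a] -> Int
lengthVia = getSum . foldMapStrict (Sum . const 1)

-- | Latest element, via the Last monoid; Nothing on empty input.
latestVia :: [a] -> Maybe a
latestVia = getLast . foldMapStrict (Last . Just)

main :: IO ()
main = do
  print (sumVia [1 .. 100 :: Int])   -- 5050
  print (lengthVia "hello")          -- 5
  print (latestVia [1, 2, 3 :: Int]) -- Just 3
  print (latestVia ([] :: [Int]))    -- Nothing
```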
sconcat :: (Monad m, Semigroup a) => a -> Fold m a a
Semigroup concat. Append the elements of an input stream to a provided starting value.
Definition:
>>>
sconcat = Fold.foldl' (<>)
>>>
semigroups = fmap Data.Monoid.Sum $ Stream.enumerateFromTo 1 10
>>>
Stream.fold (Fold.sconcat 10) semigroups
Sum {getSum = 65}
drain :: Monad m => Fold m a ()
A fold that drains all its input, running the effects and discarding the results.
>>>
drain = Fold.drainMapM (const (return ()))
>>>
drain = Fold.foldl' (\_ _ -> ()) ()
drainMapM :: Monad m => (a -> m b) -> Fold m a ()
Definitions:
>>>
drainMapM f = Fold.lmapM f Fold.drain
>>>
drainMapM f = Fold.foldMapM (void . f)
Drain all input after passing it through a monadic function. This is the dual of mapM_ on stream producers.
length :: Monad m => Fold m a Int
Determine the length of the input stream.
Definition:
>>>
length = Fold.lengthGeneric
>>>
length = fmap getSum $ Fold.foldMap (Sum . const 1)
countDistinct :: (Monad m, Ord a) => Fold m a Int
Count non-duplicate elements in the stream.
Definition:
>>>
countDistinct = fmap Set.size Fold.toSet
>>>
countDistinct = Fold.postscan Fold.nub $ Fold.catMaybes $ Fold.length
The memory used is proportional to the number of distinct elements in the stream. To guard against using too much memory, use it as a scan and terminate if the count exceeds a threshold.
Space: \(\mathcal{O}(n)\)
Prerelease
countDistinctInt :: Monad m => Fold m Int Int
Like countDistinct but specialized to a stream of Int, for better
performance.
Definition:
>>>
countDistinctInt = fmap IntSet.size Fold.toIntSet
>>>
countDistinctInt = Fold.postscan Fold.nubInt $ Fold.catMaybes $ Fold.length
Prerelease
frequency :: (Monad m, Ord a) => Fold m a (Map a Int)
Determine the frequency of each element in the stream.
You can just collect the keys of the resulting map to get the unique elements in the stream.
Definition:
>>>
frequency = Fold.toMap id Fold.length
sum :: (Monad m, Num a) => Fold m a a
Determine the sum of all elements of a stream of numbers. Returns additive
identity (0) when the stream is empty. Note that this is not numerically
stable for floating point numbers.
>>>
sum = Fold.cumulative Fold.windowSum
Same as following but numerically stable:
>>>
sum = Fold.foldl' (+) 0
>>>
sum = fmap Data.Monoid.getSum $ Fold.foldMap Data.Monoid.Sum
product :: (Monad m, Num a, Eq a) => Fold m a a
Determine the product of all elements of a stream of numbers. Returns
multiplicative identity (1) when the stream is empty. The fold terminates
when it encounters (0) in its input.
Same as the following but terminates on multiplication by 0:
>>>
product = fmap Data.Monoid.getProduct $ Fold.foldMap Data.Monoid.Product
mean :: (Monad m, Fractional a) => Fold m a a
Compute a numerically stable arithmetic mean of all elements in the input stream.
rollingHash :: (Monad m, Enum a) => Fold m a Int64
Compute an Int sized polynomial rolling hash of a stream.
>>>
rollingHash = Fold.rollingHashWithSalt Fold.defaultSalt
rollingHashWithSalt :: (Monad m, Enum a) => Int64 -> Fold m a Int64
Compute an Int sized polynomial rolling hash
H = salt * k ^ n + c1 * k ^ (n - 1) + c2 * k ^ (n - 2) + ... + cn * k ^ 0
Where c1, c2, cn are the elements in the input stream and k is a
constant.
This hash is often used in the Rabin-Karp string search algorithm.
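The polynomial above can be evaluated incrementally with one multiply and one add per element (Horner's rule). The following is a sketch over plain lists using base only; the multiplier k = 31 is a hypothetical value chosen for illustration, since the constant used by the library is not stated here, and rollingHashToy and polynomialHash are hypothetical names.

```haskell
import Data.Int (Int64)
import Data.List (foldl')

-- | Hypothetical multiplier, chosen only for this sketch.
k :: Int64
k = 31

-- | Horner-style evaluation of
--   H = salt * k ^ n + c1 * k ^ (n - 1) + ... + cn * k ^ 0
-- Arithmetic wraps around on Int64 overflow.
rollingHashToy :: Enum a => Int64 -> [a] -> Int64
rollingHashToy salt = foldl' step salt
  where
    step h c = h * k + fromIntegral (fromEnum c)

-- | Direct evaluation of the same polynomial, for cross-checking.
polynomialHash :: Enum a => Int64 -> [a] -> Int64
polynomialHash salt cs =
  salt * k ^ n + sum (zipWith term cs [n - 1, n - 2 .. 0])
  where
    n = length cs
    term c e = fromIntegral (fromEnum c) * k ^ e

main :: IO ()
main = do
  print (rollingHashToy 0 "ab") -- 3105, i.e. 97 * 31 + 98
  print (rollingHashToy 7 "hello" == polynomialHash 7 "hello") -- True
```

The incremental form needs only the previous hash value as state, which is why it fits the shape of a left fold.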
toList :: Monad m => Fold m a [a]
Folds the input stream to a list.
Warning! Working on large lists accumulated as buffers in memory could be very inefficient; consider using Streamly.Data.Array instead.
>>>
toList = Fold.foldr' (:) []
toListRev :: Monad m => Fold m a [a]
Buffers the input stream to a list in the reverse order of the input.
Definition:
>>>
toListRev = Fold.foldl' (flip (:)) []
Warning! Working on large lists accumulated as buffers in memory could be very inefficient; consider using Streamly.Data.Array instead.
toSet :: (Monad m, Ord a) => Fold m a (Set a)
Fold the input to a set.
Definition:
>>>
toSet = Fold.foldl' (flip Set.insert) Set.empty
toIntSet :: Monad m => Fold m Int IntSet
Fold the input to an int set. For integer inputs this performs better than
toSet.
Definition:
>>>
toIntSet = Fold.foldl' (flip IntSet.insert) IntSet.empty
topBy :: (MonadIO m, Unbox a) => (a -> a -> Ordering) -> Int -> Fold m a (MutArray a)
Get the top n elements using the supplied comparison function.
To get the bottom n elements instead:
>>>
bottomBy cmp = Fold.topBy (flip cmp)
Example:
>>>
stream = Stream.fromList [2::Int,7,9,3,1,5,6,11,17]
>>>
Stream.fold (Fold.topBy compare 3) stream >>= MutArray.toList
[17,11,9]
Prerelease
NonEmpty Accumulators
Accumulators that do not have a default value and therefore return
Nothing on an empty stream.
latest :: Monad m => Fold m a (Maybe a)
Returns the latest element of the input stream, if any.
>>>
latest = Fold.foldl1' (\_ x -> x)
>>>
latest = fmap getLast $ Fold.foldMap (Last . Just)
maximumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a)
Determine the maximum element in a stream using the supplied comparison function.
maximum :: (Monad m, Ord a) => Fold m a (Maybe a)
Determine the maximum element in a stream.
Definitions:
>>>
maximum = Fold.maximumBy compare
>>>
maximum = Fold.foldl1' max
Same as the following but without a default maximum. The Max Monoid uses
the minBound as the default maximum:
>>>
maximum = fmap Data.Semigroup.getMax $ Fold.foldMap Data.Semigroup.Max
minimumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a)
Computes the minimum element with respect to the given comparison function.
minimum :: (Monad m, Ord a) => Fold m a (Maybe a)
Determine the minimum element in a stream.
Definitions:
>>>
minimum = Fold.minimumBy compare
>>>
minimum = Fold.foldl1' min
Same as the following but without a default minimum. The Min Monoid uses
the maxBound as the default minimum:
>>>
minimum = fmap Data.Semigroup.getMin $ Fold.foldMap Data.Semigroup.Min
Filtering Scanners
Accumulators that are usually run as a scan using the scanMaybe combinator.
findIndices :: Monad m => (a -> Bool) -> Fold m a (Maybe Int)
Returns the index of the latest element if the element satisfies the given predicate.
elemIndices :: (Monad m, Eq a) => a -> Fold m a (Maybe Int)
Returns the index of the latest element if the element matches the given value.
Definition:
>>>
elemIndices a = Fold.findIndices (== a)
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Fold m a (Maybe a)
Returns the latest element omitting the first occurrence that satisfies the given equality predicate.
Example:
>>>
input = Stream.fromList [1,3,3,5]
>>>
Stream.fold Fold.toList $ Stream.scanMaybe (Fold.deleteBy (==) 3) input
[1,3,5]
uniqBy :: Monad m => (a -> a -> Bool) -> Fold m a (Maybe a)
Return the latest unique element using the supplied comparison function.
Returns Nothing if the current element is the same as the last element;
otherwise returns Just.
Example, strip duplicate path separators:
>>>
input = Stream.fromList "//a//b"
>>>
f x y = x == '/' && y == '/'
>>>
Stream.fold Fold.toList $ Stream.scanMaybe (Fold.uniqBy f) input
"/a/b"
Space: O(1)
Prerelease
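A filtering scan of this kind can be pictured as a function that emits one Maybe per input, followed by a step that keeps only the Just values. The sketch below uses base only and hypothetical names (uniqByScan, uniqByToy); it assumes that each element is compared with the previous input element, which is an assumption of this model rather than something stated above.

```haskell
import Data.Maybe (catMaybes)

-- | Toy filtering scan: emit Nothing when the current element "equals" the
-- previous input element, otherwise emit Just the element.
-- Assumption: the comparison is against the previous input element.
uniqByScan :: (a -> a -> Bool) -> [a] -> [Maybe a]
uniqByScan eq = go Nothing
  where
    go _ [] = []
    go prev (x : xs) = out : go (Just x) xs
      where
        out = case prev of
          Just p | eq p x -> Nothing
          _ -> Just x

-- | Run the scan and keep only the Just values.
uniqByToy :: (a -> a -> Bool) -> [a] -> [a]
uniqByToy eq = catMaybes . uniqByScan eq

main :: IO ()
main = do
  let sep x y = x == '/' && y == '/'
  print (uniqByScan sep "//a")      -- [Just '/',Nothing,Just 'a']
  putStrLn (uniqByToy sep "//a//b") -- /a/b
```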
Terminating Folds
one :: Monad m => Fold m a (Maybe a)
Take one element from the stream and stop.
Definition:
>>>
one = Fold.maybe Just
This is similar to the stream uncons operation.
index :: Monad m => Int -> Fold m a (Maybe a)
Return the element at the given index.
Definition:
>>>
index = Fold.indexGeneric
find :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
Returns the first element that satisfies the given predicate.
findM :: Monad m => (a -> m Bool) -> Fold m a (Maybe a)
Returns the first element that satisfies the given predicate.
Prerelease
lookup :: (Eq a, Monad m) => a -> Fold m (a, b) (Maybe b)
In a stream of (key-value) pairs (a, b), return the value b of the
first pair where the key equals the given value a.
Definition:
>>>
lookup x = fmap snd <$> Fold.find ((== x) . fst)
findIndex :: Monad m => (a -> Bool) -> Fold m a (Maybe Int)
Returns the first index that satisfies the given predicate.
elemIndex :: (Eq a, Monad m) => a -> Fold m a (Maybe Int)
Returns the first index where a given value is found in the stream.
Definition:
>>>
elemIndex a = Fold.findIndex (== a)
notElem :: (Eq a, Monad m) => a -> Fold m a Bool
Returns True if the given element is not present in the stream.
Definition:
>>>
notElem a = Fold.all (/= a)
all :: Monad m => (a -> Bool) -> Fold m a Bool
Returns True if all elements of the input satisfy the predicate.
Definition:
>>>
all p = Fold.lmap p Fold.and
Example:
>>>
Stream.fold (Fold.all (== 0)) $ Stream.fromList [1,0,1]
False
any :: Monad m => (a -> Bool) -> Fold m a Bool
Returns True if any element of the input satisfies the predicate.
Definition:
>>>
any p = Fold.lmap p Fold.or
Example:
>>>
Stream.fold (Fold.any (== 0)) $ Stream.fromList [1,0,1]
True
Incremental builders
Mutable arrays (Streamly.Data.MutArray) are basic builders. You can
use the snoc or writeAppend operations to incrementally build
mutable arrays. The addOne and addStream combinators can be used to
incrementally build any type of structure using a fold, including arrays
or a stream of arrays.
Use pinned arrays if you are going to use the data for IO.
addOne :: Monad m => a -> Fold m a b -> m (Fold m a b)
Append a singleton value to the fold.
See examples under addStream.
Prerelease
addStream :: Monad m => Stream m a -> Fold m a b -> m (Fold m a b)
Append a stream to a fold to build the fold accumulator incrementally. We
can repeatedly call addStream on the same fold to continue building the
fold and finally use drive to finish the fold and extract the result. Also
see the addOne operation which is a singleton version of addStream.
Definitions:
>>>
addStream stream = Fold.drive stream . Fold.duplicate
Example, build a list incrementally:
>>>
:{
pure (Fold.toList :: Fold IO Int [Int])
    >>= Fold.addOne 1
    >>= Fold.addStream (Stream.enumerateFromTo 2 4)
    >>= Fold.drive Stream.nil
    >>= print
:}
[1,2,3,4]
This can be used as an O(n) list append compared to the O(n^2) ++ when
used for incrementally building a list.
Example, build a stream incrementally:
>>>
:{
pure (Fold.toStream :: Fold IO Int (Stream Identity Int))
    >>= Fold.addOne 1
    >>= Fold.addStream (Stream.enumerateFromTo 2 4)
    >>= Fold.drive Stream.nil
    >>= print
:}
fromList [1,2,3,4]
This can be used as an O(n) stream append compared to the O(n^2) <> when
used for incrementally building a stream.
Example, build an array incrementally:
>>>
:{
pure (Array.write :: Fold IO Int (Array Int))
    >>= Fold.addOne 1
    >>= Fold.addStream (Stream.enumerateFromTo 2 4)
    >>= Fold.drive Stream.nil
    >>= print
:}
fromList [1,2,3,4]
Example, build an array stream incrementally:
>>>
:{
let f :: Fold IO Int (Stream Identity (Array Int))
    f = Fold.groupsOf 2 (Array.writeN 3) Fold.toStream
in pure f
    >>= Fold.addOne 1
    >>= Fold.addStream (Stream.enumerateFromTo 2 4)
    >>= Fold.drive Stream.nil
    >>= print
:}
fromList [fromList [1,2],fromList [3,4]]
duplicate :: Monad m => Fold m a b -> Fold m a (Fold m a b)
duplicate provides the ability to run a fold in parts. The duplicated
fold consumes the input and returns the same fold as output instead of
returning the final result; the returned fold can be run later to consume
more input.
duplicate essentially appends a stream to the fold without finishing the
fold. Compare with snoc which appends a singleton value to the fold.
Prerelease
Combinators
Combinators are modifiers of folds. In the type Fold m a b, a is
the input type and b is the output type. Transformations can be
applied either on the input side (contravariant) or on the output side
(covariant). Therefore, combinators are of one of the following general
shapes:
... -> Fold m a b -> Fold m c b (input transformation)
... -> Fold m a b -> Fold m a c (output transformation)
The input side transformations are more interesting for folds. Most of
the following sections describe the input transformation operations on a
fold. When an operation makes sense on both input and output side we use
the prefix l (for left) for input side operations and the prefix r
(for right) for output side operations.
Mapping on output
The Functor instance of a fold maps on the output of the fold:
>>>
Stream.fold (fmap show Fold.sum) (Stream.enumerateFromTo 1 100)
"5050"
rmapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c
Map a monadic function on the output of a fold.
Mapping on Input
lmap :: (a -> b) -> Fold m b r -> Fold m a r
lmap f fold maps the function f on the input of the fold.
Definition:
>>>
lmap = Fold.lmapM return
Example:
>>>
sumSquared = Fold.lmap (\x -> x * x) Fold.sum
>>>
Stream.fold sumSquared (Stream.enumerateFromTo 1 100)
338350
lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
lmapM f fold maps the monadic function f on the input of the fold.
Scanning and Filtering
scanMaybe :: Monad m => Fold m a (Maybe b) -> Fold m b c -> Fold m a c
Use a Maybe returning fold as a filtering scan.
>>>
scanMaybe p f = Fold.postscan p (Fold.catMaybes f)
Prerelease
filter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r
Include only those elements that pass a predicate.
>>>
Stream.fold (Fold.filter (> 5) Fold.sum) $ Stream.fromList [1..10]
40
>>>
filter p = Fold.scanMaybe (Fold.filtering p)
>>>
filter p = Fold.filterM (return . p)
>>>
filter p = Fold.mapMaybe (\x -> if p x then Just x else Nothing)
filterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r
Like filter but with a monadic predicate.
>>>
f p x = p x >>= \r -> return $ if r then Just x else Nothing
>>>
filterM p = Fold.mapMaybeM (f p)
mapMaybe :: Monad m => (a -> Maybe b) -> Fold m b r -> Fold m a r
mapMaybe f fold maps a Maybe returning function f on the input of
the fold, filters out Nothing elements, and returns the values extracted
from Just.
>>>
mapMaybe f = Fold.lmap f . Fold.catMaybes
>>>
mapMaybe f = Fold.mapMaybeM (return . f)
>>>
f x = if even x then Just x else Nothing
>>>
fld = Fold.mapMaybe f Fold.toList
>>>
Stream.fold fld (Stream.enumerateFromTo 1 10)
[2,4,6,8,10]
catEithers :: Fold m a b -> Fold m (Either a a) b
Remove the Either wrapper and flatten both lefts as well as rights in the output stream.
Definition:
>>>
catEithers = Fold.lmap (either id id)
Prerelease
Trimming
take :: Monad m => Int -> Fold m a b -> Fold m a b
Take at most n input elements and fold them using the supplied fold. A
negative count is treated as 0.
>>>
Stream.fold (Fold.take 2 Fold.toList) $ Stream.fromList [1..10]
[1,2]
takeEndBy :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
Take the input, stop when the predicate succeeds, taking the succeeding element as well.
Example:
>>>
input = Stream.fromList "hello\nthere\n"
>>>
line = Fold.takeEndBy (== '\n') Fold.toList
>>>
Stream.fold line input
"hello\n"
>>>
Stream.fold Fold.toList $ Stream.foldMany line input
["hello\n","there\n"]
takeEndBy_ :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
Like takeEndBy but drops the element on which the predicate succeeds.
Example:
>>>
input = Stream.fromList "hello\nthere\n"
>>>
line = Fold.takeEndBy_ (== '\n') Fold.toList
>>>
Stream.fold line input
"hello"
>>>
Stream.fold Fold.toList $ Stream.foldMany line input
["hello","there"]
Splitting
splitWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
Sequential fold application. Apply two folds sequentially to an input stream. The input is provided to the first fold; when it is done, the remaining input is provided to the second fold. When the second fold is done or if the input stream is over, the outputs of the two folds are combined using the supplied function.
Example:
>>>
header = Fold.take 8 Fold.toList
>>>
line = Fold.takeEndBy (== '\n') Fold.toList
>>>
f = Fold.splitWith (,) header line
>>>
Stream.fold f $ Stream.fromList "header: hello\n"
("header: ","hello\n")
Note: This is dual to appending streams using append.
Note: this implementation allows for stream fusion but has quadratic time complexity, because each composition adds a new branch that each subsequent fold's input element has to traverse, therefore, it cannot scale to a large number of compositions. After around 100 compositions the performance starts dipping rapidly compared to a CPS style implementation.
For larger number of compositions you can convert the fold to a parser and use ParserK.
Time: O(n^2) where n is the number of compositions.
many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
Collect zero or more applications of a fold. many first second applies
the first fold repeatedly on the input stream and accumulates its results
using the second fold.
>>>
two = Fold.take 2 Fold.toList
>>>
twos = Fold.many two Fold.toList
>>>
Stream.fold twos $ Stream.fromList [1..10]
[[1,2],[3,4],[5,6],[7,8],[9,10]]
Stops when the second fold stops.
groupsOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
groupsOf n split collect repeatedly applies the split fold to chunks
of n items in the input stream and supplies the result to the collect
fold.
Definition:
>>>
groupsOf n split = Fold.many (Fold.take n split)
Example:
>>>
twos = Fold.groupsOf 2 Fold.toList Fold.toList
>>>
Stream.fold twos $ Stream.fromList [1..10]
[[1,2],[3,4],[5,6],[7,8],[9,10]]
Stops when collect stops.
Parallel Distribution
For applicative composition using distribution see Streamly.Internal.Data.Fold.Tee.
teeWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
teeWith k f1 f2 distributes its input to both f1 and f2 until both
of them terminate and combines their output using k.
Definition:
>>>
teeWith k f1 f2 = fmap (uncurry k) (Fold.tee f1 f2)
Example:
>>>
avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>>
Stream.fold avg $ Stream.fromList [1.0..100.0]
50.5
For applicative composition using this combinator see Streamly.Data.Fold.Tee.
See also: Streamly.Data.Fold.Tee
Note that nested applications of teeWith do not fuse.
tee :: Monad m => Fold m a b -> Fold m a c -> Fold m a (b, c)
Distribute one copy of the stream to each fold and zip the results.
                |-------Fold m a b--------|
---stream m a---|                         |---m (b,c)
                |-------Fold m a c--------|
Definition:
>>>
tee = Fold.teeWith (,)
Example:
>>>
t = Fold.tee Fold.sum Fold.length
>>>
Stream.fold t (Stream.enumerateFromTo 1.0 100.0)
(5050.0,100)
distribute :: Monad m => [Fold m a b] -> Fold m a [b]
Distribute one copy of the stream to each fold and collect the results in a container.
                |-------Fold m a b--------|
---stream m a---|                         |---m [b]
                |-------Fold m a b--------|
                |                         |
                           ...
>>>
Stream.fold (Fold.distribute [Fold.sum, Fold.length]) (Stream.enumerateFromTo 1 5)
[15,5]
>>>
distribute = Prelude.foldr (Fold.teeWith (:)) (Fold.fromPure [])
This is the consumer side dual of the producer side sequence operation.
Stops when all the folds stop.
Partitioning
Direct items in the input stream to different folds using a binary fold selector.
Key-value Collectors
toMap :: (Monad m, Ord k) => (a -> k) -> Fold m a b -> Fold m a (Map k b)
Split the input stream based on a key field and fold each split using the given fold. Useful for map/reduce, bucketizing the input in different bins or for generating histograms.
Example:
>>>
import Data.Map.Strict (Map)
>>>
:{
 let input = Stream.fromList [("ONE",1),("ONE",1.1),("TWO",2),("TWO",2.2)]
     classify = Fold.toMap fst (Fold.lmap snd Fold.toList)
  in Stream.fold classify input :: IO (Map String [Double])
:}
fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]
Once the classifier fold terminates for a particular key any further inputs in that bucket are ignored.
Space used is proportional to the number of keys seen so far and increases monotonically, because the Map records whether each key has been seen.
See demuxToMap for a more powerful version where you can use a different fold for each key. A simpler version of toMap retaining only the last value for a key can be written as:
>>>
toMap = Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty
Stops: never
Prerelease
toMapIO :: (MonadIO m, Ord k) => (a -> k) -> Fold m a b -> Fold m a (Map k b)
Same as toMap but may be faster because it uses mutable cells as fold accumulators in the Map.
demuxToMap :: (Monad m, Ord k) => (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (Map k b)
This collects all the results of demux in a Map.
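As a rough illustration of using a different fold per key, the sketch below sums the values tagged "SUM" and multiplies everything else. The names pickFold and demuxDemo are hypothetical and exist only for this example; the input data is made up.

```haskell
import qualified Data.Map.Strict as Map
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- Hypothetical selector: called with the first element seen for a key,
-- it decides which fold will consume the values of that key.
pickFold :: (String, Int) -> IO (Fold.Fold IO (String, Int) Int)
pickFold (key, _) =
    pure $ Fold.lmap snd $ if key == "SUM" then Fold.sum else Fold.product

-- Route each element by its key and collect the per-key results in a Map.
demuxDemo :: IO (Map.Map String Int)
demuxDemo =
    Stream.fold (Fold.demuxToMap fst pickFold)
        $ Stream.fromList
            [("SUM", 1), ("PRODUCT", 2), ("SUM", 3), ("PRODUCT", 4)]

main :: IO ()
main = demuxDemo >>= print
-- prints: fromList [("PRODUCT",8),("SUM",4)]
```

Neither sum nor product terminates on its own, so both results come from the folds still in progress when the stream ends.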
demuxToMapIO :: (MonadIO m, Ord k) => (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (Map k b)
Same as demuxToMap but uses demuxIO for better performance.
Key-value Scanners
classify :: (Monad m, Ord k) => (a -> k) -> Fold m a b -> Fold m a (m (Map k b), Maybe (k, b))
Folds the values for each key using the supplied fold. When scanning, as soon as the fold is complete, its result is available in the second component of the tuple. The first component of the tuple is a snapshot of the in-progress folds.
Once the fold for a key is done, any future values of the key are ignored.
Definition:
>>>
classify f fld = Fold.demux f (const fld)
classifyIO :: (MonadIO m, Ord k) => (a -> k) -> Fold m a b -> Fold m a (m (Map k b), Maybe (k, b))
Same as classify except that it uses mutable IORef cells in the Map providing better performance. Be aware that if this is used as a scan, the values in the intermediate Maps would be mutable.
Definition:
>>>
classifyIO f fld = Fold.demuxIO f (const fld)
demux :: (Monad m, Ord k) => (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (m (Map k b), Maybe (k, b))
demux getKey getFold: In a key-value stream, fold the values corresponding to each key using a key-specific fold. getFold is invoked to generate a key-specific fold when a key is encountered for the first time in the stream.
The first component of the output tuple is a key-value Map of in-progress folds. The fold returns the fold result as the second component of the output tuple whenever a fold terminates.
If a fold terminates, another instance of the fold is started upon receiving an input with that key; that is, getFold is invoked again whenever the key is encountered again.
This can be used to scan a stream and collect the results from the scan output.
Since the fold generator function is monadic, we can add folds dynamically. For example, we can maintain a Map of keys to folds in an IORef and look up the fold corresponding to a key in it. This Map can be changed dynamically: folds for new keys can be added, and folds for old keys can be deleted or modified.
Compare with classify, where the fold is static.
Prerelease
demuxIO :: (MonadIO m, Ord k) => (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (m (Map k b), Maybe (k, b))
This is a specialized version of demux that uses mutable IO cells as fold accumulators for better performance.
Keep in mind that the values in the returned Map may be changed by the ongoing fold if you are using those concurrently in another thread.
Unzipping
unzip :: Monad m => Fold m a x -> Fold m b y -> Fold m (a, b) (x, y)
Send the elements of tuples in a stream of tuples through two different folds.
                          |-------Fold m a x--------|
---------stream of (a,b)--|                         |----m (x,y)
                          |-------Fold m b y--------|
Definition:
>>>
unzip = Fold.unzipWith id
This is the consumer side dual of the producer side zip operation.
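A minimal sketch of unzip in use, assuming a small made-up stream of pairs: the first components are summed while the second components are collected into a list. The name unzipDemo is hypothetical.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- First components go to Fold.sum, second components to Fold.toList.
unzipDemo :: IO (Int, String)
unzipDemo =
    Stream.fold (Fold.unzip Fold.sum Fold.toList)
        $ Stream.fromList [(1, 'a'), (2, 'b'), (3, 'c')]

main :: IO ()
main = unzipDemo >>= print
-- prints: (6,"abc")
```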
Nesting
concatMap :: Monad m => (b -> Fold m a c) -> Fold m a b -> Fold m a c
Map a Fold returning function on the result of a Fold and run the returned fold. This operation can be used to express data dependencies between fold operations.
Let's say the first element in the stream is a count of the following elements that we have to add, then:
>>>
import Data.Maybe (fromJust)
>>>
count = fmap fromJust Fold.one
>>>
total n = Fold.take n Fold.sum
>>>
Stream.fold (Fold.concatMap total count) $ Stream.fromList [10,9..1]
45
This does not fuse completely, see refold for a fusible alternative.
Time: O(n^2) where n is the number of compositions.
See also: foldIterateM, refold
Transforming the Monad
morphInner :: (forall x. m x -> n x) -> Fold m a b -> Fold n a b
Change the underlying monad of a fold. Also known as hoist.
Prerelease
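For instance, a fold written against the Identity monad can be lifted to run in IO. The sketch below assumes pure . runIdentity as the natural transformation; pureSum and ioSum are hypothetical names used only here.

```haskell
import Data.Functor.Identity (Identity, runIdentity)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- A fold whose underlying monad is Identity.
pureSum :: Fold.Fold Identity Int Int
pureSum = Fold.sum

-- The same fold hoisted into IO by unwrapping Identity and re-wrapping.
ioSum :: Fold.Fold IO Int Int
ioSum = Fold.morphInner (pure . runIdentity) pureSum

main :: IO ()
main = Stream.fold ioSum (Stream.fromList [1 .. 10]) >>= print
-- prints: 55
```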
Deprecated
chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
Deprecated: Please use groupsOf instead
head :: Monad m => Fold m a (Maybe a)
Deprecated: Please use "one" instead
Extract the first element of the stream, if any.
>>>
head = Fold.one
sequence :: Monad m => Fold m a (m b) -> Fold m a b
Deprecated: Use "rmapM id" instead
Flatten the monadic output of a fold to pure output.
mapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c
Deprecated: Use rmapM instead
Map a monadic function on the output of a fold.
variance :: (Monad m, Fractional a) => Fold m a a
Deprecated: Use the streamly-statistics package instead
Compute a numerically stable (population) variance over all elements in the input stream.