Streamly.Data.Fold.Prelude
All fold-related combinators: everything in the streamly-core Streamly.Data.Fold module, plus concurrent folds and folds over unordered containers.
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Concurrent (threadDelay)
>>> import Data.List (sortOn)
>>> import Data.HashMap.Strict (HashMap)
>>> import Streamly.Data.Fold (Fold)
>>> import Streamly.Data.Stream (Stream)
>>> import qualified Data.HashMap.Strict as HM
>>> import qualified Streamly.Data.Fold.Prelude as Fold
>>> import qualified Streamly.Data.Stream.Prelude as Stream
For APIs that have not been released yet.
>>> import qualified Streamly.Internal.Data.Fold as Fold
>>> import qualified Streamly.Internal.Data.Fold.Prelude as Fold
Streamly.Data.Fold
All Streamly.Data.Fold combinators are re-exported via this module. For more pre-release combinators, also see the Streamly.Internal.Data.Fold module.
module Streamly.Data.Fold
Concurrent Operations
Configuration
An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.
maxBuffer :: Int -> Config -> Config
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full, we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, because pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
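As a minimal sketch of how a Config -> Config modifier is applied, the following program passes a bounded maxBuffer to parBuffered. The buffer size of 100 and the input range are arbitrary values chosen for illustration, and the program assumes the streamly package is installed.

```haskell
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

main :: IO ()
main = do
    -- Modifiers are plain functions, so several of them can be composed
    -- with (.). The value 100 is an arbitrary illustrative bound.
    let cfg = Fold.maxBuffer 100 . Fold.boundThreads False
        source = Stream.enumerateFromTo 1 (1000 :: Int)
    -- The fold runs concurrently with the evaluation of the source
    -- stream, with at most 100 inputs buffered between them.
    total <- Stream.fold (Fold.parBuffered cfg Fold.sum) source
    print total -- the sum of 1..1000, i.e. 500500
```

Passing id instead of cfg keeps the default configuration.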
boundThreads :: Bool -> Config -> Config
Spawn bound threads (i.e., spawn threads using forkOS instead of forkIO). The default value is False.
Currently, this takes effect only for concurrent folds.
inspect :: Bool -> Config -> Config
Print debug information about the Channel when the stream ends. When the stream does not end normally, the channel debug information is printed when the channel is garbage collected. If you are expecting but not seeing the debug info, try adding a performMajorGC before the program ends.
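A minimal sketch of enabling inspect on a concurrent fold is shown below. The input list is an arbitrary illustration, and the performMajorGC call (from System.Mem in base) is included only as the fallback suggested above. The exact content and format of the debug output are not reproduced here.

```haskell
import System.Mem (performMajorGC)
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

main :: IO ()
main = do
    -- Turn on channel debug output for this concurrent fold only.
    let dst = Fold.parBuffered (Fold.inspect True) Fold.length
    n <- Stream.fold dst (Stream.fromList "abc")
    print n
    -- Force a major GC so that a channel that did not end normally
    -- still gets a chance to print its debug information.
    performMajorGC
```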
Combinators
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
Deprecated: Please use parBuffered instead.
parBuffered introduces a concurrent stage at the input of the fold. The inputs are asynchronously queued in a buffer and evaluated concurrently with the evaluation of the source stream. On finalization, parBuffered waits for the asynchronous fold to complete before it returns.
In the following example both the stream and the fold have a 1 second delay, but the delay is not compounded because both run concurrently.
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
>>> dst = Fold.parBuffered id (Fold.lmapM delay Fold.sum)
>>> Stream.fold dst src
...
Another example:
>>> Stream.toList $ Stream.groupsOf 4 dst src
...
Container Related
toHashMapIO :: (MonadIO m, Hashable k) => (a -> k) -> Fold m a b -> Fold m a (HashMap k b)
Split the input stream based on a hashable component of the key field and fold each split using the given fold. Useful for map/reduce, bucketizing the input in different bins or for generating histograms.
Consider a stream of key value pairs:
>>> input = Stream.fromList [("k1",1),("k1",1.1),("k2",2), ("k2",2.2)]
Classify each key to a different hash bin and fold the bins:
>>> classify = Fold.toHashMapIO fst (Fold.lmap snd Fold.toList)
>>> sortOn fst . HM.toList <$> Stream.fold classify input :: IO [(String, [Double])]
[("k1",[1.0,1.1]),("k2",[2.0,2.2])]
Pre-release
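The histogram use case mentioned for toHashMapIO can be sketched as follows: each element is used as its own key, and Fold.length counts how many elements land in each bin. The input list is made up for illustration, and the program assumes the streamly and unordered-containers packages are installed.

```haskell
import Data.List (sortOn)
import qualified Data.HashMap.Strict as HM
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

main :: IO ()
main = do
    let items = ["a", "b", "a", "c", "b", "a"] :: [String]
        -- Key each element by itself and count the elements per key.
        histogram = Fold.toHashMapIO id Fold.length
    counts <- Stream.fold histogram (Stream.fromList items)
    -- HashMap ordering is unspecified, so sort by key before printing.
    print (sortOn fst (HM.toList counts))
    -- prints [("a",3),("b",2),("c",1)]
```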