Streamly.Internal.Data.Fold.Prelude
Channel
Type
The fold driver thread queues the fold's input in the inputQueue and rings the doorbell when the queue transitions from the empty to the non-empty state.

The fold consumer thread dequeues the input items from the inputQueue and supplies them to the fold. When the fold is done, the output of the fold is placed in the outputQueue and the outputDoorBell is rung.

The fold driver thread keeps watching the outputQueue; if the fold has terminated, it stops queueing input to the inputQueue.

If the fold driver runs out of input, it stops and waits for the fold to drain the buffered input.
Driver thread ------>------Input Queue and Doorbell ----->-----Fold thread
Driver thread ------<------Output Queue and Doorbell-----<-----Fold thread
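The following is a minimal, hypothetical model of this two-queue protocol using plain IORef queues and MVar doorbells. It only illustrates the design; the real Channel uses different data structures and a richer event protocol, and here the driver rings the input doorbell just once, after queueing all of its input.

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.IORef (atomicModifyIORef, newIORef, readIORef)

main :: IO ()
main = do
    inputQ     <- newIORef ([] :: [Int])
    inputBell  <- newEmptyMVar
    outputQ    <- newIORef ([] :: [Int])
    outputBell <- newEmptyMVar

    -- Fold worker: wait for the input doorbell, drain the input queue,
    -- run the fold (a sum here), publish the result and ring the output
    -- doorbell.
    _ <- forkIO $ do
        takeMVar inputBell
        xs <- atomicModifyIORef inputQ (\q -> ([], q))
        atomicModifyIORef outputQ (\q -> (sum xs : q, ()))
        putMVar outputBell ()

    -- Driver: queue the input, ring the doorbell once the queue is
    -- non-empty, then wait for the fold's output.
    mapM_ (\x -> atomicModifyIORef inputQ (\q -> (q ++ [x], ()))) [1..10]
    putMVar inputBell ()
    takeMVar outputBell
    readIORef outputQ >>= print  -- prints [55]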
data Channel m a b

Constructors

Channel

data OutEvent b

Constructors

FoldException ThreadId SomeException
FoldPartial b
FoldDone ThreadId b
Configuration
An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.
maxBuffer :: Int -> Config -> Config Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, as pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
boundThreads :: Bool -> Config -> Config Source #
Spawn bound threads (i.e., spawn threads using forkOS instead of forkIO). The default value is False.

Currently, this takes effect only for concurrent folds.
inspect :: Bool -> Config -> Config Source #
Print debug information about the Channel when the stream ends. When the stream does not end normally, the channel debug information is printed when the channel is garbage collected. If you are expecting but not seeing the debug info, try adding a performMajorGC before the program ends.
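Since the modifiers are plain functions, they compose with (.). The following is a hypothetical sketch combining the three options above with parBuffered (documented later in this module); the concrete values are arbitrary, and the standard Streamly.Data.Stream and Streamly.Data.Fold modules are assumed for the stream and the underlying fold.

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Fold.Prelude as FoldP

-- Arbitrary example configuration: bounded buffer, bound threads,
-- no debug output.
cfg :: FoldP.Config -> FoldP.Config
cfg = FoldP.maxBuffer 100 . FoldP.boundThreads True . FoldP.inspect False

main :: IO ()
main = do
    r <- Stream.fold (FoldP.parBuffered cfg Fold.sum)
                     (Stream.enumerateFromTo 1 (100 :: Int))
    print r  -- prints 5050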
Operations
newChannelWith :: MonadRunInIO m => IORef ([OutEvent b], Int) -> MVar () -> (Config -> Config) -> Fold m a b -> m (Channel m a b, ThreadId) Source #
newChannelWithScan :: MonadRunInIO m => IORef ([OutEvent b], Int) -> MVar () -> (Config -> Config) -> Scanl m a b -> m (Channel m a b, ThreadId) Source #
newChannel :: MonadRunInIO m => (Config -> Config) -> Fold m a b -> m (Channel m a b) Source #
newScanChannel :: MonadRunInIO m => (Config -> Config) -> Scanl m a b -> m (Channel m a b) Source #
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b) Source #
Push values from a driver to a fold worker via a Channel. Blocks if no space is available in the buffer. Before pushing a value to the Channel it polls for events received from the fold worker. If a stop event is received, it returns the fold's result as a Just value, otherwise Nothing. Propagates exceptions received from the fold worker.
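As a hypothetical sketch of the driver side of this protocol (assuming the Just-on-termination behaviour described above), a driver can stop feeding as soon as sendToWorker reports a result. The helper below is not part of this module; a real driver would also finalize the channel and wait for the fold to drain, which is elided here.

-- Create a channel for a fold with the default Config and feed it a list
-- of inputs, stopping early if the fold terminates. Returns Just the
-- fold's result if it stopped while being fed, Nothing otherwise.
runFoldViaChannel ::
    (MonadAsync m, MonadRunInIO m) => Fold m a b -> [a] -> m (Maybe b)
runFoldViaChannel f xs = do
    chan <- newChannel id f
    let go []     = return Nothing         -- ran out of input
        go (y:ys) = do
            done <- sendToWorker chan y
            case done of
                Just b  -> return (Just b) -- the fold stopped; send no more
                Nothing -> go ys
    go xs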
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m () Source #
Like sendToWorker, but only sends; it does not poll for events from the fold worker.
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b) Source #
dumpChannel :: Channel m a b -> IO String Source #
Dump the channel stats for diagnostics. Used when the inspect option is enabled.
Concurrency
parBuffered :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b Source #
parBuffered introduces a concurrent stage at the input of the fold. The inputs are asynchronously queued in a buffer and evaluated concurrently with the evaluation of the source stream. On finalization, parBuffered waits for the asynchronous fold to complete before it returns.
In the following example both the stream and the fold have a 1 second delay, but the delay is not compounded because both run concurrently.
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
>>> dst = Fold.parBuffered id (Fold.lmapM delay Fold.sum)
>>> Stream.fold dst src
...
Another example:
>>> Stream.toList $ Stream.groupsOf 4 dst src
...
parLmapM :: (Config -> Config) -> (a -> m b) -> Fold m b r -> Fold m a r Source #
Evaluate the mapped actions concurrently with respect to each other. The results may be unordered or ordered depending on the configuration.
Unimplemented
parTeeWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c Source #
Execute both the folds in a tee concurrently.
Definition:
>>> parTeeWith cfg f c1 c2 = Fold.teeWith f (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
Example:
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Fold.lmapM delay Fold.sum
>>> c2 = Fold.lmapM delay Fold.length
>>> dst = Fold.parTeeWith id (,) c1 c2
>>> Stream.fold dst src
...
parDistribute :: MonadAsync m => (Config -> Config) -> [Fold m a b] -> Fold m a [b] Source #
Distribute the input to all the folds in the supplied list concurrently.
Definition:
>>> parDistribute cfg = Fold.distribute . fmap (Fold.parBuffered cfg)
Example:
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c = Fold.lmapM delay Fold.sum
>>> dst = Fold.parDistribute id [c,c,c]
>>> Stream.fold dst src
...
parPartition :: MonadAsync m => (Config -> Config) -> Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y) Source #
Select first fold for Left input and second for Right input. Both folds run concurrently.
Definition:
>>> parPartition cfg c1 c2 = Fold.partition (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
Example:
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Fold.lmapM delay Fold.sum
>>> c2 = Fold.lmapM delay Fold.sum
>>> dst = Fold.parPartition id c1 c2
>>> Stream.fold dst $ fmap (\x -> if even x then Left x else Right x) src
...
parUnzipWithM :: MonadAsync m => (Config -> Config) -> (a -> m (b, c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #
Split and distribute the output to two different folds and then zip the results. Both the consumer folds run concurrently.
Definition:
>>> parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
Example:
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Fold.lmapM delay Fold.sum
>>> c2 = Fold.lmapM delay Fold.sum
>>> dst = Fold.parUnzipWithM id (pure . id) c1 c2
>>> Stream.fold dst $ fmap (\x -> (x, x * x)) src
...
parDistributeScan :: MonadAsync m => (Config -> Config) -> m [Fold m a b] -> Stream m a -> Stream m [b] Source #
Evaluate a stream and send its outputs to zero or more dynamically generated folds. It checks for any new folds at each input generation step. Any new fold is added to the list of folds which are currently running. If there are no folds available, the input is discarded. If a fold completes, its output is emitted in the output of the scan.
>>> import Data.IORef
>>> ref <- newIORef [Fold.take 2 Fold.sum, Fold.take 2 Fold.length :: Fold.Fold IO Int Int]
>>> gen = atomicModifyIORef ref (\xs -> ([], xs))
>>> Stream.toList $ Fold.parDistributeScan id gen (Stream.enumerateFromTo 1 10)
...
parDemuxScan :: (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> m (Fold m a b)) -> Stream m a -> Stream m [(k, b)] Source #
Evaluate a stream and send its outputs to the selected fold. The fold is dynamically selected using a key at the time of the first input seen for that key. Any new fold is added to the list of folds which are currently running. If there are no folds available for a given key, the input is discarded. If a fold completes, its output is emitted in the output of the scan.
>>> import qualified Data.Map.Strict as Map
>>> import Data.Maybe (fromJust)
>>> f1 = ("even", Fold.take 2 Fold.sum)
>>> f2 = ("odd", Fold.take 2 Fold.sum)
>>> kv = Map.fromList [f1, f2]
>>> getFold k = return (fromJust $ Map.lookup k kv)
>>> getKey x = if even x then "even" else "odd"
>>> input = Stream.enumerateFromTo 1 10
>>> Stream.toList $ Fold.parDemuxScan id getKey getFold input
...
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b Source #
Deprecated: Please use parBuffered instead.
Same as parBuffered; see its documentation above.
Time
takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b Source #
takeInterval n fold uses fold to fold the input items arriving within a window of the first n seconds.
>>> input = Stream.delay 0.2 $ Stream.fromList [1..10]
>>> Stream.fold (Fold.takeInterval 1.0 Fold.toList) input
[1,2,3,4,5,6]
>>> f = Fold.takeInterval 0.5 Fold.toList
>>> Stream.fold Fold.toList $ Stream.foldMany f input
[[1,2,3,4],[5,6,7],[8,9,10]]
Stops when fold stops or when the timeout occurs. Note that the fold needs an input after the timeout to stop. For example, if no input is pushed to the fold until one hour after the timeout occurred, then the fold will be done only after consuming that input.
Pre-release
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c Source #
Group the input stream into windows of n seconds each using the first fold and then fold the resulting groups using the second fold.
>>> intervals = Fold.intervalsOf 0.5 Fold.toList Fold.toList
>>> Stream.fold intervals $ Stream.delay 0.2 $ Stream.fromList [1..10]
[[1,2,3,4],[5,6,7],[8,9,10]]
intervalsOf n split = many (takeInterval n split)
Pre-release
Deprecated
write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a () Source #
A fold to write a stream to an SVar. Unlike toSVar this does not allow for concurrent evaluation of the stream; as the fold receives the input one element at a time, it just forwards the elements to the SVar. However, we can safely execute the fold in an independent thread, the SVar acting as a buffer decoupling the sender from the receiver. Also, we can have multiple folds running concurrently, pushing their streams to the SVar.
writeLimited :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a () Source #
Like write, but applies a yield limit.