Loading...

Streamly.Internal.Data.Scanl.Prelude

Channel

Type

data Channel m a b Source #

The fold driver thread queues the input of the fold in the inputQueue The driver rings the doorbell when the queue transitions from empty to non-empty state.

The fold consumer thread dequeues the input items from the inputQueue and supplies them to the fold. When the fold is done the output of the fold is placed in inputQueue and outputDoorBell is rung.

The fold driver thread keeps watching the outputQueue, if the fold has terminated, it stops queueing the input to the inputQueue

If the fold driver runs out of input it stops and waits for the fold to drain the buffered input.

Driver thread ------>------Input Queue and Doorbell ----->-----Fold thread

Driver thread ------<------Output Queue and Doorbell-----<-----Fold thread

Constructors

Channel 

Fields

Configuration

data Config Source #

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.

maxBuffer :: Int -> Config -> Config Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

boundThreads :: Bool -> Config -> Config Source #

Spawn bound threads (i.e., spawn threads using forkOS instead of forkIO). The default value is False.

Currently, this only takes effect only for concurrent folds.

inspect :: Bool -> Config -> Config Source #

Print debug information about the Channel when the stream ends. When the stream does not end normally, the channel debug information is printed when the channel is garbage collected. If you are expecting but not seeing the debug info try adding a performMajorGC before the program ends.

Operations

newChannelWith :: MonadRunInIO m => IORef ([OutEvent b], Int) -> MVar () -> (Config -> Config) -> Fold m a b -> m (Channel m a b, ThreadId) Source #

newChannelWithScan :: MonadRunInIO m => IORef ([OutEvent b], Int) -> MVar () -> (Config -> Config) -> Scanl m a b -> m (Channel m a b, ThreadId) Source #

newChannel :: MonadRunInIO m => (Config -> Config) -> Fold m a b -> m (Channel m a b) Source #

newScanChannel :: MonadRunInIO m => (Config -> Config) -> Scanl m a b -> m (Channel m a b) Source #

sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b) Source #

Push values from a driver to a fold worker via a Channel. Blocks if no space is available in the buffer. Before pushing a value to the Channel it polls for events received from the fold worker. If a stop event is received then it returns True otherwise false. Propagates exceptions received from the fold worker.

sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m () Source #

Like sendToWorker but only sends, does not receive any events from the fold.

checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b) Source #

Poll for events sent by the fold worker to the fold driver. The fold consumer can send a Stop event or an exception. When a Stop is received this function returns True. If an exception is recieved then it throws the exception.

dumpChannel :: Channel m a b -> IO String Source #

Dump the channel stats for diagnostics. Used when inspect option is enabled.

cleanup :: MonadIO m => Channel m a b -> m () Source #

finalize :: MonadIO m => Channel m a b -> m () Source #

Concurrency

parTeeWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Scanl m x a -> Scanl m x b -> Scanl m x c Source #

Execute both the scans in a tee concurrently.

Example:

>>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Scanl.lmapM delay Scanl.sum
>>> c2 = Scanl.lmapM delay Scanl.length
>>> dst = Scanl.parTeeWith id (,) c1 c2
>>> Stream.toList $ Stream.scanl dst src
...

parDistributeScan :: MonadAsync m => (Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b] Source #

Evaluate a stream and scan its outputs using zero or more dynamically generated parallel scans. It checks for any new folds at each input generation step. Any new fold is added to the list of folds which are currently running. If there are no folds available, the input is discarded. If a fold completes its output is emitted in the output of the scan. The outputs of the parallel scans are merged in the output stream.

>>> import Data.IORef
>>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
>>> gen = atomicModifyIORef ref (\xs -> ([], xs))
>>> Stream.toList $ Scanl.parDistributeScan id gen (Stream.enumerateFromTo 1 10)
...

parDemuxScan :: (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> m (Scanl m a b)) -> Stream m a -> Stream m [(k, b)] Source #

Evaluate a stream and send its outputs to the selected scan. The scan is dynamically selected using a key at the time of the first input seen for that key. Any new scan is added to the list of scans which are currently running. If there are no scans available for a given key, the input is discarded. If a constituent scan completes its output is emitted in the output of the composed scan.

>>> import qualified Data.Map.Strict as Map
>>> import Data.Maybe (fromJust)
>>> f1 = ("even", Scanl.take 5 Scanl.sum)
>>> f2 = ("odd", Scanl.take 5 Scanl.sum)
>>> kv = Map.fromList [f1, f2]
>>> getScan k = return (fromJust $ Map.lookup k kv)
>>> getKey x = if even x then "even" else "odd"
>>> input = Stream.enumerateFromTo 1 10
>>> Stream.toList $ Scanl.parDemuxScan id getKey getScan input
...