Streamly.Internal.Data.Stream.Prelude

Concurrency Channels

Type

data Channel m a Source #

A mutable channel to evaluate multiple streams concurrently and provide the combined results as output stream.

There are only two actors working on the channel data structure, the event processing loop (single thread), and the workers (multiple threads). Locking notes are provided below for concurrent access.

Constructors

Channel 

Fields

  • svarMrun :: RunInIO m

    Runner for the monadic actions in the stream. Captures the monad state at the point where the channel was created and uses the same state to run all actions.

  • maxBufferLimit :: Limit

Maximum size of the outputQueue. The actual worst case buffer usage could be double this limit because the event loop may empty the queue and the workers may fill it up again even before the event loop has started consuming the items it read.

  • outputQueue :: IORef ([ChildEvent a], Int)

    (events, count): worker event queue of the channel. This is where the workers queue the results and other events.

    LOCKING
Frequently locked. This is locked and updated by workers on each yield, and locked and updated by the event loop thread from time to time when it reads the queue. Locking contention among workers may be high if there are a large number of workers.
  • outputDoorBell :: MVar ()

    Door bell for workers to wakeup the event loop.

    LOCKING
    Infrequently locked. Used only when the outputQueue transitions from empty to non-empty, or a work item is queued by a worker to the work queue and doorBellOnWorkQ is set by the event loop.
  • readOutputQ :: m [ChildEvent a]

    Function to read the output queue of the channel, depends on the rate control option.

  • postProcess :: m Bool

    Function to invoke after all the events read in a batch are processed i.e. before we go on to read the next batch, depends on the rate control option.

  • remainingWork :: Maybe (IORef Count)

    Tracks how many yields are remaining before the channel stops, used when maxYields option is enabled.

    LOCKING
Read-only access by the event loop when dispatching a worker. Decremented by workers when picking up work; the decrement is undone if the worker does not yield a value.
  • isWorkDone :: IO Bool

    Determine if there is no more work to do. When maxYields is set for the channel we may be done even if the work queue still has work.

  • yieldRateInfo :: Maybe YieldRateInfo

Rate control information for the channel, used when rate control is enabled.

  • doorBellOnWorkQ :: IORef Bool

When set to True, ring outputDoorBell when a work item is queued on the work queue. This is set by the dispatcher before going to sleep; it wants to be woken up whenever the work queue gets more work so that it can dispatch a worker.

  • eagerDispatch :: m ()

This is a hook which is invoked whenever the tail of the stream is re-enqueued on the work queue. Normally, this is set to a noop. When the eager option is enabled this is set to an unconditional worker dispatch function. This ensures that a worker is dispatched eagerly as long as there is work to do.

  • enqueue :: (RunInIO m, StreamK m a) -> IO ()

    Enqueue a stream for evaluation on the channel. The first element of the tuple is the runner function which is used to run the stream actions in a specific monadic context.

  • isQueueDone :: IO Bool

    Determine if the work queue is empty, therefore, there is no more work to do.

  • workLoop :: Maybe WorkerInfo -> m ()

    Worker function. It is implicitly aware of the work queue. It dequeues a work item from the queue and runs it. It keeps on doing this in a loop until it determines that it needs to stop.

Normally, the worker stops when the work queue becomes empty or, when rate control is enabled, when the work rate is higher than the target rate. It stops by sending a ChildStop event to the channel.

    When rate control is enabled, the worker is dispatched with a WorkerInfo record which is used by the worker to maintain rate control information and communicate it to the channel.

  • maxWorkerLimit :: Limit

This is capped to maxBufferLimit if set to more than that; otherwise, in the worst case, each worker could yield one value to the buffer, exceeding the requested buffer size.

  • workerThreads :: IORef (Set ThreadId)

    Tracks all active worker threads. An entry is added by the dispatcher when a worker is dispatched, and removed whenever the event processing loop receives a ChildStop event.

    LOCKING
    Updated unlocked, only by the event loop thread.
  • workerCount :: IORef Int

    Total number of active worker threads.

    LOCKING
    Updated locked, by the event loop thread when dispatching a worker and by a worker thread when the thread stops. This is read without lock at several places where we want to rely on an approximate value.
  • accountThread :: ThreadId -> m ()
     
  • workerStopMVar :: MVar ()

    Used when ordered is enabled. This is a lock to stop the workers one at a time. Stopping one might affect whether the other should stop.

  • svarRef :: Maybe (IORef ())

    A weak IORef to call a cleanup function when the channel is garbage collected.

  • svarStats :: SVarStats

    Stats collection.

  • svarInspectMode :: Bool

    When inspect mode is enabled we report diagnostic data about the channel at certain points.

  • svarCreator :: ThreadId

    threadId of the thread that created the channel

Configuration

data Config Source #

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.
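
Config modifiers compose with ordinary function composition. For example, a sketch (parMap8 is a hypothetical name) that caps both the thread count and the buffer size for a concurrent map:

>>> cfg = Stream.maxThreads 8 . Stream.maxBuffer 64
>>> parMap8 f = Stream.parMapM cfg f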

Limits

maxThreads :: Int -> Config -> Config Source #

Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

maxBuffer :: Int -> Config -> Config Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

maxYields :: Maybe Int64 -> Config -> Config Source #

The maximum number of yields that this channel would produce. The Channel automatically stops after that. This could be used to limit speculative execution.

Nothing means there is no limit.

Keep in mind that checking this limit all the time has a performance overhead.

Known bugs: currently this works only when rate is specified; for ordered streams the actual count is sometimes less than expected.

Rate Control

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the yield count gap becomes more than rateBuffer (specified as a yield count) we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.
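
For example, a sketch of a Rate value with a target of 20 yields per second, an instantaneous rate bounded between 10 and 40 while recovering, and recovery limited to a gap of at most 100 yields (assuming the Rate constructor is in scope from this module):

>>> r = Stream.Rate 10 20 40 100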

Constructors

Rate 

Fields

  • rateLow :: Double

    The lower rate limit.

  • rateGoal :: Double

    The target rate we want to achieve.

  • rateHigh :: Double

    The upper rate limit.

  • rateBuffer :: Int

    Maximum slack from the goal.

rate :: Maybe Rate -> Config -> Config Source #

Specify the stream evaluation rate of a channel.

A Nothing value means there is no smart rate control, concurrent execution blocks only if maxThreads or maxBuffer is reached, or there are no more concurrent tasks to execute. This is the default.

When rate (throughput) is specified, concurrent production may be ramped up or down automatically to achieve the specified stream throughput. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a channel is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Maximum production rate is given by:

\(rate = \frac{maxThreads}{latency}\)

If we know the average latency of the tasks we can set maxThreads accordingly.
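
For example, if the tasks have an average latency of 10 ms and maxThreads is 50, the channel can produce at most \(\frac{50}{0.01} = 5000\) yields per second; conversely, to target 5000 yields per second with 10 ms tasks we need at least 50 threads.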

avgRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

minRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

maxRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

constRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Stop behavior

data StopWhen Source #

Specify when the Channel should stop.

Constructors

FirstStops

Stop when the first stream ends.

AllStop

Stop when all the streams end.

AnyStops

Stop when any one stream ends.

stopWhen :: StopWhen -> Config -> Config Source #

Specify when the Channel should stop.
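
For example, a sketch (using parTwo from this module) that stops as soon as either of the two streams ends:

>>> firstDone s1 s2 = Stream.parTwo (Stream.stopWhen Stream.AnyStops) s1 s2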

Scheduling behavior

eager :: Bool -> Config -> Config Source #

By default, processing of output from the worker threads is given priority over dispatching new workers. More workers are dispatched only when there is no output to process. When eager is set to True, workers are dispatched aggressively as long as there is more work to do, irrespective of whether there is output pending to be processed by the stream consumer. However, dispatching may stop if maxThreads or maxBuffer is reached.

Note: This option has no effect when rate has been specified.

Note: Not supported with interleaved.

ordered :: Bool -> Config -> Config Source #

When enabled, the streams may be evaluated concurrently but the results are produced in the same sequence as a serial evaluation would produce.

Note: Not supported with interleaved.

interleaved :: Bool -> Config -> Config Source #

Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.

Note: Can only be used on a finite number of streams.

Note: Not supported with ordered.

Diagnostics

inspect :: Bool -> Config -> Config Source #

Print debug information about the Channel when the stream ends. When the stream does not end normally, the channel debug information is printed when the channel is garbage collected. If you are expecting but not seeing the debug info try adding a performMajorGC before the program ends.

Sending Worker Events

yieldWith Source #

Arguments

:: Maybe WorkerInfo

Rate control info for the worker

-> Channel m a 
-> a 
-> IO Bool

True means the worker can continue otherwise stop.

Used by workers to send a value to the channel's output stream.

When a worker is dispatched, a WorkerInfo record is supplied to it by the dispatcher. This record contains the timestamp at the time of dispatch. Whenever the worker yields a value, the yield count in the WorkerInfo is incremented. If the channel has rate control enabled, the yield count and time duration are periodically (based on workerPollingInterval) pushed to the channel's workerPendingLatency stat. This is done only if workerPollingInterval is non-zero.

Queues the event but returns False if:

  • the buffer limit has been exceeded
  • the channel's yield rate has been exceeded (when rate control is enabled and WorkerInfo is available)

This is a thread-safe API and can be called by anyone from anywhere. Even a thread that is not registered as a worker with the channel can use it, but when rate control is enabled, using workers beyond the knowledge of the dispatcher might confuse the rate control mechanism.

stopWith :: Maybe WorkerInfo -> Channel m a -> IO () Source #

Send a ChildStop event to the channel, used when the worker stops yielding and exits. The final update of the collected latency stats in WorkerInfo is pushed to the channel. Upon receiving the ChildStop event the channel would remove the worker from its set of registered workers.

A worker that uses this API must have been registered on the Channel prior to invoking this API. This is usually done by the dispatcher when the worker is dispatched.

exceptionWith :: Maybe WorkerInfo -> Channel m a -> SomeException -> IO () Source #

Like stopWith but marks the stop event with the specified exception.

shutdown :: MonadIO m => Channel m a -> m () Source #

Send a ChildStopChannel event to shutdown the channel. Upon receiving the event the event processing loop kills all the registered worker threads and stops the channel.

Diagnostics

dumpChannel :: Channel m a -> IO String Source #

Dump the channel stats for diagnostics. Used when inspect option is enabled.

Allocation

newAppendChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a) Source #

Create a new async style concurrent stream evaluation channel. The monad state used to run the stream actions is taken from the call site of newAppendChannel.

This is a low level API, use newChannel instead.

newInterleaveChannel :: MonadAsync m => (Config -> Config) -> m (Channel m a) Source #

Create a new interleaved style concurrent stream evaluation channel. The monad state used to run the stream actions is taken from the call site of newInterleaveChannel.

This is a low level API, use newChannel instead.

newChannel :: MonadAsync m => (Config -> Config) -> m (Channel m a) Source #

Create a new concurrent stream evaluation channel. The monad state used to run the stream actions is captured from the call site of newChannel.

Event Processing Loop

Worker Dispatching

Low level functions used to build readOutputQ and postProcess functions.

forkWorker Source #

Arguments

:: MonadRunInIO m 
=> Count

max yield limit for the worker

-> Channel m a 
-> m () 

Low level API to create a worker. Forks a thread which executes the workLoop of the channel.

dispatchWorker Source #

Arguments

:: MonadRunInIO m 
=> Count

max yield limit for the worker

-> Channel m a 
-> m Bool

can dispatch more workers

Higher level API to dispatch a worker, it uses forkWorker to create a worker.

Dispatches a worker only if all of the following are true:

  • the channel has work to do
  • max thread count is not reached
  • max buffer limit is not reached

It is possible that no worker is dispatched even when there is no outstanding worker, but only if any of the following is true:

  • maxBuffer limit is 0
  • maxThreads limit is set to 0
  • there is output pending in the output buffer

In all other cases a worker is guaranteed to be dispatched.

dispatchWorkerPaced Source #

Arguments

:: MonadRunInIO m 
=> Channel m a 
-> m Bool

True means can dispatch more

Like dispatchWorker but with rate control. The number of workers to be dispatched is decided based on the target rate. Uses dispatchWorker to actually dispatch when required. It may block and wait until it is time to dispatch.

It guarantees that if there is no outstanding worker and there is work pending then it dispatches a worker though it may block for some time before it does that depending on the rate goal.

dispatchAllWait Source #

Arguments

:: MonadIO m 
=> Bool

eager option is on

-> (Channel m a -> IO ())

delay function

-> (Channel m a -> m Bool)

dispatcher function

-> Channel m a 
-> m () 

Dispatches as many workers as it can until output is seen in the event queue of the channel. If the dispatcher function returns False then no more dispatches can be done. If no more dispatches are possible, it blocks until output arrives in the event queue.

When this function returns we are sure that there is some output available.

Before we call this function we must ensure that there is either a pending worker or pending work, otherwise it might block forever. If there is pending work and no pending worker the dispatcher function must ensure that it dispatches a worker.

sendWorkerDelay :: Channel m a -> IO () Source #

Noop as of now.

sendWorkerDelayPaced :: Channel m a -> IO () Source #

Noop as of now.

startChannel :: MonadRunInIO m => Channel m a -> m () Source #

Start the evaluation of the channel's work queue by kicking off a worker. Note: Work queue must not be empty otherwise the worker will exit without doing anything.

Reading Events

Low level functions used to build fromChannelK.

readOutputQBounded :: MonadRunInIO m => Bool -> Channel m a -> m [ChildEvent a] Source #

Read the channel's output queue. When there is no output, it dispatches workers and waits for output (using sendWorkerWait). It always ensures that there is at least one outstanding worker.

To be used as readOutputQ function for the channel.

readOutputQPaced :: MonadRunInIO m => Channel m a -> m [ChildEvent a] Source #

Same as readOutputQBounded but uses dispatchWorkerPaced to dispatch workers with rate control.

To be used as readOutputQ function for the channel when rate control is on.

postProcessBounded :: MonadRunInIO m => Channel m a -> m Bool Source #

If there is work to do, ensure that we have at least one worker dispatched.

To be used as postProcess function for the channel.

postProcessPaced :: MonadRunInIO m => Channel m a -> m Bool Source #

If there is work to do dispatch as many workers as the target rate requires.

To be used as postProcess function for the channel when rate control is enabled.

Reading Stream

fromChannelK :: MonadAsync m => Channel m a -> StreamK m a Source #

Draw a stream from a concurrent channel. The stream consists of the evaluated values from the input streams that were enqueued on the channel using toChannelK.

This is the event processing loop for the channel which does two things, (1) dispatch workers, (2) process the events sent by the workers. Workers are dispatched based on the channel's configuration settings.

The stream stops and the channel is shutdown if any of the following occurs:

  • the work queue becomes empty
  • channel's max yield limit is reached
  • an exception is thrown by a worker
  • shutdown is called on the channel

Before the channel stops, all the workers are drained and no more workers are dispatched. When the channel is garbage collected a ThreadAbort exception is thrown to all pending workers. If inspect option is enabled then channel's stats are printed on stdout when the channel stops.

CAUTION! This API must not be called more than once on a channel.

fromChannel :: MonadAsync m => Channel m a -> Stream m a Source #

A wrapper over fromChannelK for Stream type.

Enqueuing Work

toChannelK :: MonadRunInIO m => Channel m a -> StreamK m a -> m () Source #

High level function to enqueue a work item on the channel. The fundamental unit of work is a stream. Each stream enqueued on the channel is picked up and evaluated by a worker thread. The worker evaluates the stream it picked up serially. When multiple streams are queued on the channel each stream can be evaluated concurrently by different workers.

Note that the items in each stream are not concurrently evaluated; streams are fundamentally serial, therefore, the elements within any one stream are generated serially, one after the other. Only separate streams can run concurrently with each other.

See chanConcatMapK for concurrent evaluation of each element of a stream. Alternatively, you can wrap each element of the original stream into a stream generating action and queue all those streams on the channel. Then all of them would be evaluated concurrently. However, that would not be streaming in nature, it would require buffering space for the entire original stream. Prefer chanConcatMapK for larger streams.

Items from each evaluated stream are queued to the same output queue of the channel, which can be read using fromChannelK. toChannelK can be called multiple times to enqueue multiple streams on the channel.
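
A minimal sketch of the low level workflow, assuming this module is imported qualified as Stream along with Streamly.Data.StreamK as StreamK and Streamly.Data.Fold as Fold: allocate a channel, enqueue two streams, then drain the combined output. The relative ordering of elements from different streams is not deterministic, so we fold with a sum:

>>> :{
 chanDemo :: IO Int
 chanDemo = do
     chan <- Stream.newChannel id
     Stream.toChannelK chan (StreamK.fromStream (Stream.fromList [1,2,3]))
     Stream.toChannelK chan (StreamK.fromStream (Stream.fromList [4,5,6]))
     Stream.fold Fold.sum (Stream.fromChannel chan)
:}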

toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m () Source #

A wrapper over toChannelK for Stream type.

chanConcatMapK :: MonadAsync m => (Config -> Config) -> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Make a concurrent stream evaluator from a stream, to be used in withChannelK or toChannelK. Maps a stream generation function on each element of the stream; the evaluation of the map on each element happens concurrently. All the generated streams are merged together in the output of the channel. The scheduling and termination behavior depends on the channel settings.

Note that if you queue a stream on the channel using toChannelK, it will be picked up by a worker and the worker will evaluate the entire stream serially and emit the results on the channel. However, if you transform the stream using chanConcatMapK and queue it on the channel, the map function is parallelized over the elements of the stream. The simplest example is chanConcatMapK with an identity map, which is equivalent to evaluating each element of the stream concurrently.

A channel worker evaluating this function would enqueue the tail on the channel's work queue and go on to evaluate the head generating an output stream. The tail is picked up by another worker which does the same and so on.

Evaluation

withChannelK Source #

Arguments

:: MonadAsync m 
=> (Config -> Config)

config modifier

-> StreamK m a

input stream

-> (Channel m b -> StreamK m a -> StreamK m b)

stream evaluator

-> StreamK m b

output stream

Allocate a channel and evaluate the stream concurrently using the channel and the supplied evaluator function. The evaluator is run in a worker thread.

withChannel :: MonadAsync m => (Config -> Config) -> Stream m a -> (Channel m b -> Stream m a -> Stream m b) -> Stream m b Source #

A wrapper over withChannelK, converts Stream to StreamK and invokes withChannelK.
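
As a sketch of how these pieces fit together, a parConcatMap-like combinator can be built from withChannelK and chanConcatMapK (assuming toStreamK and fromStreamK from the internal Stream module; myParConcatMap is a hypothetical name):

>>> :{
 myParConcatMap cfg f input =
     Stream.fromStreamK
         $ Stream.withChannelK cfg (Stream.toStreamK input)
         $ \chan -> Stream.chanConcatMapK cfg chan (Stream.toStreamK . f)
:}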

Concurrent Streams

Imports

Imports for example snippets in this module.

>>> :m
>>> {-# LANGUAGE FlexibleContexts #-}
>>> import Control.Concurrent (threadDelay)
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.Internal.Data.Stream as Stream
>>> import qualified Streamly.Internal.Data.Stream.Prelude as Stream
>>> import Prelude hiding (concatMap, concat, zipWith)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}

Types

type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Combinators

Stream combinators using a concurrent channel

Evaluate

Evaluates a stream concurrently using a channel.

parBuffered :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a Source #

parBuffered evaluates a stream as a whole asynchronously with respect to the consumer of the stream. A worker thread evaluates multiple elements of the stream ahead of time and buffers the results; the consumer of the stream runs in another thread consuming the elements from the buffer, thus decoupling the production and consumption of the stream. parBuffered can be used to run different stages of a pipeline concurrently.

It is important to note that parBuffered does not evaluate individual actions in the stream concurrently with respect to each other, it merely evaluates the stream serially but in a different thread than the consumer thread, thus the consumer and producer can run concurrently. See parMapM and parSequence to evaluate actions in the stream concurrently.

The evaluation requires only one thread as only one stream needs to be evaluated. Therefore, the concurrency options that are relevant to multiple streams do not apply here e.g. maxThreads, eager, interleaved, ordered, stopWhen options do not have any effect on parBuffered.

Useful idioms:

>>> parUnfoldrM step = Stream.parBuffered id . Stream.unfoldrM step
>>> parIterateM step = Stream.parBuffered id . Stream.iterateM step
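
For example, a sketch of a two-stage pipeline where the producer stage runs in its own thread and the consumer runs in the calling thread (produce and consume are hypothetical):

>>> runPipeline produce consume = Stream.fold (Fold.drainMapM consume) $ Stream.parBuffered id produce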

Generate

Uses a single channel to evaluate all actions.

parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a Source #

Definition:

>>> parRepeatM cfg = Stream.parSequence cfg . Stream.repeat

Generate a stream by repeatedly executing a monadic action forever.

parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a Source #

Generate a stream by concurrently performing a monadic action n times.

Definition:

>>> parReplicateM cfg n = Stream.parSequence cfg . Stream.replicate n

For example, parReplicateM in the following snippet executes all the replicated actions concurrently, thus taking only 1 second:

>>> Stream.fold Fold.drain $ Stream.parReplicateM id 10 $ delay 1
...

Map

Uses a single channel to evaluate all actions.

parMapM :: MonadAsync m => (Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b Source #

Definition:

>>> parMapM modifier f = Stream.parConcatMap modifier (Stream.fromEffect . f)

For example, the following finishes in 3 seconds (as opposed to 6 seconds) because all actions run in parallel. Even though results are available out of order they are ordered due to the config option:

>>> f x = delay x >> return x
>>> Stream.fold Fold.toList $ Stream.parMapM (Stream.ordered True) f $ Stream.fromList [3,2,1]
1 sec
2 sec
3 sec
[3,2,1]

parSequence :: MonadAsync m => (Config -> Config) -> Stream m (m a) -> Stream m a Source #

Definition:

>>> parSequence modifier = Stream.parMapM modifier id

Useful idioms:

>>> parFromListM = Stream.parSequence id . Stream.fromList
>>> parFromFoldableM = Stream.parSequence id . StreamK.toStream . StreamK.fromFoldable
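
For example, the following sketch runs the three delays concurrently and therefore finishes in about 3 seconds rather than 6:

>>> actions = Stream.fromList [delay 3, delay 2, delay 1]
>>> runAll = Stream.fold Fold.drain $ Stream.parSequence id actions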

Combine two

Use a channel for each pair.

parTwo :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a -> Stream m a Source #

Binary operation to evaluate two streams concurrently using a channel.

If you want to combine more than two streams you almost always want the parList or parConcat operation instead. The performance of this operation degrades rapidly when more streams are combined as each operation adds one more concurrent channel. On the other hand, parConcat uses a single channel for all streams. However, with this operation you can precisely control the scheduling by creating arbitrary shape expression trees.

Definition:

>>> parTwo cfg x y = Stream.parList cfg [x, y]

For example, the following code finishes in 4 seconds:

>>> async = Stream.parTwo id
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.fold Fold.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]

parZipWithM :: MonadAsync m => (Config -> Config) -> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #

Evaluates the streams being zipped in separate threads than the consumer. The zip function is evaluated in the consumer thread.

>>> parZipWithM cfg f m1 m2 = Stream.zipWithM f (Stream.parBuffered cfg m1) (Stream.parBuffered cfg m2)

Multi-stream concurrency options won't apply here, see the notes in parBuffered.

If you want to evaluate the zip function as well in a separate thread, you can use a parBuffered on parZipWithM.

parZipWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

>>> parZipWith cfg f = Stream.parZipWithM cfg (\a b -> return $ f a b)
>>> m1 = Stream.fromList [1,2,3]
>>> m2 = Stream.fromList [4,5,6]
>>> Stream.fold Fold.toList $ Stream.parZipWith id (,) m1 m2
[(1,4),(2,5),(3,6)]

parMergeByM :: MonadAsync m => (Config -> Config) -> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeByM but evaluates both the streams concurrently.

Definition:

>>> parMergeByM cfg f m1 m2 = Stream.mergeByM f (Stream.parBuffered cfg m1) (Stream.parBuffered cfg m2)

parMergeBy :: MonadAsync m => (Config -> Config) -> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeBy but evaluates both the streams concurrently.

Definition:

>>> parMergeBy cfg f = Stream.parMergeByM cfg (\a b -> return $ f a b)

List of streams

Shares a single channel across many streams.

parListLazy :: MonadAsync m => [Stream m a] -> Stream m a Source #

Like concat but works on a list of streams.

>>> parListLazy = Stream.parList id

parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a Source #

Like parListLazy but with ordered on.

>>> parListOrdered = Stream.parList (Stream.ordered True)

parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a Source #

Like parListLazy but interleaves the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.

>>> parListInterleaved = Stream.parList (Stream.interleaved True)

parListEager :: MonadAsync m => [Stream m a] -> Stream m a Source #

Like parListLazy but with eager on.

>>> parListEager = Stream.parList (Stream.eager True)

parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a Source #

Like parListEager but stops the output as soon as the first stream stops.

>>> parListEagerFst = Stream.parList (Stream.eager True . Stream.stopWhen Stream.FirstStops)

parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a Source #

Like parListEager but stops the output as soon as any one of the streams stops.

Definition:

>>> parListEagerMin = Stream.parList (Stream.eager True . Stream.stopWhen Stream.AnyStops)

parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a Source #

Like parConcat but works on a list of streams.

>>> parList modifier = Stream.parConcat modifier . Stream.fromList

Stream of streams

Apply

parCrossApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b Source #

Apply an argument stream to a function stream concurrently. Uses a shared channel for all individual applications within a stream application.
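
For example, a sketch that concurrently applies each of two addition functions to each of two arguments:

>>> crossSums = Stream.parCrossApply id (fmap (+) (Stream.fromList [10,20])) (Stream.fromList [1,2])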

Concat

Shares a single channel across many streams.

parConcat :: MonadAsync m => (Config -> Config) -> Stream m (Stream m a) -> Stream m a Source #

Evaluate the streams in the input stream concurrently and combine them.

>>> parConcat modifier = Stream.parConcatMap modifier id

parConcatMap :: MonadAsync m => (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b Source #

Map each element of the input to a stream and then concurrently evaluate and concatenate the resulting streams. Multiple streams may be evaluated concurrently but earlier streams are preferred. Output from the streams is consumed as it arrives.

Definition:

>>> parConcatMap modifier f stream = Stream.parConcat modifier $ fmap f stream

Examples:

>>> f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap cfg id $ Stream.fromList xs

The following streams finish in 4 seconds:

>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> stream3 = Stream.fromEffect (delay 1)
>>> f id [stream1, stream2, stream3]
1 sec
2 sec
4 sec
[1,2,4]

Limiting threads to 2 schedules the third stream only after one of the first two has finished, releasing a thread:

>>> f (Stream.maxThreads 2) [stream1, stream2, stream3]
...
[2,1,4]

When used with a single thread it behaves like serial concatMap:

>>> f (Stream.maxThreads 1) [stream1, stream2, stream3]
...
[4,2,1]
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> f (Stream.maxThreads 1) [stream1, stream2]
[1,2,3,4,5,6]

Schedule all streams in a round robin fashion over the available threads:

>>> f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap (Stream.interleaved True . cfg) id $ Stream.fromList xs
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> f (Stream.maxThreads 1) [stream1, stream2]
[1,4,2,5,3,6]

ConcatIterate

parConcatIterate :: MonadAsync m => (Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a Source #

Same as concatIterate but concurrent.
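
For example, a sketch that concurrently expands a small tree of numbers rooted at 1, where each node n has children 2n and 2n+1:

>>> expand n = if n >= 8 then Stream.nil else Stream.fromList [2*n, 2*n+1]
>>> tree = Stream.parConcatIterate id expand (Stream.fromList [1::Int])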

Pre-release

Reactive

newStreamAndCallback :: MonadAsync m => m (a -> m (), Stream m a) Source #

Returns an entangled pair of a callback and a stream, i.e. whenever the callback is called a value appears in the stream. The stream is infinite; there is no way for the callback to indicate that it is done.

The callback queues a value to a concurrent channel associated with the stream. The stream can be evaluated safely in any thread.

Pre-release

fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a Source #

fromCallback f creates an entangled pair of a callback and a stream, i.e. whenever the callback is called a value appears in the stream. The function f is invoked with the callback as argument and the stream is returned. f would store the callback so that it can be called later to generate values in the stream.

The callback queues a value to a concurrent channel associated with the stream. The stream can be evaluated safely in any thread.
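
For example, a sketch that turns a callback-registering API into a stream (registerHandler is a hypothetical function of type (Event -> IO ()) -> IO ()):

>>> events = Stream.fromCallback registerHandler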

Pre-release

parTapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a Source #

parTapCount predicate fold stream taps the count of those elements in the stream that pass the predicate. The resulting count stream is sent to a fold running concurrently in another thread.

For example, to print the count of elements processed every second:

>>> rate = Stream.rollingMap2 (flip (-)) . Stream.delayPost 1
>>> report = Stream.fold (Fold.drainMapM print) . rate
>>> tap = Stream.parTapCount (const True) report
>>> go = Stream.fold Fold.drain $ tap $ Stream.enumerateFrom 0

Note: This may not work correctly on 32-bit machines because of Int overflow.

Pre-release

tapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a Source #

Deprecated: Please use parTapCount instead.

Same as parTapCount. Deprecated.

Deprecated

parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a Source #

Deprecated: Please use parBuffered instead.

parBuffered evaluates a stream as a whole asynchronously with respect to the consumer of the stream. A worker thread evaluates multiple elements of the stream ahead of time and buffers the results; the consumer of the stream runs in another thread consuming the elements from the buffer, thus decoupling the production and consumption of the stream. parBuffered can be used to run different stages of a pipeline concurrently.

It is important to note that parBuffered does not evaluate individual actions in the stream concurrently with respect to each other, it merely evaluates the stream serially but in a different thread than the consumer thread, thus the consumer and producer can run concurrently. See parMapM and parSequence to evaluate actions in the stream concurrently.

The evaluation requires only one thread as only one stream needs to be evaluated. Therefore, the concurrency options that are relevant to multiple streams do not apply here e.g. maxThreads, eager, interleaved, ordered, stopWhen options do not have any effect on parBuffered.

Useful idioms:

>>> parUnfoldrM step = Stream.parBuffered id . Stream.unfoldrM step
>>> parIterateM step = Stream.parBuffered id . Stream.iterateM step

parApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b Source #

Deprecated: Please use parCrossApply instead.

Apply an argument stream to a function stream concurrently. Uses a shared channel for all individual applications within a stream application.

Time

Imports for Examples

Imports for example snippets in this module.

>>> :m
>>> import Control.Concurrent (threadDelay)
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.Stream.Prelude as Stream
>>> import qualified Streamly.Internal.Data.Stream as Stream (delayPost, timestamped)
>>> import qualified Streamly.Internal.Data.Stream.Concurrent as Stream (parListEagerFst)
>>> import qualified Streamly.Internal.Data.Stream.Time as Stream
>>> import Prelude hiding (concatMap, concat)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}

Timers

periodic :: MonadIO m => m a -> Double -> Stream m a Source #

Generate a stream by running an action periodically at the specified time interval.
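
For example, a sketch of a stream that prints a heartbeat message once every second:

>>> heartbeat = Stream.periodic (putStrLn "beat") 1.0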

ticks :: MonadIO m => Double -> Stream m () Source #

Generate a tick stream consisting of () elements, each tick is generated after the specified time delay given in seconds.

>>> ticks = Stream.periodic (return ())

ticksRate :: MonadAsync m => Rate -> Stream m () Source #

Generate a tick stream, ticks are generated at the specified Rate. The rate is adaptive, the tick generation speed can be increased or decreased at different times to achieve the specified rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum rate achieved by a stream is governed by the processor speed.

>>> tickStream = Stream.repeatM (return ())
>>> ticksRate r = Stream.parBuffered (Stream.rate (Just r)) tickStream

interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a Source #

Intersperse a monadic action into the input stream after every n seconds.

Definition:

>>> interject n f xs = Stream.parListEagerFst [xs, Stream.periodic f n]

Example:

>>> s = Stream.fromList "hello"
>>> input = Stream.mapM (\x -> threadDelay 1000000 >> putChar x) s
>>> Stream.fold Fold.drain $ Stream.interject (putChar ',') 1.05 input
h,e,l,l,o

Trimming

takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a Source #

takeInterval interval runs the stream only up to the specified time interval in seconds.

The interval starts when the stream is evaluated for the first time.
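
For example, a sketch that samples a slow counter for roughly 2 seconds:

>>> twoSecs = Stream.takeInterval 2 $ Stream.delayPost 0.5 $ Stream.enumerateFrom (1 :: Int)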

takeLastInterval :: Double -> Stream m a -> Stream m a Source #

Take time interval i seconds at the end of the stream.

O(n) space, where n is the number of elements taken.

Unimplemented

dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a Source #

dropInterval interval drops all the stream elements that are generated before the specified interval in seconds has passed.

The interval begins when the stream is evaluated for the first time.

dropLastInterval :: Int -> Stream m a -> Stream m a Source #

Drop time interval i seconds at the end of the stream.

O(n) space, where n is the number of elements dropped.

Unimplemented

Chunking

intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b Source #

Group the input stream into windows of n seconds each and then fold each group using the provided fold function.

If the fold terminates before the interval is over, the next collection is started immediately; thus multiple collections can happen in the same interval. If the fold does not terminate before the interval is over, it is forced to terminate at the end of the interval.

Example:

>>> twoPerSec = Stream.parBuffered (Stream.constRate 2) $ Stream.enumerateFrom 1
>>> intervals = Stream.intervalsOf 1 Fold.toList twoPerSec
>>> Stream.fold Fold.toList $ Stream.take 2 intervals
[...,...]

boundedIntervalsOf :: Int -> Double -> Int -> Fold m a b -> Stream m a -> Stream m b Source #

Like intervalsOf but with an additional argument to limit the number of collected items. If the limit is reached, the fold output is emitted and the next interval is started.

An alternative behavior would be to emit multiple elements in the same interval if the size is exceeded, keeping the intervals fixed. That can be achieved by using intervalsOf and using a fold that terminates on the limit.

Unimplemented

timedGroupsOf :: MonadAsync m => Double -> Int -> Fold m a b -> Stream m a -> Stream m b Source #

Like groupsOf but if the group is not completed within the specified time interval then emit whatever we have collected till now. The group timeout is reset whenever a group is emitted. The granularity of the clock is 100 ms.

Note that it will not emit any output unless at least one item has been collected in the group. The time interval is only for timing out the already collected but not yet complete group.

>>> s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
>>> f = Stream.fold (Fold.drainMapM print) $ Stream.timedGroupsOf 1 5 Fold.toList s

Pre-release

timedChunksOf :: (MonadAsync m, Unbox a) => Double -> Int -> Stream m a -> Stream m (Array a) Source #

Like chunksOf from the Array module but emits the chunk after the timeout even if we have not yet collected the requested size.

timedChunksOf' :: (MonadAsync m, Unbox a) => Double -> Int -> Stream m a -> Stream m (Array a) Source #

Like timedChunksOf but creates pinned arrays. If the chunks are smaller than LARGE_OBJECT_THRESHOLD then this routine may be useful for better performance if the arrays are to be sent for IO. This will avoid a copy for pinning by the IO routines.

Sampling

sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a Source #

Continuously evaluate the input stream and sample the last event in each time window of n seconds.

This is also known as throttle in some libraries.

>>> sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.latest

sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a Source #

Like sampleIntervalEnd but samples at the beginning of the time window.

>>> sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one

sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a Source #

Sample one event at the end of each burst of events. A burst is a group of events close together in time, it ends when an event is spaced by more than the specified time interval (in seconds) from the previous event.

This is known as debounce in some libraries.

The clock granularity is 10 ms.

sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a Source #

Like sampleBurstEnd but samples the event at the beginning of the burst instead of at the end of it.

Windowed Sessions

classifySessionsByGeneric Source #

Arguments

:: forall m f a b. (MonadAsync m, IsMap f) 
=> Proxy (f :: Type -> Type) 
-> Double

timer tick in seconds

-> Bool

reset the timer when an event is received

-> (Int -> m Bool)

predicate to eject sessions based on session count

-> Double

session timeout in seconds

-> Fold m a b

Fold to be applied to session data

-> Stream m (AbsTime, (Key f, a))

timestamp, (session key, session data)

-> Stream m (Key f, b)

session key, fold result

classifySessionsBy Source #

Arguments

:: (MonadAsync m, Ord k) 
=> Double

timer tick in seconds

-> Bool

reset the timer when an event is received

-> (Int -> m Bool)

predicate to eject sessions based on session count

-> Double

session timeout in seconds

-> Fold m a b

Fold to be applied to session data

-> Stream m (AbsTime, (k, a))

timestamp, (session key, session data)

-> Stream m (k, b)

session key, fold result

classifySessionsBy tick keepalive predicate timeout fold stream classifies an input event stream consisting of (timestamp, (key, value)) into sessions based on the key, folding all the values corresponding to the same key into a session using the supplied fold.

When the fold terminates or a timeout occurs, a tuple consisting of the session key and the folded value is emitted in the output stream. The timeout is measured from the first event in the session. If the keepalive option is set to True the timeout is reset to 0 whenever an event is received.

The timestamp in the input stream is an absolute time from some epoch, characterizing the time when the input event was generated. The notion of current time is maintained by a monotonic event time clock using the timestamps seen in the input stream. The latest timestamp seen till now is used as the base for the current time. When no new events are seen, a timer is started with a clock resolution of tick seconds. This timer is used to detect session timeouts in the absence of new events.

To ensure an upper bound on the memory used, the number of sessions can be limited. If the ejection predicate returns True, the oldest session is ejected before inserting a new session.

When the stream ends any buffered sessions are ejected immediately.

If a session key is received even after a session has finished, another session is created for that key.

>>> :{
Stream.fold (Fold.drainMapM print)
    $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
    $ Stream.timestamped
    $ Stream.delay 0.1
    $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c'])
:}
(1,"abc")
(2,"abc")
(3,"abc")

Pre-release

classifySessionsOf Source #

Arguments

:: (MonadAsync m, Ord k) 
=> (Int -> m Bool)

predicate to eject sessions on session count

-> Double

time window size

-> Fold m a b

Fold to be applied to session data

-> Stream m (AbsTime, (k, a))

timestamp, (session key, session data)

-> Stream m (k, b) 

Same as classifySessionsBy with a timer tick of 1 second and keepalive option set to False.

>>> classifySessionsOf = Stream.classifySessionsBy 1 False

Pre-release

classifyKeepAliveSessions Source #

Arguments

:: (MonadAsync m, Ord k) 
=> (Int -> m Bool)

predicate to eject sessions on session count

-> Double

session inactive timeout

-> Fold m a b

Fold to be applied to session payload data

-> Stream m (AbsTime, (k, a))

timestamp, (session key, session data)

-> Stream m (k, b) 

Same as classifySessionsBy with a timer tick of 1 second and keepalive option set to True.

classifyKeepAliveSessions = classifySessionsBy 1 True

Pre-release

Buffering

Evaluate strictly using a buffer of results. When the buffer becomes full we can either block, drop the new elements, or drop the oldest elements and insert the new ones at the end.

bufferLatest :: Stream m a -> Stream m (Maybe a) Source #

Always produce the latest available element from the stream without any delay. The stream is continuously evaluated at the highest possible rate and only the latest element is retained for sampling.

Unimplemented

bufferLatestN :: Int -> Stream m a -> Stream m a Source #

Evaluate the input stream continuously and keep only the latest n elements in a ring buffer, keep discarding the older ones to make space for the new ones. When the output stream is evaluated the buffer collected till now is streamed and it starts filling again.

Unimplemented

bufferOldestN :: Int -> Stream m a -> Stream m a Source #

Evaluate the input stream continuously and keep only the oldest n elements in the buffer, discard the new ones when the buffer is full. When the output stream is evaluated the collected buffer is streamed and the buffer starts filling again.

Unimplemented

Deprecated

groupsOfTimeout :: MonadAsync m => Int -> Double -> Fold m a b -> Stream m a -> Stream m b Source #

Deprecated: Please use timedGroupsOf instead.

Lifted

after :: (MonadIO m, MonadBaseControl IO m) => m b -> Stream m a -> Stream m a Source #

Run the action m b whenever the stream Stream m a stops normally, or if it is garbage collected after a partial lazy evaluation.

The semantics of the action m b are similar to the semantics of cleanup action in bracket.

See also after_

bracket :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #

Run the alloc action m b with async exceptions disabled but keeping blocking operations interruptible (see mask). Use the output b of the action as input to the function b -> Stream m a to generate an output stream.

b is usually a resource under the IO monad, e.g. a file handle, that requires a cleanup after use. The cleanup action b -> m c, runs whenever (1) the stream ends normally, (2) due to a sync or async exception or, (3) if it gets garbage collected after a partial lazy evaluation. The exception is not caught, it is rethrown.

bracket only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully cleanup the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run.
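
A typical use is scoping a resource, such as a file handle, to the stream consuming it. A sketch (linesOfHandle is a hypothetical stream producer that reads lines from a handle):

>>> import System.IO (IOMode(ReadMode), hClose, openFile)
>>> readLines path = Stream.bracket (openFile path ReadMode) hClose linesOfHandle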

See also: bracket_

Inhibits stream fusion

bracket3 :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> Stream m a) -> Stream m a Source #

Like bracket but can use 3 separate cleanup actions depending on the mode of termination:

  1. When the stream stops normally
  2. When the stream is garbage collected
  3. When the stream encounters an exception

bracket3 before onStop onGC onException action runs action using the result of before. If the stream stops, onStop action is executed, if the stream is abandoned onGC is executed, if the stream encounters an exception onException is executed.

The exception is not caught, it is rethrown.

Pre-release

finally :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a Source #

Run the action m b whenever the stream Stream m a stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.

The semantics of running the action m b are similar to the cleanup action semantics described in bracket.

>>> finally action xs = Stream.bracket (return ()) (const action) (const xs)

See also finally_

Inhibits stream fusion

retry Source #

Arguments

:: (MonadCatch m, Exception e, Ord e) 
=> Map e Int

map from exception to retry count

-> (e -> Stream m a)

default handler for those exceptions that are not in the map

-> Stream m a 
-> Stream m a 

retry takes 3 arguments

  1. A map m whose keys are exceptions and values are the number of times to retry the action given that the exception occurs.
  2. A handler han that decides how to handle an exception when the exception cannot be retried.
  3. The stream itself that we want to run this mechanism on.

When evaluating a stream, if an exception occurs:

  1. The stream evaluation aborts.
  2. The exception is looked up in m:

     a. If the exception exists and the mapped value is > 0 then:

        i. The value is decreased by 1.

        ii. The stream is resumed from where the exception was thrown, retrying the action.

     b. If the exception exists and the mapped value is == 0 then the stream evaluation stops.

     c. If the exception does not exist then the exception is handled using han.
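
For example, a sketch using a custom exception type that is retried up to 3 times, with any other exception producing an empty stream via the handler (assumes Data.Map is imported qualified as Map):

>>> import Control.Exception (Exception)
>>> import qualified Data.Map as Map
>>> :{
 data Flaky = Flaky deriving (Show, Eq, Ord)
 instance Exception Flaky
 withRetries action = Stream.retry (Map.singleton Flaky 3) (const Stream.nil) (Stream.fromEffect action)
:}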

Internal

afterD :: MonadRunInIO m => m b -> Stream m a -> Stream m a Source #

bracket3D :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> Stream m a) -> Stream m a Source #

retryD Source #

Arguments

:: forall e m a. (Exception e, Ord e, MonadCatch m) 
=> Map e Int

map from exception to retry count

-> (e -> Stream m a)

default handler for those exceptions that are not in the map

-> Stream m a 
-> Stream m a 

See retry