Loading...

Streamly.Internal.Data.Channel

This module contains operations that are common for Stream and Fold channels.

Channel Config & Stats

Types

newtype Count Source #

Constructors

Count Int64 
Instances
Instances details
Bounded Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Enum Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Num Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Read Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Integral Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Real Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Show Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Eq Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Methods

(==) :: Count -> Count -> Bool Source #

(/=) :: Count -> Count -> Bool Source #

Ord Count Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

data Limit Source #

Constructors

Unlimited 
Limited Word 
Instances
Instances details
Show Limit Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Eq Limit Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

Methods

(==) :: Limit -> Limit -> Bool Source #

(/=) :: Limit -> Limit -> Bool Source #

Ord Limit Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Types

data ThreadAbort Source #

Channel driver throws this exception to all active workers to clean up the channel.

Constructors

ThreadAbort 

data ChildEvent a Source #

Events that a child thread may send to a parent thread.

Stats

Rate Control

data WorkerInfo Source #

We measure the individual worker latencies to estimate the number of workers needed or the amount of time we have to sleep between dispatches to achieve a particular rate when controlled pace mode it used.

Constructors

WorkerInfo 

Fields

data YieldRateInfo Source #

Rate control.

Constructors

YieldRateInfo 

Fields

  • svarLatencyTarget :: NanoSecond64
     
  • svarLatencyRange :: LatencyRange
     
  • svarRateBuffer :: Int

    Number of yields beyond which we will not try to recover the rate.

  • svarGainedLostYields :: IORef Count

    Yields that we have permanently gained or lost since the start of the channel i.e. we do not want to adjust the rate to make up for this deficit or gain.

    LOCKING
    Unlocked access. Modified by the consumer thread and snapshot read by the worker threads
  • svarAllTimeLatency :: IORef (Count, AbsTime)

    (channel yields from start till now, channel start timestamp) as recorded by the consumer side of the channel.

    LOCKING
    Unlocked access. Modified by the consumer thread, snapshot read by the worker threads.
  • workerBootstrapLatency :: Maybe NanoSecond64

    TODO. Not yet implemented. Worker latency specified by the user to be used as a guide before the first actual measurement arrives.

  • workerPollingInterval :: IORef Count

    After how many yields the worker should update the latency information. If the workerMeasuredLatency is high, this count is kept lower and vice-versa.

    LOCKING
    Unlocked access. Modified by the consumer thread and snapshot read by the worker threads
  • workerPendingLatency :: IORef (Count, Count, NanoSecond64)

    (total yields, measured yields, time taken by measured yields). This is first level collection bucket which is continuously updated by workers and periodically emptied and collected into workerCollectedLatency by the consumer thread.

    "Measured yields" are only those yields for which the latency was measured to be non-zero (note that if the timer resolution is low the measured latency may be zero e.g. on JS platform).

    LOCKING
    Locked access. Atomically modified by the consumer thread as well as worker threads. Workers modify it periodically based on workerPollingInterval and not on every yield to reduce the locking overhead.
  • workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    workerPendingLatency is periodically reset and aggregated into this by the consumer thread. This itself is reset periodically and svarAllTimeLatency, workerMeasuredLatency are updated using it.

    LOCKING
    Unlocked access. Modified by the consumer thread and snapshot read by the worker threads
  • workerMeasuredLatency :: IORef NanoSecond64

    Weighted average of worker latencies in previous measurement periods.

    LOCKING
    Unlocked access. Modified by the consumer thread and snapshot read by the worker threads

Output queue

readOutputQRaw Source #

Arguments

:: IORef ([ChildEvent a], Int)

Channel output queue

-> Maybe SVarStats

Channel stats

-> IO ([ChildEvent a], Int)

(events, count)

Same as readOutputQBasic but additionally update the max output queue size channel stat if the new size is more than current max.

readOutputQBasic Source #

Arguments

:: IORef ([a], Int)

The channel output queue

-> IO ([a], Int)

(events, count)

Read the output queue of the channel. After reading set it to empty list and 0 count.

ringDoorBell Source #

Arguments

:: IORef Bool

If True only then ring the door bell

-> MVar ()

Door bell, put () to ring

-> IO () 

RingArray door bell. The IORef is read after adding a store-load barrier. If the IORef was set to True it is atomically reset to False.

Yield Limit

decrementYieldLimit :: Maybe (IORef Count) -> IO Bool Source #

A worker decrements the yield limit before it executes an action. However, the action may not result in an element being yielded, in that case we have to increment the yield limit.

Note that we need it to be an Int type so that we have the ability to undo a decrement that takes it below zero.

Configuration

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the yield count gap becomes more than rateBuffer (specified as a yield count) we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Constructors

Rate 

Fields

data StopWhen Source #

Specify when the Channel should stop.

Constructors

FirstStops

Stop when the first stream ends.

AllStop

Stop when all the streams end.

AnyStops

Stop when any one stream ends.

data Config Source #

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.

Default config

magicMaxBuffer :: Word Source #

A magical value for the buffer size arrived at by running the smallest possible task and measuring the optimal value of the buffer for that. This is obviously dependent on hardware, this figure is based on a 2.2GHz intel core-i7 processor.

defaultConfig :: Config Source #

The fields prefixed by an _ are not to be accessed or updated directly but via smart accessor APIs. Use get/set routines instead of directly accessing the Config fields

Set config

maxThreads :: Int -> Config -> Config Source #

Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

maxBuffer :: Int -> Config -> Config Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

maxYields :: Maybe Int64 -> Config -> Config Source #

The maximum number of yields that this channel would produce. The Channel automatically stops after that. This could be used to limit the speculative execution beyond the limit.

Nothing means there is no limit.

Keep in mind that checking this limit all the time has a performance overhead.

Known Bugs: currently this works only when rate is specified. Known Bugs: for ordered streams sometimes the actual count is less than expected.

inspect :: Bool -> Config -> Config Source #

Print debug information about the Channel when the stream ends. When the stream does not end normally, the channel debug information is printed when the channel is garbage collected. If you are expecting but not seeing the debug info try adding a performMajorGC before the program ends.

eager :: Bool -> Config -> Config Source #

By default, processing of output from the worker threads is given priority over dispatching new workers. More workers are dispatched only when there is no output to process. When eager is set to True, workers are dispatched aggresively as long as there is more work to do irrespective of whether there is output pending to be processed by the stream consumer. However, dispatching may stop if maxThreads or maxBuffer is reached.

Note: This option has no effect when rate has been specified.

Note: Not supported with interleaved.

stopWhen :: StopWhen -> Config -> Config Source #

Specify when the Channel should stop.

ordered :: Bool -> Config -> Config Source #

When enabled the streams may be evaluated cocnurrently but the results are produced in the same sequence as a serial evaluation would produce.

Note: Not supported with interleaved.

interleaved :: Bool -> Config -> Config Source #

Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over limited number of threads.

Note: Can only be used on finite number of streams.

Note: Not supported with ordered.

boundThreads :: Bool -> Config -> Config Source #

Spawn bound threads (i.e., spawn threads using forkOS instead of forkIO). The default value is False.

Currently, this only takes effect only for concurrent folds.

rate :: Maybe Rate -> Config -> Config Source #

Specify the stream evaluation rate of a channel.

A Nothing value means there is no smart rate control, concurrent execution blocks only if maxThreads or maxBuffer is reached, or there are no more concurrent tasks to execute. This is the default.

When rate (throughput) is specified, concurrent production may be ramped up or down automatically to achieve the specified stream throughput. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a channel is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Maximum production rate is given by:

\(rate = \frac{maxThreads}{latency}\)

If we know the average latency of the tasks we can set maxThreads accordingly.

avgRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

minRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

maxRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

constRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Get config

Cleanup

cleanupSVar :: IORef (Set ThreadId) -> IO () Source #

Never called from a worker thread.

Diagnostics

dumpOutputQ :: (Foldable t, Show a1) => IORef (t a2, a1) -> IO String Source #

withDiagMVar :: Bool -> IO String -> String -> IO () -> IO () Source #

MVar diagnostics has some overhead - around 5% on AsyncT null benchmark, we can keep it on in production to debug problems quickly if and when they happen, but it may result in unexpected output when threads are left hanging until they are GCed because the consumer went away.

Worker Dispatcher

Operations used by the consumer of the channel.

Latency collection

minThreadDelay :: NanoSecond64 Source #

This is a magic number and it is overloaded, and used at several places to achieve batching:

  1. If we have to sleep to slowdown this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.
  2. Collected latencies are computed and transferred to measured latency after a minimum of this period.

collectLatency Source #

Arguments

:: Bool

stat inspection mode

-> SVarStats

Channel stats

-> YieldRateInfo

Channel rate control info

-> Bool

Force batch collection

-> IO (Count, AbsTime, NanoSecond64)

(channel yield count since beginning, beginning timestamp, workerMeasuredLatency)

Always moves workerPendingLatency to workerCollectedLatency:

Moves workerCollectedLatency to svarAllTimeLatency periodically, when the collected batch size hits a limit, or time limit is over, or latency changes beyond a limit. Updates done when the batch is collected:

See also getWorkerLatency.

Thread accounting

allThreadsDone :: MonadIO m => IORef (Set ThreadId) -> m Bool Source #

This is safe even if we are adding more threads concurrently because if a child thread is adding another thread then anyway workerThreads will not be empty.

Diagnostics

Channel Workers

Operations used by the workers (producers) of the channel. These operations are thread-safe, these can be called concurrently by workers working in independent Haskell threads, the shared channel data structures are read or updated atomically.

Worker Rate Control

data Work Source #

Describes how to pace the work based on current measurement estimates. If the rate is higher than expected we may have to sleep for some time (BlockWait), or send just one worker with limited yield count (PartialWorker) or send more than one workers with max yield count of each limited to the total maximum target count.

Constructors

BlockWait NanoSecond64

Sleep required before next dispatch

PartialWorker Count

One worker is enough, total yields needed

ManyWorkers Int Count

Worker count, total yields needed overall

Instances
Instances details
Show Work Source # 
Instance details

Defined in Streamly.Internal.Data.Channel.Worker

estimateWorkers Source #

Arguments

:: Limit

Channel's max worker limit

-> Count

Channel's yield count since start

-> Count

svarGainedLostYields

-> NanoSecond64

The up time of the channel

-> NanoSecond64

Current workerMeasuredLatency

-> NanoSecond64

svarLatencyTarget

-> LatencyRange

svarLatencyRange

-> Work 

Estimate how many workers and yield count (Work) is required to maintian the target yield rate of the channel.

This is used by the worker dispatcher to estimate how many workers to dispatch. It is also used periodically by the workers to decide whether to stop or continue working.

isBeyondMaxRate Source #

Arguments

:: Limit

Channel's max worker limit

-> IORef Int

Current worker count

-> YieldRateInfo

Channel's rate control info

-> IO Bool

True if we are exceeding the specified rate

Using the channel worker latency and channel yield count stats from the current measurement interval, estimate how many workers are needed to maintain the target rate and compare that with current number of workers. Returns true if we have have more than required workers.

incrWorkerYieldCount Source #

Arguments

:: Limit

Channel's max worker limit

-> IORef Int

Current worker count

-> YieldRateInfo

Channel's rate control info

-> WorkerInfo

Worker's yield count info

-> IO Bool

True means limits are ok and worker can continue

Update the local yield count of the worker and check if:

  • the channel yield rate is beyond max limit
  • worker's yield count is beyond max limit

Workers Sending Events

sendEvent Source #

Arguments

:: IORef ([a], Int)

Queue where the event is added

-> MVar ()

Door bell to ring

-> a

The event to be added

-> IO Int

Length of the queue before adding this event

Low level API to add an event on the channel's output queue. Atomically adds the event to the queue and rings the doorbell if needed to wakeup the consumer thread.

sendYield Source #

Arguments

:: Limit

Channel's max buffer limit

-> Limit

Channel's max worker limit

-> IORef Int

Current worker count

-> Maybe YieldRateInfo

Channel's rate control info

-> IORef ([ChildEvent a], Int)

Queue where the output is added

-> MVar ()

Door bell to ring

-> Maybe WorkerInfo

Worker's yield count info

-> a

The output to be sent

-> IO Bool

True means worker is allowed to continue working

Add a ChildYield event to the channel's output queue.

This is a wrapper over sendEvent, it does a few more things:

  • performs a buffer limit check, returns False if exceeded

When rate control is enabled and WorkerInfo is supplied::

  • increments the worker yield count
  • periodically pushes the worker latency stats to the channel
  • performs a rate limit check, returns False if exceeded

sendStop Source #

Arguments

:: IORef Int

Channel's current worker count

-> Maybe YieldRateInfo

Channel's rate control info

-> IORef ([ChildEvent a], Int)

Queue where the stop event is added

-> MVar ()

Door bell to ring

-> Maybe WorkerInfo

Worker's yield count info

-> IO () 

Add a ChildStop event to the channel's output queue. When rate control is enabled, it pushes the worker latency stats to the channel.

sendException Source #

Arguments

:: IORef ([ChildEvent a], Int)

Queue where the exception event is added

-> MVar ()

Door bell to ring

-> SomeException

The exception to send

-> IO () 

Add a ChildStop event with exception to the channel's output queue.