Streamly.Internal.Data.SVar
Deprecated: SVar is replaced by Channel.
Documentation
Adjusting Limits
incrementYieldLimit :: SVar t m a -> IO () Source #
decrementBufferLimit :: SVar t m a -> IO () Source #
incrementBufferLimit :: SVar t m a -> IO () Source #
resetBufferLimit :: SVar t m a -> IO () Source #
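As a minimal, hypothetical sketch of how these adjusters might be used (the pairing of decrementBufferLimit on the producer side with incrementBufferLimit on the consumer side is an assumption made for this sketch, not something documented in this module; send is the queueing function listed under Send Events below):

-- Sketch only. Assumes decrementBufferLimit reserves one slot of the bounded
-- output buffer before an item is queued, and incrementBufferLimit releases a
-- slot once the consumer has taken an item off the queue.
reserveAndQueue :: SVar t m a -> ChildEvent a -> IO ()
reserveAndQueue sv ev = do
    decrementBufferLimit sv   -- reserve buffer space before queueing
    _ <- send sv ev           -- queue the event for the consumer
    return ()

releaseAfterConsume :: SVar t m a -> IO ()
releaseAfterConsume sv = incrementBufferLimit sv   -- free the slot afterwards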
Rate Control
data Work Source #
Constructors
BlockWait NanoSecond64
PartialWorker Count
ManyWorkers Int Count
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool Source #
estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work Source #
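estimateWorkers returns a Work value that tells the dispatcher what to do next. A minimal sketch of acting on that result follows; the interpretation of each constructor is inferred from its name, and the helpers are hypothetical stand-ins:

-- Sketch: interpret the Work value produced by estimateWorkers. The meaning
-- attached to each constructor here is an assumption based on its name.
actOnWork :: Work -> IO ()
actOnWork work =
    case work of
        BlockWait ns        -> sleepNanoseconds ns   -- pause dispatching for ns
        PartialWorker count -> dispatchLimited count -- one worker, limited to count yields
        ManyWorkers n count -> dispatchMany n count  -- n workers sharing count yields
  where
    -- hypothetical stand-ins for the real dispatch/sleep machinery
    sleepNanoseconds _ = return ()
    dispatchLimited _  = return ()
    dispatchMany _ _   = return ()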
updateYieldCount :: WorkerInfo -> IO Count Source #
minThreadDelay :: NanoSecond64 Source #
This is a magic number; it is overloaded and used at several places to achieve batching:
- If we have to sleep to slow down, this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.
- Collected latencies are computed and transferred to measured latency after a minimum of this period.
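A minimal sketch of the first batching rule above, assuming the worker keeps a running total of the sleep time it owes (treating NanoSecond64 as an integral nanosecond count is an assumption of the sketch):

import Control.Concurrent (threadDelay)

-- Sketch: only sleep once at least minThreadDelay of sleep time has
-- accumulated; otherwise carry the debt forward and keep working.
sleepIfDue :: NanoSecond64 -> IO NanoSecond64
sleepIfDue pending
    | pending >= minThreadDelay = do
        -- threadDelay takes microseconds, so convert from nanoseconds
        threadDelay (fromIntegral (pending `div` 1000))
        return 0                  -- the accumulated debt has been paid
    | otherwise = return pending  -- below the threshold, keep accumulating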
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool Source #
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO () Source #
Send Events
send :: SVar t m a -> ChildEvent a -> IO Int Source #
This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.
ringDoorBell :: SVar t m a -> IO () Source #
Yield
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool Source #
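A minimal worker-loop sketch built on sendYield; treating a False result as "stop queueing output for now" is an inference from the Bool return type, not something stated in this module:

-- Sketch: push a batch of events, stopping early when sendYield signals
-- (by assumption) that the worker should not queue any more output.
pushEvents :: SVar t m a -> Maybe WorkerInfo -> [ChildEvent a] -> IO ()
pushEvents _ _ [] = return ()
pushEvents sv winfo (ev:evs) = do
    continue <- sendYield sv winfo ev
    if continue
        then pushEvents sv winfo evs
        else return ()  -- assumed: False means stop producing for now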
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int Source #
Stop
sendStopToProducer :: MonadIO m => SVar t m a -> m () Source #
Exception
handleChildException :: SVar t m a -> SomeException -> IO () Source #
handleFoldException :: SVar t m a -> SomeException -> IO () Source #
Latency collection
collectLatency :: SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64) Source #
Diagnostics
Thread accounting
allThreadsDone :: MonadIO m => SVar t m a -> m Bool Source #
This is safe even if we are adding more threads concurrently, because if a child thread is adding another thread then workerThreads will not be empty anyway.
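A minimal drain-loop sketch combining allThreadsDone with readOutputQBounded (listed under Read Output below); handleEvent is a hypothetical consumer callback:

import Control.Monad (unless)

-- Sketch: the consumer keeps reading the output queue until every worker
-- thread has been accounted for as stopped.
drainUntilDone :: MonadAsync m => SVar t m a -> (ChildEvent a -> m ()) -> m ()
drainUntilDone sv handleEvent = go
  where
    go = do
        events <- readOutputQBounded sv
        mapM_ handleEvent events
        done <- allThreadsDone sv
        unless done go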
Dispatching
recordMaxWorkers :: MonadIO m => SVar t m a -> m () Source #
pushWorker :: MonadAsync m => Count -> SVar t m a -> m () Source #
pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m () Source #
In contrast to pushWorker, which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread-safe modification of workerThreads. Alternatively, we could use a CreateThread event to avoid the CAS-based modification.
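A generic illustration of that thread-safe modification; representing workerThreads as an IORef holding a Set of ThreadIds is an assumption made only for this sketch:

import Control.Concurrent (ThreadId)
import Data.IORef (IORef, atomicModifyIORef')
import qualified Data.Set as Set

-- Sketch: record a newly spawned worker thread with a CAS based atomic
-- update, which remains correct when several producer threads race to do it.
recordWorker :: IORef (Set.Set ThreadId) -> ThreadId -> IO ()
recordWorker ref tid =
    atomicModifyIORef' ref (\s -> (Set.insert tid s, ()))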
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool Source #
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool Source #
sendWorkerWait :: MonadAsync m => (SVar t m a -> IO ()) -> (SVar t m a -> m Bool) -> SVar t m a -> m () Source #
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a) Source #
sendWorkerDelay :: SVar t m a -> IO () Source #
sendWorkerDelayPaced :: SVar t m a -> IO () Source #
Read Output
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int) Source #
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int) Source #
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a] Source #
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a] Source #
Postprocess Hook After Reading
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool Source #
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool Source #
Release Resources
cleanupSVar :: SVar t m a -> IO () Source #
cleanupSVarFromWorker :: SVar t m a -> IO () Source #
New SVar
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo) Source #
Parallel
newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a) Source #
Ahead
data HeapDequeueResult t m a Source #
dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a) Source #
dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a) Source #
requeueOnHeapTop :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO () Source #
updateHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO () Source #
newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a) Source #