-- |
-- Module      : Streamly.Internal.Data.SVar.Type
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.SVar.Type
    (
    -- * Parent child communication
      ThreadAbort (..)
    , ChildEvent (..)
    , RunInIO(..)
    , AheadHeapEntry (..)

    -- * SVar
    , Count (..)
    , Limit (..)
    , SVarStyle (..)
    , SVarStopStyle (..)
    , SVarStats (..)
    , WorkerInfo (..)
    , PushBufferPolicy(..)
    , LatencyRange (..)
    , YieldRateInfo (..)
    , SVar (..)

    -- * State threaded around the stream
    , Rate (..)
    , State (streamVar)

    -- ** Default State
    , magicMaxBuffer
    , defState

    -- ** Type cast
    , adaptState

    -- ** State accessors
    , getMaxThreads
    , setMaxThreads
    , getMaxBuffer
    , setMaxBuffer
    , getStreamRate
    , setStreamRate
    , getStreamLatency
    , setStreamLatency
    , getYieldLimit
    , setYieldLimit
    , getInspectMode
    , setInspectMode
    )
where

import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Exception (SomeException(..), Exception)
#ifndef USE_UNLIFTIO
import Control.Monad.Trans.Control (MonadBaseControl(StM))
#endif
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.IORef (IORef)
import Data.Kind (Type)
import Data.Set (Set)

import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..))

-- A counter backed by 'Int64'. All the numeric classes are derived from the
-- underlying type so that it can be used in ordinary arithmetic.
newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )

------------------------------------------------------------------------------
-- Parent child thread communication type
------------------------------------------------------------------------------

-- An exception type with a single, payload-free constructor.
data ThreadAbort = ThreadAbort deriving Show

instance Exception ThreadAbort

-- | Events that a child thread may send to a parent thread.
data ChildEvent a =
      ChildYield a
    | ChildStop ThreadId (Maybe SomeException)

-- A wrapper around a function that runs an action of the monad @m@ in IO.
-- When USE_UNLIFTIO is not defined the result is the monadic state
-- ('StM') captured via monad-control rather than the plain value.
#ifdef USE_UNLIFTIO
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO b }
#else
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO (StM m b) }
#endif

-- | Sorting out-of-turn outputs in a heap for Ahead style streams
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
    | AheadEntryPure a
    | AheadEntryStream (RunInIO m, t m a)
#undef Type

------------------------------------------------------------------------------
-- SVar: the state for thread management
------------------------------------------------------------------------------

-- | Identify the type of the SVar. Two computations using the same style can
-- be scheduled on the same SVar.
data SVarStyle =
      AsyncVar             -- depth first concurrent
    | WAsyncVar            -- breadth first concurrent
    | ParallelVar          -- all parallel
    | AheadVar             -- Concurrent look ahead
    deriving (Eq, Show)

-- | An SVar or a Stream Var is a conduit to the output from multiple streams
-- running concurrently and asynchronously. An SVar can be thought of as an
-- asynchronous IO handle. We can write any number of streams to an SVar in a
-- non-blocking manner and then read them back at any time at any pace.  The
-- SVar would run the streams asynchronously and accumulate results. An SVar
-- may not really execute the stream completely and accumulate all the results.
-- However, it ensures that the reader can read the results at whatever pace
-- it wants to read. The SVar monitors and adapts to the consumer's pace.
--
-- An SVar is a mini scheduler, it has an associated workLoop that holds the
-- stream tasks to be picked and run by a pool of worker threads. It has an
-- associated output queue where the output stream elements are placed by the
-- worker threads. An outputDoorBell is used by the worker threads to intimate
-- the consumer thread about availability of new results in the output queue.
-- More workers are added to the SVar by 'fromStreamVar' on demand if the
-- output produced is not keeping pace with the consumer. On bounded SVars,
-- workers block on the output queue to provide throttling of the producer
-- when the consumer is not pulling fast enough.  The number of workers may
-- even get reduced depending on the consuming pace.
--
-- New work is enqueued either at the time of creation of the SVar or as a
-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
-- already enqueued computations get evaluated. See 'joinStreamVarAsync'.

-- We measure the individual worker latencies to estimate the number of workers
-- needed or the amount of time we have to sleep between dispatches to achieve
-- a particular rate when controlled pace mode is used.
data WorkerInfo = WorkerInfo
    { workerYieldMax        :: Count -- 0 means unlimited
    -- total number of yields by the worker till now
    , workerYieldCount      :: IORef Count
    -- yieldCount at start, timestamp
    , workerLatencyStart    :: IORef (Count, AbsTime)
    }

-- A pair of latency bounds (minimum and maximum).
data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64
    , maxLatency :: NanoSecond64
    } deriving Show

-- Rate control.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget    :: NanoSecond64
    , svarLatencyRange     :: LatencyRange
    , svarRateBuffer       :: Int

    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , svarGainedLostYields :: IORef Count

    -- Actual latency/throughput as seen from the consumer side, we count the
    -- yields and the time it took to generate those yields. This is used to
    -- increase or decrease the number of workers needed to achieve the desired
    -- rate. The idle time of workers is adjusted in this, so that we only
    -- account for the rate when the consumer actually demands data.
    -- XXX interval latency is enough, we can move this under diagnostics build
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , svarAllTimeLatency :: IORef (Count, AbsTime)

    -- XXX Worker latency specified by the user to be used before the first
    -- actual measurement arrives. Not yet implemented
    , workerBootstrapLatency :: Maybe NanoSecond64

    -- After how many yields the worker should update the latency information.
    -- If the latency is high, this count is kept lower and vice-versa.  XXX If
    -- the latency suddenly becomes too high this count may remain too high for
    -- long time, in such cases the consumer can change it.
    -- 0 means no latency computation
    -- XXX this is derivable from workerMeasuredLatency, can be removed.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , workerPollingInterval :: IORef Count

    -- This is in progress latency stats maintained by the workers which we
    -- empty into workerCollectedLatency stats at certain intervals - whenever
    -- we process the stream elements yielded in this period. The first count
    -- is all yields, the second count is only those yields for which the
    -- latency was measured to be non-zero (note that if the timer resolution
    -- is low the measured latency may be zero e.g. on JS platform).
    -- [LOCKING] Locked access. Modified by the consumer thread as well as
    -- worker threads. Workers modify it periodically based on
    -- workerPollingInterval and not on every yield to reduce the locking
    -- overhead.
    -- (allYieldCount, yieldCount, timeTaken)
    , workerPendingLatency   :: IORef (Count, Count, NanoSecond64)

    -- This is the second level stat which is an accumulation from
    -- workerPendingLatency stats. We keep accumulating latencies in this
    -- bucket until we have stats for a sufficient period and then we reset it
    -- to start collecting for the next period and retain the computed average
    -- latency for the last period in workerMeasuredLatency.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    -- (allYieldCount, yieldCount, timeTaken)
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    -- Latency as measured by workers, aggregated for the last period.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , workerMeasuredLatency :: IORef NanoSecond64
    }

-- Mutable statistics counters associated with an SVar.
data SVarStats = SVarStats
    { totalDispatches  :: IORef Int
    , maxWorkers       :: IORef Int
    , maxOutQSize      :: IORef Int
    , maxHeapSize      :: IORef Int
    , maxWorkQSize     :: IORef Int
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
    , minWorkerLatency :: IORef NanoSecond64
    , maxWorkerLatency :: IORef NanoSecond64
    , svarStopTime     :: IORef (Maybe AbsTime)
    }

-- This is essentially a 'Maybe Word' type
data Limit = Unlimited | Limited Word deriving Show

-- Two limits are equal when both are 'Unlimited' or both are 'Limited' with
-- the same bound.
instance Eq Limit where
    Unlimited == Unlimited = True
    Unlimited == Limited _ = False
    Limited _ == Unlimited = False
    Limited x == Limited y = x == y

-- 'Unlimited' is the greatest element: every 'Limited' value is less than or
-- equal to it, and 'Limited' values are ordered by their bound. Only '<=' is
-- defined; the remaining 'Ord' methods use the class defaults.
instance Ord Limit where
    Unlimited <= Unlimited = True
    Unlimited <= Limited _ = False
    Limited _ <= Unlimited = True
    Limited x <= Limited y = x <= y

-- When to stop the composed stream.
data SVarStopStyle =
      StopNone -- stops only when all streams are finished
    | StopAny  -- stop when any stream finishes
    | StopBy   -- stop when a specific stream finishes
    deriving (Eq, Show)

-- | Buffering policy for persistent push workers (in ParallelT).  In a pull
-- style SVar (in AsyncT, AheadT etc.), the consumer side dispatches workers on
-- demand, workers terminate if the buffer is full or if the consumer is not
-- consuming fast enough.  In a push style SVar, a worker is dispatched only
-- once, workers are persistent and keep pushing work to the consumer via a
-- bounded buffer. If the buffer becomes full the worker either blocks, or it
-- can drop an item from the buffer to make space.
--
-- Pull style SVars are useful in lazy stream evaluation whereas push style
-- SVars are useful in strict left Folds.
--
-- XXX Maybe we can separate the implementation in two different types instead
-- of using a common SVar type.
--
data PushBufferPolicy =
      PushBufferDropNew  -- drop the latest element and continue
    | PushBufferDropOld  -- drop the oldest element and continue
    | PushBufferBlock    -- block the thread until space
                         -- becomes available

-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
-- references to the original SVar stored in several functions which will keep
-- pointing to the original data and the new updates won't reflect there.
-- Any updateable parts must be kept in mutable references (IORef).
--
data SVar t m a = SVar
    {
    -- Read only state
      svarStyle       :: SVarStyle
    , svarMrun        :: RunInIO m
    , svarStopStyle   :: SVarStopStyle
    , svarStopBy      :: IORef ThreadId

    -- Shared output queue (events, length)
    -- XXX For better efficiency we can try a preallocated array type (perhaps
    -- something like a vector) that allows an O(1) append. That way we will
    -- avoid constructing and reversing the list. Possibly we can also avoid
    -- the GC copying overhead. When the size increases we should be able to
    -- allocate the array in chunks.
    --
    -- [LOCKING] Frequent locked access. This is updated by workers on each
    -- yield and once in a while read by the consumer thread. This could have
    -- big locking overhead if the number of workers is high.
    --
    -- XXX We can use a per-CPU data structure to reduce the locking overhead.
    -- However, a per-cpu structure cannot guarantee the exact sequence in
    -- which the elements were added, though that may not be important.
    , outputQueue    :: IORef ([ChildEvent a], Int)

    -- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
    -- to non-empty, or a work item is queued by a worker to the work queue and
    -- needDoorBell is set by the consumer.
    , outputDoorBell :: MVar ()  -- signal the consumer about output
    , readOutputQ    :: m [ChildEvent a]
    , postProcess    :: m Bool

    -- channel to send events from the consumer to the worker. Used to send
    -- exceptions from a fold driver to the fold computation running as a
    -- consumer thread in the concurrent fold cases. Currently only one event
    -- is sent by the fold so we do not really need a queue for it.
    , outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
    , outputDoorBellFromConsumer :: MVar ()

    -- Combined/aggregate parameters
    -- This is truncated to maxBufferLimit if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , maxWorkerLimit :: Limit
    , maxBufferLimit :: Limit

    -- These two are valid and used only when maxBufferLimit is Limited.
    , pushBufferSpace  :: IORef Count
    , pushBufferPolicy :: PushBufferPolicy

    -- [LOCKING] The consumer puts this MVar after emptying the buffer, workers
    -- block on it when the buffer becomes full. No overhead unless the buffer
    -- becomes full.
    , pushBufferMVar :: MVar ()

    -- [LOCKING] Read only access by consumer when dispatching a worker.
    -- Decremented by workers when picking work and undo decrement if the
    -- worker does not yield a value.
    , remainingWork  :: Maybe (IORef Count)
    , yieldRateInfo  :: Maybe YieldRateInfo

    -- Used only by bounded SVar types
    , enqueue        :: (RunInIO m, t m a) -> IO ()
    , isWorkDone     :: IO Bool
    , isQueueDone    :: IO Bool
    , needDoorBell   :: IORef Bool
    , workLoop       :: Maybe WorkerInfo -> m ()

    -- Shared, thread tracking
    -- [LOCKING] Updated unlocked only by consumer thread in case of
    -- Async/Ahead style SVars. Updated locked by worker threads in case of
    -- Parallel style.
    , workerThreads  :: IORef (Set ThreadId)

    -- [LOCKING] Updated locked by consumer thread when dispatching a worker
    -- and by the worker threads when the thread stops. This is read unsafely
    -- at several places where we want to rely on an approximate value.
    , workerCount    :: IORef Int
    , accountThread  :: ThreadId -> m ()
    , workerStopMVar :: MVar ()

    , svarStats      :: SVarStats

    -- to track garbage collection of SVar
    , svarRef        :: Maybe (IORef ())

    -- Only for diagnostics
    , svarInspectMode :: Bool
    , svarCreator    :: ThreadId
    , outputHeap     :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                              , Maybe Int)

    -- Shared work queue (stream, seqNo)
    , aheadWorkQueue :: IORef ([t m a], Int)
    }

-------------------------------------------------------------------------------
-- Overall state threaded around a stream
-------------------------------------------------------------------------------

-- | Specifies the stream yield rate in yields per second (@Hertz@).
-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
-- allow only as many yields as we have accumulated as per 'rateGoal' since the
-- start of time. If the consumer or the producer is slower or faster, the
-- actual rate may fall behind or exceed 'rateGoal'.  We try to recover the gap
-- between the two by increasing or decreasing the pull rate from the producer.
-- However, if the gap becomes more than 'rateBuffer' we try to recover only as
-- much as 'rateBuffer'.
--
-- 'rateLow' puts a bound on how low the instantaneous rate can go when
-- recovering the rate gap.  In other words, it determines the maximum yield
-- latency.  Similarly, 'rateHigh' puts a bound on how high the instantaneous
-- rate can go when recovering the rate gap.  In other words, it determines the
-- minimum yield latency. We reduce the latency by increasing concurrency,
-- therefore we can say that it puts an upper bound on concurrency.
--
-- If the 'rateGoal' is 0 or negative the stream never yields a value.
-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
data Rate = Rate
    { rateLow    :: Double -- ^ The lower rate limit
    , rateGoal   :: Double -- ^ The target rate we want to achieve
    , rateHigh   :: Double -- ^ The upper rate limit
    , rateBuffer :: Int    -- ^ Maximum slack from the goal
    }

-- XXX we can put the resettable fields in a oneShotConfig field and others in
-- a persistentConfig field. That way reset would be fast and scalable
-- irrespective of the number of fields.
--
-- XXX make all these Limited types and use phantom types to distinguish them
data State t m a = State
    { -- one shot configuration, automatically reset for each API call
      streamVar   :: Maybe (SVar t m a)
    , _yieldLimit :: Maybe Count

    -- persistent configuration, state that remains valid until changed by
    -- an explicit setting via a combinator.
    , _threadsHigh    :: Limit
    , _bufferHigh     :: Limit

    -- XXX these two can be collapsed into a single type
    , _streamLatency  :: Maybe NanoSecond64 -- bootstrap latency
    , _maxStreamRate  :: Maybe Rate
    , _inspectMode    :: Bool
    }

-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------

-- A magical value for the buffer size arrived at by running the smallest
-- possible task and measuring the optimal value of the buffer for that.  This
-- is obviously dependent on hardware, this figure is based on a 2.2GHz intel
-- core-i7 processor.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

-- Default limits for threads and buffer; both use 'magicMaxBuffer' as the
-- bound.
defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads = Limited magicMaxBuffer
defaultMaxBuffer = Limited magicMaxBuffer

-- The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs.
-- The default state: no SVar, no yield limit, default thread and buffer
-- limits, no rate or latency setting, inspect mode off.
defState :: State t m a
defState = State
    { streamVar = Nothing
    , _yieldLimit = Nothing
    , _threadsHigh = defaultMaxThreads
    , _bufferHigh = defaultMaxBuffer
    , _maxStreamRate = Nothing
    , _streamLatency = Nothing
    , _inspectMode = False
    }

-- XXX if perf gets affected we can have all the Nothing params in a single
-- structure so that reset is fast. We can also use rewrite rules such that
-- reset occurs only in concurrent streams to reduce the impact on serial
-- streams.
-- We can optimize this so that we clear it only if it is a Just value, it
-- results in slightly better perf for zip/zipM but the performance of scan
-- worsens a lot, it does not fuse.
--
-- XXX This has a side effect of clearing the SVar and yieldLimit, therefore it
-- should not be used to convert from the same type to the same type, unless
-- you want to clear the SVar. For clearing the SVar you should be using the
-- appropriate unStream functions instead.
--
-- | Adapt the stream state from one type to another.
adaptState :: State t m a -> State t n b
adaptState st = st
    { streamVar = Nothing
    , _yieldLimit = Nothing
    }

-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------

-- Use get/set routines instead of directly accessing the State fields

-- Set the yield limit. 'Nothing' means no limit; a zero or negative value is
-- stored as a limit of 0.
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit lim st =
    st { _yieldLimit =
            case lim of
                Nothing -> Nothing
                Just n  ->
                    if n <= 0
                    then Just 0
                    else Just (fromIntegral n)
       }

-- Read the current yield limit.
getYieldLimit :: State t m a -> Maybe Count
getYieldLimit = _yieldLimit

-- Set the maximum number of threads. A negative value means 'Unlimited', 0
-- selects the default limit, any other value is used as the limit.
setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads n st =
    st { _threadsHigh =
            if n < 0
            then Unlimited
            else if n == 0
                 then defaultMaxThreads
                 else Limited (fromIntegral n)
       }

-- Read the current thread limit.
getMaxThreads :: State t m a -> Limit
getMaxThreads = _threadsHigh

-- Set the maximum buffer size. A negative value means 'Unlimited', 0 selects
-- the default limit, any other value is used as the limit.
setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n st =
    st { _bufferHigh =
            if n < 0
            then Unlimited
            else if n == 0
                 then defaultMaxBuffer
                 else Limited (fromIntegral n)
       }

-- Read the current buffer limit.
getMaxBuffer :: State t m a -> Limit
getMaxBuffer = _bufferHigh

-- Set (or clear, with 'Nothing') the stream rate.
setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate r st = st { _maxStreamRate = r }

-- Read the current stream rate setting.
getStreamRate :: State t m a -> Maybe Rate
getStreamRate = _maxStreamRate

-- Set the stream latency. A zero or negative value clears the setting; a
-- positive value is converted to 'NanoSecond64' with 'fromIntegral'.
setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st =
    st { _streamLatency =
            if n <= 0
            then Nothing
            else Just (fromIntegral n)
       }

-- Read the current stream latency setting.
getStreamLatency :: State t m a -> Maybe NanoSecond64
getStreamLatency = _streamLatency

-- Turn inspect mode on.
setInspectMode :: State t m a -> State t m a
setInspectMode st = st { _inspectMode = True }

-- Read the inspect mode flag.
getInspectMode :: State t m a -> Bool
getInspectMode = _inspectMode