-- |
-- Module      : Streamly.Internal.Data.SVar.Type
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.SVar.Type
    -- * Parent child communication
      ThreadAbort (..)
    , ChildEvent (..)
    , RunInIO(..)
    , AheadHeapEntry (..)

    -- * SVar
    , Count (..)
    , Limit (..)
    , SVarStyle (..)
    , SVarStopStyle (..)
    , SVarStats (..)
    , WorkerInfo (..)
    , PushBufferPolicy(..)
    , LatencyRange (..)
    , YieldRateInfo (..)
    , SVar (..)

    -- * State threaded around the stream
    , Rate (..)
    , State (streamVar)

    -- ** Default State
    , magicMaxBuffer
    , defState

    -- ** Type cast
    , adaptState

    -- ** State accessors
    , getMaxThreads
    , setMaxThreads
    , getMaxBuffer
    , setMaxBuffer
    , getStreamRate
    , setStreamRate
    , getStreamLatency
    , setStreamLatency
    , getYieldLimit
    , setYieldLimit
    , getInspectMode
    , setInspectMode

import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Exception (SomeException(..), Exception)
import Control.Monad.Trans.Control (MonadBaseControl(StM))
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.IORef (IORef)
import Data.Kind (Type)
import Data.Set (Set)

import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..))

newtype Count = Count Int64
    deriving ( Count -> Count -> Bool
(Count -> Count -> Bool) -> (Count -> Count -> Bool) -> Eq Count
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Count -> Count -> Bool
== :: Count -> Count -> Bool
$c/= :: Count -> Count -> Bool
/= :: Count -> Count -> Bool
             , ReadPrec [Count]
ReadPrec Count
Int -> ReadS Count
ReadS [Count]
(Int -> ReadS Count)
-> ReadS [Count]
-> ReadPrec Count
-> ReadPrec [Count]
-> Read Count
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
$creadsPrec :: Int -> ReadS Count
readsPrec :: Int -> ReadS Count
$creadList :: ReadS [Count]
readList :: ReadS [Count]
$creadPrec :: ReadPrec Count
readPrec :: ReadPrec Count
$creadListPrec :: ReadPrec [Count]
readListPrec :: ReadPrec [Count]
             , Int -> Count -> ShowS
[Count] -> ShowS
Count -> String
(Int -> Count -> ShowS)
-> (Count -> String) -> ([Count] -> ShowS) -> Show Count
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Count -> ShowS
showsPrec :: Int -> Count -> ShowS
$cshow :: Count -> String
show :: Count -> String
$cshowList :: [Count] -> ShowS
showList :: [Count] -> ShowS
             , Int -> Count
Count -> Int
Count -> [Count]
Count -> Count
Count -> Count -> [Count]
Count -> Count -> Count -> [Count]
(Count -> Count)
-> (Count -> Count)
-> (Int -> Count)
-> (Count -> Int)
-> (Count -> [Count])
-> (Count -> Count -> [Count])
-> (Count -> Count -> [Count])
-> (Count -> Count -> Count -> [Count])
-> Enum Count
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
$csucc :: Count -> Count
succ :: Count -> Count
$cpred :: Count -> Count
pred :: Count -> Count
$ctoEnum :: Int -> Count
toEnum :: Int -> Count
$cfromEnum :: Count -> Int
fromEnum :: Count -> Int
$cenumFrom :: Count -> [Count]
enumFrom :: Count -> [Count]
$cenumFromThen :: Count -> Count -> [Count]
enumFromThen :: Count -> Count -> [Count]
$cenumFromTo :: Count -> Count -> [Count]
enumFromTo :: Count -> Count -> [Count]
$cenumFromThenTo :: Count -> Count -> Count -> [Count]
enumFromThenTo :: Count -> Count -> Count -> [Count]
             , Count
Count -> Count -> Bounded Count
forall a. a -> a -> Bounded a
$cminBound :: Count
minBound :: Count
$cmaxBound :: Count
maxBound :: Count
             , Integer -> Count
Count -> Count
Count -> Count -> Count
(Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count)
-> (Count -> Count)
-> (Count -> Count)
-> (Integer -> Count)
-> Num Count
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: Count -> Count -> Count
+ :: Count -> Count -> Count
$c- :: Count -> Count -> Count
- :: Count -> Count -> Count
$c* :: Count -> Count -> Count
* :: Count -> Count -> Count
$cnegate :: Count -> Count
negate :: Count -> Count
$cabs :: Count -> Count
abs :: Count -> Count
$csignum :: Count -> Count
signum :: Count -> Count
$cfromInteger :: Integer -> Count
fromInteger :: Integer -> Count
             , Num Count
Ord Count
Num Count -> Ord Count -> (Count -> Rational) -> Real Count
Count -> Rational
forall a. Num a -> Ord a -> (a -> Rational) -> Real a
$ctoRational :: Count -> Rational
toRational :: Count -> Rational
             , Enum Count
Real Count
Real Count
-> Enum Count
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count -> (Count, Count))
-> (Count -> Count -> (Count, Count))
-> (Count -> Integer)
-> Integral Count
Count -> Integer
Count -> Count -> (Count, Count)
Count -> Count -> Count
forall a.
Real a
-> Enum a
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> (a, a))
-> (a -> a -> (a, a))
-> (a -> Integer)
-> Integral a
$cquot :: Count -> Count -> Count
quot :: Count -> Count -> Count
$crem :: Count -> Count -> Count
rem :: Count -> Count -> Count
$cdiv :: Count -> Count -> Count
div :: Count -> Count -> Count
$cmod :: Count -> Count -> Count
mod :: Count -> Count -> Count
$cquotRem :: Count -> Count -> (Count, Count)
quotRem :: Count -> Count -> (Count, Count)
$cdivMod :: Count -> Count -> (Count, Count)
divMod :: Count -> Count -> (Count, Count)
$ctoInteger :: Count -> Integer
toInteger :: Count -> Integer
             , Eq Count
Eq Count
-> (Count -> Count -> Ordering)
-> (Count -> Count -> Bool)
-> (Count -> Count -> Bool)
-> (Count -> Count -> Bool)
-> (Count -> Count -> Bool)
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> Ord Count
Count -> Count -> Bool
Count -> Count -> Ordering
Count -> Count -> Count
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Count -> Count -> Ordering
compare :: Count -> Count -> Ordering
$c< :: Count -> Count -> Bool
< :: Count -> Count -> Bool
$c<= :: Count -> Count -> Bool
<= :: Count -> Count -> Bool
$c> :: Count -> Count -> Bool
> :: Count -> Count -> Bool
$c>= :: Count -> Count -> Bool
>= :: Count -> Count -> Bool
$cmax :: Count -> Count -> Count
max :: Count -> Count -> Count
$cmin :: Count -> Count -> Count
min :: Count -> Count -> Count

-- Parent child thread communication type

data ThreadAbort = ThreadAbort deriving Int -> ThreadAbort -> ShowS
[ThreadAbort] -> ShowS
ThreadAbort -> String
(Int -> ThreadAbort -> ShowS)
-> (ThreadAbort -> String)
-> ([ThreadAbort] -> ShowS)
-> Show ThreadAbort
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ThreadAbort -> ShowS
showsPrec :: Int -> ThreadAbort -> ShowS
$cshow :: ThreadAbort -> String
show :: ThreadAbort -> String
$cshowList :: [ThreadAbort] -> ShowS
showList :: [ThreadAbort] -> ShowS

instance Exception ThreadAbort

-- | Events that a child thread may send to a parent thread.
data ChildEvent a =
      ChildYield a
    | ChildStop ThreadId (Maybe SomeException)

newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO b }
newtype RunInIO m = RunInIO { forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO :: forall b. m b -> IO (StM m b) }

-- | Sorting out-of-turn outputs in a heap for Ahead style streams
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
    | AheadEntryPure a
    | AheadEntryStream (RunInIO m, t m a)
#undef Type

-- SVar: the state for thread management

-- | Identify the type of the SVar. Two computations using the same style can
-- be scheduled on the same SVar.
data SVarStyle =
      AsyncVar             -- depth first concurrent
    | WAsyncVar            -- breadth first concurrent
    | ParallelVar          -- all parallel
    | AheadVar             -- Concurrent look ahead
    deriving (SVarStyle -> SVarStyle -> Bool
(SVarStyle -> SVarStyle -> Bool)
-> (SVarStyle -> SVarStyle -> Bool) -> Eq SVarStyle
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SVarStyle -> SVarStyle -> Bool
== :: SVarStyle -> SVarStyle -> Bool
$c/= :: SVarStyle -> SVarStyle -> Bool
/= :: SVarStyle -> SVarStyle -> Bool
Eq, Int -> SVarStyle -> ShowS
[SVarStyle] -> ShowS
SVarStyle -> String
(Int -> SVarStyle -> ShowS)
-> (SVarStyle -> String)
-> ([SVarStyle] -> ShowS)
-> Show SVarStyle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SVarStyle -> ShowS
showsPrec :: Int -> SVarStyle -> ShowS
$cshow :: SVarStyle -> String
show :: SVarStyle -> String
$cshowList :: [SVarStyle] -> ShowS
showList :: [SVarStyle] -> ShowS

-- | An SVar or a Stream Var is a conduit to the output from multiple streams
-- running concurrently and asynchronously. An SVar can be thought of as an
-- asynchronous IO handle. We can write any number of streams to an SVar in a
-- non-blocking manner and then read them back at any time at any pace.  The
-- SVar would run the streams asynchronously and accumulate results. An SVar
-- may not really execute the stream completely and accumulate all the results.
-- However, it ensures that the reader can read the results at whatever paces
-- it wants to read. The SVar monitors and adapts to the consumer's pace.
-- An SVar is a mini scheduler, it has an associated workLoop that holds the
-- stream tasks to be picked and run by a pool of worker threads. It has an
-- associated output queue where the output stream elements are placed by the
-- worker threads. A outputDoorBell is used by the worker threads to intimate the
-- consumer thread about availability of new results in the output queue. More
-- workers are added to the SVar by 'fromStreamVar' on demand if the output
-- produced is not keeping pace with the consumer. On bounded SVars, workers
-- block on the output queue to provide throttling of the producer  when the
-- consumer is not pulling fast enough.  The number of workers may even get
-- reduced depending on the consuming pace.
-- New work is enqueued either at the time of creation of the SVar or as a
-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
-- already enqueued computations get evaluated. See 'joinStreamVarAsync'.

-- We measure the individual worker latencies to estimate the number of workers
-- needed or the amount of time we have to sleep between dispatches to achieve
-- a particular rate when controlled pace mode it used.
data WorkerInfo = WorkerInfo
    { WorkerInfo -> Count
workerYieldMax   :: Count -- 0 means unlimited
    -- total number of yields by the worker till now
    , WorkerInfo -> IORef Count
workerYieldCount    :: IORef Count
    -- yieldCount at start, timestamp
    , WorkerInfo -> IORef (Count, AbsTime)
workerLatencyStart  :: IORef (Count, AbsTime)

data LatencyRange = LatencyRange
    { LatencyRange -> NanoSecond64
minLatency :: NanoSecond64
    , LatencyRange -> NanoSecond64
maxLatency :: NanoSecond64
    } deriving Int -> LatencyRange -> ShowS
[LatencyRange] -> ShowS
LatencyRange -> String
(Int -> LatencyRange -> ShowS)
-> (LatencyRange -> String)
-> ([LatencyRange] -> ShowS)
-> Show LatencyRange
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> LatencyRange -> ShowS
showsPrec :: Int -> LatencyRange -> ShowS
$cshow :: LatencyRange -> String
show :: LatencyRange -> String
$cshowList :: [LatencyRange] -> ShowS
showList :: [LatencyRange] -> ShowS

-- Rate control.
data YieldRateInfo = YieldRateInfo
    { YieldRateInfo -> NanoSecond64
svarLatencyTarget    :: NanoSecond64
    , YieldRateInfo -> LatencyRange
svarLatencyRange     :: LatencyRange
    , YieldRateInfo -> Int
svarRateBuffer       :: Int

    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , YieldRateInfo -> IORef Count
svarGainedLostYields :: IORef Count

    -- Actual latency/througput as seen from the consumer side, we count the
    -- yields and the time it took to generates those yields. This is used to
    -- increase or decrease the number of workers needed to achieve the desired
    -- rate. The idle time of workers is adjusted in this, so that we only
    -- account for the rate when the consumer actually demands data.
    -- XXX interval latency is enough, we can move this under diagnostics build
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , YieldRateInfo -> IORef (Count, AbsTime)
svarAllTimeLatency :: IORef (Count, AbsTime)

    -- XXX Worker latency specified by the user to be used before the first
    -- actual measurement arrives. Not yet implemented
    , YieldRateInfo -> Maybe NanoSecond64
workerBootstrapLatency :: Maybe NanoSecond64

    -- After how many yields the worker should update the latency information.
    -- If the latency is high, this count is kept lower and vice-versa.  XXX If
    -- the latency suddenly becomes too high this count may remain too high for
    -- long time, in such cases the consumer can change it.
    -- 0 means no latency computation
    -- XXX this is derivable from workerMeasuredLatency, can be removed.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , YieldRateInfo -> IORef Count
workerPollingInterval :: IORef Count

    -- This is in progress latency stats maintained by the workers which we
    -- empty into workerCollectedLatency stats at certain intervals - whenever
    -- we process the stream elements yielded in this period. The first count
    -- is all yields, the second count is only those yields for which the
    -- latency was measured to be non-zero (note that if the timer resolution
    -- is low the measured latency may be zero e.g. on JS platform).
    -- [LOCKING] Locked access. Modified by the consumer thread as well as
    -- worker threads. Workers modify it periodically based on
    -- workerPollingInterval and not on every yield to reduce the locking
    -- overhead.
    -- (allYieldCount, yieldCount, timeTaken)
    , YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerPendingLatency   :: IORef (Count, Count, NanoSecond64)

    -- This is the second level stat which is an accmulation from
    -- workerPendingLatency stats. We keep accumulating latencies in this
    -- bucket until we have stats for a sufficient period and then we reset it
    -- to start collecting for the next period and retain the computed average
    -- latency for the last period in workerMeasuredLatency.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    -- (allYieldCount, yieldCount, timeTaken)
    , YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    -- Latency as measured by workers, aggregated for the last period.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , YieldRateInfo -> IORef NanoSecond64
workerMeasuredLatency :: IORef NanoSecond64

data SVarStats = SVarStats {
      SVarStats -> IORef Int
totalDispatches  :: IORef Int
    , SVarStats -> IORef Int
maxWorkers       :: IORef Int
    , SVarStats -> IORef Int
maxOutQSize      :: IORef Int
    , SVarStats -> IORef Int
maxHeapSize      :: IORef Int
    , SVarStats -> IORef Int
maxWorkQSize     :: IORef Int
    , SVarStats -> IORef (Count, NanoSecond64)
avgWorkerLatency :: IORef (Count, NanoSecond64)
    , SVarStats -> IORef NanoSecond64
minWorkerLatency :: IORef NanoSecond64
    , SVarStats -> IORef NanoSecond64
maxWorkerLatency :: IORef NanoSecond64
    , SVarStats -> IORef (Maybe AbsTime)
svarStopTime     :: IORef (Maybe AbsTime)

-- This is essentially a 'Maybe Word' type
data Limit = Unlimited | Limited Word deriving Int -> Limit -> ShowS
[Limit] -> ShowS
Limit -> String
(Int -> Limit -> ShowS)
-> (Limit -> String) -> ([Limit] -> ShowS) -> Show Limit
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Limit -> ShowS
showsPrec :: Int -> Limit -> ShowS
$cshow :: Limit -> String
show :: Limit -> String
$cshowList :: [Limit] -> ShowS
showList :: [Limit] -> ShowS

instance Eq Limit where
Unlimited == :: Limit -> Limit -> Bool
== Limit
Unlimited = Bool
Unlimited == Limited Word
_ = Bool
    Limited Word
_ == Limit
Unlimited = Bool
    Limited Word
x == Limited Word
y = Word
x Word -> Word -> Bool
forall a. Eq a => a -> a -> Bool
== Word

instance Ord Limit where
Unlimited <= :: Limit -> Limit -> Bool
<= Limit
Unlimited = Bool
Unlimited <= Limited Word
_ = Bool
    Limited Word
_ <= Limit
Unlimited = Bool
    Limited Word
x <= Limited Word
y = Word
x Word -> Word -> Bool
forall a. Ord a => a -> a -> Bool
<= Word

-- When to stop the composed stream.
data SVarStopStyle =
      StopNone -- stops only when all streams are finished
    | StopAny  -- stop when any stream finishes
    | StopBy   -- stop when a specific stream finishes
    deriving (SVarStopStyle -> SVarStopStyle -> Bool
(SVarStopStyle -> SVarStopStyle -> Bool)
-> (SVarStopStyle -> SVarStopStyle -> Bool) -> Eq SVarStopStyle
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SVarStopStyle -> SVarStopStyle -> Bool
== :: SVarStopStyle -> SVarStopStyle -> Bool
$c/= :: SVarStopStyle -> SVarStopStyle -> Bool
/= :: SVarStopStyle -> SVarStopStyle -> Bool
Eq, Int -> SVarStopStyle -> ShowS
[SVarStopStyle] -> ShowS
SVarStopStyle -> String
(Int -> SVarStopStyle -> ShowS)
-> (SVarStopStyle -> String)
-> ([SVarStopStyle] -> ShowS)
-> Show SVarStopStyle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SVarStopStyle -> ShowS
showsPrec :: Int -> SVarStopStyle -> ShowS
$cshow :: SVarStopStyle -> String
show :: SVarStopStyle -> String
$cshowList :: [SVarStopStyle] -> ShowS
showList :: [SVarStopStyle] -> ShowS

-- | Buffering policy for persistent push workers (in ParallelT).  In a pull
-- style SVar (in AsyncT, AheadT etc.), the consumer side dispatches workers on
-- demand, workers terminate if the buffer is full or if the consumer is not
-- cosuming fast enough.  In a push style SVar, a worker is dispatched only
-- once, workers are persistent and keep pushing work to the consumer via a
-- bounded buffer. If the buffer becomes full the worker either blocks, or it
-- can drop an item from the buffer to make space.
-- Pull style SVars are useful in lazy stream evaluation whereas push style
-- SVars are useful in strict left Folds.
-- XXX Maybe we can separate the implementation in two different types instead
-- of using a common SVar type.
data PushBufferPolicy =
      PushBufferDropNew  -- drop the latest element and continue
    | PushBufferDropOld  -- drop the oldest element and continue
    | PushBufferBlock    -- block the thread until space
                         -- becomes available

-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
-- references to the original SVar stored in several functions which will keep
-- pointing to the original data and the new updates won't reflect there.
-- Any updateable parts must be kept in mutable references (IORef).
data SVar t m a = SVar
    -- Read only state
      forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle       :: SVarStyle
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun        :: RunInIO m
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle   :: SVarStopStyle
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy      :: IORef ThreadId

    -- Shared output queue (events, length)
    -- XXX For better efficiency we can try a preallocated array type (perhaps
    -- something like a vector) that allows an O(1) append. That way we will
    -- avoid constructing and reversing the list. Possibly we can also avoid
    -- the GC copying overhead. When the size increases we should be able to
    -- allocate the array in chunks.
    -- [LOCKING] Frequent locked access. This is updated by workers on each
    -- yield and once in a while read by the consumer thread. This could have
    -- big locking overhead if the number of workers is high.
    -- XXX We can use a per-CPU data structure to reduce the locking overhead.
    -- However, a per-cpu structure cannot guarantee the exact sequence in
    -- which the elements were added, though that may not be important.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue    :: IORef ([ChildEvent a], Int)

    -- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
    -- to non-empty, or a work item is queued by a worker to the work queue and
    -- needDoorBell is set by the consumer.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBell :: MVar ()  -- signal the consumer about output
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ    :: m [ChildEvent a]
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m Bool
postProcess    :: m Bool

    -- channel to send events from the consumer to the worker. Used to send
    -- exceptions from a fold driver to the fold computation running as a
    -- consumer thread in the concurrent fold cases. Currently only one event
    -- is sent by the fold so we do not really need a queue for it.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer :: MVar ()

    -- Combined/aggregate parameters
    -- This is truncated to maxBufferLimit if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxWorkerLimit :: Limit
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit :: Limit
    -- These two are valid and used only when maxBufferLimit is Limited.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Count
pushBufferSpace  :: IORef Count
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> PushBufferPolicy
pushBufferPolicy :: PushBufferPolicy
    -- [LOCKING] The consumer puts this MVar after emptying the buffer, workers
    -- block on it when the buffer becomes full. No overhead unless the buffer
    -- becomes full.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
pushBufferMVar :: MVar ()

    -- [LOCKING] Read only access by consumer when dispatching a worker.
    -- Decremented by workers when picking work and undo decrement if the
    -- worker does not yield a value.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork  :: Maybe (IORef Count)
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo  :: Maybe YieldRateInfo

    -- Used only by bounded SVar types
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue        :: (RunInIO m, t m a) -> IO ()
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkDone     :: IO Bool
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isQueueDone    :: IO Bool
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Bool
needDoorBell   :: IORef Bool
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> m ()
workLoop       :: Maybe WorkerInfo -> m ()

    -- Shared, thread tracking
    -- [LOCKING] Updated unlocked only by consumer thread in case of
    -- Async/Ahead style SVars. Updated locked by worker threads in case of
    -- Parallel style.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads  :: IORef (Set ThreadId)
    -- [LOCKING] Updated locked by consumer thread when dispatching a worker
    -- and by the worker threads when the thread stops. This is read unsafely
    -- at several places where we want to rely on an approximate value.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount    :: IORef Int
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread  :: ThreadId -> m ()
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar :: MVar ()

    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats      :: SVarStats
    -- to track garbage collection of SVar
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef ())
svarRef        :: Maybe (IORef ())

    -- Only for diagnostics
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode :: Bool
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId
svarCreator    :: ThreadId
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outputHeap     :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                              , Maybe Int)
    -- Shared work queue (stream, seqNo)
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int)
aheadWorkQueue :: IORef ([t m a], Int)

-- Overall state threaded around a stream

-- | Specifies the stream yield rate in yields per second (@Hertz@).
-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
-- allow only as many yields as we have accumulated as per 'rateGoal' since the
-- start of time. If the consumer or the producer is slower or faster, the
-- actual rate may fall behind or exceed 'rateGoal'.  We try to recover the gap
-- between the two by increasing or decreasing the pull rate from the producer.
-- However, if the gap becomes more than 'rateBuffer' we try to recover only as
-- much as 'rateBuffer'.
-- 'rateLow' puts a bound on how low the instantaneous rate can go when
-- recovering the rate gap.  In other words, it determines the maximum yield
-- latency.  Similarly, 'rateHigh' puts a bound on how high the instantaneous
-- rate can go when recovering the rate gap.  In other words, it determines the
-- minimum yield latency. We reduce the latency by increasing concurrency,
-- therefore we can say that it puts an upper bound on concurrency.
-- If the 'rateGoal' is 0 or negative the stream never yields a value.
-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
-- /Since: 0.5.0 ("Streamly")/
-- @since 0.8.0
data Rate = Rate
    { Rate -> Double
rateLow    :: Double -- ^ The lower rate limit
    , Rate -> Double
rateGoal   :: Double -- ^ The target rate we want to achieve
    , Rate -> Double
rateHigh   :: Double -- ^ The upper rate limit
    , Rate -> Int
rateBuffer :: Int    -- ^ Maximum slack from the goal

-- XXX we can put the resettable fields in a oneShotConfig field and others in
-- a persistentConfig field. That way reset would be fast and scalable
-- irrespective of the number of fields.
-- XXX make all these Limited types and use phantom types to distinguish them
data State t m a = State
    { -- one shot configuration, automatically reset for each API call
      forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar   :: Maybe (SVar t m a)
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
_yieldLimit  :: Maybe Count

    -- persistent configuration, state that remains valid until changed by
    -- an explicit setting via a combinator.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
_threadsHigh    :: Limit
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
_bufferHigh     :: Limit
    -- XXX these two can be collapsed into a single type
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe NanoSecond64
_streamLatency  :: Maybe NanoSecond64 -- bootstrap latency
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
_maxStreamRate  :: Maybe Rate
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
_inspectMode    :: Bool

-- State defaults and reset

-- A magical value for the buffer size arrived at by running the smallest
-- possible task and measuring the optimal value of the buffer for that.  This
-- is obviously dependent on hardware, this figure is based on a 2.2GHz intel
-- core-i7 processor.
magicMaxBuffer :: Word
magicMaxBuffer :: Word
magicMaxBuffer = Word

defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads :: Limit
defaultMaxThreads = Word -> Limit
Limited Word
defaultMaxBuffer :: Limit
defaultMaxBuffer = Word -> Limit
Limited Word

-- The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs.
defState :: State t m a
defState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState = State
    { streamVar :: Maybe (SVar t m a)
streamVar = Maybe (SVar t m a)
forall a. Maybe a
    , _yieldLimit :: Maybe Count
_yieldLimit = Maybe Count
forall a. Maybe a
    , _threadsHigh :: Limit
_threadsHigh = Limit
    , _bufferHigh :: Limit
_bufferHigh = Limit
    , _maxStreamRate :: Maybe Rate
_maxStreamRate = Maybe Rate
forall a. Maybe a
    , _streamLatency :: Maybe NanoSecond64
_streamLatency = Maybe NanoSecond64
forall a. Maybe a
    , _inspectMode :: Bool
_inspectMode = Bool

-- XXX if perf gets affected we can have all the Nothing params in a single
-- structure so that we reset is fast. We can also use rewrite rules such that
-- reset occurs only in concurrent streams to reduce the impact on serial
-- streams.
-- We can optimize this so that we clear it only if it is a Just value, it
-- results in slightly better perf for zip/zipM but the performance of scan
-- worsens a lot, it does not fuse.
-- XXX This has a side effect of clearing the SVar and yieldLimit, therefore it
-- should not be used to convert from the same type to the same type, unless
-- you want to clear the SVar. For clearing the SVar you should be using the
-- appropriate unStream functions instead.
-- | Adapt the stream state from one type to another.
adaptState :: State t m a -> State t n b
adaptState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State t m a
st = State t m a
    { streamVar :: Maybe (SVar t n b)
streamVar = Maybe (SVar t n b)
forall a. Maybe a
    , _yieldLimit :: Maybe Count
_yieldLimit = Maybe Count
forall a. Maybe a

-- Smart get/set routines for State

-- Use get/set routines instead of directly accessing the State fields
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Maybe Int64 -> State t m a -> State t m a
setYieldLimit Maybe Int64
lim State t m a
st =
    State t m a
st { _yieldLimit :: Maybe Count
_yieldLimit =
            case Maybe Int64
lim of
                Maybe Int64
Nothing -> Maybe Count
forall a. Maybe a
                Just Int64
n  ->
                    if Int64
n Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Int64
                    then Count -> Maybe Count
forall a. a -> Maybe a
Just Count
                    else Count -> Maybe Count
forall a. a -> Maybe a
Just (Int64 -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64

getYieldLimit :: State t m a -> Maybe Count
getYieldLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit = State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count

setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setMaxThreads Int
n State t m a
st =
    State t m a
st { _threadsHigh :: Limit
_threadsHigh =
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
            then Limit
            else if Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
                 then Limit
                 else Word -> Limit
Limited (Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int

getMaxThreads :: State t m a -> Limit
getMaxThreads :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads = State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit

setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setMaxBuffer Int
n State t m a
st =
    State t m a
st { _bufferHigh :: Limit
_bufferHigh =
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
            then Limit
            else if Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
                 then Limit
                 else Word -> Limit
Limited (Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int

getMaxBuffer :: State t m a -> Limit
getMaxBuffer :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer = State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit

setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Maybe Rate -> State t m a -> State t m a
setStreamRate Maybe Rate
r State t m a
st = State t m a
st { _maxStreamRate :: Maybe Rate
_maxStreamRate = Maybe Rate
r }

getStreamRate :: State t m a -> Maybe Rate
getStreamRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate = State t m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate

setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setStreamLatency Int
n State t m a
st =
    State t m a
st { _streamLatency :: Maybe NanoSecond64
_streamLatency =
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
            then Maybe NanoSecond64
forall a. Maybe a
            else NanoSecond64 -> Maybe NanoSecond64
forall a. a -> Maybe a
Just (Int -> NanoSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int

getStreamLatency :: State t m a -> Maybe NanoSecond64
getStreamLatency :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe NanoSecond64
getStreamLatency = State t m a -> Maybe NanoSecond64
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe NanoSecond64

setInspectMode :: State t m a -> State t m a
setInspectMode :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> State t m a
setInspectMode State t m a
st = State t m a
st { _inspectMode :: Bool
_inspectMode = Bool
True }

getInspectMode :: State t m a -> Bool
getInspectMode :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode = State t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool