-- |
-- Module      : Streamly.Internal.Data.SVar.Type
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.SVar.Type
    (
    -- * Parent child communication
      ThreadAbort (..)
    , ChildEvent (..)
    , RunInIO(..)
    , AheadHeapEntry (..)

    -- * SVar
    , Count (..)
    , Limit (..)
    , SVarStyle (..)
    , SVarStopStyle (..)
    , SVarStats (..)
    , WorkerInfo (..)
    , PushBufferPolicy(..)
    , LatencyRange (..)
    , YieldRateInfo (..)
    , SVar (..)

    -- * State threaded around the stream
    , Rate (..)
    , State (streamVar)

    -- ** Default State
    , magicMaxBuffer
    , defState

    -- ** Type cast
    , adaptState

    -- ** State accessors
    , getMaxThreads
    , setMaxThreads
    , getMaxBuffer
    , setMaxBuffer
    , getStreamRate
    , setStreamRate
    , getStreamLatency
    , setStreamLatency
    , getYieldLimit
    , setYieldLimit
    , getInspectMode
    , setInspectMode
    )
where

import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Exception (SomeException(..), Exception)
#ifndef USE_UNLIFTIO
import Control.Monad.Trans.Control (MonadBaseControl(StM))
#endif
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.IORef (IORef)
import Data.Kind (Type)
import Data.Set (Set)

import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..))

newtype Count = Count Int64
    deriving ( Count -> Count -> Bool
(Count -> Count -> Bool) -> (Count -> Count -> Bool) -> Eq Count
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Count -> Count -> Bool
== :: Count -> Count -> Bool
$c/= :: Count -> Count -> Bool
/= :: Count -> Count -> Bool
Eq
             , ReadPrec [Count]
ReadPrec Count
Int -> ReadS Count
ReadS [Count]
(Int -> ReadS Count)
-> ReadS [Count]
-> ReadPrec Count
-> ReadPrec [Count]
-> Read Count
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
$creadsPrec :: Int -> ReadS Count
readsPrec :: Int -> ReadS Count
$creadList :: ReadS [Count]
readList :: ReadS [Count]
$creadPrec :: ReadPrec Count
readPrec :: ReadPrec Count
$creadListPrec :: ReadPrec [Count]
readListPrec :: ReadPrec [Count]
Read
             , Int -> Count -> ShowS
[Count] -> ShowS
Count -> String
(Int -> Count -> ShowS)
-> (Count -> String) -> ([Count] -> ShowS) -> Show Count
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Count -> ShowS
showsPrec :: Int -> Count -> ShowS
$cshow :: Count -> String
show :: Count -> String
$cshowList :: [Count] -> ShowS
showList :: [Count] -> ShowS
Show
             , Int -> Count
Count -> Int
Count -> [Count]
Count -> Count
Count -> Count -> [Count]
Count -> Count -> Count -> [Count]
(Count -> Count)
-> (Count -> Count)
-> (Int -> Count)
-> (Count -> Int)
-> (Count -> [Count])
-> (Count -> Count -> [Count])
-> (Count -> Count -> [Count])
-> (Count -> Count -> Count -> [Count])
-> Enum Count
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
$csucc :: Count -> Count
succ :: Count -> Count
$cpred :: Count -> Count
pred :: Count -> Count
$ctoEnum :: Int -> Count
toEnum :: Int -> Count
$cfromEnum :: Count -> Int
fromEnum :: Count -> Int
$cenumFrom :: Count -> [Count]
enumFrom :: Count -> [Count]
$cenumFromThen :: Count -> Count -> [Count]
enumFromThen :: Count -> Count -> [Count]
$cenumFromTo :: Count -> Count -> [Count]
enumFromTo :: Count -> Count -> [Count]
$cenumFromThenTo :: Count -> Count -> Count -> [Count]
enumFromThenTo :: Count -> Count -> Count -> [Count]
Enum
             , Count
Count -> Count -> Bounded Count
forall a. a -> a -> Bounded a
$cminBound :: Count
minBound :: Count
$cmaxBound :: Count
maxBound :: Count
Bounded
             , Integer -> Count
Count -> Count
Count -> Count -> Count
(Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count)
-> (Count -> Count)
-> (Count -> Count)
-> (Integer -> Count)
-> Num Count
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: Count -> Count -> Count
+ :: Count -> Count -> Count
$c- :: Count -> Count -> Count
- :: Count -> Count -> Count
$c* :: Count -> Count -> Count
* :: Count -> Count -> Count
$cnegate :: Count -> Count
negate :: Count -> Count
$cabs :: Count -> Count
abs :: Count -> Count
$csignum :: Count -> Count
signum :: Count -> Count
$cfromInteger :: Integer -> Count
fromInteger :: Integer -> Count
Num
             , Num Count
Ord Count
Num Count -> Ord Count -> (Count -> Rational) -> Real Count
Count -> Rational
forall a. Num a -> Ord a -> (a -> Rational) -> Real a
$ctoRational :: Count -> Rational
toRational :: Count -> Rational
Real
             , Enum Count
Real Count
Real Count
-> Enum Count
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> (Count -> Count -> (Count, Count))
-> (Count -> Count -> (Count, Count))
-> (Count -> Integer)
-> Integral Count
Count -> Integer
Count -> Count -> (Count, Count)
Count -> Count -> Count
forall a.
Real a
-> Enum a
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> (a, a))
-> (a -> a -> (a, a))
-> (a -> Integer)
-> Integral a
$cquot :: Count -> Count -> Count
quot :: Count -> Count -> Count
$crem :: Count -> Count -> Count
rem :: Count -> Count -> Count
$cdiv :: Count -> Count -> Count
div :: Count -> Count -> Count
$cmod :: Count -> Count -> Count
mod :: Count -> Count -> Count
$cquotRem :: Count -> Count -> (Count, Count)
quotRem :: Count -> Count -> (Count, Count)
$cdivMod :: Count -> Count -> (Count, Count)
divMod :: Count -> Count -> (Count, Count)
$ctoInteger :: Count -> Integer
toInteger :: Count -> Integer
Integral
             , Eq Count
Eq Count
-> (Count -> Count -> Ordering)
-> (Count -> Count -> Bool)
-> (Count -> Count -> Bool)
-> (Count -> Count -> Bool)
-> (Count -> Count -> Bool)
-> (Count -> Count -> Count)
-> (Count -> Count -> Count)
-> Ord Count
Count -> Count -> Bool
Count -> Count -> Ordering
Count -> Count -> Count
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Count -> Count -> Ordering
compare :: Count -> Count -> Ordering
$c< :: Count -> Count -> Bool
< :: Count -> Count -> Bool
$c<= :: Count -> Count -> Bool
<= :: Count -> Count -> Bool
$c> :: Count -> Count -> Bool
> :: Count -> Count -> Bool
$c>= :: Count -> Count -> Bool
>= :: Count -> Count -> Bool
$cmax :: Count -> Count -> Count
max :: Count -> Count -> Count
$cmin :: Count -> Count -> Count
min :: Count -> Count -> Count
Ord
             )

------------------------------------------------------------------------------
-- Parent child thread communication type
------------------------------------------------------------------------------

data ThreadAbort = ThreadAbort deriving Int -> ThreadAbort -> ShowS
[ThreadAbort] -> ShowS
ThreadAbort -> String
(Int -> ThreadAbort -> ShowS)
-> (ThreadAbort -> String)
-> ([ThreadAbort] -> ShowS)
-> Show ThreadAbort
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ThreadAbort -> ShowS
showsPrec :: Int -> ThreadAbort -> ShowS
$cshow :: ThreadAbort -> String
show :: ThreadAbort -> String
$cshowList :: [ThreadAbort] -> ShowS
showList :: [ThreadAbort] -> ShowS
Show

instance Exception ThreadAbort

-- | Events that a child thread may send to a parent thread.
data ChildEvent a =
      ChildYield a
    | ChildStop ThreadId (Maybe SomeException)

#ifdef USE_UNLIFTIO
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO b }
#else
newtype RunInIO m = RunInIO { forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO :: forall b. m b -> IO (StM m b) }
#endif

-- | Sorting out-of-turn outputs in a heap for Ahead style streams
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
    | AheadEntryPure a
    | AheadEntryStream (RunInIO m, t m a)
#undef Type

------------------------------------------------------------------------------
-- SVar: the state for thread management
------------------------------------------------------------------------------

-- | Identify the type of the SVar. Two computations using the same style can
-- be scheduled on the same SVar.
data SVarStyle =
      AsyncVar             -- depth first concurrent
    | WAsyncVar            -- breadth first concurrent
    | ParallelVar          -- all parallel
    | AheadVar             -- Concurrent look ahead
    deriving (SVarStyle -> SVarStyle -> Bool
(SVarStyle -> SVarStyle -> Bool)
-> (SVarStyle -> SVarStyle -> Bool) -> Eq SVarStyle
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SVarStyle -> SVarStyle -> Bool
== :: SVarStyle -> SVarStyle -> Bool
$c/= :: SVarStyle -> SVarStyle -> Bool
/= :: SVarStyle -> SVarStyle -> Bool
Eq, Int -> SVarStyle -> ShowS
[SVarStyle] -> ShowS
SVarStyle -> String
(Int -> SVarStyle -> ShowS)
-> (SVarStyle -> String)
-> ([SVarStyle] -> ShowS)
-> Show SVarStyle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SVarStyle -> ShowS
showsPrec :: Int -> SVarStyle -> ShowS
$cshow :: SVarStyle -> String
show :: SVarStyle -> String
$cshowList :: [SVarStyle] -> ShowS
showList :: [SVarStyle] -> ShowS
Show)

-- | An SVar or a Stream Var is a conduit to the output from multiple streams
-- running concurrently and asynchronously. An SVar can be thought of as an
-- asynchronous IO handle. We can write any number of streams to an SVar in a
-- non-blocking manner and then read them back at any time at any pace.  The
-- SVar would run the streams asynchronously and accumulate results. An SVar
-- may not really execute the stream completely and accumulate all the results.
-- However, it ensures that the reader can read the results at whatever paces
-- it wants to read. The SVar monitors and adapts to the consumer's pace.
--
-- An SVar is a mini scheduler, it has an associated workLoop that holds the
-- stream tasks to be picked and run by a pool of worker threads. It has an
-- associated output queue where the output stream elements are placed by the
-- worker threads. A outputDoorBell is used by the worker threads to intimate the
-- consumer thread about availability of new results in the output queue. More
-- workers are added to the SVar by 'fromStreamVar' on demand if the output
-- produced is not keeping pace with the consumer. On bounded SVars, workers
-- block on the output queue to provide throttling of the producer  when the
-- consumer is not pulling fast enough.  The number of workers may even get
-- reduced depending on the consuming pace.
--
-- New work is enqueued either at the time of creation of the SVar or as a
-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
-- already enqueued computations get evaluated. See 'joinStreamVarAsync'.

-- We measure the individual worker latencies to estimate the number of workers
-- needed or the amount of time we have to sleep between dispatches to achieve
-- a particular rate when controlled pace mode it used.
data WorkerInfo = WorkerInfo
    { WorkerInfo -> Count
workerYieldMax   :: Count -- 0 means unlimited
    -- total number of yields by the worker till now
    , WorkerInfo -> IORef Count
workerYieldCount    :: IORef Count
    -- yieldCount at start, timestamp
    , WorkerInfo -> IORef (Count, AbsTime)
workerLatencyStart  :: IORef (Count, AbsTime)
    }

data LatencyRange = LatencyRange
    { LatencyRange -> NanoSecond64
minLatency :: NanoSecond64
    , LatencyRange -> NanoSecond64
maxLatency :: NanoSecond64
    } deriving Int -> LatencyRange -> ShowS
[LatencyRange] -> ShowS
LatencyRange -> String
(Int -> LatencyRange -> ShowS)
-> (LatencyRange -> String)
-> ([LatencyRange] -> ShowS)
-> Show LatencyRange
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> LatencyRange -> ShowS
showsPrec :: Int -> LatencyRange -> ShowS
$cshow :: LatencyRange -> String
show :: LatencyRange -> String
$cshowList :: [LatencyRange] -> ShowS
showList :: [LatencyRange] -> ShowS
Show

-- Rate control.
data YieldRateInfo = YieldRateInfo
    { YieldRateInfo -> NanoSecond64
svarLatencyTarget    :: NanoSecond64
    , YieldRateInfo -> LatencyRange
svarLatencyRange     :: LatencyRange
    , YieldRateInfo -> Int
svarRateBuffer       :: Int

    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , YieldRateInfo -> IORef Count
svarGainedLostYields :: IORef Count

    -- Actual latency/througput as seen from the consumer side, we count the
    -- yields and the time it took to generates those yields. This is used to
    -- increase or decrease the number of workers needed to achieve the desired
    -- rate. The idle time of workers is adjusted in this, so that we only
    -- account for the rate when the consumer actually demands data.
    -- XXX interval latency is enough, we can move this under diagnostics build
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , YieldRateInfo -> IORef (Count, AbsTime)
svarAllTimeLatency :: IORef (Count, AbsTime)

    -- XXX Worker latency specified by the user to be used before the first
    -- actual measurement arrives. Not yet implemented
    , YieldRateInfo -> Maybe NanoSecond64
workerBootstrapLatency :: Maybe NanoSecond64

    -- After how many yields the worker should update the latency information.
    -- If the latency is high, this count is kept lower and vice-versa.  XXX If
    -- the latency suddenly becomes too high this count may remain too high for
    -- long time, in such cases the consumer can change it.
    -- 0 means no latency computation
    -- XXX this is derivable from workerMeasuredLatency, can be removed.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , YieldRateInfo -> IORef Count
workerPollingInterval :: IORef Count

    -- This is in progress latency stats maintained by the workers which we
    -- empty into workerCollectedLatency stats at certain intervals - whenever
    -- we process the stream elements yielded in this period. The first count
    -- is all yields, the second count is only those yields for which the
    -- latency was measured to be non-zero (note that if the timer resolution
    -- is low the measured latency may be zero e.g. on JS platform).
    -- [LOCKING] Locked access. Modified by the consumer thread as well as
    -- worker threads. Workers modify it periodically based on
    -- workerPollingInterval and not on every yield to reduce the locking
    -- overhead.
    -- (allYieldCount, yieldCount, timeTaken)
    , YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerPendingLatency   :: IORef (Count, Count, NanoSecond64)

    -- This is the second level stat which is an accmulation from
    -- workerPendingLatency stats. We keep accumulating latencies in this
    -- bucket until we have stats for a sufficient period and then we reset it
    -- to start collecting for the next period and retain the computed average
    -- latency for the last period in workerMeasuredLatency.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    -- (allYieldCount, yieldCount, timeTaken)
    , YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    -- Latency as measured by workers, aggregated for the last period.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , YieldRateInfo -> IORef NanoSecond64
workerMeasuredLatency :: IORef NanoSecond64
    }

data SVarStats = SVarStats {
      SVarStats -> IORef Int
totalDispatches  :: IORef Int
    , SVarStats -> IORef Int
maxWorkers       :: IORef Int
    , SVarStats -> IORef Int
maxOutQSize      :: IORef Int
    , SVarStats -> IORef Int
maxHeapSize      :: IORef Int
    , SVarStats -> IORef Int
maxWorkQSize     :: IORef Int
    , SVarStats -> IORef (Count, NanoSecond64)
avgWorkerLatency :: IORef (Count, NanoSecond64)
    , SVarStats -> IORef NanoSecond64
minWorkerLatency :: IORef NanoSecond64
    , SVarStats -> IORef NanoSecond64
maxWorkerLatency :: IORef NanoSecond64
    , SVarStats -> IORef (Maybe AbsTime)
svarStopTime     :: IORef (Maybe AbsTime)
}

-- This is essentially a 'Maybe Word' type
data Limit = Unlimited | Limited Word deriving Int -> Limit -> ShowS
[Limit] -> ShowS
Limit -> String
(Int -> Limit -> ShowS)
-> (Limit -> String) -> ([Limit] -> ShowS) -> Show Limit
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Limit -> ShowS
showsPrec :: Int -> Limit -> ShowS
$cshow :: Limit -> String
show :: Limit -> String
$cshowList :: [Limit] -> ShowS
showList :: [Limit] -> ShowS
Show

instance Eq Limit where
    Limit
Unlimited == :: Limit -> Limit -> Bool
== Limit
Unlimited = Bool
True
    Limit
Unlimited == Limited Word
_ = Bool
False
    Limited Word
_ == Limit
Unlimited = Bool
False
    Limited Word
x == Limited Word
y = Word
x Word -> Word -> Bool
forall a. Eq a => a -> a -> Bool
== Word
y

instance Ord Limit where
    Limit
Unlimited <= :: Limit -> Limit -> Bool
<= Limit
Unlimited = Bool
True
    Limit
Unlimited <= Limited Word
_ = Bool
False
    Limited Word
_ <= Limit
Unlimited = Bool
True
    Limited Word
x <= Limited Word
y = Word
x Word -> Word -> Bool
forall a. Ord a => a -> a -> Bool
<= Word
y

-- When to stop the composed stream.
data SVarStopStyle =
      StopNone -- stops only when all streams are finished
    | StopAny  -- stop when any stream finishes
    | StopBy   -- stop when a specific stream finishes
    deriving (SVarStopStyle -> SVarStopStyle -> Bool
(SVarStopStyle -> SVarStopStyle -> Bool)
-> (SVarStopStyle -> SVarStopStyle -> Bool) -> Eq SVarStopStyle
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SVarStopStyle -> SVarStopStyle -> Bool
== :: SVarStopStyle -> SVarStopStyle -> Bool
$c/= :: SVarStopStyle -> SVarStopStyle -> Bool
/= :: SVarStopStyle -> SVarStopStyle -> Bool
Eq, Int -> SVarStopStyle -> ShowS
[SVarStopStyle] -> ShowS
SVarStopStyle -> String
(Int -> SVarStopStyle -> ShowS)
-> (SVarStopStyle -> String)
-> ([SVarStopStyle] -> ShowS)
-> Show SVarStopStyle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SVarStopStyle -> ShowS
showsPrec :: Int -> SVarStopStyle -> ShowS
$cshow :: SVarStopStyle -> String
show :: SVarStopStyle -> String
$cshowList :: [SVarStopStyle] -> ShowS
showList :: [SVarStopStyle] -> ShowS
Show)

-- | Buffering policy for persistent push workers (in ParallelT).  In a pull
-- style SVar (in AsyncT, AheadT etc.), the consumer side dispatches workers on
-- demand, workers terminate if the buffer is full or if the consumer is not
-- cosuming fast enough.  In a push style SVar, a worker is dispatched only
-- once, workers are persistent and keep pushing work to the consumer via a
-- bounded buffer. If the buffer becomes full the worker either blocks, or it
-- can drop an item from the buffer to make space.
--
-- Pull style SVars are useful in lazy stream evaluation whereas push style
-- SVars are useful in strict left Folds.
--
-- XXX Maybe we can separate the implementation in two different types instead
-- of using a common SVar type.
--
data PushBufferPolicy =
      PushBufferDropNew  -- drop the latest element and continue
    | PushBufferDropOld  -- drop the oldest element and continue
    | PushBufferBlock    -- block the thread until space
                         -- becomes available

-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
-- references to the original SVar stored in several functions which will keep
-- pointing to the original data and the new updates won't reflect there.
-- Any updateable parts must be kept in mutable references (IORef).
--
data SVar t m a = SVar
    {
    -- Read only state
      forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle       :: SVarStyle
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun        :: RunInIO m
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle   :: SVarStopStyle
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy      :: IORef ThreadId

    -- Shared output queue (events, length)
    -- XXX For better efficiency we can try a preallocated array type (perhaps
    -- something like a vector) that allows an O(1) append. That way we will
    -- avoid constructing and reversing the list. Possibly we can also avoid
    -- the GC copying overhead. When the size increases we should be able to
    -- allocate the array in chunks.
    --
    -- [LOCKING] Frequent locked access. This is updated by workers on each
    -- yield and once in a while read by the consumer thread. This could have
    -- big locking overhead if the number of workers is high.
    --
    -- XXX We can use a per-CPU data structure to reduce the locking overhead.
    -- However, a per-cpu structure cannot guarantee the exact sequence in
    -- which the elements were added, though that may not be important.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue    :: IORef ([ChildEvent a], Int)

    -- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
    -- to non-empty, or a work item is queued by a worker to the work queue and
    -- needDoorBell is set by the consumer.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBell :: MVar ()  -- signal the consumer about output
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ    :: m [ChildEvent a]
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m Bool
postProcess    :: m Bool

    -- channel to send events from the consumer to the worker. Used to send
    -- exceptions from a fold driver to the fold computation running as a
    -- consumer thread in the concurrent fold cases. Currently only one event
    -- is sent by the fold so we do not really need a queue for it.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer :: MVar ()

    -- Combined/aggregate parameters
    -- This is truncated to maxBufferLimit if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxWorkerLimit :: Limit
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit :: Limit
    -- These two are valid and used only when maxBufferLimit is Limited.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Count
pushBufferSpace  :: IORef Count
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> PushBufferPolicy
pushBufferPolicy :: PushBufferPolicy
    -- [LOCKING] The consumer puts this MVar after emptying the buffer, workers
    -- block on it when the buffer becomes full. No overhead unless the buffer
    -- becomes full.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
pushBufferMVar :: MVar ()

    -- [LOCKING] Read only access by consumer when dispatching a worker.
    -- Decremented by workers when picking work and undo decrement if the
    -- worker does not yield a value.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork  :: Maybe (IORef Count)
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo  :: Maybe YieldRateInfo

    -- Used only by bounded SVar types
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue        :: (RunInIO m, t m a) -> IO ()
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkDone     :: IO Bool
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isQueueDone    :: IO Bool
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Bool
needDoorBell   :: IORef Bool
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> m ()
workLoop       :: Maybe WorkerInfo -> m ()

    -- Shared, thread tracking
    -- [LOCKING] Updated unlocked only by consumer thread in case of
    -- Async/Ahead style SVars. Updated locked by worker threads in case of
    -- Parallel style.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads  :: IORef (Set ThreadId)
    -- [LOCKING] Updated locked by consumer thread when dispatching a worker
    -- and by the worker threads when the thread stops. This is read unsafely
    -- at several places where we want to rely on an approximate value.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount    :: IORef Int
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread  :: ThreadId -> m ()
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar :: MVar ()

    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats      :: SVarStats
    -- to track garbage collection of SVar
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef ())
svarRef        :: Maybe (IORef ())

    -- Only for diagnostics
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode :: Bool
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId
svarCreator    :: ThreadId
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outputHeap     :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                              , Maybe Int)
    -- Shared work queue (stream, seqNo)
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int)
aheadWorkQueue :: IORef ([t m a], Int)
    }

-------------------------------------------------------------------------------
-- Overall state threaded around a stream
-------------------------------------------------------------------------------

-- | Specifies the stream yield rate in yields per second (@Hertz@).
-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
-- allow only as many yields as we have accumulated as per 'rateGoal' since the
-- start of time. If the consumer or the producer is slower or faster, the
-- actual rate may fall behind or exceed 'rateGoal'.  We try to recover the gap
-- between the two by increasing or decreasing the pull rate from the producer.
-- However, if the gap becomes more than 'rateBuffer' we try to recover only as
-- much as 'rateBuffer'.
--
-- 'rateLow' puts a bound on how low the instantaneous rate can go when
-- recovering the rate gap.  In other words, it determines the maximum yield
-- latency.  Similarly, 'rateHigh' puts a bound on how high the instantaneous
-- rate can go when recovering the rate gap.  In other words, it determines the
-- minimum yield latency. We reduce the latency by increasing concurrency,
-- therefore we can say that it puts an upper bound on concurrency.
--
-- If the 'rateGoal' is 0 or negative the stream never yields a value.
-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
data Rate = Rate
    { Rate -> Double
rateLow    :: Double -- ^ The lower rate limit
    , Rate -> Double
rateGoal   :: Double -- ^ The target rate we want to achieve
    , Rate -> Double
rateHigh   :: Double -- ^ The upper rate limit
    , Rate -> Int
rateBuffer :: Int    -- ^ Maximum slack from the goal
    }

-- XXX we can put the resettable fields in a oneShotConfig field and others in
-- a persistentConfig field. That way reset would be fast and scalable
-- irrespective of the number of fields.
--
-- XXX make all these Limited types and use phantom types to distinguish them
data State t m a = State
    { -- one shot configuration, automatically reset for each API call
      forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar   :: Maybe (SVar t m a)
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
_yieldLimit  :: Maybe Count

    -- persistent configuration, state that remains valid until changed by
    -- an explicit setting via a combinator.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
_threadsHigh    :: Limit
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
_bufferHigh     :: Limit
    -- XXX these two can be collapsed into a single type
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe NanoSecond64
_streamLatency  :: Maybe NanoSecond64 -- bootstrap latency
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
_maxStreamRate  :: Maybe Rate
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
_inspectMode    :: Bool
    }

-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------

-- A magical value for the buffer size arrived at by running the smallest
-- possible task and measuring the optimal value of the buffer for that.  This
-- is obviously dependent on hardware, this figure is based on a 2.2GHz intel
-- core-i7 processor.
magicMaxBuffer :: Word
magicMaxBuffer :: Word
magicMaxBuffer = Word
1500

defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads :: Limit
defaultMaxThreads = Word -> Limit
Limited Word
magicMaxBuffer
defaultMaxBuffer :: Limit
defaultMaxBuffer = Word -> Limit
Limited Word
magicMaxBuffer

-- The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs.
defState :: State t m a
defState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState = State
    { streamVar :: Maybe (SVar t m a)
streamVar = Maybe (SVar t m a)
forall a. Maybe a
Nothing
    , _yieldLimit :: Maybe Count
_yieldLimit = Maybe Count
forall a. Maybe a
Nothing
    , _threadsHigh :: Limit
_threadsHigh = Limit
defaultMaxThreads
    , _bufferHigh :: Limit
_bufferHigh = Limit
defaultMaxBuffer
    , _maxStreamRate :: Maybe Rate
_maxStreamRate = Maybe Rate
forall a. Maybe a
Nothing
    , _streamLatency :: Maybe NanoSecond64
_streamLatency = Maybe NanoSecond64
forall a. Maybe a
Nothing
    , _inspectMode :: Bool
_inspectMode = Bool
False
    }

-- XXX if perf gets affected we can have all the Nothing params in a single
-- structure so that we reset is fast. We can also use rewrite rules such that
-- reset occurs only in concurrent streams to reduce the impact on serial
-- streams.
-- We can optimize this so that we clear it only if it is a Just value, it
-- results in slightly better perf for zip/zipM but the performance of scan
-- worsens a lot, it does not fuse.
--
-- XXX This has a side effect of clearing the SVar and yieldLimit, therefore it
-- should not be used to convert from the same type to the same type, unless
-- you want to clear the SVar. For clearing the SVar you should be using the
-- appropriate unStream functions instead.
--
-- | Adapt the stream state from one type to another.
adaptState :: State t m a -> State t n b
adaptState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State t m a
st = State t m a
st
    { streamVar :: Maybe (SVar t n b)
streamVar = Maybe (SVar t n b)
forall a. Maybe a
Nothing
    , _yieldLimit :: Maybe Count
_yieldLimit = Maybe Count
forall a. Maybe a
Nothing
    }

-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------

-- Use get/set routines instead of directly accessing the State fields
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Maybe Int64 -> State t m a -> State t m a
setYieldLimit Maybe Int64
lim State t m a
st =
    State t m a
st { _yieldLimit :: Maybe Count
_yieldLimit =
            case Maybe Int64
lim of
                Maybe Int64
Nothing -> Maybe Count
forall a. Maybe a
Nothing
                Just Int64
n  ->
                    if Int64
n Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Int64
0
                    then Count -> Maybe Count
forall a. a -> Maybe a
Just Count
0
                    else Count -> Maybe Count
forall a. a -> Maybe a
Just (Int64 -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
n)
       }

getYieldLimit :: State t m a -> Maybe Count
getYieldLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit = State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
_yieldLimit

setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setMaxThreads Int
n State t m a
st =
    State t m a
st { _threadsHigh :: Limit
_threadsHigh =
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0
            then Limit
Unlimited
            else if Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                 then Limit
defaultMaxThreads
                 else Word -> Limit
Limited (Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)
       }

getMaxThreads :: State t m a -> Limit
getMaxThreads :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads = State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
_threadsHigh

setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setMaxBuffer Int
n State t m a
st =
    State t m a
st { _bufferHigh :: Limit
_bufferHigh =
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0
            then Limit
Unlimited
            else if Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                 then Limit
defaultMaxBuffer
                 else Word -> Limit
Limited (Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)
       }

getMaxBuffer :: State t m a -> Limit
getMaxBuffer :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer = State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
_bufferHigh

setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Maybe Rate -> State t m a -> State t m a
setStreamRate Maybe Rate
r State t m a
st = State t m a
st { _maxStreamRate :: Maybe Rate
_maxStreamRate = Maybe Rate
r }

getStreamRate :: State t m a -> Maybe Rate
getStreamRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate = State t m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
_maxStreamRate

setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setStreamLatency Int
n State t m a
st =
    State t m a
st { _streamLatency :: Maybe NanoSecond64
_streamLatency =
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
            then Maybe NanoSecond64
forall a. Maybe a
Nothing
            else NanoSecond64 -> Maybe NanoSecond64
forall a. a -> Maybe a
Just (Int -> NanoSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)
       }

getStreamLatency :: State t m a -> Maybe NanoSecond64
getStreamLatency :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe NanoSecond64
getStreamLatency = State t m a -> Maybe NanoSecond64
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe NanoSecond64
_streamLatency

setInspectMode :: State t m a -> State t m a
setInspectMode :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> State t m a
setInspectMode State t m a
st = State t m a
st { _inspectMode :: Bool
_inspectMode = Bool
True }

getInspectMode :: State t m a -> Bool
getInspectMode :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode = State t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
_inspectMode