-- |
-- Module      : Streamly.Internal.Data.Channel.Worker
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
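-- Worker-side helpers for channels: sending results (yields, stops and
-- exceptions) from child workers to the consumer in a streamed fashion, and
-- worker rate control.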

module Streamly.Internal.Data.Channel.Worker
    (
      Work (..)
    , estimateWorkers
    , isBeyondMaxRate
    , workerRateControl

    -- * Send Events
    , sendWithDoorBell
    , sendYield
    , sendStop
    , handleChildException -- XXX rename to sendException
    )
where

import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Exception (SomeException(..), assert)
import Control.Monad (when, void)
import Data.IORef (IORef, readIORef, writeIORef)
import Streamly.Internal.Data.Atomics
       (atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
       (AbsTime, NanoSecond64(..), diffAbsTime64, fromRelTime64)

import Streamly.Internal.Data.Channel.Types

-------------------------------------------------------------------------------
-- Yield control
-------------------------------------------------------------------------------

-- | Increment the worker's yield counter ('workerYieldCount') by one and
-- return the incremented value.
--
-- The counter is updated with a plain read followed by a plain write, not an
-- atomic modify. This presumably relies on each 'WorkerInfo' being used by
-- a single worker thread only -- TODO confirm.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    let counter = workerYieldCount winfo
    next <- (+ 1) <$> readIORef counter
    writeIORef counter next
    pure next

-- | Whether the yield count @cnt@ has reached the worker's yield cap
-- ('workerYieldMax').
--
-- A cap of @0@ means the worker has no cap, so the result is then always
-- 'False'.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo
    | cap == 0 = False
    | otherwise = cnt >= cap
  where
    cap = workerYieldMax winfo

-------------------------------------------------------------------------------
-- Sending results from worker
-------------------------------------------------------------------------------

-- | Push an event onto a queue and ring the doorbell if the queue was empty.
--
-- The queue is a pair of an event list (the new event is consed at the head)
-- and its length. Both are updated together by a single
-- 'atomicModifyIORefCAS'. The doorbell is rung when the length observed
-- before the push was @<= 0@.
--
-- Returns the queue length as it was /before/ this event was added.
{-# INLINE sendWithDoorBell #-}
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell q bell msg = do
    -- XXX can the access to outputQueue be made faster somehow?
    prevLen <- atomicModifyIORefCAS q enqueue
    when (prevLen <= 0) ringBell
    pure prevLen

  where

    -- Prepend the message and bump the length; hand back the old length.
    enqueue (events, len) = ((msg : events, len + 1), len)

    ringBell = do
        -- The wake up must happen only after the store has finished otherwise
        -- we can have lost wakeup problems.
        writeBarrier
        -- Since multiple workers can try this at the same time, it is possible
        -- that we may put a spurious MVar after the consumer has already seen
        -- the output. But that's harmless, at worst it may cause the consumer
        -- to read the queue again and find it empty.
        -- The important point is that the consumer is guaranteed to receive a
        -- doorbell if something was added to the queue after it empties it.
        void (tryPutMVar bell ())

-------------------------------------------------------------------------------
-- Collect and update worker latency
-------------------------------------------------------------------------------

-- | Measure how many values this worker yielded since the previous
-- measurement, and how much monotonic time passed meanwhile.
--
-- The start of the measurement window is kept in 'workerLatencyStart' as a
-- @(yield count, timestamp)@ pair.
--
-- * If the yield count has not advanced, this returns 'Nothing' and leaves
--   the window untouched.
-- * Otherwise it moves the window start to the current count and time, and
--   returns @Just (yields, elapsed)@.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency winfo = do
    (startCount, startTime) <- readIORef windowRef
    nowCount <- readIORef (workerYieldCount winfo)
    let yielded = nowCount - startCount
    if yielded <= 0
    then pure Nothing
    else do
        now <- getTime Monotonic
        writeIORef windowRef (nowCount, now)
        let elapsed = fromRelTime64 (diffAbsTime64 now startTime)
        pure (Just (yielded, elapsed))
  where
    windowRef = workerLatencyStart winfo

-- XXX There are a number of gotchas in measuring latencies.
-- 1) We measure latencies only when a worker yields a value
-- 2) It is possible that a stream calls the stop continuation, in which case
-- the worker would not yield a value and we would not account that worker in
-- latencies. Even though this case should ideally be accounted we do not
-- account it because we cannot or do not distinguish it from the case
-- described next.
-- 3) It is possible that a worker returns without yielding anything because it
-- never got a chance to pick up work.
-- 4) If the system timer resolution is lower than the latency, the latency
-- computation turns out to be zero.
--
-- We can fix this if we measure the latencies by counting the work items
-- picked rather than based on the outputs yielded.

-- | Fold this worker's latest latency measurement (from
-- 'workerCollectLatency') into the shared 'workerPendingLatency' triple.
--
-- The triple is @(all yields, timed yields, total time)@:
--
-- * The yield count is always added to the first component.
-- * The yield count and elapsed time are added to the other two only when
--   the elapsed time is positive.
--
-- The update uses 'atomicModifyIORefCAS_'.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo =
    workerCollectLatency winfo >>= maybe (pure ()) record

  where

    record (cnt, period) =
        -- NOTE: On JS platform the timer resolution could be pretty low. When
        -- the timer resolution is low, measurement of latencies could be
        -- tricky. All the worker latencies will turn out to be zero if they
        -- are lower than the resolution. We only take into account those
        -- measurements which are more than the timer resolution.
        let (timedCnt, timedTime) =
                if period > 0 then (cnt, period) else (0, 0)
         in atomicModifyIORefCAS_ (workerPendingLatency yinfo) $
                \(total, n, t) -> (total + cnt, n + timedCnt, t + timedTime)

-------------------------------------------------------------------------------
-- Worker rate control
-------------------------------------------------------------------------------

-- | The decision produced by 'estimateWorkers'. We either block, or send one
-- worker with limited yield count or one or more workers with unlimited
-- yield count.
data Work
    = BlockWait NanoSecond64
      -- ^ Do not dispatch a worker; the payload is the duration to wait.
    | PartialWorker Count
      -- ^ Dispatch a single worker; the payload is the yield deficit
      -- computed by 'estimateWorkers'.
    | ManyWorkers Int Count
      -- ^ Dispatch the given number of workers. The 'Count' is the yield
      -- deficit computed by 'estimateWorkers', or @0@ when it is not behind.
    deriving (Show)

-- | Another magic number! Time over which a yield deficit is recovered:
-- one millisecond.
--
-- When we have to start more workers to cover up a number of yields that we
-- are lagging by, we cannot start one worker for each lagging yield. That
-- number may be very big, and if the worker latency is low it could be very
-- high indeed. Instead we assume that we run each extra worker for at least
-- this much time.
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = NanoSecond64 1000000

-- | Estimate the current worker latency without resetting
-- 'workerPendingLatency'.
--
-- Returns @(total yield count, base time, measured latency)@:
--
-- * The total yield count is the all-time count from 'svarAllTimeLatency',
--   plus the totals in 'workerCollectedLatency' and 'workerPendingLatency'.
-- * The base time is the timestamp stored in 'svarAllTimeLatency'.
-- * The latency is the mean of the previously measured latency
--   ('workerMeasuredLatency') and the average latency of the collected plus
--   pending samples. The previous value is kept unchanged when there are no
--   samples or no measured time.
--
-- CAUTION! keep it in sync with collectLatency
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency yinfo = do
    (pendTotal, pendCount, pendTime) <- readIORef (workerPendingLatency yinfo)
    (collTotal, collCount, collTime) <-
        readIORef (workerCollectedLatency yinfo)
    (baseCount, baseTime) <- readIORef (svarAllTimeLatency yinfo)
    prevLat <- readIORef (workerMeasuredLatency yinfo)

    let sampleCount = collCount + pendCount
        sampleTime = collTime + pendTime
        latency
            | sampleCount <= 0 || sampleTime <= 0 = prevLat
            | otherwise =
                -- XXX Give more weight to new?
                (sampleTime `div` fromIntegral sampleCount + prevLat) `div` 2
    pure (baseCount + (collTotal + pendTotal), baseTime, latency)

-- XXX we can use phantom types to distinguish the duration/latency/expectedLat

-- | Decide whether to block or how many workers to dispatch, so that the
-- effective yield count tracks the target latency.
--
-- Arguments, in order:
--
-- * @workerLimit@: cap applied to the number of workers requested.
-- * @svarYields@: yields counted so far.
-- * @gainLossYields@: correction added to @svarYields@ to form the effective
--   yield count.
-- * @svarElapsed@: elapsed time over which the yields were counted.
-- * @wLatency@: measured per-yield worker latency.
-- * @targetLat@: target latency per yield. It is used as a divisor, so it is
--   assumed to be non-zero.
-- * @range@: latency bounds ('minLatency', 'maxLatency') used while
--   adjusting.
--
-- The target yield count is @(svarElapsed + wLatency) / targetLat@, rounded
-- up. The decision depends on whether the effective count is behind it:
--
-- * Behind: the deficit is recovered over 'rateRecoveryTime'. The required
--   latency is clamped to @range@. The result is a 'PartialWorker' if one
--   worker is fast enough, otherwise 'ManyWorkers'. The worker count is
--   capped by @workerLimit@ and by the deficit.
-- * On or ahead of schedule: 'BlockWait' for the time until the schedule
--   catches up, capped at @maxLatency range - wLatency@. If that cap leaves
--   nothing to wait, the result is @ManyWorkers 1 (Count 0)@.
estimateWorkers
    :: Limit
    -> Count
    -> Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> LatencyRange
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range
    | deltaYields > 0 = speedUp
    | otherwise = slowDown

  where

    -- XXX we can have a maxEfficiency combinator as well which runs the
    -- producer at the maximal efficiency i.e. the number of workers are chosen
    -- such that the latency is minimum or within a range. Or we can call it
    -- maxWorkerLatency.
    --
    -- How many workers do we need to achieve the required rate?
    --
    -- When the workers are IO bound we can increase the throughput by
    -- increasing the number of workers as long as the IO device has enough
    -- capacity to process all the requests concurrently. If the IO
    -- bandwidth is saturated increasing the workers won't help. Also, if
    -- the CPU utilization in processing all these requests exceeds the CPU
    -- bandwidth, then increasing the number of workers won't help.
    --
    -- When the workers are purely CPU bound, increasing the workers beyond
    -- the number of CPUs won't help.
    --
    -- TODO - measure the CPU and IO requirements of the workers. Have a
    -- way to specify the max bandwidth of the underlying IO mechanism and
    -- use that to determine the max rate of workers, and also take the CPU
    -- bandwidth into account. We can also discover the IO bandwidth if we
    -- know that we are not CPU bound, then how much steady state rate are
    -- we able to achieve. Design tests for CPU bound and IO bound cases.
    --
    -- Calculate how many yields are we ahead or behind to match the exact
    -- required rate. Based on that we increase or decrease the effective
    -- workers.
    --
    -- When the worker latency is lower than required latency we begin with
    -- a yield and then wait rather than first waiting and then yielding.
    targetYields :: NanoSecond64
    targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat

    effectiveYields :: Count
    effectiveYields = svarYields + gainLossYields

    deltaYields :: Count
    deltaYields = fromIntegral targetYields - effectiveYields

    -- We recover the deficit by running at a higher/lower rate for a
    -- certain amount of time. To keep the effective rate in reasonable
    -- limits we use rateRecoveryTime, minLatency and maxLatency.
    speedUp :: Work
    speedUp =
        let deltaFreq, baseFreq :: Double
            deltaFreq = fromIntegral deltaYields / fromIntegral rateRecoveryTime
            baseFreq = 1.0 / fromIntegral targetLat
            requiredLat = NanoSecond64 (round (1.0 / (baseFreq + deltaFreq)))
            adjustedLat =
                min (max requiredLat (minLatency range)) (maxLatency range)
            decision
                | wLatency <= adjustedLat = PartialWorker deltaYields
                | otherwise =
                    let workers = capWorkers (wLatency `div` adjustedLat)
                        limited = min workers (fromIntegral deltaYields)
                     in ManyWorkers (fromIntegral limited) deltaYields
         in assert (adjustedLat > 0) decision

    slowDown :: Work
    slowDown =
        let expectedDuration = fromIntegral effectiveYields * targetLat
            sleepTime = expectedDuration - svarElapsed
            maxSleepTime = maxLatency range - wLatency
            s = min sleepTime maxSleepTime
         in assert (sleepTime >= 0) $
                -- if s is less than 0 it means our maxSleepTime is less
                -- than the worker latency.
                if s > 0 then BlockWait s else ManyWorkers 1 (Count 0)

    -- Apply the configured worker limit, if any.
    capWorkers :: NanoSecond64 -> NanoSecond64
    capWorkers n =
        case workerLimit of
            Unlimited -> n
            Limited x -> min n (fromIntegral x)

-- | Whether the channel currently has more active workers than
-- 'estimateWorkers' calls for.
--
-- The inputs to the estimate come from:
--
-- * 'getWorkerLatency', for the yield count, base time and latency;
-- * the elapsed monotonic time since that base time;
-- * 'svarGainedLostYields', 'svarLatencyTarget' and 'svarLatencyRange'.
--
-- The active count read from @workerCount@ is then compared with the
-- estimate:
--
-- * 'PartialWorker': 'True' if more than one worker is active.
-- * 'ManyWorkers' @n@: 'True' if more than @n@ workers are active.
-- * 'BlockWait': always 'True'.
isBeyondMaxRate :: Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate workerLimit workerCount rateInfo = do
    (yields, since, wLatency) <- getWorkerLatency rateInfo
    now <- getTime Monotonic
    gainLoss <- readIORef (svarGainedLostYields rateInfo)
    active <- readIORef workerCount
    let elapsed = fromRelTime64 (diffAbsTime64 now since)
        work =
            estimateWorkers workerLimit yields gainLoss elapsed wLatency
                (svarLatencyTarget rateInfo) (svarLatencyRange rateInfo)
    pure $ case work of
        -- XXX set the worker's maxYields or polling interval based on yields
        PartialWorker _ -> active > 1
        ManyWorkers n _ -> active > n
        BlockWait _ -> True

-- XXX we should do rate control periodically based on the total yields rather
-- than based on the worker local yields as other workers may have yielded more
-- and we should stop based on the aggregate yields. However, latency update
-- period can be based on individual worker yields.

-- | Run a rate check only on every polling-interval-th yield.
--
-- The interval is read from 'workerPollingInterval'; an interval of @0@
-- disables the check. When @ycnt@ is a multiple of a non-zero interval, this
-- updates the latency accounting and returns the result of
-- 'isBeyondMaxRate'. Otherwise it returns 'False' without doing anything
-- else.
{-# NOINLINE checkRatePeriodic #-}
checkRatePeriodic ::
       Limit
    -> IORef Int
    -> YieldRateInfo
    -> WorkerInfo
    -> Count
    -> IO Bool
checkRatePeriodic workerLimit workerCount rateInfo workerInfo ycnt = do
    interval <- readIORef (workerPollingInterval rateInfo)
    -- XXX use generation count to check if the interval has been updated
    -- The zero test comes first so that `mod` is never called with 0.
    if interval == 0 || ycnt `mod` interval /= 0
    then pure False
    else do
        workerUpdateLatency rateInfo workerInfo
        -- XXX not required for parallel streams
        isBeyondMaxRate workerLimit workerCount rateInfo

-- | Decide whether a worker that is yielding a value should keep running.
--
-- Returns 'True' when the worker may continue: it has neither reached its
-- yield cap ('isBeyondMaxYield') nor been found over the rate by
-- 'checkRatePeriodic'.
--
-- CAUTION! this also updates the yield count and therefore should be called
-- only when we are actually yielding an element.
{-# NOINLINE workerRateControl #-}
workerRateControl :: Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl workerLimit workerCount rateInfo workerInfo = do
    yields <- updateYieldCount workerInfo
    overRate <-
        checkRatePeriodic workerLimit workerCount rateInfo workerInfo yields
    let overYields = isBeyondMaxYield yields workerInfo
    pure (not overYields && not overRate)

-------------------------------------------------------------------------------
-- Send a yield event
-------------------------------------------------------------------------------

-- XXX we should do rate control here but not latency update in case of ahead
-- streams. latency update must be done when we yield directly to outputQueue
-- or when we yield to heap.

-- | Post a yield event to the output queue (via 'sendWithDoorBell').
--
-- Returns whether the worker should continue (True) or stop (False). Both of
-- these conditions must hold:
--
-- * Buffer space: with an 'Unlimited' buffer this always holds. With
--   @Limited lim@ it requires @oldlen + 1 < lim - active@, where @oldlen@ is
--   the queue length before the push and @active@ is read from
--   @workerCount@.
-- * Rate: this holds unless both a 'WorkerInfo' and a 'YieldRateInfo' are
--   supplied and 'workerRateControl' says to stop.
{-# INLINE sendYield #-}
sendYield ::
       Limit
    -> Limit
    -> IORef Int
    -> Maybe WorkerInfo
    -> Maybe YieldRateInfo
    -> IORef ([ChildEvent a], Int)
    -> MVar ()
    -> ChildEvent a
    -> IO Bool
sendYield bufferLimit workerLimit workerCount workerInfo rateInfo q bell msg = do
    queuedBefore <- sendWithDoorBell q bell msg
    roomLeft <- hasBufferSpace queuedBefore
    withinRate <- rateAllows
    pure (roomLeft && withinRate)

  where

    hasBufferSpace oldlen =
        case bufferLimit of
            Unlimited -> pure True
            Limited lim -> do
                active <- readIORef workerCount
                pure ((oldlen + 1) < (fromIntegral lim - active))

    rateAllows =
        case (workerInfo, rateInfo) of
            (Just winfo, Just yinfo) ->
                workerRateControl workerLimit workerCount yinfo winfo
            _ -> pure True

-------------------------------------------------------------------------------
-- Send a Stop event
-------------------------------------------------------------------------------

-- | Flush a stopping worker's outstanding latency measurement into the
-- shared accounting ('workerUpdateLatency').
--
-- Nothing is done when 'workerPollingInterval' is @0@.
{-# INLINE workerStopUpdate #-}
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info = do
    interval <- readIORef (workerPollingInterval info)
    if interval == 0
    then pure ()
    else workerUpdateLatency info winfo

-- | Announce that the calling worker is stopping normally. It does three
-- things, in this order:
--
-- 1. Atomically decrements @workerCount@.
-- 2. If both a 'WorkerInfo' and a 'YieldRateInfo' are supplied, flushes the
--    worker's latency measurement ('workerStopUpdate').
-- 3. Posts @ChildStop tid Nothing@ for the current thread to the output
--    queue, ringing the doorbell if needed.
{-# INLINABLE sendStop #-}
sendStop ::
       IORef Int
    -> Maybe WorkerInfo
    -> Maybe YieldRateInfo
    -> IORef ([ChildEvent a], Int)
    -> MVar ()
    -> IO ()
sendStop workerCount workerInfo rateInfo q bell = do
    atomicModifyIORefCAS_ workerCount (subtract 1)
    case workerStopUpdate <$> workerInfo <*> rateInfo of
        Just flushLatency -> flushLatency
        Nothing -> pure ()
    tid <- myThreadId
    void $ sendWithDoorBell q bell (ChildStop tid Nothing)

-- | Report an exception raised in the calling worker. It posts
-- @ChildStop tid (Just e)@ for the current thread to the output queue,
-- ringing the doorbell if needed.
--
-- Unlike 'sendStop', this does not touch any worker count or latency
-- accounting. NOTE(review): presumably the consumer handles that when it
-- sees the exception -- confirm.
{-# NOINLINE handleChildException #-}
handleChildException ::
    IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
handleChildException q bell e =
    myThreadId >>= \tid ->
        void $ sendWithDoorBell q bell (ChildStop tid (Just e))