-- |
-- Module      : Streamly.Internal.Data.Channel.Dispatcher
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Channel.Dispatcher
    (
    -- ** Latency collection
      minThreadDelay
    , collectLatency

    -- ** Thread accounting
    , addThread
    , delThread
    , modifyThread
    , allThreadsDone
    , recordMaxWorkers

    -- ** Diagnostics
    , dumpSVarStats
    )
where

import Data.Set (Set)
import Control.Concurrent (MVar, ThreadId)
import Control.Concurrent.MVar (tryPutMVar)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (IORef, modifyIORef, readIORef, writeIORef)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS, writeBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
       ( AbsTime, NanoSecond64(..), diffAbsTime64, showNanoSecond64
       , showRelTime64)

import qualified Data.Set as S

import Streamly.Internal.Data.Channel.Types

-------------------------------------------------------------------------------
-- Worker latency data processing
-------------------------------------------------------------------------------

-- | This is a magic number and it is overloaded, and used at several places to
-- achieve batching:
--
-- 1. If we have to sleep to slowdown this is the minimum period that we
--    accumulate before we sleep. Also, workers do not stop until this much
--    sleep time is accumulated.
-- 2. Collected latencies are computed and transferred to measured latency
--    after a minimum of this period.
minThreadDelay :: NanoSecond64
minThreadDelay = 1000000 -- 1,000,000 ns, i.e. one millisecond

-- | Every once in a while workers update the latencies and check the yield rate.
-- They return if we are above the expected yield rate. If we check too often
-- it may impact performance, if we check less often we may have a stale
-- picture. We update every minThreadDelay but we translate that into a yield
-- count based on latency so that the checking overhead is little.
--
-- XXX use a generation count to indicate that the value is updated. If the
-- value is updated an existing worker must check it again on the next yield.
-- Otherwise it is possible that we may keep updating it and because of the mod
-- worker keeps skipping it.
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval yinfo latency = do
    let periodRef = workerPollingInterval yinfo
        -- This depends on the rate, if the rate is low, latencies are
        -- small, by the time we poll it might be too late and we may have
        -- yielded too many results.
        -- cnt = max 1 $ minThreadDelay `div` latency
        --
        -- Yields between two polls: the ratio of the supplied latency to the
        -- target latency, but never less than 1.
        -- NOTE(review): this divides by 'svarLatencyTarget'; presumably it is
        -- never zero when this function is reached -- confirm against callers.
        cnt = max 1 (latency `div` svarLatencyTarget yinfo)
        -- Cap the interval at 'magicMaxBuffer' yields.
        period = min cnt (fromIntegral magicMaxBuffer)

    -- Publish the new interval for the workers to read.
    writeIORef periodRef (fromIntegral period)

{-# INLINE recordMinMaxLatency #-}
-- | Fold a new latency sample into the 'minWorkerLatency' and
-- 'maxWorkerLatency' fields of the stats record.
recordMinMaxLatency :: SVarStats -> NanoSecond64 -> IO ()
recordMinMaxLatency ss new = do
    minLat <- readIORef (minWorkerLatency ss)
    -- A stored minimum of 0 is always replaced; presumably 0 marks "nothing
    -- recorded yet", otherwise no sample could ever undercut it.
    when (new < minLat || minLat == 0) $
        writeIORef (minWorkerLatency ss) new

    maxLat <- readIORef (maxWorkerLatency ss)
    when (new > maxLat) $ writeIORef (maxWorkerLatency ss) new

-- | Add a (sample count, total time) pair to the running totals kept in
-- 'avgWorkerLatency'. The average itself is computed by the reader of the
-- totals.
recordAvgLatency :: SVarStats -> (Count, NanoSecond64) -> IO ()
recordAvgLatency ss (count, time) = do
    let ref = avgWorkerLatency ss
    (cnt, t) <- readIORef ref
    let cnt1 = cnt + count
        t1 = t + time
    -- Force both sums before storing them. A lazy 'modifyIORef' would leave a
    -- growing chain of unevaluated additions in the ref until it is read.
    cnt1 `seq` t1 `seq` writeIORef ref (cnt1, t1)

-- | Add the 'workerPendingLatency' to 'workerCollectedLatency' and reset it to
-- zeroes. Return the added counts.
{-# INLINE collectWorkerPendingLatency #-}
collectWorkerPendingLatency
    :: IORef (Count, Count, NanoSecond64) -- ^ 'workerPendingLatency'
    -> IORef (Count, Count, NanoSecond64) -- ^ 'workerCollectedLatency'
    -> IO (Count, Maybe (Count, NanoSecond64))
    -- ^ (total yield count, Maybe (total latency count, total latency time)).
    -- Latency count and time are reported only when both are non-zero to avoid
    -- arithmetic exceptions in calculations.
collectWorkerPendingLatency cur col = do
    -- Swap the pending triple for zeroes via 'atomicModifyIORefCAS' and keep
    -- the value that was there.
    (fcount, count, time) <- atomicModifyIORefCAS cur $ \v -> ((0,0,0), v)

    -- Add what was pending to what has already been collected.
    (fcnt, cnt, t) <- readIORef col
    let totalCount = fcnt + fcount
        latCount   = cnt + count
        latTime    = t + time
    writeIORef col (totalCount, latCount, latTime)

    -- Sanity check: a non-zero latency count must come with a non-zero time.
    assert (latCount == 0 || latTime /= 0) (return ())
    let latPair =
            if latCount > 0 && latTime > 0
            then Just (latCount, latTime)
            else Nothing
    return (totalCount, latPair)

{-# INLINE shouldUseCollectedBatch #-}
-- | Decide whether the collected latency batch should be used to compute a
-- new measured latency. True when any of the following holds:
--
--  * more than 'magicMaxBuffer' yields have been collected
--  * more than 'minThreadDelay' of time has been collected
--  * the new latency differs from the previous one by more than a factor of 2
--  * there is no previous latency (it is 0)
shouldUseCollectedBatch
    :: Count -- ^ yields collected so far
    -> NanoSecond64 -- ^ time collected so far
    -> NanoSecond64 -- ^ latency computed from the collected batch
    -> NanoSecond64 -- ^ previously measured latency
    -> Bool
shouldUseCollectedBatch collectedYields collectedTime newLat prevLat =
    -- The ratio is only inspected under the @prevLat > 0@ guard, so the
    -- division by a zero 'prevLat' is never relied upon.
    let r = fromIntegral newLat / fromIntegral prevLat :: Double
    in     (collectedYields > fromIntegral magicMaxBuffer)
        || (collectedTime > minThreadDelay)
        || (prevLat > 0 && (r > 2 || r < 0.5))
        || (prevLat == 0)

-- CAUTION! keep it in sync with getWorkerLatency

-- | Always moves 'workerPendingLatency' to 'workerCollectedLatency':
--
--  * 'workerCollectedLatency' always incremented by 'workerPendingLatency'
--  * 'workerPendingLatency' always reset to 0
--
-- Moves 'workerCollectedLatency' to 'svarAllTimeLatency' periodically, when
-- the collected batch size hits a limit, or time limit is over, or latency
-- changes beyond a limit. Updates done when the batch is collected:
--
--  * 'svarAllTimeLatency' yield count updated
--  * 'workerMeasuredLatency' set to (new+prev)/2
--  * 'workerPollingInterval' set using max of new/prev worker latency
--  * 'workerCollectedLatency' reset to 0
--
-- See also 'getWorkerLatency'.
--
collectLatency ::
       Bool -- ^ stat inspection mode
    -> SVarStats -- ^ Channel stats
    -> YieldRateInfo -- ^ Channel rate control info
    -> Bool -- ^ Force batch collection
    -> IO (Count, AbsTime, NanoSecond64)
    -- ^ (channel yield count since beginning, beginning timestamp, 'workerMeasuredLatency')
collectLatency inspecting ss yinfo drain = do
    let cur      = workerPendingLatency yinfo
        col      = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo

    -- Unconditionally fold the pending counters into the collected ones.
    -- 'newCount' is the yield total now held in the collected ref.
    (newCount, newLatPair) <- collectWorkerPendingLatency cur col
    (lcount, ltime) <- readIORef longTerm
    prevLat <- readIORef measured

    let newLcount = lcount + newCount
        -- Every exit path reports the same count and start time; only the
        -- latency differs.
        retWith lat = return (newLcount, ltime, lat)

    case newLatPair of
        -- No usable latency sample: report the previous measurement.
        Nothing -> retWith prevLat
        Just (count, time) -> do
            let newLat = time `div` fromIntegral count
            when inspecting $ recordMinMaxLatency ss newLat
            -- When we have collected a significant sized batch we compute the
            -- new latency using that batch and return the new latency,
            -- otherwise we return the previous latency derived from the
            -- previous batch.
            if shouldUseCollectedBatch newCount time newLat prevLat || drain
            then do
                -- XXX make this NOINLINE?
                updateWorkerPollingInterval yinfo (max newLat prevLat)
                when inspecting $ recordAvgLatency ss (count, time)
                -- The batch is consumed: reset the collected counters.
                writeIORef col (0, 0, 0)
                -- Store the mean of the previous and the new latency.
                writeIORef measured ((prevLat + newLat) `div` 2)
                -- Update the all-time yield count, keeping the timestamp.
                modifyIORef longTerm $ \(_, t) -> (newLcount, t)
                retWith newLat
            else retWith prevLat

-------------------------------------------------------------------------------
-- Dumping the SVar for debug/diag
-------------------------------------------------------------------------------

-- | Render the channel statistics as a multi-line, human readable string.
--
-- When rate control info is supplied, pending latencies are first collected
-- with batch collection forced, so that the stats read afterwards include
-- them. Optional lines are emitted only when their value is greater than
-- zero.
dumpSVarStats :: Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
dumpSVarStats inspecting rateInfo ss = do
    -- Flush latency data into the stats before reading them; the result of
    -- the collection itself is not needed here.
    case rateInfo of
        Nothing -> return ()
        Just yinfo -> void $ collectLatency inspecting ss yinfo True

    dispatches <- readIORef $ totalDispatches ss
    maxWrk <- readIORef $ maxWorkers ss
    maxOq <- readIORef $ maxOutQSize ss
    -- maxHp <- readIORef $ maxHeapSize ss
    minLat <- readIORef $ minWorkerLatency ss
    maxLat <- readIORef $ maxWorkerLatency ss
    (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss
    stopTime <- readIORef (svarStopTime ss)
    -- A missing stop time is reported as a stop "on GC".
    let stopReason =
            case stopTime of
                Nothing -> "on GC"
                Just _ -> "normal"
    (svarCnt, svarGainLossCnt, svarLat, interval) <- case rateInfo of
        Nothing -> return (0, 0, 0, 0)
        Just yinfo -> do
            (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo
            -- Life time runs up to the recorded stop time, or up to now if
            -- no stop time has been recorded.
            interval <-
                case stopTime of
                    Nothing -> do
                        now <- getTime Monotonic
                        return (diffAbsTime64 now startTime)
                    Just t -> return (diffAbsTime64 t startTime)
            if cnt > 0
            then do
                gl <- readIORef (svarGainedLostYields yinfo)
                -- Channel latency is the life time divided by the yields.
                return (cnt, gl, interval `div` fromIntegral cnt, interval)
            else return (0, 0, 0, interval)

    return $ concat
        [ "stop reason = " <> stopReason
        ,  if interval > 0
           then "\nlife time = " <> showRelTime64 interval
           else ""
        , "\ntotal dispatches = " <> show dispatches
        , "\nmax workers = " <> show maxWrk
        , "\nmax outQSize = " <> show maxOq
        , if minLat > 0
          then "\nmin worker latency = " <> showNanoSecond64 minLat
          else ""
        , if maxLat > 0
          then "\nmax worker latency = " <> showNanoSecond64 maxLat
          else ""
        , if avgCnt > 0
          then let lat = avgTime `div` fromIntegral avgCnt
                in "\navg worker latency = " <> showNanoSecond64 lat
          else ""
        , if svarLat > 0
          then "\nchannel latency = " <> showRelTime64 svarLat
          else ""
        , if svarCnt > 0
          then "\nchannel yield count = " <> show svarCnt
          else ""
        , if svarGainLossCnt > 0
          then "\nchannel gain/loss yield count = " <> show svarGainLossCnt
          else ""
        ]

-------------------------------------------------------------------------------
-- Thread accounting
-------------------------------------------------------------------------------

-- Thread tracking is needed for two reasons:
--
-- 1) Killing threads on exceptions. Threads may not be left to go away by
-- themselves because they may run for significant times before going away or
-- worse they may be stuck in IO and never go away.
--
-- 2) To know when all threads are done and the stream has ended.

{-# NOINLINE addThread #-}
-- | Insert a thread id into the set of tracked worker threads. Inserting an
-- id that is already present leaves the set unchanged.
addThread :: MonadIO m => IORef (Set ThreadId) -> ThreadId -> m ()
addThread workerSet tid =
    liftIO $ modifyIORef workerSet (S.insert tid)

-- This is cheaper than modifyThread because we do not have to send a
-- outputDoorBell This can make a difference when more workers are being
-- dispatched.
{-# INLINE delThread #-}
-- | Remove a thread id from the set of tracked worker threads. Removing an
-- id that is not present leaves the set unchanged.
delThread :: MonadIO m => IORef (Set ThreadId) -> ThreadId -> m ()
delThread workerSet tid =
    liftIO $ modifyIORef workerSet (S.delete tid)

-- If present then delete else add. This takes care of out of order add and
-- delete i.e. a delete arriving before we even added a thread.
-- This occurs when the forked thread is done even before the 'addThread' right
-- after the fork gets a chance to run.
{-# INLINE modifyThread #-}
-- | Toggle a thread id in the worker set: delete it if present, insert it
-- otherwise. The 'MVar' is signalled with 'tryPutMVar' when the set examined
-- after the toggle (see below) is empty.
modifyThread :: MonadIO m => IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread workerSet bell tid = do
    -- 'changed' is the set after the delete when the id was present, and the
    -- set before the insert when it was absent.
    changed <- liftIO $ atomicModifyIORefCAS workerSet $ \old ->
        if S.member tid old
        then let new = S.delete tid old in (new, new)
        else let new = S.insert tid old in (new, old)
    when (null changed) $
         liftIO $ do
            writeBarrier
            -- 'tryPutMVar' does not block; its result (whether the MVar was
            -- empty) is deliberately ignored.
            void $ tryPutMVar bell ()

-- | This is safe even if we are adding more threads concurrently because if
-- a child thread is adding another thread then anyway 'workerThreads' will
-- not be empty.
{-# INLINE allThreadsDone #-}
allThreadsDone :: MonadIO m => IORef (Set ThreadId) -> m Bool
-- True exactly when the tracked thread set is currently empty.
allThreadsDone ref = liftIO $ S.null <$> readIORef ref

-------------------------------------------------------------------------------
-- Dispatching workers
-------------------------------------------------------------------------------

{-# NOINLINE recordMaxWorkers #-}
-- | Update the dispatch statistics: raise 'maxWorkers' to the value currently
-- held in the given counter if that is larger, and increment
-- 'totalDispatches' by one.
recordMaxWorkers :: MonadIO m => IORef Int -> SVarStats -> m ()
recordMaxWorkers countRef ss = liftIO $ do
    active <- readIORef countRef
    maxWrk <- readIORef (maxWorkers ss)
    when (active > maxWrk) $ writeIORef (maxWorkers ss) active
    -- Bump the counter strictly. A lazy 'modifyIORef' would stack one
    -- unevaluated (+1) per dispatch until the counter is finally read.
    dispatches <- readIORef (totalDispatches ss)
    writeIORef (totalDispatches ss) $! dispatches + 1