module Streamly.Internal.Data.SVar.Worker
(
decrementYieldLimit
, incrementYieldLimit
, decrementBufferLimit
, incrementBufferLimit
, resetBufferLimit
, Work (..)
, isBeyondMaxRate
, estimateWorkers
, updateYieldCount
, minThreadDelay
, workerRateControl
, workerUpdateLatency
, send
, ringDoorBell
, sendYield
, sendToProducer
, sendStop
, sendStopToProducer
, handleChildException
, handleFoldException
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, takeMVar)
import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Exception (SomeException(..), assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (IORef, readIORef, writeIORef)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), diffAbsTime64, fromRelTime64)
import Streamly.Internal.Data.SVar.Type
{-# INLINE decrementYieldLimit #-}
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar t m a
sv =
case SVar t m a -> Maybe (IORef Count)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just IORef Count
ref -> do
Count
r <- IORef Count -> (Count -> (Count, Count)) -> IO Count
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef Count
ref ((Count -> (Count, Count)) -> IO Count)
-> (Count -> (Count, Count)) -> IO Count
forall a b. (a -> b) -> a -> b
$ \Count
x -> (Count
x Count -> Count -> Count
forall a. Num a => a -> a -> a
- Count
1, Count
x)
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Count
r Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
>= Count
1
{-# INLINE incrementYieldLimit #-}
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar t m a
sv =
case SVar t m a -> Maybe (IORef Count)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
Maybe (IORef Count)
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just IORef Count
ref -> IORef Count -> (Count -> Count) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef Count
ref (Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
1)
{-# INLINE decrementBufferLimit #-}
decrementBufferLimit :: SVar t m a -> IO ()
decrementBufferLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
sv =
case SVar t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit SVar t m a
sv of
Limit
Unlimited -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Limited Word
_ -> do
let ref :: IORef Count
ref = SVar t m a -> IORef Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Count
pushBufferSpace SVar t m a
sv
Count
old <- IORef Count -> (Count -> (Count, Count)) -> IO Count
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef Count
ref ((Count -> (Count, Count)) -> IO Count)
-> (Count -> (Count, Count)) -> IO Count
forall a b. (a -> b) -> a -> b
$ \Count
x ->
(if Count
x Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
>= Count
1 then Count
x Count -> Count -> Count
forall a. Num a => a -> a -> a
- Count
1 else Count
x, Count
x)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
old Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
case SVar t m a -> PushBufferPolicy
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> PushBufferPolicy
pushBufferPolicy SVar t m a
sv of
PushBufferPolicy
PushBufferBlock -> IO ()
blockAndRetry
PushBufferPolicy
PushBufferDropNew -> do
Bool
block <- IORef ([ChildEvent a], Int)
-> (([ChildEvent a], Int) -> (([ChildEvent a], Int), Bool))
-> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS (SVar t m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar t m a
sv) ((([ChildEvent a], Int) -> (([ChildEvent a], Int), Bool))
-> IO Bool)
-> (([ChildEvent a], Int) -> (([ChildEvent a], Int), Bool))
-> IO Bool
forall a b. (a -> b) -> a -> b
$
\([ChildEvent a]
es, Int
n) ->
case [ChildEvent a]
es of
[] -> (([],Int
n), Bool
True)
ChildEvent a
_ : [ChildEvent a]
xs -> (([ChildEvent a]
xs, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1), Bool
False)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
block IO ()
blockAndRetry
PushBufferPolicy
PushBufferDropOld -> IO ()
forall a. (?callStack::CallStack) => a
undefined
where
blockAndRetry :: IO ()
blockAndRetry = do
let ref :: IORef Count
ref = SVar t m a -> IORef Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Count
pushBufferSpace SVar t m a
sv
IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
pushBufferMVar SVar t m a
sv)
Count
old <- IORef Count -> (Count -> (Count, Count)) -> IO Count
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef Count
ref ((Count -> (Count, Count)) -> IO Count)
-> (Count -> (Count, Count)) -> IO Count
forall a b. (a -> b) -> a -> b
$ \Count
x ->
(if Count
x Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
>= Count
1 then Count
x Count -> Count -> Count
forall a. Num a => a -> a -> a
- Count
1 else Count
x, Count
x)
if Count
old Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
>= Count
1
then IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> IO Bool
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> IO Bool) -> IO Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
pushBufferMVar SVar t m a
sv) ()
else IO ()
blockAndRetry
{-# INLINE incrementBufferLimit #-}
incrementBufferLimit :: SVar t m a -> IO ()
incrementBufferLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar t m a
sv =
case SVar t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit SVar t m a
sv of
Limit
Unlimited -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Limited Word
_ -> do
IORef Count -> (Count -> Count) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Count
pushBufferSpace SVar t m a
sv) (Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
1)
IO ()
writeBarrier
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> IO Bool
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> IO Bool) -> IO Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
pushBufferMVar SVar t m a
sv) ()
{-# INLINE resetBufferLimit #-}
resetBufferLimit :: SVar t m a -> IO ()
resetBufferLimit :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
resetBufferLimit SVar t m a
sv =
case SVar t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit SVar t m a
sv of
Limit
Unlimited -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Limited Word
n -> IORef Count -> (Count -> Count) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Count
pushBufferSpace SVar t m a
sv)
(Count -> Count -> Count
forall a b. a -> b -> a
const (Word -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
n))
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount WorkerInfo
winfo = do
Count
cnt <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef (WorkerInfo -> IORef Count
workerYieldCount WorkerInfo
winfo)
let cnt1 :: Count
cnt1 = Count
cnt Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
1
IORef Count -> Count -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (WorkerInfo -> IORef Count
workerYieldCount WorkerInfo
winfo) Count
cnt1
Count -> IO Count
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Count
cnt1
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield Count
cnt WorkerInfo
winfo =
let ymax :: Count
ymax = WorkerInfo -> Count
workerYieldMax WorkerInfo
winfo
in Count
ymax Count -> Count -> Bool
forall a. Eq a => a -> a -> Bool
/= Count
0 Bool -> Bool -> Bool
&& Count
cnt Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
>= Count
ymax
{-# INLINE ringDoorBell #-}
ringDoorBell :: SVar t m a -> IO ()
ringDoorBell :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
ringDoorBell SVar t m a
sv = do
IO ()
storeLoadBarrier
Bool
w <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (IORef Bool -> IO Bool) -> IORef Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Bool
needDoorBell SVar t m a
sv
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
w (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IORef Bool -> (Bool -> Bool) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Bool
needDoorBell SVar t m a
sv) (Bool -> Bool -> Bool
forall a b. a -> b -> a
const Bool
False)
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBell SVar t m a
sv) ()
{-# INLINE sendWithDoorBell #-}
sendWithDoorBell ::
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell :: forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell IORef ([ChildEvent a], Int)
q MVar ()
bell ChildEvent a
msg = do
Int
oldlen <- IORef ([ChildEvent a], Int)
-> (([ChildEvent a], Int) -> (([ChildEvent a], Int), Int))
-> IO Int
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([ChildEvent a], Int)
q ((([ChildEvent a], Int) -> (([ChildEvent a], Int), Int)) -> IO Int)
-> (([ChildEvent a], Int) -> (([ChildEvent a], Int), Int))
-> IO Int
forall a b. (a -> b) -> a -> b
$ \([ChildEvent a]
es, Int
n) ->
((ChildEvent a
msg ChildEvent a -> [ChildEvent a] -> [ChildEvent a]
forall a. a -> [a] -> [a]
: [ChildEvent a]
es, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), Int
n)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
oldlen Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO ()
writeBarrier
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
bell ()
Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
oldlen
send :: SVar t m a -> ChildEvent a -> IO Int
send :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
sv = IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell (SVar t m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar t m a
sv) (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBell SVar t m a
sv)
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
sendToProducer :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
sendToProducer SVar t m a
sv ChildEvent a
msg = do
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
pushBufferMVar SVar t m a
sv) ()
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell (SVar t m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer SVar t m a
sv)
(SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar t m a
sv) ChildEvent a
msg
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency WorkerInfo
winfo = do
(Count
cnt0, AbsTime
t0) <- IORef (Count, AbsTime) -> IO (Count, AbsTime)
forall a. IORef a -> IO a
readIORef (WorkerInfo -> IORef (Count, AbsTime)
workerLatencyStart WorkerInfo
winfo)
Count
cnt1 <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef (WorkerInfo -> IORef Count
workerYieldCount WorkerInfo
winfo)
let cnt :: Count
cnt = Count
cnt1 Count -> Count -> Count
forall a. Num a => a -> a -> a
- Count
cnt0
if Count
cnt Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0
then do
AbsTime
t1 <- Clock -> IO AbsTime
getTime Clock
Monotonic
let period :: NanoSecond64
period = RelTime64 -> NanoSecond64
forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 (RelTime64 -> NanoSecond64) -> RelTime64 -> NanoSecond64
forall a b. (a -> b) -> a -> b
$ AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
t1 AbsTime
t0
IORef (Count, AbsTime) -> (Count, AbsTime) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (WorkerInfo -> IORef (Count, AbsTime)
workerLatencyStart WorkerInfo
winfo) (Count
cnt1, AbsTime
t1)
Maybe (Count, NanoSecond64) -> IO (Maybe (Count, NanoSecond64))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (Count, NanoSecond64) -> IO (Maybe (Count, NanoSecond64)))
-> Maybe (Count, NanoSecond64) -> IO (Maybe (Count, NanoSecond64))
forall a b. (a -> b) -> a -> b
$ (Count, NanoSecond64) -> Maybe (Count, NanoSecond64)
forall a. a -> Maybe a
Just (Count
cnt, NanoSecond64
period)
else Maybe (Count, NanoSecond64) -> IO (Maybe (Count, NanoSecond64))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Count, NanoSecond64)
forall a. Maybe a
Nothing
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency YieldRateInfo
yinfo WorkerInfo
winfo = do
Maybe (Count, NanoSecond64)
r <- WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency WorkerInfo
winfo
case Maybe (Count, NanoSecond64)
r of
Just (Count
cnt, NanoSecond64
period) -> do
let ref :: IORef (Count, Count, NanoSecond64)
ref = YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerPendingLatency YieldRateInfo
yinfo
(Count
cnt1, NanoSecond64
t1) = if NanoSecond64
period NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
0 then (Count
cnt, NanoSecond64
period) else (Count
0, NanoSecond64
0)
IORef (Count, Count, NanoSecond64)
-> ((Count, Count, NanoSecond64) -> (Count, Count, NanoSecond64))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef (Count, Count, NanoSecond64)
ref (((Count, Count, NanoSecond64) -> (Count, Count, NanoSecond64))
-> IO ())
-> ((Count, Count, NanoSecond64) -> (Count, Count, NanoSecond64))
-> IO ()
forall a b. (a -> b) -> a -> b
$
\(Count
fc, Count
n, NanoSecond64
t) -> (Count
fc Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
cnt, Count
n Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
cnt1, NanoSecond64
t NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
+ NanoSecond64
t1)
Maybe (Count, NanoSecond64)
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
data Work
= BlockWait NanoSecond64
| PartialWorker Count
| ManyWorkers Int Count
deriving Int -> Work -> ShowS
[Work] -> ShowS
Work -> String
(Int -> Work -> ShowS)
-> (Work -> String) -> ([Work] -> ShowS) -> Show Work
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Work -> ShowS
showsPrec :: Int -> Work -> ShowS
$cshow :: Work -> String
show :: Work -> String
$cshowList :: [Work] -> ShowS
showList :: [Work] -> ShowS
Show
minThreadDelay :: NanoSecond64
minThreadDelay :: NanoSecond64
minThreadDelay = NanoSecond64
1000000
rateRecoveryTime :: NanoSecond64
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = NanoSecond64
1000000
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency YieldRateInfo
yinfo = do
let cur :: IORef (Count, Count, NanoSecond64)
cur = YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerPendingLatency YieldRateInfo
yinfo
col :: IORef (Count, Count, NanoSecond64)
col = YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerCollectedLatency YieldRateInfo
yinfo
longTerm :: IORef (Count, AbsTime)
longTerm = YieldRateInfo -> IORef (Count, AbsTime)
svarAllTimeLatency YieldRateInfo
yinfo
measured :: IORef NanoSecond64
measured = YieldRateInfo -> IORef NanoSecond64
workerMeasuredLatency YieldRateInfo
yinfo
(Count
curTotalCount, Count
curCount, NanoSecond64
curTime) <- IORef (Count, Count, NanoSecond64)
-> IO (Count, Count, NanoSecond64)
forall a. IORef a -> IO a
readIORef IORef (Count, Count, NanoSecond64)
cur
(Count
colTotalCount, Count
colCount, NanoSecond64
colTime) <- IORef (Count, Count, NanoSecond64)
-> IO (Count, Count, NanoSecond64)
forall a. IORef a -> IO a
readIORef IORef (Count, Count, NanoSecond64)
col
(Count
lcount, AbsTime
ltime) <- IORef (Count, AbsTime) -> IO (Count, AbsTime)
forall a. IORef a -> IO a
readIORef IORef (Count, AbsTime)
longTerm
NanoSecond64
prevLat <- IORef NanoSecond64 -> IO NanoSecond64
forall a. IORef a -> IO a
readIORef IORef NanoSecond64
measured
let latCount :: Count
latCount = Count
colCount Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
curCount
latTime :: NanoSecond64
latTime = NanoSecond64
colTime NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
+ NanoSecond64
curTime
totalCount :: Count
totalCount = Count
colTotalCount Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
curTotalCount
newLat :: NanoSecond64
newLat =
if Count
latCount Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0 Bool -> Bool -> Bool
&& NanoSecond64
latTime NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
0
then let lat :: NanoSecond64
lat = NanoSecond64
latTime NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` Count -> NanoSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
latCount
in (NanoSecond64
lat NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
+ NanoSecond64
prevLat) NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` NanoSecond64
2
else NanoSecond64
prevLat
(Count, AbsTime, NanoSecond64) -> IO (Count, AbsTime, NanoSecond64)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
lcount Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
totalCount, AbsTime
ltime, NanoSecond64
newLat)
estimateWorkers
:: Limit
-> Count
-> Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> LatencyRange
-> Work
estimateWorkers :: Limit
-> Count
-> Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> LatencyRange
-> Work
estimateWorkers Limit
workerLimit Count
svarYields Count
gainLossYields
NanoSecond64
svarElapsed NanoSecond64
wLatency NanoSecond64
targetLat LatencyRange
range =
let
targetYields :: NanoSecond64
targetYields = (NanoSecond64
svarElapsed NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
+ NanoSecond64
wLatency NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
+ NanoSecond64
targetLat NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
- NanoSecond64
1) NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` NanoSecond64
targetLat
effectiveYields :: Count
effectiveYields = Count
svarYields Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
gainLossYields
deltaYields :: Count
deltaYields = NanoSecond64 -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
targetYields Count -> Count -> Count
forall a. Num a => a -> a -> a
- Count
effectiveYields
in if Count
deltaYields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0
then
let deltaYieldsFreq :: Double
deltaYieldsFreq :: Double
deltaYieldsFreq =
Count -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
deltaYields Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/
NanoSecond64 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
rateRecoveryTime
yieldsFreq :: Double
yieldsFreq = Double
1.0 Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ NanoSecond64 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
targetLat
totalYieldsFreq :: Double
totalYieldsFreq = Double
yieldsFreq Double -> Double -> Double
forall a. Num a => a -> a -> a
+ Double
deltaYieldsFreq
requiredLat :: NanoSecond64
requiredLat = Int64 -> NanoSecond64
NanoSecond64 (Int64 -> NanoSecond64) -> Int64 -> NanoSecond64
forall a b. (a -> b) -> a -> b
$ Double -> Int64
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int64) -> Double -> Int64
forall a b. (a -> b) -> a -> b
$ Double
1.0 Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
totalYieldsFreq
adjustedLat :: NanoSecond64
adjustedLat = NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Ord a => a -> a -> a
min (NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Ord a => a -> a -> a
max NanoSecond64
requiredLat (LatencyRange -> NanoSecond64
minLatency LatencyRange
range))
(LatencyRange -> NanoSecond64
maxLatency LatencyRange
range)
in Bool -> Work -> Work
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (NanoSecond64
adjustedLat NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
0) (Work -> Work) -> Work -> Work
forall a b. (a -> b) -> a -> b
$
if NanoSecond64
wLatency NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
<= NanoSecond64
adjustedLat
then Count -> Work
PartialWorker Count
deltaYields
else let workers :: NanoSecond64
workers = NanoSecond64 -> NanoSecond64
forall {p}. (Ord p, Num p) => p -> p
withLimit (NanoSecond64 -> NanoSecond64) -> NanoSecond64 -> NanoSecond64
forall a b. (a -> b) -> a -> b
$ NanoSecond64
wLatency NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` NanoSecond64
adjustedLat
limited :: NanoSecond64
limited = NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Ord a => a -> a -> a
min NanoSecond64
workers (Count -> NanoSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
deltaYields)
in Int -> Count -> Work
ManyWorkers (NanoSecond64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
limited) Count
deltaYields
else
let expectedDuration :: NanoSecond64
expectedDuration = Count -> NanoSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
effectiveYields NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
* NanoSecond64
targetLat
sleepTime :: NanoSecond64
sleepTime = NanoSecond64
expectedDuration NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
- NanoSecond64
svarElapsed
maxSleepTime :: NanoSecond64
maxSleepTime = LatencyRange -> NanoSecond64
maxLatency LatencyRange
range NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
- NanoSecond64
wLatency
s :: NanoSecond64
s = NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Ord a => a -> a -> a
min NanoSecond64
sleepTime NanoSecond64
maxSleepTime
in Bool -> Work -> Work
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (NanoSecond64
sleepTime NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
>= NanoSecond64
0) (Work -> Work) -> Work -> Work
forall a b. (a -> b) -> a -> b
$
if NanoSecond64
s NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
0 then NanoSecond64 -> Work
BlockWait NanoSecond64
s else Int -> Count -> Work
ManyWorkers Int
1 (Int64 -> Count
Count Int64
0)
where
withLimit :: p -> p
withLimit p
n =
case Limit
workerLimit of
Limit
Unlimited -> p
n
Limited Word
x -> p -> p -> p
forall a. Ord a => a -> a -> a
min p
n (Word -> p
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
x)
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate SVar t m a
sv YieldRateInfo
yinfo = do
(Count
count, AbsTime
tstamp, NanoSecond64
wLatency) <- YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency YieldRateInfo
yinfo
AbsTime
now <- Clock -> IO AbsTime
getTime Clock
Monotonic
let duration :: NanoSecond64
duration = RelTime64 -> NanoSecond64
forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 (RelTime64 -> NanoSecond64) -> RelTime64 -> NanoSecond64
forall a b. (a -> b) -> a -> b
$ AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
now AbsTime
tstamp
let targetLat :: NanoSecond64
targetLat = YieldRateInfo -> NanoSecond64
svarLatencyTarget YieldRateInfo
yinfo
Count
gainLoss <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
yinfo)
let work :: Work
work = Limit
-> Count
-> Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> LatencyRange
-> Work
estimateWorkers (SVar t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxWorkerLimit SVar t m a
sv) Count
count Count
gainLoss NanoSecond64
duration
NanoSecond64
wLatency NanoSecond64
targetLat (YieldRateInfo -> LatencyRange
svarLatencyRange YieldRateInfo
yinfo)
Int
cnt <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ case Work
work of
PartialWorker Count
_yields -> Int
cnt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
1
ManyWorkers Int
n Count
_ -> Int
cnt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
n
BlockWait NanoSecond64
_ -> Bool
True
{-# NOINLINE checkRatePeriodic #-}
checkRatePeriodic :: SVar t m a
-> YieldRateInfo
-> WorkerInfo
-> Count
-> IO Bool
checkRatePeriodic :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> WorkerInfo -> Count -> IO Bool
checkRatePeriodic SVar t m a
sv YieldRateInfo
yinfo WorkerInfo
winfo Count
ycnt = do
Count
i <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
workerPollingInterval YieldRateInfo
yinfo)
if Count
i Count -> Count -> Bool
forall a. Eq a => a -> a -> Bool
/= Count
0 Bool -> Bool -> Bool
&& (Count
ycnt Count -> Count -> Count
forall a. Integral a => a -> a -> a
`mod` Count
i) Count -> Count -> Bool
forall a. Eq a => a -> a -> Bool
== Count
0
then do
YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency YieldRateInfo
yinfo WorkerInfo
winfo
SVar t m a -> YieldRateInfo -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate SVar t m a
sv YieldRateInfo
yinfo
else Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
{-# NOINLINE workerRateControl #-}
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl SVar t m a
sv YieldRateInfo
yinfo WorkerInfo
winfo = do
Count
cnt <- WorkerInfo -> IO Count
updateYieldCount WorkerInfo
winfo
Bool
beyondMaxRate <- SVar t m a -> YieldRateInfo -> WorkerInfo -> Count -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> WorkerInfo -> Count -> IO Bool
checkRatePeriodic SVar t m a
sv YieldRateInfo
yinfo WorkerInfo
winfo Count
cnt
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool -> Bool
not (Count -> WorkerInfo -> Bool
isBeyondMaxYield Count
cnt WorkerInfo
winfo Bool -> Bool -> Bool
|| Bool
beyondMaxRate)
{-# INLINE sendYield #-}
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar t m a
sv Maybe WorkerInfo
mwinfo ChildEvent a
msg = do
Int
oldlen <- SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
sv ChildEvent a
msg
let limit :: Limit
limit = SVar t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit SVar t m a
sv
Bool
bufferSpaceOk <- case Limit
limit of
Limit
Unlimited -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Limited Word
lim -> do
Int
active <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv)
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ (Int
oldlen Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< (Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
active)
Bool
rateLimitOk <-
case Maybe WorkerInfo
mwinfo of
Just WorkerInfo
winfo ->
case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Maybe YieldRateInfo
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just YieldRateInfo
yinfo -> SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl SVar t m a
sv YieldRateInfo
yinfo WorkerInfo
winfo
Maybe WorkerInfo
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
bufferSpaceOk Bool -> Bool -> Bool
&& Bool
rateLimitOk
{-# INLINE workerStopUpdate #-}
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate WorkerInfo
winfo YieldRateInfo
info = do
Count
i <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
workerPollingInterval YieldRateInfo
info)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
i Count -> Count -> Bool
forall a. Eq a => a -> a -> Bool
/= Count
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency YieldRateInfo
info WorkerInfo
winfo
{-# INLINABLE sendStop #-}
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
sv Maybe WorkerInfo
mwinfo = do
IORef Int -> (Int -> Int) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv) ((Int -> Int) -> IO ()) -> (Int -> Int) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
case (Maybe WorkerInfo
mwinfo, SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv) of
(Just WorkerInfo
winfo, Just YieldRateInfo
info) ->
WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate WorkerInfo
winfo YieldRateInfo
info
(Maybe WorkerInfo, Maybe YieldRateInfo)
_ ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
IO ThreadId
myThreadId IO ThreadId -> (ThreadId -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ThreadId
tid -> IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
sv (ThreadId -> Maybe SomeException -> ChildEvent a
forall a. ThreadId -> Maybe SomeException -> ChildEvent a
ChildStop ThreadId
tid Maybe SomeException
forall a. Maybe a
Nothing)
sendStopToProducer :: MonadIO m => SVar t m a -> m ()
sendStopToProducer :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar t m a
sv = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ThreadId
tid <- IO ThreadId
myThreadId
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
sendToProducer SVar t m a
sv (ThreadId -> Maybe SomeException -> ChildEvent a
forall a. ThreadId -> Maybe SomeException -> ChildEvent a
ChildStop ThreadId
tid Maybe SomeException
forall a. Maybe a
Nothing)
{-# NOINLINE handleFoldException #-}
handleFoldException :: SVar t m a -> SomeException -> IO ()
handleFoldException :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleFoldException SVar t m a
sv SomeException
e = do
ThreadId
tid <- IO ThreadId
myThreadId
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
sendToProducer SVar t m a
sv (ThreadId -> Maybe SomeException -> ChildEvent a
forall a. ThreadId -> Maybe SomeException -> ChildEvent a
ChildStop ThreadId
tid (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e))
{-# NOINLINE handleChildException #-}
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv SomeException
e = do
ThreadId
tid <- IO ThreadId
myThreadId
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
sv (ThreadId -> Maybe SomeException -> ChildEvent a
forall a. ThreadId -> Maybe SomeException -> ChildEvent a
ChildStop ThreadId
tid (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e))