module Streamly.Internal.Data.Fold.Channel.Type
(
Channel (..)
, OutEvent (..)
, Config
, maxBuffer
, boundThreads
, inspect
, newChannelWith
, newChannelWithScan
, newChannel
, newScanChannel
, sendToWorker
, sendToWorker_
, checkFoldStatus
, dumpChannel
, cleanup
, finalize
)
where
#include "inline.hs"
import Control.Concurrent (ThreadId, myThreadId, tryPutMVar)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (SomeException(..))
import Control.Monad (void, when)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
(MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doForkWith)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as D
import Streamly.Internal.Data.Channel.Types
-- | Events that a worker thread posts on a channel's output queue for the
-- driver to consume.
data OutEvent b
    = FoldException ThreadId SomeException
    -- ^ The worker failed; carries the id of the reporting thread and the
    -- exception (see 'sendExceptionToDriver').
    | FoldPartial b
    -- ^ An intermediate result, as sent by 'sendPartialToDriver'.
    | FoldDone ThreadId b
    -- ^ The final result together with the id of the thread that produced it
    -- (see 'sendYieldToDriver').
-- | A channel connecting a driver thread to a worker that consumes the
-- driver's input. Input events travel through 'inputQueue'; results and
-- exceptions travel back through 'outputQueue'.
data Channel m a b = Channel
    {
      -- | Pending input events paired with an 'Int'. 'isBufferAvailable'
      -- compares that 'Int' against 'maxInputBuffer', so it is presumably
      -- the number of queued items.
      inputQueue :: IORef ([ChildEvent a], Int)

      -- | Limit consulted by 'isBufferAvailable' before more input is
      -- accepted.
    , maxInputBuffer :: Limit

      -- | 'readInputQChan' blocks on this 'MVar' when it finds the input
      -- queue empty.
    , inputItemDoorBell :: MVar ()

      -- | 'readInputQWithDB' does a non-blocking put on this 'MVar' after
      -- every read of the input queue.
    , inputSpaceDoorBell :: MVar ()

      -- | Action used by the worker's input stream ('fromInputQueue') to
      -- fetch the next batch of input events.
    , readInputQ :: m [ChildEvent a]

      -- | Events sent by the worker to the driver (see 'sendToDriver'),
      -- paired with an 'Int' (presumably the queue length).
    , outputQueue :: IORef ([OutEvent b], Int)

      -- | Passed to 'sendEvent' together with 'outputQueue'.
    , outputDoorBell :: MVar ()

      -- | Set to 'Nothing' by 'mkNewChannelWith'.
      -- NOTE(review): no other use is visible in this part of the module.
    , svarRef :: Maybe (IORef ())

      -- | Statistics handed to the diagnostic helpers.
    , svarStats :: SVarStats

      -- | Whether diagnostics (inspect mode) are enabled for this channel.
    , svarInspectMode :: Bool

      -- | Id of the thread that created the channel.
    , svarCreator :: ThreadId
    }
-- | Build a multi-line diagnostic report for a channel: its creator, the
-- state of the input queue and input doorbell, and the collected statistics.
-- The sections are separated by newlines.
{-# NOINLINE dumpChannel #-}
dumpChannel :: Channel m a b -> IO String
dumpChannel chan = concat <$> sequence (intersperse (pure "\n") sections)

    where

    -- Each section is an IO action so that mutable state is sampled in the
    -- order in which it is listed.
    sections =
        [ pure (dumpCreator (svarCreator chan))
        , pure "---------CURRENT STATE-----------"
        , dumpOutputQ (inputQueue chan)
        , dumpDoorBell (inputItemDoorBell chan)
        , pure "---------STATS-----------\n"
        , dumpSVarStats (svarInspectMode chan) Nothing (svarStats chan)
        ]
-- | Post an event for the driver by handing the channel's output queue and
-- output doorbell to 'sendEvent'. The result is whatever 'sendEvent'
-- returns.
sendToDriver :: Channel m a b -> OutEvent b -> IO Int
sendToDriver chan = sendEvent (outputQueue chan) (outputDoorBell chan)
-- | Report a final result to the driver as a 'FoldDone' event tagged with
-- the id of the calling thread.
sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver chan result =
    liftIO $ myThreadId >>= \self ->
        void (sendToDriver chan (FoldDone self result))
-- | Report an intermediate result to the driver as a 'FoldPartial' event.
sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver chan result =
    liftIO (void (sendToDriver chan (FoldPartial result)))
-- | Report an exception to the driver as a 'FoldException' event tagged with
-- the id of the calling thread. Used as the exception handler of the forked
-- worker in 'newChannelWith' and 'newChannelWithScan'.
{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver chan ex = do
    self <- myThreadId
    _ <- sendToDriver chan (FoldException self ex)
    pure ()
-- | State of the stream produced by 'fromInputQueue'.
data FromSVarState m a b
    = FromSVarRead (Channel m a b)
    -- ^ The current batch is exhausted; fetch the next one from the channel.
    | FromSVarLoop (Channel m a b) [ChildEvent a]
    -- ^ Events of the current batch that are still to be processed.
-- | Turn the channel's input queue into a stream of the values sent by the
-- driver.
--
-- The stream repeatedly fetches a batch of events with 'readInputQ' and then
-- walks through it: a 'ChildYield' produces a stream element and
-- 'ChildStopChannel' ends the stream. Any other event is not expected on the
-- input queue and raises an 'error' naming this function.
{-# INLINE_NORMAL fromInputQueue #-}
fromInputQueue :: MonadIO m => Channel m a b -> D.Stream m a
fromInputQueue svar = D.Stream step (FromSVarRead svar)

    where

    {-# INLINE_LATE step #-}
    step _ (FromSVarRead sv) = do
        list <- readInputQ sv
        -- The batch is reversed before it is consumed; presumably the queue
        -- is accumulated newest-first, so this restores arrival order.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    -- Batch exhausted: go back and read the next one.
    step _ (FromSVarLoop sv []) = return $ D.Skip $ FromSVarRead sv
    step _ (FromSVarLoop sv (ev : es)) =
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStopChannel -> return D.Stop
            -- Fail with a diagnosable message instead of a bare 'undefined'.
            _ ->
                error
                    $ "fromInputQueue: unexpected event in the input queue, "
                    ++ "expected ChildYield or ChildStopChannel"
-- | Read the input queue of the channel with 'readOutputQRaw'. If the count
-- returned by the first read is not positive, block on the input doorbell
-- (under 'withDiagMVar', which is given the channel dump and a label) and
-- then read the queue once more.
--
-- Statistics are passed to 'readOutputQRaw' only in inspect mode.
{-# INLINE readInputQChan #-}
readInputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
readInputQChan chan = do
    firstRead@(_, count) <- readQueue
    if count > 0
    then pure firstRead
    else do
        withDiagMVar
            inspecting
            (dumpChannel chan)
            "readInputQChan: nothing to do"
            (takeMVar (inputItemDoorBell chan))
        readQueue

    where

    inspecting = svarInspectMode chan

    stats
        | inspecting = Just (svarStats chan)
        | otherwise = Nothing

    readQueue = readOutputQRaw (inputQueue chan) stats
-- | Like 'readInputQChan', but afterwards performs a non-blocking put on
-- 'inputSpaceDoorBell'. The outcome of the put is ignored.
{-# INLINE readInputQWithDB #-}
readInputQWithDB :: Channel m a b -> IO ([ChildEvent a], Int)
readInputQWithDB chan =
    readInputQChan chan <* tryPutMVar (inputSpaceDoorBell chan) ()
-- | Allocate a channel that reports to the given output queue and output
-- doorbell.
--
-- A fresh empty input queue, two empty doorbells and a statistics record are
-- created, and the calling thread is recorded as the creator. The buffer
-- limit and the inspect mode are taken from the supplied 'Config'. No worker
-- is started here.
mkNewChannelWith :: forall m a b. MonadIO m =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> Config
    -> IO (Channel m a b)
mkNewChannelWith outQRev outQMvRev cfg = do
    inQ <- newIORef ([], 0)
    itemBell <- newEmptyMVar
    spaceBell <- newEmptyMVar
    stats <- newSVarStats
    creator <- myThreadId

    -- The record refers to itself: 'readInputQ' reads from the very channel
    -- that is being constructed.
    let chan :: Channel m a b
        chan = Channel
            { inputQueue = inQ
            , maxInputBuffer = getMaxBuffer cfg
            , inputItemDoorBell = itemBell
            , inputSpaceDoorBell = spaceBell
            , readInputQ = liftIO (fst <$> readInputQWithDB chan)
            , outputQueue = outQRev
            , outputDoorBell = outQMvRev
            , svarRef = Nothing
            , svarStats = stats
            , svarInspectMode = getInspectMode cfg
            , svarCreator = creator
            }
    return chan
-- | Create a channel that reports to the supplied output queue and doorbell,
-- and fork a worker thread that runs the given fold over the channel's
-- input.
--
-- The configuration is obtained by applying the modifier to
-- 'defaultConfig'. The fold's result is sent to the driver with
-- 'sendYieldToDriver'; 'sendExceptionToDriver' is installed as the worker's
-- exception handler. Returns the channel and the worker's thread id.
{-# INLINABLE newChannelWith #-}
{-# SPECIALIZE newChannelWith ::
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Fold IO a b
    -> IO (Channel IO a b, ThreadId) #-}
newChannelWith :: (MonadRunInIO m) =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Fold m a b
    -> m (Channel m a b, ThreadId)
newChannelWith outq outqDBell modifier f = do
    chan <- liftIO (mkNewChannelWith outq outqDBell cfg)
    runInIO <- askRunInIO
    workerTid <-
        doForkWith
            (getBound cfg)
            (work chan)
            runInIO
            (sendExceptionToDriver chan)
    pure (chan, workerTid)

    where

    cfg = modifier defaultConfig

    -- Worker body: fold the channel's input and hand the result to the
    -- driver.
    {-# NOINLINE work #-}
    work chan =
        D.fold
            (Fold.rmapM (void . sendYieldToDriver chan) f)
            (fromInputQueue chan)
-- | Wrap a scan so that its outputs are delivered to the driver through the
-- channel instead of being emitted by the scan itself.
--
-- Whenever the underlying scan is in a @Partial@ state, the extracted value
-- is sent with 'sendPartialToDriver'. When it reports @Done@, the result is
-- sent with 'sendYieldToDriver'. The wrapped scan extracts @()@, and its
-- finalizer runs the original finalizer and discards the value it returns.
{-# INLINE scanToChannel #-}
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Scanl m a ()
scanToChannel chan (Scanl step initial extract final) =
    Scanl
        (\st x -> step st x >>= forward)
        (initial >>= forward)
        (const (pure ()))
        (void . final)

    where

    -- Forward the outcome of one scan step to the driver and strip the
    -- result from the step value.
    forward (Fold.Partial s) = do
        extract s >>= sendPartialToDriver chan
        pure (Fold.Partial s)
    forward (Fold.Done b) = Fold.Done () <$ sendYieldToDriver chan b
-- | Scan counterpart of 'newChannelWith': create a channel that reports to
-- the supplied output queue and doorbell, and fork a worker thread that runs
-- the given scan over the channel's input.
--
-- The scan is adapted with 'scanToChannel' and the resulting stream is
-- drained by the worker. 'sendExceptionToDriver' is installed as the
-- worker's exception handler. Returns the channel and the worker's thread
-- id.
{-# INLINABLE newChannelWithScan #-}
{-# SPECIALIZE newChannelWithScan ::
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Scanl IO a b
    -> IO (Channel IO a b, ThreadId) #-}
newChannelWithScan :: (MonadRunInIO m) =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Scanl m a b
    -> m (Channel m a b, ThreadId)
newChannelWithScan outq outqDBell modifier f = do
    chan <- liftIO (mkNewChannelWith outq outqDBell cfg)
    runInIO <- askRunInIO
    workerTid <-
        doForkWith
            (getBound cfg)
            (work chan)
            runInIO
            (sendExceptionToDriver chan)
    pure (chan, workerTid)

    where

    cfg = modifier defaultConfig

    -- Worker body: run the channel-reporting scan over the input and drain
    -- the resulting stream of units.
    {-# NOINLINE work #-}
    work chan =
        D.drain (D.scanl (scanToChannel chan f) (fromInputQueue chan))
-- | Create a fold channel with its own freshly allocated output queue and
-- output doorbell. This is 'newChannelWith' with the worker's thread id
-- dropped from the result.
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
    (Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel modifier f = do
    (outQ, outBell) <- liftIO $ (,) <$> newIORef ([], 0) <*> newEmptyMVar
    (chan, _) <- newChannelWith outQ outBell modifier f
    pure chan
-- | Create a scan channel with its own freshly allocated output queue and
-- output doorbell. This is 'newChannelWithScan' with the worker's thread id
-- dropped from the result.
{-# INLINABLE newScanChannel #-}
{-# SPECIALIZE newScanChannel ::
    (Config -> Config) -> Scanl IO a b -> IO (Channel IO a b) #-}
newScanChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel modifier f = do
    outQ <- liftIO (newIORef ([], 0))
    outBell <- liftIO newEmptyMVar
    fst <$> newChannelWithScan outQ outBell modifier f
-- | Read the channel's output queue and report the worker's status.
--
-- Returns 'Nothing' when no event was read and @Just b@ when the first event
-- is a 'FoldDone'. A 'FoldException' is rethrown in the caller with
-- 'throwM'. A 'FoldPartial' is not expected here (in this module only
-- 'scanToChannel' sends it) and raises an 'error' naming this function.
--
-- NOTE(review): only the first event (after reversal) is examined; any
-- further events read from the queue are dropped.
{-# NOINLINE checkFoldStatus #-}
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
checkFoldStatus sv = do
    (list, _) <- liftIO $ readOutputQBasic (outputQueue sv)
    -- The list is reversed before processing; presumably the queue is
    -- accumulated newest-first, so this puts the oldest event first.
    processEvents $ reverse list

    where

    {-# INLINE processEvents #-}
    processEvents [] = return Nothing
    processEvents (ev : _) =
        case ev of
            FoldException _ e -> throwM e
            FoldDone _ b -> return (Just b)
            -- Fail with a diagnosable message instead of a bare 'undefined'.
            FoldPartial _ ->
                error
                    $ "checkFoldStatus: unexpected FoldPartial event "
                    ++ "in the output queue"
-- | Check whether the input queue can take more items. An 'Unlimited' buffer
-- always has room; otherwise the count stored with the input queue must be
-- strictly below the configured limit.
{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
isBufferAvailable chan =
    case maxInputBuffer chan of
        Unlimited -> pure True
        Limited lim -> liftIO $ do
            (_, queued) <- readIORef (inputQueue chan)
            pure (queued < fromIntegral lim)
-- | Hand one input value to the fold worker through the channel.
--
-- Each attempt first looks at the counter of 'outputQueue'; if it is
-- non-zero, 'checkFoldStatus' is consulted and a 'Just' result is returned
-- immediately without sending the value. Otherwise, if 'isBufferAvailable'
-- reports space, the value is queued as a 'ChildYield' event (ringing
-- 'inputItemDoorBell' via 'sendEvent') and 'Nothing' is returned. If there
-- is no space, the caller blocks on 'inputSpaceDoorBell' and then retries
-- the whole sequence.
{-# INLINE sendToWorker #-}
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
sendToWorker chan a = loop

    where

    -- Consult the fold status only when the output queue counter is
    -- non-zero.
    pollFold = do
        (_, pending) <- liftIO $ readIORef (outputQueue chan)
        if pending > 0
        then checkFoldStatus chan
        else return Nothing

    -- Queue the value for the worker; the result of sendEvent is discarded.
    enqueue =
        liftIO
            $ void
            $ sendEvent
                (inputQueue chan)
                (inputItemDoorBell chan)
                (ChildYield a)

    loop = do
        status <- pollFold
        case status of
            Just _ -> return status
            Nothing -> do
                hasRoom <- isBufferAvailable chan
                if hasRoom
                then enqueue >> return Nothing
                else do
                    -- Wait on the space doorbell, then start over.
                    () <- liftIO $ takeMVar (inputSpaceDoorBell chan)
                    loop
-- | Send one input value to the fold worker without looking at any events
-- coming back from it.
--
-- If 'isBufferAvailable' reports space, the value is queued as a
-- 'ChildYield' event via 'sendEvent' (using 'inputItemDoorBell'). If there
-- is no space this function does not wait: it calls 'error'.
{-# INLINE sendToWorker_ #-}
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
sendToWorker_ chan a = do
    hasRoom <- isBufferAvailable chan
    if not hasRoom
    then error "sendToWorker_: No space available in the buffer"
    else
        liftIO
            $ void
            $ sendEvent
                (inputQueue chan)
                (inputItemDoorBell chan)
                (ChildYield a)
-- | Record channel statistics on shutdown when inspection is enabled.
--
-- Does nothing unless 'svarInspectMode' is set on the channel. When it is
-- set, the current 'Monotonic' clock time is written to 'svarStopTime' of the
-- channel's 'svarStats', and then 'printSVar' is invoked with
-- 'dumpChannel' for this channel and the label @"Scan channel done"@.
cleanup :: MonadIO m => Channel m a b -> m ()
cleanup chan =
    when (svarInspectMode chan) $ liftIO recordAndDump

    where

    recordAndDump = do
        now <- getTime Monotonic
        writeIORef (svarStopTime (svarStats chan)) (Just now)
        printSVar (dumpChannel chan) "Scan channel done"
-- | Queue a 'ChildStopChannel' event on the channel's input queue.
--
-- The event is delivered with 'sendEvent' using 'inputItemDoorBell'; the
-- 'Int' returned by 'sendEvent' is discarded.
finalize :: MonadIO m => Channel m a b -> m ()
finalize chan = liftIO (void (sendEvent inQ bell ChildStopChannel))

    where

    inQ = inputQueue chan
    bell = inputItemDoorBell chan