module Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
(
newInterleaveChannel
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR, pushL)
import Data.IORef (newIORef, readIORef)
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Channel.Dispatcher (delThread)
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.StreamK as K
import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Types
-- | Result of running one queued stream step inside a worker loop. The
-- worker loops in this module pop the next work item on 'Continue' and call
-- @stop@ on 'Suspend'.
data WorkerStatus
    = Continue
    | Suspend
-- | Add a work item (a monad runner paired with a stream) to the channel's
-- work queue and then ring the channel's door bell.
--
-- The item is pushed with 'pushL'; the worker loops in this module remove
-- items with 'tryPopR'.
-- NOTE(review): presumably these are opposite ends of the queue, giving
-- first-in-first-out order - confirm against the queue implementation.
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
       Channel m a
    -> LinkedQueue (RunInIO m, K.StreamK m a)
    -> (RunInIO m, K.StreamK m a)
    -> IO ()
enqueueFIFO sv q m = do
    pushL q m
    -- Signal using the channel's 'doorBellOnWorkQ' flag and 'outputDoorBell'
    -- MVar. NOTE(review): presumably this wakes up a consumer waiting for
    -- new work - confirm in 'ringDoorBell'.
    ringDoorBell (doorBellOnWorkQ sv) (outputDoorBell sv)
-- | Worker loop servicing the shared work queue when no yield limit applies.
--
-- Each iteration pops one @(runner, stream)@ pair from the queue and runs a
-- single step of that stream. The loop ends by calling @stop sv winfo@ when
-- the queue is empty or when a step reports 'Suspend'.
{-# INLINE workLoopFIFO #-}
workLoopFIFO
    :: MonadRunInIO m
    => LinkedQueue (RunInIO m, K.StreamK m a)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFO q sv winfo = run

    where

    -- Local bindings intentionally carry no signatures: the outer type
    -- variables are not brought into scope by the signature above.
    run = do
        work <- liftIO $ tryPopR q
        case work of
            -- Nothing left in the queue.
            Nothing -> liftIO $ stop sv winfo
            Just (RunInIO runin, m) -> do
                -- Run one step of the stream in IO using the runner that was
                -- queued with it, then restore the resulting monadic state.
                -- The State argument is passed as 'undefined'.
                -- NOTE(review): presumably it is never forced on this path -
                -- confirm in 'K.foldStreamShared'.
                r <- liftIO
                        $ runin
                        $ K.foldStreamShared
                            undefined yieldk single (return Continue) m
                res <- restoreM r
                case res of
                    Continue -> run
                    Suspend -> liftIO $ stop sv winfo

    -- The stream produced its final element: send it to the channel and map
    -- the Bool returned by 'yield' to a worker status.
    single a = do
        res <- liftIO $ yield sv winfo a
        return $ if res then Continue else Suspend

    -- The stream produced an element and has a remainder: send the element,
    -- then put the remainder back on the queue together with the current
    -- runner so that other queued streams are serviced before it.
    yieldk a r = do
        res <- liftIO $ yield sv winfo a
        runInIO <- askRunInIO
        liftIO $ enqueueFIFO sv q (runInIO, r)
        return $ if res then Continue else Suspend
-- | Worker loop servicing the shared work queue when a yield limit is
-- configured.
--
-- Same shape as 'workLoopFIFO', but 'decrementYieldLimit' is called on the
-- channel's 'remainingWork' before each stream step and after each element
-- that has a remainder, with a compensating 'incrementYieldLimit' whenever
-- the step does not go on to yield.
-- NOTE(review): the increment/decrement pairing looks like reserving one
-- yield ahead of time and returning it when unused - confirm against the
-- definitions of those helpers.
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
    :: forall m a. MonadRunInIO m
    => LinkedQueue (RunInIO m, K.StreamK m a)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFOLimited q sv winfo = run

    where

    -- Used as the "stream ended without an element" continuation: give the
    -- yield back and keep the worker going.
    incrContinue =
        liftIO (incrementYieldLimit (remainingWork sv)) >> return Continue

    run = do
        work <- liftIO $ tryPopR q
        case work of
            -- Nothing left in the queue.
            Nothing -> liftIO $ stop sv winfo
            Just (RunInIO runin, m) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
                if yieldLimitOk
                then do
                    -- Run one step of the stream in IO using the runner that
                    -- was queued with it, then restore the monadic state.
                    -- The State argument is passed as 'undefined'.
                    -- NOTE(review): presumably never forced - confirm.
                    r <- liftIO
                            $ runin
                            $ K.foldStreamShared
                                undefined yieldk single incrContinue m
                    res <- restoreM r
                    case res of
                        Continue -> run
                        Suspend -> liftIO $ stop sv winfo
                else liftIO $ do
                    -- Limit check failed: put the untouched work item back,
                    -- undo the decrement and stop this worker.
                    enqueueFIFO sv q (RunInIO runin, m)
                    incrementYieldLimit (remainingWork sv)
                    stop sv winfo

    -- The stream produced its final element: send it to the channel and map
    -- the Bool returned by 'yield' to a worker status.
    single a = do
        res <- liftIO $ yield sv winfo a
        return $ if res then Continue else Suspend

    -- The stream produced an element and has a remainder: send the element,
    -- re-queue the remainder with the current runner, then check the limit
    -- again before agreeing to continue.
    yieldk a r = do
        res <- liftIO $ yield sv winfo a
        runInIO <- askRunInIO
        liftIO $ enqueueFIFO sv q (runInIO, r)
        yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
        if res && yieldLimitOk
        then return Continue
        else liftIO $ do
            -- Not continuing: undo the decrement made just above.
            incrementYieldLimit (remainingWork sv)
            return Suspend
-- | Allocate the mutable state for a channel whose work queue is serviced
-- by 'workLoopFIFO' / 'workLoopFIFOLimited' and build the 'Channel' record.
--
-- The first argument is the monad runner stored in the channel's 'svarMrun'
-- field; the second is the configuration from which buffer, thread, rate,
-- yield-limit and inspect-mode settings are read.
--
-- The reader and post-processing functions are chosen by whether a stream
-- rate is configured (paced vs bounded variants); the work-done predicate
-- and worker loop are chosen by whether a yield limit is configured.
getFifoSVar :: forall m a. MonadRunInIO m =>
    RunInIO m -> Config -> IO (Channel m a)
getFifoSVar mrun cfg = do
    outQ    <- newIORef ([], 0)
    outQMv  <- newEmptyMVar
    active  <- newIORef 0
    wfw     <- newIORef False
    running <- newIORef Set.empty
    q       <- newQ
    -- A counter is allocated only when a yield limit is configured.
    yl      <- case getYieldLimit cfg of
                Nothing -> return Nothing
                Just x -> Just <$> newIORef x
    rateInfo <- newRateInfo cfg

    stats <- newSVarStats
    tid <- myThreadId

    -- Without a yield limit, work is finished exactly when the queue is
    -- empty; the channel argument is ignored.
    let isWorkFinished _ = nullQ q

    -- With a yield limit, work is finished when the queue is empty or the
    -- remaining-work counter has reached zero or below.
    let isWorkFinishedLimited sv = do
            yieldsDone <-
                    case remainingWork sv of
                        Just ref -> do
                            n <- readIORef ref
                            return (n <= 0)
                        Nothing -> return False
            qEmpty <- nullQ q
            return $ qEmpty || yieldsDone

    -- Build the record. The channel itself is passed in as the first
    -- argument because several fields close over it (see the recursive
    -- definition of @sv@ below).
    let getSVar :: Channel m a
            -> (Channel m a -> m [ChildEvent a])
            -> (Channel m a -> m Bool)
            -> (Channel m a -> IO Bool)
            -> (LinkedQueue (RunInIO m, K.StreamK m a)
                -> Channel m a
                -> Maybe WorkerInfo
                -> m ())
            -> Channel m a
        getSVar sv readOutput postProc workDone wloop = Channel
            { outputQueue      = outQ
            , remainingWork    = yl
            , maxBufferLimit   = getMaxBuffer cfg
            -- The worker limit never exceeds the buffer limit.
            , maxWorkerLimit   = min (getMaxThreads cfg) (getMaxBuffer cfg)
            , yieldRateInfo    = rateInfo
            , outputDoorBell   = outQMv
            , readOutputQ      = readOutput sv
            , postProcess      = postProc sv
            , workerThreads    = running
            , workLoop         = wloop q sv
            -- The Bool argument of 'enqueue' is ignored by this channel.
            , enqueue          = \_ -> enqueueFIFO sv q
            , eagerDispatch    = return ()
            -- Both predicates use the same check for this channel.
            , isWorkDone       = workDone sv
            , isQueueDone      = workDone sv
            , doorBellOnWorkQ  = wfw
            , svarMrun         = mrun
            , workerCount      = active
            , accountThread    = delThread running
            -- Left undefined: forcing this field would throw.
            -- NOTE(review): presumably unused by this channel - confirm.
            , workerStopMVar   = undefined
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode cfg
            , svarCreator      = tid
            , svarStats        = stats
            }

    -- @sv@ is defined in terms of itself; laziness ties the knot.
    let sv =
            case getStreamRate cfg of
                Nothing ->
                    case getYieldLimit cfg of
                        Nothing -> getSVar sv (readOutputQBounded False)
                                              postProcessBounded
                                              isWorkFinished
                                              workLoopFIFO
                        Just _  -> getSVar sv (readOutputQBounded False)
                                              postProcessBounded
                                              isWorkFinishedLimited
                                              workLoopFIFOLimited
                Just _  ->
                    case getYieldLimit cfg of
                        Nothing -> getSVar sv readOutputQPaced
                                              postProcessPaced
                                              isWorkFinished
                                              workLoopFIFO
                        Just _  -> getSVar sv readOutputQPaced
                                              postProcessPaced
                                              isWorkFinishedLimited
                                              workLoopFIFOLimited
    return sv
-- | Create a new channel using 'getFifoSVar'.
--
-- The supplied function is applied to 'defaultConfig' to obtain the
-- channel's configuration. The monad runner stored in the channel is
-- captured with 'askRunInIO' at the point where this action runs.
{-# INLINABLE newInterleaveChannel #-}
{-# SPECIALIZE newInterleaveChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newInterleaveChannel :: MonadAsync m =>
    (Config -> Config) -> m (Channel m a)
newInterleaveChannel modifier = do
    mrun <- askRunInIO
    liftIO $ getFifoSVar mrun (modifier defaultConfig)