module Streamly.Internal.Data.Stream.Channel.Append
(
newAppendChannel
)
where
import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.Channel.Dispatcher (modifyThread)
import qualified Data.Heap as H
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.StreamK as K
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Channel.Worker
import Streamly.Internal.Data.Stream.Channel.Consumer
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Type
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
Channel m a
-> IORef [(RunInIO m, K.StreamK m a)]
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueLIFO :: forall (m :: * -> *) a.
Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef [(RunInIO m, StreamK m a)]
q (RunInIO m, StreamK m a)
m = do
IORef [(RunInIO m, StreamK m a)]
-> ([(RunInIO m, StreamK m a)] -> [(RunInIO m, StreamK m a)])
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef [(RunInIO m, StreamK m a)]
q ((RunInIO m, StreamK m a)
m (RunInIO m, StreamK m a)
-> [(RunInIO m, StreamK m a)] -> [(RunInIO m, StreamK m a)]
forall a. a -> [a] -> [a]
:)
IORef Bool -> MVar () -> IO ()
ringDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
{-# INLINE dequeue #-}
dequeue :: MonadIO m =>
IORef [(RunInIO m, K.StreamK m a)]
-> m (Maybe (RunInIO m, K.StreamK m a))
dequeue :: forall (m :: * -> *) a.
MonadIO m =>
IORef [(RunInIO m, StreamK m a)]
-> m (Maybe (RunInIO m, StreamK m a))
dequeue IORef [(RunInIO m, StreamK m a)]
qref =
IO (Maybe (RunInIO m, StreamK m a))
-> m (Maybe (RunInIO m, StreamK m a))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (Maybe (RunInIO m, StreamK m a))
-> m (Maybe (RunInIO m, StreamK m a)))
-> IO (Maybe (RunInIO m, StreamK m a))
-> m (Maybe (RunInIO m, StreamK m a))
forall a b. (a -> b) -> a -> b
$ IORef [(RunInIO m, StreamK m a)]
-> ([(RunInIO m, StreamK m a)]
-> ([(RunInIO m, StreamK m a)], Maybe (RunInIO m, StreamK m a)))
-> IO (Maybe (RunInIO m, StreamK m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef [(RunInIO m, StreamK m a)]
qref
(([(RunInIO m, StreamK m a)]
-> ([(RunInIO m, StreamK m a)], Maybe (RunInIO m, StreamK m a)))
-> IO (Maybe (RunInIO m, StreamK m a)))
-> ([(RunInIO m, StreamK m a)]
-> ([(RunInIO m, StreamK m a)], Maybe (RunInIO m, StreamK m a)))
-> IO (Maybe (RunInIO m, StreamK m a))
forall a b. (a -> b) -> a -> b
$ \case
((RunInIO m, StreamK m a)
x : [(RunInIO m, StreamK m a)]
xs) -> ([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a) -> Maybe (RunInIO m, StreamK m a)
forall a. a -> Maybe a
Just (RunInIO m, StreamK m a)
x)
[(RunInIO m, StreamK m a)]
x -> ([(RunInIO m, StreamK m a)]
x, Maybe (RunInIO m, StreamK m a)
forall a. Maybe a
Nothing)
data WorkerStatus = Continue | Suspend
{-# INLINE workLoopLIFO #-}
workLoopLIFO
:: MonadRunInIO m
=> IORef [(RunInIO m, K.StreamK m a)]
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO IORef [(RunInIO m, StreamK m a)]
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run
where
run :: m ()
run = do
Maybe (RunInIO m, StreamK m a)
work <- IORef [(RunInIO m, StreamK m a)]
-> m (Maybe (RunInIO m, StreamK m a))
forall (m :: * -> *) a.
MonadIO m =>
IORef [(RunInIO m, StreamK m a)]
-> m (Maybe (RunInIO m, StreamK m a))
dequeue IORef [(RunInIO m, StreamK m a)]
qref
case Maybe (RunInIO m, StreamK m a)
work of
Maybe (RunInIO m, StreamK m a)
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
Just (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) -> (m WorkerStatus -> IO (StM m WorkerStatus)) -> StreamK m a -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin StreamK m a
m
process :: (m WorkerStatus -> IO (StM m WorkerStatus)) -> StreamK m a -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
runin StreamK m a
m = do
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
State StreamK m a
forall a. HasCallStack => a
undefined
a -> StreamK m a -> m WorkerStatus
yieldk
a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
(WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue)
StreamK m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
where
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> a -> IO Bool
forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
sv a
a
WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
Bool
res <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> a -> IO Bool
forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
sv a
a
if Bool
res
then State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) StreamK m a
r
else do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef [(RunInIO m, StreamK m a)]
qref (RunInIO m
runInIO, StreamK m a
r)
WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
:: forall m a. MonadRunInIO m
=> IORef [(RunInIO m, K.StreamK m a)]
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited IORef [(RunInIO m, StreamK m a)]
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run
where
incrContinue :: m WorkerStatus
incrContinue =
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)) m () -> m WorkerStatus -> m WorkerStatus
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
run :: m ()
run = do
Maybe (RunInIO m, StreamK m a)
work <- IORef [(RunInIO m, StreamK m a)]
-> m (Maybe (RunInIO m, StreamK m a))
forall (m :: * -> *) a.
MonadIO m =>
IORef [(RunInIO m, StreamK m a)]
-> m (Maybe (RunInIO m, StreamK m a))
dequeue IORef [(RunInIO m, StreamK m a)]
qref
case Maybe (RunInIO m, StreamK m a)
work of
Maybe (RunInIO m, StreamK m a)
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
Just (RunInIO m, StreamK m a)
item -> (RunInIO m, StreamK m a) -> m ()
process (RunInIO m, StreamK m a)
item
process :: (RunInIO m, StreamK m a) -> m ()
process item :: (RunInIO m, StreamK m a)
item@(RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) = do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
State StreamK m a
forall a. HasCallStack => a
undefined
a -> StreamK m a -> m WorkerStatus
yieldk
a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
m WorkerStatus
incrContinue
StreamK m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
else IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef [(RunInIO m, StreamK m a)]
qref (RunInIO m, StreamK m a)
item
Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
where
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> a -> IO Bool
forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
sv a
a
WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
Bool
res <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> a -> IO Bool
forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
sv a
a
Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue StreamK m a
r
else do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef [(RunInIO m, StreamK m a)]
qref (RunInIO m
runInIO, StreamK m a
r)
WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
{-# ANN enqueueAhead "HLint: ignore" #-}
{-# INLINE enqueueAhead #-}
enqueueAhead ::
Channel m a
-> IORef ([K.StreamK m a], Int)
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueAhead :: forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
q (RunInIO m, StreamK m a)
m = do
IORef ([StreamK m a], Int)
-> (([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([StreamK m a], Int)
q ((([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ())
-> (([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \([StreamK m a]
xs, Int
n) -> ((RunInIO m, StreamK m a) -> StreamK m a
forall a b. (a, b) -> b
snd (RunInIO m, StreamK m a)
mStreamK m a -> [StreamK m a] -> [StreamK m a]
forall a. a -> [a] -> [a]
:[StreamK m a]
xs, Int
n)
IORef Bool -> MVar () -> IO ()
ringDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
=> IORef ([K.StreamK m a], Int) -> m (Maybe (K.StreamK m a, Int))
dequeueAhead :: forall (m :: * -> *) a.
MonadIO m =>
IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q = IO (Maybe (StreamK m a, Int)) -> m (Maybe (StreamK m a, Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (StreamK m a, Int)) -> m (Maybe (StreamK m a, Int)))
-> IO (Maybe (StreamK m a, Int)) -> m (Maybe (StreamK m a, Int))
forall a b. (a -> b) -> a -> b
$
IORef ([StreamK m a], Int)
-> (([StreamK m a], Int)
-> (([StreamK m a], Int), Maybe (StreamK m a, Int)))
-> IO (Maybe (StreamK m a, Int))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([StreamK m a], Int)
q ((([StreamK m a], Int)
-> (([StreamK m a], Int), Maybe (StreamK m a, Int)))
-> IO (Maybe (StreamK m a, Int)))
-> (([StreamK m a], Int)
-> (([StreamK m a], Int), Maybe (StreamK m a, Int)))
-> IO (Maybe (StreamK m a, Int))
forall a b. (a -> b) -> a -> b
$ \case
([], Int
n) -> (([], Int
n), Maybe (StreamK m a, Int)
forall a. Maybe a
Nothing)
(StreamK m a
x : [StreamK m a]
xs, Int
n) -> (([StreamK m a]
xs, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), (StreamK m a, Int) -> Maybe (StreamK m a, Int)
forall a. a -> Maybe a
Just (StreamK m a
x, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1))
{-# INLINE dequeueAheadSeqCheck #-}
dequeueAheadSeqCheck :: MonadIO m
=> IORef ([K.StreamK m a], Int) -> Int -> m (Maybe (K.StreamK m a))
dequeueAheadSeqCheck :: forall (m :: * -> *) a.
MonadIO m =>
IORef ([StreamK m a], Int) -> Int -> m (Maybe (StreamK m a))
dequeueAheadSeqCheck IORef ([StreamK m a], Int)
q Int
seqNo = IO (Maybe (StreamK m a)) -> m (Maybe (StreamK m a))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (StreamK m a)) -> m (Maybe (StreamK m a)))
-> IO (Maybe (StreamK m a)) -> m (Maybe (StreamK m a))
forall a b. (a -> b) -> a -> b
$
IORef ([StreamK m a], Int)
-> (([StreamK m a], Int)
-> (([StreamK m a], Int), Maybe (StreamK m a)))
-> IO (Maybe (StreamK m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([StreamK m a], Int)
q ((([StreamK m a], Int)
-> (([StreamK m a], Int), Maybe (StreamK m a)))
-> IO (Maybe (StreamK m a)))
-> (([StreamK m a], Int)
-> (([StreamK m a], Int), Maybe (StreamK m a)))
-> IO (Maybe (StreamK m a))
forall a b. (a -> b) -> a -> b
$ \case
([], Int
n) -> (([], Int
n), Maybe (StreamK m a)
forall a. Maybe a
Nothing)
(StreamK m a
x : [StreamK m a]
xs, Int
n) ->
if Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
seqNo
then (([StreamK m a]
xs, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), StreamK m a -> Maybe (StreamK m a)
forall a. a -> Maybe a
Just StreamK m a
x)
else ((StreamK m a
x StreamK m a -> [StreamK m a] -> [StreamK m a]
forall a. a -> [a] -> [a]
: [StreamK m a]
xs, Int
n), Maybe (StreamK m a)
forall a. Maybe a
Nothing)
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef :: forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef a
ref a -> IO b
f = IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
ref IO a -> (a -> IO b) -> IO b
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> IO b
f
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ :: forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef a
ref a -> a
f =
IORef a -> (a -> (a, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef a
ref ((a -> (a, ())) -> IO ()) -> (a -> (a, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \a
x -> (a -> a
f a
x, ())
data AheadHeapEntry m a =
AheadEntryNull
| AheadEntryPure a
| AheadEntryStream (RunInIO m, Maybe a, K.StreamK m a)
data HeapDequeueResult m a =
Clearing
| Waiting Int
| Ready (Entry Int (AheadHeapEntry m a))
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO (HeapDequeueResult m a)
dequeueFromHeap :: forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO (HeapDequeueResult m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
hpVar =
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a))
-> IO (HeapDequeueResult m a)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a))
-> IO (HeapDequeueResult m a))
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a))
-> IO (HeapDequeueResult m a)
forall a b. (a -> b) -> a -> b
$ \pair :: (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
pair@(Heap (Entry Int (AheadHeapEntry m a))
hp, Maybe Int
snum) ->
case Maybe Int
snum of
Maybe Int
Nothing -> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
pair, HeapDequeueResult m a
forall (m :: * -> *) a. HeapDequeueResult m a
Clearing)
Just Int
n -> do
let r :: Maybe
(Entry Int (AheadHeapEntry m a),
Heap (Entry Int (AheadHeapEntry m a)))
r = Heap (Entry Int (AheadHeapEntry m a))
-> Maybe
(Entry Int (AheadHeapEntry m a),
Heap (Entry Int (AheadHeapEntry m a)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry m a))
hp
case Maybe
(Entry Int (AheadHeapEntry m a),
Heap (Entry Int (AheadHeapEntry m a)))
r of
Just (ent :: Entry Int (AheadHeapEntry m a)
ent@(Entry Int
seqNo AheadHeapEntry m a
_ev), Heap (Entry Int (AheadHeapEntry m a))
hp') ->
if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
n
then ((Heap (Entry Int (AheadHeapEntry m a))
hp', Maybe Int
forall a. Maybe a
Nothing), Entry Int (AheadHeapEntry m a) -> HeapDequeueResult m a
forall (m :: * -> *) a.
Entry Int (AheadHeapEntry m a) -> HeapDequeueResult m a
Ready Entry Int (AheadHeapEntry m a)
ent)
else Bool
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a)
forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n) ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
pair, Int -> HeapDequeueResult m a
forall (m :: * -> *) a. Int -> HeapDequeueResult m a
Waiting Int
n)
Maybe
(Entry Int (AheadHeapEntry m a),
Heap (Entry Int (AheadHeapEntry m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
pair, Int -> HeapDequeueResult m a
forall (m :: * -> *) a. Int -> HeapDequeueResult m a
Waiting Int
n)
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int
-> IO (HeapDequeueResult m a)
dequeueFromHeapSeq :: forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
hpVar Int
i =
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a))
-> IO (HeapDequeueResult m a)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a))
-> IO (HeapDequeueResult m a))
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a))
-> IO (HeapDequeueResult m a)
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry m a))
hp, Maybe Int
snum) ->
case Maybe Int
snum of
Maybe Int
Nothing -> do
let r :: Maybe
(Entry Int (AheadHeapEntry m a),
Heap (Entry Int (AheadHeapEntry m a)))
r = Heap (Entry Int (AheadHeapEntry m a))
-> Maybe
(Entry Int (AheadHeapEntry m a),
Heap (Entry Int (AheadHeapEntry m a)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry m a))
hp
case Maybe
(Entry Int (AheadHeapEntry m a),
Heap (Entry Int (AheadHeapEntry m a)))
r of
Just (ent :: Entry Int (AheadHeapEntry m a)
ent@(Entry Int
seqNo AheadHeapEntry m a
_ev), Heap (Entry Int (AheadHeapEntry m a))
hp') ->
if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
i
then ((Heap (Entry Int (AheadHeapEntry m a))
hp', Maybe Int
forall a. Maybe a
Nothing), Entry Int (AheadHeapEntry m a) -> HeapDequeueResult m a
forall (m :: * -> *) a.
Entry Int (AheadHeapEntry m a) -> HeapDequeueResult m a
Ready Entry Int (AheadHeapEntry m a)
ent)
else Bool
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a)
forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
i) ((Heap (Entry Int (AheadHeapEntry m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
i), Int -> HeapDequeueResult m a
forall (m :: * -> *) a. Int -> HeapDequeueResult m a
Waiting Int
i)
Maybe
(Entry Int (AheadHeapEntry m a),
Heap (Entry Int (AheadHeapEntry m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
i), Int -> HeapDequeueResult m a
forall (m :: * -> *) a. Int -> HeapDequeueResult m a
Waiting Int
i)
Just Int
_ -> [Char]
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
HeapDequeueResult m a)
forall a. HasCallStack => [Char] -> a
error [Char]
"dequeueFromHeapSeq: unreachable"
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo =
case Maybe Int
snum of
Maybe Int
Nothing -> Bool
True
Just Int
n -> Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a)
-> Int
-> IO ()
requeueOnHeapTop :: forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
hpVar Entry Int (AheadHeapEntry m a)
ent Int
seqNo =
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int))
-> IO ())
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry m a))
hp, Maybe Int
snum) ->
Bool
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (Entry Int (AheadHeapEntry m a)
-> Heap (Entry Int (AheadHeapEntry m a))
-> Heap (Entry Int (AheadHeapEntry m a))
forall a. Ord a => a -> Heap a -> Heap a
H.insert Entry Int (AheadHeapEntry m a)
ent Heap (Entry Int (AheadHeapEntry m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
seqNo)
{-# INLINE updateHeapSeq #-}
updateHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int
-> IO ()
updateHeapSeq :: forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
hpVar Int
seqNo =
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int))
-> IO ())
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry m a))
hp, Maybe Int
snum) ->
Bool
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (Heap (Entry Int (AheadHeapEntry m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
seqNo)
{-# INLINE underMaxHeap #-}
underMaxHeap ::
Channel m a
-> Heap (Entry Int (AheadHeapEntry m a))
-> IO Bool
underMaxHeap :: forall (m :: * -> *) a.
Channel m a -> Heap (Entry Int (AheadHeapEntry m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry m a))
hp = do
([ChildEvent a]
_, Int
len) <- IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
let maxHeap :: Limit
maxHeap = case Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
sv of
Limited Word
lim -> Word -> Limit
Limited (Word -> Limit) -> Word -> Limit
forall a b. (a -> b) -> a -> b
$
Word -> Word -> Word
forall a. Ord a => a -> a -> a
max Word
0 (Word
lim Word -> Word -> Word
forall a. Num a => a -> a -> a
- Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
Limit
Unlimited -> Limit
Unlimited
case Limit
maxHeap of
Limited Word
lim -> do
Int
active <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Heap (Entry Int (AheadHeapEntry m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry m a))
hp Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
active Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim
Limit
Unlimited -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
preStopCheck ::
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry m a)) , Maybe Int)
-> IO Bool
preStopCheck :: forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap =
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> IO Bool)
-> IO Bool
forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> IO Bool)
-> IO Bool)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> IO Bool)
-> IO Bool
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry m a))
hp, Maybe Int
_) -> do
Bool
heapOk <- Channel m a -> Heap (Entry Int (AheadHeapEntry m a)) -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Heap (Entry Int (AheadHeapEntry m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry m a))
hp
MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv)
let stopping :: IO Bool
stopping = do
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
continue :: IO Bool
continue = do
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
if Bool
heapOk
then
case Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
Maybe YieldRateInfo
Nothing -> IO Bool
continue
Just YieldRateInfo
yinfo -> do
Bool
beyondRate <-
Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate
(Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv) (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv) YieldRateInfo
yinfo
if Bool
beyondRate then IO Bool
stopping else IO Bool
continue
else IO Bool
stopping
abortExecution :: Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution :: forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo = do
Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
processHeap
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry m a
-> Int
-> Bool
-> m ()
processHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry m a
entry Int
sno Bool
stopping = Int -> AheadHeapEntry m a -> m ()
loopHeap Int
sno AheadHeapEntry m a
entry
where
singleStreamFromHeap :: Int -> a -> m ()
singleStreamFromHeap Int
seqNo a
a = do
m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> a -> IO Bool
forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
sv a
a
Int -> m ()
nextHeap Int
seqNo
yieldStreamFromHeap :: Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo a
a StreamK m a
r = do
Bool
continue <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> a -> IO Bool
forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
sv a
a
Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r
runStreamWithYieldLimit :: Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r = do
Bool
_ <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
continue
then do
let stopk :: m ()
stopk = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
Int -> m ()
nextHeap Int
seqNo
State StreamK m a
-> (a -> StreamK m a -> m ())
-> (a -> m ())
-> m ()
-> StreamK m a
-> m ()
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo)
(Int -> a -> m ()
singleStreamFromHeap Int
seqNo)
m ()
stopk
StreamK m a
r
else do
RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry m a)
ent = Int -> AheadHeapEntry m a -> Entry Int (AheadHeapEntry m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo ((RunInIO m, Maybe a, StreamK m a) -> AheadHeapEntry m a
forall (m :: * -> *) a.
(RunInIO m, Maybe a, StreamK m a) -> AheadHeapEntry m a
AheadEntryStream (RunInIO m
runIn, Maybe a
forall a. Maybe a
Nothing, StreamK m a
r))
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Entry Int (AheadHeapEntry m a)
ent Int
seqNo
Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
processWorkQueue :: Int -> m ()
processWorkQueue Int
prevSeqNo = do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a, Int)
work <- IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
forall (m :: * -> *) a.
MonadIO m =>
IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
case Maybe (StreamK m a, Int)
work of
Maybe (StreamK m a, Int)
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
Just (StreamK m a
m, Int
seqNo) -> do
if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
prevSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo
nextHeap :: Int -> m ()
nextHeap Int
prevSeqNo = do
HeapDequeueResult m a
res <- IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a))
-> IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult m a)
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap (Int
prevSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
case HeapDequeueResult m a
res of
Ready (Entry Int
seqNo AheadHeapEntry m a
hent) -> Int -> AheadHeapEntry m a -> m ()
loopHeap Int
seqNo AheadHeapEntry m a
hent
HeapDequeueResult m a
Clearing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
Waiting Int
_ ->
if Bool
stopping
then do
Bool
r <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap
if Bool
r
then IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
else Int -> m ()
processWorkQueue Int
prevSeqNo
else (Int -> m ()) -> Int -> m ()
forall a. a -> a
inline Int -> m ()
processWorkQueue Int
prevSeqNo
loopHeap :: Int -> AheadHeapEntry m a -> m ()
loopHeap Int
seqNo AheadHeapEntry m a
ent =
case AheadHeapEntry m a
ent of
AheadHeapEntry m a
AheadEntryNull -> Int -> m ()
nextHeap Int
seqNo
AheadEntryPure a
a -> do
m Int -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
(m Int -> m ()) -> m Int -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> m Int
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
(Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv) (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
Int -> m ()
nextHeap Int
seqNo
AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, Just a
a, StreamK m a
r) -> do
let
action :: m ()
action = do
m Int -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
(m Int -> m ()) -> m Int -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> m Int
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
(Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv) (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r
go :: m ()
go = do
StM m ()
res <- IO (StM m ()) -> m (StM m ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m ()) -> m (StM m ())) -> IO (StM m ()) -> m (StM m ())
forall a b. (a -> b) -> a -> b
$ m () -> IO (StM m ())
forall b. m b -> IO (StM m b)
runin m ()
action
StM m () -> m ()
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res
if Bool
stopping
then do
Bool
stopIt <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap
if Bool
stopIt
then IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap (Int -> AheadHeapEntry m a -> Entry Int (AheadHeapEntry m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry m a
ent) Int
seqNo
Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
else m ()
go
else m ()
go
AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, Maybe a
Nothing, StreamK m a
r) -> do
let
action :: m ()
action = Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r
go :: m ()
go = do
StM m ()
res <- IO (StM m ()) -> m (StM m ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m ()) -> m (StM m ())) -> IO (StM m ()) -> m (StM m ())
forall a b. (a -> b) -> a -> b
$ m () -> IO (StM m ())
forall b. m b -> IO (StM m b)
runin m ()
action
StM m () -> m ()
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res
if Bool
stopping
then do
Bool
stopIt <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap
if Bool
stopIt
then IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap (Int -> AheadHeapEntry m a -> Entry Int (AheadHeapEntry m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry m a
ent) Int
seqNo
Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
else m ()
go
else m ()
go
{-# NOINLINE drainHeap #-}
drainHeap
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
HeapDequeueResult m a
r <- IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a))
-> IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO (HeapDequeueResult m a)
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO (HeapDequeueResult m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap
case HeapDequeueResult m a
r of
Ready (Entry Int
seqNo AheadHeapEntry m a
hent) ->
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry m a
hent Int
seqNo Bool
True
HeapDequeueResult m a
_ -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
data HeapStatus = HContinue | HStop
processWithoutToken
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> K.StreamK m a
-> Int
-> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo = do
let stopk :: m WorkerStatus
stopk = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
AheadHeapEntry m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry m a -> m WorkerStatus
toHeap AheadHeapEntry m a
forall (m :: * -> *) a. AheadHeapEntry m a
AheadEntryNull
mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
(\a
a StreamK m a
r -> do
RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
AheadHeapEntry m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry m a -> m WorkerStatus
toHeap (AheadHeapEntry m a -> m WorkerStatus)
-> AheadHeapEntry m a -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ (RunInIO m, Maybe a, StreamK m a) -> AheadHeapEntry m a
forall (m :: * -> *) a.
(RunInIO m, Maybe a, StreamK m a) -> AheadHeapEntry m a
AheadEntryStream (RunInIO m
runIn, a -> Maybe a
forall a. a -> Maybe a
Just a
a, StreamK m a
r))
(AheadHeapEntry m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry m a -> m WorkerStatus
toHeap (AheadHeapEntry m a -> m WorkerStatus)
-> (a -> AheadHeapEntry m a) -> a -> m WorkerStatus
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AheadHeapEntry m a
forall (m :: * -> *) a. a -> AheadHeapEntry m a
AheadEntryPure)
m WorkerStatus
stopk
StreamK m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
WorkerStatus
Suspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
where
toHeap :: AheadHeapEntry m a -> m WorkerStatus
toHeap AheadHeapEntry m a
ent = do
Heap (Entry Int (AheadHeapEntry m a))
newHp <- IO (Heap (Entry Int (AheadHeapEntry m a)))
-> m (Heap (Entry Int (AheadHeapEntry m a)))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Heap (Entry Int (AheadHeapEntry m a)))
-> m (Heap (Entry Int (AheadHeapEntry m a))))
-> IO (Heap (Entry Int (AheadHeapEntry m a)))
-> m (Heap (Entry Int (AheadHeapEntry m a)))
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry m a))))
-> IO (Heap (Entry Int (AheadHeapEntry m a)))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry m a))))
-> IO (Heap (Entry Int (AheadHeapEntry m a))))
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry m a))))
-> IO (Heap (Entry Int (AheadHeapEntry m a)))
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry m a))
hp, Maybe Int
snum) ->
let hp' :: Heap (Entry Int (AheadHeapEntry m a))
hp' = Entry Int (AheadHeapEntry m a)
-> Heap (Entry Int (AheadHeapEntry m a))
-> Heap (Entry Int (AheadHeapEntry m a))
forall a. Ord a => a -> Heap a -> Heap a
H.insert (Int -> AheadHeapEntry m a -> Entry Int (AheadHeapEntry m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry m a
ent) Heap (Entry Int (AheadHeapEntry m a))
hp
in Bool
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry m a)))
-> ((Heap (Entry Int (AheadHeapEntry m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry m a)))
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry m a))
hp')
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Int
maxHp <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Heap (Entry Int (AheadHeapEntry m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry m a))
newHp Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxHp) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IORef Int -> Int -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv) (Heap (Entry Int (AheadHeapEntry m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry m a))
newHp)
Bool
heapOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Heap (Entry Int (AheadHeapEntry m a)) -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Heap (Entry Int (AheadHeapEntry m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry m a))
newHp
HeapStatus
status <-
case Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
Maybe YieldRateInfo
Nothing -> HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
Just YieldRateInfo
yinfo ->
case Maybe WorkerInfo
winfo of
Just WorkerInfo
info -> do
Bool
rateOk <-
IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
incrWorkerYieldCount
(Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv)
(Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
YieldRateInfo
yinfo
WorkerInfo
info
if Bool
rateOk
then HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
else HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
Maybe WorkerInfo
Nothing -> HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
if Bool
heapOk
then
case HeapStatus
status of
HeapStatus
HContinue -> WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
HeapStatus
HStop -> WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
else WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
data TokenWorkerStatus = TokenContinue Int | TokenSuspend
processWithToken
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> K.StreamK m a
-> Int
-> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
action Int
sno = do
let stopk :: m TokenWorkerStatus
stopk = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m TokenWorkerStatus
r <-
IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus))
-> IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall b. m b -> IO (StM m b)
mrun
(m TokenWorkerStatus -> IO (StM m TokenWorkerStatus))
-> m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
State StreamK m a
forall a. HasCallStack => a
undefined (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
sno) (Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stopk StreamK m a
action
TokenWorkerStatus
res <- StM m TokenWorkerStatus -> m TokenWorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
TokenWorkerStatus
TokenSuspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
where
singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
Bool
continue <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> a -> IO Bool
forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
sv a
a
if Bool
continue
then TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
else do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> IO ()
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
yieldOutput :: Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a StreamK m a
r = do
Bool
continue <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> a -> IO Bool
forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
sv a
a
Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then do
let stopk :: m TokenWorkerStatus
stopk = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
(Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
m TokenWorkerStatus
stopk
StreamK m a
r
else do
RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry m a)
ent = Int -> AheadHeapEntry m a -> Entry Int (AheadHeapEntry m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo ((RunInIO m, Maybe a, StreamK m a) -> AheadHeapEntry m a
forall (m :: * -> *) a.
(RunInIO m, Maybe a, StreamK m a) -> AheadHeapEntry m a
AheadEntryStream (RunInIO m
runIn, Maybe a
forall a. Maybe a
Nothing, StreamK m a
r))
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Entry Int (AheadHeapEntry m a)
ent Int
seqNo
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
let preExit :: m ()
preExit = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> IO ()
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Int
nextSeqNo
Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a)
work <- IORef ([StreamK m a], Int) -> Int -> m (Maybe (StreamK m a))
forall (m :: * -> *) a.
MonadIO m =>
IORef ([StreamK m a], Int) -> Int -> m (Maybe (StreamK m a))
dequeueAheadSeqCheck IORef ([StreamK m a], Int)
q Int
nextSeqNo
case Maybe (StreamK m a)
work of
Maybe (StreamK m a)
Nothing -> m ()
preExit m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
Just StreamK m a
m -> do
let stopk :: m TokenWorkerStatus
stopk = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
nextSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m TokenWorkerStatus
r <- IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus))
-> IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m TokenWorkerStatus -> IO (StM m TokenWorkerStatus))
-> m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
nextSeqNo)
(Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
nextSeqNo)
m TokenWorkerStatus
stopk
StreamK m a
m
TokenWorkerStatus
res <- StM m TokenWorkerStatus -> m TokenWorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
TokenWorkerStatus
TokenSuspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
else m ()
preExit m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
workLoopAhead
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
HeapDequeueResult m a
r <- IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a))
-> IO (HeapDequeueResult m a) -> m (HeapDequeueResult m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO (HeapDequeueResult m a)
forall (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO (HeapDequeueResult m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap
case HeapDequeueResult m a
r of
Ready (Entry Int
seqNo AheadHeapEntry m a
hent) ->
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry m a
hent Int
seqNo Bool
False
HeapDequeueResult m a
Clearing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
Waiting Int
_ -> do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a, Int)
work <- IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
forall (m :: * -> *) a.
MonadIO m =>
IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
case Maybe (StreamK m a, Int)
work of
Maybe (StreamK m a, Int)
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe WorkerInfo -> Channel m a -> IO ()
forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
sv
Just (StreamK m a
m, Int
seqNo) -> do
if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo
getLifoSVar :: forall m a. MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar :: forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun Config
cfg = do
IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
outH <- (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO (IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int))
forall a. a -> IO (IORef a)
newIORef (Heap (Entry Int (AheadHeapEntry m a))
forall a. Heap a
H.empty, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
0)
MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
Set.empty
IORef [(RunInIO m, StreamK m a)]
q <- [(RunInIO m, StreamK m a)] -> IO (IORef [(RunInIO m, StreamK m a)])
forall a. a -> IO (IORef a)
newIORef ([] :: [(RunInIO m, K.StreamK m a)])
IORef ([StreamK m a], Int)
aheadQ <- ([StreamK m a], Int) -> IO (IORef ([StreamK m a], Int))
forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
MVar ()
stopMVar <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
Maybe (IORef Count)
yl <-
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
cfg
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = do
[(RunInIO m, StreamK m a)]
xs <- IORef [(RunInIO m, StreamK m a)] -> IO [(RunInIO m, StreamK m a)]
forall a. IORef a -> IO a
readIORef IORef [(RunInIO m, StreamK m a)]
q
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(RunInIO m, StreamK m a)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
xs)
let isWorkFinishedLimited :: Channel m a -> IO Bool
isWorkFinishedLimited Channel m a
sv = do
Bool
yieldsDone <-
case Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
Just IORef Count
ref -> do
Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished Channel m a
sv
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let eagerEval :: Bool
eagerEval = Config -> Bool
getEagerDispatch Config
cfg
inOrder :: Bool
inOrder = Config -> Bool
getOrdered Config
cfg
let getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef [(RunInIO m, K.StreamK m a)]
-> Channel m a
-> Maybe WorkerInfo
-> m())
-> Channel m a
getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
readOutput Channel m a -> m Bool
postProc Channel m a -> IO Bool
workDone IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop = Channel
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = Config -> Limit
getMaxBuffer Config
cfg
, maxWorkerLimit :: Limit
maxWorkerLimit = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (Config -> Limit
getMaxThreads Config
cfg) (Config -> Limit
getMaxBuffer Config
cfg)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, readOutputQ :: m [ChildEvent a]
readOutputQ = Channel m a -> m [ChildEvent a]
readOutput Channel m a
sv
, postProcess :: m Bool
postProcess = Channel m a -> m Bool
postProc Channel m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop =
if Bool
inOrder
then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
outH Channel m a
sv
else IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop IORef [(RunInIO m, StreamK m a)]
q Channel m a
sv
, enqueue :: (RunInIO m, StreamK m a) -> IO ()
enqueue =
if Bool
inOrder
then Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
else Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef [(RunInIO m, StreamK m a)]
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef [(RunInIO m, StreamK m a)]
q
, eagerDispatch :: m ()
eagerDispatch = Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
eagerEval (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ Count -> Channel m a -> m Bool
forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
0 Channel m a
sv
, isWorkDone :: IO Bool
isWorkDone =
if Bool
inOrder
then Channel m a
-> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
outH
else Channel m a -> IO Bool
workDone Channel m a
sv
, isQueueDone :: IO Bool
isQueueDone =
if Bool
inOrder
then Channel m a -> IORef ([StreamK m a], Int) -> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
else Channel m a -> IO Bool
workDone Channel m a
sv
, doorBellOnWorkQ :: IORef Bool
doorBellOnWorkQ = IORef Bool
wfw
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread IORef (Set ThreadId)
running MVar ()
outQMv
, workerStopMVar :: MVar ()
workerStopMVar = MVar ()
stopMVar
, svarRef :: Maybe (IORef ())
svarRef = Maybe (IORef ())
forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = Config -> Bool
getInspectMode Config
cfg
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: Channel m a
sv =
case Config -> Maybe Rate
getStreamRate Config
cfg of
Maybe Rate
Nothing ->
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (Bool -> Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (Bool -> Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
Channel m a -> IO Bool
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
Just Rate
_ ->
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
Channel m a -> IO Bool
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, StreamK m a)]
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
in Channel m a -> IO (Channel m a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a
sv
where
{-# INLINE isQueueDoneAhead #-}
isQueueDoneAhead :: Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q = do
Bool
queueDone <- IORef (t a, b) -> IO Bool
forall {t :: * -> *} {a} {b}.
Foldable t =>
IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q
Bool
yieldsDone <-
case Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
Just IORef Count
yref -> do
Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
yref
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
yieldsDone Bool -> Bool -> Bool
|| Bool
queueDone
{-# INLINE isWorkDoneAhead #-}
isWorkDoneAhead :: Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef (t a, b)
q IORef (Heap a, b)
ref = do
Bool
heapDone <- do
(Heap a
hp, b
_) <- IORef (Heap a, b) -> IO (Heap a, b)
forall a. IORef a -> IO a
readIORef IORef (Heap a, b)
ref
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap a -> Int
forall a. Heap a -> Int
H.size Heap a
hp Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0)
Bool
queueDone <- Channel m a -> IORef (t a, b) -> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
heapDone Bool -> Bool -> Bool
&& Bool
queueDone
checkEmpty :: IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q = do
(t a
xs, b
_) <- IORef (t a, b) -> IO (t a, b)
forall a. IORef a -> IO a
readIORef IORef (t a, b)
q
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ t a -> Bool
forall a. t a -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null t a
xs
{-# INLINABLE newAppendChannel #-}
{-# SPECIALIZE newAppendChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newAppendChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a)
newAppendChannel :: forall (m :: * -> *) a.
MonadRunInIO m =>
(Config -> Config) -> m (Channel m a)
newAppendChannel Config -> Config
modifier = do
RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO (Channel m a) -> m (Channel m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a) -> m (Channel m a))
-> IO (Channel m a) -> m (Channel m a)
forall a b. (a -> b) -> a -> b
$ RunInIO m -> Config -> IO (Channel m a)
forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun (Config -> Config
modifier Config
defaultConfig)