{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
#include "inline.hs"
module Streamly.Internal.Data.Stream.Async {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" from streamly package instead." #-}
(
AsyncT(..)
, Async
, consMAsync
, asyncK
, mkAsyncK
, mkAsyncD
, WAsyncT(..)
, WAsync
, consMWAsync
, wAsyncK
)
where
import Control.Concurrent (myThreadId)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR, pushL)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
import Prelude hiding (map)
import qualified Data.Set as S
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, askRunInIO, restoreM)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import Streamly.Internal.Data.Stream.SVar.Generate (fromSVar, fromSVarD)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(foldStreamShared, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(Stream(..), Step(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (toStreamK)
import Streamly.Internal.Data.SVar
#include "Instances.hs"
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal :: forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
m =
(forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
let single :: a -> m r
single = (r -> r) -> m r -> m r
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> (a -> m r) -> a -> m r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m r
sng
yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
r = (r -> r) -> m r -> m r
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> m r -> m r
forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> m r
yld a
a ((r -> r) -> Stream m a -> Stream m a
forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
r)
in State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yieldk a -> m r
single ((r -> r) -> m r -> m r
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f m r
stp) Stream m a
m
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
SVar t m a -> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO SVar t m a
sv IORef [(RunInIO m, t m a)]
q (RunInIO m, t m a)
m = do
IORef [(RunInIO m, t m a)]
-> ([(RunInIO m, t m a)] -> [(RunInIO m, t m a)]) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef [(RunInIO m, t m a)]
q (([(RunInIO m, t m a)] -> [(RunInIO m, t m a)]) -> IO ())
-> ([(RunInIO m, t m a)] -> [(RunInIO m, t m a)]) -> IO ()
forall a b. (a -> b) -> a -> b
$ \[(RunInIO m, t m a)]
ms -> (RunInIO m, t m a)
m (RunInIO m, t m a) -> [(RunInIO m, t m a)] -> [(RunInIO m, t m a)]
forall a. a -> [a] -> [a]
: [(RunInIO m, t m a)]
ms
SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
ringDoorBell SVar t m a
sv
data WorkerStatus = Continue | Suspend
{-# INLINE workLoopLIFO #-}
workLoopLIFO
:: MonadRunInIO m
=> IORef [(RunInIO m, Stream m a)]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO IORef [(RunInIO m, Stream m a)]
q State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = m ()
run
where
stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
run :: m ()
run = do
Maybe (RunInIO m, Stream m a)
work <- m (Maybe (RunInIO m, Stream m a))
dequeue
case Maybe (RunInIO m, Stream m a)
work of
Maybe (RunInIO m, Stream m a)
Nothing -> m ()
stop
Just (RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
m) -> do
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> m ()
stop
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
if Bool
res
then State StreamK m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
r
else do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a
-> IORef [(RunInIO m, Stream m a)]
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO SVar StreamK m a
sv IORef [(RunInIO m, Stream m a)]
q (RunInIO m
runInIO, Stream m a
r)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
dequeue :: m (Maybe (RunInIO m, Stream m a))
dequeue = IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a))
forall a b. (a -> b) -> a -> b
$ IORef [(RunInIO m, Stream m a)]
-> ([(RunInIO m, Stream m a)]
-> ([(RunInIO m, Stream m a)], Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef [(RunInIO m, Stream m a)]
q (([(RunInIO m, Stream m a)]
-> ([(RunInIO m, Stream m a)], Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a)))
-> ([(RunInIO m, Stream m a)]
-> ([(RunInIO m, Stream m a)], Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a))
forall a b. (a -> b) -> a -> b
$ \case
[] -> ([], Maybe (RunInIO m, Stream m a)
forall a. Maybe a
Nothing)
(RunInIO m, Stream m a)
x : [(RunInIO m, Stream m a)]
xs -> ([(RunInIO m, Stream m a)]
xs, (RunInIO m, Stream m a) -> Maybe (RunInIO m, Stream m a)
forall a. a -> Maybe a
Just (RunInIO m, Stream m a)
x)
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
:: forall m a. MonadRunInIO m
=> IORef [(RunInIO m, Stream m a)]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited IORef [(RunInIO m, Stream m a)]
q State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = m ()
run
where
incrContinue :: m WorkerStatus
incrContinue = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv) m () -> m WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
run :: m ()
run = do
Maybe (RunInIO m, Stream m a)
work <- m (Maybe (RunInIO m, Stream m a))
dequeue
case Maybe (RunInIO m, Stream m a)
work of
Maybe (RunInIO m, Stream m a)
Nothing -> m ()
stop
Just (RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
m) -> do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
yieldLimitOk
then do
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> m ()
stop
else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SVar StreamK m a
-> IORef [(RunInIO m, Stream m a)]
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO SVar StreamK m a
sv IORef [(RunInIO m, Stream m a)]
q ((forall b. m b -> IO (StM m b)) -> RunInIO m
forall (m :: * -> *). (forall b. m b -> IO (StM m b)) -> RunInIO m
RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
m)
SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then State StreamK m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
r
else do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a
-> IORef [(RunInIO m, Stream m a)]
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO SVar StreamK m a
sv IORef [(RunInIO m, Stream m a)]
q (RunInIO m
runInIO, Stream m a
r)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
dequeue :: m (Maybe (RunInIO m, Stream m a))
dequeue = IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a))
forall a b. (a -> b) -> a -> b
$ IORef [(RunInIO m, Stream m a)]
-> ([(RunInIO m, Stream m a)]
-> ([(RunInIO m, Stream m a)], Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef [(RunInIO m, Stream m a)]
q (([(RunInIO m, Stream m a)]
-> ([(RunInIO m, Stream m a)], Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a)))
-> ([(RunInIO m, Stream m a)]
-> ([(RunInIO m, Stream m a)], Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a))
forall a b. (a -> b) -> a -> b
$ \case
[] -> ([], Maybe (RunInIO m, Stream m a)
forall a. Maybe a
Nothing)
(RunInIO m, Stream m a)
x : [(RunInIO m, Stream m a)]
xs -> ([(RunInIO m, Stream m a)]
xs, (RunInIO m, Stream m a) -> Maybe (RunInIO m, Stream m a)
forall a. a -> Maybe a
Just (RunInIO m, Stream m a)
x)
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
SVar t m a
-> LinkedQueue (RunInIO m, t m a)
-> (RunInIO m, t m a)
-> IO ()
enqueueFIFO :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> LinkedQueue (RunInIO m, t m a) -> (RunInIO m, t m a) -> IO ()
enqueueFIFO SVar t m a
sv LinkedQueue (RunInIO m, t m a)
q (RunInIO m, t m a)
m = do
LinkedQueue (RunInIO m, t m a) -> (RunInIO m, t m a) -> IO ()
forall a. LinkedQueue a -> a -> IO ()
pushL LinkedQueue (RunInIO m, t m a)
q (RunInIO m, t m a)
m
SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
ringDoorBell SVar t m a
sv
{-# INLINE workLoopFIFO #-}
workLoopFIFO
:: MonadRunInIO m
=> LinkedQueue (RunInIO m, Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFO LinkedQueue (RunInIO m, Stream m a)
q State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = m ()
run
where
stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
run :: m ()
run = do
Maybe (RunInIO m, Stream m a)
work <- IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a))
forall a b. (a -> b) -> a -> b
$ LinkedQueue (RunInIO m, Stream m a)
-> IO (Maybe (RunInIO m, Stream m a))
forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (RunInIO m, Stream m a)
q
case Maybe (RunInIO m, Stream m a)
work of
Maybe (RunInIO m, Stream m a)
Nothing -> m ()
stop
Just (RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
m) -> do
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> m ()
stop
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a
-> LinkedQueue (RunInIO m, Stream m a)
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> LinkedQueue (RunInIO m, t m a) -> (RunInIO m, t m a) -> IO ()
enqueueFIFO SVar StreamK m a
sv LinkedQueue (RunInIO m, Stream m a)
q (RunInIO m
runInIO, Stream m a
r)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
:: forall m a. MonadRunInIO m
=> LinkedQueue (RunInIO m, Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFOLimited LinkedQueue (RunInIO m, Stream m a)
q State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = m ()
run
where
stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
incrContinue :: m WorkerStatus
incrContinue = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv) m () -> m WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
run :: m ()
run = do
Maybe (RunInIO m, Stream m a)
work <- IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a)))
-> IO (Maybe (RunInIO m, Stream m a))
-> m (Maybe (RunInIO m, Stream m a))
forall a b. (a -> b) -> a -> b
$ LinkedQueue (RunInIO m, Stream m a)
-> IO (Maybe (RunInIO m, Stream m a))
forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (RunInIO m, Stream m a)
q
case Maybe (RunInIO m, Stream m a)
work of
Maybe (RunInIO m, Stream m a)
Nothing -> m ()
stop
Just (RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
m) -> do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
yieldLimitOk
then do
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> m ()
stop
else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SVar StreamK m a
-> LinkedQueue (RunInIO m, Stream m a)
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> LinkedQueue (RunInIO m, t m a) -> (RunInIO m, t m a) -> IO ()
enqueueFIFO SVar StreamK m a
sv LinkedQueue (RunInIO m, Stream m a)
q ((forall b. m b -> IO (StM m b)) -> RunInIO m
forall (m :: * -> *). (forall b. m b -> IO (StM m b)) -> RunInIO m
RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
m)
SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a
-> LinkedQueue (RunInIO m, Stream m a)
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> LinkedQueue (RunInIO m, t m a) -> (RunInIO m, t m a) -> IO ()
enqueueFIFO SVar StreamK m a
sv LinkedQueue (RunInIO m, Stream m a)
q (RunInIO m
runInIO, Stream m a
r)
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
else IO WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO WorkerStatus -> m WorkerStatus)
-> IO WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ do
SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
WorkerStatus -> IO WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
getLifoSVar :: forall m a. MonadAsync m
=> State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar :: forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> RunInIO m -> IO (SVar StreamK m a)
getLifoSVar State StreamK m a
st RunInIO m
mrun = do
IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
IORef [(RunInIO m, Stream m a)]
q <- [(RunInIO m, Stream m a)] -> IO (IORef [(RunInIO m, Stream m a)])
forall a. a -> IO (IORef a)
newIORef ([] :: [(RunInIO m, Stream m a)])
Maybe (IORef Count)
yl <- case State StreamK m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- State StreamK m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State StreamK m a
st
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = [(RunInIO m, Stream m a)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([(RunInIO m, Stream m a)] -> Bool)
-> IO [(RunInIO m, Stream m a)] -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [(RunInIO m, Stream m a)] -> IO [(RunInIO m, Stream m a)]
forall a. IORef a -> IO a
readIORef IORef [(RunInIO m, Stream m a)]
q
let isWorkFinishedLimited :: SVar t m a -> IO Bool
isWorkFinishedLimited SVar t m a
sv = do
Bool
yieldsDone <-
case SVar t m a -> Maybe (IORef Count)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
Just IORef Count
ref -> do
Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- [(RunInIO m, Stream m a)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([(RunInIO m, Stream m a)] -> Bool)
-> IO [(RunInIO m, Stream m a)] -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [(RunInIO m, Stream m a)] -> IO [(RunInIO m, Stream m a)]
forall a. IORef a -> IO a
readIORef IORef [(RunInIO m, Stream m a)]
q
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m())
-> SVar Stream m a
getSVar :: SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
readOutput SVar StreamK m a -> m Bool
postProc SVar StreamK m a -> IO Bool
workDone IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
wloop = SVar :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVarStyle
-> RunInIO m
-> SVarStopStyle
-> IORef ThreadId
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Limit
-> Limit
-> IORef Count
-> PushBufferPolicy
-> MVar ()
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> ((RunInIO m, t m a) -> IO ())
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> SVarStats
-> Maybe (IORef ())
-> Bool
-> ThreadId
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IORef ([t m a], Int)
-> SVar t m a
SVar
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. HasCallStack => a
undefined
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = State StreamK m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State StreamK m a
st
, pushBufferSpace :: IORef Count
pushBufferSpace = IORef Count
forall a. HasCallStack => a
undefined
, pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. HasCallStack => a
undefined
, pushBufferMVar :: MVar ()
pushBufferMVar = MVar ()
forall a. HasCallStack => a
undefined
, maxWorkerLimit :: Limit
maxWorkerLimit = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State StreamK m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State StreamK m a
st) (State StreamK m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State StreamK m a
st)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. HasCallStack => a
undefined
, readOutputQ :: m [ChildEvent a]
readOutputQ = SVar StreamK m a -> m [ChildEvent a]
readOutput SVar StreamK m a
sv
, postProcess :: m Bool
postProcess = SVar StreamK m a -> m Bool
postProc SVar StreamK m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop = IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
wloop IORef [(RunInIO m, Stream m a)]
q State StreamK m a
st{streamVar :: Maybe (SVar StreamK m a)
streamVar = SVar StreamK m a -> Maybe (SVar StreamK m a)
forall a. a -> Maybe a
Just SVar StreamK m a
sv} SVar StreamK m a
sv
, enqueue :: (RunInIO m, Stream m a) -> IO ()
enqueue = SVar StreamK m a
-> IORef [(RunInIO m, Stream m a)]
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO SVar StreamK m a
sv IORef [(RunInIO m, Stream m a)]
q
, isWorkDone :: IO Bool
isWorkDone = SVar StreamK m a -> IO Bool
workDone SVar StreamK m a
sv
, isQueueDone :: IO Bool
isQueueDone = SVar StreamK m a -> IO Bool
workDone SVar StreamK m a
sv
, needDoorBell :: IORef Bool
needDoorBell = IORef Bool
wfw
, svarStyle :: SVarStyle
svarStyle = SVarStyle
AsyncVar
, svarStopStyle :: SVarStopStyle
svarStopStyle = SVarStopStyle
StopNone
, svarStopBy :: IORef ThreadId
svarStopBy = IORef ThreadId
forall a. HasCallStack => a
undefined
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = SVar StreamK m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar StreamK m a
sv
, workerStopMVar :: MVar ()
workerStopMVar = MVar ()
forall a. HasCallStack => a
undefined
, svarRef :: Maybe (IORef ())
svarRef = Maybe (IORef ())
forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = State StreamK m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State StreamK m a
st
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, aheadWorkQueue :: IORef ([Stream m a], Int)
aheadWorkQueue = IORef ([Stream m a], Int)
forall a. HasCallStack => a
undefined
, outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outputHeap = IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
forall a. HasCallStack => a
undefined
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: SVar StreamK m a
sv =
case State StreamK m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State StreamK m a
st of
Maybe Rate
Nothing ->
case State StreamK m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
Maybe Count
Nothing -> SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
SVar StreamK m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
SVar StreamK m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO
Just Count
_ -> SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
SVar StreamK m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited
Just Rate
_ ->
case State StreamK m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
Maybe Count
Nothing -> SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
SVar StreamK m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
SVar StreamK m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO
Just Count
_ -> SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
SVar StreamK m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef [(RunInIO m, Stream m a)]
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited
in SVar StreamK m a -> IO (SVar StreamK m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar StreamK m a
sv
getFifoSVar :: forall m a. MonadAsync m
=> State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar :: forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> RunInIO m -> IO (SVar StreamK m a)
getFifoSVar State StreamK m a
st RunInIO m
mrun = do
IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
LinkedQueue (RunInIO m, Stream m a)
q <- IO (LinkedQueue (RunInIO m, Stream m a))
forall a. IO (LinkedQueue a)
newQ
Maybe (IORef Count)
yl <- case State StreamK m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- State StreamK m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State StreamK m a
st
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = LinkedQueue (RunInIO m, Stream m a) -> IO Bool
forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (RunInIO m, Stream m a)
q
let isWorkFinishedLimited :: SVar t m a -> IO Bool
isWorkFinishedLimited SVar t m a
sv = do
Bool
yieldsDone <-
case SVar t m a -> Maybe (IORef Count)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
Just IORef Count
ref -> do
Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- LinkedQueue (RunInIO m, Stream m a) -> IO Bool
forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (RunInIO m, Stream m a)
q
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (RunInIO m, Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m())
-> SVar Stream m a
getSVar :: SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
readOutput SVar StreamK m a -> m Bool
postProc SVar StreamK m a -> IO Bool
workDone LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
wloop = SVar :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVarStyle
-> RunInIO m
-> SVarStopStyle
-> IORef ThreadId
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Limit
-> Limit
-> IORef Count
-> PushBufferPolicy
-> MVar ()
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> ((RunInIO m, t m a) -> IO ())
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> SVarStats
-> Maybe (IORef ())
-> Bool
-> ThreadId
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IORef ([t m a], Int)
-> SVar t m a
SVar
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. HasCallStack => a
undefined
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = State StreamK m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State StreamK m a
st
, pushBufferSpace :: IORef Count
pushBufferSpace = IORef Count
forall a. HasCallStack => a
undefined
, pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. HasCallStack => a
undefined
, pushBufferMVar :: MVar ()
pushBufferMVar = MVar ()
forall a. HasCallStack => a
undefined
, maxWorkerLimit :: Limit
maxWorkerLimit = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State StreamK m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State StreamK m a
st) (State StreamK m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State StreamK m a
st)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. HasCallStack => a
undefined
, readOutputQ :: m [ChildEvent a]
readOutputQ = SVar StreamK m a -> m [ChildEvent a]
readOutput SVar StreamK m a
sv
, postProcess :: m Bool
postProcess = SVar StreamK m a -> m Bool
postProc SVar StreamK m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop = LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
wloop LinkedQueue (RunInIO m, Stream m a)
q State StreamK m a
st{streamVar :: Maybe (SVar StreamK m a)
streamVar = SVar StreamK m a -> Maybe (SVar StreamK m a)
forall a. a -> Maybe a
Just SVar StreamK m a
sv} SVar StreamK m a
sv
, enqueue :: (RunInIO m, Stream m a) -> IO ()
enqueue = SVar StreamK m a
-> LinkedQueue (RunInIO m, Stream m a)
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> LinkedQueue (RunInIO m, t m a) -> (RunInIO m, t m a) -> IO ()
enqueueFIFO SVar StreamK m a
sv LinkedQueue (RunInIO m, Stream m a)
q
, isWorkDone :: IO Bool
isWorkDone = SVar StreamK m a -> IO Bool
workDone SVar StreamK m a
sv
, isQueueDone :: IO Bool
isQueueDone = SVar StreamK m a -> IO Bool
workDone SVar StreamK m a
sv
, needDoorBell :: IORef Bool
needDoorBell = IORef Bool
wfw
, svarStyle :: SVarStyle
svarStyle = SVarStyle
WAsyncVar
, svarStopStyle :: SVarStopStyle
svarStopStyle = SVarStopStyle
StopNone
, svarStopBy :: IORef ThreadId
svarStopBy = IORef ThreadId
forall a. HasCallStack => a
undefined
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = SVar StreamK m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar StreamK m a
sv
, workerStopMVar :: MVar ()
workerStopMVar = MVar ()
forall a. HasCallStack => a
undefined
, svarRef :: Maybe (IORef ())
svarRef = Maybe (IORef ())
forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = State StreamK m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State StreamK m a
st
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, aheadWorkQueue :: IORef ([Stream m a], Int)
aheadWorkQueue = IORef ([Stream m a], Int)
forall a. HasCallStack => a
undefined
, outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outputHeap = IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
forall a. HasCallStack => a
undefined
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: SVar StreamK m a
sv =
case State StreamK m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State StreamK m a
st of
Maybe Rate
Nothing ->
case State StreamK m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
Maybe Count
Nothing -> SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
SVar StreamK m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
SVar StreamK m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFO
Just Count
_ -> SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
SVar StreamK m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFOLimited
Just Rate
_ ->
case State StreamK m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
Maybe Count
Nothing -> SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
SVar StreamK m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
SVar StreamK m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFO
Just Count
_ -> SVar StreamK m a
-> (SVar StreamK m a -> m [ChildEvent a])
-> (SVar StreamK m a -> m Bool)
-> (SVar StreamK m a -> IO Bool)
-> (LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ())
-> SVar StreamK m a
getSVar SVar StreamK m a
sv SVar StreamK m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
SVar StreamK m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, Stream m a)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFOLimited
in SVar StreamK m a -> IO (SVar StreamK m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar StreamK m a
sv
{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar :: forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> Stream m a -> m (SVar StreamK m a)
newAsyncVar State StreamK m a
st Stream m a
m = do
RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
SVar StreamK m a
sv <- IO (SVar StreamK m a) -> m (SVar StreamK m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SVar StreamK m a) -> m (SVar StreamK m a))
-> IO (SVar StreamK m a) -> m (SVar StreamK m a)
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> RunInIO m -> IO (SVar StreamK m a)
forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> RunInIO m -> IO (SVar StreamK m a)
getLifoSVar State StreamK m a
st RunInIO m
mrun
SVar StreamK m a -> Stream m a -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker SVar StreamK m a
sv Stream m a
m
{-# INLINABLE mkAsyncK #-}
mkAsyncK :: MonadAsync m => Stream m a -> Stream m a
mkAsyncK :: forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkAsyncK Stream m a
m = (forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- State StreamK m a -> Stream m a -> m (SVar StreamK m a)
forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> Stream m a -> m (SVar StreamK m a)
newAsyncVar (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
st) Stream m a
m
State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SerialT m a -> Stream m a) -> SerialT m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
fromSVar SVar StreamK m a
sv
{-# INLINE_NORMAL mkAsyncD #-}
mkAsyncD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkAsyncD :: forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkAsyncD Stream m a
m = (State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a))
-> Maybe (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step Maybe (Stream m a)
forall a. Maybe a
Nothing
where
step :: State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State StreamK m a
gst Maybe (Stream m a)
Nothing = do
SVar StreamK m a
sv <- State StreamK m a -> Stream m a -> m (SVar StreamK m a)
forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> Stream m a -> m (SVar StreamK m a)
newAsyncVar State StreamK m a
gst (Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK Stream m a
m)
Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
D.Skip (Maybe (Stream m a) -> Step (Maybe (Stream m a)) a)
-> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
fromSVarD SVar StreamK m a
sv
step State StreamK m a
gst (Just (D.UnStream State StreamK m a -> s -> m (Step s a)
step1 s
st)) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
D.Yield a
a s
s -> a -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. a -> s -> Step s a
D.Yield a
a (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
D.Skip s
s -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
D.Skip (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
Step s a
D.Stop -> Step (Maybe (Stream m a)) a
forall s a. Step s a
D.Stop
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar :: forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> Stream m a -> m (SVar StreamK m a)
newWAsyncVar State StreamK m a
st Stream m a
m = do
RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
SVar StreamK m a
sv <- IO (SVar StreamK m a) -> m (SVar StreamK m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SVar StreamK m a) -> m (SVar StreamK m a))
-> IO (SVar StreamK m a) -> m (SVar StreamK m a)
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> RunInIO m -> IO (SVar StreamK m a)
forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> RunInIO m -> IO (SVar StreamK m a)
getFifoSVar State StreamK m a
st RunInIO m
mrun
SVar StreamK m a -> Stream m a -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker SVar StreamK m a
sv Stream m a
m
forkSVarAsync :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync SVarStyle
style Stream m a
m1 Stream m a
m2 = (forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- case SVarStyle
style of
SVarStyle
AsyncVar -> State StreamK m a -> Stream m a -> m (SVar StreamK m a)
forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> Stream m a -> m (SVar StreamK m a)
newAsyncVar State StreamK m a
st (Stream m a -> Stream m a -> Stream m a
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m) =>
StreamK m a -> StreamK m a -> StreamK m a
concurrently Stream m a
m1 Stream m a
m2)
SVarStyle
WAsyncVar -> State StreamK m a -> Stream m a -> m (SVar StreamK m a)
forall (m :: * -> *) a.
MonadAsync m =>
State StreamK m a -> Stream m a -> m (SVar StreamK m a)
newWAsyncVar State StreamK m a
st (Stream m a -> Stream m a -> Stream m a
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m) =>
StreamK m a -> StreamK m a -> StreamK m a
concurrently Stream m a
m1 Stream m a
m2)
SVarStyle
_ -> [Char] -> m (SVar StreamK m a)
forall a. HasCallStack => [Char] -> a
error [Char]
"illegal svar type"
State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SerialT m a -> Stream m a) -> SerialT m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
fromSVar SVar StreamK m a
sv
where
concurrently :: StreamK m a -> StreamK m a -> StreamK m a
concurrently StreamK m a
ma StreamK m a
mb = (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a)
-> (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> (RunInIO m, StreamK m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue (Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar StreamK m a) -> SVar StreamK m a)
-> Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st) (RunInIO m
runInIO, StreamK m a
mb)
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
ma
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync SVarStyle
style Stream m a
m1 Stream m a
m2 = (forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State StreamK m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
case State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st of
Just SVar StreamK m a
sv | SVar StreamK m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar StreamK m a
sv SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
style -> do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> (RunInIO m, Stream m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue SVar StreamK m a
sv (RunInIO m
runInIO, Stream m a
m2)
State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
m1
Maybe (SVar StreamK m a)
_ -> State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (SVarStyle -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync SVarStyle
style Stream m a
m1 Stream m a
m2)
{-# INLINE asyncK #-}
asyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
asyncK :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
asyncK = SVarStyle -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync SVarStyle
AsyncVar
{-# INLINE consMAsync #-}
{-# SPECIALIZE consMAsync :: IO a -> AsyncT IO a -> AsyncT IO a #-}
consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
consMAsync :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
consMAsync m a
m (AsyncT Stream m a
r) = Stream m a -> AsyncT m a
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT (Stream m a -> AsyncT m a) -> Stream m a -> AsyncT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
asyncK (m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect m a
m) Stream m a
r
newtype AsyncT m a = AsyncT {forall (m :: * -> *) a. AsyncT m a -> Stream m a
getAsyncT :: Stream m a}
instance MonadTrans AsyncT where
{-# INLINE lift #-}
lift :: forall (m :: * -> *) a. Monad m => m a -> AsyncT m a
lift = Stream m a -> AsyncT m a
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT (Stream m a -> AsyncT m a)
-> (m a -> Stream m a) -> m a -> AsyncT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect
type Async = AsyncT IO
{-# INLINE append #-}
{-# SPECIALIZE append :: AsyncT IO a -> AsyncT IO a -> AsyncT IO a #-}
append :: MonadAsync m => AsyncT m a -> AsyncT m a -> AsyncT m a
append :: forall (m :: * -> *) a.
MonadAsync m =>
AsyncT m a -> AsyncT m a -> AsyncT m a
append (AsyncT Stream m a
m1) (AsyncT Stream m a
m2) = Stream m a -> AsyncT m a
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT (Stream m a -> AsyncT m a) -> Stream m a -> AsyncT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
asyncK Stream m a
m1 Stream m a
m2
instance MonadAsync m => Semigroup (AsyncT m a) where
<> :: AsyncT m a -> AsyncT m a -> AsyncT m a
(<>) = AsyncT m a -> AsyncT m a -> AsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
AsyncT m a -> AsyncT m a -> AsyncT m a
append
instance MonadAsync m => Monoid (AsyncT m a) where
mempty :: AsyncT m a
mempty = Stream m a -> AsyncT m a
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT Stream m a
forall (m :: * -> *) a. StreamK m a
K.nil
mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a
mappend = AsyncT m a -> AsyncT m a -> AsyncT m a
forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apAsync #-}
{-# SPECIALIZE apAsync :: AsyncT IO (a -> b) -> AsyncT IO a -> AsyncT IO b #-}
apAsync :: MonadAsync m => AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync :: forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync (AsyncT Stream m (a -> b)
m1) (AsyncT Stream m a
m2) =
let f :: (a -> b) -> StreamK m b
f a -> b
x1 = (StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> Stream m a -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
asyncK (b -> StreamK m b
forall a (m :: * -> *). a -> StreamK m a
K.fromPure (b -> StreamK m b) -> (a -> b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
in Stream m b -> AsyncT m b
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT (Stream m b -> AsyncT m b) -> Stream m b -> AsyncT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
asyncK (a -> b) -> Stream m b
forall {b}. (a -> b) -> StreamK m b
f Stream m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (AsyncT m) where
{-# INLINE pure #-}
pure :: forall a. a -> AsyncT m a
pure = Stream m a -> AsyncT m a
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT (Stream m a -> AsyncT m a) -> (a -> Stream m a) -> a -> AsyncT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> StreamK m a
K.fromPure
{-# INLINE (<*>) #-}
<*> :: forall a b. AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
(<*>) = AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync
{-# INLINE bindAsync #-}
{-# SPECIALIZE bindAsync ::
AsyncT IO a -> (a -> AsyncT IO b) -> AsyncT IO b #-}
bindAsync :: MonadAsync m => AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync :: forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync (AsyncT Stream m a
m) a -> AsyncT m b
f = Stream m b -> AsyncT m b
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT (Stream m b -> AsyncT m b) -> Stream m b -> AsyncT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
asyncK Stream m a
m (AsyncT m b -> Stream m b
forall (m :: * -> *) a. AsyncT m a -> Stream m a
getAsyncT (AsyncT m b -> Stream m b) -> (a -> AsyncT m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AsyncT m b
f)
instance MonadAsync m => Monad (AsyncT m) where
return :: forall a. a -> AsyncT m a
return = a -> AsyncT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
>>= :: forall a b. AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
(>>=) = AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync
MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)
{-# INLINE wAsyncK #-}
wAsyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
wAsyncK :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
wAsyncK = SVarStyle -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync SVarStyle
WAsyncVar
{-# INLINE consMWAsync #-}
{-# SPECIALIZE consMWAsync :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
consMWAsync :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
consMWAsync m a
m (WAsyncT Stream m a
r) = Stream m a -> WAsyncT m a
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT (Stream m a -> WAsyncT m a) -> Stream m a -> WAsyncT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
wAsyncK (m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect m a
m) Stream m a
r
newtype WAsyncT m a = WAsyncT {forall (m :: * -> *) a. WAsyncT m a -> Stream m a
getWAsyncT :: Stream m a}
instance MonadTrans WAsyncT where
{-# INLINE lift #-}
lift :: forall (m :: * -> *) a. Monad m => m a -> WAsyncT m a
lift = Stream m a -> WAsyncT m a
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT (Stream m a -> WAsyncT m a)
-> (m a -> Stream m a) -> m a -> WAsyncT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect
type WAsync = WAsyncT IO
{-# INLINE wAppend #-}
{-# SPECIALIZE wAppend :: WAsyncT IO a -> WAsyncT IO a -> WAsyncT IO a #-}
wAppend :: MonadAsync m => WAsyncT m a -> WAsyncT m a -> WAsyncT m a
wAppend :: forall (m :: * -> *) a.
MonadAsync m =>
WAsyncT m a -> WAsyncT m a -> WAsyncT m a
wAppend (WAsyncT Stream m a
m1) (WAsyncT Stream m a
m2) = Stream m a -> WAsyncT m a
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT (Stream m a -> WAsyncT m a) -> Stream m a -> WAsyncT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
wAsyncK Stream m a
m1 Stream m a
m2
instance MonadAsync m => Semigroup (WAsyncT m a) where
<> :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a
(<>) = WAsyncT m a -> WAsyncT m a -> WAsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
WAsyncT m a -> WAsyncT m a -> WAsyncT m a
wAppend
instance MonadAsync m => Monoid (WAsyncT m a) where
mempty :: WAsyncT m a
mempty = Stream m a -> WAsyncT m a
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT Stream m a
forall (m :: * -> *) a. StreamK m a
K.nil
mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappend = WAsyncT m a -> WAsyncT m a -> WAsyncT m a
forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apWAsync #-}
{-# SPECIALIZE apWAsync ::
WAsyncT IO (a -> b) -> WAsyncT IO a -> WAsyncT IO b #-}
apWAsync :: MonadAsync m => WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync :: forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync (WAsyncT Stream m (a -> b)
m1) (WAsyncT Stream m a
m2) =
let f :: (a -> b) -> StreamK m b
f a -> b
x1 = (StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> Stream m a -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
wAsyncK (b -> StreamK m b
forall a (m :: * -> *). a -> StreamK m a
K.fromPure (b -> StreamK m b) -> (a -> b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
in Stream m b -> WAsyncT m b
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT (Stream m b -> WAsyncT m b) -> Stream m b -> WAsyncT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
wAsyncK (a -> b) -> Stream m b
forall {b}. (a -> b) -> StreamK m b
f Stream m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (WAsyncT m) where
pure :: forall a. a -> WAsyncT m a
pure = Stream m a -> WAsyncT m a
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT (Stream m a -> WAsyncT m a)
-> (a -> Stream m a) -> a -> WAsyncT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> StreamK m a
K.fromPure
<*> :: forall a b. WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
(<*>) = WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync
{-# INLINE bindWAsync #-}
{-# SPECIALIZE bindWAsync ::
WAsyncT IO a -> (a -> WAsyncT IO b) -> WAsyncT IO b #-}
bindWAsync :: MonadAsync m => WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync :: forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync (WAsyncT Stream m a
m) a -> WAsyncT m b
f = Stream m b -> WAsyncT m b
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT (Stream m b -> WAsyncT m b) -> Stream m b -> WAsyncT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
wAsyncK Stream m a
m (WAsyncT m b -> Stream m b
forall (m :: * -> *) a. WAsyncT m a -> Stream m a
getWAsyncT (WAsyncT m b -> Stream m b)
-> (a -> WAsyncT m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> WAsyncT m b
f)
instance MonadAsync m => Monad (WAsyncT m) where
return :: forall a. a -> WAsyncT m a
return = a -> WAsyncT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
>>= :: forall a b. WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
(>>=) = WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync
MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)