module Streamly.Internal.Data.Fold.Concurrent
(
parBuffered
, parLmapM
, parTeeWith
, parDistribute
, parPartition
, parUnzipWithM
, parDistributeScan
, parDemuxScan
, parEval
)
where
#include "inline.hs"
#include "deprecation.h"
import Control.Concurrent (newEmptyMVar, takeMVar, throwTo)
import Control.Monad.Catch (throwM)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Stream (Stream(..), Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Fold as Fold
import Streamly.Internal.Data.Fold.Channel.Type
import Streamly.Internal.Data.Channel.Types
-- | Run a fold through a worker channel created by 'newChannel', so that the
-- driver only hands inputs to the channel instead of running the fold steps
-- itself.
--
-- * The @initial@ action creates the channel using the supplied config
--   modifier and the fold.
-- * Each input is passed to 'sendToWorker'; the fold terminates ('Done') as
--   soon as that call reports a result.
-- * @extract@ is not supported and calls 'error'.
-- * @final@ sends a 'ChildStopChannel' event, then waits on the channel's
--   output doorbell until 'checkFoldStatus' reports a result, cleans up the
--   channel and returns the result.
--
-- 'parEval' is the old name of this function, kept via the @RENAME@ macro.
--
{-# INLINABLE parBuffered #-}
parBuffered, parEval
    :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
parBuffered modifier f =
    Fold step initial extract final

    where

    -- Create the channel; it is the state of the resulting fold.
    initial = fmap Partial (newChannel modifier f)

    -- 'sendToWorker' returns 'Just' the result once the fold has finished.
    step chan a = maybe (Partial chan) Done <$> sendToWorker chan a

    extract _ = error "Concurrent folds do not support scanning"

    final chan = do
        liftIO . void $
            sendEvent
                (inputQueue chan)
                (inputItemDoorBell chan)
                ChildStopChannel
        outcome <- checkFoldStatus chan
        case outcome of
            Just b -> cleanup chan >> return b
            Nothing -> do
                liftIO $ waitForOutput chan
                -- NOTE(review): re-entering 'final' sends the stop event
                -- again on every retry.
                final chan

    -- Block until the output doorbell of the channel is rung.
    waitForOutput chan =
        withDiagMVar
            (svarInspectMode chan)
            (dumpChannel chan)
            "parBuffered: waiting to drain"
            (takeMVar (outputDoorBell chan))
RENAME(parEval,parBuffered)
-- | Intended to map a monadic function over the input of a fold using a
-- concurrent channel configuration.
--
-- /Unimplemented/: evaluating this function raises an error with a
-- descriptive message (previously it was a bare 'undefined').
--
-- NOTE(review): the signature carries no constraint on @m@, so a real
-- implementation will presumably need one (e.g. @MonadAsync m@) - confirm
-- before implementing.
--
{-# INLINABLE parLmapM #-}
parLmapM ::
    (Config -> Config) -> (a -> m b) -> Fold m b r -> Fold m a r
parLmapM = error "parLmapM: not implemented"
-- | Like 'Fold.teeWith', but each of the two folds is first wrapped with
-- 'parBuffered' using the given config modifier. The two results are
-- combined with the supplied function.
--
{-# INLINABLE parTeeWith #-}
parTeeWith :: MonadAsync m =>
       (Config -> Config)
    -> (a -> b -> c)
    -> Fold m x a
    -> Fold m x b
    -> Fold m x c
parTeeWith cfg f c1 c2 = Fold.teeWith f left right

    where

    left = parBuffered cfg c1
    right = parBuffered cfg c2
-- | Like 'Fold.distribute', but every fold in the list is first wrapped with
-- 'parBuffered' using the given config modifier.
--
{-# INLINABLE parDistribute #-}
parDistribute :: MonadAsync m =>
    (Config -> Config) -> [Fold m a b] -> Fold m a [b]
parDistribute cfg folds = Fold.distribute (map (parBuffered cfg) folds)
-- | Like 'Fold.partition', but both the 'Left' fold and the 'Right' fold are
-- first wrapped with 'parBuffered' using the given config modifier.
--
{-# INLINABLE parPartition #-}
parPartition :: MonadAsync m =>
    (Config -> Config) -> Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y)
parPartition cfg c1 c2 = Fold.partition lefts rights

    where

    lefts = parBuffered cfg c1
    rights = parBuffered cfg c2
-- | Like 'Fold.unzipWithM', but the two folds consuming the components of
-- the split input are first wrapped with 'parBuffered' using the given
-- config modifier.
--
{-# INLINABLE parUnzipWithM #-}
parUnzipWithM :: MonadAsync m
    => (Config -> Config) -> (a -> m (b,c)) -> Fold m b x -> Fold m c y -> Fold m a (x,y)
parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f firsts seconds

    where

    firsts = parBuffered cfg c1
    seconds = parBuffered cfg c2
{-# ANN type ScanState Fuse #-}
-- | State of the stream produced by 'parDistributeScan'. The type parameters
-- are the source stream state @s@, the shared output queue @q@, the shared
-- doorbell @db@ and the per-fold record @f@ (a channel paired with its
-- thread id in 'parDistributeScan').
data ScanState s q db f
    = ScanInit
    -- ^ Nothing has been created yet.
    | ScanGo s q db [f]
    -- ^ The source stream is still being consumed.
    | ScanDrain q db [f]
    -- ^ The source stream has stopped; remaining outputs are being collected.
    | ScanStop
    -- ^ The output stream is finished.
-- | Distribute every element of the input stream to a dynamically growing
-- set of folds, each running in its own channel, and emit the results of the
-- folds that have completed.
--
-- On every step of the source stream:
--
-- 1. The @getFolds@ action is run and a new channel is created (via
--    'newChannelWith') for each fold it returns; all channels share one
--    output queue and one doorbell.
-- 2. Results of completed folds are collected from the shared output queue
--    and the corresponding channels are dropped from the running set.
-- 3. The next input element, if any, is sent to all running channels.
--
-- A batch of results is yielded only when it is non-empty while the source
-- is running. When the source stops, all running channels are finalized and
-- the stream keeps collecting results until no channel is left.
--
-- If a fold reports an exception, all channel threads are sent
-- 'ThreadAbort', all channels are cleaned up and the exception is rethrown.
--
-- NOTE(review): the last element yielded when draining completes may be an
-- empty list, because the final batch is yielded without a @null@ check.
--
{-# INLINE parDistributeScan #-}
parDistributeScan :: MonadAsync m =>
    (Config -> Config) -> m [Fold m a b] -> Stream m a -> Stream m [b]
parDistributeScan cfg getFolds (Stream sstep state) =
    Stream step ScanInit

    where

    -- Walk the list of output events, accumulating the results of finished
    -- folds in @done@ and removing their channels from @chans@.
    processOutputs chans [] done = return (chans, done)
    processOutputs chans (event : rest) done =
        case event of
            FoldException _tid ex -> do
                -- Abort every worker thread, release the channels and
                -- propagate the exception to the caller.
                liftIO $ mapM_ (`throwTo` ThreadAbort) (fmap snd chans)
                mapM_ cleanup (fmap fst chans)
                liftIO $ throwM ex
            FoldDone tid b ->
                let stillRunning = filter (\(_, t) -> t /= tid) chans
                 in processOutputs stillRunning rest (b : done)
            -- Partial results are not handled by this combinator; fail with
            -- a message that identifies the source instead of a bare
            -- 'undefined'.
            FoldPartial _ ->
                error "parDistributeScan: FoldPartial event is not supported"

    -- Read the output queue only when its event count is non-zero.
    collectOutputs qref chans = do
        pending <- liftIO $ snd <$> readIORef qref
        if pending > 0
        then do
            events <- fmap fst $ liftIO $ readOutputQBasic qref
            processOutputs chans events []
        else return (chans, [])

    -- Create the shared output queue and doorbell.
    step _ ScanInit = do
        q <- liftIO $ newIORef ([], 0)
        db <- liftIO newEmptyMVar
        return $ Skip (ScanGo state q db [])

    step gst (ScanGo st q db chans) = do
        -- Start channels for any folds supplied since the last step.
        fxs <- getFolds
        newChans <- Prelude.mapM (newChannelWith q db cfg) fxs
        let allChans = chans ++ newChans

        -- Collect results from folds that have finished.
        (running, outputs) <- collectOutputs q allChans

        -- Pull the next input and hand it to the running folds.
        res <- sstep (adaptState gst) st
        next <- case res of
            Yield x s -> do
                Prelude.mapM_ (`sendToWorker_` x) (fmap fst running)
                return $ ScanGo s q db running
            Skip s ->
                return $ ScanGo s q db running
            Stop -> do
                Prelude.mapM_ finalize (fmap fst running)
                return $ ScanDrain q db running
        return $
            if null outputs
            then Skip next
            else Yield outputs next

    step _ (ScanDrain q db chans) = do
        (running, outputs) <- collectOutputs q chans
        case running of
            [] -> return $ Yield outputs ScanStop
            _ ->
                if null outputs
                then do
                    -- Nothing to report yet; wait for the doorbell.
                    liftIO $ takeMVar db
                    return $ Skip (ScanDrain q db running)
                else return $ Yield outputs (ScanDrain q db running)

    step _ ScanStop = return Stop
{-# ANN type DemuxState Fuse #-}
-- | State of the stream produced by 'parDemuxScan'. The constructors have
-- the same shape as those of 'ScanState', except that the last field is a
-- single value @f@ rather than a list; 'parDemuxScan' instantiates it to a
-- map from key to a channel paired with its thread id.
--
-- NOTE(review): the per-constructor roles presumably mirror 'ScanState'
-- (initial, running, draining, stopped) - confirm against the step function
-- of 'parDemuxScan'.
data DemuxState s q db f
    = DemuxInit
    | DemuxGo s q db f
    | DemuxDrain q db f
    | DemuxStop
{-# INLINE parDemuxScan #-}
parDemuxScan :: (MonadAsync m, Ord k) =>
(Config -> Config)
-> (a -> k)
-> (k -> m (Fold m a b))
-> Stream m a
-> Stream m [(k, b)]
-- Implementation notes for 'parDemuxScan'.
--
-- The result is a stream driven by a four-state machine:
-- 'DemuxInit' -> 'DemuxGo' -> 'DemuxDrain' -> 'DemuxStop'.
--
-- Arguments:
--
-- * @cfg@: passed unchanged to 'newChannelWith' for every channel created.
-- * @getKey@: maps an input element to the key of the fold that receives it.
-- * @getFold@: run to obtain the fold for a key that currently has no entry
--   in the key-to-channel map.
-- * the input stream, taken apart into its step function and seed state.
--
-- Every element of the output stream is the batch of @(key, result)@ pairs
-- gathered from the 'FoldDone' events seen during one step.
parDemuxScan cfg getKey getFold (Stream sstep state) =
    Stream step DemuxInit

    where

    -- Walk a list of worker events, threading the key-to-channel map and the
    -- results accumulated so far. Results are consed onto @done@, so the
    -- returned batch is in reverse of the order in which events are listed.
    processOutputs keyToChan events done =
        case events of
            [] -> return (keyToChan, done)
            (x:xs) ->
                case x of
                    FoldException _tid ex -> do
                        -- One fold failed: send 'ThreadAbort' to the thread
                        -- of every channel still in the map, run 'cleanup'
                        -- on each of those channels, then rethrow the
                        -- original exception.
                        let chans = Map.elems keyToChan
                        liftIO $ mapM_ (`throwTo` ThreadAbort) (fmap snd chans)
                        mapM_ cleanup (fmap fst chans)
                        liftIO $ throwM ex
                    FoldDone _tid o@(k, _) ->
                        -- The fold for key @k@ has produced its result; drop
                        -- its channel from the map and record the output.
                        let ch = Map.delete k keyToChan
                         in processOutputs ch xs (o:done)
                    FoldPartial _ ->
                        -- This function has no handling for partial results;
                        -- fail with a diagnosable message rather than a bare
                        -- 'undefined'.
                        error "parDemuxScan: unexpected FoldPartial event"

    -- Check the shared output queue. When its counter is positive, fetch the
    -- queued events with 'readOutputQBasic' and process them; otherwise
    -- return the map unchanged together with an empty batch.
    collectOutputs qref keyToChan = do
        (_, n) <- liftIO $ readIORef qref
        if n > 0
        then do
            r <- fmap fst $ liftIO $ readOutputQBasic qref
            processOutputs keyToChan r []
        else return (keyToChan, [])

    -- Allocate the output queue and the 'MVar' shared by all channels, then
    -- start consuming the input with an empty key-to-channel map.
    step _ DemuxInit = do
        q <- liftIO $ newIORef ([], 0)
        db <- liftIO newEmptyMVar
        return $ Skip (DemuxGo state q db Map.empty)

    -- Main phase: collect pending outputs first, then pull one step from the
    -- input stream and dispatch it.
    step gst (DemuxGo st q db keyToChan) = do
        (keyToChan1, outputs) <- collectOutputs q keyToChan

        res <- sstep (adaptState gst) st
        next <- case res of
            Yield x s -> do
                let k = getKey x
                (keyToChan2, ch) <-
                    case Map.lookup k keyToChan1 of
                        Nothing -> do
                            -- No channel for this key: build the fold, tag
                            -- its result with the key, and start a channel
                            -- for it.
                            fld <- getFold k
                            r@(chan, _) <-
                                newChannelWith q db cfg (fmap ((,) k) fld)
                            return (Map.insert k r keyToChan1, chan)
                        Just (chan, _) -> return (keyToChan1, chan)
                sendToWorker_ ch x
                return $ DemuxGo s q db keyToChan2
            Skip s ->
                return $ DemuxGo s q db keyToChan1
            Stop -> do
                -- Input exhausted: finalize every channel still in the map
                -- and switch to draining their results.
                let chans = fmap fst $ Map.elems keyToChan1
                Prelude.mapM_ finalize chans
                return $ DemuxDrain q db keyToChan1
        -- Never emit an empty batch while the input is still being consumed.
        if null outputs
        then return $ Skip next
        else return $ Yield outputs next

    -- Drain phase: keep collecting outputs until no channel is left in the
    -- map.
    step _ (DemuxDrain q db keyToChan) = do
        (keyToChan1, outputs) <- collectOutputs q keyToChan
        if Map.null keyToChan1
        -- NOTE(review): unlike the 'DemuxGo' case this yields the final batch
        -- even when it is empty (e.g. for an empty input stream). Kept as is
        -- to preserve the existing output; confirm whether it is intended.
        then return $ Yield outputs DemuxStop
        else
            if null outputs
            then do
                -- Nothing to report yet: block on the shared 'MVar' before
                -- checking the queue again (presumably workers fill it when
                -- they queue an event; confirm in 'newChannelWith').
                liftIO $ takeMVar db
                return $ Skip (DemuxDrain q db keyToChan1)
            else return $ Yield outputs (DemuxDrain q db keyToChan1)

    step _ DemuxStop = return Stop