module Streamly.Internal.Data.Scanl.Concurrent
(
parTeeWith
, parDistributeScan
, parDemuxScan
)
where
#include "inline.hs"
import Control.Concurrent (newEmptyMVar, takeMVar, throwTo)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Fold (Step (..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Stream (Stream(..), Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))
import qualified Data.Map.Strict as Map
import Streamly.Internal.Data.Fold.Channel.Type
import Streamly.Internal.Data.Channel.Types
-- | Distribute the input to two scans, each running behind its own channel
-- (created with 'newScanChannel' using the supplied config modifier), and
-- combine the latest output of both scans with the given function.
--
-- On initialization, and again after every input, exactly one output event is
-- awaited from each channel. The combined scan keeps going only while both
-- scans report a partial result; as soon as either reports completion the
-- combined scan is done. If a scan reports an exception, both channels are
-- cleaned up and the exception is rethrown.
--
-- NOTE(review): when only one of the two scans reports completion, the
-- combined scan returns @Done@ without finalizing the other channel in this
-- function -- confirm that the other channel is reclaimed elsewhere.
--
{-# INLINABLE parTeeWith #-}
parTeeWith :: MonadAsync m =>
       (Config -> Config)
    -> (a -> b -> c)
    -> Scanl m x a
    -> Scanl m x b
    -> Scanl m x c
parTeeWith cfg f c1 c2 = Scanl step initial extract final

    where

    -- Atomically take everything from the output queue of @chan@ and decode
    -- the single event expected there: 'Left' for a finished scan, 'Right'
    -- for a partial result. If the queue is empty, block on the output
    -- doorbell of @chan@ and retry. @peer@ is used only to clean up the other
    -- channel when @chan@ reports an exception.
    awaitEvent chan peer = do
        (events, _) <-
            liftIO
                $ atomicModifyIORefCAS (outputQueue chan)
                $ \old -> (([], 0), old)
        case events of
            [] -> do
                liftIO $ takeMVar (outputDoorBell chan)
                awaitEvent chan peer
            [event] ->
                case event of
                    FoldException _ ex -> do
                        cleanup chan
                        cleanup peer
                        liftIO $ throwM ex
                    FoldDone _ b -> return (Left b)
                    FoldPartial b -> return (Right b)
            _ -> error "parTeeWith: not expecting more than one msg in q"

    -- Combine one response from each scan. Only when both are partial do we
    -- continue, carrying the channels and the combined output as the state.
    merge chA chB ra rb =
        return $ case (ra, rb) of
            (Right u, Right v) -> Partial $ Tuple3' chA chB (f u v)
            (Right u, Left v) -> Done (f u v)
            (Left u, Right v) -> Done (f u v)
            (Left u, Left v) -> Done (f u v)

    -- Await one response from each channel, first channel first, and merge.
    exchange chA chB = do
        ra <- awaitEvent chA chB
        rb <- awaitEvent chB chA
        merge chA chB ra rb

    initial = do
        chA <- newScanChannel cfg c1
        chB <- newScanChannel cfg c2
        exchange chA chB

    step (Tuple3' chA chB _) x = do
        sendToWorker_ chA x
        sendToWorker_ chB x
        exchange chA chB

    -- The latest combined output is carried in the state.
    extract (Tuple3' _ _ out) = return out

    final (Tuple3' chA chB out) =
        finalize chA >> finalize chB >> return out
{-# ANN type ScanState Fuse #-}
-- | State of the stream produced by 'parDistributeScan'.
--
-- The type parameters are: @s@ the state of the source stream, @q@ the
-- shared output queue, @db@ the output doorbell and @f@ a running channel
-- entry.
--
-- * 'ScanInit': nothing has been allocated yet.
-- * 'ScanGo': the source stream is being consumed; carries the source state,
--   the output queue, the doorbell and the running channels.
-- * 'ScanDrain': the source stream has stopped; remaining outputs of the
--   running channels are being collected.
-- * 'ScanStop': the output stream is finished.
data ScanState s q db f
    = ScanInit
    | ScanGo s q db [f]
    | ScanDrain q db [f]
    | ScanStop
-- | Send each element of the input stream to every currently running scan
-- and emit the outputs of the scans as a stream of lists.
--
-- Before each input is pulled from the source stream the supplied action is
-- run to obtain additional scans; a channel is created for each of them,
-- sharing a single output queue and doorbell, and appended to the channels
-- already running. Whatever output events are present in the shared queue at
-- that point are collected; a non-empty collection is yielded as one list,
-- an empty one results in a skip.
--
-- When the source stream stops, all running channels are finalized and the
-- stream keeps collecting outputs, blocking on the doorbell when none are
-- available, until no channel is left running.
--
-- If any scan reports an exception, @ThreadAbort@ is thrown to the threads of
-- all channels, the channels are cleaned up and the exception is rethrown.
--
-- NOTE(review): in the drain state, when no channel remains running, the
-- collected outputs are yielded even if the list is empty (for example when
-- there were no channels at all) -- confirm this is intended.
--
{-# INLINE parDistributeScan #-}
parDistributeScan :: MonadAsync m =>
    (Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
parDistributeScan cfg getFolds (Stream sstep state) =
    Stream step ScanInit

    where

    -- Walk the list of output events, threading the running channels and the
    -- outputs gathered so far. Outputs are consed on to the accumulator, so
    -- they end up in the reverse of the order in which the events are
    -- visited. A channel whose scan is done is dropped from the running list
    -- by matching on its thread id.
    drainEvents chans [] acc = return (chans, acc)
    drainEvents chans (event : rest) acc =
        case event of
            FoldException _ ex -> do
                liftIO $ mapM_ ((`throwTo` ThreadAbort) . snd) chans
                mapM_ (cleanup . fst) chans
                liftIO $ throwM ex
            FoldDone tid b ->
                drainEvents (filter ((/= tid) . snd) chans) rest (b : acc)
            FoldPartial b ->
                drainEvents chans rest (b : acc)

    -- Peek at the count stored in the output queue; only when it is positive
    -- read the queue with 'readOutputQBasic' and process the events.
    harvest qref chans = do
        (_, pending) <- liftIO (readIORef qref)
        if pending <= 0
        then return (chans, [])
        else do
            events <- fst <$> liftIO (readOutputQBasic qref)
            drainEvents chans events []

    -- Skip when there is nothing to emit, otherwise yield the outputs.
    emit outputs next
        | null outputs = return (Skip next)
        | otherwise = return (Yield outputs next)

    -- Allocate the shared output queue and doorbell.
    step _ ScanInit = liftIO $ do
        q <- newIORef ([], 0)
        db <- newEmptyMVar
        return (Skip (ScanGo state q db []))

    step gst (ScanGo st q db chans) = do
        -- Start channels for any newly supplied scans.
        fresh <- getFolds >>= Prelude.mapM (newChannelWithScan q db cfg)

        -- Collect the outputs available so far; this also drops the channels
        -- whose scans are done.
        (running, outputs) <- harvest q (chans ++ fresh)

        -- Pull one step from the source and feed the running channels.
        res <- sstep (adaptState gst) st
        next <- case res of
            Yield x s -> do
                Prelude.mapM_ ((`sendToWorker_` x) . fst) running
                return (ScanGo s q db running)
            Skip s ->
                return (ScanGo s q db running)
            Stop -> do
                Prelude.mapM_ (finalize . fst) running
                return (ScanDrain q db running)
        emit outputs next

    step _ (ScanDrain q db chans) = do
        (running, outputs) <- harvest q chans
        let continue = ScanDrain q db running
        case running of
            [] -> return (Yield outputs ScanStop)
            _
                | null outputs -> do
                    -- Nothing to emit yet; wait for the doorbell.
                    liftIO (takeMVar db)
                    return (Skip continue)
                | otherwise -> return (Yield outputs continue)

    step _ ScanStop = return Stop
{-# ANN type DemuxState Fuse #-}
-- | State of the stream produced by 'parDemuxScan', which starts in
-- 'DemuxInit'.
--
-- The type parameters are: @s@ the state of the source stream, @q@ the
-- output queue, @db@ the doorbell and @f@ the collection of channels
-- ('parDemuxScan' instantiates it to a map from key to channel and thread
-- id).
--
-- NOTE(review): the constructors presumably play the same roles as the
-- corresponding @ScanState@ constructors (initialize, consume input, drain
-- outputs, stop) -- confirm against the @step@ function of 'parDemuxScan'.
data DemuxState s q db f
    = DemuxInit
    | DemuxGo s q db f
    | DemuxDrain q db f
    | DemuxStop
{-# INLINE parDemuxScan #-}
parDemuxScan :: (MonadAsync m, Ord k) =>
(Config -> Config)
-> (a -> k)
-> (k -> m (Scanl m a b))
-> Stream m a
-> Stream m [(k, b)]
parDemuxScan cfg getKey getFold (Stream sstep state) =
    Stream step DemuxInit

    where

    -- Walk a batch of worker events, threading the key -> (channel, thread)
    -- map and an accumulator of outputs. Outputs are consed onto the
    -- accumulator, so they come out in the reverse of the event-list order.
    processOutputs chanMap [] acc = pure (chanMap, acc)
    processOutputs chanMap (ev : rest) acc =
        case ev of
            -- Intermediate result: the worker stays registered.
            FoldPartial out ->
                processOutputs chanMap rest (out : acc)
            -- Final result for a key: unregister that key's worker.
            FoldDone _tid out@(key, _) ->
                processOutputs (Map.delete key chanMap) rest (out : acc)
            -- A worker failed: abort every registered worker thread first,
            -- then clean up every channel, and finally rethrow the
            -- exception. Nothing is returned on this path.
            FoldException _tid ex -> do
                let workers = Map.elems chanMap
                liftIO $ mapM_ (\(_, tid) -> throwTo tid ThreadAbort) workers
                mapM_ (cleanup . fst) workers
                liftIO (throwM ex)

    -- Look at the counter stored next to the event queue; only when it is
    -- positive read the queue and process the events that were read.
    collectOutputs qref chanMap = do
        (_, pending) <- liftIO (readIORef qref)
        if pending <= 0
        then pure (chanMap, [])
        else do
            events <- fst <$> liftIO (readOutputQBasic qref)
            processOutputs chanMap events []

    -- First step: allocate the shared event queue and the MVar handed to
    -- every worker channel, then start consuming the input stream with an
    -- empty key map.
    step _ DemuxInit = do
        outQ <- liftIO (newIORef ([], 0))
        bell <- liftIO newEmptyMVar
        pure (Skip (DemuxGo state outQ bell Map.empty))

    -- Main loop: collect whatever the workers have produced so far, then
    -- pull one step from the input stream and route it.
    step gst (DemuxGo st outQ bell chanMap) = do
        (live, outputs) <- collectOutputs outQ chanMap
        res <- sstep (adaptState gst) st
        next <- case res of
            Yield x st1 -> do
                let key = getKey x
                (live1, chan) <-
                    case Map.lookup key live of
                        Just (existing, _) -> pure (live, existing)
                        -- No worker for this key yet: build its scan, tag
                        -- each of its results with the key, and register
                        -- the new channel.
                        Nothing -> do
                            scan <- getFold key
                            entry@(fresh, _) <-
                                newChannelWithScan
                                    outQ bell cfg (fmap ((,) key) scan)
                            pure (Map.insert key entry live, fresh)
                sendToWorker_ chan x
                pure (DemuxGo st1 outQ bell live1)
            Skip st1 ->
                pure (DemuxGo st1 outQ bell live)
            -- Input exhausted: finalize every remaining channel and switch
            -- to draining their outputs.
            Stop -> do
                mapM_ (finalize . fst) (Map.elems live)
                pure (DemuxDrain outQ bell live)
        -- Empty batches are not emitted while the input is being consumed.
        pure $ if null outputs then Skip next else Yield outputs next

    -- Drain loop: keep collecting outputs until no worker remains in the map.
    step _ (DemuxDrain outQ bell chanMap) = do
        (live, outputs) <- collectOutputs outQ chanMap
        let draining = DemuxDrain outQ bell live
        case (Map.null live, null outputs) of
            -- All workers are gone: emit the last batch and stop.
            -- NOTE(review): this batch may be empty (e.g. when the map was
            -- already empty on entry); confirm that is intended.
            (True, _) -> pure (Yield outputs DemuxStop)
            (False, False) -> pure (Yield outputs draining)
            -- Nothing to emit yet: block until the MVar is filled
            -- (presumably by a worker signalling new output; confirm
            -- against newChannelWithScan), then poll again.
            (False, True) -> do
                liftIO (takeMVar bell)
                pure (Skip draining)

    step _ DemuxStop = pure Stop