-- |
-- Module      : Streamly.Internal.Data.Scanl.Concurrent
-- Copyright   : (c) 2024 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Scanl.Concurrent
    (
      parTeeWith
    , parDistributeScan
    , parDemuxScan
    )
where

#include "inline.hs"

import Control.Concurrent (newEmptyMVar, takeMVar, throwTo)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Fold (Step (..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Stream (Stream(..), Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))

import qualified Data.Map.Strict as Map

import Streamly.Internal.Data.Fold.Channel.Type
import Streamly.Internal.Data.Channel.Types

-- $setup
-- >>> :set -fno-warn-deprecations
-- >>> import Control.Concurrent (threadDelay)
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Scanl as Scanl
-- >>> import qualified Streamly.Internal.Data.Scanl.Concurrent as Scanl

-------------------------------------------------------------------------------
-- Concurrent scans
-------------------------------------------------------------------------------

-- | Execute both the scans in a tee concurrently.
--
-- Each of the two scans is run on its own channel, created using the supplied
-- config modifier. Every input element is sent to both the channels and then
-- one output is collected from each of them; the two outputs are combined
-- using the supplied function to produce the output of the resulting scan.
--
-- The resulting scan terminates as soon as either of the two scans
-- terminates. If either scan reports an exception, both the channels are
-- cleaned up and the exception is rethrown in the driver.
--
-- Example:
--
-- >>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Scanl.lmapM delay Scanl.sum
-- >>> c2 = Scanl.lmapM delay Scanl.length
-- >>> dst = Scanl.parTeeWith id (,) c1 c2
-- >>> Stream.toList $ Stream.scanl dst src
-- ...
--
{-# INLINABLE parTeeWith #-}
parTeeWith :: MonadAsync m =>
       (Config -> Config)
    -> (a -> b -> c)
    -> Scanl m x a
    -> Scanl m x b
    -> Scanl m x c
parTeeWith cfg f c1 c2 = Scanl step initial extract final

    where

    -- Block until exactly one output event is available on the first channel
    -- and decode it: 'Left' means the scan on that channel is done, 'Right'
    -- means it produced a partial result and can accept more input. The
    -- second channel is needed only so that it can be cleaned up as well if
    -- the first one reports an exception.
    getResponse ch1 ch2 = do
        -- NOTE: We do not need a queue and doorbell mechanism for this, a single
        -- MVar should be enough. Also, there is only one writer and it writes
        -- only once before we read it.
        let db1 = outputDoorBell ch1
        let q1 = outputQueue ch1
        -- Atomically take whatever is in the output queue, leaving it empty.
        (xs1, _) <- liftIO $ atomicModifyIORefCAS q1 $ \x -> (([], 0), x)
        case xs1 of
            [] -> do
                -- Nothing yet, wait for the doorbell and retry.
                liftIO $ takeMVar db1
                getResponse ch1 ch2
            [x1] ->
                case x1 of
                    FoldException _tid ex -> do
                        -- XXX
                        -- liftIO $ throwTo ch2Tid ThreadAbort
                        cleanup ch1
                        cleanup ch2
                        liftIO $ throwM ex
                    FoldDone _tid b -> return (Left b)
                    FoldPartial b -> return (Right b)
            _ -> error "parTeeWith: not expecting more than one msg in q"

    -- Combine one response from each scan. The combined scan continues only
    -- if both the scans returned a partial result, otherwise it is done.
    processResponses ch1 ch2 r1 r2 =
        return $ case (r1, r2) of
            (Right b1, Right b2) -> Partial $ Tuple3' ch1 ch2 (f b1 b2)
            (Left b1, Left b2) -> Done (f b1 b2)
            (Left b1, Right b2) -> Done (f b1 b2)
            (Right b1, Left b2) -> Done (f b1 b2)

    -- Collect one response from each channel, first channel first, and
    -- combine them into the next step of the resulting scan.
    collect ch1 ch2 = do
        r1 <- getResponse ch1 ch2
        r2 <- getResponse ch2 ch1
        processResponses ch1 ch2 r1 r2

    -- Create one channel per scan and wait for the first output of each,
    -- before any input has been sent.
    initial = do
        ch1 <- newScanChannel cfg c1
        ch2 <- newScanChannel cfg c2
        collect ch1 ch2

    -- Send the input to both the workers before waiting on either of them so
    -- that the two scans can process it concurrently.
    step (Tuple3' ch1 ch2 _) x = do
        sendToWorker_ ch1 x
        sendToWorker_ ch2 x
        collect ch1 ch2

    -- The latest combined output is cached in the state.
    extract (Tuple3' _ _ x) = return x

    final (Tuple3' ch1 ch2 x) = do
        finalize ch1
        finalize ch2
        -- XXX generate the final value?
        return x

-- There are two ways to implement a concurrent scan.
--
-- 1. Make the scan itself asynchronous, add the input to the queue, and then
-- extract the output. Extraction will have to be asynchronous, which will
-- require changes to the scan driver. This will require a different Scanl
-- type.
--
-- 2. A monolithic implementation of concurrent Stream->Stream scan, using a
-- custom implementation of the scan and the driver.

-- The Fuse annotation comes from "Fusion.Plugin.Types"; presumably it asks
-- the fusion plugin to eliminate this state type during stream fusion --
-- confirm against the fusion-plugin documentation.
{-# ANN type ScanState Fuse #-}
-- | State of the output stream generated by 'parDistributeScan'.
--
-- As used by 'parDistributeScan', @s@ is the state of the source stream, @q@
-- is the shared output queue of the scan channels, @db@ is the doorbell
-- associated with that queue, and @f@ is the per-scan bookkeeping (a channel
-- paired with its thread id).
data ScanState s q db f =
      ScanInit
      -- ^ Nothing has been set up yet; the first step creates the output
      -- queue and the doorbell and moves to 'ScanGo'.
    | ScanGo s q db [f]
      -- ^ The source stream is being consumed: source state, output queue,
      -- doorbell and the list of currently running scans.
    | ScanDrain q db [f]
      -- ^ The source stream has stopped and the running scans have been
      -- finalized; their remaining outputs are being collected.
    | ScanStop
      -- ^ Entered from 'ScanDrain' once no running scans remain; presumably
      -- ends the output stream (the handler is not visible here).

-- XXX return [b] or just b?
-- XXX We can use a one way mailbox type abstraction instead of using an IORef
-- for adding new folds dynamically.

-- | Evaluate a stream and scan its outputs using zero or more dynamically
-- generated parallel scans. It checks for any new folds at each input
-- generation step. Any new fold is added to the list of folds which are
-- currently running. If there are no folds available, the input is discarded.
-- If a fold completes, its output is emitted in the output of the scan. The
-- outputs of the parallel scans are merged in the output stream.
--
-- >>> import Data.IORef
-- >>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
-- >>> gen = atomicModifyIORef ref (\xs -> ([], xs))
-- >>> Stream.toList $ Scanl.parDistributeScan id gen (Stream.enumerateFromTo 1 10)
-- ...
--
{-# INLINE parDistributeScan #-}
parDistributeScan :: MonadAsync m =>
    (Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
parDistributeScan :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
parDistributeScan Config -> Config
cfg m [Scanl m a b]
getFolds (Stream State StreamK m a -> s -> m (Step s a)
sstep s
state) =
    (State StreamK m [b]
 -> ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Stream m [b]
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall {m :: * -> *} {a}.
State StreamK m a
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
step ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. ScanState s q db f
ScanInit

    where

    -- XXX can be written as a fold
    processOutputs :: [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
chans [OutEvent a]
events [a]
done = do
        case [OutEvent a]
events of
            [] -> ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Channel m a b, ThreadId)]
chans, [a]
done)
            (OutEvent a
x:[OutEvent a]
xs) ->
                case OutEvent a
x of
                    FoldException ThreadId
_tid SomeException
ex -> do
                        -- XXX report the fold that threw the exception
                        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ThreadId -> ThreadAbort -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` ThreadAbort
ThreadAbort) (((Channel m a b, ThreadId) -> ThreadId)
-> [(Channel m a b, ThreadId)] -> [ThreadId]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> ThreadId
forall a b. (a, b) -> b
snd [(Channel m a b, ThreadId)]
chans)
                        (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
chans)
                        IO ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([(Channel m a b, ThreadId)], [a])
 -> m ([(Channel m a b, ThreadId)], [a]))
-> IO ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a b. (a -> b) -> a -> b
$ SomeException -> IO ([(Channel m a b, ThreadId)], [a])
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
                    FoldDone ThreadId
tid a
b ->
                        let ch :: [(Channel m a b, ThreadId)]
ch = ((Channel m a b, ThreadId) -> Bool)
-> [(Channel m a b, ThreadId)] -> [(Channel m a b, ThreadId)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(Channel m a b
_, ThreadId
t) -> ThreadId
t ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
/= ThreadId
tid) [(Channel m a b, ThreadId)]
chans
                         in [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
ch [OutEvent a]
xs (a
ba -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
done)
                    FoldPartial a
b ->
                         [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
chans [OutEvent a]
xs (a
ba -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
done)

    collectOutputs :: IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent a], Int)
qref [(Channel m a b, ThreadId)]
chans = do
        ([OutEvent a]
_, Int
n) <- IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent a], Int) -> m ([OutEvent a], Int))
-> IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent a], Int) -> IO ([OutEvent a], Int)
forall a. IORef a -> IO a
readIORef IORef ([OutEvent a], Int)
qref
        if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
        then do
            [OutEvent a]
r <- (([OutEvent a], Int) -> [OutEvent a])
-> m ([OutEvent a], Int) -> m [OutEvent a]
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([OutEvent a], Int) -> [OutEvent a]
forall a b. (a, b) -> a
fst (m ([OutEvent a], Int) -> m [OutEvent a])
-> m ([OutEvent a], Int) -> m [OutEvent a]
forall a b. (a -> b) -> a -> b
$ IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent a], Int) -> m ([OutEvent a], Int))
-> IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent a], Int) -> IO ([OutEvent a], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic IORef ([OutEvent a], Int)
qref
            [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
forall {m :: * -> *} {a} {b} {a}.
MonadIO m =>
[(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
chans [OutEvent a]
r []
        else ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Channel m a b, ThreadId)]
chans, [])

    step :: State StreamK m a
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
step State StreamK m a
_ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
ScanInit = do
        IORef ([OutEvent b], Int)
q <- IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int)))
-> IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent b], Int) -> IO (IORef ([OutEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
        MVar ()
db <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
        Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip (s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
state IORef ([OutEvent b], Int)
q MVar ()
db [])

    step State StreamK m a
gst (ScanGo s
st IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
chans) = do
        -- merge any new channels added since last input
        [Scanl m a b]
fxs <- m [Scanl m a b]
getFolds
        [(Channel m a b, ThreadId)]
newChans <- (Scanl m a b -> m (Channel m a b, ThreadId))
-> [Scanl m a b] -> m [(Channel m a b, ThreadId)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
Prelude.mapM (IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
newChannelWithScan IORef ([OutEvent b], Int)
q MVar ()
db Config -> Config
cfg) [Scanl m a b]
fxs
        let allChans :: [(Channel m a b, ThreadId)]
allChans = [(Channel m a b, ThreadId)]
chans [(Channel m a b, ThreadId)]
-> [(Channel m a b, ThreadId)] -> [(Channel m a b, ThreadId)]
forall a. [a] -> [a] -> [a]
++ [(Channel m a b, ThreadId)]
newChans

        -- Collect outputs from running channels
        ([(Channel m a b, ThreadId)]
running, [b]
outputs) <- IORef ([OutEvent b], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [b])
forall {m :: * -> *} {a} {a} {b}.
MonadIO m =>
IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent b], Int)
q [(Channel m a b, ThreadId)]
allChans

        -- Send input to running folds
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
sstep (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next <- case Step s a
res of
            Yield a
x s
s -> do
                -- XXX We might block forever if some folds are already
                -- done but we have not read the output queue yet. To
                -- avoid that we have to either (1) precheck if space
                -- is available in the input queues of all folds so
                -- that this does not block, or (2) we have to use a
                -- non-blocking read and track progress so that we can
                -- restart from where we left.
                --
                -- If there is no space available then we should block
                -- on doorbell db or inputSpaceDoorBell of the relevant
                -- channel. To avoid deadlock the output space can be
                -- kept unlimited. However, the blocking will delay the
                -- processing of outputs. We should yield the outputs
                -- before blocking.
                (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ (Channel m a b -> a -> m ()
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
`sendToWorker_` a
x) (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
running)
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
s IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
            Skip s
s -> do
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
s IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
            Step s a
Stop -> do
                (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
running)
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
        if [b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [b]
outputs
        then Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next
        else Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next
    step State StreamK m a
_ (ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
chans) = do
        ([(Channel m a b, ThreadId)]
running, [b]
outputs) <- IORef ([OutEvent b], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [b])
forall {m :: * -> *} {a} {a} {b}.
MonadIO m =>
IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent b], Int)
q [(Channel m a b, ThreadId)]
chans
        case [(Channel m a b, ThreadId)]
running of
            [] -> Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. ScanState s q db f
ScanStop
            [(Channel m a b, ThreadId)]
_ -> do
                if [b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [b]
outputs
                then do
                    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
db
                    Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip (IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running)
                else Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs (IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running)
    step State StreamK m a
_ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
ScanStop = Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
forall s a. Step s a
Stop

-- State machine driving the stream produced by 'parDemuxScan'.
--
-- Type parameters, as instantiated by 'parDemuxScan':
--
-- * @s@: state of the source stream
-- * @q@: IORef holding the queued output events and their count
-- * @db@: MVar that the driver blocks on ('takeMVar') while draining
-- * @f@: Map from key to the (channel, thread id) of its running scan
{-# ANN type DemuxState Fuse #-}
data DemuxState s q db f =
      -- Initial state: the output queue and the MVar are created here and the
      -- machine moves to DemuxGo with an empty key map.
      DemuxInit
      -- Running: collect pending outputs, pull one step from the source and
      -- send the element to the channel selected by its key.
    | DemuxGo s q db f
      -- Source is exhausted and the channels have been finalized: keep
      -- collecting outputs until the key map becomes empty.
    | DemuxDrain q db f
      -- Terminal state, entered from DemuxDrain once no channel remains.
      -- NOTE(review): presumably the stream returns Stop here; the handling
      -- equation is not visible in this part of the file.
    | DemuxStop

-- XXX We need to either (1) remember a key when done so that we do not add the
-- fold again because some inputs would be lost in between, or (2) have a
-- FoldYield constructor to yield repeatedly so that we can restart the
-- existing fold itself when it is done. But in that case we cannot change the
-- fold once it is started. Also the Map would keep on increasing in size as we
-- never delete a key. Whatever we do, we should keep the non-concurrent fold
-- consistent with it as well.

-- | Evaluate a stream and send its outputs to the selected scan. The scan is
-- dynamically selected using a key at the time of the first input seen for
-- that key. Any new scan is added to the list of scans which are currently
-- running. If there are no scans available for a given key, the input is
-- discarded. If a constituent scan completes, its output is emitted in the
-- output of the composed scan.
--
-- >>> import qualified Data.Map.Strict as Map
-- >>> import Data.Maybe (fromJust)
-- >>> f1 = ("even", Scanl.take 5 Scanl.sum)
-- >>> f2 = ("odd", Scanl.take 5 Scanl.sum)
-- >>> kv = Map.fromList [f1, f2]
-- >>> getScan k = return (fromJust $ Map.lookup k kv)
-- >>> getKey x = if even x then "even" else "odd"
-- >>> input = Stream.enumerateFromTo 1 10
-- >>> Stream.toList $ Scanl.parDemuxScan id getKey getScan input
-- ...
--
{-# INLINE parDemuxScan #-}
parDemuxScan :: (MonadAsync m, Ord k) =>
       (Config -> Config)
    -> (a -> k)
    -> (k -> m (Scanl m a b))
    -> Stream m a
    -> Stream m [(k, b)]
parDemuxScan :: forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
(Config -> Config)
-> (a -> k)
-> (k -> m (Scanl m a b))
-> Stream m a
-> Stream m [(k, b)]
parDemuxScan Config -> Config
cfg a -> k
getKey k -> m (Scanl m a b)
getFold (Stream State StreamK m a -> s -> m (Step s a)
sstep s
state) =
    (State StreamK m [(k, b)]
 -> DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId))
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Stream m [(k, b)]
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall {m :: * -> *} {a}.
State StreamK m a
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
step DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
forall s q db f. DemuxState s q db f
DemuxInit

    where

    -- Walk the list of output events, threading the key-to-channel map and
    -- the outputs gathered so far:
    --
    -- * a partial result is prepended to the gathered outputs,
    -- * a final result is prepended as well and its key is dropped from the
    --   map,
    -- * an exception aborts every thread still in the map, cleans up the
    --   corresponding channels and rethrows the exception.
    --
    -- Returns the updated map together with the gathered outputs.
    --
    -- XXX can be written as a fold
    processOutputs keyToChan [] done = return (keyToChan, done)
    processOutputs keyToChan (FoldPartial b : rest) done =
        processOutputs keyToChan rest (b : done)
    processOutputs keyToChan (FoldDone _tid o@(k, _) : rest) done =
        processOutputs (Map.delete k keyToChan) rest (o : done)
    processOutputs keyToChan (FoldException _tid ex : _) _ = do
        -- XXX report the fold that threw the exception
        let running = Map.elems keyToChan
        -- Abort all worker threads first, then clean up their channels.
        liftIO $ mapM_ ((`throwTo` ThreadAbort) . snd) running
        mapM_ (cleanup . fst) running
        liftIO $ throwM ex

    -- Fetch whatever output events are currently queued in @qref@ and process
    -- them with 'processOutputs'. When the queue's count is not positive the
    -- key map is returned unchanged, with no outputs.
    collectOutputs qref keyToChan = do
        (_, pending) <- liftIO (readIORef qref)
        if pending <= 0
        then return (keyToChan, [])
        else do
            events <- fst <$> liftIO (readOutputQBasic qref)
            processOutputs keyToChan events []

    step :: State StreamK m a
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
step State StreamK m a
_ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
DemuxInit = do
        IORef ([OutEvent (k, b)], Int)
q <- IO (IORef ([OutEvent (k, b)], Int))
-> m (IORef ([OutEvent (k, b)], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent (k, b)], Int))
 -> m (IORef ([OutEvent (k, b)], Int)))
-> IO (IORef ([OutEvent (k, b)], Int))
-> m (IORef ([OutEvent (k, b)], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent (k, b)], Int) -> IO (IORef ([OutEvent (k, b)], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
        MVar ()
db <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
        Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. s -> Step s a
Skip (s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
state IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
forall k a. Map k a
Map.empty)

    step State StreamK m a
gst (DemuxGo s
st IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan) = do
        -- Collect outputs from running channels
        (Map k (Channel m a (k, b), ThreadId)
keyToChan1, [(k, b)]
outputs) <- IORef ([OutEvent (k, b)], Int)
-> Map k (Channel m a (k, b), ThreadId)
-> m (Map k (Channel m a (k, b), ThreadId), [(k, b)])
forall {m :: * -> *} {a} {b} {a} {b}.
(MonadIO m, Ord a) =>
IORef ([OutEvent (a, b)], Int)
-> Map a (Channel m a b, ThreadId)
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
collectOutputs IORef ([OutEvent (k, b)], Int)
q Map k (Channel m a (k, b), ThreadId)
keyToChan

        -- Send input to the selected fold
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
sstep (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st

        DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next <- case Step s a
res of
            Yield a
x s
s -> do
                -- XXX If the fold for a particular key is done and we see that
                -- key again, and we have not yet collected the done event, we
                -- cannot restart the fold because the previous key is already
                -- installed. Therefore, restarting the fold for the same key
                -- is fraught with races.
                let k :: k
k = a -> k
getKey a
x
                (Map k (Channel m a (k, b), ThreadId)
keyToChan2, Channel m a (k, b)
ch) <-
                    case k
-> Map k (Channel m a (k, b), ThreadId)
-> Maybe (Channel m a (k, b), ThreadId)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k Map k (Channel m a (k, b), ThreadId)
keyToChan1 of
                        Maybe (Channel m a (k, b), ThreadId)
Nothing -> do
                            Scanl m a b
fld <- k -> m (Scanl m a b)
getFold k
k
                            r :: (Channel m a (k, b), ThreadId)
r@(Channel m a (k, b)
chan, ThreadId
_) <- IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a (k, b)
-> m (Channel m a (k, b), ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
newChannelWithScan IORef ([OutEvent (k, b)], Int)
q MVar ()
db Config -> Config
cfg ((b -> (k, b)) -> Scanl m a b -> Scanl m a (k, b)
forall a b. (a -> b) -> Scanl m a a -> Scanl m a b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (k
k,) Scanl m a b
fld)
                            (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
-> m (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (k
-> (Channel m a (k, b), ThreadId)
-> Map k (Channel m a (k, b), ThreadId)
-> Map k (Channel m a (k, b), ThreadId)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k (Channel m a (k, b), ThreadId)
r Map k (Channel m a (k, b), ThreadId)
keyToChan1, Channel m a (k, b)
chan)
                        Just (Channel m a (k, b)
chan, ThreadId
_) -> (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
-> m (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map k (Channel m a (k, b), ThreadId)
keyToChan1, Channel m a (k, b)
chan)
                -- XXX We might block forever if some folds are already
                -- done but we have not read the output queue yet. To
                -- avoid that we have to either (1) precheck if space
                -- is available in the input queues of all folds so
                -- that this does not block, or (2) we have to use a
                -- non-blocking read and track progress so that we can
                -- restart from where we left.
                --
                -- If there is no space available then we should block
                -- on doorbell db or inputSpaceDoorBell of the relevant
                -- channel. To avoid deadlock the output space can be
                -- kept unlimited. However, the blocking will delay the
                -- processing of outputs. We should yield the outputs
                -- before blocking.
                Channel m a (k, b) -> a -> m ()
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
sendToWorker_ Channel m a (k, b)
ch a
x
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
s IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan2
            Skip s
s ->
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
s IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1
            Step s a
Stop -> do
                let chans :: [Channel m a (k, b)]
chans = ((k, (Channel m a (k, b), ThreadId)) -> Channel m a (k, b))
-> [(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Channel m a (k, b), ThreadId) -> Channel m a (k, b)
forall a b. (a, b) -> a
fst ((Channel m a (k, b), ThreadId) -> Channel m a (k, b))
-> ((k, (Channel m a (k, b), ThreadId))
    -> (Channel m a (k, b), ThreadId))
-> (k, (Channel m a (k, b), ThreadId))
-> Channel m a (k, b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (k, (Channel m a (k, b), ThreadId))
-> (Channel m a (k, b), ThreadId)
forall a b. (a, b) -> b
snd) ([(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)])
-> [(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)]
forall a b. (a -> b) -> a -> b
$ Map k (Channel m a (k, b), ThreadId)
-> [(k, (Channel m a (k, b), ThreadId))]
forall k a. Map k a -> [(k, a)]
Map.toList Map k (Channel m a (k, b), ThreadId)
keyToChan1
                (Channel m a (k, b) -> m ()) -> [Channel m a (k, b)] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ Channel m a (k, b) -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize [Channel m a (k, b)]
chans
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. q -> db -> f -> DemuxState s q db f
DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1
        if [(k, b)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(k, b)]
outputs
        then Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. s -> Step s a
Skip DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next
        else Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. a -> s -> Step s a
Yield [(k, b)]
outputs DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next
    -- DemuxDrain state: gather whatever outputs are currently available via
    -- 'collectOutputs', which also returns an updated key-to-channel map, and
    -- decide how to proceed based on that map and the collected outputs.
    -- NOTE(review): presumably this state is entered once the input stream
    -- has ended and the map shrinks as workers finish - confirm against the
    -- earlier clauses and 'collectOutputs'.
    step _ (DemuxDrain q db keyToChan) = do
        (remaining, outs) <- collectOutputs q keyToChan
        -- The second component is only forced when the map is non-empty,
        -- matching the evaluation order of a nested conditional.
        case (Map.null remaining, null outs) of
            -- No entries left in the map: emit the collected outputs and
            -- move to the terminal state.
            -- XXX null outputs case: 'outs' may be empty here, in which case
            -- an empty list is yielded.
            (True, _) -> return (Yield outs DemuxStop)
            -- Entries remain but nothing was collected: block on the MVar
            -- until it is filled, then retry without emitting anything.
            (False, True) -> do
                liftIO (takeMVar db)
                return (Skip (DemuxDrain q db remaining))
            -- Entries remain and we have outputs: emit them and keep
            -- draining with the updated map.
            (False, False) ->
                return (Yield outs (DemuxDrain q db remaining))
    -- DemuxStop state: nothing more to emit, end the stream.
    step _ DemuxStop = pure Stop