-- |
-- Module      : Streamly.Internal.Data.Fold.Concurrent
-- Copyright   : (c) 2022 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- = Asynchronous Evaluation
--
-- Using 'parBuffered' a fold can be decoupled from the driver and evaluated
-- concurrently with the driver. The driver just pushes an element to the
-- fold's buffer and waits for async evaluation to finish.
--
-- Stages in a fold pipeline can be made concurrent using 'parBuffered'.
--
-- = Concurrent Fold Combinators
--
-- The 'demux' combinator can be made concurrent by using 'parBuffered' on the fold
-- returned by the fold-generating function. Thus, we can fold values for each
-- key in the input stream concurrently.
--
-- Similarly, we can use 'parBuffered' with other cobminators like 'toMap',
-- 'demuxToMap', 'classify', 'tee', 'distribute', 'partition' etc. Basically,
-- any combinator that composes multiple folds or multiple instances of a fold
-- is a good candidate for running folds concurrently.
--
-- = Finalization
--
-- Before a fold returns "done" it has to drain the child folds. For example,
-- consider a "take" operation on a `parBuffered` fold, the take should return as
-- soon as it has taken required number of elements but we have to ensure that
-- any asynchronous child folds finish before it returns. This is achieved by
-- calling the "final" operation of the fold.

-- = TODO
--
-- Use multiple worker folds to fold serial chunks of a stream and collect the
-- results using another fold, combine using a monoid. The results can be
-- collected out-of-order or in-order. This would be easier if each input
-- element is a streamable chunk and each fold consumes one chunk at a time.
-- This is like parConcatMap in streams.
--
-- Concurrent append: if one fold's buffer becomes full then use the next one
-- Concurrent interleave/partition: Round robin to n folds.
-- Concurrent distribute to multiple folds.

module Streamly.Internal.Data.Fold.Concurrent
    (
      parBuffered
    , parLmapM
    , parTeeWith
    , parDistribute
    , parPartition
    , parUnzipWithM
    , parDistributeScan
    , parDemuxScan

    -- Deprecated
    , parEval
    )
where

#include "inline.hs"
#include "deprecation.h"

import Control.Concurrent (newEmptyMVar, takeMVar, throwTo)
import Control.Monad.Catch (throwM)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Stream (Stream(..), Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)

import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Fold as Fold

import Streamly.Internal.Data.Fold.Channel.Type
import Streamly.Internal.Data.Channel.Types

-- $setup
-- >>> :set -fno-warn-deprecations
-- >>> import Control.Concurrent (threadDelay)
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold.Concurrent as Fold

-------------------------------------------------------------------------------
-- Evaluating a Fold
-------------------------------------------------------------------------------

-- | 'parBuffered' introduces a concurrent stage at the input of the fold. The
-- inputs are asynchronously queued in a buffer and evaluated concurrently with
-- the evaluation of the source stream. On finalization, 'parBuffered' waits for
-- the asynchronous fold to complete before it returns.
--
-- In the following example both the stream and the fold have a 1 second delay,
-- but the delay is not compounded because both run concurrently.
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
--
-- >>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
-- >>> dst = Fold.parBuffered id (Fold.lmapM delay Fold.sum)
-- >>> Stream.fold dst src
-- ...
--
-- Another example:
--
-- >>> Stream.toList $ Stream.groupsOf 4 dst src
-- ...
--
{-# INLINABLE parBuffered #-}
parBuffered, parEval
    :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
parBuffered :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parBuffered Config -> Config
modifier Fold m a b
f =
    (Channel m a b -> a -> m (Step (Channel m a b) b))
-> m (Step (Channel m a b) b)
-> (Channel m a b -> m b)
-> (Channel m a b -> m b)
-> Fold m a b
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Fold m a b
Fold Channel m a b -> a -> m (Step (Channel m a b) b)
forall {m :: * -> *} {a} {b}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
Channel m a b -> a -> m (Step (Channel m a b) b)
step m (Step (Channel m a b) b)
forall {b}. m (Step (Channel m a b) b)
initial Channel m a b -> m b
forall {p} {a}. p -> a
extract Channel m a b -> m b
forall {m :: * -> *} {a} {b}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
Channel m a b -> m b
final

    where

    -- XXX Supply an output channel to the fold. The fold would send the result
    -- from each step (i.e. scan result) to the channel. The Partial and Done
    -- constructors are sent to the channel. We then draw the resulting stream
    -- from that channel. Kind of concurrrent mapping on the stream but with a
    -- fold/scan.
    --
    -- There can also be a model where multiple folds pick input from the same
    -- channel.
    --
    -- We can also run parsers this way. So instead of sending output on each
    -- step it can send once it is done.
    initial :: m (Step (Channel m a b) b)
initial = Channel m a b -> Step (Channel m a b) b
forall s b. s -> Step s b
Partial (Channel m a b -> Step (Channel m a b) b)
-> m (Channel m a b) -> m (Step (Channel m a b) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Config -> Config) -> Fold m a b -> m (Channel m a b)
forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel Config -> Config
modifier Fold m a b
f

    -- XXX This is not truly asynchronous. If the fold is done we only get to
    -- know when we send the next input unless the stream ends. We could
    -- potentially throw an async exception to the driver to inform it
    -- asynchronously. Alternatively, the stream should not block forever, it
    -- should keep polling the fold status. We can insert a timer tick in the
    -- input stream to do that.
    --
    -- A polled stream abstraction may be useful, it would consist of normal
    -- events and tick events, latter are guaranteed to arrive.
    --
    -- XXX We can use the config to indicate if the fold is a scanning type or
    -- one-shot, or use a separate parBufferedScan for scanning. For a scanning
    -- type fold the worker would always send the intermediate values back to
    -- the driver. An intermediate value can be returned on an input, or the
    -- driver can poll even without input, if we have the Skip input support.
    -- When the buffer is full we can return "Skip" and then the next step
    -- without input can wait for an output to arrive. Similarly, when "final"
    -- is called it can return "Skip" to continue or "Done" to indicate
    -- termination.
    step :: Channel m a b -> a -> m (Step (Channel m a b) b)
step Channel m a b
chan a
a = do
        Maybe b
status <- Channel m a b -> a -> m (Maybe b)
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m (Maybe b)
sendToWorker Channel m a b
chan a
a
        Step (Channel m a b) b -> m (Step (Channel m a b) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Channel m a b) b -> m (Step (Channel m a b) b))
-> Step (Channel m a b) b -> m (Step (Channel m a b) b)
forall a b. (a -> b) -> a -> b
$ case Maybe b
status of
            Maybe b
Nothing -> Channel m a b -> Step (Channel m a b) b
forall s b. s -> Step s b
Partial Channel m a b
chan
            Just b
b -> b -> Step (Channel m a b) b
forall s b. b -> Step s b
Done b
b

    extract :: p -> a
extract p
_ = [Char] -> a
forall a. HasCallStack => [Char] -> a
error [Char]
"Concurrent folds do not support scanning"

    -- XXX depending on the use case we may want to either wait for the result
    -- or cancel the ongoing work. We can use the config to control that?
    -- Currently it waits for the work to complete.
    final :: Channel m a b -> m b
final Channel m a b
chan = do
        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
            (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
                (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan)
                (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
                ChildEvent a
forall a. ChildEvent a
ChildStopChannel
        Maybe b
status <- Channel m a b -> m (Maybe b)
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
chan
        case Maybe b
status of
            Maybe b
Nothing -> do
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                    (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Bool -> IO [Char] -> [Char] -> IO () -> IO ()
withDiagMVar
                        (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan)
                        (Channel m a b -> IO [Char]
forall (m :: * -> *) a b. Channel m a b -> IO [Char]
dumpChannel Channel m a b
chan)
                        [Char]
"parBuffered: waiting to drain"
                    (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
chan)
                -- XXX remove recursion
                Channel m a b -> m b
final Channel m a b
chan
            Just b
b -> do
                Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup Channel m a b
chan
                b -> m b
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return b
b
RENAME(parEval,parBuffered)

-- XXX We can have a lconcatMap (unfoldMany) to expand the chunks in the input
-- to streams before folding. This will require an input Skip constructor. In
-- fact, parLmapM can be implemented in terms of this like in streams.

-- | Evaluate the mapped actions concurrently with respect to each other. The
-- results may be unordered or ordered depending on the configuration.
--
-- /Unimplemented/
{-# INLINABLE parLmapM #-}
parLmapM :: -- MonadAsync m =>
    (Config -> Config) -> (a -> m b) -> Fold m b r -> Fold m a r
parLmapM :: forall a (m :: * -> *) b r.
(Config -> Config) -> (a -> m b) -> Fold m b r -> Fold m a r
parLmapM = (Config -> Config) -> (a -> m b) -> Fold m b r -> Fold m a r
forall a. HasCallStack => a
undefined

-- | Execute both the folds in a tee concurrently.
--
-- Definition:
--
-- >>> parTeeWith cfg f c1 c2 = Fold.teeWith f (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
--
-- Example:
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Fold.lmapM delay Fold.sum
-- >>> c2 = Fold.lmapM delay Fold.length
-- >>> dst = Fold.parTeeWith id (,) c1 c2
-- >>> Stream.fold dst src
-- ...
--
{-# INLINABLE parTeeWith #-}
parTeeWith :: MonadAsync m =>
       (Config -> Config)
    -> (a -> b -> c)
    -> Fold m x a
    -> Fold m x b
    -> Fold m x c
parTeeWith :: forall (m :: * -> *) a b c x.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
parTeeWith Config -> Config
cfg a -> b -> c
f Fold m x a
c1 Fold m x b
c2 = (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
forall (m :: * -> *) a b c x.
Monad m =>
(a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
Fold.teeWith a -> b -> c
f ((Config -> Config) -> Fold m x a -> Fold m x a
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parBuffered Config -> Config
cfg Fold m x a
c1) ((Config -> Config) -> Fold m x b -> Fold m x b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parBuffered Config -> Config
cfg Fold m x b
c2)

-- | Distribute the input to all the folds in the supplied list concurrently.
--
-- Definition:
--
-- >>> parDistribute cfg = Fold.distribute . fmap (Fold.parBuffered cfg)
--
-- Example:
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c = Fold.lmapM delay Fold.sum
-- >>> dst = Fold.parDistribute id [c,c,c]
-- >>> Stream.fold dst src
-- ...
--
{-# INLINABLE parDistribute #-}
parDistribute :: MonadAsync m =>
    (Config -> Config) -> [Fold m a b] -> Fold m a [b]
parDistribute :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> [Fold m a b] -> Fold m a [b]
parDistribute Config -> Config
cfg = [Fold m a b] -> Fold m a [b]
forall (m :: * -> *) a b. Monad m => [Fold m a b] -> Fold m a [b]
Fold.distribute ([Fold m a b] -> Fold m a [b])
-> ([Fold m a b] -> [Fold m a b]) -> [Fold m a b] -> Fold m a [b]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Fold m a b -> Fold m a b) -> [Fold m a b] -> [Fold m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Config -> Config) -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parBuffered Config -> Config
cfg)

-- | Select first fold for Left input and second for Right input. Both folds
-- run concurrently.
--
-- Definition
--
-- >>> parPartition cfg c1 c2 = Fold.partition (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
--
-- Example:
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Fold.lmapM delay Fold.sum
-- >>> c2 = Fold.lmapM delay Fold.sum
-- >>> dst = Fold.parPartition id c1 c2
-- >>> Stream.fold dst $ (fmap (\x -> if even x then Left x else Right x)) src
-- ...
--
{-# INLINABLE parPartition #-}
parPartition :: MonadAsync m =>
    (Config -> Config) -> Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y)
parPartition :: forall (m :: * -> *) b x c y.
MonadAsync m =>
(Config -> Config)
-> Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y)
parPartition Config -> Config
cfg Fold m b x
c1 Fold m c y
c2 = Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y)
forall (m :: * -> *) b x c y.
Monad m =>
Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y)
Fold.partition ((Config -> Config) -> Fold m b x -> Fold m b x
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parBuffered Config -> Config
cfg Fold m b x
c1) ((Config -> Config) -> Fold m c y -> Fold m c y
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parBuffered Config -> Config
cfg Fold m c y
c2)

-- | Split and distribute the output to two different folds and then zip the
-- results. Both the consumer folds run concurrently.
--
-- Definition
--
-- >>> parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
--
-- Example:
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Fold.lmapM delay Fold.sum
-- >>> c2 = Fold.lmapM delay Fold.sum
-- >>> dst = Fold.parUnzipWithM id (pure . id) c1 c2
-- >>> Stream.fold dst $ (fmap (\x -> (x, x* x))) src
-- ...
--
{-# INLINABLE parUnzipWithM #-}
parUnzipWithM :: MonadAsync m
    => (Config -> Config) -> (a -> m (b,c)) -> Fold m b x -> Fold m c y -> Fold m a (x,y)
parUnzipWithM :: forall (m :: * -> *) a b c x y.
MonadAsync m =>
(Config -> Config)
-> (a -> m (b, c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y)
parUnzipWithM Config -> Config
cfg a -> m (b, c)
f Fold m b x
c1 Fold m c y
c2 = (a -> m (b, c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y)
forall (m :: * -> *) a b c x y.
Monad m =>
(a -> m (b, c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y)
Fold.unzipWithM a -> m (b, c)
f ((Config -> Config) -> Fold m b x -> Fold m b x
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parBuffered Config -> Config
cfg Fold m b x
c1) ((Config -> Config) -> Fold m c y -> Fold m c y
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parBuffered Config -> Config
cfg Fold m c y
c2)

-- There are two ways to implement a concurrent scan.
--
-- 1. Make the scan itself asynchronous, add the input to the queue, and then
-- extract the output. Extraction will have to be asynchronous, which will
-- require changes to the scan driver. This will require a different Scanl
-- type.
--
-- 2. A monolithic implementation of concurrent Stream->Stream scan, using a
-- custom implementation of the scan and the driver.

{-# ANN type ScanState Fuse #-}
data ScanState s q db f =
      ScanInit
    | ScanGo s q db [f]
    | ScanDrain q db [f]
    | ScanStop

-- XXX return [b] or just b?
-- XXX We can use a one way mailbox type abstraction instead of using an IORef
-- for adding new folds dynamically.

-- | Evaluate a stream and send its outputs to zero or more dynamically
-- generated folds. It checks for any new folds at each input generation step.
-- Any new fold is added to the list of folds which are currently running. If
-- there are no folds available, the input is discarded. If a fold completes
-- its output is emitted in the output of the scan.
--
-- >>> import Data.IORef
-- >>> ref <- newIORef [Fold.take 2 Fold.sum, Fold.take 2 Fold.length :: Fold.Fold IO Int Int]
-- >>> gen = atomicModifyIORef ref (\xs -> ([], xs))
-- >>> Stream.toList $ Fold.parDistributeScan id gen (Stream.enumerateFromTo 1 10)
-- ...
--
{-# INLINE parDistributeScan #-}
parDistributeScan :: MonadAsync m =>
    (Config -> Config) -> m [Fold m a b] -> Stream m a -> Stream m [b]
parDistributeScan :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> m [Fold m a b] -> Stream m a -> Stream m [b]
parDistributeScan Config -> Config
cfg m [Fold m a b]
getFolds (Stream State StreamK m a -> s -> m (Step s a)
sstep s
state) =
    (State StreamK m [b]
 -> ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Stream m [b]
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall {m :: * -> *} {a}.
State StreamK m a
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
step ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. ScanState s q db f
ScanInit

    where

    -- XXX can be written as a fold
    processOutputs :: [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
chans [OutEvent a]
events [a]
done = do
        case [OutEvent a]
events of
            [] -> ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Channel m a b, ThreadId)]
chans, [a]
done)
            (OutEvent a
x:[OutEvent a]
xs) ->
                case OutEvent a
x of
                    FoldException ThreadId
_tid SomeException
ex -> do
                        -- XXX report the fold that threw the exception
                        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ThreadId -> ThreadAbort -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` ThreadAbort
ThreadAbort) (((Channel m a b, ThreadId) -> ThreadId)
-> [(Channel m a b, ThreadId)] -> [ThreadId]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> ThreadId
forall a b. (a, b) -> b
snd [(Channel m a b, ThreadId)]
chans)
                        (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
chans)
                        IO ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([(Channel m a b, ThreadId)], [a])
 -> m ([(Channel m a b, ThreadId)], [a]))
-> IO ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a b. (a -> b) -> a -> b
$ SomeException -> IO ([(Channel m a b, ThreadId)], [a])
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
                    FoldDone ThreadId
tid a
b ->
                        let ch :: [(Channel m a b, ThreadId)]
ch = ((Channel m a b, ThreadId) -> Bool)
-> [(Channel m a b, ThreadId)] -> [(Channel m a b, ThreadId)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(Channel m a b
_, ThreadId
t) -> ThreadId
t ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
/= ThreadId
tid) [(Channel m a b, ThreadId)]
chans
                         in [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
ch [OutEvent a]
xs (a
ba -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
done)
                    FoldPartial a
_ -> m ([(Channel m a b, ThreadId)], [a])
forall a. HasCallStack => a
undefined

    collectOutputs :: IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent a], Int)
qref [(Channel m a b, ThreadId)]
chans = do
        ([OutEvent a]
_, Int
n) <- IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent a], Int) -> m ([OutEvent a], Int))
-> IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent a], Int) -> IO ([OutEvent a], Int)
forall a. IORef a -> IO a
readIORef IORef ([OutEvent a], Int)
qref
        if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
        then do
            [OutEvent a]
r <- (([OutEvent a], Int) -> [OutEvent a])
-> m ([OutEvent a], Int) -> m [OutEvent a]
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([OutEvent a], Int) -> [OutEvent a]
forall a b. (a, b) -> a
fst (m ([OutEvent a], Int) -> m [OutEvent a])
-> m ([OutEvent a], Int) -> m [OutEvent a]
forall a b. (a -> b) -> a -> b
$ IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent a], Int) -> m ([OutEvent a], Int))
-> IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent a], Int) -> IO ([OutEvent a], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic IORef ([OutEvent a], Int)
qref
            [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
forall {m :: * -> *} {a} {b} {a}.
MonadIO m =>
[(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
chans [OutEvent a]
r []
        else ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Channel m a b, ThreadId)]
chans, [])

    step :: State StreamK m a
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
step State StreamK m a
_ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
ScanInit = do
        IORef ([OutEvent b], Int)
q <- IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int)))
-> IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent b], Int) -> IO (IORef ([OutEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
        MVar ()
db <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
        Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip (s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
state IORef ([OutEvent b], Int)
q MVar ()
db [])

    step State StreamK m a
gst (ScanGo s
st IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
chans) = do
        -- merge any new channels added since last input
        [Fold m a b]
fxs <- m [Fold m a b]
getFolds
        [(Channel m a b, ThreadId)]
newChans <- (Fold m a b -> m (Channel m a b, ThreadId))
-> [Fold m a b] -> m [(Channel m a b, ThreadId)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
Prelude.mapM (IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
newChannelWith IORef ([OutEvent b], Int)
q MVar ()
db Config -> Config
cfg) [Fold m a b]
fxs
        let allChans :: [(Channel m a b, ThreadId)]
allChans = [(Channel m a b, ThreadId)]
chans [(Channel m a b, ThreadId)]
-> [(Channel m a b, ThreadId)] -> [(Channel m a b, ThreadId)]
forall a. [a] -> [a] -> [a]
++ [(Channel m a b, ThreadId)]
newChans

        -- Collect outputs from running channels
        ([(Channel m a b, ThreadId)]
running, [b]
outputs) <- IORef ([OutEvent b], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [b])
forall {m :: * -> *} {a} {a} {b}.
MonadIO m =>
IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent b], Int)
q [(Channel m a b, ThreadId)]
allChans

        -- Send input to running folds
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
sstep (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next <- case Step s a
res of
            Yield a
x s
s -> do
                -- XXX We might block forever if some folds are already
                -- done but we have not read the output queue yet. To
                -- avoid that we have to either (1) precheck if space
                -- is available in the input queues of all folds so
                -- that this does not block, or (2) we have to use a
                -- non-blocking read and track progress so that we can
                -- restart from where we left.
                --
                -- If there is no space available then we should block
                -- on doorbell db or inputSpaceDoorBell of the relevant
                -- channel. To avoid deadlock the output space can be
                -- kept unlimited. However, the blocking will delay the
                -- processing of outputs. We should yield the outputs
                -- before blocking.
                (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ (Channel m a b -> a -> m ()
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
`sendToWorker_` a
x) (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
running)
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
s IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
            Skip s
s -> do
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
s IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
            Step s a
Stop -> do
                (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
running)
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
        if [b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [b]
outputs
        then Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next
        else Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next
    step State StreamK m a
_ (ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
chans) = do
        ([(Channel m a b, ThreadId)]
running, [b]
outputs) <- IORef ([OutEvent b], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [b])
forall {m :: * -> *} {a} {a} {b}.
MonadIO m =>
IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent b], Int)
q [(Channel m a b, ThreadId)]
chans
        case [(Channel m a b, ThreadId)]
running of
            [] -> Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. ScanState s q db f
ScanStop
            [(Channel m a b, ThreadId)]
_ -> do
                if [b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [b]
outputs
                then do
                    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
db
                    Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip (IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running)
                else Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs (IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running)
    step State StreamK m a
_ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
ScanStop = Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
forall s a. Step s a
Stop

{-# ANN type DemuxState Fuse #-}
data DemuxState s q db f =
      DemuxInit
    | DemuxGo s q db f
    | DemuxDrain q db f
    | DemuxStop

-- XXX We need to either (1) remember a key when done so that we do not add the
-- fold again because some inputs would be lost in between, or (2) have a
-- FoldYield constructor to yield repeatedly so that we can restart the
-- existing fold itself when it is done. But in that case we cannot change the
-- fold once it is started. Also the Map would keep on increasing in size as we
-- never delete a key. Whatever we do we should keep the non-concurrent fold as
-- well consistent with that.

-- | Evaluate a stream and send its outputs to the selected fold. The fold is
-- dynamically selected using a key at the time of the first input seen for
-- that key. Any new fold is added to the list of folds which are currently
-- running. If there are no folds available for a given key, the input is
-- discarded. If a fold completes its output is emitted in the output of the
-- scan.
--
-- >>> import qualified Data.Map.Strict as Map
-- >>> import Data.Maybe (fromJust)
-- >>> f1 = ("even", Fold.take 2 Fold.sum)
-- >>> f2 = ("odd", Fold.take 2 Fold.sum)
-- >>> kv = Map.fromList [f1, f2]
-- >>> getFold k = return (fromJust $ Map.lookup k kv)
-- >>> getKey x = if even x then "even" else "odd"
-- >>> input = Stream.enumerateFromTo 1 10
-- >>> Stream.toList $ Fold.parDemuxScan id getKey getFold input
-- ...
--
{-# INLINE parDemuxScan #-}
parDemuxScan :: (MonadAsync m, Ord k) =>
       (Config -> Config)
    -> (a -> k)
    -> (k -> m (Fold m a b))
    -> Stream m a
    -> Stream m [(k, b)]
parDemuxScan :: forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
(Config -> Config)
-> (a -> k)
-> (k -> m (Fold m a b))
-> Stream m a
-> Stream m [(k, b)]
parDemuxScan Config -> Config
cfg a -> k
getKey k -> m (Fold m a b)
getFold (Stream State StreamK m a -> s -> m (Step s a)
sstep s
state) =
    (State StreamK m [(k, b)]
 -> DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId))
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Stream m [(k, b)]
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall {m :: * -> *} {a}.
State StreamK m a
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
step DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
forall s q db f. DemuxState s q db f
DemuxInit

    where

    -- XXX can be written as a fold
    processOutputs :: Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
processOutputs Map a (Channel m a b, ThreadId)
keyToChan [OutEvent (a, b)]
events [(a, b)]
done = do
        case [OutEvent (a, b)]
events of
            [] -> (Map a (Channel m a b, ThreadId), [(a, b)])
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map a (Channel m a b, ThreadId)
keyToChan, [(a, b)]
done)
            (OutEvent (a, b)
x:[OutEvent (a, b)]
xs) ->
                case OutEvent (a, b)
x of
                    FoldException ThreadId
_tid SomeException
ex -> do
                        -- XXX report the fold that threw the exception
                        let chans :: [(Channel m a b, ThreadId)]
chans = ((a, (Channel m a b, ThreadId)) -> (Channel m a b, ThreadId))
-> [(a, (Channel m a b, ThreadId))] -> [(Channel m a b, ThreadId)]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a, (Channel m a b, ThreadId)) -> (Channel m a b, ThreadId)
forall a b. (a, b) -> b
snd ([(a, (Channel m a b, ThreadId))] -> [(Channel m a b, ThreadId)])
-> [(a, (Channel m a b, ThreadId))] -> [(Channel m a b, ThreadId)]
forall a b. (a -> b) -> a -> b
$ Map a (Channel m a b, ThreadId) -> [(a, (Channel m a b, ThreadId))]
forall k a. Map k a -> [(k, a)]
Map.toList Map a (Channel m a b, ThreadId)
keyToChan
                        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ThreadId -> ThreadAbort -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` ThreadAbort
ThreadAbort) (((Channel m a b, ThreadId) -> ThreadId)
-> [(Channel m a b, ThreadId)] -> [ThreadId]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> ThreadId
forall a b. (a, b) -> b
snd [(Channel m a b, ThreadId)]
chans)
                        (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
chans)
                        IO (Map a (Channel m a b, ThreadId), [(a, b)])
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map a (Channel m a b, ThreadId), [(a, b)])
 -> m (Map a (Channel m a b, ThreadId), [(a, b)]))
-> IO (Map a (Channel m a b, ThreadId), [(a, b)])
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a b. (a -> b) -> a -> b
$ SomeException -> IO (Map a (Channel m a b, ThreadId), [(a, b)])
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
                    FoldDone ThreadId
_tid o :: (a, b)
o@(a
k, b
_) ->
                        let ch :: Map a (Channel m a b, ThreadId)
ch = a
-> Map a (Channel m a b, ThreadId)
-> Map a (Channel m a b, ThreadId)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete a
k Map a (Channel m a b, ThreadId)
keyToChan
                         in Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
processOutputs Map a (Channel m a b, ThreadId)
ch [OutEvent (a, b)]
xs ((a, b)
o(a, b) -> [(a, b)] -> [(a, b)]
forall a. a -> [a] -> [a]
:[(a, b)]
done)
                    FoldPartial (a, b)
_ -> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a. HasCallStack => a
undefined

    collectOutputs :: IORef ([OutEvent (a, b)], Int)
-> Map a (Channel m a b, ThreadId)
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
collectOutputs IORef ([OutEvent (a, b)], Int)
qref Map a (Channel m a b, ThreadId)
keyToChan = do
        ([OutEvent (a, b)]
_, Int
n) <- IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int))
-> IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent (a, b)], Int) -> IO ([OutEvent (a, b)], Int)
forall a. IORef a -> IO a
readIORef IORef ([OutEvent (a, b)], Int)
qref
        if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
        then do
            [OutEvent (a, b)]
r <- (([OutEvent (a, b)], Int) -> [OutEvent (a, b)])
-> m ([OutEvent (a, b)], Int) -> m [OutEvent (a, b)]
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([OutEvent (a, b)], Int) -> [OutEvent (a, b)]
forall a b. (a, b) -> a
fst (m ([OutEvent (a, b)], Int) -> m [OutEvent (a, b)])
-> m ([OutEvent (a, b)], Int) -> m [OutEvent (a, b)]
forall a b. (a -> b) -> a -> b
$ IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int))
-> IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent (a, b)], Int) -> IO ([OutEvent (a, b)], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic IORef ([OutEvent (a, b)], Int)
qref
            Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall {m :: * -> *} {a} {a} {b} {b}.
(MonadIO m, Ord a) =>
Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
processOutputs Map a (Channel m a b, ThreadId)
keyToChan [OutEvent (a, b)]
r []
        else (Map a (Channel m a b, ThreadId), [(a, b)])
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map a (Channel m a b, ThreadId)
keyToChan, [])

    step :: State StreamK m a
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
step State StreamK m a
_ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
DemuxInit = do
        IORef ([OutEvent (k, b)], Int)
q <- IO (IORef ([OutEvent (k, b)], Int))
-> m (IORef ([OutEvent (k, b)], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent (k, b)], Int))
 -> m (IORef ([OutEvent (k, b)], Int)))
-> IO (IORef ([OutEvent (k, b)], Int))
-> m (IORef ([OutEvent (k, b)], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent (k, b)], Int) -> IO (IORef ([OutEvent (k, b)], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
        MVar ()
db <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
        Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. s -> Step s a
Skip (s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
state IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
forall k a. Map k a
Map.empty)

    step State StreamK m a
gst (DemuxGo s
st IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan) = do
        -- Collect outputs from running channels
        (Map k (Channel m a (k, b), ThreadId)
keyToChan1, [(k, b)]
outputs) <- IORef ([OutEvent (k, b)], Int)
-> Map k (Channel m a (k, b), ThreadId)
-> m (Map k (Channel m a (k, b), ThreadId), [(k, b)])
forall {m :: * -> *} {a} {b} {a} {b}.
(MonadIO m, Ord a) =>
IORef ([OutEvent (a, b)], Int)
-> Map a (Channel m a b, ThreadId)
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
collectOutputs IORef ([OutEvent (k, b)], Int)
q Map k (Channel m a (k, b), ThreadId)
keyToChan

        -- Send input to the selected fold
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
sstep (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st

        DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next <- case Step s a
res of
            Yield a
x s
s -> do
                -- XXX If the fold for a particular key is done and we see that
                -- key again. If we have not yet collected the done event we
                -- cannot restart the fold because the previous key is already
                -- installed. Thererfore, restarting the fold for the same key
                -- fraught with races.
                let k :: k
k = a -> k
getKey a
x
                (Map k (Channel m a (k, b), ThreadId)
keyToChan2, Channel m a (k, b)
ch) <-
                    case k
-> Map k (Channel m a (k, b), ThreadId)
-> Maybe (Channel m a (k, b), ThreadId)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k Map k (Channel m a (k, b), ThreadId)
keyToChan1 of
                        Maybe (Channel m a (k, b), ThreadId)
Nothing -> do
                            Fold m a b
fld <- k -> m (Fold m a b)
getFold k
k
                            r :: (Channel m a (k, b), ThreadId)
r@(Channel m a (k, b)
chan, ThreadId
_) <- IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a (k, b)
-> m (Channel m a (k, b), ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
newChannelWith IORef ([OutEvent (k, b)], Int)
q MVar ()
db Config -> Config
cfg ((b -> (k, b)) -> Fold m a b -> Fold m a (k, b)
forall a b. (a -> b) -> Fold m a a -> Fold m a b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (k
k,) Fold m a b
fld)
                            (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
-> m (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (k
-> (Channel m a (k, b), ThreadId)
-> Map k (Channel m a (k, b), ThreadId)
-> Map k (Channel m a (k, b), ThreadId)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k (Channel m a (k, b), ThreadId)
r Map k (Channel m a (k, b), ThreadId)
keyToChan1, Channel m a (k, b)
chan)
                        Just (Channel m a (k, b)
chan, ThreadId
_) -> (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
-> m (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map k (Channel m a (k, b), ThreadId)
keyToChan1, Channel m a (k, b)
chan)
                -- XXX We might block forever if some folds are already
                -- done but we have not read the output queue yet. To
                -- avoid that we have to either (1) precheck if space
                -- is available in the input queues of all folds so
                -- that this does not block, or (2) we have to use a
                -- non-blocking read and track progress so that we can
                -- restart from where we left.
                --
                -- If there is no space available then we should block
                -- on doorbell db or inputSpaceDoorBell of the relevant
                -- channel. To avoid deadlock the output space can be
                -- kept unlimited. However, the blocking will delay the
                -- processing of outputs. We should yield the outputs
                -- before blocking.
                Channel m a (k, b) -> a -> m ()
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
sendToWorker_ Channel m a (k, b)
ch a
x
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
s IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan2
            Skip s
s ->
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
s IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1
            Step s a
Stop -> do
                let chans :: [Channel m a (k, b)]
chans = ((k, (Channel m a (k, b), ThreadId)) -> Channel m a (k, b))
-> [(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Channel m a (k, b), ThreadId) -> Channel m a (k, b)
forall a b. (a, b) -> a
fst ((Channel m a (k, b), ThreadId) -> Channel m a (k, b))
-> ((k, (Channel m a (k, b), ThreadId))
    -> (Channel m a (k, b), ThreadId))
-> (k, (Channel m a (k, b), ThreadId))
-> Channel m a (k, b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (k, (Channel m a (k, b), ThreadId))
-> (Channel m a (k, b), ThreadId)
forall a b. (a, b) -> b
snd) ([(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)])
-> [(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)]
forall a b. (a -> b) -> a -> b
$ Map k (Channel m a (k, b), ThreadId)
-> [(k, (Channel m a (k, b), ThreadId))]
forall k a. Map k a -> [(k, a)]
Map.toList Map k (Channel m a (k, b), ThreadId)
keyToChan1
                (Channel m a (k, b) -> m ()) -> [Channel m a (k, b)] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ Channel m a (k, b) -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize [Channel m a (k, b)]
chans
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. q -> db -> f -> DemuxState s q db f
DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1
        if [(k, b)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(k, b)]
outputs
        then Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. s -> Step s a
Skip DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next
        else Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. a -> s -> Step s a
Yield [(k, b)]
outputs DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next
    step State StreamK m a
_ (DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan) = do
        (Map k (Channel m a (k, b), ThreadId)
keyToChan1, [(k, b)]
outputs) <- IORef ([OutEvent (k, b)], Int)
-> Map k (Channel m a (k, b), ThreadId)
-> m (Map k (Channel m a (k, b), ThreadId), [(k, b)])
forall {m :: * -> *} {a} {b} {a} {b}.
(MonadIO m, Ord a) =>
IORef ([OutEvent (a, b)], Int)
-> Map a (Channel m a b, ThreadId)
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
collectOutputs IORef ([OutEvent (k, b)], Int)
q Map k (Channel m a (k, b), ThreadId)
keyToChan
        if Map k (Channel m a (k, b), ThreadId) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Channel m a (k, b), ThreadId)
keyToChan1
        -- XXX null outputs case
        then Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. a -> s -> Step s a
Yield [(k, b)]
outputs DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
forall s q db f. DemuxState s q db f
DemuxStop
        else do
            if [(k, b)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(k, b)]
outputs
            then do
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
db
                Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. s -> Step s a
Skip (IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. q -> db -> f -> DemuxState s q db f
DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1)
            else Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. a -> s -> Step s a
Yield [(k, b)]
outputs (IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. q -> db -> f -> DemuxState s q db f
DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1)
    step State StreamK m a
_ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
DemuxStop = Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
forall s a. Step s a
Stop