-- |
-- Module      : Streamly.Internal.Data.Fold.Channel.Type
-- Copyright   : (c) 2017, 2022 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Fold.Channel.Type
    (
    -- ** Type
      Channel (..)
    , OutEvent (..)

    -- ** Configuration
    , Config
    , maxBuffer
    , boundThreads
    , inspect

    -- ** Operations
    , newChannelWith
    , newChannelWithScan
    , newChannel
    , newScanChannel
    , sendToWorker
    , sendToWorker_
    , checkFoldStatus -- XXX collectFoldOutput
    , dumpChannel
    , cleanup
    , finalize
    )
where

#include "inline.hs"

import Control.Concurrent (ThreadId, myThreadId, tryPutMVar)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (SomeException(..))
import Control.Monad (void, when)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
    (MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doForkWith)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)

import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as D

import Streamly.Internal.Data.Channel.Types

-- XXX We can make the fold evaluation concurrent by using a monoid for the
-- accumulator. It will then work in the same way as the stream evaluation, in
-- stream evaluation we dequeue the head and queue the tail, in folds we will
-- queue the accumulator and it will be picked by the next worker to accumulate
-- the next value.

data OutEvent b =
      FoldException ThreadId SomeException
    | FoldPartial b
    | FoldDone ThreadId b

-- | The fold driver thread queues the input of the fold in the 'inputQueue'
-- The driver rings the doorbell when the queue transitions from empty to
-- non-empty state.
--
-- The fold consumer thread dequeues the input items from the 'inputQueue' and
-- supplies them to the fold. When the fold is done the output of the fold is
-- placed in 'inputQueue' and 'outputDoorBell' is rung.
--
-- The fold driver thread keeps watching the 'outputQueue', if the fold has
-- terminated, it stops queueing the input to the 'inputQueue'
--
-- If the fold driver runs out of input it stops and waits for the fold to
-- drain the buffered input.
--
-- Driver thread ------>------Input Queue and Doorbell ----->-----Fold thread
--
-- Driver thread ------<------Output Queue and Doorbell-----<-----Fold thread
--
data Channel m a b = Channel
    {
    -- FORWARD FLOW: Flow of data from the driver to the consuming fold

    -- XXX Use a different type than ChildEvent. We can do with a simpler type
    -- in folds.

    -- | Input queue (messages, length).
    --
    -- [LOCKING] Frequent, locked access. Input is queued frequently by the
    -- driver and infrequently dequeued in chunks by the fold.
    --
      forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue :: IORef ([ChildEvent a], Int)

      -- | The maximum size of the inputQueue allowed.
    , forall (m :: * -> *) a b. Channel m a b -> Limit
maxInputBuffer :: Limit

    -- | Doorbell is rung by the driver when 'inputQueue' transitions from
    -- empty to non-empty.
    --
    -- [LOCKING] Infrequent, MVar.
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell :: MVar ()

    -- | Doorbell to tell the driver that there is now space available in the
    -- 'inputQueue' and more items can be queued.
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputSpaceDoorBell :: MVar ()

    , forall (m :: * -> *) a b. Channel m a b -> m [ChildEvent a]
readInputQ :: m [ChildEvent a]

    -- | Final output and exceptions, if any, queued by the fold and read by
    -- the fold driver.
    --
    -- [LOCKING] atomicModifyIORef. Output is queued infrequently by the fold
    -- and read frequently by the driver.
    , forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue :: IORef ([OutEvent b], Int)

    -- | Doorbell for the 'outputQueue', rung by the fold when the queue
    -- transitions from empty to non-empty.
    --
    -- [LOCKING] Infrequent, MVar.
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell :: MVar ()

    -- cleanup: to track garbage collection of SVar --
    , forall (m :: * -> *) a b. Channel m a b -> Maybe (IORef ())
svarRef :: Maybe (IORef ())

    -- Stats --
    , forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats :: SVarStats

    -- Diagnostics --
    , forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode :: Bool
    , forall (m :: * -> *) a b. Channel m a b -> ThreadId
svarCreator :: ThreadId
    }

-- | Dump the channel stats for diagnostics. Used when 'inspect' option is
-- enabled.
{-# NOINLINE dumpChannel #-}
dumpChannel :: Channel m a b -> IO String
dumpChannel :: forall (m :: * -> *) a b. Channel m a b -> IO String
dumpChannel Channel m a b
sv = do
    [String]
xs <- [IO String] -> IO [String]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence ([IO String] -> IO [String]) -> [IO String] -> IO [String]
forall a b. (a -> b) -> a -> b
$ IO String -> [IO String] -> [IO String]
forall a. a -> [a] -> [a]
intersperse (String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"\n")
        [ String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId -> String
forall a. Show a => a -> String
dumpCreator (Channel m a b -> ThreadId
forall (m :: * -> *) a b. Channel m a b -> ThreadId
svarCreator Channel m a b
sv))
        , String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------CURRENT STATE-----------"
        , IORef ([ChildEvent a], Int) -> IO String
forall (t :: * -> *) a1 a2.
(Foldable t, Show a1) =>
IORef (t a2, a1) -> IO String
dumpOutputQ (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
sv)
        -- XXX print the types of events in the outputQueue, first 5
        , MVar () -> IO String
forall a. Show a => MVar a -> IO String
dumpDoorBell (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
sv)
        , String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------STATS-----------\n"
        , Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
dumpSVarStats (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
sv) Maybe YieldRateInfo
forall a. Maybe a
Nothing (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
sv)
        ]
    String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String]
xs

-------------------------------------------------------------------------------
-- Support for running folds concurrently
-------------------------------------------------------------------------------

-- $concurrentFolds
--
-- To run folds concurrently, we need to decouple the fold execution from the
-- stream production. We use the SVar to do that, we have a single worker
-- pushing the stream elements to the SVar and on the consumer side a fold
-- driver pulls the values and folds them.
--
-- @
--
-- Fold worker <------Channel<------Fold driver
--     |  exceptions  |
--     --------------->
--
-- @
--
-- We need a channel for pushing exceptions from the fold worker to the fold
-- driver. The stream may be pushed to multiple folds at the same time. For
-- that we need one Channel per fold:
--
-- @
--
-- Fold worker <------Channel--
--                    |        |
-- Fold worker <------Channel------Driver
--                    |        |
-- Fold worker <------Channel--
--
-- @
--
-- Note: If the stream pusher terminates due to an exception, we do not
-- actively terminate the fold. It gets cleaned up by the GC.

-------------------------------------------------------------------------------
-- Process events received by a fold worker from a fold driver
-------------------------------------------------------------------------------

sendToDriver :: Channel m a b -> OutEvent b -> IO Int
sendToDriver :: forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv OutEvent b
msg = do
    -- In case the producer stream is blocked on pushing to the fold buffer
    -- then wake it up so that it can check for the stop event or exception
    -- being sent to it otherwise we will be deadlocked.
    -- void $ tryPutMVar (pushBufferMVar sv) ()
    IORef ([OutEvent b], Int) -> MVar () -> OutEvent b -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent (Channel m a b -> IORef ([OutEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue Channel m a b
sv)
                     (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
sv) OutEvent b
msg

sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
sv b
res = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    ThreadId
tid <- IO ThreadId
myThreadId
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> OutEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv (ThreadId -> b -> OutEvent b
forall b. ThreadId -> b -> OutEvent b
FoldDone ThreadId
tid b
res)

sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver Channel m a b
sv b
res = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> OutEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv (b -> OutEvent b
forall b. b -> OutEvent b
FoldPartial b
res)

{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver :: forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv SomeException
e = do
    ThreadId
tid <- IO ThreadId
myThreadId
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> OutEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv (ThreadId -> SomeException -> OutEvent b
forall b. ThreadId -> SomeException -> OutEvent b
FoldException ThreadId
tid SomeException
e)

data FromSVarState m a b =
      FromSVarRead (Channel m a b)
    | FromSVarLoop (Channel m a b) [ChildEvent a]

{-# INLINE_NORMAL fromInputQueue #-}
fromInputQueue :: MonadIO m => Channel m a b -> D.Stream m a
fromInputQueue :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromInputQueue Channel m a b
svar = (State StreamK m a
 -> FromSVarState m a b -> m (Step (FromSVarState m a b) a))
-> FromSVarState m a b -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
forall {m :: * -> *} {p} {a} {b}.
Monad m =>
p -> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
step (Channel m a b -> FromSVarState m a b
forall (m :: * -> *) a b. Channel m a b -> FromSVarState m a b
FromSVarRead Channel m a b
svar)

    where

    {-# INLINE_LATE step #-}
    step :: p -> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
step p
_ (FromSVarRead Channel m a b
sv) = do
        [ChildEvent a]
list <- Channel m a b -> m [ChildEvent a]
forall (m :: * -> *) a b. Channel m a b -> m [ChildEvent a]
readInputQ Channel m a b
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. s -> Step s a
D.Skip (FromSVarState m a b -> Step (FromSVarState m a b) a)
-> FromSVarState m a b -> Step (FromSVarState m a b) a
forall a b. (a -> b) -> a -> b
$ Channel m a b -> [ChildEvent a] -> FromSVarState m a b
forall (m :: * -> *) a b.
Channel m a b -> [ChildEvent a] -> FromSVarState m a b
FromSVarLoop Channel m a b
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step p
_ (FromSVarLoop Channel m a b
sv []) = Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. s -> Step s a
D.Skip (FromSVarState m a b -> Step (FromSVarState m a b) a)
-> FromSVarState m a b -> Step (FromSVarState m a b) a
forall a b. (a -> b) -> a -> b
$ Channel m a b -> FromSVarState m a b
forall (m :: * -> *) a b. Channel m a b -> FromSVarState m a b
FromSVarRead Channel m a b
sv
    step p
_ (FromSVarLoop Channel m a b
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            -- XXX Separate input and output events. Input events cannot have
            -- Stop event and output events cannot have ChildStopChannel
            -- event.
            ChildYield a
a -> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. a -> s -> Step s a
D.Yield a
a (Channel m a b -> [ChildEvent a] -> FromSVarState m a b
forall (m :: * -> *) a b.
Channel m a b -> [ChildEvent a] -> FromSVarState m a b
FromSVarLoop Channel m a b
sv [ChildEvent a]
es)
            ChildEvent a
ChildStopChannel -> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState m a b) a
forall s a. Step s a
D.Stop
            ChildEvent a
_ -> m (Step (FromSVarState m a b) a)
forall a. HasCallStack => a
undefined

{-# INLINE readInputQChan #-}
readInputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
readInputQChan :: forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readInputQChan Channel m a b
chan = do
    let ss :: Maybe SVarStats
ss = if Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan then SVarStats -> Maybe SVarStats
forall a. a -> Maybe a
Just (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
chan) else Maybe SVarStats
forall a. Maybe a
Nothing
    r :: ([ChildEvent a], Int)
r@([ChildEvent a]
_, Int
n) <- IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan) Maybe SVarStats
ss
    if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
    then do
        IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
            (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> IO String -> String -> IO () -> IO ()
withDiagMVar
                (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan)
                (Channel m a b -> IO String
forall (m :: * -> *) a b. Channel m a b -> IO String
dumpChannel Channel m a b
chan)
                String
"readInputQChan: nothing to do"
            (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
        IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan) Maybe SVarStats
ss
    else ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ChildEvent a], Int)
r

{-# INLINE readInputQWithDB #-}
readInputQWithDB :: Channel m a b -> IO ([ChildEvent a], Int)
readInputQWithDB :: forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readInputQWithDB Channel m a b
chan = do
    ([ChildEvent a], Int)
r <- Channel m a b -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readInputQChan Channel m a b
chan
    -- XXX We can do this only if needed, if someone sleeps because of buffer
    -- then they can set a flag and we ring the doorbell only if the flag is
    -- set. Like we do in sendWorkerWait for streams.
    Bool
_ <- MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputSpaceDoorBell Channel m a b
chan) ()
    ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ChildEvent a], Int)
r

mkNewChannelWith :: forall m a b. MonadIO m =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> Config
    -> IO (Channel m a b)
mkNewChannelWith :: forall (m :: * -> *) a b.
MonadIO m =>
IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
mkNewChannelWith IORef ([OutEvent b], Int)
outQRev MVar ()
outQMvRev Config
cfg = do
    IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    MVar ()
bufferMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    let getSVar :: Channel m a b -> Channel m a b
        getSVar :: Channel m a b -> Channel m a b
getSVar Channel m a b
sv = Channel
            { inputQueue :: IORef ([ChildEvent a], Int)
inputQueue      = IORef ([ChildEvent a], Int)
outQ
            , inputItemDoorBell :: MVar ()
inputItemDoorBell   = MVar ()
outQMv
            , outputQueue :: IORef ([OutEvent b], Int)
outputQueue = IORef ([OutEvent b], Int)
outQRev
            , outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMvRev
            , inputSpaceDoorBell :: MVar ()
inputSpaceDoorBell = MVar ()
bufferMv
            , maxInputBuffer :: Limit
maxInputBuffer   = Config -> Limit
getMaxBuffer Config
cfg
            , readInputQ :: m [ChildEvent a]
readInputQ      = IO [ChildEvent a] -> m [ChildEvent a]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ChildEvent a] -> m [ChildEvent a])
-> IO [ChildEvent a] -> m [ChildEvent a]
forall a b. (a -> b) -> a -> b
$ (([ChildEvent a], Int) -> [ChildEvent a])
-> IO ([ChildEvent a], Int) -> IO [ChildEvent a]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([ChildEvent a], Int) -> [ChildEvent a]
forall a b. (a, b) -> a
fst (Channel m a b -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readInputQWithDB Channel m a b
sv)
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = Config -> Bool
getInspectMode Config
cfg
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: Channel m a b
sv = Channel m a b -> Channel m a b
getSVar Channel m a b
sv in Channel m a b -> IO (Channel m a b)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a b
sv

{-# INLINABLE newChannelWith #-}
{-# SPECIALIZE newChannelWith ::
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Fold IO a b
    -> IO (Channel IO a b, ThreadId) #-}
newChannelWith :: (MonadRunInIO m) =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Fold m a b
    -> m (Channel m a b, ThreadId)
newChannelWith :: forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
newChannelWith IORef ([OutEvent b], Int)
outq MVar ()
outqDBell Config -> Config
modifier Fold m a b
f = do
    let config :: Config
config = Config -> Config
modifier Config
defaultConfig
    Channel m a b
sv <- IO (Channel m a b) -> m (Channel m a b)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a b) -> m (Channel m a b))
-> IO (Channel m a b) -> m (Channel m a b)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
forall (m :: * -> *) a b.
MonadIO m =>
IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
mkNewChannelWith IORef ([OutEvent b], Int)
outq MVar ()
outqDBell Config
config
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    ThreadId
tid <- Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doForkWith
        (Config -> Bool
getBound Config
config) (Channel m a b -> m ()
work Channel m a b
sv) RunInIO m
mrun (Channel m a b -> SomeException -> IO ()
forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv)
    (Channel m a b, ThreadId) -> m (Channel m a b, ThreadId)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Channel m a b
sv, ThreadId
tid)

    where

    {-# NOINLINE work #-}
    work :: Channel m a b -> m ()
work Channel m a b
chan =
        let f1 :: Fold m a ()
f1 = (b -> m ()) -> Fold m a b -> Fold m a ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> Fold m a b -> Fold m a c
Fold.rmapM (m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> (b -> m ()) -> b -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
chan) Fold m a b
f
         in Fold m a () -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold Fold m a ()
f1 (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> Stream m a
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromInputQueue Channel m a b
chan

{-# INLINE scanToChannel #-}
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Scanl m a ()
scanToChannel :: forall (m :: * -> *) a b.
MonadIO m =>
Channel m a b -> Scanl m a b -> Scanl m a ()
scanToChannel Channel m a b
chan (Scanl s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
extract s -> m b
final) =
    (s -> a -> m (Step s ()))
-> m (Step s ()) -> (s -> m ()) -> (s -> m ()) -> Scanl m a ()
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Scanl m a b
Scanl s -> a -> m (Step s ())
step1 m (Step s ())
initial1 s -> m ()
forall {m :: * -> *} {p}. Monad m => p -> m ()
extract1 s -> m ()
final1

    where

    initial1 :: m (Step s ())
initial1 = do
        Step s b
r <- m (Step s b)
initial
        case Step s b
r of
            Fold.Partial s
s -> do
                b
b <- s -> m b
extract s
s
                m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver Channel m a b
chan b
b
                Step s () -> m (Step s ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s () -> m (Step s ())) -> Step s () -> m (Step s ())
forall a b. (a -> b) -> a -> b
$ s -> Step s ()
forall s b. s -> Step s b
Fold.Partial s
s
            Fold.Done b
b ->
                () -> Step s ()
forall s b. b -> Step s b
Fold.Done (() -> Step s ()) -> m () -> m (Step s ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
chan b
b)

    step1 :: s -> a -> m (Step s ())
step1 s
st a
x = do
        Step s b
r <- s -> a -> m (Step s b)
step s
st a
x
        case Step s b
r of
            Fold.Partial s
s -> do
                b
b <- s -> m b
extract s
s
                m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver Channel m a b
chan b
b
                Step s () -> m (Step s ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s () -> m (Step s ())) -> Step s () -> m (Step s ())
forall a b. (a -> b) -> a -> b
$ s -> Step s ()
forall s b. s -> Step s b
Fold.Partial s
s
            Fold.Done b
b ->
                () -> Step s ()
forall s b. b -> Step s b
Fold.Done (() -> Step s ()) -> m () -> m (Step s ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
chan b
b)

    extract1 :: p -> m ()
extract1 p
_ = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

    -- XXX Should we not discard the result?
    final1 :: s -> m ()
final1 s
st = m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (s -> m b
final s
st)

{-# INLINABLE newChannelWithScan #-}
{-# SPECIALIZE newChannelWithScan ::
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Scanl IO a b
    -> IO (Channel IO a b, ThreadId) #-}
newChannelWithScan :: (MonadRunInIO m) =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Scanl m a b
    -> m (Channel m a b, ThreadId)
newChannelWithScan :: forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
newChannelWithScan IORef ([OutEvent b], Int)
outq MVar ()
outqDBell Config -> Config
modifier Scanl m a b
f = do
    let config :: Config
config = Config -> Config
modifier Config
defaultConfig
    Channel m a b
sv <- IO (Channel m a b) -> m (Channel m a b)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a b) -> m (Channel m a b))
-> IO (Channel m a b) -> m (Channel m a b)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
forall (m :: * -> *) a b.
MonadIO m =>
IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
mkNewChannelWith IORef ([OutEvent b], Int)
outq MVar ()
outqDBell Config
config
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    ThreadId
tid <- Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doForkWith
        (Config -> Bool
getBound Config
config) (Channel m a b -> m ()
work Channel m a b
sv) RunInIO m
mrun (Channel m a b -> SomeException -> IO ()
forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv)
    (Channel m a b, ThreadId) -> m (Channel m a b, ThreadId)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Channel m a b
sv, ThreadId
tid)

    where

    {-# NOINLINE work #-}
    work :: Channel m a b -> m ()
work Channel m a b
chan = Stream m () -> m ()
forall (m :: * -> *) a. Monad m => Stream m a -> m ()
D.drain (Stream m () -> m ()) -> Stream m () -> m ()
forall a b. (a -> b) -> a -> b
$ Scanl m a () -> Stream m a -> Stream m ()
forall (m :: * -> *) a b.
Monad m =>
Scanl m a b -> Stream m a -> Stream m b
D.scanl (Channel m a b -> Scanl m a b -> Scanl m a ()
forall (m :: * -> *) a b.
MonadIO m =>
Channel m a b -> Scanl m a b -> Scanl m a ()
scanToChannel Channel m a b
chan Scanl m a b
f) (Stream m a -> Stream m ()) -> Stream m a -> Stream m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> Stream m a
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromInputQueue Channel m a b
chan

{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
    (Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel :: forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel Config -> Config
modifier Fold m a b
f = do
    IORef ([OutEvent b], Int)
outQRev <- IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int)))
-> IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent b], Int) -> IO (IORef ([OutEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMvRev <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    ((Channel m a b, ThreadId) -> Channel m a b)
-> m (Channel m a b, ThreadId) -> m (Channel m a b)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst (IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
newChannelWith IORef ([OutEvent b], Int)
outQRev MVar ()
outQMvRev Config -> Config
modifier Fold m a b
f)

{-# INLINABLE newScanChannel #-}
{-# SPECIALIZE newScanChannel ::
    (Config -> Config) -> Scanl IO a b -> IO (Channel IO a b) #-}
newScanChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel :: forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel Config -> Config
modifier Scanl m a b
f = do
    IORef ([OutEvent b], Int)
outQRev <- IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int)))
-> IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent b], Int) -> IO (IORef ([OutEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMvRev <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    ((Channel m a b, ThreadId) -> Channel m a b)
-> m (Channel m a b, ThreadId) -> m (Channel m a b)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst (IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
newChannelWithScan IORef ([OutEvent b], Int)
outQRev MVar ()
outQMvRev Config -> Config
modifier Scanl m a b
f)

-------------------------------------------------------------------------------
-- Process events received by the driver thread from the fold worker side
-------------------------------------------------------------------------------

-- XXX currently only one event is sent by a fold consumer to the stream
-- producer. But we can potentially have multiple events e.g. the fold step can
-- generate exception more than once and the producer can ignore those
-- exceptions or handle them and still keep driving the fold.

-- XXX In case of scan this could be a stream.

-- | Poll for events sent by the fold worker to the fold driver. The fold
-- consumer can send a "Stop" event or an exception. When a "Stop" is received
-- this function returns 'True'. If an exception is recieved then it throws the
-- exception.
--
{-# NOINLINE checkFoldStatus #-}
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
checkFoldStatus :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
sv = do
    ([OutEvent b]
list, Int
_) <- IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent b], Int) -> m ([OutEvent b], Int))
-> IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int) -> IO ([OutEvent b], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic (Channel m a b -> IORef ([OutEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue Channel m a b
sv)
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    [OutEvent b] -> m (Maybe b)
forall {m :: * -> *} {a}.
MonadThrow m =>
[OutEvent a] -> m (Maybe a)
processEvents ([OutEvent b] -> m (Maybe b)) -> [OutEvent b] -> m (Maybe b)
forall a b. (a -> b) -> a -> b
$ [OutEvent b] -> [OutEvent b]
forall a. [a] -> [a]
reverse [OutEvent b]
list

    where

    {-# INLINE processEvents #-}
    processEvents :: [OutEvent a] -> m (Maybe a)
processEvents [] = Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
    processEvents (OutEvent a
ev : [OutEvent a]
_) = do
        case OutEvent a
ev of
            FoldException ThreadId
_ SomeException
e -> SomeException -> m (Maybe a)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
e
            FoldDone ThreadId
_ a
b -> Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
b)
            FoldPartial a
_ -> m (Maybe a)
forall a. HasCallStack => a
undefined

{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
isBufferAvailable :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
sv = do
    let limit :: Limit
limit = Channel m a b -> Limit
forall (m :: * -> *) a b. Channel m a b -> Limit
maxInputBuffer Channel m a b
sv
    case Limit
limit of
        Limit
Unlimited -> Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Limited Word
lim -> do
            ([ChildEvent a]
_, Int
n) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
sv)
            Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
n

-- | Push values from a driver to a fold worker via a Channel. Blocks if no
-- space is available in the buffer. Before pushing a value to the Channel it
-- polls for events received from the fold worker.  If a stop event is received
-- then it returns 'True' otherwise false.  Propagates exceptions received from
-- the fold worker.
--
{-# INLINE sendToWorker #-}
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
sendToWorker :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m (Maybe b)
sendToWorker Channel m a b
chan a
a = m (Maybe b)
go

    where

    -- Recursive function, should we use SPEC?
    go :: m (Maybe b)
go = do
        let qref :: IORef ([OutEvent b], Int)
qref = Channel m a b -> IORef ([OutEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue Channel m a b
chan
        Maybe b
status <- do
            ([OutEvent b]
_, Int
n) <- IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent b], Int) -> m ([OutEvent b], Int))
-> IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int) -> IO ([OutEvent b], Int)
forall a. IORef a -> IO a
readIORef IORef ([OutEvent b], Int)
qref
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
            then Channel m a b -> m (Maybe b)
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
chan
            else Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
        case Maybe b
status of
            Just b
_ -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
status
            Maybe b
Nothing -> do
                    Bool
r <- Channel m a b -> m Bool
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
chan
                    if Bool
r
                    then do
                        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                            (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
                            (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
                                (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan)
                                (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
                                (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
                        Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
                    else do
                        () <- IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputSpaceDoorBell Channel m a b
chan)
                        m (Maybe b)
go

-- | Like sendToWorker but only sends, does not receive any events from the
-- fold.
{-# INLINE sendToWorker_ #-}
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
sendToWorker_ :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
sendToWorker_ Channel m a b
chan a
a = m ()
go

    where

    -- Recursive function, should we use SPEC?
    go :: m ()
go = do
        Bool
r <- Channel m a b -> m Bool
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
chan
        if Bool
r
        then do
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
                (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
                    (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan)
                    (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
                    (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        else do
            String -> m ()
forall a. HasCallStack => String -> a
error String
"sendToWorker_: No space available in the buffer"
            -- Block for space
            -- () <- liftIO $ takeMVar (inputSpaceDoorBell chan)
            -- go

-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.

cleanup :: MonadIO m => Channel m a b -> m ()
cleanup :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup Channel m a b
chan = do
    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
        IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
chan)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
        IO String -> String -> IO ()
printSVar (Channel m a b -> IO String
forall (m :: * -> *) a b. Channel m a b -> IO String
dumpChannel Channel m a b
chan) String
"Scan channel done"

finalize :: MonadIO m => Channel m a b -> m ()
finalize :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize Channel m a b
chan = do
    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
        (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
            (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan)
            (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
            ChildEvent a
forall a. ChildEvent a
ChildStopChannel