-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
    (
      newInterleaveChannel
    )
where

#include "inline.hs"

import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR, pushL)
import Data.IORef (newIORef, readIORef)
import Streamly.Internal.Control.Concurrent
    (MonadRunInIO, MonadAsync, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Channel.Dispatcher (delThread)

import qualified Data.Set as Set
import qualified Streamly.Internal.Data.StreamK as K

import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Types

------------------------------------------------------------------------------
-- Creating a channel
------------------------------------------------------------------------------

data WorkerStatus = Continue | Suspend

-- XXX This is not strictly round-robin as the streams that are faster may
-- yield more elements than the ones that are slower. Also, when streams
-- suspend due to buffer getting full they get added to the queue in a random
-- order.

{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
       Channel m a
    -> LinkedQueue (RunInIO m, K.StreamK m a)
    -> (RunInIO m, K.StreamK m a)
    -> IO ()
enqueueFIFO :: forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q (RunInIO m, StreamK m a)
m = do
    LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a) -> IO ()
forall a. LinkedQueue a -> a -> IO ()
pushL LinkedQueue (RunInIO m, StreamK m a)
q (RunInIO m, StreamK m a)
m
    IORef Bool -> MVar () -> IO ()
ringDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)

{-# INLINE workLoopFIFO #-}
workLoopFIFO
    :: MonadRunInIO m
    => LinkedQueue (RunInIO m, K.StreamK m a)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFO LinkedQueue (RunInIO m, StreamK m a)
q Channel m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    run :: m ()
run = do
        Maybe (RunInIO m, StreamK m a)
work <- IO (Maybe (RunInIO m, StreamK m a))
-> m (Maybe (RunInIO m, StreamK m a))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (RunInIO m, StreamK m a))
 -> m (Maybe (RunInIO m, StreamK m a)))
-> IO (Maybe (RunInIO m, StreamK m a))
-> m (Maybe (RunInIO m, StreamK m a))
forall a b. (a -> b) -> a -> b
$ LinkedQueue (RunInIO m, StreamK m a)
-> IO (Maybe (RunInIO m, StreamK m a))
forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (RunInIO m, StreamK m a)
q
        case Maybe (RunInIO m, StreamK m a)
work of
            Maybe (RunInIO m, StreamK m a)
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            Just (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) -> do
                StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                        (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin
                        (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
                            State StreamK m a
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) StreamK m a
m
                WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
                case WorkerStatus
res of
                    WorkerStatus
Continue -> m ()
run
                    WorkerStatus
Suspend -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

    single :: a -> m WorkerStatus
single a
a = do
        Bool
res <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

    -- XXX in general we would like to yield "n" elements from a single stream
    -- before moving on to the next. Single element granularity could be too
    -- expensive in certain cases. Similarly, we can use time limit for
    -- yielding.
    yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
        Bool
res <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
        -- XXX If the queue is empty we do not need to enqueue. We can just
        -- continue evaluating the stream.
        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q (RunInIO m
runInIO, StreamK m a
r)
        WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
    :: forall m a. MonadRunInIO m
    => LinkedQueue (RunInIO m, K.StreamK m a)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited LinkedQueue (RunInIO m, StreamK m a)
q Channel m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    incrContinue :: m WorkerStatus
incrContinue =
        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)) m () -> m WorkerStatus -> m WorkerStatus
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue

    run :: m ()
run = do
        Maybe (RunInIO m, StreamK m a)
work <- IO (Maybe (RunInIO m, StreamK m a))
-> m (Maybe (RunInIO m, StreamK m a))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (RunInIO m, StreamK m a))
 -> m (Maybe (RunInIO m, StreamK m a)))
-> IO (Maybe (RunInIO m, StreamK m a))
-> m (Maybe (RunInIO m, StreamK m a))
forall a b. (a -> b) -> a -> b
$ LinkedQueue (RunInIO m, StreamK m a)
-> IO (Maybe (RunInIO m, StreamK m a))
forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (RunInIO m, StreamK m a)
q
        case Maybe (RunInIO m, StreamK m a)
work of
            Maybe (RunInIO m, StreamK m a)
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            Just (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) -> do
                Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
                if Bool
yieldLimitOk
                then do
                    StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                            (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin
                            (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
                                State StreamK m a
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue StreamK m a
m
                    WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
                    case WorkerStatus
res of
                        WorkerStatus
Continue -> m ()
run
                        WorkerStatus
Suspend -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
                else IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                    Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q ((forall b. m b -> IO (StM m b)) -> RunInIO m
forall (m :: * -> *). (forall b. m b -> IO (StM m b)) -> RunInIO m
RunInIO m b -> IO (StM m b)
forall b. m b -> IO (StM m b)
runin, StreamK m a
m)
                    Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
                    Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

    single :: a -> m WorkerStatus
single a
a = do
        Bool
res <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

    yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
        Bool
res <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q (RunInIO m
runInIO, StreamK m a
r)
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
        then WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
        else IO WorkerStatus -> m WorkerStatus
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO WorkerStatus -> m WorkerStatus)
-> IO WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ do
            Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
            WorkerStatus -> IO WorkerStatus
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

-------------------------------------------------------------------------------
-- SVar creation
-------------------------------------------------------------------------------

-- XXX we have this function in this file because passing runStreamLIFO as a
-- function argument to this function results in a perf degradation of more
-- than 10%.  Need to investigate what the root cause is.
-- Interestingly, the same thing does not make any difference for Ahead.
getFifoSVar :: forall m a. MonadRunInIO m =>
    RunInIO m -> Config -> IO (Channel m a)
getFifoSVar :: forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getFifoSVar RunInIO m
mrun Config
cfg = do
    IORef ([ChildEvent a], Int)
outQ    <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMv  <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef Int
active  <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
    IORef Bool
wfw     <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
    IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
Set.empty
    LinkedQueue (RunInIO m, StreamK m a)
q       <- IO (LinkedQueue (RunInIO m, StreamK m a))
forall a. IO (LinkedQueue a)
newQ
    Maybe (IORef Count)
yl      <- case Config -> Maybe Count
getYieldLimit Config
cfg of
                Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
                Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
    Maybe YieldRateInfo
rateInfo <- Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
cfg

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = LinkedQueue (RunInIO m, StreamK m a) -> IO Bool
forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (RunInIO m, StreamK m a)
q
    let isWorkFinishedLimited :: Channel m a -> IO Bool
isWorkFinishedLimited Channel m a
sv = do
            Bool
yieldsDone <-
                    case Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
                        Just IORef Count
ref -> do
                            Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
                            Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
                        Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            Bool
qEmpty <- LinkedQueue (RunInIO m, StreamK m a) -> IO Bool
forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (RunInIO m, StreamK m a)
q
            Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone

    let getSVar :: Channel m a
            -> (Channel m a -> m [ChildEvent a])
            -> (Channel m a -> m Bool)
            -> (Channel m a -> IO Bool)
            -> (LinkedQueue (RunInIO m, K.StreamK m a)
                -> Channel m a
                -> Maybe WorkerInfo
                -> m())
            -> Channel m a
        getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
readOutput Channel m a -> m Bool
postProc Channel m a -> IO Bool
workDone LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop = Channel
            { outputQueue :: IORef ([ChildEvent a], Int)
outputQueue      = IORef ([ChildEvent a], Int)
outQ
            , remainingWork :: Maybe (IORef Count)
remainingWork    = Maybe (IORef Count)
yl
            , maxBufferLimit :: Limit
maxBufferLimit   = Config -> Limit
getMaxBuffer Config
cfg
            , maxWorkerLimit :: Limit
maxWorkerLimit   = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (Config -> Limit
getMaxThreads Config
cfg) (Config -> Limit
getMaxBuffer Config
cfg)
            , yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo    = Maybe YieldRateInfo
rateInfo
            , outputDoorBell :: MVar ()
outputDoorBell   = MVar ()
outQMv
            , readOutputQ :: m [ChildEvent a]
readOutputQ      = Channel m a -> m [ChildEvent a]
readOutput Channel m a
sv
            , postProcess :: m Bool
postProcess      = Channel m a -> m Bool
postProc Channel m a
sv
            , workerThreads :: IORef (Set ThreadId)
workerThreads    = IORef (Set ThreadId)
running
            , workLoop :: Maybe WorkerInfo -> m ()
workLoop         = LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop LinkedQueue (RunInIO m, StreamK m a)
q Channel m a
sv
            , enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue          = \Bool
_ -> Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q
            , eagerDispatch :: m ()
eagerDispatch    = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            , isWorkDone :: IO Bool
isWorkDone       = Channel m a -> IO Bool
workDone Channel m a
sv
            , isQueueDone :: IO Bool
isQueueDone      = Channel m a -> IO Bool
workDone Channel m a
sv
            , doorBellOnWorkQ :: IORef Bool
doorBellOnWorkQ  = IORef Bool
wfw
            , svarMrun :: RunInIO m
svarMrun         = RunInIO m
mrun
            , workerCount :: IORef Int
workerCount      = IORef Int
active
            , accountThread :: ThreadId -> m ()
accountThread    = IORef (Set ThreadId) -> ThreadId -> m ()
forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> ThreadId -> m ()
delThread IORef (Set ThreadId)
running
            , workerStopMVar :: MVar ()
workerStopMVar   = MVar ()
forall a. HasCallStack => a
undefined
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = Config -> Bool
getInspectMode Config
cfg
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: Channel m a
sv =
            case Config -> Maybe Rate
getStreamRate Config
cfg of
                Maybe Rate
Nothing ->
                    case Config -> Maybe Count
getYieldLimit Config
cfg of
                        Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (Bool -> Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
False)
                                              Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
                                              Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
                                              LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFO
                        Just Count
_  -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (Bool -> Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
False)
                                              Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
                                              Channel m a -> IO Bool
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
                                              LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited
                Just Rate
_  ->
                    case Config -> Maybe Count
getYieldLimit Config
cfg of
                        Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
                                              Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
                                              Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
                                              LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFO
                        Just Count
_  -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
                                              Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
                                              Channel m a -> IO Bool
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
                                              LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited
     in Channel m a -> IO (Channel m a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a
sv

-- XXX GHC: If instead of MonadAsync we use (MonadIO m, MonadBaseControl IO m)
-- constraint we get a 2x perf regression. Need to look into that.
--
-- | Create a new async style concurrent stream evaluation channel. The monad
-- state used to run the stream actions is taken from the call site of
-- newInterleaveChannel.
{-# INLINABLE newInterleaveChannel #-}
{-# SPECIALIZE newInterleaveChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newInterleaveChannel :: MonadAsync m =>
    (Config -> Config) -> m (Channel m a)
newInterleaveChannel :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newInterleaveChannel Config -> Config
modifier = do
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    IO (Channel m a) -> m (Channel m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a) -> m (Channel m a))
-> IO (Channel m a) -> m (Channel m a)
forall a b. (a -> b) -> a -> b
$ RunInIO m -> Config -> IO (Channel m a)
forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getFifoSVar RunInIO m
mrun (Config -> Config
modifier Config
defaultConfig)