{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.Ahead
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}
--
module Streamly.Internal.Data.Stream.Ahead {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" from streamly package instead." #-}
    (
      AheadT(..)
    , Ahead
    , aheadK
    , consM
    )
where

import Control.Concurrent.MVar (putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (void, when)
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
#endif
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Trans.Class (MonadTrans(lift))
#endif
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
import GHC.Exts (inline)

import qualified Data.Heap as H

import Streamly.Internal.Control.Concurrent
    (MonadRunInIO, MonadAsync, askRunInIO, restoreM)
import Streamly.Internal.Data.StreamK (Stream)

import qualified Streamly.Internal.Data.StreamK as K
    (foldStreamShared, cons, mkStream, foldStream, fromEffect
    , nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream as D
    (mapM, fromStreamK, toStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (toStreamK)

import Streamly.Internal.Data.Stream.SVar.Generate
import Streamly.Internal.Data.SVar
import Prelude hiding (map)

#include "Instances.hs"

-- $setup
-- >>> :set -fno-warn-deprecations
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal :: forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
m =
    (forall r.
 State StreamK m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State StreamK m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single = (r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> (a -> m r) -> a -> m r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m r
sng
            yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
r = (r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> m r -> m r
forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> m r
yld a
a ((r -> r) -> Stream m a -> Stream m a
forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
r)
        in State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yieldk a -> m r
single ((r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f m r
stp) Stream m a
m

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the token
-- is determined by the current sequence number is maintained in outputHeap.
-- Sequence number is assigned when a task is queued. When a thread dequeues a
-- task it picks up the sequence number as well and when the output is ready it
-- uses the sequence number to queue the output to the outputQueue.
--
-- The thread with current sequence number sends the output directly to the
-- outputQueue. Other threads push the output to the outputHeap. When the task
-- being queued on the heap is a stream of many elements we evaluate only the
-- first element and keep the rest of the unevaluated computation in the heap.
-- When such a task gets the "token" for outputQueue it evaluates and directly
-- yields all the elements to the outputQueue without checking for the
-- "token".
--
-- Note that no two outputs in the heap can have the same sequence numbers and
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) and the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that for
-- a single task just the outputQueue is enough and for the case of many
-- threads just a heap is good enough. However we balance between these two
-- cases, so that both are efficient.
--
-- For bigger streams it may make sense to have separate buffers for each
-- stream. However, for singleton streams this may become inefficient. However,
-- if we do not have separate buffers, then the streams that come later in
-- sequence may hog the buffer, hindering the streams that are ahead. For this
-- reason we have a single element buffer limitation for the streams being
-- executed in advance.
--
-- This scheme works pretty efficiently with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing of
-- the outputs. It is especially devised so that we are most efficient when we
-- have short tasks and need just a single thread. Also when a thread yields
-- many items it can hold lockfree access to the outputQueue and do it
-- efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.

-- Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Thererefore the queue never has more than on item in it.
--
-- XXX Also note that limiting concurrency for cases like "take 10" would not
-- work well with left associative expressions, because we have no visibility
-- about how much the left side of the expression would yield.
--
-- XXX It may be a good idea to increment sequence numbers for each yield,
-- currently a stream on the left side of the expression may yield many
-- elements with the same sequene number. We can then use the seq number to
-- enforce yieldMax and yieldLImit as well.

-- Invariants:
--
-- * A worker should always ensure that it pushes all the consecutive items in
-- the heap to the outputQueue especially the items on behalf of the workers
-- that have already left when we were holding the token. This avoids deadlock
-- conditions when the later workers completion depends on the consumption of
-- earlier results. For more details see comments in the consumer pull side
-- code.

{-# INLINE underMaxHeap #-}
underMaxHeap ::
       SVar Stream m a
    -> Heap (Entry Int (AheadHeapEntry Stream m a))
    -> IO Bool
underMaxHeap :: forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp = do
    ([ChildEvent a]
_, Int
len) <- IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (SVar StreamK m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar StreamK m a
sv)

    -- XXX simplify this
    let maxHeap :: Limit
maxHeap = case SVar StreamK m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit SVar StreamK m a
sv of
            Limited Word
lim -> Word -> Limit
Limited (Word -> Limit) -> Word -> Limit
forall a b. (a -> b) -> a -> b
$
                Word -> Word -> Word
forall a. Ord a => a -> a -> a
max Word
0 (Word
lim Word -> Word -> Word
forall a. Num a => a -> a -> a
- Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
            Limit
Unlimited -> Limit
Unlimited

    case Limit
maxHeap of
        Limited Word
lim -> do
            Int
active <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVar StreamK m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar StreamK m a
sv)
            Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
hp Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
active Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim
        Limit
Unlimited -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True

-- Return value:
-- True => stop
-- False => continue
preStopCheck ::
       SVar Stream m a
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Maybe Int)
    -> IO Bool
preStopCheck :: forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap =
    -- check the stop condition under a lock before actually
    -- stopping so that the whole herd does not stop at once.
    IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> IO Bool)
-> IO Bool
forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
  -> IO Bool)
 -> IO Bool)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> IO Bool)
-> IO Bool
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
_) -> do
        Bool
heapOk <- SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
        MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar StreamK m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv)
        let stop :: IO Bool
stop = do
                MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (SVar StreamK m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv) ()
                Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            continue :: IO Bool
continue = do
                MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (SVar StreamK m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv) ()
                Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        if Bool
heapOk
        then
            case SVar StreamK m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar StreamK m a
sv of
                Maybe YieldRateInfo
Nothing -> IO Bool
continue
                Just YieldRateInfo
yinfo -> do
                    Bool
rateOk <- SVar StreamK m a -> YieldRateInfo -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate SVar StreamK m a
sv YieldRateInfo
yinfo
                    if Bool
rateOk then IO Bool
continue else IO Bool
stop
        else IO Bool
stop

abortExecution ::
       IORef ([Stream m a], Int)
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> IO ()
abortExecution :: forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m = do
    SVar StreamK m a
-> IORef ([Stream m a], Int) -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead SVar StreamK m a
sv IORef ([Stream m a], Int)
q Stream m a
m
    SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
    SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo

-- XXX In absence of a "noyield" primitive (i.e. do not pre-empt inside a
-- critical section) from GHC RTS, we have a difficult problem. Assume we have
-- a 100,000 threads producing output and queuing it to the heap for
-- sequencing. The heap can be drained only by one thread at a time, any thread
-- that finds that heap can be drained now, takes a lock and starts draining
-- it, however the thread may get prempted in the middle of it holding the
-- lock. Since that thread is holding the lock, the other threads cannot pick
-- up the draining task, therefore they proceed to picking up the next task to
-- execute. If the draining thread could yield voluntarily at a point where it
-- has released the lock, then the next threads could pick up the draining
-- instead of executing more tasks. When there are 100,000 threads the drainer
-- gets a cpu share to run only 1:100000 of the time. This makes the heap
-- accumulate a lot of output when we the buffer size is large.
--
-- The solutions to this problem are:
-- 1) make the other threads wait in a queue until the draining finishes
-- 2) make the other threads queue and go away if draining is in progress
--
-- In both cases we give the drainer a chance to run more often.
--
processHeap
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry Stream m a
    -> Int
    -> Bool -- we are draining the heap before we stop
    -> m ()
processHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
entry Int
sno Bool
stopping = Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
sno AheadHeapEntry StreamK m a
entry

    where

    stopIfNeeded :: AheadHeapEntry StreamK m a -> Int -> Stream m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo Stream m a
r = do
        Bool
stopIt <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
        if Bool
stopIt
        then IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            -- put the entry back in the heap and stop
            IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Int
seqNo
            SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
        else Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo Stream m a
r

    loopHeap :: Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
ent =
        case AheadHeapEntry StreamK m a
ent of
            AheadHeapEntry StreamK m a
AheadEntryNull -> Int -> m ()
nextHeap Int
seqNo
            AheadEntryPure a
a -> do
                -- Use 'send' directly so that we do not account this in worker
                -- latency as this will not be the real latency.
                -- Don't stop the worker in this case as we are just
                -- transferring available results from heap to outputQueue.
                m Int -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Int -> m ()) -> m Int -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> m Int
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
                Int -> m ()
nextHeap Int
seqNo
            AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
r) ->
                if Bool
stopping
                then AheadHeapEntry StreamK m a -> Int -> Stream m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo Stream m a
r
                else do
                    StM m ()
res <- IO (StM m ()) -> m (StM m ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m ()) -> m (StM m ())) -> IO (StM m ()) -> m (StM m ())
forall a b. (a -> b) -> a -> b
$ m () -> IO (StM m ())
forall b. m b -> IO (StM m b)
runin (Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo Stream m a
r)
                    StM m () -> m ()
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res


    nextHeap :: Int -> m ()
nextHeap Int
prevSeqNo = do
        HeapDequeueResult StreamK m a
res <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
 -> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
prevSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
        case HeapDequeueResult StreamK m a
res of
            Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) -> Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
hent
            HeapDequeueResult StreamK m a
Clearing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ ->
                if Bool
stopping
                then do
                    Bool
r <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
                    if Bool
r
                    then IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
                    else Int -> m ()
processWorkQueue Int
prevSeqNo
                else (Int -> m ()) -> Int -> m ()
forall a. a -> a
inline Int -> m ()
processWorkQueue Int
prevSeqNo

    processWorkQueue :: Int -> m ()
processWorkQueue Int
prevSeqNo = do
        Maybe (Stream m a, Int)
work <- IORef ([Stream m a], Int) -> m (Maybe (Stream m a, Int))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
        case Maybe (Stream m a, Int)
work of
            Maybe (Stream m a, Int)
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
            Just (Stream m a
m, Int
seqNo) -> do
                Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
                if Bool
yieldLimitOk
                then
                    if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
prevSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
                    then IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                    else IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                else IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m

    -- We do not stop the worker on buffer full here as we want to proceed to
    -- nextHeap anyway so that we can clear any subsequent entries. We stop
    -- only in yield continuation where we may have a remaining stream to be
    -- pushed on the heap.
    singleStreamFromHeap :: Int -> a -> m ()
singleStreamFromHeap Int
seqNo a
a = do
        m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        Int -> m ()
nextHeap Int
seqNo

    -- XXX when we have an unfinished stream on the heap we cannot account all
    -- the yields of that stream until it finishes, so if we have picked up
    -- and executed more actions beyond that in the parent stream and put them
    -- on the heap then they would eat up some yield limit which is not
    -- correct, we will think that our yield limit is over even though we have
    -- to yield items from unfinished stream before them. For this reason, if
    -- there are pending items in the heap we drain them unconditionally
    -- without considering the yield limit.
    runStreamWithYieldLimit :: Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo Stream m a
r = do
        Bool
_ <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
        if Bool
continue -- see comment above -- && yieldLimitOk
        then do
            let stop :: m ()
stop = do
                  IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
                  Int -> m ()
nextHeap Int
seqNo
            State StreamK m a
-> (a -> Stream m a -> m ())
-> (a -> m ())
-> m ()
-> Stream m a
-> m ()
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
                          (Int -> a -> Stream m a -> m ()
yieldStreamFromHeap Int
seqNo)
                          (Int -> a -> m ()
singleStreamFromHeap Int
seqNo)
                          m ()
stop
                          Stream m a
r
        else do
            RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo ((RunInIO m, Stream m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, Stream m a
r))
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
                SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
                SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo

    yieldStreamFromHeap :: Int -> a -> Stream m a -> m ()
yieldStreamFromHeap Int
seqNo a
a Stream m a
r = do
        Bool
continue <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo Stream m a
r

{-# NOINLINE drainHeap #-}
drainHeap
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = do
    HeapDequeueResult StreamK m a
r <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
 -> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
    case HeapDequeueResult StreamK m a
r of
        Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
            IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
True
        HeapDequeueResult StreamK m a
_ -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo

data HeapStatus = HContinue | HStop
data WorkerStatus = Continue | Suspend

processWithoutToken
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo = do
    -- we have already decremented the yield limit for m
    let stop :: m WorkerStatus
stop = do
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
            -- If the stream stops without yielding anything, and we do not put
            -- anything on heap, but if heap was waiting for this seq number
            -- then it will keep waiting forever, because we are never going to
            -- put it on heap. So we have to put a null entry on heap even when
            -- we stop.
            AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
AheadHeapEntry t m a
AheadEntryNull
        mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv

    StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
            State StreamK m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
                (\a
a Stream m a
r -> do
                    RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
                    AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap (AheadHeapEntry StreamK m a -> m WorkerStatus)
-> AheadHeapEntry StreamK m a -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ (RunInIO m, Stream m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, a -> Stream m a -> Stream m a
forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
a Stream m a
r))
                (AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap (AheadHeapEntry StreamK m a -> m WorkerStatus)
-> (a -> AheadHeapEntry StreamK m a) -> a -> m WorkerStatus
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
a -> AheadHeapEntry t m a
AheadEntryPure)
                m WorkerStatus
stop
                Stream m a
m
    WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
    case WorkerStatus
res of
        WorkerStatus
Continue -> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
        WorkerStatus
Suspend -> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

    where

    -- XXX to reduce contention each CPU can have its own heap
    toHeap :: AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
ent = do
        -- Heap insertion is an expensive affair so we use a non CAS based
        -- modification, otherwise contention and retries can make a thread
        -- context switch and throw it behind other threads which come later in
        -- sequence.
        Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp <- IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
 -> m (Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
        Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
  -> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
      Heap (Entry Int (AheadHeapEntry StreamK m a))))
 -> IO (Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
        Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
snum) ->
            let hp' :: Heap (Entry Int (AheadHeapEntry StreamK m a))
hp' = Entry Int (AheadHeapEntry StreamK m a)
-> Heap (Entry Int (AheadHeapEntry StreamK m a))
-> Heap (Entry Int (AheadHeapEntry StreamK m a))
forall a. Ord a => a -> Heap a -> Heap a
H.insert (Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
            in Bool
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
    Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
    Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry StreamK m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry StreamK m a))
hp')

        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar StreamK m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar StreamK m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                Int
maxHp <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar StreamK m a
sv)
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxHp) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                    IORef Int -> Int -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar StreamK m a
sv) (Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp)

        Bool
heapOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp
        HeapStatus
status <-
            case SVar StreamK m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar StreamK m a
sv of
                Maybe YieldRateInfo
Nothing -> HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
                Just YieldRateInfo
yinfo ->
                    case Maybe WorkerInfo
winfo of
                        Just WorkerInfo
info -> do
                            Bool
rateOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> YieldRateInfo -> WorkerInfo -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl SVar StreamK m a
sv YieldRateInfo
yinfo WorkerInfo
info
                            if Bool
rateOk
                            then HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
                            else HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
                        Maybe WorkerInfo
Nothing -> HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue

        if Bool
heapOk
        then
            case HeapStatus
status of
                HeapStatus
HContinue -> WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
                HeapStatus
HStop -> WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
        else WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

data TokenWorkerStatus = TokenContinue Int | TokenSuspend

processWithToken
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
action Int
sno = do
    -- Note, we enter this function with yield limit already decremented
    -- XXX deduplicate stop in all invocations
    let stop :: m TokenWorkerStatus
stop = do
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
            TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
        mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv

    StM m TokenWorkerStatus
r <- IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus))
-> IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m TokenWorkerStatus -> IO (StM m TokenWorkerStatus))
-> m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$
        State StreamK m a
-> (a -> Stream m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> Stream m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st (Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
sno) (Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stop Stream m a
action

    TokenWorkerStatus
res <- StM m TokenWorkerStatus -> m TokenWorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
    case TokenWorkerStatus
res of
        TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
        TokenWorkerStatus
TokenSuspend -> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

    where

    singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
        Bool
continue <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        if Bool
continue
        then TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
        else do
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
            TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend

    -- XXX use a wrapper function around stop so that we never miss
    -- incrementing the yield in a stop continuation. Essentiatlly all
    -- "unstream" calls in this function must increment yield limit on stop.
    yieldOutput :: Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a Stream m a
r = do
        Bool
continue <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
        if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
        then do
            let stop :: m TokenWorkerStatus
stop = do
                    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
                    TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
            State StreamK m a
-> (a -> Stream m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> Stream m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
                          (Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
                          (Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
                          m TokenWorkerStatus
stop
                          Stream m a
r
        else do
            RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo ((RunInIO m, Stream m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, Stream m a
r))
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
            TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend

    loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
        Maybe (Stream m a, Int)
work <- IORef ([Stream m a], Int) -> m (Maybe (Stream m a, Int))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
        case Maybe (Stream m a, Int)
work of
            Maybe (Stream m a, Int)
Nothing -> do
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
                IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

            Just (Stream m a
m, Int
seqNo) -> do
                Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
                let undo :: m ()
undo = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                        IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
                        SVar StreamK m a
-> IORef ([Stream m a], Int) -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead SVar StreamK m a
sv IORef ([Stream m a], Int)
q Stream m a
m
                        SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
                if Bool
yieldLimitOk
                then
                    if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
nextSeqNo
                    then do
                        let stop :: m TokenWorkerStatus
stop = do
                                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
                                TokenWorkerStatus -> m TokenWorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                            mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv
                        StM m TokenWorkerStatus
r <- IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus))
-> IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m TokenWorkerStatus -> IO (StM m TokenWorkerStatus))
-> m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$
                            State StreamK m a
-> (a -> Stream m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> Stream m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
                                          (Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
                                          (Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
                                          m TokenWorkerStatus
stop
                                          Stream m a
m
                        TokenWorkerStatus
res <- StM m TokenWorkerStatus -> m TokenWorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
                        case TokenWorkerStatus
res of
                            TokenContinue Int
seqNo1 -> Int -> m ()
loopWithToken Int
seqNo1
                            TokenWorkerStatus
TokenSuspend -> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

                    else
                        -- To avoid a race when another thread puts something
                        -- on the heap and goes away, the consumer will not get
                        -- a doorBell and we will not clear the heap before
                        -- executing the next action. If the consumer depends
                        -- on the output that is stuck in the heap then this
                        -- will result in a deadlock. So we always clear the
                        -- heap before executing the next action.
                        m ()
undo m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
                else m ()
undo m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

-- XXX the yield limit changes increased the performance overhead by 30-40%.
-- Just like AsyncT we can use an implementation without yeidlimit and even
-- without pacing code to keep the performance higher in the unlimited and
-- unpaced case.
--
-- XXX The yieldLimit stuff is pretty invasive. We can instead do it by using
-- three hooks, a pre-execute hook, a yield hook and a stop hook. In fact these
-- hooks can be used for a more general implementation to even check predicates
-- and not just yield limit.

-- XXX we can remove the sv parameter as it can be derived from st

workLoopAhead
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = do
        HeapDequeueResult StreamK m a
r <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
 -> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
        case HeapDequeueResult StreamK m a
r of
            Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
                IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
False
            HeapDequeueResult StreamK m a
Clearing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ -> do
                -- Before we execute the next item from the work queue we check
                -- if we are beyond the yield limit. It is better to check the
                -- yield limit before we pick up the next item. Otherwise we
                -- may have already started more tasks even though we may have
                -- reached the yield limit.  We can avoid this by taking active
                -- workers into account, but that is not as reliable, because
                -- workers may go away without picking up work and yielding a
                -- value.
                --
                -- Rate control can be done either based on actual yields in
                -- the output queue or based on any yield either to the heap or
                -- to the output queue. In both cases we may have one issue or
                -- the other. We chose to do this based on actual yields to the
                -- output queue because it makes the code common to both async
                -- and ahead streams.
                --
                Maybe (Stream m a, Int)
work <- IORef ([Stream m a], Int) -> m (Maybe (Stream m a, Int))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
                case Maybe (Stream m a, Int)
work of
                    Maybe (Stream m a, Int)
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
                    Just (Stream m a
m, Int
seqNo) -> do
                        Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
                        if Bool
yieldLimitOk
                        then
                            if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                            then IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                            else IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                        -- If some worker decremented the yield limit but then
                        -- did not yield anything and therefore incremented it
                        -- later, then if we did not requeue m here we may find
                        -- the work queue empty and therefore miss executing
                        -- the remaining action.
                        else IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m

-------------------------------------------------------------------------------
-- WAhead
-------------------------------------------------------------------------------

-- XXX To be implemented. Use a linked queue like WAsync and put back the
-- remaining computation at the back of the queue instead of the heap, and
-- increment the sequence number.

-- The only difference between forkSVarAsync and this is that we run the left
-- computation without a shared SVar.
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
forkSVarAhead Stream m a
m1 Stream m a
m2 = (forall r.
 State StreamK m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State StreamK m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
        SVar StreamK m a
sv <- State StreamK m a
-> Stream m a
-> (IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> State StreamK m a
    -> SVar StreamK m a
    -> Maybe WorkerInfo
    -> m ())
-> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a
-> t m a
-> (IORef ([t m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> State t m a
    -> SVar t m a
    -> Maybe WorkerInfo
    -> m ())
-> m (SVar t m a)
newAheadVar State StreamK m a
st (Stream m a -> Stream m a -> Stream m a
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m) =>
StreamK m a -> StreamK m a -> StreamK m a
concurrently Stream m a
m1 Stream m a
m2)
                          IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead
        State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SVar StreamK m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
fromSVar SVar StreamK m a
sv)
    where
    concurrently :: StreamK m a -> StreamK m a -> StreamK m a
concurrently StreamK m a
ma StreamK m a
mb = (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> (RunInIO m, StreamK m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue (Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a. (?callStack::CallStack) => Maybe a -> a
fromJust (Maybe (SVar StreamK m a) -> SVar StreamK m a)
-> Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st) (RunInIO m
runInIO, StreamK m a
mb)
        State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
ma

{-# INLINE aheadK #-}
aheadK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
aheadK :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m1 Stream m a
m2 = (forall r.
 State StreamK m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State StreamK m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
    case State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st of
        Just SVar StreamK m a
sv | SVar StreamK m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar StreamK m a
sv SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
AheadVar -> do
            RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> (RunInIO m, Stream m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue SVar StreamK m a
sv (RunInIO m
runInIO, Stream m a
m2)
            -- Always run the left side on a new SVar to avoid complexity in
            -- sequencing results. This means the left side cannot further
            -- split into more ahead computations on the same SVar.
            State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
m1
        Maybe (SVar StreamK m a)
_ -> State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
forkSVarAhead Stream m a
m1 Stream m a
m2)

-- | XXX we can implement it more efficienty by directly implementing instead
-- of combining streams using ahead.
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> AheadT m a -> AheadT m a
consM m a
m (AheadT Stream m a
r) = Stream m a -> AheadT m a
forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT (Stream m a -> AheadT m a) -> Stream m a -> AheadT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK (m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect m a
m) Stream m a
r

------------------------------------------------------------------------------
-- AheadT
------------------------------------------------------------------------------

-- | For 'AheadT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.ahead'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.ahead'
-- @
--
-- A single 'Monad' bind behaves like a @for@ loop with iterations executed
-- concurrently, ahead of time, producing side effects of iterations out of
-- order, but results in order:
--
-- >>> :{
-- Stream.toList $ Stream.fromAhead $ do
--      x <- Stream.fromList [2,1] -- foreach x in stream
--      Stream.fromEffect $ delay x
-- :}
-- 1 sec
-- 2 sec
-- [2,1]
--
-- Nested monad binds behave like nested @for@ loops with nested iterations
-- executed concurrently, ahead of time:
--
-- >>> :{
-- Stream.toList $ Stream.fromAhead $ do
--     x <- Stream.fromList [1,2] -- foreach x in stream
--     y <- Stream.fromList [2,4] -- foreach y in stream
--     Stream.fromEffect $ delay (x + y)
-- :}
-- 3 sec
-- 4 sec
-- 5 sec
-- 6 sec
-- [3,5,4,6]
--
-- The behavior can be explained as follows. All the iterations corresponding
-- to the element @1@ in the first stream constitute one output stream and all
-- the iterations corresponding to @2@ constitute another output stream and
-- these two output streams are merged using 'ahead'.
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
newtype AheadT m a = AheadT {forall (m :: * -> *) a. AheadT m a -> Stream m a
getAheadT :: Stream m a}

#if !(MIN_VERSION_transformers(0,6,0))
instance MonadTrans AheadT where
    {-# INLINE lift #-}
    lift :: forall (m :: * -> *) a. Monad m => m a -> AheadT m a
lift = Stream m a -> AheadT m a
forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT (Stream m a -> AheadT m a)
-> (m a -> Stream m a) -> m a -> AheadT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect
#endif

-- | A serial IO stream of elements of type @a@ with concurrent lookahead.  See
-- 'AheadT' documentation for more details.
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
type Ahead = AheadT IO

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

{-# INLINE append #-}
{-# SPECIALIZE append :: AheadT IO a -> AheadT IO a -> AheadT IO a #-}
append :: MonadAsync m => AheadT m a -> AheadT m a -> AheadT m a
append :: forall (m :: * -> *) a.
MonadAsync m =>
AheadT m a -> AheadT m a -> AheadT m a
append (AheadT Stream m a
m1) (AheadT Stream m a
m2) = Stream m a -> AheadT m a
forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT (Stream m a -> AheadT m a) -> Stream m a -> AheadT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m1 Stream m a
m2

instance MonadAsync m => Semigroup (AheadT m a) where
    <> :: AheadT m a -> AheadT m a -> AheadT m a
(<>) = AheadT m a -> AheadT m a -> AheadT m a
forall (m :: * -> *) a.
MonadAsync m =>
AheadT m a -> AheadT m a -> AheadT m a
append

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance MonadAsync m => Monoid (AheadT m a) where
    mempty :: AheadT m a
mempty = Stream m a -> AheadT m a
forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT Stream m a
forall (m :: * -> *) a. StreamK m a
K.nil
    mappend :: AheadT m a -> AheadT m a -> AheadT m a
mappend = AheadT m a -> AheadT m a -> AheadT m a
forall a. Semigroup a => a -> a -> a
(<>)

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

{-# INLINE apAhead #-}
apAhead :: MonadAsync m => AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead :: forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead (AheadT Stream m (a -> b)
m1) (AheadT Stream m a
m2) =
    let f :: (a -> b) -> StreamK m b
f a -> b
x1 = (StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> Stream m a -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK (b -> StreamK m b
forall a (m :: * -> *). a -> StreamK m a
K.fromPure (b -> StreamK m b) -> (a -> b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
    in Stream m b -> AheadT m b
forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT (Stream m b -> AheadT m b) -> Stream m b -> AheadT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK (a -> b) -> Stream m b
forall {b}. (a -> b) -> StreamK m b
f Stream m (a -> b)
m1

instance (Monad m, MonadAsync m) => Applicative (AheadT m) where
    {-# INLINE pure #-}
    pure :: forall a. a -> AheadT m a
pure = Stream m a -> AheadT m a
forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT (Stream m a -> AheadT m a) -> (a -> Stream m a) -> a -> AheadT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> StreamK m a
K.fromPure

    {-# INLINE (<*>) #-}
    <*> :: forall a b. AheadT m (a -> b) -> AheadT m a -> AheadT m b
(<*>) = AheadT m (a -> b) -> AheadT m a -> AheadT m b
forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

{-# INLINE bindAhead #-}
{-# SPECIALIZE bindAhead ::
    AheadT IO a -> (a -> AheadT IO b) -> AheadT IO b #-}
bindAhead :: MonadAsync m => AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead :: forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead (AheadT Stream m a
m) a -> AheadT m b
f = Stream m b -> AheadT m b
forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT (Stream m b -> AheadT m b) -> Stream m b -> AheadT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m (AheadT m b -> Stream m b
forall (m :: * -> *) a. AheadT m a -> Stream m a
getAheadT (AheadT m b -> Stream m b) -> (a -> AheadT m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AheadT m b
f)

instance MonadAsync m => Monad (AheadT m) where
    return :: forall a. a -> AheadT m a
return = a -> AheadT m a
forall a. a -> AheadT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure

    {-# INLINE (>>=) #-}
    >>= :: forall a b. AheadT m a -> (a -> AheadT m b) -> AheadT m b
(>>=) = AheadT m a -> (a -> AheadT m b) -> AheadT m b
forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

#if !(MIN_VERSION_transformers(0,6,0))
instance (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) where
    liftBase :: forall α. b α -> AheadT m α
liftBase = b α -> AheadT m α
forall (t :: (* -> *) -> * -> *) (b :: * -> *) (m :: * -> *) α.
(MonadTrans t, MonadBase b m) =>
b α -> t m α
liftBaseDefault
#endif

MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)