{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.Parallel
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}
--
module Streamly.Internal.Data.Stream.Parallel {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" instead." #-}
    (
    -- * Parallel Stream Type
      ParallelT(..)
    , Parallel
    , consM

    -- * Merge Concurrently
    , parallelK
    , parallelFstK
    , parallelMinK

    -- * Evaluate Concurrently
    , mkParallelD
    , mkParallelK

    -- * Tap Concurrently
    , tapAsyncK
    , tapAsyncF

    -- * Callbacks
    , newCallbackStream
    )
where

import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
#endif
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Trans.Class (MonadTrans(lift))
#endif
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)

import Streamly.Data.Fold (Fold)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream (Step(..))

import qualified Data.Set as Set
import qualified Streamly.Internal.Data.StreamK as K
    (StreamK, foldStreamShared, mkStream, foldStream, fromEffect
    , nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream as D
    (Stream(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream

import Streamly.Internal.Data.SVar
import Prelude hiding (map)

#include "inline.hs"
#include "Instances.hs"

--
-- $setup
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

-- | Run every monadic action of a stream under a modified reader
-- environment.
--
-- The yield, singleton and stop continuations are each wrapped in
-- @'local' f@, and the remainder of the stream is transformed recursively so
-- that the same modification applies to the rest of the stream as well.
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> K.StreamK m a -> K.StreamK m a
withLocal f m =
    K.mkStream $ \st yld sng stp ->
        let -- Last element: run the singleton continuation under @local f@.
            single = local f . sng
            -- Intermediate element: yield it under @local f@ and keep
            -- transforming the tail.
            yieldk a r = local f $ yld a (withLocal f r)
        in K.foldStream st yieldk single (local f stp) m

-------------------------------------------------------------------------------
-- Parallel
-------------------------------------------------------------------------------

-------------------------------------------------------------------------------
-- StreamK based worker routines
-------------------------------------------------------------------------------

-- | Worker routine: evaluate the stream @m0@ and send every element it
-- produces to the SVar stored in the state @st@ as a 'ChildYield' event. When
-- the stream ends a stop is sent via 'sendStop'.
--
-- If the state carries a yield limit ('getYieldLimit' returns 'Just') the
-- work is delegated to 'runOneLimited' instead.
--
-- The state must contain an SVar, i.e. @streamVar st@ must be 'Just';
-- otherwise 'fromJust' fails.
{-# NOINLINE runOne #-}
runOne
    :: MonadIO m
    => State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOne st m0 winfo =
    case getYieldLimit st of
        Nothing -> go m0
        Just _  -> runOneLimited st m0 winfo

    where

    -- Evaluate one step of the stream; the buffer limit is decremented
    -- before each step.
    go m = do
        liftIO $ decrementBufferLimit sv
        K.foldStreamShared st yieldk single stop m

    -- The SVar this worker sends its output to.
    sv = fromJust $ streamVar st

    -- The stream ended without producing an element in this step: the buffer
    -- limit is incremented again (presumably giving back the slot taken in
    -- 'go' -- confirm against the SVar implementation) before sending stop.
    stop = liftIO $ do
        incrementBufferLimit sv
        sendStop sv winfo
    -- Send a single element to the SVar, ignoring the result of 'send'.
    sendit a = liftIO $ void $ send sv (ChildYield a)
    -- Last element of the stream: send it, then send stop.
    single a = sendit a >> liftIO (sendStop sv winfo)
    -- Intermediate element: send it and continue with the rest.
    yieldk a r = sendit a >> go r

-- | Variant of 'runOne' used when the state carries a yield limit.
--
-- Before every step the yield limit is decremented via
-- 'decrementYieldLimit'. If that reports 'False' the worker stops evaluating
-- the stream, calls 'cleanupSVarFromWorker' and sends stop. Otherwise it
-- proceeds like 'runOne'.
--
-- The state must contain an SVar, i.e. @streamVar st@ must be 'Just';
-- otherwise 'fromJust' fails.
runOneLimited
    :: MonadIO m
    => State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited st m0 winfo = go m0

    where

    go m = do
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if yieldLimitOk
        then do
            liftIO $ decrementBufferLimit sv
            K.foldStreamShared st yieldk single stop m
        else do
            -- Yield limit exhausted: stop without evaluating @m@ further.
            liftIO $ cleanupSVarFromWorker sv
            liftIO $ sendStop sv winfo

    -- The SVar this worker sends its output to.
    sv = fromJust $ streamVar st

    -- The stream ended without producing an element in this step: both the
    -- buffer limit and the yield limit decremented in 'go' are incremented
    -- again before sending stop.
    stop = liftIO $ do
        incrementBufferLimit sv
        incrementYieldLimit sv
        sendStop sv winfo
    -- Send a single element to the SVar, ignoring the result of 'send'.
    sendit a = liftIO $ void $ send sv (ChildYield a)
    -- Last element of the stream: send it, then send stop.
    single a = sendit a >> liftIO (sendStop sv winfo)
    -- Intermediate element: send it and continue with the rest.
    yieldk a r = sendit a >> go r

-------------------------------------------------------------------------------
-- Consing and appending a stream in parallel style
-------------------------------------------------------------------------------

-- Note that consing and appending requires StreamK as it would not scale well
-- with StreamD unless we are only consing a very small number of streams or
-- elements in a stream. StreamK allows us to manipulate control flow in a way
-- which StreamD cannot allow. StreamK can make a jump without having to
-- remember the past state.

-- | Create a new parallel SVar with the given stop style, push one worker
-- evaluating @m@ and another evaluating @r@ onto it, and return the stream
-- read back from that SVar.
--
-- For the 'StopBy' style, the thread recorded in 'svarStopBy' is taken from
-- 'workerThreads' after the first worker has been pushed and before the
-- second one is pushed.
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
    => SVarStopStyle -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
forkSVarPar ss m r = K.mkStream $ \st yld sng stp -> do
    sv <- newParallelVar ss st
    pushWorkerPar sv (runOne st{streamVar = Just sv} m)
    case ss of
        StopBy -> liftIO $ do
            set <- readIORef (workerThreads sv)
            -- NOTE(review): 'Set.elemAt' is partial. Only the worker for @m@
            -- has been pushed at this point, so the set presumably holds
            -- exactly that thread -- confirm that 'pushWorkerPar' registers
            -- the thread before returning.
            writeIORef (svarStopBy sv) $ Set.elemAt 0 set
        _ -> return ()
    pushWorkerPar sv (runOne st{streamVar = Just sv} r)
    K.foldStream st yld sng stp $ Stream.toStreamK (SVar.fromSVar sv)

-- | Compose two streams concurrently using an SVar of the given style and
-- stop style.
--
-- If the state already carries an SVar whose style and stop style both match
-- the requested ones, that SVar is reused: a worker is pushed for @m1@ and
-- @m2@ is evaluated in place. Otherwise a fresh SVar is created via
-- 'forkSVarPar'.
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar ::
       MonadAsync m
    => SVarStyle
    -> SVarStopStyle
    -> K.StreamK m a
    -> K.StreamK m a
    -> K.StreamK m a
joinStreamVarPar style ss m1 m2 = K.mkStream $ \st yld sng stp ->
    case streamVar st of
        Just sv | svarStyle sv == style && svarStopStyle sv == ss -> do
            -- Here, WE ARE IN THE WORKER/PRODUCER THREAD, we know that because
            -- the SVar exists. We are running under runOne and the output we
            -- produce ultimately will be sent to the SVar by runOne.
            --
            -- If we came here the worker/runOne is evaluating a `parallel`
            -- combinator. In this case, we always fork a new worker for the
            -- first component (m1) in the parallel composition and continue to
            -- evaluate the second component (m2) in the current worker thread.
            --
            -- When m1 is serially composed, the worker would evaluate it
            -- without any further forks and the resulting output is sent to
            -- the SVar and the evaluation terminates. If m1 is a `parallel`
            -- composition of two streams the worker would again recurse here.
            --
            -- Similarly, when m2 is serially composed it gets evaluated here
            -- and the resulting output is sent to the SVar by the runOne
            -- wrapper. When m2 is composed with `parallel` it will again
            -- recurse here and so on until it finally terminates.
            --
            -- When we create a right associated expression using `parallel`,
            -- then m1 would always terminate without further forks or
            -- recursion into this routine, therefore, the worker returns
            -- immediately after evaluating it. And m2 would continue to
            -- fork/recurse, therefore, the current thread always recurses and
            -- forks new workers one after the other.  This is a tail recursive
            -- style execution, m2, the recursive expression always executed at
            -- the tail.
            --
            -- When the expression is left associated, the worker spawned would
            -- get the forking/recursing responsibility and then again the
            -- worker spawned by that worker would fork, thus creating layer
            -- over layer of workers and a chain of threads leading to a very
            -- inefficient execution.
            pushWorkerPar sv (runOne st m1)
            K.foldStreamShared st yld sng stp m2
        _ ->
            -- Here WE ARE IN THE CONSUMER THREAD, we create a new SVar, fork
            -- worker threads to execute m1 and m2 and this thread starts
            -- pulling the stream from the SVar.
            K.foldStreamShared st yld sng stp (forkSVarPar ss m1 m2)

-------------------------------------------------------------------------------
-- User facing APIs
-------------------------------------------------------------------------------

-- | Compose two streams concurrently using a 'ParallelVar' style SVar with
-- the 'StopNone' stop style.
{-# INLINE parallelK #-}
parallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelK = joinStreamVarPar ParallelVar StopNone

-- | Cons a monadic action onto a 'ParallelT' stream. The action is turned
-- into a single-effect stream and combined with the rest of the stream using
-- 'parallelK'.
--
-- XXX we can implement it more efficiently by directly implementing instead
-- of combining streams using parallel.
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consM m (ParallelT r) = ParallelT $ parallelK (K.fromEffect m) r

-- This is a co-parallel like combinator for streams, where first stream is the
-- main stream and the rest are just supporting it, when the first ends
-- everything ends.
--
-- | Like `parallel` but stops the output as soon as the first stream stops.
--
-- Implemented as 'joinStreamVarPar' with a 'ParallelVar' style SVar and the
-- 'StopBy' stop style.
--
-- /Pre-release/
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelFstK = joinStreamVarPar ParallelVar StopBy

-- This is a race like combinator for streams.
--
-- | Like `parallel` but stops the output as soon as any of the two streams
-- stops.
--
-- Implemented as 'joinStreamVarPar' with a 'ParallelVar' style SVar and the
-- 'StopAny' stop style.
--
-- /Pre-release/
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelMinK = joinStreamVarPar ParallelVar StopAny

------------------------------------------------------------------------------
-- Convert a stream to parallel
------------------------------------------------------------------------------

-- | Like 'mkParallel' but uses StreamK internally.
--
-- A new parallel SVar with the 'StopNone' stop style is created, the input
-- stream is converted to a direct-style stream and handed to
-- 'SVar.toSVarParallel' together with that SVar, and the resulting stream is
-- the one read back from the SVar.
--
-- /Pre-release/
--
mkParallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a
mkParallelK m = K.mkStream $ \st yld sng stp -> do
    sv <- newParallelVar StopNone (adaptState st)
    -- pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m)
    SVar.toSVarParallel st sv $ D.fromStreamK m
    K.foldStream st yld sng stp $ Stream.toStreamK $ SVar.fromSVar sv

-- | Same as 'mkParallel' but for StreamD stream.
--
-- The stream state is a 'Maybe': 'Nothing' until the first step, and
-- @'Just' s@ afterwards, where @s@ is the stream obtained from the SVar via
-- 'SVar.fromSVarD'.
--
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkParallelD m = D.Stream step Nothing
    where

    -- First step: create the SVar, hand the input stream to
    -- 'SVar.toSVarParallel', then switch to reading from the SVar.
    step gst Nothing = do
        sv <- newParallelVar StopNone gst
        SVar.toSVarParallel gst sv m
        -- XXX use unfold instead?
        return $ Skip $ Just $ SVar.fromSVarD sv

    -- Subsequent steps: run one step of the SVar-backed stream and re-wrap
    -- its updated state.
    step gst (Just (D.UnStream step1 st)) = do
        r <- step1 gst st
        return $ case r of
            Yield a s -> Yield a (Just $ D.Stream step1 s)
            Skip s    -> Skip (Just $ D.Stream step1 s)
            Stop      -> Stop

-------------------------------------------------------------------------------
-- Concurrent tap
-------------------------------------------------------------------------------

-- NOTE: In regular pull style streams, the consumer stream pulls elements
-- from the SVar while several workers produce elements and push them to the
-- SVar. In the case of folds, we, the parent stream driving the fold, are the
-- worker producing the stream: we start an SVar and push elements to it, and
-- the fold on the other side of the SVar is the consumer stream.
--
-- In the pull stream case, exceptions are propagated from the producing
-- workers to the consumer stream on the same channel as the produced stream
-- elements. However, in the case of push style folds the current stream itself
-- is the worker and the fold is the consumer, so we have to propagate
-- exceptions from the consumer to the producer. This is the reverse of the
-- pull case, so we need a channel in the reverse direction to propagate the
-- exception.
--
-- | Redirect a copy of the stream to a supplied fold and run it concurrently
-- in an independent thread. The fold may buffer some elements. The buffer size
-- is determined by the prevailing 'Streamly.Prelude.maxBuffer' setting.
--
-- @
--               StreamK m a -> m b
--                       |
-- -----stream m a ---------------stream m a-----
--
-- @
--
-- @
-- > S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
-- 1
-- 2
-- @
--
-- Exceptions from the concurrently running fold are propagated to the current
-- computation.  Note that, because of buffering in the fold, exceptions may be
-- delayed and may not correspond to the current element being processed in the
-- parent stream, but we guarantee that before the parent stream stops the tap
-- finishes and all exceptions from it are drained.
--
--
-- Compare with 'tap'.
--
-- /Pre-release/
{-# INLINE tapAsyncK #-}
tapAsyncK ::
       MonadAsync m => (K.StreamK m a -> m b) -> K.StreamK m a -> K.StreamK m a
tapAsyncK f m = K.mkStream $ \st yld sng stp -> do
    -- Create the SVar for the fold. The fold function is composed with
    -- Stream.toStreamK so that it accepts the stream type that newFoldSVar
    -- hands to it.
    sv <- SVar.newFoldSVar st (f . Stream.toStreamK)
    -- Tee the source stream through the SVar and keep evaluating the
    -- resulting stream with the continuations supplied by the caller.
    K.foldStreamShared st yld sng stp
        $ Stream.toStreamK (SVar.teeToSVar sv $ Stream.fromStreamK m)

-- | State of the stream produced by 'tapAsyncF'.
--
-- * 'TapInit': initial state; the SVar for the fold has not been created yet.
-- * 'Tapping': carries the SVar of the fold (strict field) together with the
--   state of the source stream, while elements are still pushed to the fold.
-- * 'TapDone': carries only the state of the source stream; entered once the
--   fold reports that it is done, after which elements are passed through
--   without being pushed.
--
-- The type parameter @a@ does not occur in any constructor.
data TapState fs st a = TapInit | Tapping !fs st | TapDone st

-- | Like 'tapAsync' but uses a 'Fold' instead of a fold function.
--
-- The resulting stream yields exactly the elements of the source stream. It
-- is driven by a three state machine (see 'TapState'):
--
-- * 'TapInit': create the SVar for the fold and move to 'Tapping'.
-- * 'Tapping': every element yielded by the source is pushed to the fold
--   before it is yielded downstream. When the push reports that the fold is
--   done, the fold is stopped and the machine moves to 'TapDone'. When the
--   source stops, the fold is stopped and drained before the stream stops.
-- * 'TapDone': the source is passed through unchanged.
--
{-# INLINE_NORMAL tapAsyncF #-}
tapAsyncF :: MonadAsync m => Fold m a b -> D.Stream m a -> D.Stream m a
tapAsyncF f (D.Stream step1 state1) = D.Stream step TapInit
    where

    -- Keep calling fromConsumer until it reports completion, blocking on the
    -- consumer door bell MVar between attempts.
    drainFold svr = do
        -- In general, a Stop event would come equipped with the result
        -- of the fold. It is not used here but it would be useful in
        -- applicative and distribute.
        done <- SVar.fromConsumer svr
        when (not done) $ do
            liftIO $ withDiagMVar svr "teeToSVar: waiting to drain"
                   $ takeMVar (outputDoorBellFromConsumer svr)
            drainFold svr

    -- Send a stop to the fold and then wait for it to finish.
    stopFold svr = do
        liftIO $ sendStop svr Nothing
        -- drain/wait until a stop event arrives from the fold.
        drainFold svr

    {-# INLINE_LATE step #-}
    step gst TapInit = do
        sv <- SVar.newFoldSVarF gst f
        return $ Skip (Tapping sv state1)

    step gst (Tapping sv st) = do
        r <- step1 gst st
        case r of
            Yield a s -> do
                done <- SVar.pushToFold sv a
                if done
                then do
                    -- XXX we do not need to wait synchronously here
                    stopFold sv
                    return $ Yield a (TapDone s)
                else return $ Yield a (Tapping sv s)
            Skip s -> return $ Skip (Tapping sv s)
            Stop -> do
                stopFold sv
                return Stop

    step gst (TapDone st) = do
        r <- step1 gst st
        return $ case r of
            Yield a s -> Yield a (TapDone s)
            Skip s    -> Skip (TapDone s)
            Stop      -> Stop

------------------------------------------------------------------------------
-- ParallelT
------------------------------------------------------------------------------

-- | For 'ParallelT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.parallel'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.parallel'
-- @
--
-- See 'Streamly.Prelude.AsyncT', 'ParallelT' is similar except that all
-- iterations are strictly concurrent while in 'AsyncT' it depends on the
-- consumer demand and available threads. See 'parallel' for more details.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- /Since: 0.7.0 (maxBuffer applies to ParallelT streams)/
--
-- @since 0.8.0
newtype ParallelT m a = ParallelT
    { getParallelT :: K.StreamK m a -- ^ Unwrap to the underlying stream.
    }

#if !(MIN_VERSION_transformers(0,6,0))
-- Only compiled with transformers older than 0.6.0 (see the CPP guard).
instance MonadTrans ParallelT where
    -- Lift an action of the underlying monad to a stream built from that
    -- single effect.
    {-# INLINE lift #-}
    lift = ParallelT . K.fromEffect
#endif

-- | A stream of elements of type @a@ in the 'IO' monad that composes in
-- parallel; a synonym for @'ParallelT' IO@.
-- See 'ParallelT' documentation for more details.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type Parallel = ParallelT IO

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

-- Combine two 'ParallelT' streams by unwrapping them, merging the underlying
-- streams with 'parallelK' and wrapping the result again.
{-# INLINE append #-}
{-# SPECIALIZE append :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
append :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
append (ParallelT m1) (ParallelT m2) = ParallelT $ parallelK m1 m2

-- '<>' on 'ParallelT' streams is 'append', i.e. a 'parallelK' merge.
instance MonadAsync m => Semigroup (ParallelT m a) where
    (<>) = append

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

-- The identity element is the empty stream; 'mappend' is the 'Semigroup'
-- operation.
instance MonadAsync m => Monoid (ParallelT m a) where
    mempty = ParallelT K.nil
    mappend = (<>)

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

-- Applicative apply for 'ParallelT': for every function produced by the first
-- stream, map it over the whole second stream. Both the per-function result
-- streams and the singleton streams for each result are combined with
-- 'parallelK'.
{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel ::
    ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m =>
    ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT m1) (ParallelT m2) =
    -- f applies one function from the first stream to all of the second one.
    let f x1 = K.concatMapWith parallelK (K.fromPure . x1) m2
    in ParallelT $ K.concatMapWith parallelK f m1

-- 'pure' is the singleton stream; '<*>' is 'apParallel'.
instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
    {-# INLINE pure #-}
    pure = ParallelT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apParallel

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- Monadic bind for 'ParallelT': apply the continuation to every element of
-- the stream and combine the resulting streams with 'parallelK'. The
-- continuation is unwrapped with 'getParallelT' so that 'K.bindWith' can work
-- on the underlying streams.
{-# INLINE bind #-}
{-# SPECIALIZE bind ::
    ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bind :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind (ParallelT m) f = ParallelT $ K.bindWith parallelK m (getParallelT . f)

-- 'return' is 'pure'; '>>=' is 'bind'.
instance MonadAsync m => Monad (ParallelT m) where
    return = pure

    {-# INLINE (>>=) #-}
    (>>=) = bind

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

#if !(MIN_VERSION_transformers(0,6,0))
-- Only compiled with transformers older than 0.6.0 (see the CPP guard).
-- liftBaseDefault needs a MonadTrans instance for ParallelT, which is
-- defined under the same CPP condition.
instance (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) where
    liftBase = liftBaseDefault
#endif

-- CPP macro invocation; the macro is defined outside this part of the file.
-- NOTE(review): presumably it expands to the boilerplate instances shared by
-- the stream types (for example Functor, MonadIO, MonadThrow, MonadReader and
-- MonadState, whose classes are imported at the top of this module); confirm
-- against the macro definition.
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)

-------------------------------------------------------------------------------
-- From callback
-------------------------------------------------------------------------------

-- Note: we can use another API with two callbacks stop and yield if we want
-- the callback to be able to indicate end of stream.
--
-- | Generates a callback and a stream pair. The callback returned is used to
-- queue values to the stream.  The stream is infinite, there is no way for the
-- callback to indicate that it is done now.
--
-- /Pre-release/
--
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), K.StreamK m a)
newCallbackStream = do
    -- The SVar is created with the StopNone stop style and the default state.
    sv <- newParallelVar StopNone defState

    -- XXX Add our own thread-id to the SVar as we can not know the callback's
    -- thread-id and the callback is not run in a managed worker. We need to
    -- handle this better.
    liftIO myThreadId >>= modifyThread sv

    -- The callback queues its argument on the SVar as a ChildYield event; the
    -- Int returned by send is discarded.
    let callback a = liftIO $ void $ send sv (ChildYield a)
    -- XXX we can return an SVar and then the consumer can unfold from the
    -- SVar?
    return (callback, D.toStreamK (SVar.fromSVarD sv))