{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.Parallel
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}
--
module Streamly.Internal.Data.Stream.Parallel {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" instead." #-}
    (
    -- * Parallel Stream Type
      ParallelT(..)
    , Parallel
    , consM

    -- * Merge Concurrently
    , parallelK
    , parallelFstK
    , parallelMinK

    -- * Evaluate Concurrently
    , mkParallelD
    , mkParallelK

    -- * Tap Concurrently
    , tapAsyncK
    , tapAsyncF

    -- * Callbacks
    , newCallbackStream
    )
where

import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)

import Streamly.Data.Fold (Fold)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))

import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
    (StreamK, foldStreamShared, mkStream, foldStream, fromEffect
    , nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
    (Stream(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream

import Streamly.Internal.Data.SVar
import Prelude hiding (map)

#include "inline.hs"
#include "Instances.hs"

--
-- $setup
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> K.StreamK m a -> K.StreamK m a
withLocal :: forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> StreamK m a -> StreamK m a
withLocal r -> r
f StreamK m a
m =
    (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single = (r -> r) -> m r -> m r
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> (a -> m r) -> a -> m r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m r
sng
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = (r -> r) -> m r -> m r
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> m r -> m r
forall a b. (a -> b) -> a -> b
$ a -> StreamK m a -> m r
yld a
a ((r -> r) -> StreamK m a -> StreamK m a
forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> StreamK m a -> StreamK m a
withLocal r -> r
f StreamK m a
r)
        in State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single ((r -> r) -> m r -> m r
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f m r
stp) StreamK m a
m

-------------------------------------------------------------------------------
-- Parallel
-------------------------------------------------------------------------------

-------------------------------------------------------------------------------
-- StreamK based worker routines
-------------------------------------------------------------------------------

{-# NOINLINE runOne #-}
runOne
    :: MonadIO m
    => State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOne :: forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo =
    case State StreamK m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
        Maybe Count
Nothing -> StreamK m a -> m ()
go StreamK m a
m0
        Just Count
_  -> State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo

    where

    go :: StreamK m a -> m ()
go StreamK m a
m = do
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar StreamK m a
sv
        State StreamK m a
-> (a -> StreamK m a -> m ())
-> (a -> m ())
-> m ()
-> StreamK m a
-> m ()
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m ()
yieldk a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
single m ()
stop StreamK m a
m

    sv :: SVar StreamK m a
sv = Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar StreamK m a) -> SVar StreamK m a)
-> Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st

    stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar StreamK m a
sv
        SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
    sendit :: a -> m ()
sendit a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
    single :: a -> m ()
single a
a = a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo)
    yieldk :: a -> StreamK m a -> m ()
yieldk a
a StreamK m a
r = a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StreamK m a -> m ()
go StreamK m a
r

runOneLimited
    :: MonadIO m
    => State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited :: forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo = StreamK m a -> m ()
go StreamK m a
m0

    where

    go :: StreamK m a -> m ()
go StreamK m a
m = do
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
        if Bool
yieldLimitOk
        then do
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar StreamK m a
sv
            State StreamK m a
-> (a -> StreamK m a -> m ())
-> (a -> m ())
-> m ()
-> StreamK m a
-> m ()
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m ()
yieldk a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
single m ()
stop StreamK m a
m
        else do
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar StreamK m a
sv
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo

    sv :: SVar StreamK m a
sv = Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar StreamK m a) -> SVar StreamK m a)
-> Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st

    stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar StreamK m a
sv
        SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
        SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
    sendit :: a -> m ()
sendit a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
    single :: a -> m ()
single a
a = a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo)
    yieldk :: a -> StreamK m a -> m ()
yieldk a
a StreamK m a
r = a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StreamK m a -> m ()
go StreamK m a
r

-------------------------------------------------------------------------------
-- Consing and appending a stream in parallel style
-------------------------------------------------------------------------------

-- Note that consing and appending requires StreamK as it would not scale well
-- with StreamD unless we are only consing a very small number of streams or
-- elements in a stream. StreamK allows us to manipulate control flow in a way
-- which StreamD cannot allow. StreamK can make a jump without having to
-- remember the past state.

{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
    => SVarStopStyle -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
forkSVarPar :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forkSVarPar SVarStopStyle
ss StreamK m a
m StreamK m a
r = (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
    SVar StreamK m a
sv <- SVarStopStyle -> State StreamK m a -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
ss State StreamK m a
st
    SVar StreamK m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st{streamVar :: Maybe (SVar StreamK m a)
streamVar = SVar StreamK m a -> Maybe (SVar StreamK m a)
forall a. a -> Maybe a
Just SVar StreamK m a
sv} StreamK m a
m)
    case SVarStopStyle
ss of
        SVarStopStyle
StopBy -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            Set ThreadId
set <- IORef (Set ThreadId) -> IO (Set ThreadId)
forall a. IORef a -> IO a
readIORef (SVar StreamK m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar StreamK m a
sv)
            IORef ThreadId -> ThreadId -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVar StreamK m a -> IORef ThreadId
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar StreamK m a
sv) (ThreadId -> IO ()) -> ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Set ThreadId -> ThreadId
forall a. Int -> Set a -> a
Set.elemAt Int
0 Set ThreadId
set
        SVarStopStyle
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    SVar StreamK m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st{streamVar :: Maybe (SVar StreamK m a)
streamVar = SVar StreamK m a -> Maybe (SVar StreamK m a)
forall a. a -> Maybe a
Just SVar StreamK m a
sv} StreamK m a
r)
    State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> StreamK m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SVar StreamK m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m a
sv)

{-# INLINE joinStreamVarPar #-}
joinStreamVarPar ::
       MonadAsync m
    => SVarStyle
    -> SVarStopStyle
    -> K.StreamK m a
    -> K.StreamK m a
    -> K.StreamK m a
joinStreamVarPar :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
style SVarStopStyle
ss StreamK m a
m1 StreamK m a
m2 = (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
    case State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st of
        Just SVar StreamK m a
sv | SVar StreamK m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar StreamK m a
sv SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
style Bool -> Bool -> Bool
&& SVar StreamK m a -> SVarStopStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar StreamK m a
sv SVarStopStyle -> SVarStopStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStopStyle
ss -> do
            -- Here, WE ARE IN THE WORKER/PRODUCER THREAD, we know that because
            -- the SVar exists. We are running under runOne and the output we
            -- produce ultimately will be sent to the SVar by runOne.
            --
            -- If we came here the worker/runOne is evaluating a `parallel`
            -- combinator. In this case, we always fork a new worker for the
            -- first component (m1) in the parallel composition and continue to
            -- evaluate the second component (m2) in the current worker thread.
            --
            -- When m1 is serially composed, the worker would evaluate it
            -- without any further forks and the resulting output is sent to
            -- the SVar and the evaluation terminates. If m1 is a `parallel`
            -- composition of two streams the worker would again recurses here.
            --
            -- Similarly, when m2 is serially composed it gets evaluated here
            -- and the resulting output is sent to the SVar by the runOne
            -- wrapper. When m2 is composed with `parallel` it will again
            -- recurse here and so on until it finally terminates.
            --
            -- When we create a right associated expression using `parallel`,
            -- then m1 would always terminate without further forks or
            -- recursion into this routine, therefore, the worker returns
            -- immediately after evaluating it. And m2 would continue to
            -- fork/recurse, therefore, the current thread always recurses and
            -- forks new workers one after the other.  This is a tail recursive
            -- style execution, m2, the recursive expression always executed at
            -- the tail.
            --
            -- When the expression is left associated, the worker spawned would
            -- get the forking/recursing responsibility and then again the
            -- worker spawned by that worker would fork, thus creating layer
            -- over layer of workers and a chain of threads leading to a very
            -- inefficient execution.
            SVar StreamK m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st StreamK m a
m1)
            State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
m2
        Maybe (SVar StreamK m a)
_ ->
            -- Here WE ARE IN THE CONSUMER THREAD, we create a new SVar, fork
            -- worker threads to execute m1 and m2 and this thread starts
            -- pulling the stream from the SVar.
            State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forkSVarPar SVarStopStyle
ss StreamK m a
m1 StreamK m a
m2)

-------------------------------------------------------------------------------
-- User facing APIs
-------------------------------------------------------------------------------

{-# INLINE parallelK #-}
parallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK = SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopNone

-- | XXX we can implement it more efficienty by directly implementing instead
-- of combining streams using parallel.
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> ParallelT m a -> ParallelT m a
consM m a
m (ParallelT StreamK m a
r) = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m a -> ParallelT m a) -> StreamK m a -> ParallelT m a
forall a b. (a -> b) -> a -> b
$ StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK (m a -> StreamK m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect m a
m) StreamK m a
r

-- This is a co-parallel like combinator for streams, where first stream is the
-- main stream and the rest are just supporting it, when the first ends
-- everything ends.
--
-- | Like `parallel` but stops the output as soon as the first stream stops.
--
-- /Pre-release/
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelFstK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelFstK = SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopBy

-- This is a race like combinator for streams.
--
-- | Like `parallel` but stops the output as soon as any of the two streams
-- stops.
--
-- /Pre-release/
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelMinK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelMinK = SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopAny

------------------------------------------------------------------------------
-- Convert a stream to parallel
------------------------------------------------------------------------------

-- | Like 'mkParallel' but uses StreamK internally.
--
-- /Pre-release/
--
mkParallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a
mkParallelK :: forall (m :: * -> *) a. MonadAsync m => StreamK m a -> StreamK m a
mkParallelK StreamK m a
m = (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
    SVar StreamK m a
sv <- SVarStopStyle -> State StreamK m a -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
st)
    -- pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m)
    State StreamK m a -> SVar StreamK m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State StreamK m a
st SVar StreamK m a
sv (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ StreamK m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK StreamK m a
m
    State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> StreamK m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SerialT m a -> StreamK m a) -> SerialT m a -> StreamK m a
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m a
sv

-- | Same as 'mkParallel' but for StreamD stream.
--
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkParallelD :: forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkParallelD Stream m a
m = (State StreamK m a
 -> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a))
-> Maybe (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step Maybe (Stream m a)
forall a. Maybe a
Nothing
    where

    step :: State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State StreamK m a
gst Maybe (Stream m a)
Nothing = do
        SVar StreamK m a
sv <- SVarStopStyle -> State StreamK m a -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State StreamK m a
gst
        State StreamK m a -> SVar StreamK m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State StreamK m a
gst SVar StreamK m a
sv Stream m a
m
        -- XXX use unfold instead?
        Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
Skip (Maybe (Stream m a) -> Step (Maybe (Stream m a)) a)
-> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar StreamK m a
sv

    step State StreamK m a
gst (Just (D.UnStream State StreamK m a -> s -> m (Step s a)
step1 s
st)) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
a s
s -> a -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
a (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
            Skip s
s    -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
Skip (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
            Step s a
Stop      -> Step (Maybe (Stream m a)) a
forall s a. Step s a
Stop

-------------------------------------------------------------------------------
-- Concurrent tap
-------------------------------------------------------------------------------

-- NOTE: In regular pull style streams, the consumer stream is pulling elements
-- from the SVar and we have several workers producing elements and pushing to
-- SVar. In case of folds, we, the parent stream driving the fold, are the
-- stream producing worker, we start an SVar and start pushing to the SVar, the
-- fold on the other side of the SVar is the consumer stream.
--
-- In the pull stream case exceptions are propagated from the producing workers
-- to the consumer stream, the exceptions are propagated on the same channel as
-- the produced stream elements. However, in case of push style folds the
-- current stream itself is the worker and the fold is the consumer, in this
-- case we have to propagate the exceptions from the consumer to the producer.
-- This is reverse of the pull case and we need a reverse direction channel
-- to propagate the exception.
--
-- | Redirect a copy of the stream to a supplied fold and run it concurrently
-- in an independent thread. The fold may buffer some elements. The buffer size
-- is determined by the prevailing 'Streamly.Prelude.maxBuffer' setting.
--
-- @
--               StreamK m a -> m b
--                       |
-- -----stream m a ---------------stream m a-----
--
-- @
--
-- @
-- > S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
-- 1
-- 2
-- @
--
-- Exceptions from the concurrently running fold are propagated to the current
-- computation.  Note that, because of buffering in the fold, exceptions may be
-- delayed and may not correspond to the current element being processed in the
-- parent stream, but we guarantee that before the parent stream stops the tap
-- finishes and all exceptions from it are drained.
--
--
-- Compare with 'tap'.
--
-- /Pre-release/
{-# INLINE tapAsyncK #-}
tapAsyncK ::
       MonadAsync m => (K.StreamK m a -> m b) -> K.StreamK m a -> K.StreamK m a
tapAsyncK :: forall (m :: * -> *) a b.
MonadAsync m =>
(StreamK m a -> m b) -> StreamK m a -> StreamK m a
tapAsyncK StreamK m a -> m b
f StreamK m a
m = (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
    SVar StreamK m a
sv <- State StreamK m a -> (SerialT m a -> m b) -> m (SVar StreamK m a)
forall (m :: * -> *) a b.
MonadAsync m =>
State StreamK m a -> (SerialT m a -> m b) -> m (SVar StreamK m a)
SVar.newFoldSVar State StreamK m a
st (StreamK m a -> m b
f (StreamK m a -> m b)
-> (SerialT m a -> StreamK m a) -> SerialT m a -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> StreamK m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK)
    State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp
        (StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> StreamK m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SVar StreamK m a -> SerialT m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a -> SerialT m a
SVar.teeToSVar SVar StreamK m a
sv (SerialT m a -> SerialT m a) -> SerialT m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ StreamK m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
Stream.fromStreamK StreamK m a
m)

data TapState fs st a = TapInit | Tapping !fs st | TapDone st

-- | Like 'tapAsync' but uses a 'Fold' instead of a fold function.
--
{-# INLINE_NORMAL tapAsyncF #-}
tapAsyncF :: MonadAsync m => Fold m a b -> D.Stream m a -> D.Stream m a
tapAsyncF :: forall (m :: * -> *) a b.
MonadAsync m =>
Fold m a b -> Stream m a -> Stream m a
tapAsyncF Fold m a b
f (D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = (State StreamK m a
 -> TapState (SVar StreamK m a) s Any
 -> m (Step (TapState (SVar StreamK m a) s Any) a))
-> TapState (SVar StreamK m a) s Any -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> TapState (SVar StreamK m a) s Any
-> m (Step (TapState (SVar StreamK m a) s Any) a)
forall {a} {a}.
State StreamK m a
-> TapState (SVar StreamK m a) s a
-> m (Step (TapState (SVar StreamK m a) s a) a)
step TapState (SVar StreamK m a) s Any
forall fs st a. TapState fs st a
TapInit
    where

    drainFold :: SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr = do
            -- In general, a Stop event would come equipped with the result
            -- of the fold. It is not used here but it would be useful in
            -- applicative and distribute.
            Bool
done <- SVar StreamK m a -> m Bool
forall (m :: * -> *) a. MonadAsync m => SVar StreamK m a -> m Bool
SVar.fromConsumer SVar StreamK m a
svr
            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> String -> IO () -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar StreamK m a
svr String
"teeToSVar: waiting to drain"
                       (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar StreamK m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar StreamK m a
svr)
                SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr

    stopFold :: SVar StreamK m a -> m ()
stopFold SVar StreamK m a
svr = do
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
svr Maybe WorkerInfo
forall a. Maybe a
Nothing
            -- drain/wait until a stop event arrives from the fold.
            SVar StreamK m a -> m ()
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr

    {-# INLINE_LATE step #-}
    step :: State StreamK m a
-> TapState (SVar StreamK m a) s a
-> m (Step (TapState (SVar StreamK m a) s a) a)
step State StreamK m a
gst TapState (SVar StreamK m a) s a
TapInit = do
        SVar StreamK m a
sv <- State StreamK m a -> Fold m a b -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
MonadAsync m =>
State t m a -> Fold m a b -> m (SVar t m a)
SVar.newFoldSVarF State StreamK m a
gst Fold m a b
f
        Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
 -> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. s -> Step s a
Skip (SVar StreamK m a -> s -> TapState (SVar StreamK m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
state1)

    step State StreamK m a
gst (Tapping SVar StreamK m a
sv s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
a s
s ->  do
                Bool
done <- SVar StreamK m a -> a -> m Bool
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> a -> m Bool
SVar.pushToFold SVar StreamK m a
sv a
a
                if Bool
done
                then do
                    -- XXX we do not need to wait synchronously here
                    SVar StreamK m a -> m ()
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
stopFold SVar StreamK m a
sv
                    Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
 -> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ a
-> TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (s -> TapState (SVar StreamK m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
                else Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
 -> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ a
-> TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (SVar StreamK m a -> s -> TapState (SVar StreamK m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
s)
            Skip s
s -> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
 -> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. s -> Step s a
Skip (SVar StreamK m a -> s -> TapState (SVar StreamK m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
s)
            Step s a
Stop -> do
                SVar StreamK m a -> m ()
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
stopFold SVar StreamK m a
sv
                Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (TapState (SVar StreamK m a) s a) a
forall s a. Step s a
Stop

    step State StreamK m a
gst (TapDone s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
 -> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
a s
s -> a
-> TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (s -> TapState (SVar StreamK m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
            Skip s
s    -> TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. s -> Step s a
Skip (s -> TapState (SVar StreamK m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
            Step s a
Stop      -> Step (TapState (SVar StreamK m a) s a) a
forall s a. Step s a
Stop

------------------------------------------------------------------------------
-- ParallelT
------------------------------------------------------------------------------

-- | For 'ParallelT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.parallel'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.parallel'
-- @
--
-- See 'Streamly.Prelude.AsyncT', 'ParallelT' is similar except that all
-- iterations are strictly concurrent while in 'AsyncT' it depends on the
-- consumer demand and available threads. See 'parallel' for more details.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- /Since: 0.7.0 (maxBuffer applies to ParallelT streams)/
--
-- @since 0.8.0
newtype ParallelT m a = ParallelT {forall (m :: * -> *) a. ParallelT m a -> StreamK m a
getParallelT :: K.StreamK m a}

instance MonadTrans ParallelT where
    {-# INLINE lift #-}
    lift :: forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
lift = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m a -> ParallelT m a)
-> (m a -> StreamK m a) -> m a -> ParallelT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> StreamK m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect

-- | A parallely composing IO stream of elements of type @a@.
-- See 'ParallelT' documentation for more details.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type Parallel = ParallelT IO

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

{-# INLINE append #-}
{-# SPECIALIZE append :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
append :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
append :: forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append (ParallelT StreamK m a
m1) (ParallelT StreamK m a
m2) = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m a -> ParallelT m a) -> StreamK m a -> ParallelT m a
forall a b. (a -> b) -> a -> b
$ StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK StreamK m a
m1 StreamK m a
m2

instance MonadAsync m => Semigroup (ParallelT m a) where
    <> :: ParallelT m a -> ParallelT m a -> ParallelT m a
(<>) = ParallelT m a -> ParallelT m a -> ParallelT m a
forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance MonadAsync m => Monoid (ParallelT m a) where
    mempty :: ParallelT m a
mempty = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT StreamK m a
forall (m :: * -> *) a. StreamK m a
K.nil
    mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a
mappend = ParallelT m a -> ParallelT m a -> ParallelT m a
forall a. Semigroup a => a -> a -> a
(<>)

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel ::
    ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m =>
    ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel :: forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT StreamK m (a -> b)
m1) (ParallelT StreamK m a
m2) =
    let f :: (a -> b) -> StreamK m b
f a -> b
x1 = (StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK (b -> StreamK m b
forall a (m :: * -> *). a -> StreamK m a
K.fromPure (b -> StreamK m b) -> (a -> b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) StreamK m a
m2
    in StreamK m b -> ParallelT m b
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m b -> ParallelT m b) -> StreamK m b -> ParallelT m b
forall a b. (a -> b) -> a -> b
$ (StreamK m b -> StreamK m b -> StreamK m b)
-> ((a -> b) -> StreamK m b) -> StreamK m (a -> b) -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK (a -> b) -> StreamK m b
forall {b}. (a -> b) -> StreamK m b
f StreamK m (a -> b)
m1

instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
    {-# INLINE pure #-}
    pure :: forall a. a -> ParallelT m a
pure = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m a -> ParallelT m a)
-> (a -> StreamK m a) -> a -> ParallelT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> StreamK m a
forall a (m :: * -> *). a -> StreamK m a
K.fromPure

    {-# INLINE (<*>) #-}
    <*> :: forall a b. ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
(<*>) = ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

{-# INLINE bind #-}
{-# SPECIALIZE bind ::
    ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bind :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind :: forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind (ParallelT StreamK m a
m) a -> ParallelT m b
f = StreamK m b -> ParallelT m b
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m b -> ParallelT m b) -> StreamK m b -> ParallelT m b
forall a b. (a -> b) -> a -> b
$ (StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK StreamK m a
m (ParallelT m b -> StreamK m b
forall (m :: * -> *) a. ParallelT m a -> StreamK m a
getParallelT (ParallelT m b -> StreamK m b)
-> (a -> ParallelT m b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ParallelT m b
f)

instance MonadAsync m => Monad (ParallelT m) where
    return :: forall a. a -> ParallelT m a
return = a -> ParallelT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure

    {-# INLINE (>>=) #-}
    >>= :: forall a b. ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
(>>=) = ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)

-------------------------------------------------------------------------------
-- From callback
-------------------------------------------------------------------------------

-- Note: we can use another API with two callbacks stop and yield if we want
-- the callback to be able to indicate end of stream.
--
-- | Generates a callback and a stream pair. The callback returned is used to
-- queue values to the stream.  The stream is infinite, there is no way for the
-- callback to indicate that it is done now.
--
-- /Pre-release/
--
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), K.StreamK m a)
newCallbackStream :: forall (m :: * -> *) a. MonadAsync m => m (a -> m (), StreamK m a)
newCallbackStream = do
    SVar Any m a
sv <- SVarStopStyle -> State Any m a -> m (SVar Any m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State Any m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState

    -- XXX Add our own thread-id to the SVar as we can not know the callback's
    -- thread-id and the callback is not run in a managed worker. We need to
    -- handle this better.
    IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar Any m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Any m a
sv

    let callback :: a -> m ()
callback a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Any m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Any m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
    -- XXX we can return an SVar and then the consumer can unfold from the
    -- SVar?
    (a -> m (), StreamK m a) -> m (a -> m (), StreamK m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
callback, Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK (SVar Any m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Any m a
sv))