{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
module Streamly.Internal.Data.Stream.Parallel {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" instead." #-}
(
ParallelT(..)
, Parallel
, consM
, parallelK
, parallelFstK
, parallelMinK
, mkParallelD
, mkParallelK
, tapAsyncK
, tapAsyncF
, newCallbackStream
)
where
import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(StreamK, foldStreamShared, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(Stream(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream
import Streamly.Internal.Data.SVar
import Prelude hiding (map)
#include "inline.hs"
#include "Instances.hs"
-- | Run every continuation of a stream under a modified reader environment.
--
-- The yield, singleton and stop continuations are each wrapped in
-- @'local' f@, and the remainder of the stream is transformed recursively, so
-- the modification covers the whole stream and not only its first element.
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> K.StreamK m a -> K.StreamK m a
withLocal f m =
    K.mkStream $ \st yld sng stp ->
        let single = local f . sng
            -- Re-wrap the tail so later elements also see the modified
            -- environment.
            yieldk a r = local f $ yld a (withLocal f r)
        in K.foldStream st yieldk single (local f stp) m
-- | Worker loop: evaluate a stream and send each element it produces to the
-- 'SVar' stored in the given state as a 'ChildYield' event.
--
-- When the state carries a yield limit ('getYieldLimit' returns 'Just') the
-- work is delegated to 'runOneLimited'; otherwise the unlimited loop below is
-- used.
--
-- The state must have its 'streamVar' set: it is extracted with 'fromJust',
-- so a state without an SVar makes this function fail at runtime.
{-# NOINLINE runOne #-}
runOne
    :: MonadIO m
    => State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOne st m0 winfo =
    case getYieldLimit st of
        Nothing -> go m0
        Just _  -> runOneLimited st m0 winfo

    where

    -- One step: adjust the buffer limit, then evaluate one step of the
    -- stream with the continuations defined below.
    go m = do
        liftIO $ decrementBufferLimit sv
        K.foldStreamShared st yieldk single stop m

    sv = fromJust $ streamVar st

    -- The stream ended without an element: undo the buffer decrement made
    -- in 'go' and tell the SVar that this worker has stopped.
    stop = liftIO $ do
        incrementBufferLimit sv
        sendStop sv winfo

    sendit a = liftIO $ void $ send sv (ChildYield a)

    -- Last element: send it, then send the stop notification.
    single a = sendit a >> liftIO (sendStop sv winfo)

    -- More elements follow: send this one and continue with the rest.
    yieldk a r = sendit a >> go r
-- | Variant of the worker loop that consults the SVar's yield limit before
-- every step.
--
-- Each iteration first calls 'decrementYieldLimit'. If it returns 'True' the
-- stream is evaluated one more step and any element is sent to the SVar as a
-- 'ChildYield' event. If it returns 'False' the worker runs
-- 'cleanupSVarFromWorker' and sends a stop notification without evaluating
-- the stream further.
--
-- The state must have its 'streamVar' set: it is extracted with 'fromJust'.
runOneLimited
    :: MonadIO m
    => State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited st m0 winfo = go m0

    where

    go m = do
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if yieldLimitOk
        then do
            liftIO $ decrementBufferLimit sv
            K.foldStreamShared st yieldk single stop m
        else do
            liftIO $ cleanupSVarFromWorker sv
            liftIO $ sendStop sv winfo

    sv = fromJust $ streamVar st

    -- The stream ended without an element: undo both decrements made in
    -- 'go' for this step before reporting the stop.
    stop = liftIO $ do
        incrementBufferLimit sv
        incrementYieldLimit sv
        sendStop sv winfo

    sendit a = liftIO $ void $ send sv (ChildYield a)

    -- Last element: send it, then send the stop notification.
    single a = sendit a >> liftIO (sendStop sv winfo)

    -- More elements follow: send this one and continue with the rest.
    yieldk a r = sendit a >> go r
-- | Create a new parallel 'SVar' with the given stop style, push one worker
-- for each of the two streams onto it, and return the stream read back from
-- that SVar.
--
-- Both workers run 'runOne' with a state whose 'streamVar' points at the new
-- SVar.
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
    => SVarStopStyle -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
forkSVarPar ss m r = K.mkStream $ \st yld sng stp -> do
    sv <- newParallelVar ss st
    pushWorkerPar sv (runOne st{streamVar = Just sv} m)
    case ss of
        -- Record a thread id from the worker set in 'svarStopBy'. This runs
        -- after only the first worker has been pushed.
        -- NOTE(review): assumes the set is non-empty at this point and that
        -- its minimum element is the first stream's worker; 'Set.elemAt'
        -- errors on an empty set -- confirm against 'pushWorkerPar'.
        StopBy -> liftIO $ do
            set <- readIORef (workerThreads sv)
            writeIORef (svarStopBy sv) $ Set.elemAt 0 set
        _ -> return ()
    pushWorkerPar sv (runOne st{streamVar = Just sv} r)
    K.foldStream st yld sng stp $ Stream.toStreamK (SVar.fromSVar sv)
-- | Combine two streams through a parallel 'SVar', reusing the SVar already
-- present in the state when it is compatible.
--
-- If the state carries an SVar whose style and stop style both equal the
-- requested ones, a worker for the first stream is pushed onto that SVar and
-- the second stream is evaluated in place. Otherwise a fresh SVar is set up
-- with 'forkSVarPar'.
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar ::
       MonadAsync m
    => SVarStyle
    -> SVarStopStyle
    -> K.StreamK m a
    -> K.StreamK m a
    -> K.StreamK m a
joinStreamVarPar style ss m1 m2 = K.mkStream $ \st yld sng stp ->
    case streamVar st of
        Just sv | svarStyle sv == style && svarStopStyle sv == ss -> do
            pushWorkerPar sv (runOne st m1)
            K.foldStreamShared st yld sng stp m2
        -- No SVar, or one with a different style or stop style.
        _ -> K.foldStreamShared st yld sng stp (forkSVarPar ss m1 m2)
-- | Combine two streams via 'joinStreamVarPar' using the 'ParallelVar' style
-- and the 'StopNone' stop style.
{-# INLINE parallelK #-}
parallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelK = joinStreamVarPar ParallelVar StopNone
-- | Prepend an effect to a 'ParallelT' stream. The effect is lifted to a
-- single-element stream with 'K.fromEffect' and combined with the rest of
-- the stream using 'parallelK'.
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consM m (ParallelT r) = ParallelT $ parallelK (K.fromEffect m) r
-- | Combine two streams via 'joinStreamVarPar' using the 'ParallelVar' style
-- and the 'StopBy' stop style.
--
-- NOTE(review): presumably the combined stream ends when the first stream
-- ends -- confirm against the SVar's handling of 'StopBy'.
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelFstK = joinStreamVarPar ParallelVar StopBy
-- | Combine two streams via 'joinStreamVarPar' using the 'ParallelVar' style
-- and the 'StopAny' stop style.
--
-- NOTE(review): presumably the combined stream ends as soon as either stream
-- ends -- confirm against the SVar's handling of 'StopAny'.
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelMinK = joinStreamVarPar ParallelVar StopAny
-- | Route a 'K.StreamK' through a freshly created parallel 'SVar'.
--
-- A new SVar with the 'StopNone' stop style is created, the input stream
-- (converted with 'D.fromStreamK') is handed to 'SVar.toSVarParallel', and
-- the resulting stream is whatever is read back from the SVar.
mkParallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a
mkParallelK m = K.mkStream $ \st yld sng stp -> do
    -- The SVar is created from the adapted state, while the producer side
    -- below receives the original state.
    sv <- newParallelVar StopNone (adaptState st)
    SVar.toSVarParallel st sv $ D.fromStreamK m
    K.foldStream st yld sng stp $ Stream.toStreamK $ SVar.fromSVar sv
-- | Route a 'D.Stream' through a freshly created parallel 'SVar'.
--
-- The stepper state is 'Nothing' until the first step. That step creates the
-- SVar (stop style 'StopNone'), hands the input stream to
-- 'SVar.toSVarParallel' and switches to the stream read back from the SVar
-- ('SVar.fromSVarD'). All later steps simply forward the steps of that
-- stream.
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkParallelD m = D.Stream step Nothing

    where

    -- First step: set up the SVar; emits no element.
    step gst Nothing = do
        sv <- newParallelVar StopNone gst
        SVar.toSVarParallel gst sv m
        return $ Skip $ Just $ SVar.fromSVarD sv

    -- Subsequent steps: run the inner stream one step and re-wrap its new
    -- state.
    step gst (Just (D.UnStream step1 st)) = do
        r <- step1 gst st
        return $ case r of
            Yield a s -> Yield a (Just $ D.Stream step1 s)
            Skip s    -> Skip (Just $ D.Stream step1 s)
            Stop      -> Stop
-- | Tap a stream with a stream-consuming action.
--
-- An SVar is created for the consumer @f@ with 'SVar.newFoldSVar', and the
-- input stream is passed through 'SVar.teeToSVar' with that SVar. The
-- returned stream has the same element type as the input.
--
-- NOTE(review): presumably @f@ runs in a separate thread and receives a copy
-- of every element -- confirm against 'SVar.newFoldSVar' and
-- 'SVar.teeToSVar'.
{-# INLINE tapAsyncK #-}
tapAsyncK ::
       MonadAsync m => (K.StreamK m a -> m b) -> K.StreamK m a -> K.StreamK m a
tapAsyncK f m = K.mkStream $ \st yld sng stp -> do
    sv <- SVar.newFoldSVar st (f . Stream.toStreamK)
    K.foldStreamShared st yld sng stp
        $ Stream.toStreamK (SVar.teeToSVar sv $ Stream.fromStreamK m)
-- | Stepper state used by 'tapAsyncF'.
--
-- * 'TapInit': initial state, before the fold's SVar has been created.
-- * 'Tapping': holds the fold's SVar (strict field) and the source stream
--   state while elements are still being pushed to the fold.
-- * 'TapDone': holds only the source stream state; used once the fold has
--   reported completion, after which elements are just passed through.
--
-- The type parameter @a@ does not occur in any constructor.
data TapState fs st a = TapInit | Tapping !fs st | TapDone st
-- | Tap a 'D.Stream' with a 'Fold': every element of the source is pushed to
-- the fold through an SVar and also yielded unchanged downstream.
--
-- The stepper moves through the 'TapState' states:
--
-- * 'TapInit' creates the fold's SVar with 'SVar.newFoldSVarF'.
-- * 'Tapping' pushes each element with 'SVar.pushToFold'. When that reports
--   the fold is done, or when the source stops, the fold is stopped and
--   drained.
-- * 'TapDone' forwards the remaining source elements without touching the
--   fold.
{-# INLINE_NORMAL tapAsyncF #-}
tapAsyncF :: MonadAsync m => Fold m a b -> D.Stream m a -> D.Stream m a
tapAsyncF f (D.Stream step1 state1) = D.Stream step TapInit

    where

    -- Loop until 'SVar.fromConsumer' reports completion, blocking on the
    -- consumer's door bell between attempts.
    drainFold svr = do
        done <- SVar.fromConsumer svr
        when (not done) $ do
            liftIO $ withDiagMVar svr "teeToSVar: waiting to drain"
                   $ takeMVar (outputDoorBellFromConsumer svr)
            drainFold svr

    -- Send a stop notification to the fold's SVar, then wait for it to
    -- drain.
    stopFold svr = do
        liftIO $ sendStop svr Nothing
        drainFold svr

    {-# INLINE_LATE step #-}
    step gst TapInit = do
        sv <- SVar.newFoldSVarF gst f
        return $ Skip (Tapping sv state1)

    step gst (Tapping sv st) = do
        r <- step1 gst st
        case r of
            Yield a s -> do
                done <- SVar.pushToFold sv a
                if done
                then do
                    -- The fold finished early; stop it and keep yielding
                    -- the source elements from the 'TapDone' state.
                    stopFold sv
                    return $ Yield a (TapDone s)
                else return $ Yield a (Tapping sv s)
            Skip s -> return $ Skip (Tapping sv s)
            Stop -> do
                stopFold sv
                return Stop

    step gst (TapDone st) = do
        r <- step1 gst st
        return $ case r of
            Yield a s -> Yield a (TapDone s)
            Skip s    -> Skip (TapDone s)
            Stop      -> Stop
-- | Newtype wrapper around 'K.StreamK'. In this module its 'Semigroup'
-- operation and 'consM' are implemented with 'parallelK'.
newtype ParallelT m a = ParallelT {getParallelT :: K.StreamK m a}
-- | 'lift' turns an action into a single-element stream via 'K.fromEffect'.
instance MonadTrans ParallelT where
    {-# INLINE lift #-}
    lift = ParallelT . K.fromEffect
-- | 'ParallelT' specialized to the 'IO' monad.
type Parallel = ParallelT IO
-- | Combine two 'ParallelT' streams: unwrap both, merge the underlying
-- streams with 'parallelK', and wrap the result again.
{-# INLINE append #-}
{-# SPECIALIZE append :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
append :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
append (ParallelT m1) (ParallelT m2) = ParallelT $ parallelK m1 m2
-- | '<>' is 'append', i.e. the two streams are merged with 'parallelK'.
instance MonadAsync m => Semigroup (ParallelT m a) where
    (<>) = append
-- | 'mempty' is the wrapped 'K.nil' stream; 'mappend' is the 'Semigroup'
-- operation.
instance MonadAsync m => Monoid (ParallelT m a) where
    mempty = ParallelT K.nil
    mappend = (<>)
-- | Applicative application for 'ParallelT'.
--
-- Every function produced by the first stream is mapped over the whole second
-- stream; both the per-function result streams and the results within each of
-- them are combined using 'parallelK' (via 'K.concatMapWith').
{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel ::
    ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m =>
    ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT m1) (ParallelT m2) =
    -- f applies a single function x1 to every element of the argument stream.
    let f x1 = K.concatMapWith parallelK (K.fromPure . x1) m2
    in ParallelT $ K.concatMapWith parallelK f m1
-- | 'pure' yields a single-element stream via 'K.fromPure'; '<*>' is
-- 'apParallel'.
instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
    {-# INLINE pure #-}
    pure = ParallelT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apParallel
-- | Monadic bind for 'ParallelT'.
--
-- The continuation is run for each element of the source stream and the
-- resulting streams are combined with 'parallelK' (via 'K.bindWith').
{-# INLINE bind #-}
{-# SPECIALIZE bind ::
    ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bind :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind (ParallelT m) f = ParallelT $ K.bindWith parallelK m (getParallelT . f)
-- | 'return' is 'pure'; '>>=' is 'bind'.
instance MonadAsync m => Monad (ParallelT m) where
    return = pure

    {-# INLINE (>>=) #-}
    (>>=) = bind
-- Further instances for ParallelT are generated by the CPP macro below.
-- NOTE(review): the macro is presumably defined in the included
-- "Instances.hs" - confirm which instances it expands to.
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
-- | Create a callback paired with a stream.
--
-- Each invocation of the returned callback sends its argument to a freshly
-- created SVar as a 'ChildYield' event; the returned stream is read from that
-- same SVar via 'SVar.fromSVarD'. The callback only ever sends 'ChildYield',
-- so it offers no way to signal the end of the stream.
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), K.StreamK m a)
newCallbackStream = do
    sv <- newParallelVar StopNone defState

    -- Register the calling thread on the SVar.
    -- NOTE(review): presumably needed because the callback does not run in a
    -- worker thread managed by the SVar - confirm.
    liftIO myThreadId >>= modifyThread sv

    -- The Int returned by send is deliberately discarded.
    let callback a = liftIO $ void $ send sv (ChildYield a)
    return (callback, D.toStreamK (SVar.fromSVarD sv))