{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
module Streamly.Internal.Data.Stream.Parallel {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" instead." #-}
(
ParallelT(..)
, Parallel
, consM
, parallelK
, parallelFstK
, parallelMinK
, mkParallelD
, mkParallelK
, tapAsyncK
, tapAsyncF
, newCallbackStream
)
where
import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
#endif
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Trans.Class (MonadTrans(lift))
#endif
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream (Step(..))
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.StreamK as K
(StreamK, foldStreamShared, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream as D
(Stream(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream
import Streamly.Internal.Data.SVar
import Prelude hiding (map)
#include "inline.hs"
#include "Instances.hs"
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> K.StreamK m a -> K.StreamK m a
withLocal :: forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> StreamK m a -> StreamK m a
withLocal r -> r
f StreamK m a
m =
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a)
-> (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
let single :: a -> m r
single = (r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> (a -> m r) -> a -> m r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m r
sng
yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = (r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> m r -> m r
forall a b. (a -> b) -> a -> b
$ a -> StreamK m a -> m r
yld a
a ((r -> r) -> StreamK m a -> StreamK m a
forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> StreamK m a -> StreamK m a
withLocal r -> r
f StreamK m a
r)
in State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single ((r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f m r
stp) StreamK m a
m
{-# NOINLINE runOne #-}
runOne
:: MonadIO m
=> State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOne :: forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo =
case State StreamK m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
Maybe Count
Nothing -> StreamK m a -> m ()
go StreamK m a
m0
Just Count
_ -> State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo
where
go :: StreamK m a -> m ()
go StreamK m a
m = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar StreamK m a
sv
State StreamK m a
-> (a -> StreamK m a -> m ())
-> (a -> m ())
-> m ()
-> StreamK m a
-> m ()
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m ()
yieldk a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
single m ()
stop StreamK m a
m
sv :: SVar StreamK m a
sv = Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar StreamK m a) -> SVar StreamK m a)
-> Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st
stop :: m ()
stop = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar StreamK m a
sv
SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> StreamK m a -> m ()
yieldk a
a StreamK m a
r = a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StreamK m a -> m ()
go StreamK m a
r
runOneLimited
:: MonadIO m
=> State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited :: forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo = StreamK m a -> m ()
go StreamK m a
m0
where
go :: StreamK m a -> m ()
go StreamK m a
m = do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
yieldLimitOk
then do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar StreamK m a
sv
State StreamK m a
-> (a -> StreamK m a -> m ())
-> (a -> m ())
-> m ()
-> StreamK m a
-> m ()
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m ()
yieldk a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
single m ()
stop StreamK m a
m
else do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar StreamK m a
sv
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
sv :: SVar StreamK m a
sv = Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar StreamK m a) -> SVar StreamK m a)
-> Maybe (SVar StreamK m a) -> SVar StreamK m a
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st
stop :: m ()
stop = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar StreamK m a
sv
SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> StreamK m a -> m ()
yieldk a
a StreamK m a
r = a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StreamK m a -> m ()
go StreamK m a
r
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
=> SVarStopStyle -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
forkSVarPar :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forkSVarPar SVarStopStyle
ss StreamK m a
m StreamK m a
r = (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a)
-> (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- SVarStopStyle -> State StreamK m a -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
ss State StreamK m a
st
SVar StreamK m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st{streamVar :: Maybe (SVar StreamK m a)
streamVar = SVar StreamK m a -> Maybe (SVar StreamK m a)
forall a. a -> Maybe a
Just SVar StreamK m a
sv} StreamK m a
m)
case SVarStopStyle
ss of
SVarStopStyle
StopBy -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Set ThreadId
set <- IORef (Set ThreadId) -> IO (Set ThreadId)
forall a. IORef a -> IO a
readIORef (SVar StreamK m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar StreamK m a
sv)
IORef ThreadId -> ThreadId -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVar StreamK m a -> IORef ThreadId
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar StreamK m a
sv) (ThreadId -> IO ()) -> ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Set ThreadId -> ThreadId
forall a. Int -> Set a -> a
Set.elemAt Int
0 Set ThreadId
set
SVarStopStyle
_ -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
SVar StreamK m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st{streamVar :: Maybe (SVar StreamK m a)
streamVar = SVar StreamK m a -> Maybe (SVar StreamK m a)
forall a. a -> Maybe a
Just SVar StreamK m a
sv} StreamK m a
r)
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> StreamK m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SVar StreamK m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m a
sv)
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar ::
MonadAsync m
=> SVarStyle
-> SVarStopStyle
-> K.StreamK m a
-> K.StreamK m a
-> K.StreamK m a
joinStreamVarPar :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
style SVarStopStyle
ss StreamK m a
m1 StreamK m a
m2 = (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a)
-> (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
case State StreamK m a -> Maybe (SVar StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st of
Just SVar StreamK m a
sv | SVar StreamK m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar StreamK m a
sv SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
style Bool -> Bool -> Bool
&& SVar StreamK m a -> SVarStopStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar StreamK m a
sv SVarStopStyle -> SVarStopStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStopStyle
ss -> do
SVar StreamK m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st StreamK m a
m1)
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
m2
Maybe (SVar StreamK m a)
_ ->
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forkSVarPar SVarStopStyle
ss StreamK m a
m1 StreamK m a
m2)
{-# INLINE parallelK #-}
parallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK = SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopNone
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> ParallelT m a -> ParallelT m a
consM m a
m (ParallelT StreamK m a
r) = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m a -> ParallelT m a) -> StreamK m a -> ParallelT m a
forall a b. (a -> b) -> a -> b
$ StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK (m a -> StreamK m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect m a
m) StreamK m a
r
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelFstK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelFstK = SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopBy
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelMinK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelMinK = SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopAny
mkParallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a
mkParallelK :: forall (m :: * -> *) a. MonadAsync m => StreamK m a -> StreamK m a
mkParallelK StreamK m a
m = (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a)
-> (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- SVarStopStyle -> State StreamK m a -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
st)
State StreamK m a -> SVar StreamK m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State StreamK m a
st SVar StreamK m a
sv (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ StreamK m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK StreamK m a
m
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> StreamK m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SerialT m a -> StreamK m a) -> SerialT m a -> StreamK m a
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m a
sv
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkParallelD :: forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkParallelD Stream m a
m = (State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a))
-> Maybe (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step Maybe (Stream m a)
forall a. Maybe a
Nothing
where
step :: State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State StreamK m a
gst Maybe (Stream m a)
Nothing = do
SVar StreamK m a
sv <- SVarStopStyle -> State StreamK m a -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State StreamK m a
gst
State StreamK m a -> SVar StreamK m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State StreamK m a
gst SVar StreamK m a
sv Stream m a
m
Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
Skip (Maybe (Stream m a) -> Step (Maybe (Stream m a)) a)
-> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar StreamK m a
sv
step State StreamK m a
gst (Just (D.UnStream State StreamK m a -> s -> m (Step s a)
step1 s
st)) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> a -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
a (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
Skip s
s -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
Skip (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
Step s a
Stop -> Step (Maybe (Stream m a)) a
forall s a. Step s a
Stop
{-# INLINE tapAsyncK #-}
tapAsyncK ::
MonadAsync m => (K.StreamK m a -> m b) -> K.StreamK m a -> K.StreamK m a
tapAsyncK :: forall (m :: * -> *) a b.
MonadAsync m =>
(StreamK m a -> m b) -> StreamK m a -> StreamK m a
tapAsyncK StreamK m a -> m b
f StreamK m a
m = (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a)
-> (forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- State StreamK m a -> (SerialT m a -> m b) -> m (SVar StreamK m a)
forall (m :: * -> *) a b.
MonadAsync m =>
State StreamK m a -> (SerialT m a -> m b) -> m (SVar StreamK m a)
SVar.newFoldSVar State StreamK m a
st (StreamK m a -> m b
f (StreamK m a -> m b)
-> (SerialT m a -> StreamK m a) -> SerialT m a -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> StreamK m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK)
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp
(StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> StreamK m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SVar StreamK m a -> SerialT m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a -> SerialT m a
SVar.teeToSVar SVar StreamK m a
sv (SerialT m a -> SerialT m a) -> SerialT m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ StreamK m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
Stream.fromStreamK StreamK m a
m)
data TapState fs st a = TapInit | Tapping !fs st | TapDone st
{-# INLINE_NORMAL tapAsyncF #-}
tapAsyncF :: MonadAsync m => Fold m a b -> D.Stream m a -> D.Stream m a
tapAsyncF :: forall (m :: * -> *) a b.
MonadAsync m =>
Fold m a b -> Stream m a -> Stream m a
tapAsyncF Fold m a b
f (D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = (State StreamK m a
-> TapState (SVar StreamK m a) s Any
-> m (Step (TapState (SVar StreamK m a) s Any) a))
-> TapState (SVar StreamK m a) s Any -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> TapState (SVar StreamK m a) s Any
-> m (Step (TapState (SVar StreamK m a) s Any) a)
forall {a} {a}.
State StreamK m a
-> TapState (SVar StreamK m a) s a
-> m (Step (TapState (SVar StreamK m a) s a) a)
step TapState (SVar StreamK m a) s Any
forall fs st a. TapState fs st a
TapInit
where
drainFold :: SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr = do
Bool
done <- SVar StreamK m a -> m Bool
forall (m :: * -> *) a. MonadAsync m => SVar StreamK m a -> m Bool
SVar.fromConsumer SVar StreamK m a
svr
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> String -> IO () -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar StreamK m a
svr String
"teeToSVar: waiting to drain"
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar StreamK m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar StreamK m a
svr)
SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr
stopFold :: SVar StreamK m a -> m ()
stopFold SVar StreamK m a
svr = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
svr Maybe WorkerInfo
forall a. Maybe a
Nothing
SVar StreamK m a -> m ()
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr
{-# INLINE_LATE step #-}
step :: State StreamK m a
-> TapState (SVar StreamK m a) s a
-> m (Step (TapState (SVar StreamK m a) s a) a)
step State StreamK m a
gst TapState (SVar StreamK m a) s a
TapInit = do
SVar StreamK m a
sv <- State StreamK m a -> Fold m a b -> m (SVar StreamK m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
MonadAsync m =>
State t m a -> Fold m a b -> m (SVar t m a)
SVar.newFoldSVarF State StreamK m a
gst Fold m a b
f
Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. s -> Step s a
Skip (SVar StreamK m a -> s -> TapState (SVar StreamK m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
state1)
step State StreamK m a
gst (Tapping SVar StreamK m a
sv s
st) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
case Step s a
r of
Yield a
a s
s -> do
Bool
done <- SVar StreamK m a -> a -> m Bool
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> a -> m Bool
SVar.pushToFold SVar StreamK m a
sv a
a
if Bool
done
then do
SVar StreamK m a -> m ()
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
stopFold SVar StreamK m a
sv
Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ a
-> TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (s -> TapState (SVar StreamK m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
else Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ a
-> TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (SVar StreamK m a -> s -> TapState (SVar StreamK m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
s)
Skip s
s -> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. s -> Step s a
Skip (SVar StreamK m a -> s -> TapState (SVar StreamK m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
s)
Step s a
Stop -> do
SVar StreamK m a -> m ()
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
stopFold SVar StreamK m a
sv
Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (TapState (SVar StreamK m a) s a) a
forall s a. Step s a
Stop
step State StreamK m a
gst (TapDone s
st) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a))
-> Step (TapState (SVar StreamK m a) s a) a
-> m (Step (TapState (SVar StreamK m a) s a) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> a
-> TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (s -> TapState (SVar StreamK m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
Skip s
s -> TapState (SVar StreamK m a) s a
-> Step (TapState (SVar StreamK m a) s a) a
forall s a. s -> Step s a
Skip (s -> TapState (SVar StreamK m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
Step s a
Stop -> Step (TapState (SVar StreamK m a) s a) a
forall s a. Step s a
Stop
newtype ParallelT m a = ParallelT {forall (m :: * -> *) a. ParallelT m a -> StreamK m a
getParallelT :: K.StreamK m a}
#if !(MIN_VERSION_transformers(0,6,0))
instance MonadTrans ParallelT where
{-# INLINE lift #-}
lift :: forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
lift = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m a -> ParallelT m a)
-> (m a -> StreamK m a) -> m a -> ParallelT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> StreamK m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect
#endif
type Parallel = ParallelT IO
{-# INLINE append #-}
{-# SPECIALIZE append :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
append :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
append :: forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append (ParallelT StreamK m a
m1) (ParallelT StreamK m a
m2) = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m a -> ParallelT m a) -> StreamK m a -> ParallelT m a
forall a b. (a -> b) -> a -> b
$ StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK StreamK m a
m1 StreamK m a
m2
instance MonadAsync m => Semigroup (ParallelT m a) where
<> :: ParallelT m a -> ParallelT m a -> ParallelT m a
(<>) = ParallelT m a -> ParallelT m a -> ParallelT m a
forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append
instance MonadAsync m => Monoid (ParallelT m a) where
mempty :: ParallelT m a
mempty = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT StreamK m a
forall (m :: * -> *) a. StreamK m a
K.nil
mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a
mappend = ParallelT m a -> ParallelT m a -> ParallelT m a
forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel ::
ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel :: forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT StreamK m (a -> b)
m1) (ParallelT StreamK m a
m2) =
let f :: (a -> b) -> StreamK m b
f a -> b
x1 = (StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK (b -> StreamK m b
forall a (m :: * -> *). a -> StreamK m a
K.fromPure (b -> StreamK m b) -> (a -> b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) StreamK m a
m2
in StreamK m b -> ParallelT m b
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m b -> ParallelT m b) -> StreamK m b -> ParallelT m b
forall a b. (a -> b) -> a -> b
$ (StreamK m b -> StreamK m b -> StreamK m b)
-> ((a -> b) -> StreamK m b) -> StreamK m (a -> b) -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK (a -> b) -> StreamK m b
forall {b}. (a -> b) -> StreamK m b
f StreamK m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
{-# INLINE pure #-}
pure :: forall a. a -> ParallelT m a
pure = StreamK m a -> ParallelT m a
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m a -> ParallelT m a)
-> (a -> StreamK m a) -> a -> ParallelT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> StreamK m a
forall a (m :: * -> *). a -> StreamK m a
K.fromPure
{-# INLINE (<*>) #-}
<*> :: forall a b. ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
(<*>) = ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel
{-# INLINE bind #-}
{-# SPECIALIZE bind ::
ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bind :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind :: forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind (ParallelT StreamK m a
m) a -> ParallelT m b
f = StreamK m b -> ParallelT m b
forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT (StreamK m b -> ParallelT m b) -> StreamK m b -> ParallelT m b
forall a b. (a -> b) -> a -> b
$ (StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK StreamK m a
m (ParallelT m b -> StreamK m b
forall (m :: * -> *) a. ParallelT m a -> StreamK m a
getParallelT (ParallelT m b -> StreamK m b)
-> (a -> ParallelT m b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ParallelT m b
f)
instance MonadAsync m => Monad (ParallelT m) where
return :: forall a. a -> ParallelT m a
return = a -> ParallelT m a
forall a. a -> ParallelT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE (>>=) #-}
>>= :: forall a b. ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
(>>=) = ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind
#if !(MIN_VERSION_transformers(0,6,0))
instance (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) where
liftBase :: forall α. b α -> ParallelT m α
liftBase = b α -> ParallelT m α
forall (t :: (* -> *) -> * -> *) (b :: * -> *) (m :: * -> *) α.
(MonadTrans t, MonadBase b m) =>
b α -> t m α
liftBaseDefault
#endif
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), K.StreamK m a)
newCallbackStream :: forall (m :: * -> *) a. MonadAsync m => m (a -> m (), StreamK m a)
newCallbackStream = do
SVar Any m a
sv <- SVarStopStyle -> State Any m a -> m (SVar Any m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State Any m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState
IO ThreadId -> m ThreadId
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId m ThreadId -> (ThreadId -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar Any m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Any m a
sv
let callback :: a -> m ()
callback a
a = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Any m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Any m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
(a -> m (), StreamK m a) -> m (a -> m (), StreamK m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
callback, Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK (SVar Any m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Any m a
sv))