module Streamly.Internal.Data.Stream.Concurrent
(
MonadAsync
, parBuffered
, parRepeatM
, parReplicateM
, parMapM
, parSequence
, parTwo
, parZipWithM
, parZipWith
, parMergeByM
, parMergeBy
, parListLazy
, parListOrdered
, parListInterleaved
, parListEager
, parListEagerFst
, parListEagerMin
, parList
, parCrossApply
, parConcat
, parConcatMap
, parConcatIterate
, newStreamAndCallback
, fromCallback
, parTapCount
, tapCount
, parEval
, parApply
)
where
#include "inline.hs"
#include "deprecation.h"
import Control.Concurrent (myThreadId, killThread)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Control.ForkLifted (forkManaged)
import Streamly.Internal.Data.Channel.Dispatcher (modifyThread)
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Stream (Stream, Step(..))
import Streamly.Internal.Data.Stream.Channel
( Channel(..), newChannel, fromChannel, toChannelK, withChannelK
, withChannel, shutdown, chanConcatMapK
)
import qualified Streamly.Internal.Data.IORef as Unboxed
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K
import Prelude hiding (mapM, sequence, concat, concatMap, zipWith)
import Streamly.Internal.Data.Channel.Types
{-# INLINE parBuffered #-}
parBuffered, parEval
:: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a
parBuffered :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parBuffered Config -> Config
modifier Stream m a
input = (Config -> Config)
-> Stream m a
-> (Channel m a -> Stream m a -> Stream m a)
-> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> Stream m a
-> (Channel m b -> Stream m a -> Stream m b)
-> Stream m b
withChannel Config -> Config
modifier Stream m a
input ((Stream m a -> Stream m a)
-> Channel m a -> Stream m a -> Stream m a
forall a b. a -> b -> a
const Stream m a -> Stream m a
forall a. a -> a
id)
RENAME(parEval,parBuffered)
{-# INLINE _appendGeneric #-}
_appendGeneric :: MonadAsync m =>
((Config -> Config) -> m (Channel m a))
-> (Config -> Config)
-> K.StreamK m a
-> K.StreamK m a
-> K.StreamK m a
_appendGeneric :: forall (m :: * -> *) a.
MonadAsync m =>
((Config -> Config) -> m (Channel m a))
-> (Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
_appendGeneric (Config -> Config) -> m (Channel m a)
newChan Config -> Config
modifier StreamK m a
stream1 StreamK m a
stream2 = m (StreamK m a) -> StreamK m a
forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect m (StreamK m a)
action
where
action :: m (StreamK m a)
action = do
Channel m a
chan <- (Config -> Config) -> m (Channel m a)
newChan Config -> Config
modifier
let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
done :: StreamK m a
done = m () -> StreamK m a
forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (Channel m a -> m ()
forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
shutdown Channel m a
chan)
case Config -> StopWhen
getStopWhen Config
cfg of
StopWhen
AllStop -> do
Channel m a -> StreamK m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2
Channel m a -> StreamK m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream1
StopWhen
FirstStops -> do
Channel m a -> StreamK m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2
Channel m a -> StreamK m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream1 StreamK m a
forall {a}. StreamK m a
done)
StopWhen
AnyStops -> do
Channel m a -> StreamK m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream2 StreamK m a
forall {a}. StreamK m a
done)
Channel m a -> StreamK m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream1 StreamK m a
forall {a}. StreamK m a
done)
StreamK m a -> m (StreamK m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamK m a -> m (StreamK m a)) -> StreamK m a -> m (StreamK m a)
forall a b. (a -> b) -> a -> b
$ Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK (Stream m a -> StreamK m a) -> Stream m a -> StreamK m a
forall a b. (a -> b) -> a -> b
$ Channel m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Channel m a -> Stream m a
fromChannel Channel m a
chan
{-# INLINE appendWithK #-}
appendWithK :: MonadAsync m =>
(Config -> Config) -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
appendWithK :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
appendWithK Config -> Config
modifier StreamK m a
stream1 StreamK m a
stream2 =
(Config -> Config)
-> (StreamK m a -> StreamK m a)
-> StreamK m (StreamK m a)
-> StreamK m a
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK Config -> Config
modifier StreamK m a -> StreamK m a
forall a. a -> a
id (StreamK m a
stream1 StreamK m a -> StreamK m (StreamK m a) -> StreamK m (StreamK m a)
forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`K.cons` StreamK m a -> StreamK m (StreamK m a)
forall a (m :: * -> *). a -> StreamK m a
K.fromPure StreamK m a
stream2)
{-# INLINE _appendWithChanK #-}
_appendWithChanK :: MonadAsync m =>
Channel m a -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
_appendWithChanK :: forall (m :: * -> *) a.
MonadAsync m =>
Channel m a -> StreamK m a -> StreamK m a -> StreamK m a
_appendWithChanK Channel m a
chan StreamK m a
stream1 StreamK m a
stream2 =
m () -> StreamK m a -> StreamK m a
forall (m :: * -> *) b a.
Monad m =>
m b -> StreamK m a -> StreamK m a
K.before (Channel m a -> StreamK m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2) StreamK m a
stream1
{-# INLINE parTwo #-}
parTwo :: MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a -> Stream m a
parTwo :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a -> Stream m a
parTwo Config -> Config
modifier Stream m a
stream1 Stream m a
stream2 =
StreamK m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
(StreamK m a -> Stream m a) -> StreamK m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ (Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
appendWithK
Config -> Config
modifier (Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream1) (Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream2)
{-# INLINE parConcatMapK #-}
parConcatMapK :: MonadAsync m =>
(Config -> Config) -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapK :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK Config -> Config
modifier a -> StreamK m b
f StreamK m a
input =
let g :: Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
g = (Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
chanConcatMapK Config -> Config
modifier
in (Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier StreamK m a
input (Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall {b} {a}.
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
`g` a -> StreamK m b
f)
{-# INLINE parConcatMap #-}
parConcatMap :: MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier a -> Stream m b
f Stream m a
stream =
StreamK m b -> Stream m b
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
(StreamK m b -> Stream m b) -> StreamK m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ (Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK
Config -> Config
modifier (Stream m b -> StreamK m b
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK (Stream m b -> StreamK m b)
-> (a -> Stream m b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m b
f) (Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream)
{-# INLINE parConcat #-}
parConcat :: MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat Config -> Config
modifier = (Config -> Config)
-> (Stream m a -> Stream m a)
-> Stream m (Stream m a)
-> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier Stream m a -> Stream m a
forall a. a -> a
id
{-# INLINE parList #-}
parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a
parList :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList Config -> Config
modifier = (Config -> Config) -> Stream m (Stream m a) -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat Config -> Config
modifier (Stream m (Stream m a) -> Stream m a)
-> ([Stream m a] -> Stream m (Stream m a))
-> [Stream m a]
-> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Stream m a] -> Stream m (Stream m a)
forall (m :: * -> *) a. Applicative m => [a] -> Stream m a
Stream.fromList
{-# INLINE parListLazy #-}
parListLazy :: MonadAsync m => [Stream m a] -> Stream m a
parListLazy :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListLazy = (Config -> Config) -> [Stream m a] -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList Config -> Config
forall a. a -> a
id
{-# INLINE parListInterleaved #-}
parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a
parListInterleaved :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListInterleaved = (Config -> Config) -> [Stream m a] -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
interleaved Bool
True)
{-# INLINE parListOrdered #-}
parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a
parListOrdered :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListOrdered = (Config -> Config) -> [Stream m a] -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
ordered Bool
True)
{-# INLINE parListEager #-}
parListEager :: MonadAsync m => [Stream m a] -> Stream m a
parListEager :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEager = (Config -> Config) -> [Stream m a] -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True)
{-# INLINE parListEagerFst #-}
parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a
parListEagerFst :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEagerFst = (Config -> Config) -> [Stream m a] -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True (Config -> Config) -> (Config -> Config) -> Config -> Config
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StopWhen -> Config -> Config
stopWhen StopWhen
FirstStops)
{-# INLINE parListEagerMin #-}
parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a
parListEagerMin :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEagerMin = (Config -> Config) -> [Stream m a] -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True (Config -> Config) -> (Config -> Config) -> Config -> Config
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StopWhen -> Config -> Config
stopWhen StopWhen
AnyStops)
{-# INLINE parCrossApply #-}
{-# SPECIALIZE parCrossApply ::
(Config -> Config) -> Stream IO (a -> b) -> Stream IO a -> Stream IO b #-}
parCrossApply, parApply :: MonadAsync m =>
(Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
parCrossApply :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
parCrossApply Config -> Config
modifier Stream m (a -> b)
stream1 Stream m a
stream2 =
(Config -> Config)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap
Config -> Config
modifier
(\a -> b
g -> (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier (b -> Stream m b
forall (m :: * -> *) a. Applicative m => a -> Stream m a
Stream.fromPure (b -> Stream m b) -> (a -> b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
g) Stream m a
stream2)
Stream m (a -> b)
stream1
RENAME(parApply,parCrossApply)
{-# INLINE parMapM #-}
parMapM :: MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM Config -> Config
modifier a -> m b
f = (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier (m b -> Stream m b
forall (m :: * -> *) a. Applicative m => m a -> Stream m a
Stream.fromEffect (m b -> Stream m b) -> (a -> m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f)
{-# INLINE parSequence #-}
parSequence :: MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
modifier = (Config -> Config) -> (m a -> m a) -> Stream m (m a) -> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM Config -> Config
modifier m a -> m a
forall a. a -> a
id
{-# INLINE parZipWithM #-}
parZipWithM :: MonadAsync m
=> (Config -> Config)
-> (a -> b -> m c)
-> Stream m a
-> Stream m b
-> Stream m c
parZipWithM :: forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
parZipWithM Config -> Config
cfg a -> b -> m c
f Stream m a
m1 Stream m b
m2 =
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
Stream.zipWithM a -> b -> m c
f ((Config -> Config) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parBuffered Config -> Config
cfg Stream m a
m1) ((Config -> Config) -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parBuffered Config -> Config
cfg Stream m b
m2)
{-# INLINE parZipWith #-}
parZipWith :: MonadAsync m
=> (Config -> Config)
-> (a -> b -> c)
-> Stream m a
-> Stream m b
-> Stream m c
parZipWith :: forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
parZipWith Config -> Config
cfg a -> b -> c
f = (Config -> Config)
-> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
parZipWithM Config -> Config
cfg (\a
a b
b -> c -> m c
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (c -> m c) -> c -> m c
forall a b. (a -> b) -> a -> b
$ a -> b -> c
f a
a b
b)
{-# INLINE parMergeByM #-}
parMergeByM :: MonadAsync m
=> (Config -> Config)
-> (a -> a -> m Ordering)
-> Stream m a
-> Stream m a
-> Stream m a
parMergeByM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeByM Config -> Config
cfg a -> a -> m Ordering
f Stream m a
m1 Stream m a
m2 =
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
Stream.mergeByM a -> a -> m Ordering
f ((Config -> Config) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parBuffered Config -> Config
cfg Stream m a
m1) ((Config -> Config) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parBuffered Config -> Config
cfg Stream m a
m2)
{-# INLINE parMergeBy #-}
parMergeBy :: MonadAsync m
=> (Config -> Config)
-> (a -> a -> Ordering)
-> Stream m a
-> Stream m a
-> Stream m a
parMergeBy :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeBy Config -> Config
cfg a -> a -> Ordering
f = (Config -> Config)
-> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeByM Config -> Config
cfg (\a
a a
b -> Ordering -> m Ordering
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Ordering -> m Ordering) -> Ordering -> m Ordering
forall a b. (a -> b) -> a -> b
$ a -> a -> Ordering
f a
a a
b)
{-# INLINE parConcatIterate #-}
parConcatIterate :: MonadAsync m =>
(Config -> Config)
-> (a -> Stream m a)
-> Stream m a
-> Stream m a
parConcatIterate :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a
parConcatIterate Config -> Config
modifier a -> Stream m a
f Stream m a
input =
StreamK m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
(StreamK m a -> Stream m a) -> StreamK m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ (Config -> Config)
-> StreamK m a
-> (Channel m a -> StreamK m a -> StreamK m a)
-> StreamK m a
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier (Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
input) Channel m a -> StreamK m a -> StreamK m a
iterateStream
where
iterateStream :: Channel m a -> StreamK m a -> StreamK m a
iterateStream Channel m a
chan = (Config -> Config)
-> Channel m a -> (a -> StreamK m a) -> StreamK m a -> StreamK m a
forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
chanConcatMapK Config -> Config
modifier Channel m a
chan (Channel m a -> a -> StreamK m a
generate Channel m a
chan)
generate :: Channel m a -> a -> StreamK m a
generate Channel m a
chan a
x = a
x a -> StreamK m a -> StreamK m a
forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`K.cons` Channel m a -> StreamK m a -> StreamK m a
iterateStream Channel m a
chan (Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK (Stream m a -> StreamK m a) -> Stream m a -> StreamK m a
forall a b. (a -> b) -> a -> b
$ a -> Stream m a
f a
x)
{-# INLINE parRepeatM #-}
parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a
parRepeatM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m a -> Stream m a
parRepeatM Config -> Config
cfg = (Config -> Config) -> Stream m (m a) -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
cfg (Stream m (m a) -> Stream m a)
-> (m a -> Stream m (m a)) -> m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m (m a)
forall (m :: * -> *) a. Monad m => a -> Stream m a
Stream.repeat
{-# INLINE parReplicateM #-}
parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a
parReplicateM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Int -> m a -> Stream m a
parReplicateM Config -> Config
cfg Int
n = (Config -> Config) -> Stream m (m a) -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
cfg (Stream m (m a) -> Stream m a)
-> (m a -> Stream m (m a)) -> m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> m a -> Stream m (m a)
forall (m :: * -> *) a. Monad m => Int -> a -> Stream m a
Stream.replicate Int
n
{-# INLINE_NORMAL newStreamAndCallback #-}
newStreamAndCallback :: MonadAsync m => m (a -> m (), Stream m a)
newStreamAndCallback :: forall (m :: * -> *) a. MonadAsync m => m (a -> m (), Stream m a)
newStreamAndCallback = do
Channel m a
chan <- (Config -> Config) -> m (Channel m a)
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel (Bool -> Config -> Config
eager Bool
True)
IO ThreadId -> m ThreadId
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId
m ThreadId -> (ThreadId -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread (Channel m a -> IORef (Set ThreadId)
forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
chan) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
let callback :: a -> m ()
callback a
a =
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
(IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
(Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan) (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
(a -> m (), Stream m a) -> m (a -> m (), Stream m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> m ()
forall {m :: * -> *}. MonadIO m => a -> m ()
callback, Channel m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Channel m a -> Stream m a
fromChannel Channel m a
chan)
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
fromCallback :: forall (m :: * -> *) a.
MonadAsync m =>
((a -> m ()) -> m ()) -> Stream m a
fromCallback (a -> m ()) -> m ()
setCallback = m (Stream m a) -> Stream m a
forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
Stream.concatEffect (m (Stream m a) -> Stream m a) -> m (Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ do
(a -> m ()
callback, Stream m a
stream) <- m (a -> m (), Stream m a)
forall (m :: * -> *) a. MonadAsync m => m (a -> m (), Stream m a)
newStreamAndCallback
(a -> m ()) -> m ()
setCallback a -> m ()
callback
Stream m a -> m (Stream m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Stream m a
stream
{-# INLINE_NORMAL parTapCount #-}
parTapCount
:: MonadAsync m
=> (a -> Bool)
-> (D.Stream m Int -> m b)
-> D.Stream m a
-> D.Stream m a
parTapCount :: forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
parTapCount a -> Bool
predicate Stream m Int -> m b
fld (D.Stream State StreamK m a -> s -> m (Step s a)
step s
state) = (State StreamK m a
-> Maybe (IORef Int, ThreadId, s)
-> m (Step (Maybe (IORef Int, ThreadId, s)) a))
-> Maybe (IORef Int, ThreadId, s) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (IORef Int, ThreadId, s)
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
step' Maybe (IORef Int, ThreadId, s)
forall a. Maybe a
Nothing
where
{-# INLINE_LATE step' #-}
step' :: State StreamK m a
-> Maybe (IORef Int, ThreadId, s)
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
step' State StreamK m a
_ Maybe (IORef Int, ThreadId, s)
Nothing = do
IORef Int
countVar <- IO (IORef Int) -> m (IORef Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef Int) -> m (IORef Int))
-> IO (IORef Int) -> m (IORef Int)
forall a b. (a -> b) -> a -> b
$ Int -> IO (IORef Int)
forall a. Unbox a => a -> IO (IORef a)
Unboxed.newIORef (Int
0 :: Int)
ThreadId
tid <- m () -> m ThreadId
forall (m :: * -> *). MonadRunInIO m => m () -> m ThreadId
forkManaged
(m () -> m ThreadId) -> m () -> m ThreadId
forall a b. (a -> b) -> a -> b
$ m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m b -> m ()) -> m b -> m ()
forall a b. (a -> b) -> a -> b
$ Stream m Int -> m b
fld
(Stream m Int -> m b) -> Stream m Int -> m b
forall a b. (a -> b) -> a -> b
$ IORef Int -> Stream m Int
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
IORef a -> Stream m a
Unboxed.pollIntIORef IORef Int
countVar
Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a))
-> Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Int, ThreadId, s)
-> Step (Maybe (IORef Int, ThreadId, s)) a
forall s a. s -> Step s a
Skip ((IORef Int, ThreadId, s) -> Maybe (IORef Int, ThreadId, s)
forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
state))
step' State StreamK m a
gst (Just (IORef Int
countVar, ThreadId
tid, s
st)) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
case Step s a
r of
Yield a
x s
s -> do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (a -> Bool
predicate a
x)
(m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Int -> (Int -> Int) -> IO ()
forall a. Unbox a => IORef a -> (a -> a) -> IO ()
Unboxed.modifyIORef' IORef Int
countVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a))
-> Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
forall a b. (a -> b) -> a -> b
$ a
-> Maybe (IORef Int, ThreadId, s)
-> Step (Maybe (IORef Int, ThreadId, s)) a
forall s a. a -> s -> Step s a
Yield a
x ((IORef Int, ThreadId, s) -> Maybe (IORef Int, ThreadId, s)
forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
s))
Skip s
s -> Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a))
-> Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Int, ThreadId, s)
-> Step (Maybe (IORef Int, ThreadId, s)) a
forall s a. s -> Step s a
Skip ((IORef Int, ThreadId, s) -> Maybe (IORef Int, ThreadId, s)
forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
s))
Step s a
Stop -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ ThreadId -> IO ()
killThread ThreadId
tid
Step (Maybe (IORef Int, ThreadId, s)) a
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Maybe (IORef Int, ThreadId, s)) a
forall s a. Step s a
Stop
{-# DEPRECATED tapCount "Please use parTapCount instead." #-}
{-# INLINE tapCount #-}
tapCount ::
(MonadAsync m)
=> (a -> Bool)
-> (Stream m Int -> m b)
-> Stream m a
-> Stream m a
tapCount :: forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
tapCount = (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
parTapCount