module Streamly.Internal.Data.Stream.Concurrent
(
module Streamly.Internal.Data.Stream.Concurrent.Channel
, MonadAsync
, parEval
, parRepeatM
, parReplicateM
, parMapM
, parSequence
, parTwo
, parZipWithM
, parZipWith
, parMergeByM
, parMergeBy
, parListLazy
, parListOrdered
, parListInterleaved
, parListEager
, parListEagerFst
, parListEagerMin
, parList
, parApply
, parConcat
, parConcatMap
, parConcatIterate
, fromCallback
, parTapCount
, tapCount
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, killThread)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Control.ForkLifted (forkManaged)
import Streamly.Internal.Data.Channel.Dispatcher (modifyThread)
import Streamly.Internal.Data.Channel.Types (ChildEvent(..))
import Streamly.Internal.Data.Channel.Worker (sendWithDoorBell)
import Streamly.Internal.Data.Stream (Stream, Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import qualified Streamly.Internal.Data.MutArray as Unboxed
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K
import Prelude hiding (mapM, sequence, concat, concatMap, zipWith)
import Streamly.Internal.Data.Stream.Concurrent.Channel
-- | Run the input stream through 'withChannel' using the supplied config
-- @modifier@, without transforming the stream that comes out of the channel
-- (the channel-aware transformation passed is @const id@).
--
-- NOTE(review): presumably this evaluates the input concurrently with its
-- consumer via the channel; that behavior lives in 'withChannel' - confirm
-- there.
{-# INLINE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a
parEval modifier input = withChannel modifier input (const id)
-- | Combine two streams by pushing both onto a freshly created channel and
-- reading the combined output back from that channel.
--
-- * @newChan@ creates the channel from the config modifier.
-- * @modifier@ is applied to 'defaultConfig' to read the 'StopWhen' setting.
--
-- The 'StopWhen' setting decides which stream(s) stop the channel when they
-- end:
--
-- * 'AllStop': neither stream stops the channel explicitly.
-- * 'FirstStops': @stream1@ is followed by a 'stopChannel' effect.
-- * 'AnyStops': both streams are followed by a 'stopChannel' effect.
{-# INLINE _appendGeneric #-}
_appendGeneric :: MonadAsync m =>
       ((Config -> Config) -> m (Channel m a))
    -> (Config -> Config)
    -> K.StreamK m a
    -> K.StreamK m a
    -> K.StreamK m a
_appendGeneric newChan modifier stream1 stream2 = K.concatEffect action

    where

    action = do
        chan <- newChan modifier
        let cfg = modifier defaultConfig
            -- An empty stream whose only effect is to stop the channel.
            done = K.nilM (stopChannel chan)
        -- NOTE(review): stream2 is always enqueued before stream1;
        -- presumably the work queue is LIFO so that stream1 is picked up
        -- first - confirm against 'toChannelK'.
        case getStopWhen cfg of
            AllStop -> do
                toChannelK chan stream2
                toChannelK chan stream1
            FirstStops -> do
                toChannelK chan stream2
                toChannelK chan (K.append stream1 done)
            AnyStops -> do
                toChannelK chan (K.append stream2 done)
                toChannelK chan (K.append stream1 done)
        return $ Stream.toStreamK $ fromChannel chan
-- | Combine two 'K.StreamK' streams by building a two-element stream of
-- streams (@stream1@ first, then @stream2@) and flattening it with
-- 'parConcatMapK' using the identity mapping.
{-# INLINE appendWithK #-}
appendWithK :: MonadAsync m =>
    (Config -> Config) -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
appendWithK modifier stream1 stream2 =
    parConcatMapK modifier id (stream1 `K.cons` K.fromPure stream2)
-- | Given an existing channel, return @stream1@ with an effect attached in
-- front of it (via 'K.before') that enqueues @stream2@ on the channel with
-- 'toChannelK'.
{-# INLINE _appendWithChanK #-}
_appendWithChanK :: MonadAsync m =>
    Channel m a -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
_appendWithChanK chan stream1 stream2 =
    K.before (toChannelK chan stream2) stream1
-- | Binary combinator over 'Stream': converts both streams to 'K.StreamK',
-- combines them with 'appendWithK' under the given config @modifier@, and
-- converts the result back to 'Stream'.
{-# INLINE parTwo #-}
parTwo :: MonadAsync m =>
    (Config -> Config) -> Stream m a -> Stream m a -> Stream m a
parTwo modifier stream1 stream2 =
    Stream.fromStreamK
        $ appendWithK
            modifier (Stream.toStreamK stream1) (Stream.toStreamK stream2)
-- | A concatMap-like operation that divides the input stream into its head
-- and tail and handles them separately.
--
-- * @useTail@ is an effect run on the tail of the input stream.
-- * @useHead@ maps the head element to the output stream.
--
-- When the input yields a head @a@ and a tail @r@, @useTail r@ is run first
-- and then the output continues as @useHead a@. A singleton input becomes
-- @useHead a@; an empty input stops the output.
{-# INLINE concatMapDivK #-}
concatMapDivK :: Monad m =>
       (K.StreamK m a -> m ())
    -> (a -> K.StreamK m b)
    -> K.StreamK m a
    -> K.StreamK m b
concatMapDivK useTail useHead stream =
    K.mkStream $ \st yld sng stp ->
        let -- Run an output stream with the consumer's own continuations.
            foldShared = K.foldStreamShared st yld sng stp
            single a = foldShared $ useHead a
            -- Hand off the tail before continuing with the head's stream.
            yieldk a r = useTail r >> single a
         in K.foldStreamShared (adaptState st) yieldk single stp stream
-- | Build the enqueue action used by the concurrent concatMap variants.
--
-- The returned action @q@, when given a stream, enqueues @runner q stream@
-- on the channel (paired with the 'askRunInIO' runner captured when
-- 'mkEnqueue' was executed) and then calls 'eagerDispatch' on the channel.
-- Note that @q@ is passed to @runner@ itself, so the runner can use it to
-- enqueue further work.
{-# INLINE mkEnqueue #-}
mkEnqueue :: MonadAsync m =>
    Channel m b
    -> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b)
    -> m (K.StreamK m a -> m ())
mkEnqueue chan runner = do
    runInIO <- askRunInIO
    let q stream = do
            -- NOTE(review): the meaning of the 'False' flag is defined by
            -- 'enqueue' and is not visible here - confirm.
            liftIO $ enqueue chan False (runInIO, runner q stream)
            eagerDispatch chan
    return q
-- | concatMap over a stream using an existing channel.
--
-- Creates an enqueue action with 'mkEnqueue' and processes the input with
-- 'concatMapDivK': the tail of the input is handed to the enqueue action
-- and the head is mapped with @f@. No stream stops the channel explicitly.
{-# INLINE parConcatMapChanK #-}
parConcatMapChanK :: MonadAsync m =>
    Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanK chan f stream =
    let run q = concatMapDivK q f
     in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
-- | Like 'parConcatMapChanK', but every stream produced by @f@ is followed
-- by a 'stopChannel' effect, so the channel is stopped as soon as any one
-- of the mapped streams ends.
{-# INLINE parConcatMapChanKAny #-}
parConcatMapChanKAny :: MonadAsync m =>
    Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny chan f stream =
    let -- An empty stream whose only effect is to stop the channel.
        done = K.nilM (stopChannel chan)
        run q = concatMapDivK q (\x -> K.append (f x) done)
     in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
-- | Like 'parConcatMapChanK', but only the stream mapped from the first
-- input element is followed by a 'stopChannel' effect.
--
-- The input is split with 'K.uncons'. An empty input gives an empty output.
-- Otherwise the tail is handed to the enqueue action created by 'mkEnqueue'
-- (to be processed with 'concatMapDivK' and @f@), and the result is the
-- head's stream @f h@ followed by the channel-stopping effect.
{-# INLINE parConcatMapChanKFirst #-}
parConcatMapChanKFirst :: MonadAsync m =>
    Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst chan f stream =
    let -- An empty stream whose only effect is to stop the channel.
        done = K.nilM (stopChannel chan)
        run q = concatMapDivK q f
     in K.concatEffect $ do
            res <- K.uncons stream
            case res of
                Nothing -> return K.nil
                Just (h, t) -> do
                    q <- mkEnqueue chan run
                    q t
                    return $ K.append (f h) done
-- | Dispatch to the channel-based concatMap variant selected by the
-- 'StopWhen' setting obtained from applying @modifier@ to 'defaultConfig':
--
-- * 'AllStop' -> 'parConcatMapChanK'
-- * 'FirstStops' -> 'parConcatMapChanKFirst'
-- * 'AnyStops' -> 'parConcatMapChanKAny'
{-# INLINE parConcatMapChanKGeneric #-}
parConcatMapChanKGeneric :: MonadAsync m =>
       (Config -> Config)
    -> Channel m b
    -> (a -> K.StreamK m b)
    -> K.StreamK m a
    -> K.StreamK m b
parConcatMapChanKGeneric modifier chan f stream =
    case getStopWhen (modifier defaultConfig) of
        AllStop -> parConcatMapChanK chan f stream
        FirstStops -> parConcatMapChanKFirst chan f stream
        AnyStops -> parConcatMapChanKAny chan f stream
-- | concatMap over a 'K.StreamK' using a channel supplied by
-- 'withChannelK' under the given config @modifier@. The channel is passed
-- on to 'parConcatMapChanKGeneric' together with the same @modifier@ and
-- the mapping function @f@.
{-# INLINE parConcatMapK #-}
parConcatMapK :: MonadAsync m =>
    (Config -> Config) -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapK modifier f input =
    withChannelK modifier input $ \chan ->
        parConcatMapChanKGeneric modifier chan f
-- | Map each element of the input stream to a stream with @f@ and flatten
-- the results using 'parConcatMapK' under the given config @modifier@.
-- The input and the mapped streams are converted to 'K.StreamK' for
-- processing and the result is converted back to 'Stream'.
{-# INLINE parConcatMap #-}
parConcatMap :: MonadAsync m =>
    (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap modifier f stream =
    Stream.fromStreamK
        $ parConcatMapK
            modifier (Stream.toStreamK . f) (Stream.toStreamK stream)
-- | Flatten a stream of streams: 'parConcatMap' with the identity mapping.
{-# INLINE parConcat #-}
parConcat :: MonadAsync m =>
    (Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat modifier = parConcatMap modifier id
-- | Flatten a list of streams: the list is turned into a stream with
-- 'Stream.fromList' and flattened with 'parConcat' under the given config
-- @modifier@.
{-# INLINE parList #-}
parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a
parList modifier = parConcat modifier . Stream.fromList
-- | 'parList' with the configuration left unmodified (@id@).
{-# INLINE parListLazy #-}
parListLazy :: MonadAsync m => [Stream m a] -> Stream m a
parListLazy = parList id
-- | 'parList' with the 'interleaved' option switched on.
{-# INLINE parListInterleaved #-}
parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a
parListInterleaved = parList (interleaved True)
-- | 'parList' with the 'ordered' option switched on.
{-# INLINE parListOrdered #-}
parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a
parListOrdered = parList (ordered True)
-- | 'parList' with the 'eager' option switched on.
{-# INLINE parListEager #-}
parListEager :: MonadAsync m => [Stream m a] -> Stream m a
parListEager = parList (eager True)
-- | 'parList' with the 'eager' option switched on and the stop condition
-- set to 'FirstStops'.
{-# INLINE parListEagerFst #-}
parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a
parListEagerFst = parList (eager True . stopWhen FirstStops)
-- | 'parList' with the 'eager' option switched on and the stop condition
-- set to 'AnyStops'.
{-# INLINE parListEagerMin #-}
parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a
parListEagerMin = parList (eager True . stopWhen AnyStops)
-- | Apply every function from @stream1@ to every value from @stream2@.
--
-- Implemented as two nested 'parConcatMap's sharing the same config
-- @modifier@: the outer one runs over the functions, the inner one runs
-- over @stream2@ and emits each application as a singleton stream.
{-# INLINE parApply #-}
{-# SPECIALIZE parApply ::
   (Config -> Config) -> Stream IO (a -> b) -> Stream IO a -> Stream IO b #-}
parApply :: MonadAsync m =>
    (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
parApply modifier stream1 stream2 =
    parConcatMap
        modifier
        (\g -> parConcatMap modifier (Stream.fromPure . g) stream2)
        stream1
-- | Map a monadic action over a stream: each element is mapped to the
-- single-effect stream @Stream.fromEffect (f x)@ and the results are
-- flattened with 'parConcatMap' under the given config @modifier@.
{-# INLINE parMapM #-}
parMapM :: MonadAsync m =>
    (Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM modifier f = parConcatMap modifier (Stream.fromEffect . f)
-- | Run the actions contained in a stream: 'parMapM' with the identity
-- function.
{-# INLINE parSequence #-}
parSequence :: MonadAsync m =>
    (Config -> Config) -> Stream m (m a) -> Stream m a
parSequence modifier = parMapM modifier id
-- | Zip two streams with a monadic function. Both inputs are first wrapped
-- in 'parEval' with the same config, then combined with 'Stream.zipWithM'.
{-# INLINE parZipWithM #-}
parZipWithM :: MonadAsync m
    => (Config -> Config)
    -> (a -> b -> m c)
    -> Stream m a
    -> Stream m b
    -> Stream m c
parZipWithM cfg f m1 m2 = Stream.zipWithM f (parEval cfg m1) (parEval cfg m2)
-- | Pure-function version of 'parZipWithM': the zipping function's result
-- is lifted into the monad with 'return'.
{-# INLINE parZipWith #-}
parZipWith :: MonadAsync m
    => (Config -> Config)
    -> (a -> b -> c)
    -> Stream m a
    -> Stream m b
    -> Stream m c
parZipWith cfg f = parZipWithM cfg (\a b -> return (f a b))
-- | Merge two streams with a monadic comparison function. Both inputs are
-- first wrapped in 'parEval' with the same config, then combined with
-- 'Stream.mergeByM'.
{-# INLINE parMergeByM #-}
parMergeByM :: MonadAsync m
    => (Config -> Config)
    -> (a -> a -> m Ordering)
    -> Stream m a
    -> Stream m a
    -> Stream m a
parMergeByM cfg f m1 m2 = Stream.mergeByM f (parEval cfg m1) (parEval cfg m2)
-- | Pure-comparison version of 'parMergeByM': the comparison result is
-- lifted into the monad with 'return'.
{-# INLINE parMergeBy #-}
parMergeBy :: MonadAsync m
    => (Config -> Config)
    -> (a -> a -> Ordering)
    -> Stream m a
    -> Stream m a
    -> Stream m a
parMergeBy cfg f = parMergeByM cfg (\a b -> return (f a b))
-- | Iteratively expand a stream: every element @x@ is emitted and then the
-- stream @f x@ generated from it is fed back through the same expansion,
-- all on a single channel obtained from 'withChannelK'.
--
-- NOTE(review): nothing in this definition terminates the expansion other
-- than @f@ eventually producing empty streams - confirm the intended
-- contract for callers.
{-# INLINE parConcatIterate #-}
parConcatIterate :: MonadAsync m =>
       (Config -> Config)
    -> (a -> Stream m a)
    -> Stream m a
    -> Stream m a
parConcatIterate modifier f input =
    Stream.fromStreamK
        $ withChannelK modifier (Stream.toStreamK input) iterateStream

    where

    -- Expand every element of a stream on the given channel.
    iterateStream channel =
        parConcatMapChanKGeneric modifier channel (generate channel)

    -- Emit @x@ itself, followed by the recursive expansion of @f x@.
    -- NOTE(review): presumably the traversal order (depth- vs
    -- breadth-first) depends on the channel's queue discipline - confirm.
    generate channel x =
        x `K.cons` iterateStream channel (Stream.toStreamK $ f x)
-- | Repeat an action indefinitely: an infinite stream of the same action
-- ('Stream.repeat') is executed with 'parSequence' under the given config.
{-# INLINE parRepeatM #-}
parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a
parRepeatM cfg = parSequence cfg . Stream.repeat
-- | Run an action @n@ times: a stream of @n@ copies of the action
-- ('Stream.replicate') is executed with 'parSequence' under the given
-- config.
{-# INLINE parReplicateM #-}
parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a
parReplicateM cfg n = parSequence cfg . Stream.replicate n
-- | Create a callback together with the stream it feeds.
--
-- Returns a pair @(callback, stream)@: every value passed to @callback@ is
-- sent to a newly created channel's output queue as a 'ChildYield' event,
-- and @stream@ is the stream read from that channel with 'fromChannel'.
-- The channel is created with the 'eager' option switched on.
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream = do
    chan <- newChannel (eager True)

    -- Register the current thread with the channel's worker-thread set.
    -- NOTE(review): presumably needed because the callback is not run by a
    -- channel-managed worker, so its own thread id cannot be known here -
    -- confirm.
    liftIO myThreadId
        >>= modifyThread (workerThreads chan) (outputDoorBell chan)

    let callback a =
            liftIO
                $ void
                $ sendWithDoorBell
                    (outputQueue chan) (outputDoorBell chan) (ChildYield a)

    return (callback, fromChannel chan)
-- | Turn a callback-registration function into a stream.
--
-- A new callback and stream pair is obtained from 'newCallbackStream', the
-- callback is passed to the supplied @setCallback@ action, and the paired
-- stream becomes the result (flattened with 'Stream.concatEffect').
--
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
fromCallback setCallback = Stream.concatEffect register
  where
    -- Effect that wires the callback up and returns the stream to read from.
    register = do
        (emit, output) <- newCallbackStream
        setCallback emit
        return output
-- | Pass the input stream through unchanged while counting the elements that
-- satisfy the predicate.
--
-- On the first step an unboxed 'Int' counter is created (initialised to 0)
-- and a thread is forked with 'forkManaged'; that thread runs the supplied
-- fold function on the stream produced by 'Unboxed.pollIntIORef' over the
-- counter, and its result is discarded. Each yielded element for which the
-- predicate holds increments the counter by one. When the input stream
-- stops, the forked thread is killed with 'killThread'.
--
-- NOTE(review): the thread is killed explicitly only on 'Stop'; if the
-- consumer abandons the stream earlier, cleanup presumably relies on
-- 'forkManaged' - confirm.
--
{-# INLINE_NORMAL parTapCount #-}
parTapCount
    :: MonadAsync m
    => (a -> Bool)
    -> (D.Stream m Int -> m b)
    -> D.Stream m a
    -> D.Stream m a
parTapCount predicate fld (D.Stream innerStep seed) = D.Stream step' Nothing
  where

    -- State is 'Nothing' before setup, and afterwards
    -- @Just (counter, counting thread id, inner stream state)@.
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        counter <- liftIO (Unboxed.newIORef (0 :: Int))
        worker <- forkManaged (void (fld (Unboxed.pollIntIORef counter)))
        -- Setup produces no element; the inner stream starts on the next step.
        return (Skip (Just (counter, worker, seed)))

    step' gst (Just (counter, worker, cur)) = do
        res <- innerStep gst cur
        case res of
            Yield x next -> do
                when (predicate x)
                    (liftIO (Unboxed.modifyIORef' counter (+ 1)))
                return (Yield x (Just (counter, worker, next)))
            Skip next -> return (Skip (Just (counter, worker, next)))
            Stop -> do
                -- Input exhausted: stop the thread polling the counter.
                liftIO (killThread worker)
                return Stop
-- | Deprecated alias of 'parTapCount'; it takes the same arguments and is
-- defined to be exactly that function.
--
{-# DEPRECATED tapCount "Please use parTapCount instead." #-}
{-# INLINE tapCount #-}
tapCount
    :: MonadAsync m
    => (a -> Bool)
    -> (Stream m Int -> m b)
    -> Stream m a
    -> Stream m a
tapCount = parTapCount