module Streamly.Internal.Data.Stream.Channel
(
module Streamly.Internal.Data.Stream.Channel.Type
, module Streamly.Internal.Data.Stream.Channel.Append
, module Streamly.Internal.Data.Stream.Channel.Interleave
, newChannel
, module Streamly.Internal.Data.Stream.Channel.Dispatcher
, module Streamly.Internal.Data.Stream.Channel.Consumer
, module Streamly.Internal.Data.Stream.Channel.Operations
, chanConcatMapK
, withChannelK
, withChannel
)
where
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Control.Concurrent (askRunInIO)
import Streamly.Internal.Data.SVar.Type (adaptState)
import qualified Streamly.Internal.Data.StreamK as K
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Stream.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Operations
import Streamly.Internal.Data.Stream.Channel.Append
import Streamly.Internal.Data.Stream.Channel.Interleave
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Consumer
{-# INLINE newChannel #-}
newChannel :: MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier =
let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
in if Config -> Bool
getInterleaved Config
cfg
then (Config -> Config) -> m (Channel m a)
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newInterleaveChannel Config -> Config
modifier
else (Config -> Config) -> m (Channel m a)
forall (m :: * -> *) a.
MonadRunInIO m =>
(Config -> Config) -> m (Channel m a)
newAppendChannel Config -> Config
modifier
{-# INLINE withChannelK #-}
withChannelK :: MonadAsync m =>
(Config -> Config)
-> K.StreamK m a
-> (Channel m b -> K.StreamK m a -> K.StreamK m b)
-> K.StreamK m b
withChannelK :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier StreamK m a
input Channel m b -> StreamK m a -> StreamK m b
evaluator = m (StreamK m b) -> StreamK m b
forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect m (StreamK m b)
action
where
action :: m (StreamK m b)
action = do
Channel m b
chan <- (Config -> Config) -> m (Channel m b)
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier
Channel m b -> StreamK m b -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m b
chan (Channel m b -> StreamK m a -> StreamK m b
evaluator Channel m b
chan StreamK m a
input)
StreamK m b -> m (StreamK m b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamK m b -> m (StreamK m b)) -> StreamK m b -> m (StreamK m b)
forall a b. (a -> b) -> a -> b
$ Channel m b -> StreamK m b
forall (m :: * -> *) a. MonadAsync m => Channel m a -> StreamK m a
fromChannelK Channel m b
chan
{-# INLINE withChannel #-}
withChannel :: MonadAsync m =>
(Config -> Config)
-> Stream m a
-> (Channel m b -> Stream m a -> Stream m b)
-> Stream m b
withChannel :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> Stream m a
-> (Channel m b -> Stream m a -> Stream m b)
-> Stream m b
withChannel Config -> Config
modifier Stream m a
input Channel m b -> Stream m a -> Stream m b
evaluator =
let f :: Channel m b -> StreamK m a -> StreamK m b
f Channel m b
chan StreamK m a
stream = Stream m b -> StreamK m b
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
K.fromStream (Stream m b -> StreamK m b) -> Stream m b -> StreamK m b
forall a b. (a -> b) -> a -> b
$ Channel m b -> Stream m a -> Stream m b
evaluator Channel m b
chan (StreamK m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
K.toStream StreamK m a
stream)
in StreamK m b -> Stream m b
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
K.toStream (StreamK m b -> Stream m b) -> StreamK m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ (Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier (Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
K.fromStream Stream m a
input) Channel m b -> StreamK m a -> StreamK m b
f
{-# INLINE concatMapHeadK #-}
concatMapHeadK :: Monad m =>
(K.StreamK m a -> m ())
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
concatMapHeadK :: forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapHeadK StreamK m a -> m ()
consumeTail a -> StreamK m b
mapHead StreamK m a
stream =
(forall r.
State StreamK m b
-> (b -> StreamK m b -> m r) -> (b -> m r) -> m r -> m r)
-> StreamK m b
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m b
-> (b -> StreamK m b -> m r) -> (b -> m r) -> m r -> m r)
-> StreamK m b)
-> (forall r.
State StreamK m b
-> (b -> StreamK m b -> m r) -> (b -> m r) -> m r -> m r)
-> StreamK m b
forall a b. (a -> b) -> a -> b
$ \State StreamK m b
st b -> StreamK m b -> m r
yld b -> m r
sng m r
stp -> do
let foldShared :: StreamK m b -> m r
foldShared = State StreamK m b
-> (b -> StreamK m b -> m r)
-> (b -> m r)
-> m r
-> StreamK m b
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m b
st b -> StreamK m b -> m r
yld b -> m r
sng m r
stp
single :: a -> m r
single a
a = StreamK m b -> m r
foldShared (StreamK m b -> m r) -> StreamK m b -> m r
forall a b. (a -> b) -> a -> b
$ a -> StreamK m b
mapHead a
a
yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = StreamK m a -> m ()
consumeTail StreamK m a
r m () -> m r -> m r
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m r
single a
a
in State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared (State StreamK m b -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m b
st) a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
stream
{-# INLINE mkEnqueue #-}
mkEnqueue :: MonadAsync m =>
Channel m b
-> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b)
-> m (K.StreamK m a -> m ())
mkEnqueue :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
runner = do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
(StreamK m a -> m ()) -> m (StreamK m a -> m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
((StreamK m a -> m ()) -> m (StreamK m a -> m ()))
-> (StreamK m a -> m ()) -> m (StreamK m a -> m ())
forall a b. (a -> b) -> a -> b
$ let f :: StreamK m a -> m ()
f StreamK m a
stream = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m b -> (RunInIO m, StreamK m b) -> IO ()
forall (m :: * -> *) a.
Channel m a -> (RunInIO m, StreamK m a) -> IO ()
enqueue Channel m b
chan (RunInIO m
runInIO, (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
runner StreamK m a -> m ()
f StreamK m a
stream)
Channel m b -> m ()
forall (m :: * -> *) a. Channel m a -> m ()
eagerDispatch Channel m b
chan
in StreamK m a -> m ()
f
{-# INLINE parConcatMapChanKAll #-}
parConcatMapChanKAll :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAll :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKAll Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = (StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapHeadK StreamK m a -> m ()
q a -> StreamK m b
f
in ((StreamK m a -> m ()) -> StreamK m b)
-> m (StreamK m a -> m ()) -> StreamK m b
forall (m :: * -> *) b a.
Monad m =>
(b -> StreamK m a) -> m b -> StreamK m a
K.concatMapEffect ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b
`run` StreamK m a
stream) (Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run)
{-# INLINE parConcatMapChanKAny #-}
parConcatMapChanKAny :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKAny Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let done :: StreamK m a
done = m () -> StreamK m a
forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (Channel m b -> m ()
forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
shutdown Channel m b
chan)
run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = (StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapHeadK StreamK m a -> m ()
q (\a
x -> StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append (a -> StreamK m b
f a
x) StreamK m b
forall {a}. StreamK m a
done)
in ((StreamK m a -> m ()) -> StreamK m b)
-> m (StreamK m a -> m ()) -> StreamK m b
forall (m :: * -> *) b a.
Monad m =>
(b -> StreamK m a) -> m b -> StreamK m a
K.concatMapEffect ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b
`run` StreamK m a
stream) (Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run)
{-# INLINE parConcatMapChanKFirst #-}
parConcatMapChanKFirst :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKFirst Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let done :: StreamK m a
done = m () -> StreamK m a
forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (Channel m b -> m ()
forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
shutdown Channel m b
chan)
run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = (StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapHeadK StreamK m a -> m ()
q a -> StreamK m b
f
in m (StreamK m b) -> StreamK m b
forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect (m (StreamK m b) -> StreamK m b) -> m (StreamK m b) -> StreamK m b
forall a b. (a -> b) -> a -> b
$ do
Maybe (a, StreamK m a)
res <- StreamK m a -> m (Maybe (a, StreamK m a))
forall (m :: * -> *) a.
Applicative m =>
StreamK m a -> m (Maybe (a, StreamK m a))
K.uncons StreamK m a
stream
case Maybe (a, StreamK m a)
res of
Maybe (a, StreamK m a)
Nothing -> StreamK m b -> m (StreamK m b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return StreamK m b
forall (m :: * -> *) a. StreamK m a
K.nil
Just (a
h, StreamK m a
t) -> do
StreamK m a -> m ()
q <- Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run
StreamK m a -> m ()
q StreamK m a
t
StreamK m b -> m (StreamK m b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamK m b -> m (StreamK m b)) -> StreamK m b -> m (StreamK m b)
forall a b. (a -> b) -> a -> b
$ StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append (a -> StreamK m b
f a
h) StreamK m b
forall {a}. StreamK m a
done
{-# INLINE chanConcatMapK #-}
chanConcatMapK :: MonadAsync m =>
(Config -> Config)
-> Channel m b
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
chanConcatMapK :: forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
chanConcatMapK Config -> Config
modifier Channel m b
chan a -> StreamK m b
f StreamK m a
stream = do
let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
case Config -> StopWhen
getStopWhen Config
cfg of
StopWhen
AllStop -> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKAll Channel m b
chan a -> StreamK m b
f StreamK m a
stream
StopWhen
FirstStops -> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKFirst Channel m b
chan a -> StreamK m b
f StreamK m a
stream
StopWhen
AnyStops -> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKAny Channel m b
chan a -> StreamK m b
f StreamK m a
stream