-- |
-- Module      : Streamly.Internal.Data.Stream.Channel
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Channel
    (
      module Streamly.Internal.Data.Stream.Channel.Type

    -- ** Allocation
    , module Streamly.Internal.Data.Stream.Channel.Append
    , module Streamly.Internal.Data.Stream.Channel.Interleave
    , newChannel

    -- ** Event Processing Loop
    , module Streamly.Internal.Data.Stream.Channel.Dispatcher
    , module Streamly.Internal.Data.Stream.Channel.Consumer
    , module Streamly.Internal.Data.Stream.Channel.Operations
    , chanConcatMapK

    -- ** Evaluation
    , withChannelK
    , withChannel
    -- quiesceChannel -- wait for running tasks but do not schedule any more.
    )
where

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Control.Concurrent (askRunInIO)
import Streamly.Internal.Data.SVar.Type (adaptState)

import qualified Streamly.Internal.Data.StreamK as K

import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Stream.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Operations
import Streamly.Internal.Data.Stream.Channel.Append
import Streamly.Internal.Data.Stream.Channel.Interleave
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Consumer

-- | Create a new concurrent stream evaluation channel. The monad
-- state used to run the stream actions is captured from the call site of
-- newChannel.
--
-- The config modifier is applied to 'defaultConfig' only to inspect the
-- interleaving setting: when 'getInterleaved' is set an interleaved channel
-- is allocated via 'newInterleaveChannel', otherwise an append channel is
-- allocated via 'newAppendChannel'. The modifier itself (not the resolved
-- config) is passed on to the selected constructor.
{-# INLINE newChannel #-}
newChannel :: MonadAsync m =>
    (Config -> Config) -> m (Channel m a)
newChannel modifier =
    let cfg = modifier defaultConfig
     in if getInterleaved cfg
        then newInterleaveChannel modifier
        else newAppendChannel modifier

-- | Allocate a channel and evaluate the stream concurrently using the channel
-- and the supplied evaluator function. The evaluator is run in a worker
-- thread.
--
-- The channel is allocated with 'newChannel' as an effect at the head of the
-- output stream (via 'K.concatEffect'), so nothing is allocated until the
-- output stream is evaluated.
{-# INLINE withChannelK #-}
withChannelK :: MonadAsync m =>
       (Config -> Config) -- ^ config modifier
    -> K.StreamK m a -- ^ input stream
    -> (Channel m b -> K.StreamK m a -> K.StreamK m b) -- ^ stream evaluator
    -> K.StreamK m b -- ^ output stream
withChannelK modifier input evaluator = K.concatEffect action

    where

    action = do
        chan <- newChannel modifier
        -- The evaluator gets the same channel its result is queued on, so it
        -- can enqueue further work on that channel.
        toChannelK chan (evaluator chan input)
        -- The output stream is whatever is read back from the channel.
        return $ fromChannelK chan

-- | A wrapper over 'withChannelK', converts 'Stream' to 'StreamK' and invokes
-- 'withChannelK'.
--
-- The supplied 'Stream' evaluator is adapted to a 'StreamK' evaluator by
-- converting its input with 'K.toStream' and its output with 'K.fromStream';
-- the final result is converted back to 'Stream' with 'K.toStream'.
{-# INLINE withChannel #-}
withChannel :: MonadAsync m =>
       (Config -> Config)
    -> Stream m a
    -> (Channel m b -> Stream m a -> Stream m b)
    -> Stream m b
withChannel modifier input evaluator =
    let f chan stream = K.fromStream $ evaluator chan (K.toStream stream)
     in K.toStream $ withChannelK modifier (K.fromStream input) f

-------------------------------------------------------------------------------
-- Evaluator
-------------------------------------------------------------------------------

-- | @concatMapHeadK consumeTail mapHead stream@, maps a stream generation
-- function on the head element and performs a side effect on the tail.
--
-- Used for concurrent evaluation of streams using a Channel. A worker
-- evaluating the stream would queue the tail and go on to evaluate the head.
-- The tail is picked up by another worker which does the same.
--
-- If the input stream is empty the stop continuation is invoked directly and
-- @consumeTail@ is never called. If the input stream has exactly one element
-- only @mapHead@ is applied; @consumeTail@ is not called.
{-# INLINE concatMapHeadK #-}
concatMapHeadK :: Monad m =>
       (K.StreamK m a -> m ()) -- ^ Queue the tail
    -> (a -> K.StreamK m b) -- ^ Generate a stream from the head
    -> K.StreamK m a
    -> K.StreamK m b
concatMapHeadK consumeTail mapHead stream =
    K.mkStream $ \st yld sng stp ->
        let -- Continue into a generated output stream using the caller's
            -- continuations unchanged.
            foldShared = K.foldStreamShared st yld sng stp
            -- Last (or only) element: just run the stream generated from it.
            single a = foldShared $ mapHead a
            -- Head with a tail: hand off the tail first, then run the stream
            -- generated from the head.
            yieldk a r = consumeTail r >> single a
         in K.foldStreamShared (adaptState st) yieldk single stp stream

-------------------------------------------------------------------------------
-- concat streams
-------------------------------------------------------------------------------

-- | @mkEnqueue chan f@ returns a queuing function @enq@. @enq@ takes a
-- @stream@ and enqueues @f enq stream@ on the channel. One example of @f@ is
-- 'concatMapHeadK'. When the enqueued value with 'concatMapHeadK' as @f@ is
-- evaluated, it generates an output stream from the head and enqueues @f enq
-- tail@ on the channel. Thus whenever the enqueued stream is evaluated it
-- generates a stream from the head and queues the tail on the channel.
--
-- Note that @enq@ and runner are mutually recursive, mkEnqueue ties the
-- knot between the two.
--
-- The @RunInIO@ function obtained from 'askRunInIO' at the time 'mkEnqueue'
-- is run is paired with every stream that @enq@ places on the channel. After
-- each enqueue, 'eagerDispatch' is invoked on the channel.
--
{-# INLINE mkEnqueue #-}
mkEnqueue :: MonadAsync m =>
    Channel m b
    -- | @divider enq stream@
    -> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b)
    -- | Queuing function @enq@
    -> m (K.StreamK m a -> m ())
mkEnqueue chan runner = do
    runInIO <- askRunInIO
    return
        $ let f stream = do
                -- @f@ refers to itself here: the runner receives the very
                -- queuing function being defined.
                liftIO $ enqueue chan (runInIO, runner f stream)
                -- XXX In case of eager dispatch we can just directly dispatch
                -- a worker with the tail stream here rather than first queuing
                -- and then dispatching a worker which dequeues the work. The
                -- older implementation did a direct dispatch here and its perf
                -- characteristics looked much better.
                eagerDispatch chan
           in f

-- | Concurrent concat-map over a channel with no explicit shutdown: the
-- runner is plain 'concatMapHeadK' applied to @f@, so each evaluation step
-- queues the tail via the function built by 'mkEnqueue' and generates the
-- output stream from the head. 'shutdown' is never called from here.
--
-- Selected by 'chanConcatMapK' for the 'AllStop' setting.
{-# INLINE parConcatMapChanKAll #-}
parConcatMapChanKAll :: MonadAsync m =>
    Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAll chan f stream =
   let run q = concatMapHeadK q f
    -- Build the queuing function as an effect, then start the runner on the
    -- whole input stream with it.
    in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
    -- K.parConcatMap (_appendWithChanK chan) f stream

-- | Like 'parConcatMapChanKAll', except that every stream generated by @f@
-- has an empty stream appended to it whose only effect is to call 'shutdown'
-- on the channel. Thus 'shutdown' is invoked as soon as the stream generated
-- from any one element has been evaluated to its end.
--
-- Selected by 'chanConcatMapK' for the 'AnyStops' setting.
{-# INLINE parConcatMapChanKAny #-}
parConcatMapChanKAny :: MonadAsync m =>
    Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny chan f stream =
   let -- Yields no elements; runs 'shutdown' when evaluated.
       done = K.nilM (shutdown chan)
       run q = concatMapHeadK q (\x -> K.append (f x) done)
    in K.concatMapEffect (`run` stream) (mkEnqueue chan run)

-- | Like 'parConcatMapChanKAll', except that the stream generated from the
-- first element of the input is followed by an empty stream whose only
-- effect is to call 'shutdown' on the channel. Streams generated from the
-- remaining elements do not trigger 'shutdown'.
--
-- The input is split with 'K.uncons': an empty input yields an empty output
-- (and no 'shutdown' call); otherwise the tail is enqueued on the channel
-- using the function built by 'mkEnqueue', and the head's stream, followed by
-- the shutdown action, is returned as the result.
--
-- Selected by 'chanConcatMapK' for the 'FirstStops' setting.
{-# INLINE parConcatMapChanKFirst #-}
parConcatMapChanKFirst :: MonadAsync m =>
    Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst chan f stream =
   let -- Yields no elements; runs 'shutdown' when evaluated.
       done = K.nilM (shutdown chan)
       run q = concatMapHeadK q f
    in K.concatEffect $ do
        res <- K.uncons stream
        case res of
            Nothing -> return K.nil
            Just (h, t) -> do
                q <- mkEnqueue chan run
                -- Queue the rest of the input before returning the head's
                -- stream.
                q t
                return $ K.append (f h) done

-- | Make a concurrent stream evaluator from a stream, to be used in
-- 'withChannelK' or 'toChannelK'. Maps a stream generation function on each
-- element of the stream, the evaluation of the map on each element happens
-- concurrently. All the generated streams are merged together in the output of
-- the channel. The scheduling and termination behavior depends on the channel
-- settings.
--
-- Note that if you queue a stream on the channel using 'toChannelK', it will
-- be picked up by a worker and the worker would evaluate the entire stream
-- serially and emit the results on the channel. However, if you transform the
-- stream using 'parConcatMapChanK' and queue it on the channel, it
-- parallelizes the function map on each element of the stream. The simplest
-- example is @parConcatMapChanK id id@ which is equivalent to evaluating each
-- element of the stream concurrently.
--
-- A channel worker evaluating this function would enqueue the tail on the
-- channel's work queue and go on to evaluate the head generating an output
-- stream. The tail is picked up by another worker which does the same and so
-- on.
--
-- The config modifier is applied to 'defaultConfig' and only the resulting
-- 'getStopWhen' setting is consulted here, to choose between
-- 'parConcatMapChanKAll' ('AllStop'), 'parConcatMapChanKFirst'
-- ('FirstStops') and 'parConcatMapChanKAny' ('AnyStops').
{-# INLINE chanConcatMapK #-}
chanConcatMapK :: MonadAsync m =>
       (Config -> Config)
    -> Channel m b
    -> (a -> K.StreamK m b)
    -> K.StreamK m a
    -> K.StreamK m b
chanConcatMapK modifier chan f stream =
    let cfg = modifier defaultConfig
     in case getStopWhen cfg of
            AllStop -> parConcatMapChanKAll chan f stream
            FirstStops -> parConcatMapChanKFirst chan f stream
            AnyStops -> parConcatMapChanKAny chan f stream