-- |
-- Module      : Streamly.Internal.Data.Pipe.Type
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Pipe.Type
    (
    -- * Type
      Step (..)
    , Pipe (..)

    -- * From folds
    , fromStream
    , fromScanr
    , fromFold
    , scanFold

    -- * Primitive Pipes
    , identity
    , map -- function?
    , mapM -- functionM?
    , filter
    , filterM

    -- * Combinators
    , compose
    , teeMerge
    -- , zipWith -- teeZip
    )
where

#include "inline.hs"
-- import Control.Arrow (Arrow(..))
import Control.Category (Category(..))
import Data.Functor ((<&>))
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Scanr (Scanr(..))
import Streamly.Internal.Data.Stream.Type (Stream(..))
-- import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.SVar.Type (defState)

import qualified Prelude
import qualified Streamly.Internal.Data.Fold.Type as Fold
import qualified Streamly.Internal.Data.Stream.Type as Stream

import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null)

-- $setup
-- >>> :m
-- >>> :set -XFlexibleContexts
-- >>> import Control.Category
--
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Pipe as Pipe
-- >>> import qualified Streamly.Internal.Data.Stream as Stream

------------------------------------------------------------------------------
-- Pipes
------------------------------------------------------------------------------

-- XXX If we do not want to change Streams, we should use "Yield b s" instead
-- of "Yield s b". Though "Yield s b" is sometimes better when using curried
-- "Yield s". "Yield b" sounds better because the verb applies to "b".
--
-- Note: We could reduce the number of constructors by using Consume | Produce
-- wrapper around the state. But when fusion does not occur, it may be better
-- to use a flat structure rather than nested to avoid more allocations. In a
-- flat structure the pointer tag from the Step constructor itself can identify
-- any of the 5 constructors.
--
{-# ANN type Step Fuse #-}
-- | The result of running a pipe for one step.
--
-- @cs@ is the type of the state carried while the pipe is consuming input,
-- @ps@ is the type of the state carried while it is producing output, and @b@
-- is the type of the output values. The constructor chosen tells the driver
-- which of the two modes the pipe wants to be run in next.
data Step cs ps b =
      YieldC cs b -- ^ Yield and consume
    | SkipC cs -- ^ Skip and consume
    | Stop -- ^ when consuming, Stop means input remains unused
    -- Therefore, Stop should not be used when we are processing an input,
    -- instead use YieldP and then Stop.
    | YieldP ps b -- ^ Yield and produce
    | SkipP ps -- ^ Skip and produce

-- Maps a function over the output value carried by the yielding constructors
-- ('YieldC' and 'YieldP'). The consume/produce state is passed through
-- untouched, and the constructors that carry no output ('SkipC', 'SkipP',
-- 'Stop') are rebuilt unchanged at the new output type.
instance Functor (Step cs ps) where
    {-# INLINE fmap #-}
    fmap f (YieldC s b) = YieldC s (f b)
    fmap f (YieldP s b) = YieldP s (f b)
    fmap _ (SkipC s) = SkipC s
    fmap _ (SkipP s) = SkipP s
    fmap _ Stop = Stop

-- A pipe uses a consume function and a produce function. It can dynamically
-- switch from consume/fold mode to a produce/source mode.
--
-- We can upgrade a stream, fold or scan into a pipe. However, the simpler
-- types should be preferred because they can be more efficient and fuse
-- better.
--
-- The design of the Pipe type is such that the pipe decides whether it wants
-- to consume or produce, not the driver. The driver has to do what the pipe
-- dictates, if it can. The starting state of the pipe could either be
-- consuming or producing. Current implementation starts with a consuming
-- state. If the default state of the pipe is consumption state and there is no
-- input, the driver can call finalC :: cs -> m (Step cs ps b) to switch the
-- pipe to production state. The pipe can use SkipP to switch to production
-- state. If the default state of the pipe is producing state, the pipe can use
-- SkipC to switch to the consumer state. The driver can use finalP to switch
-- to consuming state.

-- | Represents a stateful transformation over an input stream of values of
-- type @a@ to outputs of type @b@ in 'Monad' @m@.
--
-- The constructor is @Pipe consume produce initial@.
--
-- * @consume@ is run with the current consume state and one input value.
-- * @produce@ is run with the current produce state and no input.
-- * @initial@ is the starting state; it is a consume state, so a pipe always
--   starts in consume mode.
--
-- The state types @cs@ and @ps@ are existentially quantified, so they are
-- private to each pipe.
data Pipe m a b =
    forall cs ps. Pipe
        (cs -> a -> m (Step cs ps b)) -- consume
        (ps -> m (Step cs ps b)) -- produce
     -- (cs -> m (Step cs ps b)) -- finalC
     -- (ps -> m (Step cs ps b)) -- finalP
        cs                       -- Either cs ps

------------------------------------------------------------------------------
-- Functor: Mapping on the output
------------------------------------------------------------------------------

-- | 'fmap' maps a pure function on a pipe output.
--
-- The consume and produce step functions of the pipe are wrapped so that
-- every 'Step' they return has the function applied to its output value (via
-- the 'Functor' instance of 'Step'). The initial state is unchanged.
--
-- >>> Stream.toList $ Stream.pipe (fmap (+1) Pipe.identity) $ Stream.fromList [1..5::Int]
-- [2,3,4,5,6]
--
instance Functor m => Functor (Pipe m a) where
    {-# INLINE_NORMAL fmap #-}
    fmap f (Pipe consume produce cinitial) =
        Pipe consume1 produce1 cinitial

        where

        -- The outer fmap is over the monad @m@, the inner one over 'Step'.
        {-# INLINE_LATE consume1 #-}
        consume1 s b = fmap (fmap f) (consume s b)
        {-# INLINE_LATE produce1 #-}
        produce1 s = fmap (fmap f) (produce s)

-------------------------------------------------------------------------------
-- Category
-------------------------------------------------------------------------------

{-# ANN type ComposeConsume Fuse #-}
-- Consume state of a composed pipe ('compose'): both the left (upstream) and
-- the right (downstream) pipes are in their consume states. The @psL@ type
-- parameter is not used by the constructor.
data ComposeConsume csL psL csR =
      ComposeConsume csL csR

{-# ANN type ComposeProduce Fuse #-}
-- Produce state of a composed pipe ('compose'): at least one of the two pipes
-- is in its produce state.
data ComposeProduce csL psL csR psR =
      ComposeProduceR csL psR -- left consuming, right producing
    | ComposeProduceL psL csR -- left producing, right consuming
    | ComposeProduceLR psL psR -- both left and right producing

-- | Series composition. Compose two pipes such that the output of the second
-- pipe is attached to the input of the first pipe.
--
-- In @compose right left@ every value yielded by @left@ is immediately fed to
-- the consume function of @right@, and whatever @right@ yields is the output
-- of the composed pipe. If either pipe returns 'Stop' the composed pipe
-- returns 'Stop'.
--
-- >>> Stream.toList $ Stream.pipe (Pipe.map (+1) >>> Pipe.map (+1)) $ Stream.fromList [1..5::Int]
-- [3,4,5,6,7]
--
{-# INLINE_NORMAL compose #-}
compose :: Monad m => Pipe m b c -> Pipe m a b -> Pipe m a c
compose
    (Pipe consumeR produceR initialR)
    (Pipe consumeL produceL initialL) =
        Pipe consume produce (ComposeConsume initialL initialR)

    where

    -- The left pipe yielded bL and wants to consume next (state csL). Feed bL
    -- to the right pipe. The composed pipe stays in consume mode only if the
    -- right pipe also wants to consume, otherwise the right pipe has to be
    -- drained first (ComposeProduceR).
    {-# INLINE consumeLFeedR #-}
    consumeLFeedR csL csR bL = do
        rR <- consumeR csR bL
        return
            $ case rR of
                YieldC csR1 br -> YieldC (ComposeConsume csL csR1) br
                SkipC csR1 -> SkipC (ComposeConsume csL csR1)
                Stop -> Stop
                YieldP psR br -> YieldP (ComposeProduceR csL psR) br
                SkipP psR -> SkipP (ComposeProduceR csL psR)

    -- The left pipe yielded bL and wants to produce next (state psL). Feed bL
    -- to the right pipe. The composed pipe always ends up in produce mode
    -- here because the left pipe still has output pending, so the consume
    -- results of the right pipe are translated to the produce constructors.
    {-# INLINE produceLFeedR #-}
    produceLFeedR psL csR bL = do
        rR <- consumeR csR bL
        return
            $ case rR of
                YieldC csR1 br -> YieldP (ComposeProduceL psL csR1) br
                SkipC csR1 -> SkipP (ComposeProduceL psL csR1)
                Stop -> Stop
                YieldP psR br -> YieldP (ComposeProduceLR psL psR) br
                SkipP psR -> SkipP (ComposeProduceLR psL psR)

    -- Both pipes are consuming: run the left pipe on the input and push any
    -- output it yields into the right pipe.
    consume (ComposeConsume csL csR) x = do
        rL <- consumeL csL x
        case rL of
            YieldC csL1 bL ->
                -- XXX Use SkipC instead? Flat may be better for fusion.
                consumeLFeedR csL1 csR bL
            SkipC csL1 -> return $ SkipC (ComposeConsume csL1 csR)
            Stop -> return Stop
            YieldP psL bL ->
                -- XXX Use SkipC instead?
                produceLFeedR psL csR bL
            SkipP psL -> return $ SkipP (ComposeProduceL psL csR)

    -- Left producing, right consuming: run the left producer and push any
    -- output it yields into the right pipe.
    produce (ComposeProduceL psL csR) = do
        rL <- produceL psL
        case rL of
            YieldC csL bL ->
                -- XXX Use SkipC instead?
                consumeLFeedR csL csR bL
            SkipC csL -> return $ SkipC (ComposeConsume csL csR)
            Stop -> return Stop
            YieldP psL1 bL ->
                -- XXX Use SkipC instead?
                produceLFeedR psL1 csR bL
            SkipP psL1 -> return $ SkipP (ComposeProduceL psL1 csR)

    -- Left consuming, right producing: drain the right pipe. Once it goes
    -- back to a consume state the composed pipe is back in consume mode.
    produce (ComposeProduceR csL psR) = do
        rR <- produceR psR
        return
            $ case rR of
                YieldC csR1 br -> YieldC (ComposeConsume csL csR1) br
                SkipC csR1 -> SkipC (ComposeConsume csL csR1)
                Stop -> Stop
                YieldP psR1 br -> YieldP (ComposeProduceR csL psR1) br
                SkipP psR1 -> SkipP (ComposeProduceR csL psR1)

    -- Both producing: drain the right pipe first. Once it goes back to a
    -- consume state the left producer gets to run (ComposeProduceL).
    produce (ComposeProduceLR psL psR) = do
        rR <- produceR psR
        return
            $ case rR of
                YieldC csR1 br -> YieldP (ComposeProduceL psL csR1) br
                SkipC csR1 -> SkipP (ComposeProduceL psL csR1)
                Stop -> Stop
                YieldP psR1 br -> YieldP (ComposeProduceLR psL psR1) br
                SkipP psR1 -> SkipP (ComposeProduceLR psL psR1)

-- | A pipe representing mapping of a monadic action.
--
-- The pipe is stateless (the consume state is @()@) and yields exactly one
-- output for every input.
--
-- >>> Stream.toList $ Stream.pipe (Pipe.mapM print) $ Stream.fromList [1..5::Int]
-- 1
-- 2
-- 3
-- 4
-- 5
-- [(),(),(),(),()]
--
{-# INLINE mapM #-}
mapM :: Monad m => (a -> m b) -> Pipe m a b
-- The produce function is 'undefined': the consume function only ever returns
-- 'YieldC', so this pipe never enters a produce state.
mapM f = Pipe (\() a -> f a <&> YieldC ()) undefined ()

-- | A pipe representing mapping of a pure function.
--
-- Defined in terms of 'mapM' by lifting the function result with 'return'.
--
-- >>> Stream.toList $ Stream.pipe (Pipe.map (+1)) $ Stream.fromList [1..5::Int]
-- [2,3,4,5,6]
--
{-# INLINE map #-}
map :: Monad m => (a -> b) -> Pipe m a b
-- Function composition is written qualified ("Prelude..") in the original
-- source; kept as is.
map f = mapM (return Prelude.. f)

{- HLINT ignore "Redundant map" -}

-- | An identity pipe producing the same output as input.
--
-- >>> identity = Pipe.map Prelude.id
--
-- >>> Stream.toList $ Stream.pipe (Pipe.identity) $ Stream.fromList [1..5::Int]
-- [1,2,3,4,5]
--
{-# INLINE identity #-}
identity :: Monad m => Pipe m a a
-- 'id' is hidden from the unqualified Prelude import in this module, hence
-- the qualified name.
identity = map Prelude.id

-- | "." composes the pipes in series.
instance Monad m => Category (Pipe m) where
    -- The identity arrow is the pipe that passes every input through
    -- unchanged.
    {-# INLINE id #-}
    id = identity

    -- @f . g@ feeds the output of @g@ into @f@, see 'compose'.
    {-# INLINE (.) #-}
    (.) = compose

{-# ANN type TeeMergeConsume Fuse #-}
-- Consume state of the pipe built by 'teeMerge'. All fields are strict.
data TeeMergeConsume csL csR
    = TeeMergeConsume !csL !csR -- both pipes are consuming (initial state)
    | TeeMergeConsumeOnlyL !csL -- presumably only the left pipe is still running
    | TeeMergeConsumeOnlyR !csR -- only the right pipe remains (left has stopped)

{-# ANN type TeeMergeProduce Fuse #-}
-- Produce state of the pipe built by 'teeMerge'. All fields are strict.
--
-- In the first two constructors @x@ holds the input value that the left pipe
-- has already been given; presumably it is kept so that it can be fed to the
-- right pipe afterwards (the code doing so is not in view here - TODO
-- confirm).
data TeeMergeProduce csL csR psL psR x
    = TeeMergeProduce !csL !csR !x -- left consumed x and is in a consume state
    | TeeMergeProduceL !psL !csR !x -- left consumed x and is in a produce state
    | TeeMergeProduceR !csL !psR -- presumably left consuming, right producing
    | TeeMergeProduceOnlyL !psL -- presumably only the left pipe remains, producing
    | TeeMergeProduceOnlyR !psR -- only the right pipe remains, producing

-- | Parallel composition. Distribute the input across two pipes and merge
-- their outputs.
--
-- >>> Stream.toList $ Stream.pipe (Pipe.teeMerge Pipe.identity (Pipe.map (\x -> x * x))) $ Stream.fromList [1..5::Int]
-- [1,1,2,4,3,9,4,16,5,25]
--
{-# INLINE_NORMAL teeMerge #-}
teeMerge :: Monad m => Pipe m a b -> Pipe m a b -> Pipe m a b
teeMerge :: forall (m :: * -> *) a b.
Monad m =>
Pipe m a b -> Pipe m a b -> Pipe m a b
teeMerge (Pipe cs -> a -> m (Step cs ps b)
consumeL ps -> m (Step cs ps b)
produceL cs
initialL) (Pipe cs -> a -> m (Step cs ps b)
consumeR ps -> m (Step cs ps b)
produceR cs
initialR) =
    (TeeMergeConsume cs cs
 -> a
 -> m (Step
         (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b))
-> (TeeMergeProduce cs cs ps ps a
    -> m (Step
            (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b))
-> TeeMergeConsume cs cs
-> Pipe m a b
forall (m :: * -> *) a b cs ps.
(cs -> a -> m (Step cs ps b))
-> (ps -> m (Step cs ps b)) -> cs -> Pipe m a b
Pipe TeeMergeConsume cs cs
-> a
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
consume TeeMergeProduce cs cs ps ps a
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
produce (cs -> cs -> TeeMergeConsume cs cs
forall csL csR. csL -> csR -> TeeMergeConsume csL csR
TeeMergeConsume cs
initialL cs
initialR)

    where

    {-# INLINE feedRightOnly #-}
    feedRightOnly :: cs
-> a
-> m (Step
        (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b)
feedRightOnly cs
csR a
a = do
        Step cs ps b
resR <- cs -> a -> m (Step cs ps b)
consumeR cs
csR a
a
        Step (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b
-> m (Step
        (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b
 -> m (Step
         (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b))
-> Step
     (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b
-> m (Step
        (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b)
forall a b. (a -> b) -> a -> b
$ case Step cs ps b
resR of
                  YieldC cs
cs b
b -> TeeMergeConsume csL cs
-> b
-> Step
     (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b
forall cs ps b. cs -> b -> Step cs ps b
YieldC (cs -> TeeMergeConsume csL cs
forall csL csR. csR -> TeeMergeConsume csL csR
TeeMergeConsumeOnlyR cs
cs) b
b
                  SkipC cs
cs -> TeeMergeConsume csL cs
-> Step
     (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b
forall cs ps b. cs -> Step cs ps b
SkipC (cs -> TeeMergeConsume csL cs
forall csL csR. csR -> TeeMergeConsume csL csR
TeeMergeConsumeOnlyR cs
cs)
                  Step cs ps b
Stop -> Step (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b
forall cs ps b. Step cs ps b
Stop
                  YieldP ps
ps b
b -> TeeMergeProduce csL csR psL ps x
-> b
-> Step
     (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b
forall cs ps b. ps -> b -> Step cs ps b
YieldP (ps -> TeeMergeProduce csL csR psL ps x
forall csL csR psL psR x. psR -> TeeMergeProduce csL csR psL psR x
TeeMergeProduceOnlyR ps
ps) b
b
                  SkipP ps
ps -> TeeMergeProduce csL csR psL ps x
-> Step
     (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b
forall cs ps b. ps -> Step cs ps b
SkipP (ps -> TeeMergeProduce csL csR psL ps x
forall csL csR psL psR x. psR -> TeeMergeProduce csL csR psL psR x
TeeMergeProduceOnlyR ps
ps)

    {-# INLINE_LATE consume #-}
    consume :: TeeMergeConsume cs cs
-> a
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
consume (TeeMergeConsume cs
csL cs
csR) a
a = do
        Step cs ps b
resL <- cs -> a -> m (Step cs ps b)
consumeL cs
csL a
a
        case Step cs ps b
resL of
              YieldC cs
cs b
b -> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
 -> m (Step
         (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b))
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a b. (a -> b) -> a -> b
$ TeeMergeProduce cs cs ps ps a
-> b
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. ps -> b -> Step cs ps b
YieldP (cs -> cs -> a -> TeeMergeProduce cs cs ps ps a
forall csL csR psL psR x.
csL -> csR -> x -> TeeMergeProduce csL csR psL psR x
TeeMergeProduce cs
cs cs
csR a
a) b
b
              SkipC cs
cs -> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
 -> m (Step
         (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b))
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a b. (a -> b) -> a -> b
$ TeeMergeProduce cs cs ps ps a
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. ps -> Step cs ps b
SkipP (cs -> cs -> a -> TeeMergeProduce cs cs ps ps a
forall csL csR psL psR x.
csL -> csR -> x -> TeeMergeProduce csL csR psL psR x
TeeMergeProduce cs
cs cs
csR a
a)
              Step cs ps b
Stop ->
                -- XXX Skip to a state instead?
                cs
-> a
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall {csL} {csL} {csR} {psL} {x}.
cs
-> a
-> m (Step
        (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b)
feedRightOnly cs
csR a
a
              YieldP ps
ps b
b -> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
 -> m (Step
         (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b))
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a b. (a -> b) -> a -> b
$ TeeMergeProduce cs cs ps ps a
-> b
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. ps -> b -> Step cs ps b
YieldP (ps -> cs -> a -> TeeMergeProduce cs cs ps ps a
forall csL csR psL psR x.
psL -> csR -> x -> TeeMergeProduce csL csR psL psR x
TeeMergeProduceL ps
ps cs
csR a
a) b
b
              SkipP ps
ps -> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
 -> m (Step
         (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b))
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a b. (a -> b) -> a -> b
$ TeeMergeProduce cs cs ps ps a
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. ps -> Step cs ps b
SkipP (ps -> cs -> a -> TeeMergeProduce cs cs ps ps a
forall csL csR psL psR x.
psL -> csR -> x -> TeeMergeProduce csL csR psL psR x
TeeMergeProduceL ps
ps cs
csR a
a)

    -- XXX Adding additional consume states causes a 4x regression in the
    -- All.Data.Stream/o-1-space.pipesX4.tee benchmark (mapM 4 times).
    -- Commenting out these two states makes it 4x faster. Need to investigate
    -- why.
    consume (TeeMergeConsumeOnlyL cs
csL) a
a = do
        Step cs ps b
resL <- cs -> a -> m (Step cs ps b)
consumeL cs
csL a
a
        Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
 -> m (Step
         (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b))
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall a b. (a -> b) -> a -> b
$ case Step cs ps b
resL of
                  YieldC cs
cs b
b -> TeeMergeConsume cs cs
-> b
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. cs -> b -> Step cs ps b
YieldC (cs -> TeeMergeConsume cs cs
forall csL csR. csL -> TeeMergeConsume csL csR
TeeMergeConsumeOnlyL cs
cs) b
b
                  SkipC cs
cs -> TeeMergeConsume cs cs
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. cs -> Step cs ps b
SkipC (cs -> TeeMergeConsume cs cs
forall csL csR. csL -> TeeMergeConsume csL csR
TeeMergeConsumeOnlyL cs
cs)
                  Step cs ps b
Stop -> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. Step cs ps b
Stop
                  YieldP ps
ps b
b -> TeeMergeProduce cs cs ps ps a
-> b
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. ps -> b -> Step cs ps b
YieldP (ps -> TeeMergeProduce cs cs ps ps a
forall csL csR psL psR x. psL -> TeeMergeProduce csL csR psL psR x
TeeMergeProduceOnlyL ps
ps) b
b
                  SkipP ps
ps -> TeeMergeProduce cs cs ps ps a
-> Step (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b
forall cs ps b. ps -> Step cs ps b
SkipP (ps -> TeeMergeProduce cs cs ps ps a
forall csL csR psL psR x. psL -> TeeMergeProduce csL csR psL psR x
TeeMergeProduceOnlyL ps
ps)
    consume (TeeMergeConsumeOnlyR cs
csR) a
a = cs
-> a
-> m (Step
        (TeeMergeConsume cs cs) (TeeMergeProduce cs cs ps ps a) b)
forall {csL} {csL} {csR} {psL} {x}.
cs
-> a
-> m (Step
        (TeeMergeConsume csL cs) (TeeMergeProduce csL csR psL ps x) b)
feedRightOnly cs
csR a
a

    -- Produce-mode step of the merged pipe. No new input is requested here;
    -- the state says which constituent pipe has to be driven next:
    --
    -- * TeeMergeProduce: the buffered input is fed to the right pipe's
    --   consume function.
    -- * TeeMergeProduceL: the left pipe is in produce mode; the right consume
    --   state and the buffered input are kept until the left pipe returns to
    --   consume mode or stops.
    -- * TeeMergeProduceR: the right pipe is in produce mode.
    -- * TeeMergeProduceOnlyL / TeeMergeProduceOnlyR: only that one pipe is
    --   tracked by the state and it is in produce mode.
    {-# INLINE_LATE produce #-}
    produce (TeeMergeProduce lcs rcs x) =
        consumeR rcs x <&> \stepR ->
            case stepR of
                YieldC cs b -> YieldC (TeeMergeConsume lcs cs) b
                SkipC cs -> SkipC (TeeMergeConsume lcs cs)
                -- Right pipe stopped, keep going with the left one alone.
                Stop -> SkipC (TeeMergeConsumeOnlyL lcs)
                YieldP ps b -> YieldP (TeeMergeProduceR lcs ps) b
                SkipP ps -> SkipP (TeeMergeProduceR lcs ps)

    produce (TeeMergeProduceL lps rcs x) =
        produceL lps >>= \stepL ->
            case stepL of
                -- Left pipe is back in consume mode; the right pipe still has
                -- to be fed the buffered input, so stay in produce mode.
                YieldC cs b -> pure $ YieldP (TeeMergeProduce cs rcs x) b
                SkipC cs -> pure $ SkipP (TeeMergeProduce cs rcs x)
                -- Left pipe stopped, feed the buffered input to the right
                -- pipe and continue with the right pipe alone.
                Stop -> feedRightOnly rcs x
                YieldP ps b -> pure $ YieldP (TeeMergeProduceL ps rcs x) b
                SkipP ps -> pure $ SkipP (TeeMergeProduceL ps rcs x)

    produce (TeeMergeProduceR lcs rps) =
        produceR rps <&> \stepR ->
            case stepR of
                YieldC cs b -> YieldC (TeeMergeConsume lcs cs) b
                SkipC cs -> SkipC (TeeMergeConsume lcs cs)
                -- Right pipe stopped, keep going with the left one alone.
                Stop -> SkipC (TeeMergeConsumeOnlyL lcs)
                YieldP ps b -> YieldP (TeeMergeProduceR lcs ps) b
                SkipP ps -> SkipP (TeeMergeProduceR lcs ps)

    produce (TeeMergeProduceOnlyL lps) =
        produceL lps <&> \stepL ->
            case stepL of
                YieldC cs b -> YieldC (TeeMergeConsumeOnlyL cs) b
                SkipC cs -> SkipC (TeeMergeConsumeOnlyL cs)
                Stop -> Stop
                YieldP ps b -> YieldP (TeeMergeProduceOnlyL ps) b
                SkipP ps -> SkipP (TeeMergeProduceOnlyL ps)

    produce (TeeMergeProduceOnlyR rps) =
        produceR rps <&> \stepR ->
            case stepR of
                YieldC cs b -> YieldC (TeeMergeConsumeOnlyR cs) b
                SkipC cs -> SkipC (TeeMergeConsumeOnlyR cs)
                Stop -> Stop
                YieldP ps b -> YieldP (TeeMergeProduceOnlyR ps) b
                SkipP ps -> SkipP (TeeMergeProduceOnlyR ps)

-- | Pipes form a 'Semigroup' under 'teeMerge': @p1 <> p2@ is the same as
-- @teeMerge p1 p2@, i.e. the two pipes are composed in parallel.
instance Monad m => Semigroup (Pipe m a b) where
    {-# INLINE (<>) #-}
    (<>) = teeMerge

-------------------------------------------------------------------------------
-- Arrow
-------------------------------------------------------------------------------

{-
unzip :: Pipe m a x -> Pipe m b y -> Pipe m (a, b) (x, y)
unzip = undefined

-- XXX move this to a separate module
data Deque a = Deque [a] [a]

{-# INLINE null #-}
null :: Deque a -> Bool
null (Deque [] []) = True
null _ = False

{-# INLINE snoc #-}
snoc :: a -> Deque a -> Deque a
snoc a (Deque snocList consList) = Deque (a : snocList) consList

{-# INLINE uncons #-}
uncons :: Deque a -> Maybe (a, Deque a)
uncons (Deque snocList consList) =
  case consList of
    h : t -> Just (h, Deque snocList t)
    _ ->
      case Prelude.reverse snocList of
        h : t -> Just (h, Deque [] t)
        _ -> Nothing

-- XXX This is old code retained for reference until rewritten.

-- | The composed pipe distributes the input to both the constituent pipes and
-- zips the output of the two using a supplied zipping function.
--
-- @since 0.7.0
{-# INLINE_NORMAL zipWith #-}
zipWith :: Monad m => (a -> b -> c) -> Pipe m i a -> Pipe m i b -> Pipe m i c
zipWith f (Pipe consumeL produceL stateL) (Pipe consumeR produceR stateR) =
                    Pipe consume produce state
        where

        -- Left state means we need to consume input from the source. A Right
        -- state means we either have buffered input or we are in generation
        -- mode so we do not need input from source in either case.
        --
        state = Tuple' (Consume stateL, Nothing, Nothing)
                       (Consume stateR, Nothing, Nothing)

        -- XXX for heavy buffering we need to have the (ring) buffer in pinned
        -- memory using the Storable instance.
        {-# INLINE_LATE consume #-}
        consume (Tuple' (sL, resL, lq) (sR, resR, rq)) a = do
            s1 <- drive sL resL lq consumeL produceL a
            s2 <- drive sR resR rq consumeR produceR a
            yieldOutput s1 s2

            where

            {-# INLINE drive #-}
            drive st res queue fConsume fProduce val =
                case res of
                    Nothing -> goConsume st queue val fConsume fProduce
                    Just x -> return $
                        case queue of
                            Nothing -> (st, Just x, Just $ Deque [val] [])
                            Just q  -> (st, Just x, Just $ snoc val q)

            {-# INLINE goConsume #-}
            goConsume stt queue val fConsume stp2 =
                case stt of
                    Consume st ->
                        case queue of
                            Nothing -> do
                                r <- fConsume st val
                                return $ case r of
                                    Yield x s  -> (s, Just x, Nothing)
                                    Continue s -> (s, Nothing, Nothing)
                            Just queue' ->
                                case uncons queue' of
                                    Just (v, q) -> do
                                        r <- fConsume st v
                                        let q' = snoc val q
                                        return $ case r of
                                            Yield x s  -> (s, Just x, Just q')
                                            Continue s -> (s, Nothing, Just q')
                                    Nothing -> undefined -- never occurs
                    Produce st -> do
                        r <- stp2 st
                        return $ case r of
                            Yield x s  -> (s, Just x, queue)
                            Continue s -> (s, Nothing, queue)

        {-# INLINE_LATE produce #-}
        produce (Tuple' (sL, resL, lq) (sR, resR, rq)) = do
            s1 <- drive sL resL lq consumeL produceL
            s2 <- drive sR resR rq consumeR produceR
            yieldOutput s1 s2

            where

            {-# INLINE drive #-}
            drive stt res q fConsume fProduce =
                case res of
                    Nothing -> goProduce stt q fConsume fProduce
                    Just x -> return (stt, Just x, q)

            {-# INLINE goProduce #-}
            goProduce stt queue fConsume fProduce =
                case stt of
                    Consume st ->
                        case queue of
                            -- See yieldOutput. We enter produce mode only when
                            -- each pipe is either in Produce state or the
                            -- queue is non-empty. So this case cannot occur.
                            Nothing -> undefined
                            Just queue' ->
                                case uncons queue' of
                                    Just (v, q) -> do
                                        r <- fConsume st v
                                        -- We provide a guarantee that if the
                                        -- queue is "Just" it is always
                                        -- non-empty. yieldOutput and goConsume
                                        -- depend on it.
                                        let q' = if null q
                                                 then Nothing
                                                 else Just q
                                        return $ case r of
                                            Yield x s  -> (s, Just x, q')
                                            Continue s -> (s, Nothing, q')
                                    Nothing -> return (stt, Nothing, Nothing)
                    Produce st -> do
                        r <- fProduce st
                        return $ case r of
                            Yield x s  -> (s, Just x, queue)
                            Continue s -> (s, Nothing, queue)

        {-# INLINE yieldOutput #-}
        yieldOutput s1@(sL', resL', lq') s2@(sR', resR', rq') = return $
            -- switch to produce mode if we do not need input
            if (isProduce sL' || isJust lq') && (isProduce sR' || isJust rq')
            then
                case (resL', resR') of
                    (Just xL, Just xR) ->
                        Yield (f xL xR) (Produce (Tuple' (clear s1) (clear s2)))
                    _ -> Continue (Produce (Tuple' s1 s2))
            else
                case (resL', resR') of
                    (Just xL, Just xR) ->
                        Yield (f xL xR) (Consume (Tuple' (clear s1) (clear s2)))
                    _ -> Continue (Consume (Tuple' s1 s2))
            where clear (s, _, q) = (s, Nothing, q)

instance Monad m => Applicative (Pipe m a) where
    {-# INLINE pure #-}
    pure b = Pipe (\_ _ -> pure $ Yield b (Consume ())) undefined ()

    (<*>) = zipWith id

instance Monad m => Arrow (Pipe m) where
    {-# INLINE arr #-}
    arr = map

    {-# INLINE (***) #-}
    (***) = unzip

    {-# INLINE (&&&) #-}
    -- (&&&) = zipWith (,)
    (&&&) = undefined
-}

-------------------------------------------------------------------------------
-- Primitive pipes
-------------------------------------------------------------------------------

-- | A filtering pipe using a monadic predicate.
--
-- Every input is passed to the predicate. If the predicate returns 'True' the
-- input is yielded unchanged, otherwise it is skipped. The pipe is stateless
-- (its consume state is @()@) and it always remains in consume mode.
{-# INLINE filterM #-}
filterM :: Monad m => (a -> m Bool) -> Pipe m a a
filterM f = Pipe consume unreachableProduce ()

    where

    {-# INLINE consume #-}
    consume () a = do
        keep <- f a
        pure $ if keep then YieldC () a else SkipC ()

    -- consume returns only YieldC or SkipC, so the pipe never switches to a
    -- produce state and this function cannot be called. Fail with a
    -- descriptive message rather than a bare 'undefined' should that
    -- invariant ever be broken.
    unreachableProduce _ =
        error "Pipe.filterM: unreachable, filterM has no produce states"

-- | A filtering pipe using a pure predicate. Inputs satisfying the predicate
-- are passed through unchanged, all other inputs are dropped. This is
-- 'filterM' applied to the predicate lifted into the monad.
--
-- >>> Stream.toList $ Stream.pipe (Pipe.filter odd) $ Stream.fromList [1..5::Int]
-- [1,3,5]
--
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Pipe m a a
filter p = filterM (\x -> pure (p x))

-------------------------------------------------------------------------------
-- Convert folds to pipes
-------------------------------------------------------------------------------

-- Note: once we have a separate Scan type we can remove "extract" from Folds.
-- Folds could then only be used for foldMany or many and not for scanning,
-- and this combinator would have to be removed.

-- XXX The way filter is implemented in Folds is that it discards the input and
-- on "extract" it will return the previous accumulator value only. Thus the
-- accumulator may repeat in the output stream when filter is used. Ideally the
-- output stream should not have a value corresponding to the filtered value.
-- With "Continue s" and "Partial s b" instead of using "extract" we can do
-- that.

-- Consume-mode state of the pipes built from a fold ('scanFold' and
-- 'fromFold' both start in FoldConsumeInit).
--
-- FoldConsumeInit: the starting state; the fold's initial action has not
-- been run yet.
-- FoldConsumeGo s: the fold is running and "s" is its current state.
--
-- The type parameter "x" is not used by any constructor.
{-# ANN type FromFoldConsume Fuse #-}
data FromFoldConsume s x = FoldConsumeInit | FoldConsumeGo s

-- Produce-mode state of the pipes built from a fold.
--
-- FoldProduceInit s x: the fold has just been initialized to state "s" and
-- the input "x" that triggered the initialization is still pending; in
-- 'scanFold' the produce step feeds "x" to the fold.
-- FoldProduceStop: the fold is done; in 'scanFold' the produce step returns
-- Stop.
{-# ANN type FromFoldProduce Fuse #-}
data FromFoldProduce s x = FoldProduceInit s x | FoldProduceStop

-- XXX This should be removed once we remove "extract" from folds.

-- | Scan the input with the supplied fold. After every input on which the
-- fold continues, the fold's current result (obtained via its extract
-- function) is emitted. When the fold terminates its final result is emitted
-- and the pipe stops.
--
-- Pipes do not support finalization yet, therefore the fold is not finalized
-- if the stream stops before the fold terminates. Do not use this with folds
-- that require finalization.
--
-- >>> Stream.toList $ Stream.pipe (Pipe.scanFold Fold.sum) $ Stream.fromList [1..5::Int]
-- [1,3,6,10,15]
--
{-# INLINE scanFold #-}
scanFold :: Monad m => Fold m a b -> Pipe m a b
scanFold (Fold foldStep foldInit foldExtract _) =
    Pipe consume produce FoldConsumeInit

    where

    -- XXX make the initial state Either type and start in produce mode
    --
    -- On the very first input the fold is initialized. The input itself is
    -- not consumed by the fold yet, it is parked in the produce state and fed
    -- to the fold by "produce". If the fold is done right after
    -- initialization the input is dropped.
    consume FoldConsumeInit x =
        foldInit <&> \res ->
            case res of
                Fold.Partial s -> SkipP (FoldProduceInit s x)
                Fold.Done b -> YieldP FoldProduceStop b

    consume (FoldConsumeGo st) x =
        foldStep st x >>= \res ->
            case res of
                Fold.Partial s -> foldExtract s <&> YieldC (FoldConsumeGo s)
                Fold.Done b -> pure (YieldP FoldProduceStop b)

    -- Feed the input parked during initialization to the running fold.
    produce (FoldProduceInit st x) = consume (FoldConsumeGo st) x
    produce FoldProduceStop = pure Stop

-- XXX The doctest for Pipe.fromFold fails with "[]" as the result.

-- | Create a singleton pipe from a fold.
--
-- The pipe runs the fold's initial action when it receives its first input,
-- then feeds that input and every subsequent input to the fold's step
-- function. The fold result is yielded exactly once, when the initial action
-- or a step returns 'Fold.Done'; after that the pipe stops.
--
-- Pipes do not support finalization yet. This does not finalize the fold
-- when the stream stops before the fold terminates, so it cannot be used on
-- folds that require such finalization. The last two fields of the 'Fold'
-- are ignored here, therefore a fold that never returns 'Fold.Done' on its
-- own yields nothing through this pipe.
--
-- The example below is deliberately not a doctest (note the @>>@ prompt); it
-- shows the intended result, which requires finalization support:
--
-- >> Stream.toList $ Stream.pipe (Pipe.fromFold Fold.sum) $ Stream.fromList [1..5::Int]
-- [15]
--
{-# INLINE fromFold #-}
fromFold :: Monad m => Fold m a b -> Pipe m a b
fromFold (Fold fstep finitial _ _) =
    Pipe consume produce FoldConsumeInit

    where

    -- XXX make the initial state Either type and start in produce mode

    -- First input: run the fold's initial action. The input that triggered
    -- the initialization has not been folded yet, so it is carried into the
    -- produce state and folded from there. If the fold is already done, the
    -- input is dropped and the result is yielded.
    consume FoldConsumeInit x = do
        r <- finitial
        return $ case r of
            Fold.Partial s -> SkipP (FoldProduceInit s x)
            Fold.Done b -> YieldP FoldProduceStop b

    -- Steady state: fold one input and either keep consuming or yield the
    -- final result and move to the terminal produce state.
    consume (FoldConsumeGo st) a = do
        r <- fstep st a
        return $ case r of
            Fold.Partial s -> SkipC (FoldConsumeGo s)
            Fold.Done b -> YieldP FoldProduceStop b

    -- Fold the input that was pending when the fold got initialized.
    produce (FoldProduceInit st x) = consume (FoldConsumeGo st) x
    -- The result has been yielded; nothing more to produce.
    produce FoldProduceStop = return Stop

-- | Produces the stream on consuming ().
--
-- The first @()@ input switches the pipe to produce mode, starting from the
-- stream's initial state. In produce mode each stream step is translated to
-- the corresponding pipe step: a yielded element is yielded by the pipe, a
-- skip stays in produce mode, and the end of the stream stops the pipe.
--
{-# INLINE fromStream #-}
fromStream :: Monad m => Stream m a -> Pipe m () a
fromStream (Stream step state) = Pipe consume produce ()

    where

    -- XXX make the initial state Either type and start in produce mode

    -- The only consume state is (); on input, hand over to the producer
    -- with the stream's initial state.
    consume () () = return $ SkipP state

    -- Run one step of the underlying stream and map its result to a
    -- produce-side pipe step.
    produce st = do
        r <- step defState st
        return $ case r of
            Stream.Yield b s -> YieldP s b
            Stream.Skip s -> SkipP s
            Stream.Stop -> Stop

-- | Create a pipe from a 'Scanr'.
--
-- Every input is passed to the scan's step function together with the
-- current scan state. The pipe always stays in consume mode: a yielded
-- output is emitted with the new state, a skip just updates the state, and
-- a stop terminates the pipe.
--
{-# INLINE fromScanr #-}
fromScanr :: Monad m => Scanr m a b -> Pipe m a b
fromScanr (Scanr step initial) = Pipe consume produce initial

    where

    -- Run one scan step and map its result to a consume-side pipe step.
    consume st a = do
        r <- step st a
        return $ case r of
            Stream.Yield b s -> YieldC s b
            Stream.Skip s -> SkipC s
            Stream.Stop -> Stop

    -- 'consume' never returns a produce-side step, so this pipe has no
    -- produce state to run. Fail with a descriptive message rather than a
    -- bare 'undefined' should a driver ever force it.
    produce = error "Pipe.fromScanr: produce state is unreachable"