
Streamly.Internal.Data.Pipe

There are three fundamental types that make up a stream pipeline:

  • Stream: sources
  • Scan: transformations
  • Fold: sinks

Streams are sources, or producers, of values. Multiple sources can be merged into a single source, but a source cannot be split into multiple stream sources. Folds are sinks, or consumers. A stream can be split and distributed to multiple folds, but the results cannot be merged back into a stream source. Scans are simple one-to-one transformations with filtering: one input element cannot be transformed into multiple output elements.

The Pipe type is a supertype of all the above and is the most complex of them: a stream, a scan, or a fold can each be represented by a pipe. A pipe can act as a source, a sink, or a transformation, dynamically. A stream source can be split and distributed to multiple pipes, each pipe can apply its own transformation to the stream, and the results can be merged back into a single pipe. Pipes can be attached to a source to produce a source, or attached to a fold to produce a fold, and multiple pipes can be merged or zipped into a single pipe.

import qualified Streamly.Internal.Data.Pipe as Pipe

Type

data Step cs ps b Source #

Constructors

YieldC cs b

Yield and consume

SkipC cs

Skip and consume

Stop

When consuming, Stop means that the input remains unused. Therefore, Stop should not be used while processing an input; instead, use YieldP and then Stop.

YieldP ps b

Yield and produce

SkipP ps

Skip and produce

Instances
Instances details
Functor (Step cs ps) Source # 
Instance details

Defined in Streamly.Internal.Data.Pipe.Type

Methods

fmap :: (a -> b) -> Step cs ps a -> Step cs ps b Source #

(<$) :: a -> Step cs ps b -> Step cs ps a Source #

data Pipe m a b Source #

Represents a stateful transformation over an input stream of values of type a to outputs of type b in Monad m.

The constructor is Pipe consume produce initial.

Constructors

forall cs ps. Pipe (cs -> a -> m (Step cs ps b)) (ps -> m (Step cs ps b)) cs 
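
To make the roles of the consume and produce functions concrete, here is a minimal, self-contained sketch. It does not import streamly: `Step` and `Pipe` are local copies of the shapes documented above, and `runPipe` is a hypothetical list-based driver written only for this illustration. The library's actual driver works on streams and may differ in detail. The `duplicate` pipe (also hypothetical) emits every input twice, which is the kind of one-to-many transformation a scan cannot express.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Local stand-ins mirroring the documented shapes; NOT imported from streamly.
data Step cs ps b
    = YieldC cs b   -- emit b, then wait for the next input
    | SkipC cs      -- emit nothing, then wait for the next input
    | Stop          -- terminate the pipe
    | YieldP ps b   -- emit b, then keep producing without new input
    | SkipP ps      -- emit nothing, then keep producing without new input

data Pipe m a b =
    forall cs ps. Pipe
        (cs -> a -> m (Step cs ps b))  -- consume
        (ps -> m (Step cs ps b))       -- produce
        cs                             -- initial consume state

-- Hypothetical driver (an assumption of this sketch): feeds a list through
-- the pipe and collects every emitted value.
runPipe :: Monad m => Pipe m a b -> [a] -> m [b]
runPipe (Pipe consume produce initial) = goC initial
  where
    -- Consume mode: one input element is needed per step.
    goC _ [] = return []
    goC cs (x:xs) = consume cs x >>= dispatch xs

    -- Produce mode: the pipe runs without reading any input.
    goP ps xs = produce ps >>= dispatch xs

    dispatch xs step = case step of
        YieldC cs b -> (b :) <$> goC cs xs
        SkipC cs    -> goC cs xs
        YieldP ps b -> (b :) <$> goP ps xs
        SkipP ps    -> goP ps xs
        Stop        -> return []

-- Emit each input twice: consume yields the element and switches to produce
-- mode (YieldP), produce yields it again and switches back (YieldC).
duplicate :: Monad m => Pipe m a a
duplicate = Pipe consume produce ()
  where
    consume () a = return (YieldP a a)
    produce a = return (YieldC () a)

main :: IO ()
main = runPipe duplicate [1, 2, 3 :: Int] >>= print
```

In this model the consume state is `()` and the produce state carries the element that still has to be emitted a second time. Running `main` prints `[1,1,2,2,3,3]` under the driver defined here.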
Instances
Instances details
Monad m => Category (Pipe m :: Type -> Type -> Type) Source #

"." composes the pipes in series.

Instance details

Defined in Streamly.Internal.Data.Pipe.Type

Methods

id :: forall (a :: k). Pipe m a a Source #

(.) :: forall (b :: k) (c :: k) (a :: k). Pipe m b c -> Pipe m a b -> Pipe m a c Source #

Functor m => Functor (Pipe m a) Source #

fmap maps a pure function over the pipe's output.

>>> Stream.toList $ Stream.pipe (fmap (+1) Pipe.identity) $ Stream.fromList [1..5::Int]
[2,3,4,5,6]
Instance details

Defined in Streamly.Internal.Data.Pipe.Type

Methods

fmap :: (a0 -> b) -> Pipe m a a0 -> Pipe m a b Source #

(<$) :: a0 -> Pipe m a b -> Pipe m a a0 Source #

Monad m => Semigroup (Pipe m a b) Source #

<> composes the pipes in parallel.

Instance details

Defined in Streamly.Internal.Data.Pipe.Type

Methods

(<>) :: Pipe m a b -> Pipe m a b -> Pipe m a b Source #

sconcat :: NonEmpty (Pipe m a b) -> Pipe m a b Source #

stimes :: Integral b0 => b0 -> Pipe m a b -> Pipe m a b Source #

From folds

fromStream :: Monad m => Stream m a -> Pipe m () a Source #

Produces the stream on consuming ().

fromScanr :: Monad m => Scanr m a b -> Pipe m a b Source #

fromFold :: Monad m => Fold m a b -> Pipe m a b Source #

Create a singleton pipe from a fold.

Pipes do not support finalization yet. This does not finalize the fold when the stream stops before the fold terminates, so it cannot be used on folds that require such finalization.

> Stream.toList $ Stream.pipe (Pipe.fromFold Fold.sum) $ Stream.fromList [1..5::Int]
15

scanFold :: Monad m => Fold m a b -> Pipe m a b Source #

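Scans the input with the fold, emitting the fold's intermediate result after each input element.

Pipes do not support finalization yet. This does not finalize the fold when the stream stops before the fold terminates, so it cannot be used on folds that require finalization.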

>>> Stream.toList $ Stream.pipe (Pipe.scanFold Fold.sum) $ Stream.fromList [1..5::Int]
[1,3,6,10,15]

Primitive Pipes

identity :: Monad m => Pipe m a a Source #

An identity pipe producing the same output as input.

>>> identity = Pipe.map Prelude.id
>>> Stream.toList $ Stream.pipe (Pipe.identity) $ Stream.fromList [1..5::Int]
[1,2,3,4,5]

map :: Monad m => (a -> b) -> Pipe m a b Source #

A pipe representing mapping of a pure function.

>>> Stream.toList $ Stream.pipe (Pipe.map (+1)) $ Stream.fromList [1..5::Int]
[2,3,4,5,6]

mapM :: Monad m => (a -> m b) -> Pipe m a b Source #

A pipe representing mapping of a monadic action.

>>> Stream.toList $ Stream.pipe (Pipe.mapM print) $ Stream.fromList [1..5::Int]
1
2
3
4
5
[(),(),(),(),()]

filter :: Monad m => (a -> Bool) -> Pipe m a a Source #

A filtering pipe using a pure predicate.

>>> Stream.toList $ Stream.pipe (Pipe.filter odd) $ Stream.fromList [1..5::Int]
[1,3,5]

filterM :: Monad m => (a -> m Bool) -> Pipe m a a Source #

A filtering pipe using a monadic predicate.
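
As a small sketch of a monadic predicate, the program below logs each element before deciding whether to keep it. It assumes a streamly-core version that exposes `Stream.pipe` from `Streamly.Internal.Data.Stream`, as the other examples on this page do. `keepEven` is a hypothetical helper introduced only for this illustration.

```haskell
import qualified Streamly.Internal.Data.Pipe as Pipe
import qualified Streamly.Internal.Data.Stream as Stream

-- Hypothetical predicate for illustration: logs the element it inspects
-- and keeps only even values.
keepEven :: Int -> IO Bool
keepEven x = do
    putStrLn ("checking " ++ show x)
    return (even x)

main :: IO ()
main = do
    xs <- Stream.toList
        $ Stream.pipe (Pipe.filterM keepEven)
        $ Stream.fromList [1 .. 5]
    print xs
```

By analogy with the `filter` example above, only the elements for which the predicate returns True should remain in the result.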

Combinators

compose :: Monad m => Pipe m b c -> Pipe m a b -> Pipe m a c Source #

Series composition. Compose two pipes such that the output of the second pipe is attached to the input of the first pipe.

>>> Stream.toList $ Stream.pipe (Pipe.map (+1) >>> Pipe.map (+1)) $ Stream.fromList [1..5::Int]
[3,4,5,6,7]
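
The example above uses `>>>` from Control.Category rather than `compose` itself, so the argument order is easy to miss. The sketch below, under the same assumptions about module names as the other examples on this page, applies two different functions so that the order is visible: `compose` runs its second argument first, while `>>>` reads left to right.

```haskell
import Control.Category ((>>>))
import qualified Streamly.Internal.Data.Pipe as Pipe
import qualified Streamly.Internal.Data.Stream as Stream

main :: IO ()
main = do
    let input = Stream.fromList [1 .. 5 :: Int]
    -- compose: the second pipe (add 1) sees the input first, and its
    -- output feeds the first pipe (double).
    xs <- Stream.toList
        $ Stream.pipe (Pipe.compose (Pipe.map (* 2)) (Pipe.map (+ 1))) input
    -- (>>>): the same pipeline written left to right.
    ys <- Stream.toList
        $ Stream.pipe (Pipe.map (+ 1) >>> Pipe.map (* 2)) input
    print (xs, ys)
```

Given the description of `compose` above, both pipelines should compute `(x + 1) * 2` for each element and therefore produce the same list.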

teeMerge :: Monad m => Pipe m a b -> Pipe m a b -> Pipe m a b Source #

Parallel composition. Distribute the input across two pipes and merge their outputs.

>>> Stream.toList $ Stream.pipe (Pipe.teeMerge Pipe.identity (Pipe.map (\x -> x * x))) $ Stream.fromList [1..5::Int]
[1,1,2,4,3,9,4,16,5,25]