Streamly.Internal.Data.Pipe
There are three fundamental types that make up a stream pipeline:
- Stream: sources
- Scan: transformations
- Fold: sinks
Streams are sources or producers of values, multiple sources can be merged into a single source but a source cannot be split into multiple stream sources. Folds are sinks or consumers, a stream can be split and distributed to multiple folds but the results cannot be merged back into a stream source again. Scans are simple one-to-one transformations with filtering. One element cannot be transformed to multiple elements.
The Pipe type is a super type of all the above, it is the most complex type. All of these can be represented by a pipe. A pipe can act as a source or a sink or a transformation, dynamically. A stream source can be split and distributed to multiple pipes each pipe can apply its own transform on the stream and the results can be merged back into a single pipe. Pipes can be attached to a source to produce a source or they can be attached to a fold to produce a fold, or multiple pipes can be merged or zipped into a single pipe.
import qualified Streamly.Internal.Data.Pipe as Pipe
Type
Represents a stateful transformation over an input stream of values of
type a
to outputs of type b
in Monad
m
.
The constructor is Pipe consume produce initial
.
Instances
Monad m => Category (Pipe m :: Type -> Type -> Type) Source # | "." composes the pipes in series. |
Functor m => Functor (Pipe m a) Source # |
|
Monad m => Semigroup (Pipe m a b) Source # |
|
From folds
fromFold :: Monad m => Fold m a b -> Pipe m a b Source #
Create a singleton pipe from a fold.
Pipes do not support finalization yet. This does not finalize the fold when the stream stops before the fold terminates. So cannot be used on folds that require such finalization.
> Stream.toList $ Stream.pipe (Pipe.fromFold Fold.sum) $ Stream.fromList [1..5::Int]
- 15
scanFold :: Monad m => Fold m a b -> Pipe m a b Source #
Pipes do not support finalization yet. This does not finalize the fold when the stream stops before the fold terminates. So cannot be used on folds that require finalization.
>>>
Stream.toList $ Stream.pipe (Pipe.scanFold Fold.sum) $ Stream.fromList [1..5::Int]
[1,3,6,10,15]
Primitive Pipes
identity :: Monad m => Pipe m a a Source #
An identity pipe producing the same output as input.
>>>
identity = Pipe.map Prelude.id
>>>
Stream.toList $ Stream.pipe (Pipe.identity) $ Stream.fromList [1..5::Int]
[1,2,3,4,5]
map :: Monad m => (a -> b) -> Pipe m a b Source #
A pipe representing mapping of a pure function.
>>>
Stream.toList $ Stream.pipe (Pipe.map (+1)) $ Stream.fromList [1..5::Int]
[2,3,4,5,6]
mapM :: Monad m => (a -> m b) -> Pipe m a b Source #
A pipe representing mapping of a monadic action.
>>>
Stream.toList $ Stream.pipe (Pipe.mapM print) $ Stream.fromList [1..5::Int]
1 2 3 4 5 [(),(),(),(),()]
filter :: Monad m => (a -> Bool) -> Pipe m a a Source #
A filtering pipe using a pure predicate.
>>>
Stream.toList $ Stream.pipe (Pipe.filter odd) $ Stream.fromList [1..5::Int]
[1,3,5]
filterM :: Monad m => (a -> m Bool) -> Pipe m a a Source #
A filtering pipe using a monadic predicate.