
This module contains functions ending in the shape:

t m a -> m b

We call them stream folding functions. They reduce a stream `t m a` to a monadic value `m b`.

`>>> :m`

`>>> import Streamly.Prelude (SerialT)`

`>>> import qualified Streamly.Prelude as Stream`

`>>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream`

`>>> import qualified Streamly.Internal.Data.Parser as Parser`

`>>> import qualified Streamly.Data.Fold as Fold`

`Fold`

fold :: Monad m => Fold m a b -> SerialT m a -> m b

Fold a stream using the supplied left `Fold`, reducing the resulting expression strictly at each step. The behavior is similar to `foldl'`. A `Fold` can terminate early without consuming the full stream. See the documentation of individual `Fold`s for termination behavior.

`>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)`

`5050`

Folds never fail; they produce a default value even when no input is provided. This means we can always fold an empty stream and get a valid result. For example:

`>>> Stream.fold Fold.sum Stream.nil`

`0`

However, `foldMany` on an empty stream results in an empty stream. Therefore, `Stream.fold f` is not the same as `Stream.head . Stream.foldMany f`.

fold f = Stream.parse (Parser.fromFold f)

*Since: 0.7.0*

`Parser`

parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b

Parse a stream using the supplied `Parser`.

Unlike folds, parsers do not always produce a valid output; they may fail with an error. For example:

`>>> Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil`

`*** Exception: ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0"`

Note:

fold f = Stream.parse (Parser.fromFold f)

`parse p` is not the same as `head . parseMany p` on an empty stream.

*Pre-release*

parseK :: MonadThrow m => Parser m a b -> SerialT m a -> m b

Parse a stream using the supplied ParserK `Parser`.

*Internal*

parseD :: MonadThrow m => Parser m a b -> SerialT m a -> m b

Parse a stream using the supplied ParserD `Parser`.

*Internal*

parse_ :: MonadThrow m => Parser m a b -> SerialT m a -> m (b, SerialT m a)

Parse a stream using the supplied `Parser`.

*Internal*

foldr and foldl do not provide the remaining stream. `uncons` is more general, as it can be used to implement those as well. It lets us consume the stream one element at a time while always keeping hold of the remaining stream.

uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))

Decompose a stream into its head and tail. If the stream is empty, returns `Nothing`. If the stream is non-empty, returns `Just (a, ma)`, where `a` is the head of the stream and `ma` its tail.

This is a brute force primitive. Avoid it where possible and use it only when no other combinator can do the job. It can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements that we can loop over as we see fit. For example, it can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of `uncons`; however, the specific implementations are generally more efficient.

*Since: 0.1.0*
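As an illustration of the element-at-a-time style described for `uncons`, here is a minimal sketch that rebuilds a list from a stream. It assumes streamly-0.8.0 is installed; `toListViaUncons` is a hypothetical helper name, not part of the library, and `toList` remains the better choice in real code.

```haskell
import Data.Functor.Identity (runIdentity)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as Stream

-- Hypothetical helper: rebuild a list by peeling off one element at a time.
toListViaUncons :: Monad m => SerialT m a -> m [a]
toListViaUncons s = do
    r <- Stream.uncons s
    case r of
        Nothing -> return []
        -- 'rest' is the remaining stream; recursing fixes its type to SerialT.
        Just (x, rest) -> (x :) <$> toListViaUncons rest

main :: IO ()
main = print (runIdentity (toListViaUncons (Stream.fromList "abc")))
```

The recursion is not written for efficiency; it only shows that the tail returned by `uncons` is an ordinary stream that can be consumed further.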

foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b

Right associative/lazy pull fold. `foldrM build final stream` constructs an output structure using the step function `build`. `build` is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated, it calls `build` again, thus lazily consuming the input `stream` until either the output expression built by `build` is free of the "tail" or the input is exhausted, in which case `final` is used as the terminating case for the output structure. For more details see the description in the previous section.

Example, determine if any element is `odd` in a stream:

`>>> Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined)`

`True`

*Since: 0.7.0 (signature changed)*

*Since: 0.2.0 (signature changed)*

*Since: 0.1.0*

foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad `m` is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

*Since: 0.1.0*
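A small sketch of `foldr` over a pure stream in the `Identity` monad, the setting the text recommends for it. It assumes streamly-0.8.0; `doubled` is a hypothetical name chosen for this example.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- Build a new list from a pure stream with a right fold.
doubled :: [Int] -> [Int]
doubled xs =
    runIdentity (Stream.foldr (\x acc -> 2 * x : acc) [] (Stream.fromList xs))

main :: IO ()
main = print (take 3 (doubled [1 .. 10]))  -- prints [2,4,6]
```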

foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b

Left associative/strict push fold. `foldl' reduce initial stream` invokes `reduce` with the accumulator and the next input in the input stream, using `initial` as the initial value of the accumulator. When the input is exhausted the current value of the accumulator is returned. Make sure to use a strict data structure for the accumulator to avoid building unnecessary lazy expressions, unless that's what you want. See the previous section for more details.

*Since: 0.2.0*
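The advice about strict accumulators can be sketched as follows, assuming streamly-0.8.0. `SumCount` and `mean` are hypothetical names introduced for this example; the strict fields ensure that neither the running sum nor the count accumulates thunks.

```haskell
import Data.Functor.Identity (runIdentity)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as Stream

-- Strict fields keep the running sum and count evaluated at every step.
data SumCount = SumCount !Double !Int

-- Arithmetic mean of a stream; returns 0 for an empty stream.
mean :: Monad m => SerialT m Double -> m Double
mean = fmap finish . Stream.foldl' step (SumCount 0 0)
  where
    step (SumCount total n) x = SumCount (total + x) (n + 1)
    finish (SumCount _ 0) = 0
    finish (SumCount total n) = total / fromIntegral n

main :: IO ()
main = print (runIdentity (mean (Stream.fromList [1, 2, 3, 4])))  -- prints 2.5
```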

foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)

Strict left fold, for non-empty streams, using first element as the starting value. Returns `Nothing` if the stream is empty.

*Since: 0.5.0*
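A brief sketch of `foldl1'` on a non-empty and an empty stream, assuming streamly-0.8.0; `largest` is a hypothetical name used only here.

```haskell
import Data.Functor.Identity (runIdentity)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as Stream

-- Largest element of a stream, or Nothing when the stream is empty.
largest :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
largest = Stream.foldl1' max

main :: IO ()
main = do
    print (runIdentity (largest (Stream.fromList [3, 9, 2 :: Int])))  -- Just 9
    print (runIdentity (largest (Stream.fromList ([] :: [Int]))))     -- Nothing
```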

Folds as functions of the shape `t m a -> m b`.

These functions are good to run individually but they do not compose well. Prefer writing folds as the `Fold` data type. Use folds from Streamly.Internal.Data.Fold instead of using the functions in this section.
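To illustrate why the `Fold` data type composes better than functions of the shape `t m a -> m b`, the sketch below computes a sum and a length in a single pass. It assumes streamly-0.8.0 and that `Fold.teeWith` is exported from `Streamly.Data.Fold` in that release; `sumAndLength` is a hypothetical name.

```haskell
import Data.Functor.Identity (runIdentity)
import Streamly.Prelude (SerialT)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Prelude as Stream

-- Combine two folds so that the stream is traversed only once.
sumAndLength :: Monad m => SerialT m Int -> m (Int, Int)
sumAndLength = Stream.fold (Fold.teeWith (,) Fold.sum Fold.length)

main :: IO ()
main = print (runIdentity (sumAndLength (Stream.fromList [1 .. 10])))  -- (55,10)
```

Doing the same with `Stream.sum` and `Stream.length` would require traversing the stream twice.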

This section can possibly be removed in future. Are these better in some case compared to `Fold`? When the input stream is in CPS style (StreamK) we may want to rewrite the function call to CPS implementation of the fold through these definitions. Will that be more efficient for StreamK?

mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()

mapM_ f = Stream.drain . Stream.mapM f

Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.

*Since: 0.1.0*

drain :: Monad m => SerialT m a -> m ()

drain = mapM_ (\_ -> return ())

drain = Stream.fold Fold.drain

Run a stream, discarding the results. By default it interprets the stream as `SerialT`; to run other types of streams use the type adapting combinators, for example `Stream.drain . fromAsync`.

*Since: 0.7.0*

last :: Monad m => SerialT m a -> m (Maybe a)

Extract the last element of the stream, if any.

last xs = xs !! (Stream.length xs - 1)

last = Stream.fold Fold.last

*Since: 0.1.1*

sum :: (Monad m, Num a) => SerialT m a -> m a

Determine the sum of all elements of a stream of numbers. Returns `0` when the stream is empty. Note that this is not numerically stable for floating point numbers.

sum = Stream.fold Fold.sum

*Since: 0.1.0*

product :: (Monad m, Num a) => SerialT m a -> m a

Determine the product of all elements of a stream of numbers. Returns `1` when the stream is empty.

product = Stream.fold Fold.product

*Since: 0.1.1*

mconcat :: (Monad m, Monoid a) => SerialT m a -> m a

Fold a stream of monoid elements by appending them.

mconcat = Stream.fold Fold.mconcat

*Pre-release*

maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)

Determine the maximum element in a stream using the supplied comparison function.

maximumBy = Stream.fold Fold.maximumBy

*Since: 0.6.0*

maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)

maximum = maximumBy compare

maximum = Stream.fold Fold.maximum

Determine the maximum element in a stream.

*Since: 0.1.0*

minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)

Determine the minimum element in a stream using the supplied comparison function.

minimumBy = Stream.fold Fold.minimumBy

*Since: 0.6.0*

minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)

minimum = minimumBy compare

minimum = Stream.fold Fold.minimum

Determine the minimum element in a stream.

*Since: 0.1.0*

the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a)

Ensures that all the elements of the stream are identical and then returns that unique element.

*Since: 0.6.0*

drainN :: Monad m => Int -> SerialT m a -> m ()

drainN n = Stream.drain . Stream.take n

drainN n = Stream.fold (Fold.take n Fold.drain)

Run at most `n` iterations of a stream.

*Since: 0.7.0*

drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()

drainWhile p = Stream.drain . Stream.takeWhile p

Run a stream as long as the predicate holds true.

*Since: 0.7.0*

(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a)

Lookup the element at the given index.

*Since: 0.6.0*

head :: Monad m => SerialT m a -> m (Maybe a)

Extract the first element of the stream, if any.

head = (!! 0)

head = Stream.fold Fold.head

*Since: 0.1.0*

headElse :: Monad m => a -> SerialT m a -> m a

Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.

*Pre-release*

tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))

tail = fmap (fmap snd) . Stream.uncons

Extract all but the first element of the stream, if any.

*Since: 0.1.1*

init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))

Extract all but the last element of the stream, if any.

*Since: 0.5.0*

findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a)

Returns the first element that satisfies the given predicate.

findM = Stream.fold Fold.findM

*Since: 0.6.0*

find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)

Like `findM` but with a non-monadic predicate.

find p = findM (return . p)

find = Stream.fold Fold.find

*Since: 0.5.0*

findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)

Returns the first index that satisfies the given predicate.

findIndex = Stream.fold Fold.findIndex

*Since: 0.5.0*

elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)

Returns the first index where a given value is found in the stream.

elemIndex a = Stream.findIndex (== a)

*Since: 0.5.0*

lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b)

In a stream of (key-value) pairs `(a, b)`, return the value `b` of the first pair where the key equals the given value `a`.

lookup = snd <$> Stream.find ((==) . fst)

lookup = Stream.fold Fold.lookup

*Since: 0.5.0*
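The search functions above can be tried on a small association list. This is a minimal sketch assuming streamly-0.8.0; `ports`, `portOf` and `indexOfPort` are hypothetical names made up for the example.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- Sample key-value data used only for this illustration.
ports :: [(String, Int)]
ports = [("http", 80), ("https", 443), ("ssh", 22)]

-- Value of the first pair whose key matches.
portOf :: Monad m => String -> m (Maybe Int)
portOf name = Stream.lookup name (Stream.fromList ports)

-- Position of the first pair whose value matches.
indexOfPort :: Monad m => Int -> m (Maybe Int)
indexOfPort p = Stream.findIndex ((== p) . snd) (Stream.fromList ports)

main :: IO ()
main = do
    print (runIdentity (portOf "https"))  -- Just 443
    print (runIdentity (portOf "ftp"))    -- Nothing
    print (runIdentity (indexOfPort 22))  -- Just 2
```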

null :: Monad m => SerialT m a -> m Bool

Determine whether the stream is empty.

null = Stream.fold Fold.null

*Since: 0.1.1*

elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool

Determine whether an element is present in the stream.

elem = Stream.fold Fold.elem

*Since: 0.1.0*

notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool

Determine whether an element is not present in the stream.

notElem = Stream.fold Fold.notElem

*Since: 0.1.0*

all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool

Determine whether all elements of a stream satisfy a predicate.

all = Stream.fold Fold.all

*Since: 0.1.0*

any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool

Determine whether any of the elements of a stream satisfy a predicate.

any = Stream.fold Fold.any

*Since: 0.1.0*

and :: Monad m => SerialT m Bool -> m Bool

Determines if all elements of a boolean stream are True.

and = Stream.fold Fold.and

*Since: 0.5.0*

or :: Monad m => SerialT m Bool -> m Bool

Determines whether at least one element of a boolean stream is True.

or = Stream.fold Fold.or

*Since: 0.5.0*
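The boolean folds in this group behave like their list counterparts. A short sketch, assuming streamly-0.8.0; the helper names are hypothetical.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- True when every element is even.
allEven :: Monad m => [Int] -> m Bool
allEven = Stream.all even . Stream.fromList

-- True when at least one element is negative.
hasNegative :: Monad m => [Int] -> m Bool
hasNegative = Stream.any (< 0) . Stream.fromList

-- True when the value does not occur in the list.
missing :: Monad m => Int -> [Int] -> m Bool
missing x = Stream.notElem x . Stream.fromList

main :: IO ()
main = do
    print (runIdentity (allEven [2, 4, 6]))      -- True
    print (runIdentity (hasNegative [1, 2, 3]))  -- False
    print (runIdentity (missing 5 [1, 2, 3]))    -- True
```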

toList :: Monad m => SerialT m a -> m [a]

toList = Stream.foldr (:) []

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. `Identity`). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

*Warning!* working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

*Since: 0.1.0*

toListRev :: Monad m => SerialT m a -> m [a]

toListRev = Stream.foldl' (flip (:)) []

Convert a stream into a list in reverse order in the underlying monad.

*Warning!* working on large lists accumulated as buffers in memory could be
very inefficient, consider using Streamly.Array instead.

*Pre-release*

toStream :: Monad m => SerialT m a -> m (SerialT Identity a)

Convert a stream to a pure stream.

toStream = Stream.foldr Stream.cons Stream.nil

*Pre-release*

toStreamRev :: Monad m => SerialT m a -> m (SerialT Identity a)

Convert a stream to a pure stream in reverse order.

toStreamRev = Stream.foldl' (flip Stream.cons) Stream.nil

*Pre-release*

foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b

Same as `|$.`.

*Internal*

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0

Parallel fold application operator; applies a fold function `t m a -> m b` to a stream `t m a` concurrently. The input stream is evaluated asynchronously in an independent thread yielding elements to a buffer, and the folding action runs in another thread consuming the input from the buffer.

If you read the signature as `(t m a -> m b) -> (t m a -> m b)` you can look at it as a transformation that converts a fold function to a buffered concurrent fold function.

The `.` at the end of the operator is a mnemonic for termination of the stream.

In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.

`>>> import Control.Concurrent (threadDelay)`

`>>> import Streamly.Prelude ((|$.))`

`>>> :{`

`Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ()) |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)`

`:}`

`1`

`1`

`1`

*Concurrent*

*Since: 0.3.0 (Streamly)*

*Since: 0.8.0*

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1

eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool

Compare two streams for equality using an equality function.

*Since: 0.6.0*

cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering

Compare two streams lexicographically using a comparison function.

*Since: 0.6.0*
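A sketch of `eqBy` and `cmpBy`, assuming streamly-0.8.0. Because both functions are polymorphic in the stream type `t`, the example pins it to `SerialT` through a hypothetical helper named `serial`; the other names are also made up for illustration.

```haskell
import Data.Char (toLower)
import Data.Functor.Identity (runIdentity)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as Stream

-- Hypothetical helper that fixes the stream type to SerialT.
serial :: Monad m => [a] -> SerialT m a
serial = Stream.fromList

-- Equality under a custom relation: here, case-insensitive characters.
sameIgnoringCase :: Monad m => String -> String -> m Bool
sameIgnoringCase a b =
    Stream.eqBy (\x y -> toLower x == toLower y) (serial a) (serial b)

-- Lexicographic comparison using the standard ordering on elements.
compareLists :: (Monad m, Ord a) => [a] -> [a] -> m Ordering
compareLists a b = Stream.cmpBy compare (serial a) (serial b)

main :: IO ()
main = do
    print (runIdentity (sameIgnoringCase "Hello" "hELLO"))           -- True
    print (runIdentity (compareLists [1, 2, 3 :: Int] [1, 2, 4]))     -- LT
```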

isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool

Returns `True` if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.

`>>> Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)`

`True`

*Since: 0.6.0*

isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool

Returns `True` if the first stream is a suffix of the second. A stream is considered a suffix of itself.

`>>> Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)`

`True`

Space: `O(n)`, buffers the entire input stream and the suffix.

*Pre-release*

*Suboptimal* - Help wanted.

isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool

Returns `True` if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.

`>>> Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)`

`True`

*Since: 0.6.0*

stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a))

`stripPrefix prefix stream` strips `prefix` from `stream` if it is a prefix of `stream`. Returns `Nothing` if the stream does not start with the given prefix, and the stripped stream otherwise. Returns `Just nil` when the prefix is the same as the stream.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".

Space: `O(1)`

*Since: 0.6.0*
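A minimal sketch of `stripPrefix` that collects the remaining stream into a list, assuming streamly-0.8.0. `serial` and `stripPrefixList` are hypothetical helpers; `serial` only pins the polymorphic stream type to `SerialT`.

```haskell
import Data.Functor.Identity (runIdentity)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as Stream

-- Hypothetical helper that fixes the stream type to SerialT.
serial :: Monad m => [a] -> SerialT m a
serial = Stream.fromList

-- Strip a prefix and collect whatever remains into a list.
stripPrefixList :: (Monad m, Eq a) => [a] -> [a] -> m (Maybe [a])
stripPrefixList prefix xs = do
    r <- Stream.stripPrefix (serial prefix) (serial xs)
    traverse Stream.toList r

main :: IO ()
main = do
    print (runIdentity (stripPrefixList "foo" "foobar"))  -- Just "bar"
    print (runIdentity (stripPrefixList "bar" "foobar"))  -- Nothing
```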

stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))

Drops the given suffix from a stream. Returns `Nothing` if the stream does not end with the given suffix. Returns `Just nil` when the suffix is the same as the stream.

It may be more efficient to convert the stream to an Array and use stripSuffix on that, especially if the elements have a Storable or Prim instance.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropSuffix".

Space: `O(n)`, buffers the entire input stream as well as the suffix.

*Pre-release*

foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b

Deprecated: Please use foldl' followed by fmap instead.

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the `foldl` library. The suffix `x` is a mnemonic for extraction.

*Since: 0.2.0*

foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b

Deprecated: Please use foldlM' followed by fmap instead.

Like `foldx`, but with a monadic step function.

*Since: 0.2.0*

foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)

Deprecated: Use foldrM instead.

Lazy right fold for non-empty streams, using first element as the starting value. Returns `Nothing` if the stream is empty.

*Since: 0.5.0*

runStream :: Monad m => SerialT m a -> m ()

Deprecated: Please use "drain" instead

Run a stream, discarding the results. By default it interprets the stream as `SerialT`; to run other types of streams use the type adapting combinators, for example `runStream . fromAsync`.

*Since: 0.2.0*

runN :: Monad m => Int -> SerialT m a -> m ()

Deprecated: Please use "drainN" instead

runN n = runStream . take n

Run at most `n` iterations of a stream.

*Since: 0.6.0*

streamly-0.8.0 **Streamly.Internal.Data.Stream.IsStream.Eliminate**