Deprecated: Please use Streamly.Internal.Data.Stream from the streamly-core package, and Streamly.Internal.Data.Stream.Concurrent, Streamly.Internal.Data.Stream.Exception.Lifted, and Streamly.Internal.Data.Stream.Time from the streamly package instead.

This is an internal module which is a superset of the corresponding released module Streamly.Prelude. It contains some additional unreleased or experimental APIs.

## Documentation

newtype StreamK (m :: Type -> Type) a Source #

##### Instances

For `SerialT` streams:

```haskell
(<>)  = serial                      -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad
```

A single `Monad` bind behaves like a `for` loop:

```haskell
>>> :{
IsStream.toList $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
```

Nested monad binds behave like nested `for` loops:

```haskell
>>> :{
IsStream.toList $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
```

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

##### Instances

For `WSerialT` streams:

```haskell
(<>)  = wSerial                      -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad
```

Note that `<>` is associative only if we disregard the ordering of elements in the resulting stream.

A single `Monad` bind behaves like a `for` loop:

```haskell
>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
```

Nested monad binds behave like interleaved nested `for` loops:

```haskell
>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]
```

This is the result of interleaving all the nested iterations corresponding to element `1` in the first stream with all the nested iterations of element `2`:

```haskell
>>> import Streamly.Prelude (wSerial)
>>> IsStream.toList $ IsStream.fromList [(1,3),(1,4)] `IsStream.wSerial` IsStream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
```

The `W` in the name stands for `wide` or breadth-wise scheduling, in contrast to the depth-wise scheduling behavior of `SerialT`.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

##### Instances

For `AheadT` streams:

```haskell
(<>)  = ahead
(>>=) = flip . concatMapWith ahead
```

A single `Monad` bind behaves like a `for` loop with iterations executed concurrently, ahead of time, producing side effects of iterations out of order, but results in order:

```haskell
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]
```

Nested monad binds behave like nested `for` loops with nested iterations executed concurrently, ahead of time:

```haskell
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]
```

The behavior can be explained as follows. All the iterations corresponding to the element `1` in the first stream constitute one output stream, all the iterations corresponding to `2` constitute another output stream, and these two output streams are merged using `ahead`.

*Since: 0.3.0 (Streamly)*

*Since: 0.8.0*

##### Instances

- `IsStream AheadT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `MonadTrans AheadT`
- `(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m)`
- `(MonadState s m, MonadAsync m) => MonadState s (AheadT m)`
- `(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m)` (defined in Streamly.Internal.Data.Stream.Ahead)
- `(MonadIO m, MonadAsync m) => MonadIO (AheadT m)`
- `(Monad m, MonadAsync m) => Applicative (AheadT m)` (defined in Streamly.Internal.Data.Stream.Ahead)
- `Monad m => Functor (AheadT m)`
- `MonadAsync m => Monad (AheadT m)`
- `(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m)`
- `MonadAsync m => Monoid (AheadT m a)`
- `MonadAsync m => Semigroup (AheadT m a)`

For `AsyncT` streams:

```haskell
(<>)  = async
(>>=) = flip . concatMapWith async
```

A single `Monad` bind behaves like a `for` loop with iterations of the loop executed concurrently a la the `async` combinator, producing results and side effects of iterations out of order:

```haskell
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
```

Nested monad binds behave like nested `for` loops with nested iterations executed concurrently, a la the `async` combinator:

```haskell
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
```

The behavior can be explained as follows. All the iterations corresponding to the element `1` in the first stream constitute one output stream, all the iterations corresponding to `2` constitute another output stream, and these two output streams are merged using `async`.

*Since: 0.1.0 (Streamly)*

*Since: 0.8.0*

##### Instances

- `IsStream AsyncT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `MonadTrans AsyncT`
- `(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m)`
- `(MonadState s m, MonadAsync m) => MonadState s (AsyncT m)`
- `(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m)` (defined in Streamly.Internal.Data.Stream.Async)
- `(MonadIO m, MonadAsync m) => MonadIO (AsyncT m)`
- `(Monad m, MonadAsync m) => Applicative (AsyncT m)` (defined in Streamly.Internal.Data.Stream.Async)
- `Monad m => Functor (AsyncT m)`
- `MonadAsync m => Monad (AsyncT m)`
- `(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m)`
- `MonadAsync m => Monoid (AsyncT m a)`
- `MonadAsync m => Semigroup (AsyncT m a)`

For `WAsyncT` streams:

```haskell
(<>)  = wAsync
(>>=) = flip . concatMapWith wAsync
```

A single `Monad` bind behaves like a `for` loop with iterations of the loop executed concurrently a la the `wAsync` combinator, producing results and side effects of iterations out of order:

```haskell
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
```

Nested monad binds behave like nested `for` loops with nested iterations executed concurrently, a la the `wAsync` combinator:

```haskell
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
```

The behavior can be explained as follows. All the iterations corresponding to the element `1` in the first stream constitute one `WAsyncT` output stream, all the iterations corresponding to `2` constitute another `WAsyncT` output stream, and these two output streams are merged using `wAsync`.

The `W` in the name stands for `wide` or breadth-wise scheduling, in contrast to the depth-wise scheduling behavior of `AsyncT`.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

##### Instances

- `IsStream WAsyncT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `MonadTrans WAsyncT`
- `(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m)`
- `(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m)`
- `(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m)` (defined in Streamly.Internal.Data.Stream.Async)
- `(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m)`
- `(Monad m, MonadAsync m) => Applicative (WAsyncT m)` (defined in Streamly.Internal.Data.Stream.Async)
- `Monad m => Functor (WAsyncT m)`
- `MonadAsync m => Monad (WAsyncT m)`
- `(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m)`
- `MonadAsync m => Monoid (WAsyncT m a)`
- `MonadAsync m => Semigroup (WAsyncT m a)`

For `ParallelT` streams:

```haskell
(<>)  = parallel
(>>=) = flip . concatMapWith parallel
```

See `AsyncT`. `ParallelT` is similar, except that all iterations are strictly concurrent, while in `AsyncT` concurrency depends on the consumer demand and available threads. See `parallel` for more details.

*Since: 0.1.0 (Streamly)*

*Since: 0.7.0 (maxBuffer applies to ParallelT streams)*

*Since: 0.8.0*
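As an illustration, a sketch assuming the Streamly.Prelude API of streamly-0.8 (the `delay` helper is locally defined here): with `fromParallel` all iterations start immediately, so results arrive in completion order rather than source order.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream

main :: IO ()
main = do
    -- Locally defined helper: sleep n tenths of a second, then yield n.
    let delay n = threadDelay (n * 100000) >> return n
    -- Both iterations start at once; the shorter delay completes first.
    xs <- Stream.toList $ Stream.fromParallel $ do
            x <- Stream.fromList [3, 1 :: Int]
            Stream.fromEffect (delay x)
    print xs -- typically [1,3]: completion order, not source order
```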

##### Instances

data ZipSerialM m a Source #

For `ZipSerialM` streams:

```haskell
(<>)  = serial
(<*>) = zipWith id
```

Applicative evaluates the streams being zipped serially:

```haskell
>>> s1 = Stream.fromFoldable [1, 2]
>>> s2 = Stream.fromFoldable [3, 4]
>>> s3 = Stream.fromFoldable [5, 6]
>>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]
```

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

##### Instances

For `ZipAsyncM` streams:

```haskell
(<>)  = serial
(<*>) = zipAsyncWith id
```

Applicative evaluates the streams being zipped concurrently; the following takes about half the time it would take with serial zipping:

```haskell
>>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
...
[(1,1),(1,1),(1,1)]
```

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

##### Instances

- `IsStream ZipAsyncM` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `MonadAsync m => Applicative (ZipAsyncM m)` (defined in Streamly.Internal.Data.Stream.ZipAsync)
- `Monad m => Functor (ZipAsyncM m)`
- `Monoid (ZipAsyncM m a)`
- `Semigroup (ZipAsyncM m a)`

type ZipSerial = ZipSerialM IO Source #

class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #

Class of types that can represent a stream of elements of some type `a` in some monad `m`.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

```haskell
> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]
```

*Concurrent (do not use fromParallel to construct infinite streams)*

*Since: 0.2.0*

(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of `consM`. We can read it as "`parallel colon`" to remember that `|` comes before `:`.

```haskell
> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
```

```haskell
> let delay = threadDelay 1000000 >> print 1
> drain $ fromSerial   $ delay |: delay |: delay |: nil
> drain $ fromParallel $ delay |: delay |: delay |: nil
```

*Concurrent (do not use fromParallel to construct infinite streams)*

*Since: 0.2.0*

##### Instances

- `IsStream AheadT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `IsStream AsyncT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `IsStream WAsyncT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `IsStream ParallelT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `IsStream SerialT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `IsStream WSerialT` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `IsStream ZipSerialM` (defined in Streamly.Internal.Data.Stream.IsStream.Type)
- `IsStream ZipAsyncM` (defined in Streamly.Internal.Data.Stream.IsStream.Type)

fromList :: (Monad m, IsStream t) => [a] -> t m a Source #

```haskell
fromList = foldr cons nil
```

Construct a stream from a list of pure values. This is more efficient than `fromFoldable` for serial streams.

*Since: 0.4.0*

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as ``(return a) `consM` r`` but more efficient. For concurrent streams this is not concurrent whereas `consM` is concurrent. For example:

```haskell
> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]
```

*Since: 0.1.0*

mkStream :: IsStream t => (forall r. State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

Build a stream from an `SVar`, a stop continuation, a singleton stream continuation and a yield continuation.

foldStreamShared :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.

foldStream :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.

foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user-supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the `foldl` library. The suffix `x` is a mnemonic for extraction.

*Since: 0.7.0*
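For example, a running average can be computed by accumulating a (sum, count) pair and extracting the ratio at the end. This is a sketch assuming this module is imported qualified as `Stream`; the `avg` helper is our own:

```haskell
import Data.Function ((&))
import qualified Streamly.Internal.Data.Stream.IsStream as Stream

-- Average of a stream: accumulate (sum, count) strictly,
-- extract the ratio at the end.
avg :: Monad m => Stream.SerialT m Double -> m Double
avg = Stream.foldlx' step (0, 0 :: Int) extract
  where
    step (s, n) x = (s + x, n + 1)
    extract (s, n) = s / fromIntegral n

main :: IO ()
main = (Stream.fromList [1, 2, 3, 4] & avg) >>= print -- 2.5
```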

foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #

Like `foldlx'`, but with a monadic step function.

*Since: 0.7.0*

concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

`concatMapWith mixer generator stream` is a two dimensional looping combinator. The `generator` function is used to generate streams from the elements in the input `stream` and the `mixer` function is used to merge those streams.

Note that we can merge streams concurrently by using a concurrent merge function.

*Since: 0.7.0*

*Since: 0.8.0 (signature change)*
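As a sketch (assuming the Streamly.Prelude API of streamly-0.8), the classic cross product can be written with `concatMapWith`; swapping the merge function changes the nesting behavior:

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (serial, wSerial)

main :: IO ()
main = do
    -- depth-first nesting, like nested for loops
    xs <- Stream.toList $ Stream.concatMapWith serial
            (\x -> Stream.fromList [(x, 3), (x, 4)])
            (Stream.fromList [1, 2 :: Int])
    print xs -- [(1,3),(1,4),(2,3),(2,4)]
    -- interleaved (breadth-wise) nesting
    ys <- Stream.toList $ Stream.concatMapWith wSerial
            (\x -> Stream.fromList [(x, 3), (x, 4)])
            (Stream.fromList [1, 2 :: Int])
    print ys -- [(1,3),(2,3),(1,4),(2,4)]
```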

fromStreamK :: IsStream t => StreamK m a -> t m a Source #

foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b Source #

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

*Since: 0.1.0 (Streamly)*

*Since: 0.8.0*
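For example, a sketch against the Streamly.Prelude API of streamly-0.8: `adapt` can convert a `SerialT` into an `AsyncT` (and back) wherever a different stream type is expected:

```haskell
import Streamly.Prelude (AsyncT, SerialT, adapt)
import qualified Streamly.Prelude as Stream

serialStream :: SerialT IO Int
serialStream = Stream.fromList [1, 2, 3]

-- Reinterpret the same stream with concurrent semantics.
asyncStream :: AsyncT IO Int
asyncStream = adapt serialStream

main :: IO ()
main = Stream.toList (adapt asyncStream) >>= print -- [1,2,3]
```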

concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of `fold` that allows you to fold a `Foldable` container of streams using the specified stream sum operation.

```haskell
concatFoldableWith async $ map return [1..3]
```

Equivalent to:

```haskell
concatFoldableWith f = Prelude.foldr f D.nil
concatFoldableWith f = D.concatMapFoldableWith f id
```

*Since: 0.8.0 (Renamed foldWith to concatFoldableWith)*

*Since: 0.1.0 (Streamly)*

concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of `foldMap` that allows you to map a monadic streaming action on a `Foldable` container and then fold it using the specified stream merge operation.

```haskell
concatMapFoldableWith async return [1..3]
```

Equivalent to:

```haskell
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
```

*Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)*

*Since: 0.1.0 (Streamly)*

concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like `concatMapFoldableWith` but with the last two arguments reversed, i.e. the monadic streaming function is the last argument.

Equivalent to:

```haskell
concatForFoldableWith f xs g = Prelude.foldr (f . g) D.nil xs
concatForFoldableWith f = flip (D.concatMapFoldableWith f)
```

*Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)*

*Since: 0.1.0 (Streamly)*

fromSerial :: IsStream t => SerialT m a -> t m a Source #

fromWSerial :: IsStream t => WSerialT m a -> t m a Source #

fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #

fromParallel :: IsStream t => ParallelT m a -> t m a Source #

fromZipSerial :: IsStream t => ZipSerialM m a -> t m a Source #

fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a Source #

toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> StreamK m a -> StreamK m a Source #

Adapt a polymorphic `consM` operation to a `StreamK` cons operation.

## Primitives

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as ``(return a) `consM` r`` but more efficient. For concurrent streams this is not concurrent whereas `consM` is concurrent. For example:

```haskell
> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]
```

*Since: 0.1.0*

consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

```haskell
> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]
```

*Concurrent (do not use fromParallel to construct infinite streams)*

*Since: 0.2.0*

(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of `consM`. We can read it as "`parallel colon`" to remember that `|` comes before `:`.

```haskell
> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
```

```haskell
> let delay = threadDelay 1000000 >> print 1
> drain $ fromSerial   $ delay |: delay |: delay |: nil
> drain $ fromParallel $ delay |: delay |: delay |: nil
```

*Concurrent (do not use fromParallel to construct infinite streams)*

*Since: 0.2.0*

## From `Unfold`

unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #

Convert an `Unfold` into a stream by supplying it an input seed.

```haskell
>>> Stream.drain $ Stream.unfold Unfold.replicateM (3, putStrLn "hello")
hello
hello
hello
```

*Since: 0.7.0*

unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b Source #

Convert an `Unfold` with a closed input end into a stream.

*Pre-release*

## Unfolding

unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #

```haskell
>>> :{
unfoldr step s =
    case step s of
        Nothing -> Stream.nil
        Just (a, b) -> a `Stream.cons` unfoldr step b
:}
```

Build a stream by unfolding a *pure* step function `step` starting from a seed `s`. The step function returns the next element in the stream and the next seed value. When it is done it returns `Nothing` and the stream ends. For example,

```haskell
>>> :{
let f b =
        if b > 2
        then Nothing
        else Just (b, b + 1)
in Stream.toList $ Stream.unfoldr f 0
:}
[0,1,2]
```

*Since: 0.1.0*

unfoldrM :: forall t m b a. (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #

Build a stream by unfolding a *monadic* step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns `Nothing` and the stream ends. For example,

```haskell
>>> :{
let f b =
        if b > 2
        then return Nothing
        else return (Just (b, b + 1))
in Stream.toList $ Stream.unfoldrM f 0
:}
[0,1,2]
```

When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.

```haskell
>>> :{
let f b =
        if b > 2
        then return Nothing
        else threadDelay 1000000 >> return (Just (b, b + 1))
in Stream.toList $ Stream.delay 1 $ Stream.fromAsync $ Stream.unfoldrM f 0
:}
[0,1,2]
```

*Concurrent*

*Since: 0.1.0*

## From Values

fromPure :: IsStream t => a -> t m a Source #

```haskell
fromPure a = a `cons` nil
```

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

```haskell
fromPure = pure
fromPure = fromEffect . pure
```

In Zip applicative streams `fromPure` is not the same as `pure` because in that case `pure` is equivalent to `repeat` instead. `fromPure` and `pure` are equally efficient there; in other cases `fromPure` may be slightly more efficient than the other equivalent definitions.

*Since: 0.8.0 (Renamed yield to fromPure)*
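The difference can be seen in a zip stream. A sketch assuming the Streamly.Prelude API of streamly-0.8: `pure` repeats its value, so it zips against a stream of any length, while `fromPure` is a single-element stream, so the zip ends after one pair.

```haskell
import qualified Streamly.Prelude as Stream

main :: IO ()
main = do
    -- In ZipSerialM, pure behaves like repeat: the pure value
    -- pairs with every element of the finite stream.
    xs <- Stream.toList $ Stream.fromZipSerial $
            (,) <$> pure 0 <*> Stream.fromList [1, 2, 3 :: Int]
    print xs -- [(0,1),(0,2),(0,3)]
    -- fromPure is a singleton stream, so zipping stops after one pair.
    ys <- Stream.toList $ Stream.fromZipSerial $
            (,) <$> Stream.fromPure 0 <*> Stream.fromList [1, 2, 3 :: Int]
    print ys -- [(0,1)]
```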

fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #

```haskell
fromEffect m = m `consM` nil
```

Create a singleton stream from a monadic action.

```haskell
> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]
```

*Since: 0.8.0 (Renamed yieldM to fromEffect)*

repeat :: (IsStream t, Monad m) => a -> t m a Source #

Generate an infinite stream by repeating a pure value.

*Since: 0.4.0*

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

```haskell
>>> repeatM = fix . consM
>>> repeatM = cycle1 . fromEffect
```

Generate a stream by repeatedly executing a monadic action forever.

```haskell
>>> :{
repeatAsync =
       Stream.repeatM (threadDelay 1000000 >> print 1)
     & Stream.take 10
     & Stream.fromAsync
     & Stream.drain
:}
```

*Concurrent, infinite (do not use with fromParallel)*

*Since: 0.2.0*

replicate :: (IsStream t, Monad m) => Int -> a -> t m a Source #

```haskell
>>> replicate n = Stream.take n . Stream.repeat
```

Generate a stream of length `n` by repeating a value `n` times.

*Since: 0.6.0*

replicateM :: forall t m a. (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #

```haskell
>>> replicateM n = Stream.take n . Stream.repeatM
```

Generate a stream by performing a monadic action `n` times. Same as:

```haskell
>>> pr n = threadDelay 1000000 >> print n
```

This runs serially and takes 3 seconds:

```haskell
>>> Stream.drain $ Stream.fromSerial $ Stream.replicateM 3 $ pr 1
1
1
1
```

This runs concurrently and takes just 1 second:

```haskell
>>> Stream.drain $ Stream.fromAsync $ Stream.replicateM 3 $ pr 1
1
1
1
```

*Concurrent*

*Since: 0.1.1*

## Enumeration

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the `Enum` type class, except that these generate a stream instead of a list. Use the functions in the Streamly.Internal.Data.Stream.Enumeration module to define new instances.

*Since: 0.6.0*

enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #

`enumerateFrom from` generates a stream starting with the element `from`, enumerating up to `maxBound` when the type is `Bounded` or generating an infinite stream when the type is not `Bounded`.

```haskell
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]
```

For `Fractional` types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

```haskell
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]
```

*Since: 0.6.0*

enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #

Generate a finite stream starting with the element `from`, enumerating the type up to the value `to`. If `to` is smaller than `from` then an empty stream is returned.

```haskell
>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]
```

For `Fractional` types, the last element is equal to the specified `to` value after rounding to the nearest integral value.

```haskell
>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]
>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]
```

*Since: 0.6.0*

enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #

`enumerateFromThen from then` generates a stream whose first element is `from`, the second element is `then` and the successive elements are in increments of `then - from`. Enumeration can occur downwards or upwards depending on whether `then` comes before or after `from`. For `Bounded` types the stream ends when `maxBound` is reached; for unbounded types it keeps enumerating infinitely.

```haskell
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]
```

*Since: 0.6.0*

enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #

`enumerateFromThenTo from then to` generates a finite stream whose first element is `from`, the second element is `then` and the successive elements are in increments of `then - from` up to `to`. Enumeration can occur downwards or upwards depending on whether `then` comes before or after `from`.

```haskell
>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]
>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]
```

*Since: 0.6.0*

##### Instances

enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #

## Time Enumeration

times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64) Source #

`times` returns a stream of time value tuples with a clock of 10 ms granularity. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.

```haskell
>>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
```

Note: This API is not safe on 32-bit machines.

*Pre-release*

absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime Source #

`absTimes` returns a stream of absolute timestamps using a clock of 10 ms granularity.

```haskell
>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
```

Note: This API is not safe on 32-bit machines.

*Pre-release*

absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

`absTimesWith g` returns a stream of absolute timestamps using a clock of granularity `g` specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

```haskell
>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
```

Note: This API is not safe on 32-bit machines.

*Pre-release*

relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64 Source #

`relTimes` returns a stream of relative time values starting from 0, using a clock of granularity 10 ms.

```haskell
>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
```

Note: This API is not safe on 32-bit machines.

*Pre-release*

relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #

`relTimesWith g` returns a stream of relative time values starting from 0, using a clock of granularity `g` specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

```haskell
>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
```

Note: This API is not safe on 32-bit machines.

*Pre-release*

durations :: Double -> t m RelTime64 Source #

`durations g` returns a stream of relative time values measuring the time elapsed since the immediate predecessor element of the stream was generated. The first element of the stream is always 0. `durations` uses a clock of granularity `g` specified in seconds. A low granularity clock is more expensive in terms of CPU usage. The minimum granularity is 1 millisecond. Durations lower than 1 ms will be 0.

Note: This API is not safe on 32-bit machines.

*Unimplemented*

ticks :: Rate -> t m () Source #

Generate ticks at the specified rate. The rate is adaptive: the tick generation speed can be increased or decreased at different times to achieve the specified rate. The specific behavior for different styles of `Rate` specifications is documented under `Rate`. The effective maximum rate achieved by a stream is governed by the processor speed.

*Unimplemented*

timeout :: AbsTime -> t m () Source #

Generate a singleton event at or after the specified absolute time. Note that this is different from a `threadDelay`: a `threadDelay` starts from the time when the action is evaluated, whereas an `AbsTime`-based timeout will expire immediately if the action is evaluated too late.

*Unimplemented*

## From Generators

fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #

```haskell
>>> fromIndices f = fmap f $ Stream.enumerateFrom 0
>>> fromIndices f = let g i = f i `Stream.cons` g (i + 1) in g 0
```

Generate an infinite stream whose values are the output of a function `f` applied on the corresponding index. Index starts at 0.

```haskell
>>> Stream.toList $ Stream.take 5 $ Stream.fromIndices id
[0,1,2,3,4]
```

*Since: 0.6.0*

fromIndicesM :: forall t m a. (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #

```haskell
>>> fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
>>> fromIndicesM f = let g i = f i `Stream.consM` g (i + 1) in g 0
```

Generate an infinite stream whose values are the output of a monadic function `f` applied on the corresponding index. Index starts at 0.

*Concurrent*

*Since: 0.6.0*

## Iteration

iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #

```haskell
>>> iterate f x = x `Stream.cons` iterate f x
```

Generate an infinite stream with `x` as the first element and each successive element derived by applying the function `f` on the previous element.

```haskell
>>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]
```

*Since: 0.1.2*

iterateM :: forall t m a. (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #

```haskell
>>> iterateM f m = m >>= \a -> return a `Stream.consM` iterateM f (f a)
```

Generate an infinite stream with the first element generated by the action `m` and each successive element derived by applying the monadic function `f` on the previous element.

```haskell
>>> pr n = threadDelay 1000000 >> print n
>>> :{
Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0)
    & Stream.take 3
    & Stream.fromSerial
    & Stream.toList
:}
0
1
[0,1,2]
```

When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.

```haskell
>>> :{
Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0)
    & Stream.delay 1
    & Stream.take 3
    & Stream.fromAsync
    & Stream.toList
:}
0
1
...
```

*Concurrent*

*Since: 0.1.2*

*Since: 0.7.0 (signature change)*

## Cyclic Elements

mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #

We can define cyclic structures using `let`:

```haskell
>>> let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)
```

The function `fix`, defined as:

```haskell
>>> fix f = let x = f x in x
```

ensures that the argument of a function and its output refer to the same lazy value `x`, i.e. the same location in memory. Thus `x` can be defined in terms of itself, creating structures with cyclic references.

```haskell
>>> f ~(a, b) = ([1, b], head a)
>>> fix f
([1,1],1)
```

`mfix` is essentially the same as `fix` but for monadic values.

Using `mfix` for streams we can construct a stream in which each element of the stream is defined in a cyclic fashion. The argument of the function being fixed represents the current element of the stream which is being returned by the stream monad. Thus, we can use the argument to construct itself.

*Pre-release*

## From Containers

fromList :: (Monad m, IsStream t) => [a] -> t m a Source #

fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than
`fromFoldable` for serial streams.

*Since: 0.4.0*

fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #

>>> fromListM = Stream.fromFoldableM
>>> fromListM = Stream.sequence . Stream.fromList
>>> fromListM = Stream.mapM id . Stream.fromList
>>> fromListM = Prelude.foldr Stream.consM Stream.nil

Construct a stream from a list of monadic actions. This is more efficient
than `fromFoldableM` for serial streams.

*Since: 0.4.0*

fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #

>>> fromFoldable = Prelude.foldr Stream.cons Stream.nil

Construct a stream from a `Foldable` containing pure values.

*Since: 0.2.0*

fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #

>>> fromFoldableM = Prelude.foldr Stream.consM Stream.nil

Construct a stream from a `Foldable` containing monadic actions.

>>> pr n = threadDelay 1000000 >> print n
>>> Stream.drain $ Stream.fromSerial $ Stream.fromFoldableM $ map pr [1,2,3]
1
2
3

>>> Stream.drain $ Stream.fromAsync $ Stream.fromFoldableM $ map pr [1,2,3]
...
...
...

*Concurrent (do not use with fromParallel on infinite containers)*

*Since: 0.3.0*

fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #

Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.

*Pre-release*
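As a sketch of how `fromCallback` might be used (the `events` producer, its delays, and the internal import path are illustrative assumptions, not part of the documented API), a callback registered with an external producer becomes a stream:

```haskell
import Control.Concurrent (forkIO, threadDelay)
import qualified Streamly.Prelude as Stream
import Streamly.Internal.Data.Stream.IsStream (fromCallback)

-- Hypothetical event source: it invokes whatever callback it is given,
-- here three times from a separate thread.
events :: (Int -> IO ()) -> IO ()
events yield = do
    _ <- forkIO $ mapM_ (\n -> threadDelay 100000 >> yield n) [1, 2, 3]
    return ()

main :: IO ()
main =
    -- Each callback invocation appends one element to the stream.
    Stream.mapM_ print $ Stream.take 3 $ fromCallback events
```

This is the typical shape for bridging push-style (callback) APIs into a pull-style stream.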

fromPrimIORef :: (IsStream t, MonadIO m, Unbox a) => IORef a -> t m a Source #

Construct a stream by reading an unboxed `IORef` repeatedly.

*Pre-release*

## Deprecated

fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String Source #

Read lines from an IO Handle into a stream of Strings.

*Since: 0.1.0*

currentTime :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the maximum element in a stream using the supplied comparison function.

maximumBy = Stream.fold Fold.maximumBy

*Since: 0.6.0*

minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the minimum element in a stream using the supplied comparison function.

minimumBy = Stream.fold Fold.minimumBy

*Since: 0.6.0*

foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad `m` is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.

*Since: 0.1.0*

toList :: Monad m => SerialT m a -> m [a] Source #

toList = Stream.foldr (:) []

Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. `Identity`). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.

*Warning!* working on large lists accumulated as buffers in memory could be
very inefficient, consider using Streamly.Array instead.

*Since: 0.1.0*

mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #

Fold a stream of monoid elements by appending them.

mconcat = Stream.fold Fold.mconcat

*Pre-release*

head :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the first element of the stream, if any.

head = (!! 0)
head = Stream.fold Fold.one

*Since: 0.1.0*

uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns
`Nothing`. If the stream is non-empty, returns `Just (a, ma)`, where `a` is
the head of the stream and `ma` its tail.

This can be used to do pretty much anything in an imperative manner, as it
just breaks down the stream into individual elements and we can loop over
them as we deem fit. For example, this can be used to convert a streamly
stream into other stream types.

All the folds in this module can be expressed in terms of `uncons`, however,
this is generally less efficient than specific folds because it takes apart
the stream one element at a time, therefore, does not take advantage of
stream fusion.

*Since: 0.1.0*
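For instance, an element-by-element loop over a stream can be written with `uncons` (a minimal sketch; `printAll` is an illustrative name, not an API function):

```haskell
import qualified Streamly.Prelude as Stream

-- Imperative-style traversal: take the stream apart one element at a
-- time and act on each element before recursing on the tail.
printAll :: Show a => Stream.SerialT IO a -> IO ()
printAll s = do
    r <- Stream.uncons s
    case r of
        Nothing        -> return ()
        Just (x, rest) -> print x >> printAll rest
```

The same pattern, with `print` replaced by a yield into another streaming library, converts a streamly stream into that library's stream type.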

tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

tail = fmap (fmap snd) . Stream.uncons

Extract all but the first element of the stream, if any.

*Since: 0.1.1*

last :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

last xs = xs !! (Stream.length xs - 1)
last = Stream.fold Fold.last

*Since: 0.1.1*

init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

Extract all but the last element of the stream, if any.

*Since: 0.5.0*

null :: Monad m => SerialT m a -> m Bool Source #

Determine whether the stream is empty.

null = Stream.fold Fold.null

*Since: 0.1.1*

foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #

Left associative/strict push fold. `foldl' reduce initial stream` invokes
`reduce` with the accumulator and the next input in the input stream, using
`initial` as the initial value of the accumulator. When the input is
exhausted the current value of the accumulator is returned. Make sure to
use a strict data structure for the accumulator to not build unnecessary
lazy expressions unless that's what you want. See the previous section for
more details.

*Since: 0.2.0*

foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Strict left fold, for non-empty streams, using first element as the
starting value. Returns `Nothing` if the stream is empty.

*Since: 0.5.0*

sum :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the sum of all elements of a stream of numbers. Returns `0` when
the stream is empty. Note that this is not numerically stable for floating
point numbers.

sum = Stream.fold Fold.sum

*Since: 0.1.0*

product :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the product of all elements of a stream of numbers. Returns `1`
when the stream is empty.

product = Stream.fold Fold.product

*Since: 0.1.1*

foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Lazy right fold for non-empty streams, using first element as the starting
value. Returns `Nothing` if the stream is empty.

*Since: 0.5.0*

maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

maximum = maximumBy compare
maximum = Stream.fold Fold.maximum

Determine the maximum element in a stream.

*Since: 0.1.0*

minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

minimum = minimumBy compare
minimum = Stream.fold Fold.minimum

Determine the minimum element in a stream.

*Since: 0.1.0*

and :: Monad m => SerialT m Bool -> m Bool Source #

Determines if all elements of a boolean stream are True.

and = Stream.fold Fold.and

*Since: 0.5.0*

or :: Monad m => SerialT m Bool -> m Bool Source #

Determines whether at least one element of a boolean stream is True.

or = Stream.fold Fold.or

*Since: 0.5.0*

any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether any of the elements of a stream satisfy a predicate.

any = Stream.fold Fold.any

*Since: 0.1.0*

all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether all elements of a stream satisfy a predicate.

all = Stream.fold Fold.all

*Since: 0.1.0*

elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is present in the stream.

elem = Stream.fold Fold.elem

*Since: 0.1.0*

notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is not present in the stream.

notElem = Stream.fold Fold.notElem

*Since: 0.1.0*

lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #

In a stream of (key-value) pairs `(a, b)`, return the value `b` of the
first pair where the key equals the given value `a`.

lookup = snd <$> Stream.find ((==) . fst)
lookup = Stream.fold Fold.lookup

*Since: 0.5.0*

(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #

Lookup the element at the given index.

*Since: 0.6.0*

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left `Fold` and reducing the resulting
expression strictly at each step. The behavior is similar to `foldl'`. A
`Fold` can terminate early without consuming the full stream. See the
documentation of individual `Fold`s for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail, therefore, they produce a default value even when no
input is provided. It means we can always fold an empty stream and get a
valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, `foldMany` on an empty stream results in an empty stream.
Therefore, `Stream.fold f` is not the same as
`Stream.head . Stream.foldMany f`.

fold f = Stream.parse (Parser.fromFold f)

*Since: 0.7.0*

foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #

Right associative/lazy pull fold. `foldrM build final stream` constructs
an output structure using the step function `build`. `build` is invoked
with the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls `build` again,
thus lazily consuming the input `stream` until either the output expression
built by `build` is free of the "tail" or the input is exhausted, in which
case `final` is used as the terminating case for the output structure. For
more details see the description in the previous section.

Example, determine if any element is `odd` in a stream:

>>> Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined)
True

*Since: 0.7.0 (signature changed)*

*Since: 0.2.0 (signature changed)*

*Since: 0.1.0*

mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #

mapM_ = Stream.drain . Stream.mapM

Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.

*Since: 0.1.0*

find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) Source #

Like `findM` but with a non-monadic predicate.

find p = findM (return . p)
find = Stream.fold Fold.find

*Since: 0.5.0*

stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #

`stripPrefix prefix stream` strips `prefix` from `stream` if it is a
prefix of stream. Returns `Nothing` if the stream does not start with the
given prefix, stripped stream otherwise. Returns `Just nil` when the prefix
is the same as the stream.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".

Space: `O(1)`

*Since: 0.6.0*

elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #

Returns the first index where a given value is found in the stream.

elemIndex a = Stream.findIndex (== a)

*Since: 0.5.0*

findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #

Returns the first index that satisfies the given predicate.

findIndex = Stream.fold Fold.findIndex

*Since: 0.5.0*

isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns `True` if the first stream is the same as or a prefix of the
second. A stream is a prefix of itself.

>>> Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

*Since: 0.6.0*

isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns `True` if the first stream is a suffix of the second. A stream is
considered a suffix of itself.

>>> Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Space: `O(n)`, buffers entire input stream and the suffix.

*Pre-release*

*Suboptimal* - Help wanted.

isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a, Unbox a) => SerialT m a -> SerialT m a -> m Bool Source #

isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns `True` if all the elements of the first stream occur, in order, in
the second stream. The elements do not have to occur consecutively. A stream
is a subsequence of itself.

>>> Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)
True

*Since: 0.6.0*

the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #

Ensures that all the elements of the stream are identical and then returns that unique element.

*Since: 0.6.0*

stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #

Drops the given suffix from a stream. Returns `Nothing` if the stream does
not end with the given suffix. Returns `Just nil` when the suffix is the
same as the stream.

It may be more efficient to convert the stream to an Array and use
stripSuffix on that, especially if the elements have a Storable or Prim
instance.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropSuffix".

Space: `O(n)`, buffers the entire input stream as well as the suffix.

*Pre-release*

drain :: Monad m => SerialT m a -> m () Source #

drain = mapM_ (\_ -> return ())
drain = Stream.fold Fold.drain

Run a stream, discarding the results. By default it interprets the stream
as `SerialT`, to run other types of streams use the type adapting
combinators, for example `Stream.drain . fromAsync`.

*Since: 0.7.0*

foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b Source #

Lazy left fold to a stream.
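Since `foldlS` folds into a stream lazily, it can, for example, reverse a stream. A minimal sketch, assuming `foldlS` is imported from this internal module (`reverseS` is an illustrative name):

```haskell
import Streamly.Internal.Data.Stream.IsStream (foldlS)
import qualified Streamly.Prelude as Stream

-- Reverse a stream by prepending each element to an accumulator stream.
reverseS :: Stream.SerialT m a -> Stream.SerialT m a
reverseS = foldlS (flip Stream.cons) Stream.nil
```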

foldlT :: (Monad m, IsStream t, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b Source #

Lazy left fold to a transformer monad.

For example, to reverse a stream:

D.toList $ D.foldlT (flip D.cons) D.nil $ (D.fromList [1..5] :: SerialT IO Int)

eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #

Compare two streams for equality using an equality function.

*Since: 0.6.0*

cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #

Compare two streams lexicographically using a comparison function.

*Since: 0.6.0*

parse :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b) Source #

Parse a stream using the supplied `Parser`.

Unlike folds, parsers may not always result in a valid output, they may
result in an error. For example:

>>> Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")

Note:

fold f = Stream.parse (Parser.fromFold f)

`parse p` is not the same as `head . parseMany p` on an empty stream.

*Pre-release*

toStreamRev :: Monad m => SerialT m a -> m (SerialT n a) Source #

Convert a stream to a pure stream in reverse order.

toStreamRev = Stream.foldl' (flip Stream.cons) Stream.nil

*Pre-release*

toListRev :: Monad m => SerialT m a -> m [a] Source #

toListRev = Stream.foldl' (flip (:)) []

Convert a stream into a list in reverse order in the underlying monad.

*Warning!* working on large lists accumulated as buffers in memory could be
very inefficient, consider using Streamly.Array instead.

*Pre-release*

drainN :: Monad m => Int -> SerialT m a -> m () Source #

drainN n = Stream.drain . Stream.take n
drainN n = Stream.fold (Fold.take n Fold.drain)

Run maximum up to `n` iterations of a stream.

*Since: 0.7.0*

findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #

Returns the first element that satisfies the given predicate.

findM = Stream.fold Fold.findM

*Since: 0.6.0*

parseD :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b) Source #

Parse a stream using the supplied ParserD `Parser`

.

*Internal*

headElse :: Monad m => a -> SerialT m a -> m a Source #

Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.

*Pre-release*

drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

drainWhile p = Stream.drain . Stream.takeWhile p

Run a stream as long as the predicate holds true.

*Since: 0.7.0*

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #

Parallel fold application operator; applies a fold function `t m a -> m b`
to a stream `t m a` concurrently; the input stream is evaluated
asynchronously in an independent thread yielding elements to a buffer and
the folding action runs in another thread consuming the input from the
buffer.

If you read the signature as `(t m a -> m b) -> (t m a -> m b)` you can
look at it as a transformation that converts a fold function to a buffered
concurrent fold function.

The `.` at the end of the operator is a mnemonic for termination of the
stream.

In the example below, each stage introduces a delay of 1 sec but output is
printed every second because both stages are concurrent.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Prelude ((|$.))
>>> :{
Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ())
    |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

*Concurrent*

*Since: 0.3.0 (Streamly)*

*Since: 0.8.0*

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #

foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
`foldl` library. The suffix `x` is a mnemonic for extraction.

*Since: 0.2.0*

foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #

Like `foldx`, but with a monadic step function.

*Since: 0.2.0*

runStream :: Monad m => SerialT m a -> m () Source #

Run a stream, discarding the results. By default it interprets the stream
as `SerialT`, to run other types of streams use the type adapting
combinators, for example `runStream . fromAsync`.

*Since: 0.2.0*

runN :: Monad m => Int -> SerialT m a -> m () Source #

runN n = runStream . take n

Run maximum up to `n` iterations of a stream.

*Since: 0.6.0*

runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

runWhile p = runStream . takeWhile p

Run a stream as long as the predicate holds true.

*Since: 0.6.0*

toHandle :: MonadIO m => Handle -> SerialT m String -> m () Source #

toHandle h = D.mapM_ $ hPutStrLn h

Write a stream of Strings to an IO Handle.

*Since: 0.1.0*

foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #

Same as `|$.`.

*Internal*

## Piping

Pass through a `Pipe`.

transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #

Use a `Pipe` to transform a stream.

*Pre-release*

## Folding

foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

Right fold to a streaming monad.

foldrS Stream.cons Stream.nil === id

`foldrS` can be used to perform stateless stream to stream transformations
like map and filter in general. It can be coupled with a scan to perform
stateful transformations. However, note that the custom map and filter
routines can be much more efficient than this due to better stream fusion.

>>> Stream.toList $ Stream.foldrS Stream.cons Stream.nil $ Stream.fromList [1..5]
[1,2,3,4,5]

Find if any element in the stream is `True`:

>>> Stream.toList $ Stream.foldrS (\x xs -> if odd x then (Stream.fromPure True) else xs) (Stream.fromPure False) $ (Stream.fromList (2:4:5:undefined) :: Stream.SerialT IO Int)
[True]

Map (+2) on odd elements and filter out the even elements:

>>> Stream.toList $ Stream.foldrS (\x xs -> if odd x then (x + 2) `Stream.cons` xs else xs) Stream.nil $ (Stream.fromList [1..5] :: Stream.SerialT IO Int)
[3,5,7]

`foldrM` can also be represented in terms of `foldrS`, however, the former
is much more efficient:

foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

*Pre-release*

foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #

Right fold to a transformer monad. This is the most general right fold
function. `foldrS` is a special case of `foldrT`, however the `foldrS`
implementation can be more efficient:

foldrS = foldrT
foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

`foldrT` can be used to translate streamly streams to other transformer
monads, e.g. to a different streaming type.

*Pre-release*

## Mapping

Stateless one-to-one maps.

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #

sequence = mapM id

Replace the elements of a stream of monadic actions with the outputs of
those actions.

>>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
abc

>>> :{
drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
    & (fromSerial . Stream.sequence)
:}
1
1
1

>>> :{
drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
    & (fromAsync . Stream.sequence)
:}
1
1
1

*Concurrent (do not use with fromParallel on infinite streams)*

*Since: 0.1.0*

mapM :: forall t m a b. (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapM f = sequence . map f

Apply a monadic function to each element of the stream and replace it with
the output of the resulting action.

>>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"]
abc

>>> :{
drain $ Stream.replicateM 10 (return 1)
    & (fromSerial . Stream.mapM (\x -> threadDelay 1000000 >> print x))
:}
1
...
1

> drain $ Stream.replicateM 10 (return 1) & (fromAsync . Stream.mapM (\x -> threadDelay 1000000 >> print x))

*Concurrent (do not use with fromParallel on infinite streams)*

*Since: 0.1.0*

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #

A stateful `mapM`, equivalent to a left scan, more like mapAccumL.
Hopefully, this is a better alternative to `scan`. Separation of state from
the output makes it easier to think in terms of a shared state, and also
makes it easier to keep the state fully strict and the output lazy.

See also: `scanlM'`

*Pre-release*
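As an illustration of the shared-state style, a hypothetical `indexed` transformation can thread a counter through `smapM` and pair each element with its position (a sketch assuming `smapM` is imported from this internal module):

```haskell
import Streamly.Internal.Data.Stream.IsStream (smapM)
import qualified Streamly.Prelude as Stream

-- Pair every element with its 0-based index by threading an Int state
-- through the stream: the state is the next index, the output carries
-- the current one.
indexed :: Monad m => Stream.SerialT m a -> Stream.SerialT m (Int, a)
indexed = smapM (\i a -> return (i + 1, (i, a))) (return 0)
```

Keeping the counter in the state argument (rather than in the output) lets it stay strict while the `(Int, a)` output remains lazy.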

## Mapping Side Effects (Observation)

See also the intersperse*_ combinators.

trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

>>> Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2)
1
2

Compare with `tap`.

*Since: 0.7.0*

trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Perform a side effect before yielding each element of the stream and discard the results.

>>> Stream.drain $ Stream.trace_ (print "got here") (Stream.enumerateFromTo 1 2)
"got here"
"got here"

Same as `intersperseMPrefix_`

but always serial.

See also: `trace`

*Pre-release*

tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #

Tap the data flowing through a stream into a `Fold`. For example, you may
add a tap to log the contents flowing through the stream. The fold is used
only for effects, its result is discarded.

                  Fold m a b
                      |
-----stream m a ---------------stream m a-----

>>> Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
1
2

Compare with `trace`.

*Since: 0.7.0*

tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #

`tapOffsetEvery offset n` taps every `n`th element in the stream starting
at `offset`. `offset` can be between `0` and `n - 1`. Offset 0 means start
at the first element in the stream. If the offset is outside this range
then `offset mod n` is used as offset.

>>> Stream.drain $ Stream.tapOffsetEvery 0 2 (Fold.rmapM print Fold.toList) $ Stream.enumerateFromTo 0 10
[0,2,4,6,8,10]

tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #

Redirect a copy of the stream to a supplied fold and run it concurrently
in an independent thread. The fold may buffer some elements. The buffer
size is determined by the prevailing `maxBuffer` setting.

              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----

>>> Stream.drain $ Stream.tapAsync (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
1
2

Exceptions from the concurrently running fold are propagated to the current
computation. Note that, because of buffering in the fold, exceptions may be
delayed and may not correspond to the current element being processed in
the parent stream, but we guarantee that before the parent stream stops the
tap finishes and all exceptions from it are drained.

>>> tapAsync f = Stream.tapAsyncK (Stream.fold f . Stream.adapt)

Compare with `tap`.

*Pre-release*

tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a Source #

Like `tapAsyncF` but uses a stream fold function instead of a `Fold` type.

*Pre-release*

distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a Source #

Concurrently distribute a stream to a collection of fold functions,
discarding the outputs of the folds.

> Stream.drain $ Stream.distributeAsync_ [Stream.mapM_ print, Stream.mapM_ print] (Stream.enumerateFromTo 1 2)
1
2
1
2

distributeAsync_ = flip (foldr tapAsync)

*Pre-release*

pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> m b) -> t m a -> t m a Source #

`pollCounts predicate transform fold stream` counts those elements in the
stream that pass the `predicate`. The resulting count stream is sent to
another thread which transforms it using `transform` and then folds it
using `fold`. The thread is automatically cleaned up if the stream stops or
aborts due to exception.

For example, to print the count of elements processed every second:

> Stream.drain $ Stream.pollCounts (const True) (Stream.rollingMap (-) . Stream.delayPost 1) (Fold.drainBy print) $ Stream.enumerateFrom 0

Note: This may not work correctly on 32-bit machines.

*Pre-release*

## Scanning By `Fold`

scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Scan a stream using the given monadic fold.

>>> Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10])
[0,1,3,6]

*Since: 0.7.0*

scanMany :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Like `scan` but restarts scanning afresh when the scanning fold
terminates.

*Pre-release*

postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Postscan a stream using the given monadic fold.

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

>>> import Data.Maybe (fromJust)
>>> let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>> :{
Stream.toList
    $ Stream.map (fromJust . fst)
    $ Stream.takeWhile (\(_,x) -> x <= 10)
    $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0)
:}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]

*Since: 0.7.0*

## Scanning

Left scans. Stateful, mostly one-to-one maps.

scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Strict left scan. Like `map`, `scanl'` too is a one to one transformation,
however it adds an extra element.

>>> Stream.toList $ Stream.scanl' (+) 0 $ Stream.fromList [1,2,3,4]
[0,1,3,6,10]

>>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
[[],[1],[2,1],[3,2,1],[4,3,2,1]]

The output of `scanl'` is the initial value of the accumulator followed by
all the intermediate steps and the final result of `foldl'`.

By streaming the accumulated state after each fold step, we can share the
state across multiple stages of stream composition. Each stage can modify
or extend the state, do some processing with it and emit it for the next
stage, thus modularizing the stream processing. This can be useful in
stateful or event-driven programming.

Consider the following monolithic example, computing the sum and the
product of the elements in a stream in one go using a `foldl'`:

>>> Stream.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList [1,2,3,4]
(10,24)

Using `scanl'` we can make it modular by computing the sum in the first
stage and passing it down to the next stage for computing the product:

>>> :{
Stream.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1)
    $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1)
    $ Stream.fromList [1,2,3,4]
:}
(10,24)

IMPORTANT: `scanl'` evaluates the accumulator to WHNF. To avoid building
lazy expressions inside the accumulator, it is recommended that a strict
data structure is used for accumulator.

>>> scanl' step z = scan (Fold.foldl' step z)
>>> scanl' f z xs = scanlM' (\a b -> return (f a b)) (return z) xs
>>> scanl' f z xs = z `Stream.cons` postscanl' f z xs

See also: `usingStateT`

See also: `usingStateT`

*Since: 0.2.0*

scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like `scanl'`

but with a monadic step function and a monadic seed.

*Since: 0.4.0*

*Since: 0.8.0 (signature change)*

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

`scanlMAfter' accumulate initial done stream` is like `scanlM'` except
that it provides an additional `done` function to be applied on the
accumulator when the stream stops. The result of `done` is also emitted in
the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

*Pre-release*
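For example, a running sum whose final total is emitted once more as a flush step could be sketched as follows (assuming `scanlMAfter'` is imported from this internal module; the `negate` marker is purely illustrative):

```haskell
import Streamly.Internal.Data.Stream.IsStream (scanlMAfter')
import qualified Streamly.Prelude as Stream

-- Running sum; when the input ends, the `done` step fires once and its
-- result (here, the negated total, acting as an end-of-stream marker)
-- is emitted as the last element.
runningSum :: Monad m => Stream.SerialT m Int -> Stream.SerialT m Int
runningSum = scanlMAfter' (\s a -> return (s + a)) (return 0) (return . negate)
```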

postscanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like `scanl'` but does not stream the initial value of the accumulator.

`>>>`

`postscanl' step z = postscan (Fold.foldl' step z)`

`>>>`

`postscanl' f z = postscanlM' (\a b -> return (f a b)) (return z)`

`>>>`

`postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xs`

*Since: 0.7.0*

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like `postscanl'` but with a monadic step function and a monadic seed.

`>>>`

`postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs`

*Since: 0.7.0*

*Since: 0.8.0 (signature change)*

prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the final value of the accumulator.

*Pre-release*

prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like prescanl' but with a monadic step function and a monadic seed.

*Pre-release*
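The relationship between the scan flavors is easy to see on plain lists; the `L`-suffixed helpers below are illustrative names, not library APIs.

```haskell
-- scanl streams both the seed and the final accumulator;
-- postscanl' drops the seed, prescanl' drops the final value.
postscanlL :: (b -> a -> b) -> b -> [a] -> [b]
postscanlL f z = drop 1 . scanl f z

prescanlL :: (b -> a -> b) -> b -> [a] -> [b]
prescanlL f z xs = zipWith const (scanl f z xs) xs

main :: IO ()
main = do
    print (scanl (+) 0 [1, 2, 3 :: Int])      -- [0,1,3,6]
    print (postscanlL (+) 0 [1, 2, 3 :: Int]) -- [1,3,6]
    print (prescanlL (+) 0 [1, 2, 3 :: Int])  -- [0,1,3]
```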

scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #

Like `scanl'` but for a non-empty stream. The first element of the stream is used as the initial value of the accumulator. Does nothing if the stream is empty.

>>> Stream.toList $ Stream.scanl1' (+) $ Stream.fromList [1,2,3,4]
[1,3,6,10]

*Since: 0.6.0*

scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #

Like `scanl1'` but with a monadic step function.

*Since: 0.6.0*

## Filtering

Produce a subset of the stream using criteria based on the values of the elements. We can use a concatMap and scan for filtering but these combinators are more efficient and convenient.

with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) => (t m a -> t m (s, a)) -> (((s, a) -> b) -> t m (s, a) -> t m (s, a)) -> ((s, a) -> b) -> t m a -> t m a Source #

Modify a `t m a -> t m a` stream transformation that accepts a predicate `(a -> b)` to accept `((s, a) -> b)` instead, provided a transformation `t m a -> t m (s, a)`. Convenient to filter with index or time.

filterWithIndex = with indexed filter
filterWithAbsTime = with timestamped filter
filterWithRelTime = with timeIndexed filter

*Pre-release*
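The idea can be sketched on lists (the `L`-suffixed names are hypothetical): decorate each element, filter on the decorated pairs, then strip the decoration.

```haskell
-- Hypothetical list sketch of 'with': given a decorating transform and a
-- filtering combinator over decorated pairs, lift a predicate on (s, a)
-- into a transformation on plain values.
withL :: ([a] -> [(s, a)])
      -> (((s, a) -> Bool) -> [(s, a)] -> [(s, a)])
      -> ((s, a) -> Bool)
      -> [a]
      -> [a]
withL decorate filt p = map snd . filt p . decorate

-- Keep elements whose index satisfies the predicate.
filterWithIndexL :: ((Int, a) -> Bool) -> [a] -> [a]
filterWithIndexL = withL (zip [0 ..]) filter

main :: IO ()
main = print (filterWithIndexL (even . fst) "hello") -- "hlo"
```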

deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

>>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5]
[1,3,5]

*Since: 0.6.0*

filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Include only those elements that pass a predicate.

*Since: 0.1.0*

filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as `filter` but with a monadic predicate.

*Since: 0.4.0*

uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #

Drop repeated elements that are adjacent to each other.

*Since: 0.6.0*
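On lists, `uniq` corresponds to collapsing adjacent duplicates, which `Data.List.group` makes easy (contrast with `nub`, which removes duplicates anywhere in the list):

```haskell
import Data.List (group)

-- Adjacent deduplication: one representative per run of equal elements.
uniqL :: Eq a => [a] -> [a]
uniqL = map head . group

main :: IO ()
main = print (uniqL [1, 1, 2, 2, 2, 1 :: Int]) -- [1,2,1]
```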

nubBy :: (a -> a -> Bool) -> t m a -> t m a Source #

Drop repeated elements anywhere in the stream.

*Caution: not scalable for infinite streams*

*See also: nubWindowBy*

*Unimplemented*

prune :: (a -> Bool) -> t m a -> t m a Source #

Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.

prune p = dropWhileAround p $ uniqBy (\x y -> p x && p y)

> Stream.prune isSpace (Stream.fromList " hello world! ")
"hello world!"

Space: `O(1)`

*Unimplemented*

## Trimming

Produce a subset of the stream trimmed at ends.

take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first `n` elements from the stream and discard the rest.

*Since: 0.1.0*

takeLast :: Int -> t m a -> t m a Source #

Take `n` elements at the end of the stream.

O(n) space, where n is the number of elements taken.

*Unimplemented*
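For a finite list the intended semantics are easy to state; a streaming implementation would keep a ring buffer of the last `n` elements, hence the O(n) space bound. A hypothetical list version:

```haskell
-- Keep the last n elements (list sketch of the unimplemented takeLast).
-- A streaming version would maintain an n-element ring buffer instead
-- of measuring the length up front.
takeLastL :: Int -> [a] -> [a]
takeLastL n xs = drop (max 0 (length xs - n)) xs

main :: IO ()
main = print (takeLastL 2 [1 .. 5 :: Int]) -- [4,5]
```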

takeLastInterval :: Double -> t m a -> t m a Source #

Take the elements from the last `i` seconds of the stream.

O(n) space, where n is the number of elements taken.

*Unimplemented*

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

*Since: 0.1.0*

takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as `takeWhile` but with a monadic predicate.

*Since: 0.4.0*

takeWhileLast :: (a -> Bool) -> t m a -> t m a Source #

Take all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number of elements taken.

*Unimplemented*

takeWhileAround :: (a -> Bool) -> t m a -> t m a Source #

Like `takeWhile` and `takeWhileLast` combined.

O(n) space, where n is the number of elements taken from the end.

*Unimplemented*

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first `n` elements from the stream and take the rest.

*Since: 0.1.0*

dropLast :: Int -> t m a -> t m a Source #

Drop `n` elements at the end of the stream.

O(n) space, where n is the number of elements dropped.

*Unimplemented*

dropLastInterval :: Int -> t m a -> t m a Source #

Drop the elements of the last `i` seconds of the stream.

O(n) space, where n is the number of elements dropped.

*Unimplemented*

dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

*Since: 0.1.0*

dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as `dropWhile` but with a monadic predicate.

*Since: 0.4.0*

dropWhileLast :: (a -> Bool) -> t m a -> t m a Source #

Drop all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number of elements dropped.

*Unimplemented*

dropWhileAround :: (a -> Bool) -> t m a -> t m a Source #

Like `dropWhile` and `dropWhileLast` combined.

O(n) space, where n is the number of elements dropped from the end.

*Unimplemented*

## Inserting Elements

Produce a superset of the stream. This is the opposite of filtering/sampling. We can always use concatMap and scan for inserting but these combinators are more efficient and convenient.

intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #

Insert a pure value between successive elements of a stream.

>>> Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello"
"h,e,l,l,o"

*Since: 0.7.0*

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"

Be careful about the order of effects. In the above example we used `trace` after the intersperse; if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o" instead.

>>> Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"
he.l.l.o."h,e,l,l,o"

*Since: 0.5.0*

intersperseMWith :: Int -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every `n` elements.

> Stream.toList $ Stream.intersperseMWith 2 (return ',') $ Stream.fromList "hello"
"he,ll,o"

*Unimplemented*

intersperseMSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a Source #

Insert an effect and its output after consuming an element of a stream.

>>> Stream.toList $ Stream.trace putChar $ intersperseMSuffix (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o.,"h,e,l,l,o,"

*Pre-release*

intersperseMSuffixWith :: (IsStream t, Monad m) => Int -> m a -> t m a -> t m a Source #

Like `intersperseMSuffix` but intersperses an effectful action into the input stream after every `n` elements and after the last element.

>>> Stream.toList $ Stream.intersperseMSuffixWith 2 (return ',') $ Stream.fromList "hello"
"he,ll,o,"

*Pre-release*

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every `n` seconds.

> import Control.Concurrent (threadDelay)
> Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
h,e,l,l,o

*Pre-release*

## Inserting Side Effects/Time

intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Insert a side effect before consuming an element of a stream except the first one.

>>> Stream.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') $ Stream.fromList "hello"
h.e.l.l.o

*Pre-release*

delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of the stream except the first one.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

*Since: 0.8.0*

intersperseMSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Insert a side effect after consuming an element of a stream.

>>> Stream.mapM_ putChar $ Stream.intersperseMSuffix_ (threadDelay 1000000) $ Stream.fromList "hello"
hello

*Pre-release*

delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds after consuming an element of a stream.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delayPost 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

*Pre-release*

intersperseMPrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a Source #

Insert a side effect before consuming an element of a stream.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseMPrefix_ (putChar '.' >> return ',') $ Stream.fromList "hello"
.h.e.l.l.o"hello"

Same as `trace_` but may be concurrent.

*Concurrent*

*Pre-release*

delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of a stream.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delayPre 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

*Pre-release*

## Element Aware Insertion

Opposite of filtering

insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a Source #

`insertBy cmp elem stream` inserts `elem` before the first element in `stream` that is less than `elem` when compared using `cmp`.

insertBy cmp x = mergeBy cmp (fromPure x)

>>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5]
[1,2,3,5]

*Since: 0.6.0*

## Reordering

reverse :: (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

>>> reverse = Stream.foldlT (flip Stream.cons) Stream.nil

*Since 0.7.0 (Monad m constraint)*

*Since: 0.1.1*

reassembleBy :: Fold m a b -> (a -> a -> Int) -> t m a -> t m b Source #

Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.

*Unimplemented*

## Position Indexing

indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #

indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1, undefined)
indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)

Pair each element in a stream with its index, starting from index 0.

>>> Stream.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]

*Since: 0.6.0*

indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #

indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1, undefined)
indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))

Pair each element in a stream with its index, starting from the given index `n` and counting down.

>>> Stream.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]

*Since: 0.6.0*

## Time Indexing

timestamped :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (AbsTime, a) Source #

timestampWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (AbsTime, a) Source #

Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

*Pre-release*

timeIndexed :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(RelTime64 (NanoSecond64 ...),1)
(RelTime64 (NanoSecond64 ...),2)
(RelTime64 (NanoSecond64 ...),3)

*Pre-release*

timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(RelTime64 (NanoSecond64 ...),1)
(RelTime64 (NanoSecond64 ...),2)
(RelTime64 (NanoSecond64 ...),3)

*Pre-release*

## Searching

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

*Since: 0.5.0*

elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #

Find all the indices where the value of the element in the stream is equal to the given value.

elemIndices a = findIndices (== a)

*Since: 0.5.0*

## Rolling map

Map using the previous element.

rollingMapM :: (IsStream t, Monad m) => (Maybe a -> a -> m b) -> t m a -> t m b Source #

Like `rollingMap` but with an effectful map function.

*Pre-release*

rollingMap :: (IsStream t, Monad m) => (Maybe a -> a -> b) -> t m a -> t m b Source #

Apply a function on every two successive elements of a stream. The first argument of the map function is the previous element and the second argument is the current element. When the current element is the first element, the previous element is `Nothing`.

*Pre-release*

rollingMap2 :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #

Like `rollingMap` but requires at least two elements in the stream, returns an empty stream otherwise.

This is the stream equivalent of the list idiom `zipWith f xs (tail xs)`.

*Pre-release*
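Both rolling maps have direct list analogues built from `zipWith` (the `L`-suffixed names are illustrative, not library APIs):

```haskell
-- rollingMap pairs each element with its predecessor
-- (Nothing for the first element).
rollingMapL :: (Maybe a -> a -> b) -> [a] -> [b]
rollingMapL f xs = zipWith f (Nothing : map Just xs) xs

-- rollingMap2 needs at least two elements; it is zipWith over the
-- list and its own tail.
rollingMap2L :: (a -> a -> b) -> [a] -> [b]
rollingMap2L f xs = zipWith f xs (drop 1 xs)

main :: IO ()
main = do
    -- successive differences, with the first element kept as-is
    print (rollingMapL (\mp c -> maybe c (c -) mp) [1, 4, 9 :: Int]) -- [1,3,5]
    print (rollingMap2L (+) [1, 2, 3 :: Int])                        -- [3,5]
```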

## Maybe Streams

mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #
Like `mapMaybe` but maps a monadic function.

## Either Streams

both :: Functor (t m) => t m (Either a a) -> t m a Source #

Remove the `Either` wrapper and flatten both lefts as well as rights in the output stream.

*Pre-release*

## Concurrent Evaluation

### Concurrent Pipelines

Run streaming stages concurrently.

mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; it terminates when the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; it blocks when the buffer is full, until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD

*Pre-release*

applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #

Same as `|$`.

*Internal*

(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #

Parallel transform application operator; applies a stream transformation function `t m a -> t m b` to a stream `t m a` concurrently; the input stream is evaluated asynchronously in an independent thread, yielding elements to a buffer, and the transformation function runs in another thread consuming the input from the buffer. `|$` is just like the regular function application operator `$` except that it is concurrent.

If you read the signature as `(t m a -> t m b) -> (t m a -> t m b)` you can look at it as a transformation that converts a transform function to a buffered concurrent transform function.

The following code prints a value every second even though each stage adds a 1 second delay.

>>> :{
Stream.drain $
   Stream.mapM (\x -> threadDelay 1000000 >> print x)
       |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

*Concurrent*

*Since: 0.3.0 (Streamly)*

*Since: 0.8.0*

(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #
Same as `|$` but with arguments reversed: `x |& f = f |$ x`.

### Concurrency Control

maxThreads :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500. `maxThreads` does not affect `ParallelT` streams as they can use an unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

*Since: 0.4.0 (Streamly)*

*Since: 0.8.0*

### Buffering and Sampling

Evaluate strictly using a buffer of results. When the buffer becomes full we can block, drop the new elements, drop the oldest element and insert the new at the end or keep dropping elements uniformly to match the rate of the consumer.

maxBuffer :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded `maxBuffer` value (i.e. a negative value) coupled with an unbounded `maxThreads` value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when `pure` is used in `ZipAsyncM` streams as `pure` in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

*Since: 0.4.0 (Streamly)*

*Since: 0.8.0*

sampleOld :: Int -> t m a -> t m a Source #

Evaluate the input stream continuously and keep only the oldest `n` elements in the buffer, discard the new ones when the buffer is full. When the output stream is evaluated it consumes the values from the buffer in a FIFO manner.

*Unimplemented*

sampleNew :: Int -> t m a -> t m a Source #

Evaluate the input stream continuously and keep only the latest `n` elements in a ring buffer, keep discarding the older ones to make space for the new ones. When the output stream is evaluated it consumes the values from the buffer in a FIFO manner.

*Unimplemented*

sampleRate :: Double -> t m a -> t m a Source #

### Rate Limiting

Evaluate the stream at uniform intervals to maintain a specified evaluation rate.

Specifies the stream yield rate in yields per second (`Hertz`). We keep accumulating yield credits at `rateGoal`. At any point of time we allow only as many yields as we have accumulated as per `rateGoal` since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed `rateGoal`. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than `rateBuffer` we try to recover only as much as `rateBuffer`.

`rateLow` puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, `rateHigh` puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the `rateGoal` is 0 or negative the stream never yields a value. If the `rateBuffer` is 0 or negative we do not attempt to recover.

*Since: 0.5.0 (Streamly)*

*Since: streamly-core-0.8.0*

rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream. A `Nothing` value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of `Rate` specifications is documented under `Rate`. The effective maximum production rate achieved by a stream is governed by:

- The `maxThreads` limit
- The `maxBuffer` limit
- The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

avgRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate (r/2) r (2*r) maxBound)`

Specifies the average production rate of a stream in number of yields per second (i.e. `Hertz`). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

minRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate r r (2*r) maxBound)`

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

maxRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate (r/2) r r maxBound)`

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

constRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate r r r 0)`

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

## Diagnostics

inspectMode :: IsStream t => t m a -> t m a Source #

Print debug information about an SVar when the stream ends

*Pre-release*

## Deprecated

scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #

Strict left scan with an extraction function. Like `scanl'`, but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the `foldl` library. The suffix `x` is a mnemonic for extraction.

*Since 0.2.0*

*Since: 0.7.0 (Monad m constraint)*

## Binary Combinators (Linear)

Functions ending in the shape `t m a -> t m a -> t m a`.

The functions in this section have linear or flat n-ary combining characteristics. It means that when combined `n` times (e.g. ``a `serial` b `serial` c ...``) the resulting expression will have an `O(n)` complexity (instead of O(n^2) for the pairwise combinators described in the next section). These functions can be used efficiently with `concatMapWith` et al. combinators that combine streams in a linear fashion (contrast with `concatPairsWith` which combines streams as a binary tree).

serial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]

This operation can be used to fold an infinite lazy container of streams.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams; both streams may be evaluated concurrently, but the outputs are used in the same order as the corresponding actions in the original streams. Side effects will happen in the order in which the streams are evaluated:

>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]

Only streams are scheduled for ahead evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently. It may not make much sense combining serial streams using `ahead`.

`ahead` can be safely used to fold an infinite lazy container of streams.

*Since: 0.3.0 (Streamly)*

*Since: 0.8.0*

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Merges two streams; both streams may be evaluated concurrently and outputs from both are used as they arrive:

>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]

With a single thread, it becomes serial:

>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]

Only streams are scheduled for async evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]

If total threads are 2, the third stream is scheduled only after one of the first two has finished:

> stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]
...
[1,1,3,2,3,2]

Thus `async` goes deep in first few streams rather than going wide in all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, `async` can be safely used to fold an infinite lazy container of streams.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

For singleton streams, `wAsync` is the same as `async`. See `async` for singleton stream behavior. For multi-element streams, while `async` is left biased i.e. it tries to evaluate the left side stream as much as possible, `wAsync` tries to schedule them both fairly. In other words, `async` goes deep while `wAsync` goes wide. However, outputs are always used as they arrive.

With a single thread, `async` starts behaving like `serial` while `wAsync` starts behaving like `wSerial`.

>>> import Streamly.Prelude (async, wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]

With two threads available, and combining three streams:

>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]

This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.

Note that `WSerialT` and single threaded `WAsyncT` both interleave streams but the exact scheduling is slightly different in both cases.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Like `async` except that the execution is much more strict. There is no limit on the number of threads. While `async` may not schedule a stream if there is no demand from the consumer, `parallel` always evaluates both the streams immediately. The only limit that applies to `parallel` is `maxBuffer`. Evaluation may block if the output buffer becomes full.

>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]

`parallel` guarantees that all the streams are scheduled for execution immediately, therefore, we could use things like starting timers inside the streams and relying on the fact that all timers were started at the same time.

Unlike `async` this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like `parallel` but stops the output as soon as the first stream stops.

*Pre-release*

parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like `parallel` but stops the output as soon as any of the two streams stops.

*Pre-release*

## Binary Combinators (Pair Wise)

Like the functions in the section above these functions also combine two streams into a single stream but when used `n` times linearly they exhibit O(n^2) complexity. They are best combined in a binary tree fashion using `concatPairsWith` giving a `n * log n` complexity. Avoid using these with `concatMapWith` when combining a large or infinite number of streams.

### Append

append :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.

IMPORTANT NOTE: This could be 100x faster than `serial/<>` for appending a few (say 100) streams because it can fuse via stream fusion. However, it does not scale for a large number of streams (say 1000s) and becomes quadratically slow. Therefore use this for custom appending of a few streams but use `concatMap` or 'concatMapWith serial' for appending `n` streams or infinite containers of streams.

*Pre-release*

### wSerial

`wSerial` is a CPS based stream interleaving function. Use 'concatPairsWith wSerial' to interleave `n` streams uniformly. It can be used with `concatMapWith` as well; however, the interleaving behavior of `n` streams would then be asymmetric, giving exponentially more weightage to streams that come earlier in the composition.

wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

```
>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]
```

Note, for singleton streams `wSerial` and `serial` are identical.

Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

### Interleave

`interleave` is like `wSerial` but uses a direct style implementation instead of CPS. It is faster than `wSerial` due to stream fusion but has worse efficiency when used with `concatMapWith` for a large number of streams.

interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.

```
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleave "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,,,"
>>> Stream.interleave "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,cd"
```

`interleave` is dual to `interleaveMin`; it could as well be called `interleaveMax`.

Do not use at scale in `concatMapWith`.

*Pre-release*

interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.

```
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveMin "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,"
>>> Stream.interleaveMin "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,c"
```

`interleaveMin` is dual to `interleave`.

Do not use at scale in `concatMapWith`.

*Pre-release*
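The difference between the two flavors can be sketched on pure lists. These are illustrative models only (hypothetical `…L` names, not streamly's monadic, fusible implementations):

```haskell
-- List model of interleave: alternate starting from the left list; when
-- one side is exhausted, emit the rest of the other side.
interleaveL :: [a] -> [a] -> [a]
interleaveL []       ys = ys
interleaveL (x : xs) ys = x : interleaveL ys xs

-- List model of interleaveMin: alternate, but stop as soon as the side
-- whose turn it is has nothing left.
interleaveMinL :: [a] -> [a] -> [a]
interleaveMinL []       _  = []
interleaveMinL (x : xs) ys = x : interleaveMinL ys xs
```

For example, `interleaveL "ab" ",,,,"` gives `"a,b,,,"` while `interleaveMinL "ab" ",,,,"` gives `"a,b,"`, mirroring the doctest outputs above.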

interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.

```
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveSuffix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c,"
>>> Stream.interleaveSuffix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"
```

`interleaveSuffix` is a dual of `interleaveInfix`.

Do not use at scale in `concatMapWith`.

*Pre-release*

interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.

```
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveInfix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c"
>>> Stream.interleaveInfix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"
```

`interleaveInfix` is a dual of `interleaveSuffix`.

Do not use at scale in `concatMapWith`.

*Pre-release*

### Round Robin

roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output, a stream may choose to `Skip` producing an element until later, giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of the two streams rather than fairly interleaving their output. This can be useful in co-operative multitasking without using explicit threads, and can be used as an alternative to `async`.

Do not use at scale in `concatMapWith`.

*Pre-release*

### Zip

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Stream `a` is evaluated first, followed by stream `b`; the resulting elements `a` and `b` are then zipped using the supplied zip function and the result `c` is yielded to the consumer.

If stream `a` or stream `b` ends, the zipped stream ends. If stream `b` ends first, the element `a` from the previous evaluation of stream `a` is discarded.

```
> D.toList $ D.zipWith (+) (D.fromList [1,2,3]) (D.fromList [4,5,6])
[5,7,9]
```

*Since: 0.1.0*

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like `zipWith` but using a monadic zipping function.

*Since: 0.4.0*
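For intuition, `Control.Monad.zipWithM` from base does the same monadic zipping on plain lists; the stream combinator is analogous but incremental. A small sketch (`sumLogged` is a hypothetical helper, not part of the API):

```haskell
import Control.Monad (zipWithM)

-- An effectful zipping function: logs each pair, then returns the sum.
sumLogged :: Int -> Int -> IO Int
sumLogged a b = do
  putStrLn ("zipping " ++ show (a, b))  -- effect runs once per zipped pair
  return (a + b)

-- Runs the effects left to right and collects the results.
demo :: IO [Int]
demo = zipWithM sumLogged [1, 2, 3] [4, 5, 6]  -- returns [5,7,9]
```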

zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Like `zipWith` but zips concurrently i.e. both the streams being zipped are evaluated concurrently using the `ParallelT` concurrent evaluation style. The maximum number of elements of each stream evaluated in advance can be controlled by `maxBuffer`.

The stream ends if stream `a` or stream `b` ends. However, if stream `b` ends while we are still evaluating stream `a` and waiting for a result then the stream will not end until after the evaluation of stream `a` finishes. This behavior can potentially be changed in the future to end the stream immediately as soon as the end of any of the streams is detected.

*Since: 0.1.0*

zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like `zipAsyncWith`

but with a monadic zipping function.

*Since: 0.4.0*

### Merge

mergeBy :: IsStream t => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

```
>>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]
```

See also: `mergeByMFused`

*Since: 0.6.0*

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeBy` but with a monadic comparison function.

Merge two streams randomly:

```
> randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]
```

Merge two streams in a proportion of 2:1:

```
>>> :{
do
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
          r <- readIORef ref
          writeIORef ref $ Prelude.tail r
          return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2])
 print xs
:}
[1,1,2,1,1,2,1,1,2]
```

See also: `mergeByMFused`

*Since: 0.6.0*

mergeByMFused :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeByM` but much faster; works best when merging a statically known number of streams. When merging more than two streams, try to merge pairs and pairs of pairs in a tree-like structure. `mergeByM` works better when a variable number of streams is being merged using `concatPairsWith`.

*Internal*

mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeBy` but merges concurrently (i.e. both the elements being merged are generated concurrently).

*Since: 0.6.0*

mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeByM` but merges concurrently (i.e. both the elements being merged are generated concurrently).

*Since: 0.6.0*

mergeMinBy :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeByM` but stops merging as soon as any of the two streams stops.

*Unimplemented*

mergeFstBy :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeByM` but stops merging as soon as the first stream stops.

*Unimplemented*

## Combine Streams and Unfolds

Expand a stream by repeatedly using an unfold and merging the resulting streams. Functions generally have the shape:

```
Unfold m a b -> t m a -> t m b
```

### Append Many (Unfold)

Unfold and flatten streams.

unfoldManyInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like `unfoldMany` but interleaves the streams in the same way as `interleave` instead of appending them.

*Pre-release*

unfoldManyRoundRobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like `unfoldMany` but executes the streams in the same way as `roundrobin`.

*Pre-release*

### Interpose

Insert effects between streams. Like unfoldMany but intersperses an effect between the streams. A special case of gintercalate.

interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

```
unwords = S.interpose ' '
```

*Pre-release*

interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

```
unlines = S.interposeSuffix '\n'
```

*Pre-release*

### Intercalate

Insert Streams between Streams. Like unfoldMany but intersperses streams from another source between the streams from the first source.

intercalate :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

`intersperse` followed by unfold and concat.

```
intercalate unf a str = unfoldMany unf $ intersperse a str
intersperse = intercalate (Unfold.function id)
unwords = intercalate Unfold.fromList " "
```

```
>>> Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"]
"abc def ghi"
```

*Since: 0.8.0*
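The pure-list `Data.List.intercalate` obeys the same equations, which makes the identities above easy to check on lists:

```haskell
import Data.List (intercalate, intersperse)

-- intercalate is intersperse followed by concat; with " " it behaves
-- like unwords, mirroring the stream-level equations above.
joined :: String
joined = intercalate " " ["abc", "def", "ghi"]  -- "abc def ghi"

viaIntersperse :: String
viaIntersperse = concat (intersperse " " ["abc", "def", "ghi"])
```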

intercalateSuffix :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

`intersperseMSuffix` followed by unfold and concat.

```
intercalateSuffix unf a str = unfoldMany unf $ intersperseMSuffix a str
intersperseMSuffix = intercalateSuffix (Unfold.function id)
unlines = intercalateSuffix Unfold.fromList "\n"
```

```
>>> Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]
"abc\ndef\nghi\n"
```

*Since: 0.8.0*

gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

`interleaveInfix` followed by unfold and concat.

*Pre-release*

gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

`interleaveSuffix` followed by unfold and concat.

*Pre-release*

## Append Many (concatMap)

Map and serially append streams. `concatMapM` is a generalization of the binary append operation to append many streams.

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a monadic stream-generating function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike `concatMap`, it can produce an effect at the beginning of each iteration of the inner loop.

*Since: 0.6.0*

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

```
>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concatMapWith Stream.serial f
>>> concatMap f = Stream.concat . Stream.map f
```

*Since: 0.6.0*

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

```
>>> concatM = Stream.concat . Stream.fromEffect
>>> concatM = Stream.concat . lift -- requires (MonadTrans t)
>>> concatM = join . lift -- requires (MonadTrans t, Monad (t m))
```

*Internal*

concat :: (IsStream t, Monad m) => t m (t m a) -> t m a Source #

Flatten a stream of streams to a single stream.

```
concat = concatMap id
```

*Pre-release*

## Flatten Containers

Flatten `Foldable` containers using the binary stream merging operations.

concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of `fold` that allows you to fold a `Foldable` container of streams using the specified stream sum operation.

```
concatFoldableWith async $ map return [1..3]
```

Equivalent to:

```
concatFoldableWith f = Prelude.foldr f D.nil
concatFoldableWith f = D.concatMapFoldableWith f id
```

*Since: 0.8.0 (Renamed foldWith to concatFoldableWith)*

*Since: 0.1.0 (Streamly)*

concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of `foldMap` that allows you to map a monadic streaming action on a `Foldable` container and then fold it using the specified stream merge operation.

```
concatMapFoldableWith async return [1..3]
```

Equivalent to:

```
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
```

*Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)*

*Since: 0.1.0 (Streamly)*

concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like `concatMapFoldableWith` but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to:

```
concatForFoldableWith f xs g = Prelude.foldr (f . g) D.nil xs
concatForFoldableWith f = flip (D.concatMapFoldableWith f)
```

*Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)*

*Since: 0.1.0 (Streamly)*

## ConcatMapWith

Map and flatten a stream like `concatMap` but using a custom binary stream merging combinator instead of just appending the streams. The merging occurs sequentially; it works efficiently for `serial`, `async`, `ahead` like merge operations where we consume one stream before the next, or in case of `wAsync` or `parallel` where we consume all streams simultaneously anyway.

However, in cases where the merging consumes streams in a round robin fashion, a pair wise merging using `concatPairsWith` would be more efficient. These cases include operations like `mergeBy` or `zipWith`.

concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

`concatMapWith mixer generator stream` is a two dimensional looping combinator. The `generator` function is used to generate streams from the elements in the input `stream` and the `mixer` function is used to merge those streams.

Note that we can merge streams concurrently by using a concurrent merge function.

*Since: 0.7.0*

*Since: 0.8.0 (signature change)*
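The two dimensional loop can be modeled on pure lists (hypothetical `concatMapWithL` name; streamly's combinator is monadic and streaming): generate a list per element, then fold the generated lists together with the mixer.

```haskell
-- gen maps each element to a list; mixer merges the generated lists.
concatMapWithL :: ([b] -> [b] -> [b]) -> (a -> [b]) -> [a] -> [b]
concatMapWithL mixer gen = foldr (mixer . gen) []

-- With plain append as the mixer this is ordinary concatMap:
--   concatMapWithL (++) (\x -> [x, x]) [1, 2, 3]  ==  [1, 1, 2, 2, 3, 3]
```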

concatSmapMWith :: (IsStream t, Monad m) => (t m b -> t m b -> t m b) -> (s -> a -> m (s, t m b)) -> m s -> t m a -> t m b Source #

Like `concatMapWith` but carries a state which can be used to share information across multiple steps of concat.

```
concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial
```

*Pre-release*

## ConcatPairsWith

See the notes about suitable merge functions in the `concatMapWith`

section.

concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

Combine streams in pairs using a binary stream combinator, then combine the resulting streams in pairs recursively until we get to a single combined stream.

For example, you can sort a stream using merge sort like this:

```
>>> Stream.toList $ Stream.concatPairsWith (Stream.mergeBy compare) Stream.fromPure $ Stream.fromList [5,1,7,9,2]
[1,2,5,7,9]
```

*Caution: the stream of streams must be finite*

*Pre-release*
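The pairwise combining scheme can be sketched on lists (hypothetical `…L` names, assuming a plain list merge; streamly's version is streaming). With a merge as the combiner this is bottom-up merge sort, matching the example above:

```haskell
-- Combine adjacent pairs repeatedly until one list remains.
concatPairsWithL :: ([b] -> [b] -> [b]) -> (a -> [b]) -> [a] -> [b]
concatPairsWithL f g = go . map g
  where
    go []  = []
    go [x] = x
    go xs  = go (pairUp xs)
    pairUp (x : y : rest) = f x y : pairUp rest
    pairUp rest           = rest

-- An ordinary two-way merge by a comparison function.
mergeL :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
mergeL cmp = go
  where
    go [] ys = ys
    go xs [] = xs
    go (x : xs) (y : ys)
      | cmp x y == GT = y : go (x : xs) ys
      | otherwise     = x : go xs (y : ys)

-- concatPairsWithL (mergeL compare) (: []) [5,1,7,9,2]  ==  [1,2,5,7,9]
```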

## IterateMap

Map and flatten Trees of Streams

iterateMapWith :: IsStream t => (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a Source #

Like `iterateM` but iterates after mapping a stream generator on the output.

Yield an input element in the output stream, map a stream generator on it and then do the same on the resulting stream. This can be used for a depth first traversal of a tree-like structure.

Note that `iterateM` is a special case of `iterateMapWith`:

```
iterateM f = iterateMapWith serial (fromEffect . f) . fromEffect
```

It can be used to traverse a tree structure. For example, to list a directory tree:

```
Stream.iterateMapWith Stream.serial (either Dir.toEither (const nil)) (fromPure (Left "tmp"))
```

*Pre-release*

iterateSmapMWith :: (IsStream t, Monad m) => (t m a -> t m a -> t m a) -> (b -> a -> m (b, t m a)) -> m b -> t m a -> t m a Source #

Like `iterateMap` but carries a state in the stream generation function. This can be used to traverse graph-like structures; we can remember the visited nodes in the state to avoid cycles.

Note that a combination of `iterateMap` and `usingState` can also be used to traverse graphs. However, this function provides a more localized state instead of using a global state.

See also: `mfix`

*Pre-release*

iterateMapLeftsWith :: (IsStream t, b ~ Either a c) => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m b -> t m b Source #

In an `Either` stream iterate on `Left`s. This is a special case of `iterateMapWith`:

```
iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil))
```

To traverse a directory tree:

```
iterateMapLeftsWith serial Dir.toEither (fromPure (Left "tmp"))
```

*Pre-release*

iterateUnfold :: Unfold m a a -> t m a -> t m a Source #

Same as `iterateMapWith Stream.serial` but more efficient due to stream fusion.

*Unimplemented*

## Deprecated

## Reduce By Streams

dropPrefix :: t m a -> t m a -> t m a Source #

Drop prefix from the input stream if present.

Space: `O(1)`

*Unimplemented*

dropInfix :: t m a -> t m a -> t m a Source #

Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.

Space: `O(n)` where n is the length of the infix.

*Unimplemented*

dropSuffix :: t m a -> t m a -> t m a Source #

Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.

Space: `O(n)` where n is the length of the suffix.

*Unimplemented*

## Reduce By Folds

Reduce a stream by folding or parsing chunks of the stream. Functions generally have these shapes:

```
f (Fold m a b) -> t m a -> t m b
f (Parser a m b) -> t m a -> t m b
```

### Generic Folding

Apply folds on a stream.

foldMany :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Apply a `Fold` repeatedly on a stream and emit the fold outputs in the output stream.

To sum every two contiguous elements in a stream:

```
>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10]
[3,7,11,15,19]
```

On an empty stream the output is empty:

```
>>> Stream.toList $ Stream.foldMany f $ Stream.fromList []
[]
```

Note that `Stream.foldMany (Fold.take 0)` would result in an infinite loop on a non-empty stream.

*Since: 0.8.0*

foldManyPost :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Like `foldMany` but appends an empty fold output if the fold and stream termination aligns:

```
>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList []
[0]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9]
[3,7,11,15,9]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10]
[3,7,11,15,19,0]
```

*Pre-release*

foldSequence :: t m (Fold m a b) -> t m a -> t m b Source #

Apply a stream of folds to an input stream and emit the results in the output stream.

*Unimplemented*

foldIterateM :: (IsStream t, Monad m) => (b -> m (Fold m a b)) -> m b -> t m a -> t m b Source #

Iterate a fold generator on a stream. The initial value `b` is used to generate the first fold, the fold is applied on the stream, and the result of the fold is used to generate the next fold, and so on.

```
>>> import Data.Monoid (Sum(..))
>>> f x = return (Fold.take 2 (Fold.sconcat x))
>>> s = Stream.map Sum $ Stream.fromList [1..10]
>>> Stream.toList $ Stream.map getSum $ Stream.foldIterateM f (pure 0) s
[3,10,21,36,55,55]
```

This is the streaming equivalent of a monad-like sequenced application of folds where the next fold is dependent on the previous fold.

*Pre-release*

refoldIterateM :: (IsStream t, Monad m) => Refold m b a b -> m b -> t m a -> t m b Source #

Like `foldIterateM` but using the `Refold` type instead. This could be much more efficient due to stream fusion.

*Internal*

### Chunking

Element unaware grouping.

chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b Source #

Group the input stream into groups of `n` elements each and then fold each group using the provided fold function.

```
>>> Stream.toList $ Stream.chunksOf 2 Fold.sum (Stream.enumerateFromTo 1 10)
[3,7,11,15,19]
```

This can be considered as an n-fold version of `take` where we apply `take` repeatedly on the leftover stream until the stream exhausts.

```
chunksOf n f = foldMany (FL.take n f)
```

*Since: 0.7.0*
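A pure model of this grouping (hypothetical `chunksOfL`; the real combinator takes a `Fold` and streams its results) makes the `foldMany (take n)` equation above concrete:

```haskell
-- Apply a list fold to successive groups of n elements.
chunksOfL :: Int -> ([a] -> b) -> [a] -> [b]
chunksOfL n f = go
  where
    go [] = []
    go xs = let (h, t) = splitAt n xs in f h : go t

-- chunksOfL 2 sum [1..10]  ==  [3,7,11,15,19]
```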

arraysOf :: (IsStream t, MonadIO m, Unbox a) => Int -> t m a -> t m (Array a) Source #

`arraysOf n stream` groups the elements in the input stream into arrays of `n` elements each.

Same as the following but may be more efficient:

```
arraysOf n = Stream.foldMany (A.writeN n)
```

*Pre-release*

intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b Source #

Group the input stream into windows of `n` seconds each and then fold each group using the provided fold function.

```
>>> Stream.toList $ Stream.take 5 $ Stream.intervalsOf 1 Fold.sum $ Stream.constRate 2 $ Stream.enumerateFrom 1
[...,...,...,...,...]
```

*Since: 0.7.0*

chunksOfTimeout :: (IsStream t, MonadAsync m, Functor (t m)) => Int -> Double -> Fold m a b -> t m a -> t m b Source #

Like `chunksOf` but if the chunk is not completed within the specified time interval then emit whatever we have collected till now. The chunk timeout is reset whenever a chunk is emitted. The granularity of the clock is 100 ms.

```
>>> s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
>>> f = Stream.mapM_ print $ Stream.chunksOfTimeout 5 1 Fold.toList s
```

*Pre-release*

### Splitting

Streams can be sliced into segments in space or in time. We use the term `chunk` to refer to a spatial length of the stream (spatial window) and the term `session` to refer to a length in time (time window).

splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on an infixed separator element, dropping the separator. The supplied `Fold` is applied on the split segments. Splits the stream on separator elements determined by the supplied predicate; the separator is considered as infixed between two segments:

```
>>> splitOn' p xs = Stream.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
>>> splitOn' (== '.') "a.b"
["a","b"]
```

An empty stream is folded to the default value of the fold:

```
>>> splitOn' (== '.') ""
[""]
```

If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:

```
>>> splitOn' (== '.') "."
["",""]
>>> splitOn' (== '.') ".a"
["","a"]
>>> splitOn' (== '.') "a."
["a",""]
>>> splitOn' (== '.') "a..b"
["a","","b"]
```

splitOn is an inverse of intercalating a single element:

```
Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id
```

Assuming the input stream does not contain the separator:

```
Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id
```

*Since: 0.7.0*
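The infix splitting semantics above can be modeled on pure lists (hypothetical `splitOnL`; the stream version applies a `Fold` to each segment instead of collecting lists):

```haskell
-- Split on elements matching the predicate, dropping the separators.
-- A missing side of a separator yields an empty segment.
splitOnL :: (a -> Bool) -> [a] -> [[a]]
splitOnL p xs =
  case break p xs of
    (h, [])    -> [h]              -- no separator left: one final segment
    (h, _ : t) -> h : splitOnL p t -- drop the separator, keep splitting

-- splitOnL (== '.') "a.b"   ==  ["a","b"]
-- splitOnL (== '.') ""      ==  [""]
-- splitOnL (== '.') "a..b"  ==  ["a","","b"]
```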

splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on a suffixed separator element, dropping the separator. The supplied `Fold` is applied on the split segments.

```
>>> splitOnSuffix' p xs = Stream.toList $ Stream.splitOnSuffix p Fold.toList (Stream.fromList xs)
>>> splitOnSuffix' (== '.') "a.b."
["a","b"]
>>> splitOnSuffix' (== '.') "a."
["a"]
```

An empty stream results in an empty output stream:

```
>>> splitOnSuffix' (== '.') ""
[]
```

An empty segment consisting of only a suffix is folded to the default output of the fold:

```
>>> splitOnSuffix' (== '.') "."
[""]
>>> splitOnSuffix' (== '.') "a..b.."
["a","","b",""]
```

A suffix is optional at the end of the stream:

```
>>> splitOnSuffix' (== '.') "a"
["a"]
>>> splitOnSuffix' (== '.') ".a"
["","a"]
>>> splitOnSuffix' (== '.') "a.b"
["a","b"]
```

```
lines = splitOnSuffix (== '\n')
```

`splitOnSuffix` is an inverse of `intercalateSuffix` with a single element:

```
Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnSuffix (== '.') Fold.toList === id
```

Assuming the input stream does not contain the separator:

```
Stream.splitOnSuffix (== '.') Fold.toList . Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList === id
```

*Since: 0.7.0*
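The suffix flavor differs from the infix one only at the stream ends, which a list model makes explicit (hypothetical `splitOnSuffixL`; the stream version folds each segment): an empty input yields no segments, and a trailing separator does not produce an extra empty segment.

```haskell
-- Split on suffixed separators, dropping them.
splitOnSuffixL :: (a -> Bool) -> [a] -> [[a]]
splitOnSuffixL _ [] = []  -- empty input: no segments (unlike infix splitting)
splitOnSuffixL p xs =
  case break p xs of
    (h, [])    -> [h]                    -- trailing suffix is optional
    (h, _ : t) -> h : splitOnSuffixL p t

-- splitOnSuffixL (== '.') "a.b."   ==  ["a","b"]
-- splitOnSuffixL (== '.') ""       ==  []
-- splitOnSuffixL (== '.') "a..b.." ==  ["a","","b",""]
```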

splitOnPrefix :: (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on a prefixed separator element, dropping the separator. The supplied `Fold` is applied on the split segments.

```
>>> splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p Fold.toList (Stream.fromList xs)
>>> splitOnPrefix' (== '.') ".a.b"
["a","b"]
```

An empty stream results in an empty output stream:

```
>>> splitOnPrefix' (== '.') ""
[]
```

An empty segment consisting of only a prefix is folded to the default output of the fold:

```
>>> splitOnPrefix' (== '.') "."
[""]
>>> splitOnPrefix' (== '.') ".a.b."
["a","b",""]
>>> splitOnPrefix' (== '.') ".a..b"
["a","","b"]
```

A prefix is optional at the beginning of the stream:

```
>>> splitOnPrefix' (== '.') "a"
["a"]
>>> splitOnPrefix' (== '.') "a.b"
["a","b"]
```

`splitOnPrefix` is an inverse of `intercalatePrefix` with a single element:

```
Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id
```

Assuming the input stream does not contain the separator:

```
Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id
```

*Unimplemented*

splitOnAny :: [Array a] -> Fold m a b -> t m a -> t m b Source #

Split on any one of the given patterns.

*Unimplemented*

splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like `splitOnSuffix` but keeps the suffix attached to the resulting splits.

```
>>> splitWithSuffix' p xs = Stream.toList $ splitWithSuffix p Fold.toList (Stream.fromList xs)
>>> splitWithSuffix' (== '.') ""
[]
>>> splitWithSuffix' (== '.') "."
["."]
>>> splitWithSuffix' (== '.') "a"
["a"]
>>> splitWithSuffix' (== '.') ".a"
[".","a"]
>>> splitWithSuffix' (== '.') "a."
["a."]
>>> splitWithSuffix' (== '.') "a.b"
["a.","b"]
>>> splitWithSuffix' (== '.') "a.b."
["a.","b."]
>>> splitWithSuffix' (== '.') "a..b.."
["a.",".","b.","."]
```

*Since: 0.7.0*

splitBySeq :: (IsStream t, MonadAsync m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like `splitOnSeq` but splits the separator as well, as an infix token.

```
>>> splitOn'_ pat xs = Stream.toList $ Stream.splitBySeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOn'_ "" "hello"
["h","","e","","l","","l","","o"]
>>> splitOn'_ "hello" ""
[""]
>>> splitOn'_ "hello" "hello"
["","hello",""]
>>> splitOn'_ "x" "hello"
["hello"]
>>> splitOn'_ "h" "hello"
["","h","ello"]
>>> splitOn'_ "o" "hello"
["hell","o",""]
>>> splitOn'_ "e" "hello"
["h","e","llo"]
>>> splitOn'_ "l" "hello"
["he","l","","l","o"]
>>> splitOn'_ "ll" "hello"
["he","ll","o"]
```

*Pre-release*

splitOnSeq :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like `splitOn` but the separator is a sequence of elements instead of a single element.

For illustration, let's define a function that operates on pure lists:

```
>>> splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>> splitOnSeq' "hello" ""
[""]
>>> splitOnSeq' "hello" "hello"
["",""]
>>> splitOnSeq' "x" "hello"
["hello"]
>>> splitOnSeq' "h" "hello"
["","ello"]
>>> splitOnSeq' "o" "hello"
["hell",""]
>>> splitOnSeq' "e" "hello"
["h","llo"]
>>> splitOnSeq' "l" "hello"
["he","","o"]
>>> splitOnSeq' "ll" "hello"
["he","o"]
```

`splitOnSeq` is an inverse of `intercalate`. The following law always holds:

```
intercalate . splitOnSeq == id
```

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

```
splitOnSeq . intercalate == id
```

```
>>> splitOnSeq pat f = Stream.foldManyPost (Fold.takeEndBySeq_ pat f)
```

*Pre-release*
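A pure-list sketch of splitting on a separator sequence (hypothetical `splitOnSeqL` and `breakOnPat` helpers; this is a naive scan, not streamly's fused pattern matching) reproduces the doctest outputs above:

```haskell
import Data.List (stripPrefix)

-- Split a list on every occurrence of a separator sequence, dropping it.
splitOnSeqL :: Eq a => [a] -> [a] -> [[a]]
splitOnSeqL []  xs = map (: []) xs  -- empty pattern: singleton segments
splitOnSeqL pat xs = go xs
  where
    go ys = case breakOnPat ys of
      (h, Nothing) -> [h]          -- pattern absent: one final segment
      (h, Just t)  -> h : go t     -- drop the matched pattern, continue
    -- Break at the first occurrence of pat, returning the prefix and,
    -- if found, the remainder after the pattern.
    breakOnPat [] = ([], Nothing)
    breakOnPat ys@(c : cs) =
      case stripPrefix pat ys of
        Just rest -> ([], Just rest)
        Nothing   -> let (h, r) = breakOnPat cs in (c : h, r)

-- splitOnSeqL "ll" "hello"     ==  ["he","o"]
-- splitOnSeqL "hello" "hello"  ==  ["",""]
```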

splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like `splitOnSuffix` but the separator is a sequence of elements, instead of a predicate for a single element.

```
>>> splitOnSuffixSeq_ pat xs = Stream.toList $ Stream.splitOnSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSuffixSeq_ "." ""
[]
>>> splitOnSuffixSeq_ "." "."
[""]
>>> splitOnSuffixSeq_ "." "a"
["a"]
>>> splitOnSuffixSeq_ "." ".a"
["","a"]
>>> splitOnSuffixSeq_ "." "a."
["a"]
>>> splitOnSuffixSeq_ "." "a.b"
["a","b"]
>>> splitOnSuffixSeq_ "." "a.b."
["a","b"]
>>> splitOnSuffixSeq_ "." "a..b.."
["a","","b",""]
```

```
lines = splitOnSuffixSeq "\n"
```

`splitOnSuffixSeq` is an inverse of `intercalateSuffix`. The following law always holds:

```
intercalateSuffix . splitOnSuffixSeq == id
```

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

```
splitOnSuffixSeq . intercalateSuffix == id
```

```
>>> splitOnSuffixSeq pat f = Stream.foldMany (Fold.takeEndBySeq_ pat f)
```

*Pre-release*

splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like `splitOnSuffixSeq`

but keeps the suffix intact in the splits.

`>>>`

`splitWithSuffixSeq' pat xs = Stream.toList $ Stream.splitWithSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)`

`>>>`

`splitWithSuffixSeq' "." ""`
[]

`>>>`

`splitWithSuffixSeq' "." "."`
["."]

`>>>`

`splitWithSuffixSeq' "." "a"`
["a"]

`>>>`

`splitWithSuffixSeq' "." ".a"`
[".","a"]

`>>>`

`splitWithSuffixSeq' "." "a."`
["a."]

`>>>`

`splitWithSuffixSeq' "." "a.b"`
["a.","b"]

`>>>`

`splitWithSuffixSeq' "." "a.b."`
["a.","b."]

`>>>`

`splitWithSuffixSeq' "." "a..b.."`
["a.",".","b.","."]

`>>>`

`splitWithSuffixSeq pat f = Stream.foldMany (Fold.takeEndBySeq pat f)`

*Pre-release*

splitOnSuffixSeqAny :: [Array a] -> Fold m a b -> t m a -> t m b Source #

Split post any one of the given patterns.

*Unimplemented*

### Keyed Window Classification

Split the stream into chunks or windows by position or time. Each window can be associated with a key, all events associated with a particular key in the window can be folded to a single result. The window termination can be dynamically controlled by the fold.

The term "chunk" is used for a window defined by position of elements and the term "session" is used for a time window.

#### Tumbling Windows

A new window starts after the previous window is finished.

classifySessionsByGeneric Source #

:: forall t m f a b. (IsStream t, MonadAsync m, IsMap f) | |

=> Proxy (f :: Type -> Type) | |

-> Double | timer tick in seconds |

-> Bool | reset the timer when an event is received |

-> (Int -> m Bool) | predicate to eject sessions based on session count |

-> Double | session timeout in seconds |

-> Fold m a b | Fold to be applied to session data |

-> t m (AbsTime, (Key f, a)) | timestamp, (session key, session data) |

-> t m (Key f, b) | session key, fold result |

classifySessionsBy Source #

:: (IsStream t, MonadAsync m, Ord k) | |

=> Double | timer tick in seconds |

-> Bool | reset the timer when an event is received |

-> (Int -> m Bool) | predicate to eject sessions based on session count |

-> Double | session timeout in seconds |

-> Fold m a b | Fold to be applied to session data |

-> t m (AbsTime, (k, a)) | timestamp, (session key, session data) |

-> t m (k, b) | session key, fold result |

`classifySessionsBy tick keepalive predicate timeout fold stream`

classifies an input event `stream` consisting of `(timestamp, (key, value))`
tuples into sessions based on the `key`, folding all the values corresponding
to the same key into a session using the supplied `fold`.

When the fold terminates or a `timeout`

occurs, a tuple consisting of the
session key and the folded value is emitted in the output stream. The
timeout is measured from the first event in the session. If the `keepalive`

option is set to `True`

the timeout is reset to 0 whenever an event is
received.

The `timestamp`

in the input stream is an absolute time from some epoch,
characterizing the time when the input event was generated. The notion of
current time is maintained by a monotonic event time clock using the
timestamps seen in the input stream. The latest timestamp seen till now is
used as the base for the current time. When no new events are seen, a timer
is started with a clock resolution of `tick`

seconds. This timer is used to
detect session timeouts in the absence of new events.

To bound the memory used, the number of sessions can be
limited. If the ejection `predicate`

returns `True`

, the
oldest session is ejected before inserting a new session.

When the stream ends any buffered sessions are ejected immediately.

If a session key is received even after a session has finished, another session is created for that key.

`>>>`

`:{`
`Stream.mapM_ print`
`    $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)`
`    $ Stream.timestamped`
`    $ Stream.delay 0.1`
`    $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c'])`
`:}`
(1,"abc")
(2,"abc")
(3,"abc")

*Pre-release*

classifySessionsOf Source #

:: (IsStream t, MonadAsync m, Ord k) | |

=> (Int -> m Bool) | predicate to eject sessions on session count |

-> Double | time window size |

-> Fold m a b | Fold to be applied to session data |

-> t m (AbsTime, (k, a)) | timestamp, (session key, session data) |

-> t m (k, b) |

Same as `classifySessionsBy`

with a timer tick of 1 second and keepalive
option set to `False`

.

classifySessionsOf = classifySessionsBy 1 False

*Pre-release*

#### Keep Alive Windows

The window size is extended if an event arrives within the specified window size. This can represent sessions with idle or inactive timeout.

classifyKeepAliveSessions Source #

:: (IsStream t, MonadAsync m, Ord k) | |

=> (Int -> m Bool) | predicate to eject sessions on session count |

-> Double | session inactive timeout |

-> Fold m a b | Fold to be applied to session payload data |

-> t m (AbsTime, (k, a)) | timestamp, (session key, session data) |

-> t m (k, b) |

Same as `classifySessionsBy`

with a timer tick of 1 second and keepalive
option set to `True`

.

classifyKeepAliveSessions = classifySessionsBy 1 True

*Pre-release*

## Reduce By Parsers

### Generic Parsing

Apply parsers on a stream.

parseMany :: (IsStream t, Monad m) => Parser a m b -> t m a -> t m (Either ParseError b) Source #

Apply a `Parser`

repeatedly on a stream and emit the parsed values in the
output stream.

This is the streaming equivalent of the `many`

parse combinator.

`>>>`

`Stream.toList $ Stream.parseMany (Parser.takeBetween 0 2 Fold.sum) $ Stream.fromList [1..10]`
[Right 3,Right 7,Right 11,Right 15,Right 19]

> Stream.toList $ Stream.parseMany (Parser.line Fold.toList) $ Stream.fromList "hello\nworld"
["hello\n","world"]

foldMany f = parseMany (fromFold f)

Known Issues: When the parser fails there is no way to get the remaining stream.

*Pre-release*

parseManyD :: (IsStream t, Monad m) => Parser a m b -> t m a -> t m (Either ParseError b) Source #

Same as parseMany but for StreamD streams.

*Internal*

parseManyTill :: Parser a m b -> Parser a m x -> t m a -> t m b Source #

`parseManyTill collect test stream`

tries the parser `test`

on the input,
if `test`

fails it backtracks and tries `collect`

, after `collect`

succeeds
`test`

is tried again and so on. The parser stops when `test`

succeeds. The
output of `test`

is discarded and the output of `collect`

is emitted in the
output stream. The parser fails if `collect`

fails.

*Unimplemented*

parseSequence :: t m (Parser a m b) -> t m a -> t m b Source #

Apply a stream of parsers to an input stream and emit the results in the output stream.

*Unimplemented*

parseIterate :: (IsStream t, Monad m) => (b -> Parser a m b) -> b -> t m a -> t m (Either ParseError b) Source #

Iterate a parser generating function on a stream. The initial value `b`

is
used to generate the first parser, the parser is applied on the stream and
the result is used to generate the next parser and so on.

`>>>`

`import Data.Monoid (Sum(..))`

`>>>`

`Stream.toList $ fmap getSum $ Stream.rights $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) (Sum 0) $ fmap Sum $ Stream.fromList [1..10]`
[3,10,21,36,55,55]

This is the streaming equivalent of a monadic, sequenced application of parsers, where each parser depends on the result of the previous one.

*Pre-release*

### Grouping

wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like `splitOn`

after stripping leading, trailing, and repeated separators.
Therefore, `".a..b."`

with `.`

as the separator would be parsed as
`["a","b"]`

. In other words, it's like parsing words from whitespace
separated text.

`>>>`

`wordsBy' p xs = Stream.toList $ Stream.wordsBy p Fold.toList (Stream.fromList xs)`

`>>>`

`wordsBy' (== ',') ""`
[]

`>>>`

`wordsBy' (== ',') ","`
[]

`>>>`

`wordsBy' (== ',') ",a,,b,"`
["a","b"]

words = wordsBy isSpace

*Since: 0.7.0*

wordsOn :: Array a -> Fold m a b -> t m a -> t m b Source #

Like `splitOn`

but drops any empty splits.

*Unimplemented*

groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b Source #

groups = groupsBy (==)
groups = groupsByRolling (==)

Groups contiguous spans of equal elements together in individual groups.

`>>>`

`Stream.toList $ Stream.groups Fold.toList $ Stream.fromList [1,1,2,2]`
[[1,1],[2,2]]

*Since: 0.7.0*

groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #

`groupsBy cmp f $ S.fromList [a,b,c,...]`

assigns the element `a`

to the
first group, if `b `cmp` a`

is `True`

then `b`

is also assigned to the same
group. If `c `cmp` a`

is `True`

then `c`

is also assigned to the same
group and so on. When the comparison fails a new group is started. Each
group is folded using the fold `f`

and the result of the fold is emitted in
the output stream.

`>>>`

`Stream.toList $ Stream.groupsBy (>) Fold.toList $ Stream.fromList [1,3,7,0,2,5]`
[[1,3,7],[0,2,5]]

*Since: 0.7.0*

groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Unlike `groupsBy`

this function performs a rolling comparison of two
successive elements in the input stream. `groupsByRolling cmp f $ S.fromList [a,b,c,...]` assigns the element `a`

to the first group, if `a `cmp` b`

is
`True`

then `b`

is also assigned to the same group. If `b `cmp` c`

is
`True`

then `c`

is also assigned to the same group and so on. When the
comparison fails a new group is started. Each group is folded using the fold
`f`

.

`>>>`

`Stream.toList $ Stream.groupsByRolling (\a b -> a + 1 == b) Fold.toList $ Stream.fromList [1,2,3,7,8,9]`
[[1,2,3],[7,8,9]]

*Since: 0.7.0*

## Nested splitting

splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #

`splitInnerBy splitter joiner stream`

splits the inner containers `f a`

of
an input stream `t m (f a)`

using the `splitter`

function. Container
elements `f a`

are collected until a split occurs, then all the elements
before the split are joined using the `joiner`

function.

For example, if we have a stream of `Array Word8`

, we may want to split the
stream into arrays representing lines separated by the newline byte (`\n`)
such that the resulting stream after a split would be one array for each line.

CAUTION! This is not a true streaming function as the container size after the split and merge may not be bounded.

*Pre-release*

splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #

Like `splitInnerBy`

but splits assuming the separator joins the segment in
a suffix style.

*Pre-release*

before :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Run the action `m b`

before the stream yields its first element.

Same as the following but more efficient due to fusion:

`>>>`

`before action xs = Stream.nilM action <> xs`

`>>>`

`before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)`

*Since: 0.7.0*

after_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Like `after`

, with following differences:

- action `m b` won't run if the stream is garbage collected after partial evaluation.
- monad `m` does not require any other constraints.
- has slightly better performance than `after`.

Same as the following, but with stream fusion:

after_ action xs = xs <> 'nilM' action

*Pre-release*
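
A sketch of typical usage (module aliases are assumptions, as is the example pipeline):

```haskell
import qualified Streamly.Internal.Data.Stream.IsStream as IsStream
import qualified Streamly.Prelude as Stream

-- Prints 1, 2, 3 and then "done". If the stream were garbage
-- collected after partial evaluation, "done" would never print.
main :: IO ()
main =
    Stream.drain
        $ Stream.mapM print
        $ IsStream.after_ (putStrLn "done")
        $ Stream.fromList [1, 2, 3 :: Int]
```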

after :: (IsStream t, MonadRunInIO m) => m b -> t m a -> t m a Source #

bracket_ :: (IsStream t, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #

Like `bracket`

but with following differences:

- alloc action `m b` runs with async exceptions enabled.
- cleanup action `b -> m c` won't run if the stream is garbage collected after partial evaluation.
- does not require a `MonadAsync` constraint.
- has slightly better performance than `bracket`.

*Inhibits stream fusion*

*Pre-release*

bracket :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #

Run the alloc action `m b`

with async exceptions disabled but keeping
blocking operations interruptible (see `mask`

). Use the
output `b`

as input to `b -> t m a`

to generate an output stream.

`b`

is usually a resource under the state of monad `m`

, e.g. a file
handle, that requires a cleanup after use. The cleanup action `b -> m c`

,
runs whenever the stream ends normally, due to a sync or async exception or
if it gets garbage collected after a partial lazy evaluation.

`bracket`

only guarantees that the cleanup action runs, and it runs with
async exceptions enabled. The action must ensure that it can successfully
cleanup the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run.

*See also: bracket_*

*Inhibits stream fusion*

*Since: 0.7.0*
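
A typical use is tying a file handle's lifetime to the stream that reads from it. A minimal sketch (the line-reading loop is illustrative, not part of the API):

```haskell
import System.IO (Handle, IOMode (..), hClose, hGetLine, hIsEOF, openFile)
import qualified Streamly.Prelude as Stream

-- Stream the lines of a file; the handle is closed whether the
-- stream ends normally, aborts on an exception, or is garbage
-- collected after partial evaluation.
fileLines :: FilePath -> Stream.SerialT IO String
fileLines path = Stream.bracket (openFile path ReadMode) hClose readLines
  where
    readLines :: Handle -> Stream.SerialT IO String
    readLines h = Stream.unfoldrM step ()
      where
        step () = do
            eof <- hIsEOF h
            if eof
                then return Nothing
                else do
                    line <- hGetLine h
                    return (Just (line, ()))
```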

bracket' :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> t m a) -> t m a Source #

Like `bracket`

but can use separate cleanup actions depending on the mode
of termination. `bracket' before onStop onGC onException action`

runs
`action`

using the result of `before`

. If the stream stops, `onStop`

action
is executed, if the stream is abandoned `onGC`

is executed, if the stream
encounters an exception `onException`

is executed.

*Pre-release*

onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #

Run the action `m b`

if the stream aborts due to an exception. The
exception is not caught, simply rethrown.

*Inhibits stream fusion*

*Since: 0.7.0*
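
For example, logging a failure while letting the exception propagate (a sketch):

```haskell
import qualified Streamly.Prelude as Stream

-- "stream failed" is printed only if an exception aborts the
-- stream; the exception itself is rethrown to the caller.
logged :: Stream.SerialT IO a -> Stream.SerialT IO a
logged = Stream.onException (putStrLn "stream failed")
```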

finally_ :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #

Like `finally`

with following differences:

- action `m b` won't run if the stream is garbage collected after partial evaluation.
- does not require a `MonadAsync` constraint.
- has slightly better performance than `finally`.

*Inhibits stream fusion*

*Pre-release*

finally :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a Source #

Run the action `m b`

whenever the stream `t m a`

stops normally, aborts
due to an exception or if it is garbage collected after a partial lazy
evaluation.

The semantics of running the action `m b`

are similar to the cleanup action
semantics described in `bracket`

.

*See also finally_*

*Inhibits stream fusion*

*Since: 0.7.0*

ghandle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a -> t m a) -> t m a -> t m a Source #

Like `handle`

but the exception handler is also provided with the stream
that generated the exception as input. The exception handler can thus
re-evaluate the stream to retry the action that failed. The exception
handler can again call `ghandle`

on it to retry the action multiple times.

This is highly experimental. In a stream of actions we can map the stream with a retry combinator to retry each action on failure.

*Inhibits stream fusion*

*Pre-release*

handle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a Source #

When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument.

*Inhibits stream fusion*

*Since: 0.7.0*
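
For example, substituting a fallback stream when any exception occurs (a sketch; `ScopedTypeVariables` is needed for the handler's type annotation):

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception (SomeException)
import qualified Streamly.Prelude as Stream

-- On any exception, continue with a fallback stream instead.
withFallback :: Stream.SerialT IO Int -> Stream.SerialT IO Int
withFallback = Stream.handle (\(_ :: SomeException) -> Stream.fromList [0])
```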

retry Source #

:: (IsStream t, MonadCatch m, Exception e, Ord e) | |

=> Map e Int | map from exception to retry count |

-> (e -> t m a) | default handler for those exceptions that are not in the map |

-> t m a | |

-> t m a |

`retry`

takes 3 arguments

- A map `m` whose keys are exceptions and values are the number of times to retry the action given that the exception occurs.
- A handler `han` that decides how to handle an exception when the exception cannot be retried.
- The stream itself that we want to run this mechanism on.

When evaluating a stream if an exception occurs,

- The stream evaluation aborts
- The exception is looked up in `m`

  a. If the exception exists and the mapped value is > 0 then,

     i. The value is decreased by 1.

     ii. The stream is resumed from where the exception was called, retrying the action.

  b. If the exception exists and the mapped value is == 0 then the stream evaluation stops.

  c. If the exception does not exist then we handle the exception using `han`.

*Internal*
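
A sketch of how the pieces fit together (the `NetworkError` exception type and the module aliases are illustrative):

```haskell
import Control.Exception (Exception)
import qualified Data.Map as Map
import qualified Streamly.Internal.Data.Stream.IsStream as IsStream
import qualified Streamly.Prelude as Stream

data NetworkError = NetworkError deriving (Show, Eq, Ord)

instance Exception NetworkError

-- Retry the stream up to 3 times on NetworkError; since the map
-- has no other keys, any other exception type does not match and
-- the handler substitutes an empty stream.
resilient :: Stream.SerialT IO a -> Stream.SerialT IO a
resilient = IsStream.retry (Map.fromList [(NetworkError, 3)]) (const Stream.nil)
```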

## Generalize Inner Monad

hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> SerialT m a -> SerialT n a Source #

Transform the inner monad of a stream using a natural transformation.

*Internal*

generally :: (IsStream t, Monad m) => t Identity a -> t m a Source #

Generalize the inner monad of the stream from `Identity`

to any monad.

*Internal*

## Transform Inner Monad

liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m)) => t m a -> t (tr m) a Source #

Lift the inner monad `m`

of a stream `t m a`

to `tr m`

using the monad
transformer `tr`

.

*Since: 0.8.0*
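
For instance, a stream over `IO` can be lifted into `ReaderT` so that later stages can read shared configuration (the `Config` type is illustrative):

```haskell
import Control.Monad.Trans.Reader (ReaderT, ask)
import qualified Streamly.Internal.Data.Stream.IsStream as IsStream
import qualified Streamly.Prelude as Stream

newtype Config = Config { scale :: Int }

-- Multiply each element by a factor obtained from the environment.
scaled :: Stream.SerialT (ReaderT Config IO) Int
scaled =
    Stream.mapM (\x -> do { c <- ask; return (scale c * x) })
        $ IsStream.liftInner (Stream.fromList [1, 2, 3])
```

The inner `ReaderT` layer can then be discharged with `runReaderT` from this module.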

usingReaderT :: (Monad m, IsStream t) => m r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a Source #

runReaderT :: (IsStream t, Monad m) => m s -> t (ReaderT s m) a -> t m a Source #

Evaluate the inner monad of a stream as `ReaderT`

.

*Since: 0.8.0*

usingStateT :: Monad m => m s -> (SerialT (StateT s m) a -> SerialT (StateT s m) a) -> SerialT m a -> SerialT m a Source #

Run a stateful (StateT) stream transformation using a given state.

This is supported only for `SerialT`

as concurrent state updates may not be
safe.

usingStateT s f = evalStateT s . f . liftInner

See also: `scanl'`

*Internal*
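
For example, threading a running total through a stream (a sketch, assuming the strict `StateT` from transformers; note that the transformation must preserve the element type):

```haskell
import Control.Monad.Trans.State.Strict (get, put)
import qualified Streamly.Internal.Data.Stream.IsStream as IsStream
import qualified Streamly.Prelude as Stream

-- Replace each element with the running sum seen so far.
runningSums :: Stream.SerialT IO Int -> Stream.SerialT IO Int
runningSums = IsStream.usingStateT (return 0) (Stream.mapM step)
  where
    step x = do
        total <- get
        put (total + x)
        return (total + x)
```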

## Transformation

### Sampling

Value agnostic filtering.

sampleFromThen :: (IsStream t, Monad m, Functor (t m)) => Int -> Int -> t m a -> t m a Source #

`sampleFromThen offset stride`

samples the element at `offset`

index and
then every element at strides of `stride`

.

`>>>`

`Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10`
[2,5,8]

*Pre-release*

sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Like `sampleInterval`

but samples at the beginning of the time window.

sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one

*Pre-release*

sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Continuously evaluate the input stream and sample the last event in time
window of `n`

seconds.

This is also known as `throttle`

in some libraries.

sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last

*Pre-release*

sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Like `sampleBurstEnd`

but samples the event at the beginning of the burst
instead of at the end of it.

*Pre-release*

sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Sample one event at the end of each burst of events. A burst is a group of events close together in time, it ends when an event is spaced by more than the specified time interval (in seconds) from the previous event.

This is known as `debounce`

in some libraries.

The clock granularity is 10 ms.

*Pre-release*

### Reordering

sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a Source #

Sort the input stream using a supplied comparison function.

*O(n) space*

Note: this is not the fastest possible implementation as of now.

*Pre-release*
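
For example (a ghci-style sketch):

```haskell
>>> Stream.toList $ Stream.sortBy compare $ Stream.fromList [3,1,2::Int]
[1,2,3]
```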

## Nesting

### Set like operations

These are not exactly set operations because streams are not necessarily sets, they may have duplicated elements.

intersectBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

`intersectBy`

is essentially a filtering operation that retains only those
elements in the first stream that are present in the second stream.

`>>>`

`Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])`
[1,2,2]

`>>>`

`Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])`
[2,1,1]

`intersectBy`

is similar to but not the same as `joinInner`

:

`>>>`

`Stream.toList $ fmap fst $ Stream.joinInner (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])`
[1,1,2,2]

Space: O(n) where `n`

is the number of elements in the second stream.

Time: O(m x n) where `m`

is the number of elements in the first stream and
`n`

is the number of elements in the second stream.

*Pre-release*

intersectBySorted :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like `intersectBy`

but works only on streams sorted in ascending order.

Space: O(1)

Time: O(m+n)

*Pre-release*
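
Both inputs must already be sorted ascending under the same ordering, e.g. (a ghci-style sketch; the exact handling of duplicates follows `intersectBy`):

```haskell
>>> s1 = Stream.fromList [1,2,4,7::Int]
>>> s2 = Stream.fromList [2,3,4,8::Int]
>>> Stream.toList $ Stream.intersectBySorted compare s1 s2
```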

differenceBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

Delete first occurrences of those elements from the first stream that are present in the second stream. If an element occurs multiple times in the second stream as many occurrences of it are deleted from the first stream.

`>>>`

`Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])`
[2]

The following laws hold:

(s1 `serial` s2) `differenceBy eq` s1 === s2
(s1 `wSerial` s2) `differenceBy eq` s1 === s2

Same as the list `\\` operation.

Space: O(m) where `m`

is the number of elements in the first stream.

Time: O(m x n) where `m`

is the number of elements in the first stream and
`n`

is the number of elements in the second stream.

*Pre-release*

mergeDifferenceBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

This is essentially an append operation that appends all the extra occurrences of elements from the second stream that are not already present in the first stream.

`>>>`

`Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3])`
[1,2,2,4,3]

Equivalent to the following except that `s1`

is evaluated only once:

unionBy eq s1 s2 = s1 `serial` (s2 `differenceBy eq` s1)

Similar to `joinOuter`

but not the same.

Space: O(n)

Time: O(m x n)

*Pre-release*

mergeUnionBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

### Join operations

crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b) Source #

This is the same as `outerProduct`

but less
efficient.

The second stream is evaluated multiple times. If the second stream is a
consume-once stream then it can be cached in an `Array`

before
calling this function. Caching may also improve performance if the stream is
expensive to evaluate.

Time: O(m x n)

*Pre-release*

joinInner :: forall (t :: (Type -> Type) -> Type -> Type) m a b. (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> t m (a, b) Source #

For all elements in `t m a`

, for all elements in `t m b`

if `a`

and `b`

are equal by the given equality predicate then return the tuple (a, b).

The second stream is evaluated multiple times. If the stream is a
consume-once stream then the caller should cache it (e.g. in an
`Array`

) before calling this function. Caching may also improve
performance if the stream is expensive to evaluate.

For space efficiency use the smaller stream as the second stream.

You should almost always use joinInnerMap instead of joinInner; joinInnerMap is an order of magnitude faster. joinInner may still be useful when the second stream is generated from a seed and hence need not be stored in memory, and the memory it would otherwise take is a concern.

Space: O(n) assuming the second stream is cached in memory.

Time: O(m x n)

*Pre-release*

joinInnerMap :: (IsStream t, Monad m, Ord k) => t m (k, a) -> t m (k, b) -> t m (k, a, b) Source #

Like `joinInner`

but uses a `Map`

for efficiency.

If the input streams have duplicate keys, the behavior is undefined.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m + n)

*Pre-release*
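
For example, joining two keyed streams (a ghci-style sketch; keys are unique in each stream, as the function requires):

```haskell
>>> s1 = Stream.fromList [(1,"a"),(2,"b")]
>>> s2 = Stream.fromList [(2,"x"),(3,"y")]
>>> Stream.toList $ Stream.joinInnerMap s1 s2
[(2,"b","x")]
```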

joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b) Source #

mergeLeftJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b) Source #

Like `joinLeft`

but works only on sorted streams.

Space: O(1)

Time: O(m + n)

*Unimplemented*

joinLeftMap :: (IsStream t, Ord k, Monad m) => t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b) Source #

Like `joinLeft`

but uses a `Map` for efficiency.

Space: O(n)

Time: O(m + n)

*Pre-release*

mergeOuterJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b) Source #

Like `joinOuter`

but works only on sorted streams.

Space: O(1)

Time: O(m + n)

*Unimplemented*

joinOuterMap :: (IsStream t, Ord k, MonadIO m) => t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b) Source #

Like `joinOuter`

but uses a `Map`

for efficiency.

Space: O(m + n)

Time: O(m + n)

*Pre-release*

maxThreads :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for
any concurrent combinator in a stream.
A value of 0 resets the thread limit to default, a negative value means
there is no limit. The default value is 1500. `maxThreads`

does not affect
`ParallelT`

streams as they can use an unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

*Since: 0.4.0 (Streamly)*

*Since: 0.8.0*
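
For example, capping concurrency in an IO-bound pipeline (a sketch; `fetchUrl` is a hypothetical action):

```haskell
import qualified Streamly.Prelude as Stream

-- At most 10 fetches are in flight at any time; results are
-- delivered in input order because of fromAhead.
fetchAll :: (String -> IO String) -> [String] -> IO [String]
fetchAll fetchUrl urls =
    Stream.toList
        $ Stream.fromAhead
        $ Stream.maxThreads 10
        $ Stream.mapM fetchUrl
        $ Stream.fromList urls
```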

maxBuffer :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded `maxBuffer`

value (i.e. a negative value)
coupled with an unbounded `maxThreads`

value is a recipe for disaster in
presence of infinite streams, or very large streams. Especially, it must
not be used when `pure`

is used in `ZipAsyncM`

streams as `pure`

in
applicative zip streams generates an infinite stream causing unbounded
concurrent generation with no limit on the buffer or threads.

*Since: 0.4.0 (Streamly)*

*Since: 0.8.0*

rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream.
A `Nothing`

value resets the rate to default which is unlimited. When the
rate is specified, concurrent production may be ramped up or down
automatically to achieve the specified yield rate. The specific behavior for
different styles of `Rate`

specifications is documented under `Rate`

. The
effective maximum production rate achieved by a stream is governed by:

- The `maxThreads` limit
- The `maxBuffer` limit
- The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*
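
For example, a rate-limited poller (a sketch; `action` stands for any effect, and `avgRate` below is a shorthand for a `rate` specification):

```haskell
import qualified Streamly.Prelude as Stream

-- Evaluate the action repeatedly at roughly 10 yields per second.
pollAt10Hz :: IO a -> IO ()
pollAt10Hz action =
    Stream.drain
        $ Stream.fromAsync
        $ Stream.avgRate 10
        $ Stream.repeatM action
```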

avgRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate (r/2) r (2*r) maxBound)`

Specifies the average production rate of a stream in number of yields
per second (i.e. `Hertz`

). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and double of
the specified rate on the higher side.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

minRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate r r (2*r) maxBound)`

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

maxRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate (r/2) r r maxBound)`

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

constRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate r r r 0)`

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

inspectMode :: IsStream t => t m a -> t m a Source #

Print debug information about the internal SVar when the stream ends.

*Pre-release*

## Generation

fromPure :: IsStream t => a -> t m a Source #

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure
fromPure = fromEffect . pure

In Zip applicative streams `fromPure`

is not the same as `pure`

because in that
case `pure`

is equivalent to `repeat`

instead. `fromPure`

and `pure`

are
equally efficient, in other cases `fromPure`

may be slightly more efficient
than the other equivalent definitions.

*Since: 0.8.0 (Renamed yield to fromPure)*

fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]

*Since: 0.8.0 (Renamed yieldM to fromEffect)*

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

`>>>`

`repeatM = fix . consM`

`>>>`

`repeatM = cycle1 . fromEffect`

Generate a stream by repeatedly executing a monadic action forever.

`>>>`

`:{`
`repeatAsync =`
`    Stream.repeatM (threadDelay 1000000 >> print 1)`
`        & Stream.take 10`
`        & Stream.fromAsync`
`        & Stream.drain`
`:}`

*Concurrent, infinite (do not use with fromParallel)*

*Since: 0.2.0*

timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64) Source #

`timesWith g`

returns a stream of time value tuples. The first component
of the tuple is an absolute time reference (epoch) denoting the start of the
stream and the second component is a time relative to the reference.

The argument `g`

specifies the granularity of the relative time in seconds.
A lower granularity clock gives higher precision but is more expensive in
terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

`>>>`

`import Control.Concurrent (threadDelay)`

`>>>`

`import Streamly.Internal.Data.Stream.IsStream.Common as Stream (timesWith)`

`>>>`

`Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01`
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines.

*Pre-release*

absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

`absTimesWith g`

returns a stream of absolute timestamps using a clock of
granularity `g`

specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
as 1 ms.

`>>>`

`Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01`
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

*Pre-release*

relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #

`relTimesWith g` returns a stream of relative time values starting from 0, using a clock of granularity `g` specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

*Pre-release*

## Elimination

foldContinue :: Monad m => Fold m a b -> SerialT m a -> Fold m a b Source #

We can create higher order folds using `foldContinue`. We can fold a number of streams to a given fold efficiently with full stream fusion. For example, to fold a list of streams on the same sum fold:

concatFold = Prelude.foldl Stream.foldContinue Fold.sum

fold f = Fold.extractM . Stream.foldContinue f

*Internal*
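The idea can be sketched with a toy strict left fold on lists. The `Fold` type and the `addList` and `extract` helpers below are illustrative stand-ins, not streamly's real API:

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Data.List (foldl')

-- Toy strict left fold: a step function, the current state, and a finalizer.
data Fold a b = forall s. Fold (s -> a -> s) s (s -> b)

sumFold :: Num a => Fold a a
sumFold = Fold (+) 0 id

-- Feed one input list into the fold, returning the fold with its partial
-- state updated so that more input can be supplied later; this mirrors what
-- Stream.foldContinue enables for streams.
addList :: Fold a b -> [a] -> Fold a b
addList (Fold step begin done) xs = Fold step (foldl' step begin xs) done

-- Finish the fold, like Fold.extractM.
extract :: Fold a b -> b
extract (Fold _ s done) = done s

main :: IO ()
main = do
  -- Fold several "streams" into the same sum fold, as in
  -- concatFold = Prelude.foldl Stream.foldContinue Fold.sum
  let partial = foldl addList sumFold ([[1 .. 3], [4 .. 6], [7 .. 10]] :: [[Int]])
  print (extract partial)
```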

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left `Fold` and reducing the resulting expression strictly at each step. The behavior is similar to `foldl'`. A `Fold` can terminate early without consuming the full stream. See the documentation of individual `Fold`s for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, `foldMany` on an empty stream results in an empty stream. Therefore, `Stream.fold f` is not the same as `Stream.head . Stream.foldMany f`.

fold f = Stream.parse (Parser.fromFold f)

*Since: 0.7.0*

## Transformation

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

`scanlMAfter' accumulate initial done stream` is like `scanlM'` except that it provides an additional `done` function to be applied on the accumulator when the stream stops. The result of `done` is also emitted in the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

*Pre-release*
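The semantics can be sketched on lists; this is an illustrative model, not streamly's fused implementation:

```haskell
import Data.Functor.Identity (Identity (..))

-- List model of scanlMAfter': emit the initial accumulator, then every
-- intermediate accumulator, and finally the result of `done` applied to
-- the last accumulator.
scanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> [a] -> m [b]
scanlMAfter' step initial done xs = do
    z <- initial
    (z :) <$> go z xs
  where
    go acc [] = (: []) <$> done acc
    go acc (y : ys) = do
        acc' <- step acc y
        (acc' :) <$> go acc' ys

main :: IO ()
main =
    -- A running sum whose final state is "flushed" as a negated total.
    print $ runIdentity $
        scanlMAfter' (\s a -> pure (s + a)) (pure 0) (pure . negate) [1, 2, 3 :: Int]
```

The final `negate` stands in for any cleanup or flush action run exactly once when the input ends.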

postscanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like `postscanl'` but with a monadic step function and a monadic seed.

>>> postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs

*Since: 0.7.0*

*Since: 0.8.0 (signature change)*

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #

A stateful `mapM`, equivalent to a left scan, more like mapAccumL. Hopefully, this is a better alternative to `scan`. Separation of state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy.

See also: `scanlM'`

*Pre-release*
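A list model of the signature above (illustrative only; streamly's version works on streams with fusion):

```haskell
-- List model of smapM: thread a state s through a monadic step while
-- emitting only the outputs b.
smapM :: Monad m => (s -> a -> m (s, b)) -> m s -> [a] -> m [b]
smapM step initial xs = do
    s0 <- initial
    go s0 xs
  where
    go _ [] = return []
    go s (y : ys) = do
        (s', b) <- step s y
        (b :) <$> go s' ys

main :: IO ()
main = do
    -- Pair each element with its index; the counter state stays separate
    -- from the output, as described above.
    indexed <- smapM (\i a -> return (i + 1, (i, a))) (return (0 :: Int)) "abc"
    print indexed
```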

foldManyPost :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Like `foldMany` but appends empty fold output if the fold and stream termination aligns:

>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList []
[0]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9]
[3,7,11,15,9]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10]
[3,7,11,15,19,0]

*Pre-release*

The stateful step function can be simplified to `(s -> a -> m b)` to provide a read-only environment. However, that would just be `mapM`.

The initial action could be `m (s, Maybe b)`, and we can also add a final action `s -> m (Maybe b)`. This can be used to get pre/post scan like functionality and also to flush the state in the end like scanlMAfter'. We can also use it along with a fusible version of bracket to get scanlMAfter' like functionality. See issue #677.

This can be further generalized to a type similar to Fold/Parser, giving it filtering and parsing capability as well (this is in fact equivalent to parseMany):

smapM :: (s -> a -> m (Step s b)) -> m s -> t m a -> t m b

take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first `n` elements from the stream and discard the rest.

*Since: 0.1.0*

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

*Since: 0.1.0*

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first `n` elements from the stream and take the rest.

*Since: 0.1.0*

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

*Since: 0.5.0*

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"

Be careful about the order of effects. In the above example we used trace after the intersperse; if we use it before the intersperse, the output would be he.l.l.o."h,e,l,l,o".

>>> Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"
he.l.l.o."h,e,l,l,o"

*Since: 0.5.0*

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every `n` seconds.

>>> import Control.Concurrent (threadDelay)
>>> Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
h,e,l,l,o

*Pre-release*

reverse :: (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

>>> reverse = Stream.foldlT (flip Stream.cons) Stream.nil

*Since 0.7.0 (Monad m constraint)*

*Since: 0.1.1*

## Concurrent

mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; it terminates if the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; if the buffer is full, it blocks until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD

*Pre-release*

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like `parallel` but stops the output as soon as the first stream stops.

*Pre-release*

## Nesting

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

>>> concatM = Stream.concat . Stream.fromEffect
>>> concatM = Stream.concat . lift -- requires (MonadTrans t)
>>> concatM = join . lift -- requires (MonadTrans t, Monad (t m))

*Internal*

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike `concatMap`, it can produce an effect at the beginning of each iteration of the inner loop.

*Since: 0.6.0*
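On lists the combinator reduces to a one-liner, which also shows where the monadic effect fires: once per outer element, before that element's inner results are produced (a sketch, not streamly's streaming implementation):

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)

-- List model of concatMapM: run the monadic stream-producing function on
-- each element, then flatten.
concatMapM :: Monad m => (a -> m [b]) -> [a] -> m [b]
concatMapM f xs = concat <$> mapM f xs

main :: IO ()
main = do
    calls <- newIORef (0 :: Int)
    -- The effect (counting invocations) runs once per outer element.
    ys <- concatMapM (\x -> modifyIORef' calls (+ 1) >> return [x, x * 10]) [1, 2, 3 :: Int]
    print ys                    -- [1,10,2,20,3,30]
    readIORef calls >>= print   -- 3
```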

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concatMapWith Stream.serial f
>>> concatMap f = Stream.concat . Stream.map f

*Since: 0.6.0*

splitOnSeq :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like `splitOn` but the separator is a sequence of elements instead of a single element.

For illustration, let's define a function that operates on pure lists:

>>> splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)

>>> splitOnSeq' "" "hello"
["h","e","l","l","o"]

>>> splitOnSeq' "hello" ""
[""]

>>> splitOnSeq' "hello" "hello"
["",""]

>>> splitOnSeq' "x" "hello"
["hello"]

>>> splitOnSeq' "h" "hello"
["","ello"]

>>> splitOnSeq' "o" "hello"
["hell",""]

>>> splitOnSeq' "e" "hello"
["h","llo"]

>>> splitOnSeq' "l" "hello"
["he","","o"]

>>> splitOnSeq' "ll" "hello"
["he","o"]

`splitOnSeq` is an inverse of `intercalate`. The following law always holds:

intercalate . splitOnSeq == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOnSeq . intercalate == id

>>> splitOnSeq pat f = Stream.foldManyPost (Fold.takeEndBySeq_ pat f)

*Pre-release*
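A pure-list sketch of the splitting behaviour and the `intercalate` law; `splitOnSeqL` is a hypothetical stand-in for streamly's fused implementation:

```haskell
import Data.List (intercalate, stripPrefix)

-- Split a list on a separator sequence, mirroring the doctests above.
splitOnSeqL :: Eq a => [a] -> [a] -> [[a]]
splitOnSeqL [] xs = map (: []) xs  -- empty pattern: one segment per element
splitOnSeqL pat xs = go xs []
  where
    go ys acc
        | Just rest <- stripPrefix pat ys = reverse acc : go rest []
    go [] acc = [reverse acc]
    go (y : ys) acc = go ys (y : acc)

main :: IO ()
main = do
    print (splitOnSeqL "ll" "hello")  -- ["he","o"]
    print (splitOnSeqL "l" "hello")   -- ["he","","o"]
    -- intercalate . splitOnSeq == id for a non-empty separator:
    print (intercalate "l" (splitOnSeqL "l" "hello") == "hello")
```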

## Zipping

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like `zipWith`

but using a monadic zipping function.

*Since: 0.4.0*

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Stream `a` is evaluated first, followed by stream `b`; the resulting elements `a` and `b` are then zipped using the supplied zip function and the result `c` is yielded to the consumer.

If stream `a` or stream `b` ends, the zipped stream ends. If stream `b` ends first, the element `a` from previous evaluation of stream `a` is discarded.

> D.toList $ D.zipWith (+) (D.fromList [1,2,3]) (D.fromList [4,5,6])
[5,7,9]

*Since: 0.1.0*

## Deprecated

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the `Enum` type class, except that these generate a stream instead of a list. Use the functions in the Streamly.Internal.Data.Stream.Enumeration module to define new instances.

*Since: 0.6.0*

enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #

`enumerateFrom from` generates a stream starting with the element `from`, enumerating up to `maxBound` when the type is `Bounded` or generating an infinite stream when the type is not `Bounded`.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]

For `Fractional` types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]

*Since: 0.6.0*

enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #

Generate a finite stream starting with the element `from`, enumerating the type up to the value `to`. If `to` is smaller than `from` then an empty stream is returned.

>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]

For `Fractional` types, the last element is equal to the specified `to` value after rounding to the nearest integral value.

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

*Since: 0.6.0*

enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #

`enumerateFromThen from then` generates a stream whose first element is `from`, the second element is `then` and the successive elements are in increments of `then - from`. Enumeration can occur downwards or upwards depending on whether `then` comes before or after `from`. For `Bounded` types the stream ends when `maxBound` is reached; for unbounded types it keeps enumerating infinitely.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]

*Since: 0.6.0*

enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #

`enumerateFromThenTo from then to` generates a finite stream whose first element is `from`, the second element is `then` and the successive elements are in increments of `then - from` up to `to`. Enumeration can occur downwards or upwards depending on whether `then` comes before or after `from`.

>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]

>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]

*Since: 0.6.0*

##### Instances

### Enumerating `Bounded` `Enum` Types

enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #

enumerateFromBounded :: (IsStream t, Monad m, Enumerable a, Bounded a) => a -> t m a Source #

enumerateFromBounded from = enumerateFromTo from maxBound

`enumerateFrom` for `Bounded` `Enum` types.

*Since: 0.6.0*

### Enumerating `Enum` Types not larger than `Int`

enumerateFromToSmall :: (IsStream t, Monad m, Enum a) => a -> a -> t m a Source #

`enumerateFromTo` for `Enum` types not larger than `Int`.

*Since: 0.6.0*

enumerateFromThenToSmall :: (IsStream t, Monad m, Enum a) => a -> a -> a -> t m a Source #

`enumerateFromThenTo` for `Enum` types not larger than `Int`.

*Since: 0.6.0*

enumerateFromThenSmallBounded :: (IsStream t, Monad m, Enumerable a, Bounded a) => a -> a -> t m a Source #

`enumerateFromThen` for `Enum` types not larger than `Int`.

Note: We convert the `Enum` to `Int` and enumerate the `Int`. If a type is bounded but does not have a `Bounded` instance then we can go on enumerating it beyond the legal values of the type, resulting in the failure of `toEnum` when converting back to `Enum`. Therefore we require a `Bounded` instance for this function to be safely used.

*Since: 0.6.0*

### Enumerating `Bounded` `Integral` Types

enumerateFromIntegral :: (IsStream t, Monad m, Integral a, Bounded a) => a -> t m a Source #

Enumerate an `Integral` type. `enumerateFromIntegral from` generates a stream whose first element is `from` and the successive elements are in increments of `1`. The stream is bounded by the size of the `Integral` type.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromIntegral (0 :: Int)
[0,1,2,3]

*Since: 0.6.0*

enumerateFromThenIntegral :: (IsStream t, Monad m, Integral a, Bounded a) => a -> a -> t m a Source #

Enumerate an `Integral` type in steps. `enumerateFromThenIntegral from then` generates a stream whose first element is `from`, the second element is `then` and the successive elements are in increments of `then - from`. The stream is bounded by the size of the `Integral` type.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) 2
[0,2,4,6]

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) (-2)
[0,-2,-4,-6]

*Since: 0.6.0*

### Enumerating `Integral` Types

enumerateFromToIntegral :: (IsStream t, Monad m, Integral a) => a -> a -> t m a Source #

Enumerate an `Integral` type up to a given limit. `enumerateFromToIntegral from to` generates a finite stream whose first element is `from` and successive elements are in increments of `1` up to `to`.

>>> Stream.toList $ Stream.enumerateFromToIntegral 0 4
[0,1,2,3,4]

*Since: 0.6.0*