Filter Modules

Most of the combinators in this module can be implemented as unfolds. Some of them however can only be expressed in terms StreamK e.g. cons/consM, fromFoldable, mfix. We can possibly remove those from this module which can be expressed as unfolds. Unless we want to use rewrite rules to rewrite them as StreamK when StreamK is used, avoiding conversion to StreamD. Will that help? Are there any other reasons to keep these and not use unfolds?

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

An empty stream producing a side effect.

> toList (nilM (print "nil")) "nil" []

*Pre-release*

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as `(return a) `consM` r`

but
more efficient. For concurrent streams this is not concurrent whereas
`consM`

is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]

*Since: 0.1.0*

consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil hello world ["hello","world"]

*Concurrent (do not use fromParallel to construct infinite streams)*

*Since: 0.2.0*

(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of `consM`

. We can read it as "`parallel colon`

"
to remember that `|`

comes before `:`

.

> toList $ getLine |: getLine |: nil hello world ["hello","world"]

let delay = threadDelay 1000000 >> print 1 drain $ fromSerial $ delay |: delay |: delay |: nil drain $ fromParallel $ delay |: delay |: delay |: nil

*Concurrent (do not use fromParallel to construct infinite streams)*

*Since: 0.2.0*

`Unfold`

unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #

Convert an `Unfold`

into a stream by supplying it an input seed.

`>>>`

hello hello hello`Stream.drain $ Stream.unfold (Unfold.replicateM 3) (putStrLn "hello")`

*Since: 0.7.0*

unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b Source #

Convert an `Unfold`

with a closed input end into a stream.

*Pre-release*

unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #

```
unfoldr step s =
case step s of
Nothing ->
````nil`

Just (a, b) -> a `cons` unfoldr step b

Build a stream by unfolding a *pure* step function `step`

starting from a
seed `s`

. The step function returns the next element in the stream and the
next seed value. When it is done it returns `Nothing`

and the stream ends.
For example,

>>> :{ let f b = if b > 3 then Nothing else Just (b, b + 1) in Stream.toList $ Stream.unfoldr f 0 :} [0,1,2,3]

*Since: 0.1.0*

unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #

Build a stream by unfolding a *monadic* step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns `Nothing`

and the stream ends. For
example,

>>> :{ let f b = if b > 3 then return Nothing else return (Just (b, b + 1)) in Stream.toList $ Stream.unfoldrM f 0 :} [0,1,2,3]

When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.

(fromAsync $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0) & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

*Concurrent*

*Since: 0.1.0*

fromPure :: IsStream t => a -> t m a Source #

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure fromPure = fromEffect . pure

In Zip applicative streams `fromPure`

is not the same as `pure`

because in that
case `pure`

is equivalent to `repeat`

instead. `fromPure`

and `pure`

are
equally efficient, in other cases `fromPure`

may be slightly more efficient
than the other equivalent definitions.

*Since: 0.8.0 (Renamed yield to fromPure)*

fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine hello ["hello"]

*Since: 0.8.0 (Renamed yieldM to fromEffect)*

repeat :: (IsStream t, Monad m) => a -> t m a Source #

Generate an infinite stream by repeating a pure value.

*Since: 0.4.0*

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

repeatM = fix . consM repeatM = cycle1 . fromEffect

Generate a stream by repeatedly executing a monadic action forever.

drain $ fromSerial $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) drain $ fromAsync $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)

*Concurrent, infinite (do not use with fromParallel)*

*Since: 0.2.0*

replicate :: (IsStream t, Monad m) => Int -> a -> t m a Source #

replicate = take n . repeat

Generate a stream of length `n`

by repeating a value `n`

times.

*Since: 0.6.0*

replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #

replicateM = take n . repeatM

Generate a stream by performing a monadic action `n`

times. Same as:

drain $ fromSerial $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) drain $ fromAsync $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)

*Concurrent*

*Since: 0.1.1*

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type
class are equivalent to those in the `Enum`

type class, except that these
generate a stream instead of a list. Use the functions in
Streamly.Internal.Data.Stream.Enumeration module to define new instances.

*Since: 0.6.0*

enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #

`enumerateFrom from`

generates a stream starting with the element
`from`

, enumerating up to `maxBound`

when the type is `Bounded`

or
generating an infinite stream when the type is not `Bounded`

.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int) [0,1,2,3]

For `Fractional`

types, enumeration is numerically stable. However, no
overflow or underflow checks are performed.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1 [1.1,2.1,3.1,4.1]

*Since: 0.6.0*

enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #

Generate a finite stream starting with the element `from`

, enumerating
the type up to the value `to`

. If `to`

is smaller than `from`

then an
empty stream is returned.

>>> Stream.toList $ Stream.enumerateFromTo 0 4 [0,1,2,3,4]

For `Fractional`

types, the last element is equal to the specified `to`

value after rounding to the nearest integral value.

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4 [1.1,2.1,3.1,4.1] >>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6 [1.1,2.1,3.1,4.1,5.1]

*Since: 0.6.0*

enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #

`enumerateFromThen from then`

generates a stream whose first element
is `from`

, the second element is `then`

and the successive elements are
in increments of `then - from`

. Enumeration can occur downwards or
upwards depending on whether `then`

comes before or after `from`

. For
`Bounded`

types the stream ends when `maxBound`

is reached, for
unbounded types it keeps enumerating infinitely.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2 [0,2,4,6] >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2) [0,-2,-4,-6]

*Since: 0.6.0*

enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #

`enumerateFromThenTo from then to`

generates a finite stream whose
first element is `from`

, the second element is `then`

and the successive
elements are in increments of `then - from`

up to `to`

. Enumeration can
occur downwards or upwards depending on whether `then`

comes before or
after `from`

.

>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6 [0,2,4,6] >>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6) [0,-2,-4,-6]

*Since: 0.6.0*

enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #

times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64) Source #

`times`

returns a stream of time value tuples with clock of 10 ms
granularity. The first component of the tuple is an absolute time reference
(epoch) denoting the start of the stream and the second component is a time
relative to the reference.

`>>>`

(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))`Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times`

Note: This API is not safe on 32-bit machines.

*Pre-release*

absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime Source #

`absTimes`

returns a stream of absolute timestamps using a clock of 10 ms
granularity.

`>>>`

AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...})`Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes`

Note: This API is not safe on 32-bit machines.

*Pre-release*

absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

`absTimesWith g`

returns a stream of absolute timestamps using a clock of
granularity `g`

specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
as 1 ms.

`>>>`

AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...})`Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01`

Note: This API is not safe on 32-bit machines.

*Pre-release*

relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64 Source #

`relTimes`

returns a stream of relative time values starting from 0,
using a clock of granularity 10 ms.

`>>>`

RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...)`Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes`

Note: This API is not safe on 32-bit machines.

*Pre-release*

relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #

`relTimesWith g`

returns a stream of relative time values starting from 0,
using a clock of granularity `g`

specified in seconds. A low granularity
clock is more expensive in terms of CPU usage. Any granularity lower than 1
ms is treated as 1 ms.

`>>>`

RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...)`Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01`

Note: This API is not safe on 32-bit machines.

*Pre-release*

durations :: Double -> t m RelTime64 Source #

`durations g`

returns a stream of relative time values measuring the time
elapsed since the immediate predecessor element of the stream was generated.
The first element of the stream is always 0. `durations`

uses a clock of
granularity `g`

specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. The minimum granularity is 1 millisecond.
Durations lower than 1 ms will be 0.

Note: This API is not safe on 32-bit machines.

*Unimplemented*

ticks :: Rate -> t m () Source #

Generate ticks at the specified rate. The rate is adaptive, the tick
generation speed can be increased or decreased at different times to achieve
the specified rate. The specific behavior for different styles of `Rate`

specifications is documented under `Rate`

. The effective maximum rate
achieved by a stream is governed by the processor speed.

*Unimplemented*

timeout :: AbsTime -> t m () Source #

Generate a singleton event at or after the specified absolute time. Note that this is different from a threadDelay, a threadDelay starts from the time when the action is evaluated, whereas if we use AbsTime based timeout it will immediately expire if the action is evaluated too late.

*Unimplemented*

fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #

fromIndices f = fmap f $ Stream.enumerateFrom 0 fromIndices f = let g i = f i `cons` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a function `f`

applied on the corresponding index. Index starts at 0.

`>>>`

[0,1,2,3,4]`Stream.toList $ Stream.take 5 $ Stream.fromIndices id`

*Since: 0.6.0*

fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #

fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0 fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a monadic
function `f`

applied on the corresponding index. Index starts at 0.

*Concurrent*

*Since: 0.6.0*

iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #

iterate f x = x `cons` iterate f x

Generate an infinite stream with `x`

as the first element and each
successive element derived by applying the function `f`

on the previous
element.

>>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1 [1,2,3,4,5]

*Since: 0.1.2*

iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #

iterateM f m = m >>= a -> return a `consM` iterateM f (f a)

Generate an infinite stream with the first element generated by the action
`m`

and each successive element derived by applying the monadic function
`f`

on the previous element.

When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.

drain $ fromSerial $ S.take 10 $ S.iterateM (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0) drain $ fromAsync $ S.take 10 $ S.iterateM (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)

*Concurrent*

*Since: 0.1.2*

*Since: 0.7.0 (signature change)*

mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #

We can define cyclic structures using `let`

:

`>>>`

([1,1],1)`let (a, b) = ([1, b], head a) in (a, b)`

The function `fix`

defined as:

fix f = let x = f x in x

ensures that the argument of a function and its output refer to the same
lazy value `x`

i.e. the same location in memory. Thus `x`

can be defined
in terms of itself, creating structures with cyclic references.

`>>>`

`import Data.Function (fix)`

`>>>`

`f ~(a, b) = ([1, b], head a)`

`>>>`

([1,1],1)`fix f`

`mfix`

is essentially the same as `fix`

but for monadic
values.

Using `mfix`

for streams we can construct a stream in which each element of
the stream is defined in a cyclic fashion. The argument of the function
being fixed represents the current element of the stream which is being
returned by the stream monad. Thus, we can use the argument to construct
itself.

In the following example, the argument `action`

of the function `f`

represents the tuple `(x,y)`

returned by it in a given iteration. We define
the first element of the tuple in terms of the second.

import Streamly.Internal.Data.Stream.IsStream as Stream import System.IO.Unsafe (unsafeInterleaveIO) main = do Stream.mapM_ print $ Stream.mfix f where f action = do let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act x <- Stream.fromListM [incr 1 action, incr 2 action] y <- Stream.fromList [4,5] return (x, y)

Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.

Note that the function `f`

must be lazy in its argument, that's why we use
`unsafeInterleaveIO`

on `action`

because IO monad is strict.

*Pre-release*

fromList :: (Monad m, IsStream t) => [a] -> t m a Source #

fromList =`foldr`

`cons`

`nil`

Construct a stream from a list of pure values. This is more efficient than
`fromFoldable`

for serial streams.

*Since: 0.4.0*

fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #

fromListM =`foldr`

`consM`

`nil`

Construct a stream from a list of monadic actions. This is more efficient
than `fromFoldableM`

for serial streams.

*Since: 0.4.0*

fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #

fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #

fromFoldableM =`foldr`

`consM`

`nil`

Construct a stream from a `Foldable`

containing monadic actions.

drain $ fromSerial $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1) drain $ fromAsync $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)

*Concurrent (do not use with fromParallel on infinite containers)*

*Since: 0.3.0*

fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #

Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.

*Pre-release*

once :: (Monad m, IsStream t) => m a -> t m a Source #

Deprecated: Please use fromEffect instead.

Same as fromEffect

*Since: 0.2.0*

each :: (IsStream t, Foldable f) => f a -> t m a Source #

Deprecated: Please use fromFoldable instead.

Same as `fromFoldable`

.

*Since: 0.1.0*

fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String Source #

Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)

Read lines from an IO Handle into a stream of Strings.

*Since: 0.1.0*

currentTime :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

Deprecated: Please use absTimes instead

streamly-0.8.0**Streamly.Internal.Data.Stream.IsStream.Generate**