Pass through a `Pipe`

.

transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #

Use a `Pipe`

to transform a stream.

*Pre-release*

foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

Right fold to a streaming monad.

foldrS Stream.cons Stream.nil === id

`foldrS`

can be used to perform stateless stream to stream transformations
like map and filter in general. It can be coupled with a scan to perform
stateful transformations. However, note that the custom map and filter
routines can be much more efficient than this due to better stream fusion.

`>>>`

[1,2,3,4,5]`Stream.toList $ Stream.foldrS Stream.cons Stream.nil $ Stream.fromList [1..5]`

Find if any element in the stream is `True`

:

`>>>`

[True]`Stream.toList $ Stream.foldrS (\x xs -> if odd x then return True else xs) (return False) $ (Stream.fromList (2:4:5:undefined) :: Stream.SerialT IO Int)`

Map (+2) on odd elements and filter out the even elements:

`>>>`

[3,5,7]`Stream.toList $ Stream.foldrS (\x xs -> if odd x then (x + 2) `Stream.cons` xs else xs) Stream.nil $ (Stream.fromList [1..5] :: Stream.SerialT IO Int)`

`foldrM`

can also be represented in terms of `foldrS`

, however, the former
is much more efficient:

foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

*Pre-release*

foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #

Right fold to a transformer monad. This is the most general right fold
function. `foldrS`

is a special case of `foldrT`

, however `foldrS`

implementation can be more efficient:

foldrS = foldrT foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

`foldrT`

can be used to translate streamly streams to other transformer
monads e.g. to a different streaming type.

*Pre-release*

Stateless one-to-one maps.

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #

sequence = mapM id

Replace the elements of a stream of monadic actions with the outputs of those actions.

>>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"] abc >>> :{ drain $ Stream.replicateM 10 (return $ threadDelay 1000000 >> print 1) & (fromSerial . Stream.sequence) :} 1 ... 1 >>> :{ drain $ Stream.replicateM 10 (return $ threadDelay 1000000 >> print 1) & (fromAsync . Stream.sequence) :} 1 ... 1

*Concurrent (do not use with fromParallel on infinite streams)*

*Since: 0.1.0*

mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapM f = sequence . map f

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

>>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"] abc >>> :{ drain $ Stream.replicateM 10 (return 1) & (fromSerial . Stream.mapM (x -> threadDelay 1000000 >> print x)) :} 1 ... 1 > drain $ Stream.replicateM 10 (return 1) & (fromAsync . Stream.mapM (x -> threadDelay 1000000 >> print x))

*Concurrent (do not use with fromParallel on infinite streams)*

*Since: 0.1.0*

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #

A stateful `mapM`

, equivalent to a left scan, more like mapAccumL.
Hopefully, this is a better alternative to `scan`

. Separation of state from
the output makes it easier to think in terms of a shared state, and also
makes it easier to keep the state fully strict and the output lazy.

See also: `scanlM'`

*Pre-release*

See also the intersperse*_ combinators.

trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

>>> Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2) 1 2

Compare with `tap`

.

*Since: 0.7.0*

trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Perform a side effect before yielding each element of the stream and discard the results.

>>> Stream.drain $ Stream.trace_ (print "got here") (Stream.enumerateFromTo 1 2) "got here" "got here"

Same as `interspersePrefix_`

but always serial.

See also: `trace`

*Pre-release*

tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #

Tap the data flowing through a stream into a `Fold`

. For example, you may
add a tap to log the contents flowing through the stream. The fold is used
only for effects, its result is discarded.

Fold m a b | -----stream m a ---------------stream m a-----

`>>>`

1 2`Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)`

Compare with `trace`

.

*Since: 0.7.0*

tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #

`tapOffsetEvery offset n`

taps every `n`

th element in the stream
starting at `offset`

. `offset`

can be between `0`

and `n - 1`

. Offset 0
means start at the first element in the stream. If the offset is outside
this range then `offset `

is used as offset.`mod`

n

`>>>`

[0,2,4,6,8,10]`Stream.drain $ Stream.tapOffsetEvery 0 2 (Fold.rmapM print Fold.toList) $ Stream.enumerateFromTo 0 10`

tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #

Redirect a copy of the stream to a supplied fold and run it concurrently
in an independent thread. The fold may buffer some elements. The buffer size
is determined by the prevailing `maxBuffer`

setting.

Stream m a -> m b | -----stream m a ---------------stream m a-----

>>> Stream.drain $ Stream.tapAsync (Fold.drainBy print) (Stream.enumerateFromTo 1 2) 1 2

Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.

Compare with `tap`

.

*Pre-release*

tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a Source #

Calls the supplied function with the number of elements consumed
every `n`

seconds. The given function is run in a separate thread
until the end of the stream. In case there is an exception in the
stream the thread is killed during the next major GC.

Note: The action is not guaranteed to run if the main thread exits.

> delay n = threadDelay (round $ n * 1000000) >> return n > Stream.toList $ Stream.tapRate 2 (n -> print $ show n ++ " elements processed") (delay 1 Stream.|: delay 0.5 Stream.|: delay 0.5 Stream.|: Stream.nil) "2 elements processed" [1.0,0.5,0.5] "1 elements processed"

Note: This may not work correctly on 32-bit machines.

*Pre-release*

pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a Source #

`pollCounts predicate transform fold stream`

counts those elements in the
stream that pass the `predicate`

. The resulting count stream is sent to
another thread which transforms it using `transform`

and then folds it using
`fold`

. The thread is automatically cleaned up if the stream stops or
aborts due to exception.

For example, to print the count of elements processed every second:

> Stream.drain $ Stream.pollCounts (const True) (Stream.rollingMap (-) . Stream.delayPost 1) (FLold.drainBy print) $ Stream.enumerateFrom 0

Note: This may not work correctly on 32-bit machines.

*Pre-release*

`Fold`

scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Scan a stream using the given monadic fold.

`>>>`

[0,1,3,6]`Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10])`

*Since: 0.7.0*

postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Postscan a stream using the given monadic fold.

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

`>>>`

`import Data.Maybe (fromJust)`

`>>>`

`let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)`

`>>>`

Stream.toList $ Stream.map (fromJust . fst) $ Stream.takeWhile (\(_,x) -> x <= 10) $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0) :} [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]`:{`

*Since: 0.7.0*

Left scans. Stateful, mostly one-to-one maps.

scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Strict left scan. Like `map`

, `scanl'`

too is a one to one transformation,
however it adds an extra element.

>>> Stream.toList $ Stream.scanl' (+) 0 $ fromList [1,2,3,4] [0,1,3,6,10]

>>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4] [[],[1],[2,1],[3,2,1],[4,3,2,1]]

The output of `scanl'`

is the initial value of the accumulator followed by
all the intermediate steps and the final result of `foldl'`

.

By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.

Consider the following monolithic example, computing the sum and the product
of the elements in a stream in one go using a `foldl'`

:

>>> Stream.foldl' ((s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList 1,2,3,4

Using `scanl'`

we can make it modular by computing the sum in the first
stage and passing it down to the next stage for computing the product:

>>> :{ Stream.foldl' ((_, p) (s, x) -> (s, p * x)) (0,1) $ Stream.scanl' ((s, _) x -> (s + x, x)) (0,1) $ Stream.fromList [1,2,3,4] :} (10,24)

IMPORTANT: `scanl'`

evaluates the accumulator to WHNF. To avoid building
lazy expressions inside the accumulator, it is recommended that a strict
data structure is used for accumulator.

See also: `usingStateT`

*Since: 0.2.0*

scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like `scanl'`

but with a monadic step function and a monadic seed.

*Since: 0.4.0*

*Since: 0.8.0 (signature change)*

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

`scanlMAfter' accumulate initial done stream`

is like `scanlM'`

except
that it provides an additional `done`

function to be applied on the
accumulator when the stream stops. The result of `done`

is also emitted in
the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

*Pre-release*

postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like `scanl'`

but does not stream the initial value of the accumulator.

postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xs

*Since: 0.7.0*

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like `postscanl'`

but with a monadic step function and a monadic seed.

*Since: 0.7.0*

*Since: 0.8.0 (signature change)*

prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the final value of the accumulator.

*Pre-release*

prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like prescanl' but with a monadic step function and a monadic seed.

*Pre-release*

scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #

Like `scanl'`

but for a non-empty stream. The first element of the stream
is used as the initial value of the accumulator. Does nothing if the stream
is empty.

>>> Stream.toList $ Stream.scanl1' (+) $ fromList [1,2,3,4] [1,3,6,10]

*Since: 0.6.0*

scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #

Like `scanl1'`

but with a monadic step function.

*Since: 0.6.0*

Produce a subset of the stream using criteria based on the values of the elements. We can use a concatMap and scan for filtering but these combinators are more efficient and convenient.

with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) => (t m a -> t m (s, a)) -> (((s, a) -> b) -> t m (s, a) -> t m (s, a)) -> ((s, a) -> b) -> t m a -> t m a Source #

Modify a `t m a -> t m a`

stream transformation that accepts a predicate
`(a -> b)`

to accept `((s, a) -> b)`

instead, provided a transformation ```
t m
a -> t m (s, a)
```

. Convenient to filter with index or time.

filterWithIndex = with indexed filter filterWithAbsTime = with timestamped filter filterWithRelTime = with timeIndexed filter

*Pre-release*

deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

>>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5] [1,3,5]

*Since: 0.6.0*

filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Include only those elements that pass a predicate.

*Since: 0.1.0*

filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as `filter`

but with a monadic predicate.

*Since: 0.4.0*

uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #

Drop repeated elements that are adjacent to each other.

*Since: 0.6.0*

nubBy :: (a -> a -> Bool) -> t m a -> t m a Source #

Drop repeated elements anywhere in the stream.

*Caution: not scalable for infinite streams*

*See also: nubWindowBy*

*Unimplemented*

nubWindowBy :: Int -> (a -> a -> Bool) -> t m a -> t m a Source #

Drop repeated elements within the specified tumbling window in the stream.

nubBy = nubWindowBy maxBound

*Unimplemented*

prune :: (a -> Bool) -> t m a -> t m a Source #

Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.

prune p = dropWhileAround p $ uniqBy (x y -> p x && p y)

> Stream.prune isSpace (Stream.fromList " hello world! ") "hello world!"

Space: `O(1)`

*Unimplemented*

Produce a subset of the stream trimmed at ends.

take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first `n`

elements from the stream and discard the rest.

*Since: 0.1.0*

takeInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #

`takeInterval duration`

yields stream elements upto specified time
`duration`

. The duration starts when the stream is evaluated for the first
time, before the first element is yielded. The time duration is checked
before generating each element, if the duration has expired the stream
stops.

The total time taken in executing the stream is guaranteed to be *at least*
`duration`

, however, because the duration is checked before generating an
element, the upper bound is indeterminate and depends on the time taken in
generating and processing the last element.

No element is yielded if the duration is zero. At least one element is yielded if the duration is non-zero.

*Pre-release*

takeLast :: Int -> t m a -> t m a Source #

Take `n`

elements at the end of the stream.

O(n) space, where n is the number elements taken.

*Unimplemented*

takeLastInterval :: Double -> t m a -> t m a Source #

Take time interval `i`

seconds at the end of the stream.

O(n) space, where n is the number elements taken.

*Unimplemented*

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

*Since: 0.1.0*

takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as `takeWhile`

but with a monadic predicate.

*Since: 0.4.0*

takeWhileLast :: (a -> Bool) -> t m a -> t m a Source #

Take all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number elements taken.

*Unimplemented*

takeWhileAround :: (a -> Bool) -> t m a -> t m a Source #

Like `takeWhile`

and `takeWhileLast`

combined.

O(n) space, where n is the number elements taken from the end.

*Unimplemented*

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first `n`

elements from the stream and take the rest.

*Since: 0.1.0*

dropInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #

`dropInterval duration`

drops stream elements until specified `duration`

has
passed. The duration begins when the stream is evaluated for the first
time. The time duration is checked *after* generating a stream element, the
element is yielded if the duration has expired otherwise it is dropped.

The time elapsed before starting to generate the first element is *at most*
`duration`

, however, because the duration expiry is checked after the
element is generated, the lower bound is indeterminate and depends on the
time taken in generating an element.

All elements are yielded if the duration is zero.

*Pre-release*

dropLast :: Int -> t m a -> t m a Source #

Drop `n`

elements at the end of the stream.

O(n) space, where n is the number elements dropped.

*Unimplemented*

dropLastInterval :: Int -> t m a -> t m a Source #

Drop time interval `i`

seconds at the end of the stream.

O(n) space, where n is the number elements dropped.

*Unimplemented*

dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

*Since: 0.1.0*

dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as `dropWhile`

but with a monadic predicate.

*Since: 0.4.0*

dropWhileLast :: (a -> Bool) -> t m a -> t m a Source #

Drop all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number elements dropped.

*Unimplemented*

dropWhileAround :: (a -> Bool) -> t m a -> t m a Source #

Like `dropWhile`

and `dropWhileLast`

combined.

O(n) space, where n is the number elements dropped from the end.

*Unimplemented*

Produce a superset of the stream. This is the opposite of filtering/sampling. We can always use concatMap and scan for inserting but these combinators are more efficient and convenient.

intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #

Insert a pure value between successive elements of a stream.

`>>>`

"h,e,l,l,o"`Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello"`

*Since: 0.7.0*

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

`>>>`

h.,e.,l.,l.,o"h,e,l,l,o"`Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"`

*Since: 0.5.0*

intersperseBySpan :: Int -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every `n`

elements.

> Stream.toList $ Stream.intersperseBySpan 2 (return ',') $ Stream.fromList "hello" "he,ll,o"

*Unimplemented*

intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a Source #

Insert an effect and its output after consuming an element of a stream.

`>>>`

h.,e.,l.,l.,o.,"h,e,l,l,o,"`Stream.toList $ Stream.trace putChar $ intersperseSuffix (putChar '.' >> return ',') $ Stream.fromList "hello"`

*Pre-release*

intersperseSuffixBySpan :: (IsStream t, Monad m) => Int -> m a -> t m a -> t m a Source #

Like `intersperseSuffix`

but intersperses an effectful action into the
input stream after every `n`

elements and after the last element.

`>>>`

"he,ll,o,"`Stream.toList $ Stream.intersperseSuffixBySpan 2 (return ',') $ Stream.fromList "hello"`

*Pre-release*

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every `n`

seconds.

> import Control.Concurrent (threadDelay) > Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello" h,e,l,l,o

*Pre-release*

intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Insert a side effect before consuming an element of a stream except the first one.

`>>>`

h.e.l.l.o`Stream.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') $ Stream.fromList "hello"`

*Pre-release*

delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of the stream except the first one.

`>>>`

(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)`Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3`

*Since: 0.8.0*

intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Insert a side effect after consuming an element of a stream.

>>> Stream.mapM_ putChar $ Stream.intersperseSuffix_ (threadDelay 1000000) $ Stream.fromList "hello" hello

*Pre-release*

delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds after consuming an element of a stream.

`>>>`

(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)`Stream.mapM_ print $ Stream.timestamped $ Stream.delayPost 1 $ Stream.enumerateFromTo 1 3`

*Pre-release*

interspersePrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a Source #

Insert a side effect before consuming an element of a stream.

`>>>`

.h.e.l.l.o"hello"`Stream.toList $ Stream.trace putChar $ Stream.interspersePrefix_ (putChar '.' >> return ',') $ Stream.fromList "hello"`

Same as `trace_`

but may be concurrent.

*Concurrent*

*Pre-release*

delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of a stream.

`>>>`

(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)`Stream.mapM_ print $ Stream.timestamped $ Stream.delayPre 1 $ Stream.enumerateFromTo 1 3`

*Pre-release*

Opposite of filtering

insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a Source #

`insertBy cmp elem stream`

inserts `elem`

before the first element in
`stream`

that is less than `elem`

when compared using `cmp`

.

insertBy cmp x =`mergeBy`

cmp (`fromPure`

x)

>>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5] [1,2,3,5]

*Since: 0.6.0*

reverse :: (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

*Since 0.7.0 (Monad m constraint)*

*Since: 0.1.1*

reassembleBy :: Fold m a b -> (a -> a -> Int) -> t m a -> t m b Source #

Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.

*Unimplemented*

indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #

indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined) indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)

Pair each element in a stream with its index, starting from index 0.

`>>>`

[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]`Stream.toList $ Stream.indexed $ Stream.fromList "hello"`

*Since: 0.6.0*

indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #

indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined) indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))

Pair each element in a stream with its index, starting from the
given index `n`

and counting down.

`>>>`

[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]`Stream.toList $ Stream.indexedR 10 $ Stream.fromList "hello"`

*Since: 0.6.0*

timestamped :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (AbsTime, a) Source #

timestampWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (AbsTime, a) Source #

Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.

`>>>`

(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)`Stream.mapM_ print $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3`

*Pre-release*

timeIndexed :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.

`>>>`

(RelTime64 (NanoSecond64 ...),1) (RelTime64 (NanoSecond64 ...),2) (RelTime64 (NanoSecond64 ...),3)`Stream.mapM_ print $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3`

*Pre-release*

timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.

`>>>`

(RelTime64 (NanoSecond64 ...),1) (RelTime64 (NanoSecond64 ...),2) (RelTime64 (NanoSecond64 ...),3)`Stream.mapM_ print $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3`

*Pre-release*

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

*Since: 0.5.0*

elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #

Find all the indices where the value of the element in the stream is equal to the given value.

elemIndices a = findIndices (== a)

*Since: 0.5.0*

Map using the previous element.

rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b Source #

Like `rollingMap`

but with an effectful map function.

*Pre-release*

rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #

Apply a function on every two successive elements of a stream. If the stream consists of a single element the output is an empty stream.

This is the stream equivalent of the list idiom `zipWith f xs (tail xs)`

.

*Pre-release*

mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #

Run streaming stages concurrently.

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

mkParallel = D.fromStreamD . mkParallelD . D.toStreamD

*Pre-release*

applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #

Same as `|$`

.

*Internal*

(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #

Parallel transform application operator; applies a stream transformation
function `t m a -> t m b`

to a stream `t m a`

concurrently; the input stream
is evaluated asynchronously in an independent thread yielding elements to a
buffer and the transformation function runs in another thread consuming the
input from the buffer. `|$`

is just like regular function application
operator `$`

except that it is concurrent.

If you read the signature as `(t m a -> t m b) -> (t m a -> t m b)`

you can
look at it as a transformation that converts a transform function to a
buffered concurrent transform function.

The following code prints a value every second even though each stage adds a 1 second delay.

`>>>`

Stream.drain $ Stream.mapM (\x -> threadDelay 1000000 >> print x) |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1) :} 1 1 1`:{`

*Concurrent*

*Since: 0.3.0 (Streamly)*

*Since: 0.8.0*

(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #

maxThreads :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for
any concurrent combinator in a stream.
A value of 0 resets the thread limit to default, a negative value means
there is no limit. The default value is 1500. `maxThreads`

does not affect
`ParallelT`

streams as they can use unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

*Since: 0.4.0 (Streamly)*

*Since: 0.8.0*

Evaluate strictly using a buffer of results. When the buffer becomes full we can block, drop the new elements, drop the oldest element and insert the new at the end or keep dropping elements uniformly to match the rate of the consumer.

maxBuffer :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded `maxBuffer`

value (i.e. a negative value)
coupled with an unbounded `maxThreads`

value is a recipe for disaster in
presence of infinite streams, or very large streams. Especially, it must
not be used when `pure`

is used in `ZipAsyncM`

streams as `pure`

in
applicative zip streams generates an infinite stream causing unbounded
concurrent generation with no limit on the buffer or threads.

*Since: 0.4.0 (Streamly)*

*Since: 0.8.0*

sampleOld :: Int -> t m a -> t m a Source #

Evaluate the input stream continuously and keep only the oldest `n`

elements in the buffer, discard the new ones when the buffer is full. When
the output stream is evaluated it consumes the values from the buffer in a
FIFO manner.

*Unimplemented*

sampleNew :: Int -> t m a -> t m a Source #

Evaluate the input stream continuously and keep only the latest `n`

elements in a ring buffer, keep discarding the older ones to make space for
the new ones. When the output stream is evaluated it consumes the values
from the buffer in a FIFO manner.

*Unimplemented*

sampleRate :: Double -> t m a -> t m a Source #

Evaluate the stream at uniform intervals to maintain a specified evaluation rate.

Specifies the stream yield rate in yields per second (`Hertz`

).
We keep accumulating yield credits at `rateGoal`

. At any point of time we
allow only as many yields as we have accumulated as per `rateGoal`

since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed `rateGoal`

. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than `rateBuffer`

we try to recover only as
much as `rateBuffer`

.

`rateLow`

puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, `rateHigh`

puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.

If the `rateGoal`

is 0 or negative the stream never yields a value.
If the `rateBuffer`

is 0 or negative we do not attempt to recover.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream.
A `Nothing`

value resets the rate to default which is unlimited. When the
rate is specified, concurrent production may be ramped up or down
automatically to achieve the specified yield rate. The specific behavior for
different styles of `Rate`

specifications is documented under `Rate`

. The
effective maximum production rate achieved by a stream is governed by:

- The
`maxThreads`

limit - The
`maxBuffer`

limit - The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

avgRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate (r/2) r (2*r) maxBound)`

Specifies the average production rate of a stream in number of yields
per second (i.e. `Hertz`

). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and double of
the specified rate on the higher side.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

minRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate r r (2*r) maxBound)`

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

maxRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate (r/2) r r maxBound)`

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

constRate :: IsStream t => Double -> t m a -> t m a Source #

Same as `rate (Just $ Rate r r r 0)`

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

*Since: 0.5.0 (Streamly)*

*Since: 0.8.0*

inspectMode :: IsStream t => t m a -> t m a Source #

Print debug information about an SVar when the stream ends

*Pre-release*

scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #

Deprecated: Please use scanl followed by map instead.

Strict left scan with an extraction function. Like `scanl'`

, but applies a
user supplied extraction function (the third argument) at each step. This is
designed to work with the `foldl`

library. The suffix `x`

is a mnemonic for
extraction.

*Since 0.2.0*

*Since: 0.7.0 (Monad m constraint)*

streamly-0.8.0**Streamly.Internal.Data.Stream.IsStream.Transform**