To run examples in this module:

>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Prelude as Stream

We will add some more imports in the examples as needed.

For effectful streams we will use the following IO action that blocks for n seconds:

>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
>>> delay 1
1 sec
1

Overview

Streamly is a framework for modular, dataflow-based programming and declarative concurrency. Its stream fusion framework allows high-performance combinatorial programming even with byte-level streams. The Streamly API is similar to that of Haskell lists.

The basic stream type is SerialT. The type SerialT IO a is an effectful equivalent of a list [a] using the IO monad. Streams can be constructed like lists, except that they use nil instead of [] and cons instead of (:).

cons constructs a pure stream which is more or less the same as a list:

>>> import Streamly.Prelude (SerialT, cons, consM, nil)
>>> stream = 1 `cons` 2 `cons` nil :: SerialT IO Int
>>> Stream.toList stream -- IO [Int]
[1,2]

consM constructs a stream from effectful actions:

>>> stream = delay 1 `consM` delay 2 `consM` nil
>>> Stream.toList stream
1 sec
2 sec
[1,2]

Console Echo Program

In the following example, repeatM generates an infinite stream of String by repeatedly performing the getLine IO action. mapM then applies putStrLn to each element in the stream, converting it to a stream of (). Finally, drain folds the stream to IO, discarding the () values and thus producing only effects.

>>> import Data.Function ((&))
> :{
 Stream.repeatM getLine      -- SerialT IO String
     & Stream.mapM putStrLn  -- SerialT IO ()
     & Stream.drain          -- IO ()
:}

This is a console echo program. It is an example of a declarative loop written using streaming combinators. Compare it with an imperative while loop.

Hopefully, this gives you an idea of how to program declaratively by representing loops as streams. This module provides all the Data.List-like functions and many more powerful combinators for common programming tasks. See also the Streamly.Internal.Data.Stream.IsStream module for many more pre-release combinators, and the https://github.com/composewell/streamly-examples repository for real-world examples of stream programming.

Polymorphic Combinators

Streamly has several stream types. SerialT is a stream type with serial execution of actions; AsyncT is another, with concurrent execution. The combinators in this module are polymorphic in the stream type. For example,

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a

t is the stream type, m is the underlying Monad of the stream (e.g. IO) and a is the type of elements in the stream (e.g. Int).

Stream elimination combinators accept a SerialT type instead of a polymorphic type to force a concrete monomorphic type by default, reducing type errors. That's why in the console echo example above the stream type is SerialT.

drain :: Monad m => SerialT m a -> m ()

We can force a certain stream type in polymorphic code by using "Stream Type Adaptors". For example, to force AsyncT:

>>> Stream.drain $ Stream.fromAsync $ Stream.replicateM 10 $ delay 1
...

Combining two streams

Two streams can be combined to form a single stream in various ways: serial (append), wSerial (interleave), ahead (concurrent, ordered append), async (lazy concurrent, unordered append), wAsync (lazy concurrent, unordered interleave), parallel (strict concurrent merge), zipWith, zipAsyncWith (concurrent zip), mergeBy, and mergeAsyncBy (concurrent merge).

For example, the parallel combinator schedules both the streams concurrently.

>>> stream1 = Stream.fromListM [delay 3, delay 4]
>>> stream2 = Stream.fromListM [delay 1, delay 2]
>>> Stream.toList $ stream1 `Stream.parallel` stream2
...

We can chain the operations to combine more than two streams:

>>> stream3 = Stream.fromListM [delay 1, delay 2]
>>> Stream.toList $ stream1 `Stream.parallel` stream2 `Stream.parallel` stream3
...

Concurrent generation (consM) and concurrent merging of streams are the fundamental basis of all concurrency in streamly.

Combining many streams

The concatMapWith combinator can be used to generalize the two stream combining combinators to n streams. For example, we can use concatMapWith parallel to read concurrently from all incoming network connections and combine the input streams into a single output stream:

import qualified Streamly.Network.Inet.TCP as TCP
import qualified Streamly.Network.Socket as Socket

Stream.unfold TCP.acceptOnPort 8090
 & Stream.concatMapWith Stream.parallel (Stream.unfold Socket.read)

See the streamly-examples repository for a full working example.
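
The idea of lifting a two-stream combinator to many streams can be sketched with plain lists. This is a model using only base, not streamly's implementation: concatMapWithL and interleave are hypothetical names, and the right-nested folding order is an assumption of the sketch.

```haskell
-- List-based model of generalizing a two-stream combinator to many streams.
-- concatMapWithL and interleave are hypothetical names for this sketch only.

-- Alternate elements of two lists, continuing with whichever remains.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x:xs) ys = x : interleave ys xs

-- Map each element to a list and combine the results pairwise,
-- nesting to the right.
concatMapWithL :: ([b] -> [b] -> [b]) -> (a -> [b]) -> [a] -> [b]
concatMapWithL combine f = foldr (combine . f) []

main :: IO ()
main = do
    let f x = [x, x * 10] :: [Int]
    print (concatMapWithL (++) f [1, 2, 3])        -- prints [1,10,2,20,3,30]
    print (concatMapWithL interleave f [1, 2, 3])  -- prints [1,2,10,3,20,30]
```

With (++) as the combinator the model reduces to concatMap; swapping in a different two-list combinator changes only how the generated lists are mixed.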

Concurrent Nested Loops

The Monad instance of SerialT is an example of nested looping. It is in fact a list transformer. Different stream types provide different variants of nested looping. For example, the Monad instance of ParallelT uses concatMapWith parallel as its bind operation. Therefore, each iteration of the loop for ParallelT stream can run concurrently. See the documentation for individual stream types for the specific execution behavior of the stream as well as the behavior of Semigroup and Monad instances.
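
Since the serial Monad instance behaves like a list transformer, the list monad from base gives a small model of serial nested looping. The names pairs and pairs' are illustrative only.

```haskell
-- The list monad as a model of serial nested looping:
-- the outer loop binds x, the inner loop binds y for each x.
pairs :: [(Int, Char)]
pairs = do
    x <- [1, 2]
    y <- "ab"
    return (x, y)

-- The same loop written with concatMap, the list analogue of bind.
pairs' :: [(Int, Char)]
pairs' = concatMap (\x -> concatMap (\y -> [(x, y)]) "ab") [1, 2]

main :: IO ()
main = do
    print pairs             -- prints [(1,'a'),(1,'b'),(2,'a'),(2,'b')]
    print (pairs == pairs') -- prints True
```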

Stream Types

Streamly has several stream types. These types differ in three fundamental operations, consM (IsStream instance), <> (Semigroup instance) and >>= (Monad instance). Below we will see how consM behaves for SerialT, AsyncT and AheadT stream types.

SerialT executes actions serially, so the total delay in the following example is 2 + 1 = 3 seconds:

>>> stream = delay 2 `consM` delay 1 `consM` nil
>>> Stream.toList stream -- IO [Int]
2 sec
1 sec
[2,1]

AsyncT executes the actions concurrently, so the total delay is max 2 1 = 2 seconds:

>>> Stream.toList $ Stream.fromAsync stream -- IO [Int]
1 sec
2 sec
[1,2]

AsyncT produces the results in the order in which execution finishes. Notice that the order of elements in the list above is not the same as the order of actions in the stream.

AheadT is similar to AsyncT but the order of results is the same as the order of actions, even though they execute concurrently:

>>> Stream.toList $ Stream.fromAhead stream -- IO [Int]
1 sec
2 sec
[2,1]
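
The three result orders can be imitated with threads from base. This is only a model: serialLike, asyncLike, aheadLike and pause are hypothetical names, every action gets its own thread, and there is no demand-driven scheduling or exception handling as a real stream implementation would need.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical helper: wait n tenths of a second, then return n.
pause :: Int -> IO Int
pause n = threadDelay (n * 100000) >> return n

-- Run the actions one after another.
serialLike :: [IO a] -> IO [a]
serialLike = sequence

-- Run all actions concurrently; collect results as they complete.
asyncLike :: [IO a] -> IO [a]
asyncLike actions = do
    chan <- newChan
    mapM_ (\act -> forkIO (act >>= writeChan chan)) actions
    mapM (const (readChan chan)) actions

-- Run all actions concurrently; collect results in action order.
aheadLike :: [IO a] -> IO [a]
aheadLike actions = do
    vars <- mapM start actions
    mapM takeMVar vars
  where
    start act = do
        var <- newEmptyMVar
        _ <- forkIO (act >>= putMVar var)
        return var

main :: IO ()
main = do
    serialLike [pause 2, pause 1] >>= print  -- action order
    asyncLike  [pause 2, pause 1] >>= print  -- completion order
    aheadLike  [pause 2, pause 1] >>= print  -- action order, run concurrently
```

An action that throws would leave the collecting thread waiting forever in this sketch, which is one reason it should not be read as more than an illustration of ordering.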

Semigroup Instance

Earlier we distinguished stream types based on the execution behavior of actions within a stream. Stream types are also distinguished based on how actions from different streams are scheduled for execution when two streams are combined together.

For example, both SerialT and WSerialT execute actions within the stream serially, however, they differ in how actions from individual streams are executed when two streams are combined with <> (the Semigroup instance).

For SerialT, <> has an appending behavior i.e. it executes the actions from the second stream after executing actions from the first stream:

>>> stream1 = Stream.fromListM [delay 1, delay 2]
>>> stream2 = Stream.fromListM [delay 3, delay 4]
>>> Stream.toList $ stream1 <> stream2
1 sec
2 sec
3 sec
4 sec
[1,2,3,4]

For WSerialT, <> has an interleaving behavior i.e. it executes one action from the first stream and then one action from the second stream and so on:

>>> Stream.toList $ Stream.fromWSerial $ stream1 <> stream2
1 sec
3 sec
2 sec
4 sec
[1,3,2,4]

The <> operation of SerialT and WSerialT is the same as serial and wSerial respectively. The serial combinator combines two streams of any type in the same way as a serial stream combines.
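
The difference between appending and interleaving can be modeled on plain lists. In this sketch, interleave is a hypothetical helper, and its behavior when one list is shorter (continue with the remainder of the other) is an assumption of the model.

```haskell
-- Appending: all of the first list, then all of the second.
appendL :: [a] -> [a] -> [a]
appendL = (++)

-- Interleaving: one element from each list in turn; when one list
-- runs out, the rest of the other follows.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x:xs) ys = x : interleave ys xs

main :: IO ()
main = do
    print (appendL    [1, 2] [3, 4 :: Int])   -- prints [1,2,3,4]
    print (interleave [1, 2] [3, 4 :: Int])   -- prints [1,3,2,4]
    print (interleave [1, 2, 3] [10 :: Int])  -- prints [1,10,2,3]
```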

Concurrent Combinators

Like consM, several other stream generation operations have execution behavior that depends on the stream type. They all behave similarly to consM.

By default, folds like drain force the stream type to be SerialT, so replicateM in the following code runs serially, and takes 10 seconds:

>>> Stream.drain $ Stream.replicateM 10 $ delay 1
...

We can use the fromAsync combinator to force the argument stream to be of AsyncT type. In the following example, replicateM executes the replicated actions concurrently, thus taking only 1 second:

>>> Stream.drain $ Stream.fromAsync $ Stream.replicateM 10 $ delay 1
...

We can use mapM to map an action concurrently:

>>> f x = delay 1 >> return (x + 1)
>>> Stream.toList $ Stream.fromAhead $ Stream.mapM f $ Stream.fromList [1..3]
...
[2,3,4]

fromAhead forces mapM to happen in AheadT style, thus all three actions take only one second even though each individual action blocks for a second.

See the documentation of individual combinators to check whether they are concurrent. Concurrent combinators necessarily have a MonadAsync m constraint. However, a MonadAsync m constraint does not necessarily mean that the combinator is concurrent.

Automatic Concurrency Control

SerialT (and WSerialT) runs all tasks serially, whereas ParallelT runs all tasks concurrently, i.e. one thread per task. The stream types AsyncT, WAsyncT, and AheadT provide demand-driven concurrency: they adjust the number of threads, increasing or decreasing parallelism, based on the rate at which the consumer consumes the stream.

However, the programmer can control the maximum number of threads using maxThreads. It provides an upper bound on the concurrent IO requests or CPU cores that can be used. maxBuffer limits the number of evaluated stream elements that we can buffer. See the "Concurrency Control" section for details.
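
The effect of an upper bound on concurrency can be sketched with a semaphore from base. This is not how maxThreads is implemented: boundedConcurrently and peakConcurrency are hypothetical helpers, the bound is static rather than demand driven, and the sketch limits how many actions run at once rather than how many threads are forked.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- Run actions concurrently with at most `limit` running at a time,
-- returning results in action order. A hypothetical helper, not streamly API.
boundedConcurrently :: Int -> [IO a] -> IO [a]
boundedConcurrently limit actions = do
    sem <- newQSem limit
    vars <- mapM (start sem) actions
    mapM takeMVar vars
  where
    start sem act = do
        var <- newEmptyMVar
        _ <- forkIO (bracket_ (waitQSem sem) (signalQSem sem) act >>= putMVar var)
        return var

-- Run n short tasks under the given limit and report the peak number
-- of tasks that were running at the same time.
peakConcurrency :: Int -> Int -> IO Int
peakConcurrency limit n = do
    running <- newIORef (0 :: Int)
    peak <- newIORef (0 :: Int)
    let task = do
            now <- atomicModifyIORef' running (\r -> (r + 1, r + 1))
            atomicModifyIORef' peak (\p -> (max p now, ()))
            threadDelay 20000
            atomicModifyIORef' running (\r -> (r - 1, ()))
    _ <- boundedConcurrently limit (replicate n task)
    readIORef peak

main :: IO ()
main = peakConcurrency 2 6 >>= print  -- never more than the limit of 2
```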

Caveats

When we use combinators like fromAsync on a piece of code, all combinators inside the argument of fromAsync become concurrent, which is often counterproductive. Therefore, we recommend that in a pipeline you identify the combinators that you really want to be concurrent and add a fromSerial after those combinators so that the code following the combinator remains serial:

Stream.fromAsync $ ... concurrent combinator here ... $ Stream.fromSerial $ ...

Conventions

Functions with the suffix M are general functions that work on monadic arguments. The corresponding functions without the suffix M work on pure arguments and can in general be derived from their monadic versions but are provided for convenience and for consistency with other pure APIs in the base package.

In many cases, short definitions of the combinators are provided in the documentation for illustration. The actual implementation may differ for performance reasons.

Construction

Functions ending in the general shape b -> t m a.

See also: Streamly.Internal.Data.Stream.IsStream.Generate for Pre-release functions.

Primitives

Primitives to construct a stream from pure values or monadic actions. All other stream construction and generation combinators described later can be expressed in terms of these primitives. However, the special versions provided in this module can be much more efficient in most cases. Users can create custom combinators using these primitives.

nil :: IsStream t => t m a

An empty stream.

> toList nil
[]

Since: 0.1.0

cons :: IsStream t => a -> t m a -> t m a infixr 5

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: IsStream t => a -> t m a -> t m a infixr 5

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]

let delay = threadDelay 1000000 >> print 1
drain $ fromSerial  $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

Unfolding

Generalized way of generating a stream efficiently.

unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b

Convert an Unfold into a stream by supplying it an input seed.

>>> import qualified Streamly.Data.Unfold as Unfold
>>> Stream.drain $ Stream.unfold (Unfold.replicateM 3) (putStrLn "hello")
hello
hello
hello

Since: 0.7.0

unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a

unfoldr step s =
    case step s of
        Nothing -> nil
        Just (a, b) -> a `cons` unfoldr step b

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 3
        then Nothing
        else Just (b, b + 1)
in Stream.toList $ Stream.unfoldr f 0
:}
[0,1,2,3]

Since: 0.1.0
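
The step-function shape is the same as that of unfoldr in Data.List from base, so the list version can serve as a model. In this sketch digitsRev is a hypothetical name, and the input is assumed to be non-negative.

```haskell
import Data.List (unfoldr)

-- Digits of a non-negative number, least significant first. The seed is
-- the part of the number still to be processed.
digitsRev :: Int -> [Int]
digitsRev = unfoldr step
  where
    step 0 = Nothing                        -- nothing left: end the list
    step n = Just (n `mod` 10, n `div` 10)  -- emit a digit, shrink the seed

main :: IO ()
main = print (digitsRev 1234)  -- prints [4,3,2,1]
```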

unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 3
        then return Nothing
        else return (Just (b, b + 1))
in Stream.toList $ Stream.unfoldrM f 0
:}
[0,1,2,3]

When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.

(fromAsync $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0)
    & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

Concurrent

Since: 0.1.0
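
The dependency of each step on the previous seed can be seen in a list-building model of a monadic unfold. This is a sketch using only base: unfoldrList and countTo are hypothetical names, and unlike a stream the model runs every step before returning anything.

```haskell
-- A list-building model of a monadic unfold: each step needs the seed
-- produced by the step before it, so the steps run strictly in sequence.
unfoldrList :: Monad m => (b -> m (Maybe (a, b))) -> b -> m [a]
unfoldrList step seed = do
    result <- step seed
    case result of
        Nothing -> return []
        Just (a, next) -> (a :) <$> unfoldrList step next

-- A step function with an effect: it reports each seed it is given.
countTo :: Int -> Int -> IO (Maybe (Int, Int))
countTo limit b = do
    putStrLn ("step " ++ show b)
    return (if b > limit then Nothing else Just (b, b + 1))

main :: IO ()
main = unfoldrList (countTo 3) 0 >>= print  -- steps 0 to 4, then [0,1,2,3]
```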

From Values

Generate a monadic stream from a seed value or values.

fromPure :: IsStream t => a -> t m a

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure
fromPure = fromEffect . pure

In Zip applicative streams fromPure is not the same as pure, because in that case pure is equivalent to repeat instead. Where the two are equivalent, fromPure and pure are equally efficient; fromPure may be slightly more efficient than the other equivalent definitions.

Since: 0.8.0 (Renamed yield to fromPure)
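
The ZipList type in base makes the same choice for pure, which gives a small list analogy for why a singleton and pure differ under zipping. The names withSingleton and withPure are illustrative only.

```haskell
import Control.Applicative (ZipList (..))

-- With a singleton list, zipping stops after one element.
withSingleton :: [Int]
withSingleton = getZipList ((+) <$> ZipList [1, 2, 3] <*> ZipList [10])

-- ZipList's pure repeats its argument, so every element gets a partner.
withPure :: [Int]
withPure = getZipList ((+) <$> ZipList [1, 2, 3] <*> pure 10)

main :: IO ()
main = do
    print withSingleton  -- prints [11]
    print withPure       -- prints [11,12,13]
```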

fromEffect :: (Monad m, IsStream t) => m a -> t m a

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]

Since: 0.8.0 (Renamed yieldM to fromEffect)

repeat :: (IsStream t, Monad m) => a -> t m a

Generate an infinite stream by repeating a pure value.

Since: 0.4.0

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a

repeatM = fix . consM
repeatM = cycle1 . fromEffect

Generate a stream by repeatedly executing a monadic action forever.

drain $ fromSerial $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
drain $ fromAsync  $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)

Concurrent, infinite (do not use with fromParallel)

Since: 0.2.0

replicate :: (IsStream t, Monad m) => Int -> a -> t m a

replicate n = take n . repeat

Generate a stream of length n by repeating a value n times.

Since: 0.6.0

replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a

replicateM n = take n . repeatM

Generate a stream by performing a monadic action n times. For example:

drain $ fromSerial $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
drain $ fromAsync  $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)

Concurrent

Since: 0.1.1

Enumeration

We can use the Enum type class to enumerate a type producing a list and then convert it to a stream:

fromList $ enumFromThen from then

However, this is not particularly efficient. The Enumerable type class provides corresponding functions that generate a stream instead of a list, efficiently.

class Enum a => Enumerable a where

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the Enum type class, except that they generate a stream instead of a list. Use the functions in the Streamly.Internal.Data.Stream.IsStream.Enumeration module to define new instances.

Since: 0.6.0

Methods

enumerateFrom :: (IsStream t, Monad m) => a -> t m a

enumerateFrom from generates a stream starting with the element from, enumerating up to maxBound when the type is Bounded or generating an infinite stream when the type is not Bounded.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]

For Fractional types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]

Since: 0.6.0

enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a

Generate a finite stream starting with the element from, enumerating the type up to the value to. If to is smaller than from then an empty stream is returned.

>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]

For Fractional types, enumeration continues while the element is no greater than to + 1/2, which matches the behavior of fractional Enum ranges in base. The last element can therefore exceed to by up to 1/2.

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

Since: 0.6.0
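
The fractional examples above are consistent with an upper limit of to + 1/2, which is the rule base uses for fractional Enum ranges. A list model under that assumption, with enumFromToFrac as a hypothetical name and exactly representable values chosen to avoid rounding effects:

```haskell
-- List model of fractional enumeration with a unit step: keep elements
-- while they do not exceed the upper limit plus one half.
enumFromToFrac :: Double -> Double -> [Double]
enumFromToFrac from to = takeWhile (<= to + 0.5) (iterate (+ 1) from)

main :: IO ()
main = do
    print (enumFromToFrac 1 4.4)                -- stops at 4.0, since 5.0 > 4.9
    print (enumFromToFrac 1 4.5)                -- includes 5.0, since 5.0 <= 5.0
    print (enumFromToFrac 1 4.5 == [1 .. 4.5])  -- agrees with the base range
```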

enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a

enumerateFromThen from then generates a stream whose first element is from, whose second element is then, and whose successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For Bounded types the stream ends when maxBound (or minBound, when enumerating downwards) is reached; for unbounded types it keeps enumerating infinitely.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]

Since: 0.6.0

enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a

enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]

>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]

Since: 0.6.0

Instances

The following instances are defined in Streamly.Internal.Data.Stream.IsStream.Enumeration. Each provides enumerateFrom, enumerateFromTo, enumerateFromThen and enumerateFromThenTo specialized to the instance type.

Enumerable Bool
Enumerable Char
Enumerable Double
Enumerable Float
Enumerable Int
Enumerable Int8
Enumerable Int16
Enumerable Int32
Enumerable Int64
Enumerable Integer
Enumerable Natural
Enumerable Ordering
Enumerable Word
Enumerable Word8
Enumerable Word16
Enumerable Word32
Enumerable Word64
Enumerable ()
Integral a => Enumerable (Ratio a)
Enumerable a => Enumerable (Identity a)
HasResolution a => Enumerable (Fixed a)

enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a

enumerate = enumerateFrom minBound

Enumerate a Bounded type from its minBound to its maxBound.

Since: 0.6.0

enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a

enumerateTo = enumerateFromTo minBound

Enumerate a Bounded type from its minBound to the specified value.

Since: 0.6.0

Iteration

iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a

iterate f x = x `cons` iterate f (f x)

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

>>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]

Since: 0.1.2

iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a

iterateM f m = m >>= \a -> return a `consM` iterateM f (f a)

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.

drain $ fromSerial $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)

drain $ fromAsync  $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)

Concurrent

Since: 0.1.2

Since: 0.7.0 (signature change)

From Generators

Generate a monadic stream from a seed value and a generator function.

fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #

fromIndices f = fmap f $ Stream.enumerateFrom 0
fromIndices f = let g i = f i `cons` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a function f applied on the corresponding index. Index starts at 0.

>>> Stream.toList $ Stream.take 5 $ Stream.fromIndices id
[0,1,2,3,4]

Since: 0.6.0

fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #

fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a monadic function f applied on the corresponding index. Index starts at 0.

Concurrent

Since: 0.6.0
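
As a hedged illustration of fromIndicesM, the sketch below runs it serially in IO and keeps only a finite prefix of the infinite stream. It assumes the Streamly.Prelude API described on this page; squareM and firstSquares are hypothetical names introduced only for this example.

```haskell
import qualified Streamly.Prelude as Stream

-- Hypothetical generator: logs the index, then returns its square.
squareM :: Int -> IO Int
squareM i = do
    putStrLn ("index " ++ show i)
    return (i * i)

-- Take a finite prefix of the infinite stream and collect it serially.
firstSquares :: IO [Int]
firstSquares = Stream.toList $ Stream.take 4 $ Stream.fromIndicesM squareM

main :: IO ()
main = firstSquares >>= print
```

The collected result is [0,1,4,9], since indices start at 0.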

From Containers

Convert an input structure, container or source into a stream. All of these can be expressed in terms of primitives.

fromList :: (Monad m, IsStream t) => [a] -> t m a Source #

fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than fromFoldable for serial streams.

Since: 0.4.0

fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #

fromListM = foldr consM nil

Construct a stream from a list of monadic actions. This is more efficient than fromFoldableM for serial streams.

Since: 0.4.0

fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #

fromFoldable = foldr cons nil

Construct a stream from a Foldable containing pure values.

Since: 0.2.0

fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #

fromFoldableM = foldr consM nil

Construct a stream from a Foldable containing monadic actions.

drain $ fromSerial $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1)
drain $ fromAsync  $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1)

Concurrent (do not use with fromParallel on infinite containers)

Since: 0.3.0

Elimination

Functions ending in the general shape t m a -> m b

See also: Streamly.Internal.Data.Stream.IsStream.Eliminate for Pre-release functions.

Running a Fold

See Streamly.Data.Fold for an overview of composable folds. All folds in this module can be expressed in terms of composable folds using fold.

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, foldMany on an empty stream results in an empty stream. Therefore, Stream.fold f is not the same as Stream.head . Stream.foldMany f.

fold f = Stream.parse (Parser.fromFold f)

Since: 0.7.0

Deconstruction

Functions ending in the general shape t m a -> m (b, t m a)

uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

This is a brute force primitive. Avoid it where possible and use it only when no other combinator can do the job. It can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of uncons, however the specific implementations are generally more efficient.

Since: 0.1.0
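
A minimal sketch of the imperative style of looping that uncons allows: converting a stream to a list one element at a time. The helper name toListViaUncons is hypothetical; in practice toList or a fold is the better choice, as noted above.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- Hypothetical helper: peel off the head with uncons and recurse on the tail.
-- The recursion is not tail recursive, so this is for illustration only.
toListViaUncons :: Monad m => SerialT m a -> m [a]
toListViaUncons s = do
    r <- Stream.uncons s
    case r of
        Nothing -> return []                               -- empty stream
        Just (x, rest) -> (x :) <$> toListViaUncons rest   -- head and tail

main :: IO ()
main = toListViaUncons (Stream.fromList "abc") >>= print
```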

tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

tail = fmap (fmap snd) . Stream.uncons

Extract all but the first element of the stream, if any.

Since: 0.1.1

init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

Extract all but the last element of the stream, if any.

Since: 0.5.0

General Folds

In imperative terms a fold can be considered as a loop over the stream that reduces the stream to a single value. Left and right folds both use a fold function f and an identity element z (zero) to deconstruct a recursive data structure and reconstruct a new data structure. The new structure may be a recursive construction (a container) or a non-recursive single value reduction of the original structure.

Both right and left folds are mathematical duals of each other, they are functionally equivalent. Operationally, a left fold on a left associated structure behaves exactly in the same way as a right fold on a right associated structure. Similarly, a left fold on a right associated structure behaves in the same way as a right fold on a left associated structure. However, the behavior of a right fold on a right associated structure is operationally different (even though functionally equivalent) than a left fold on the same structure.

On right associated structures like Haskell cons lists or Streamly streams, a lazy right fold is naturally suitable for lazy recursive reconstruction of a new structure, while a strict left fold is naturally suitable for efficient reduction. In right folds control is in the hand of the puller whereas in left folds the control is in the hand of the pusher.

The behavior of right and left folds is described in detail in the individual fold's documentation. To illustrate the two folds for right associated cons lists:

foldr :: (a -> b -> b) -> b -> [a] -> b
foldr f z [] = z
foldr f z (x:xs) = x `f` foldr f z xs

foldl :: (b -> a -> b) -> b -> [a] -> b
foldl f z [] = z
foldl f z (x:xs) = foldl f (z `f` x) xs

foldr is conceptually equivalent to:

foldr f z [] = z
foldr f z [x] = f x z
foldr f z xs = foldr f (foldr f z (tail xs)) [head xs]

foldl is conceptually equivalent to:

foldl f z [] = z
foldl f z [x] = f z x
foldl f z xs = foldl f (foldl f z (init xs)) [last xs]

Left and right folds are duals of each other.

foldr f z xs = foldl (flip f) z (reverse xs)
foldl f z xs = foldr (flip f) z (reverse xs)

More generally:

foldr f z xs = foldl g id xs z where g k x = k . f x
foldl f z xs = foldr g id xs z where g x k = k . flip f x
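
The identities above can be checked on ordinary lists. The sketch below uses only the Prelude; foldrViaFoldl and foldlViaFoldr are hypothetical names for the two "more general" definitions, and subtraction is used as the step because it is not commutative, so an argument-order mistake would show up.

```haskell
-- foldr expressed with foldl: the accumulator is a function that is
-- finally applied to z.
foldrViaFoldl :: (a -> b -> b) -> b -> [a] -> b
foldrViaFoldl f z xs = foldl g id xs z
  where g k x = k . f x

-- foldl expressed with foldr, using the same trick.
foldlViaFoldr :: (b -> a -> b) -> b -> [a] -> b
foldlViaFoldr f z xs = foldr g id xs z
  where g x k = k . flip f x

main :: IO ()
main = do
    let xs = [1, 2, 3, 4] :: [Int]
    print (foldr (-) 0 xs, foldrViaFoldl (-) 0 xs)             -- (-2,-2)
    print (foldl (-) 0 xs, foldlViaFoldr (-) 0 xs)             -- (-10,-10)
    print (foldr (-) 0 xs == foldl (flip (-)) 0 (reverse xs))  -- True
```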

Right Folds

Let's take a closer look at the foldr definition for lists, as given earlier:

foldr f z (x:xs) = x `f` foldr f z xs

foldr invokes the fold step function f as f x (foldr f z xs). At each invocation of f, foldr gives us the next element in the input container x and a recursive expression foldr f z xs representing the yet unbuilt (lazy thunk) part of the output.

When f x xs is lazy in xs it can consume the input one element at a time in FIFO order to build a lazy output expression. For example,

f x remaining = show x : remaining

take 2 $ foldr f [] (1:2:undefined) would consume the input lazily on demand, consuming only first two elements and resulting in ["1", "2"]. f can terminate recursion by not evaluating the remaining part:

f 2 remaining = show 2 : []
f x remaining = show x : remaining

f would terminate recursion whenever it sees element 2 in the input. Therefore, take 2 $ foldr f [] (1:2:undefined) would work without any problem.

On the other hand, if f a b is strict in b it would end up consuming the whole input right away and expanding the recursive expression b (i.e. foldr f z xs) fully before it yields an output expression, resulting in the following right associated expression:

foldr f z xs == x1 `f` (x2 `f` ...(xn `f` z))

For example,

f x remaining = x + remaining

With this definition, foldr f 0 [1..1000] would recurse completely until it reaches the terminating case ... `f` (1000 `f` 0), and then start reducing the whole expression from right to left, thereby consuming the input elements in LIFO order. Thus, such an evaluation would require memory proportional to the size of the input. Try out foldr (+) 0 (map (\x -> trace (show x) x) [1..10]).

Notice the order of the arguments to the step function f a b. It follows the order of a and b in the right associative recursive expression generated by expanding a `f` b.

A right fold is a pull fold, the step function is the puller, it can pull more data from the input container by using its second argument in the output expression or terminate pulling by not using it. As a corollary:

  1. a step function which is lazy in its second argument (usually functions or constructors that build a lazy structure e.g. (:)) can pull lazily on demand.
  2. a step function strict in its second argument (usually reducers e.g. (+)) would end up pulling all of its input and buffer it in memory before potentially reducing it.

A right fold is suitable for lazy reconstructions e.g. transformation, mapping, filtering of right associated input structures (e.g. cons lists). Whereas a left fold is suitable for reductions (e.g. summing a stream of numbers) of right associated structures. Note that these roles will reverse for left associated structures (e.g. snoc lists). Most of our observations here assume right associated structures, lists being the canonical example.

  1. A lazy FIFO style pull using a right fold allows pulling a potentially infinite input stream lazily, perform transformations on it, and reconstruct a new structure without having to buffer the whole structure. In contrast, a left fold would buffer the entire structure before the reconstructed structure can be consumed.
  2. Even if buffering the entire input structure is ok, we need to keep in mind that a right fold reconstructs structures in a FIFO style, whereas a left fold reconstructs in a LIFO style, thereby reversing the order of elements.
  3. A right fold has termination control and therefore can terminate early without going through the entire input, a left fold cannot terminate without consuming all of its input. For example, a right fold implementation of or can terminate as soon as it finds the first True element, whereas a left fold would necessarily go through the entire input irrespective of that.
  4. Reduction (e.g. using (+) on a stream of numbers) using a right fold occurs in a LIFO style, which means that the entire input gets buffered before reduction starts. Whereas with a strict left fold reductions occur incrementally in FIFO style. Therefore, a strict left fold is more suitable for reductions.
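
The four observations above can be tried out on plain lists. This is a sketch using only base; orR, mapR, reverseL and sumL are hypothetical names chosen for the example.

```haskell
import Data.List (foldl')

-- Observation 3: a right fold with a lazy step stops at the first True.
orR :: [Bool] -> Bool
orR = foldr (||) False

-- Observation 1: lazy reconstruction works even on infinite input.
mapR :: (a -> b) -> [a] -> [b]
mapR f = foldr (\x rest -> f x : rest) []

-- Observation 2: reconstruction with a left fold reverses the order.
reverseL :: [a] -> [a]
reverseL = foldl' (flip (:)) []

-- Observation 4: a strict left fold reduces incrementally.
sumL :: [Int] -> Int
sumL = foldl' (+) 0

main :: IO ()
main = do
    print (orR (False : True : undefined))       -- True
    print (take 3 (mapR (* 2) [1 :: Int ..]))    -- [2,4,6]
    print (reverseL [1, 2, 3 :: Int])            -- [3,2,1]
    print (sumL [1 .. 100000])                   -- 5000050000
```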

foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #

Right associative/lazy pull fold. foldrM build final stream constructs an output structure using the step function build. build is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated it calls build again thus lazily consuming the input stream until either the output expression built by build is free of the "tail" or the input is exhausted in which case final is used as the terminating case for the output structure. For more details see the description in the previous section.

Example, determine if any element is odd in a stream:

>>> Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined)
True

Since: 0.7.0 (signature changed)

Since: 0.2.0 (signature changed)

Since: 0.1.0

foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

Since: 0.1.0

Left Folds

Note that the observations below about the behavior of a left fold assume that we are working on a right associated structure like cons lists and streamly streams. If we are working on a left associated structure (e.g. snoc lists) the roles of right and left folds would reverse.

Let's take a closer look at the foldl definition for lists given above:

foldl f z (x:xs) = foldl f (z `f` x) xs

foldl calls itself recursively, in each call it invokes f as f z x providing it with the result accumulated till now z (the state) and the next element from the input container. First call to f is supplied with the initial value of the accumulator z and each subsequent call uses the output of the previous call to f z x.

> foldl' (+) 0 [1,2,3]
 6

The recursive call at the head of the output expression is bound to be evaluated until recursion terminates, therefore, a left fold always consumes the whole input container. The following would result in an error, even though the fold is not using the values at all:

> foldl' (\_ _ -> 0) 0 (1:undefined)
 *** Exception: Prelude.undefined

As foldl recurses, it builds the left associated expression shown below. Notice, the order of the arguments to the step function f b a. It follows the left associative recursive expression generated by expanding b `f` a.

foldl f z xs == (((z `f` x1) `f` x2) ...) `f` xn

The strict left fold foldl' forces the reduction of its argument z `f` x before using it, therefore it never builds the whole expression in memory. Thus, z `f` x1 would get reduced to z1 and then z1 `f` x2 would get reduced to z2 and so on, incrementally reducing the expression from left to right as it recurses, consuming the input in FIFO order. Try out foldl' (+) 0 (map (\x -> trace (show x) x) [1..10]) to see how it works. For example:

>>> Stream.foldl' (+) 0 $ Stream.fromList [1,2,3,4]
10

0 + 1 = 1
1 + 2 = 3
3 + 3 = 6
6 + 4 = 10

However, foldl' evaluates the accumulator only to WHNF. It may further help if the step function uses a strict data structure as accumulator to improve performance and to keep the expression fully reduced at all times during the fold.
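
To make the WHNF point concrete, here is a sketch on plain lists that computes a mean with a strict accumulator. The Acc type and the mean function are hypothetical; with a lazy tuple as accumulator, foldl' would force only the tuple constructor and leave the sum and count as growing thunks.

```haskell
import Data.List (foldl')

-- Strict fields: forcing an Acc to WHNF also forces the sum and the count.
data Acc = Acc !Double !Int

mean :: [Double] -> Double
mean xs =
    case foldl' step (Acc 0 0) xs of
        Acc _ 0 -> 0                    -- empty input, avoid dividing by zero
        Acc s n -> s / fromIntegral n
  where
    step (Acc s n) x = Acc (s + x) (n + 1)

main :: IO ()
main = print (mean [1, 2, 3, 4])        -- 2.5
```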

A left fold can also build a new structure instead of reducing one if a constructor is used as a fold step. However, it may not be very useful because it will consume the whole input and construct the new structure in memory before we can consume it. Thus the whole structure gets buffered in memory. When the list constructor is used it would build a new list in reverse (LIFO) order:

>>> Stream.foldl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
[4,3,2,1]

A left fold is a push fold. The producer pushes its contents to the step function of the fold. The step function therefore has no control to stop the input, it can only discard it if it does not need it. We can also consider a left fold as a state machine where the state is stored in the accumulator, and the state can be modified based on new inputs that are pushed to the fold.

In general, a strict left fold is a reducing fold, whereas a right fold is a constructing fold. A strict left fold reduces in a FIFO order whereas it constructs in a LIFO order, and vice-versa for the right fold. See the documentation of foldrM for a discussion on where a left or right fold is suitable.

To perform a left fold lazily without having to consume all the input one can use scanl to stream the intermediate results of the fold and consume the resulting stream lazily.
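
A small sketch of that idea, assuming the scanl' and takeWhile combinators of this module: the running sums of an unbounded enumeration are streamed and consumption stops as soon as a sum reaches 20. The name partialSums is hypothetical.

```haskell
import qualified Streamly.Prelude as Stream

-- Running sums 0, 1, 3, 6, ... of the stream 1, 2, 3, ...
-- takeWhile stops pulling from the scan once a sum is 20 or more.
partialSums :: IO [Int]
partialSums =
    Stream.toList
        $ Stream.takeWhile (< 20)
        $ Stream.scanl' (+) 0
        $ Stream.enumerateFrom 1

main :: IO ()
main = partialSums >>= print    -- [0,1,3,6,10,15]
```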

foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #

Left associative/strict push fold. foldl' reduce initial stream invokes reduce with the accumulator and the next input in the input stream, using initial as the initial value of the current value of the accumulator. When the input is exhausted the current value of the accumulator is returned. Make sure to use a strict data structure for accumulator to not build unnecessary lazy expressions unless that's what you want. See the previous section for more details.

Since: 0.2.0

foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

foldlM' :: Monad m => (b -> a -> m b) -> m b -> SerialT m a -> m b Source #

Like foldl' but with a monadic step function.

Since: 0.2.0

Since: 0.8.0 (signature change)

Specific Folds

Full Folds

Folds that are guaranteed to evaluate the whole stream.

drain :: Monad m => SerialT m a -> m () Source #

drain = mapM_ (\_ -> return ())
drain = Stream.fold Fold.drain

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example Stream.drain . fromAsync.

Since: 0.7.0

last :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

last xs = xs !! (Stream.length xs - 1)
last = Stream.fold Fold.last

Since: 0.1.1

length :: Monad m => SerialT m a -> m Int Source #

Determine the length of the stream.

Since: 0.1.0

sum :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the sum of all elements of a stream of numbers. Returns 0 when the stream is empty. Note that this is not numerically stable for floating point numbers.

sum = Stream.fold Fold.sum

Since: 0.1.0

product :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the product of all elements of a stream of numbers. Returns 1 when the stream is empty.

product = Stream.fold Fold.product

Since: 0.1.1

maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the maximum element in a stream using the supplied comparison function.

maximumBy = Stream.fold Fold.maximumBy

Since: 0.6.0

maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

maximum = maximumBy compare
maximum = Stream.fold Fold.maximum

Determine the maximum element in a stream.

Since: 0.1.0

minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the minimum element in a stream using the supplied comparison function.

minimumBy = Stream.fold Fold.minimumBy

Since: 0.6.0

minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

minimum = minimumBy compare
minimum = Stream.fold Fold.minimum

Determine the minimum element in a stream.

Since: 0.1.0

the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #

Ensures that all the elements of the stream are identical and then returns that unique element.

Since: 0.6.0
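
A hedged usage sketch for the: the text above does not spell out what happens when the elements differ, so the Nothing result for the mixed stream is our understanding of the Maybe return type rather than a documented guarantee. allSame and mixed are hypothetical names.

```haskell
import qualified Streamly.Prelude as Stream

-- All elements identical: the unique element is returned.
allSame :: IO (Maybe Int)
allSame = Stream.the (Stream.fromList [7, 7, 7])

-- Elements differ: assumed to yield Nothing.
mixed :: IO (Maybe Int)
mixed = Stream.the (Stream.fromList [7, 8, 7])

main :: IO ()
main = do
    allSame >>= print
    mixed >>= print
```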

Partial Folds

Folds that may terminate before evaluating the whole stream. These folds strictly evaluate the stream until the result is determined.

drainN :: Monad m => Int -> SerialT m a -> m () Source #

drainN n = Stream.drain . Stream.take n
drainN n = Stream.fold (Fold.take n Fold.drain)

Run at most n iterations of a stream.

Since: 0.7.0

drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

drainWhile p = Stream.drain . Stream.takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.7.0

(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #

Lookup the element at the given index.

Since: 0.6.0

head :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the first element of the stream, if any.

head = (!! 0)
head = Stream.fold Fold.head

Since: 0.1.0

findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #

Returns the first element that satisfies the given predicate.

findM = Stream.fold Fold.findM

Since: 0.6.0

find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) Source #

Like findM but with a non-monadic predicate.

find p = findM (return . p)
find = Stream.fold Fold.find

Since: 0.5.0

lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #

In a stream of (key-value) pairs (a, b), return the value b of the first pair where the key equals the given value a.

lookup = snd <$> Stream.find ((==) . fst)
lookup = Stream.fold Fold.lookup

Since: 0.5.0
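
The following sketch shows lookup next to find and findIndex on the same stream of pairs, which may help in reading the equational definitions above. The stream name table is hypothetical.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- A small stream of key-value pairs.
table :: SerialT IO (Int, String)
table = Stream.fromList [(1, "one"), (2, "two"), (3, "three")]

main :: IO ()
main = do
    Stream.lookup 2 table >>= print                  -- Just "two"
    Stream.lookup 9 table >>= print                  -- Nothing
    Stream.find ((== 2) . fst) table >>= print       -- Just (2,"two")
    Stream.findIndex ((== 2) . fst) table >>= print  -- Just 1
```

Each of these folds stops pulling from the stream once its result is known.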

findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #

Returns the first index that satisfies the given predicate.

findIndex = Stream.fold Fold.findIndex

Since: 0.5.0

elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #

Returns the first index where a given value is found in the stream.

elemIndex a = Stream.findIndex (== a)

Since: 0.5.0

null :: Monad m => SerialT m a -> m Bool Source #

Determine whether the stream is empty.

null = Stream.fold Fold.null

Since: 0.1.1

elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is present in the stream.

elem = Stream.fold Fold.elem

Since: 0.1.0

notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is not present in the stream.

notElem = Stream.fold Fold.notElem

Since: 0.1.0

all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether all elements of a stream satisfy a predicate.

all = Stream.fold Fold.all

Since: 0.1.0

any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether any of the elements of a stream satisfy a predicate.

any = Stream.fold Fold.any

Since: 0.1.0

and :: Monad m => SerialT m Bool -> m Bool Source #

Determines if all elements of a boolean stream are True.

and = Stream.fold Fold.and

Since: 0.5.0

or :: Monad m => SerialT m Bool -> m Bool Source #

Determines whether at least one element of a boolean stream is True.

or = Stream.fold Fold.or

Since: 0.5.0

To Containers

Convert a stream into a container holding all the values.

toList :: Monad m => SerialT m a -> m [a] Source #

toList = Stream.foldr (:) []

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Since: 0.1.0

Folding Concurrently

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #

Parallel fold application operator; applies a fold function t m a -> m b to a stream t m a concurrently. The input stream is evaluated asynchronously in an independent thread yielding elements to a buffer, and the folding action runs in another thread consuming the input from the buffer.

If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look at it as a transformation that converts a fold function to a buffered concurrent fold function.

The . at the end of the operator is a mnemonic for termination of the stream.

In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Prelude ((|$.))
>>> :{
 Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ())
     |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #

Same as |$. but with arguments reversed.

(|&.) = flip (|$.)

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

Multi-Stream folds

eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #

Compare two streams for equality using an equality function.

Since: 0.6.0

cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #

Compare two streams lexicographically using a comparison function.

Since: 0.6.0
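
A short sketch of eqBy and cmpBy. Because both are polymorphic in the stream type t, at least one argument needs a type annotation, as in the isPrefixOf example of this section. The names lengthsMatch and ordering are hypothetical.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- eqBy may relate streams of different element types:
-- here each string is compared with an expected length.
lengthsMatch :: IO Bool
lengthsMatch = Stream.eqBy (\s n -> length s == n) strings lengths
  where
    strings = Stream.fromList ["a", "bc", "def"] :: SerialT IO String
    lengths = Stream.fromList [1, 2, 3] :: SerialT IO Int

-- Lexicographic comparison: "abc" sorts before "abd".
ordering :: IO Ordering
ordering =
    Stream.cmpBy compare
        (Stream.fromList "abc" :: SerialT IO Char)
        (Stream.fromList "abd")

main :: IO ()
main = do
    lengthsMatch >>= print  -- True
    ordering >>= print      -- LT
```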

isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.

>>> Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.

>>> Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #

stripPrefix prefix stream strips prefix from stream if it is a prefix of stream. Returns Nothing if the stream does not start with the given prefix, stripped stream otherwise. Returns Just nil when the prefix is the same as the stream.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".

Space: O(1)

Since: 0.6.0
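
A minimal sketch of stripPrefix on character streams. The wrapper stripTo is hypothetical; it converts the stripped stream, when there is one, back to a String so that the result can be printed.

```haskell
import qualified Streamly.Prelude as Stream

-- Strip a prefix and collect the remainder, if the prefix matched.
stripTo :: String -> String -> IO (Maybe String)
stripTo prefix input = do
    r <- Stream.stripPrefix (Stream.fromList prefix) (Stream.fromList input)
    traverse Stream.toList r

main :: IO ()
main = do
    stripTo "foo" "foobar" >>= print  -- Just "bar"
    stripTo "bar" "foobar" >>= print  -- Nothing
    stripTo "foo" "foo" >>= print     -- Just ""
```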

Transformation

See also: Streamly.Internal.Data.Stream.IsStream.Transform for Pre-release functions.

Mapping

In imperative terms a map operation can be considered as a loop over the stream that transforms the stream into another stream by performing an operation on each element of the stream.

map is the least powerful transformation operation with strictest guarantees. A map, (1) is a stateless loop which means that no state is allowed to be carried from one iteration to another, therefore, operations on different elements are guaranteed to not affect each other, (2) is a strictly one-to-one transformation of stream elements which means it guarantees that no elements can be added or removed from the stream, it can merely transform them.

map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b Source #

map = fmap

Same as fmap.

> S.toList $ S.map (+1) $ S.fromList [1,2,3]
[2,3,4]

Since: 0.4.0

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #

sequence = mapM id

Replace the elements of a stream of monadic actions with the outputs of those actions.

>>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
abc

>>> :{
drain $ Stream.replicateM 10 (return $ threadDelay 1000000 >> print 1)
 & (fromSerial . Stream.sequence)
:}
1
...
1

>>> :{
drain $ Stream.replicateM 10 (return $ threadDelay 1000000 >> print 1)
 & (fromAsync . Stream.sequence)
:}
1
...
1

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.1.0

mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapM f = sequence . map f

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

>>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"]
abc

>>> :{
   drain $ Stream.replicateM 10 (return 1)
     & (fromSerial . Stream.mapM (\x -> threadDelay 1000000 >> print x))
:}
1
...
1

> drain $ Stream.replicateM 10 (return 1)
 & (fromAsync . Stream.mapM (\x -> threadDelay 1000000 >> print x))

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.1.0

Mapping Side Effects

mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #

mapM_ = Stream.drain . Stream.mapM

Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.

Since: 0.1.0

trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

>>> Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2)
1
2

Compare with tap.

Since: 0.7.0

tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #

Tap the data flowing through a stream into a Fold. For example, you may add a tap to log the contents flowing through the stream. The fold is used only for effects, its result is discarded.

                  Fold m a b
                      |
-----stream m a ---------------stream m a-----

>>> Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
1
2

Compare with trace.

Since: 0.7.0

delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of the stream except the first one.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Since: 0.8.0

Scanning

A scan is more powerful than map. While a map is a stateless loop, a scan is a stateful loop which means that a state can be shared across all the loop iterations, therefore, future iterations can be impacted by the state changes made by the past iterations. A scan yields the state of the loop after each iteration. Like a map, a postscan or prescan does not add or remove elements in the stream, it just transforms them. However, a scan adds one extra element to the stream.

A left associative scan, also known as a prefix sum, can be thought of as a stream transformation consisting of left folds of all prefixes of a stream. Another way of thinking about it is that it streams all the intermediate values of the accumulator while applying a left fold on the input stream. A right associative scan, on the other hand, can be thought of as a stream consisting of right folds of all the suffixes of a stream.

The following equations hold for lists:

scanl f z xs == map (foldl f z) $ inits xs
scanr f z xs == map (foldr f z) $ tails xs

> scanl (+) 0 [1,2,3,4]
0                 = 0
0 + 1             = 1
0 + 1 + 2         = 3
0 + 1 + 2 + 3     = 6
0 + 1 + 2 + 3 + 4 = 10

> scanr (+) 0 [1,2,3,4]
1 + 2 + 3 + 4 + 0 = 10
    2 + 3 + 4 + 0 = 9
        3 + 4 + 0 = 7
            4 + 0 = 4
                0 = 0

Left and right scans are duals:

scanr f z xs ==  reverse $ scanl (flip f) z (reverse xs)
scanl f z xs ==  reverse $ scanr (flip f) z (reverse xs)

A scan is a stateful map i.e. a combination of map and fold:

map f xs =           tail $ scanl (\_ x -> f x) z xs
map f xs =           init $ scanr (\x _ -> f x) z xs
foldl f z xs = last $ scanl f z xs
foldr f z xs = head $ scanr f z xs
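
The list equations in this section can be checked directly. The sketch below uses only base and tests a few of them on one sample input; the law names are hypothetical, and subtraction is used where argument order matters.

```haskell
import Data.List (inits, tails)

-- scanl yields the left folds of all prefixes.
scanlLaw :: [Int] -> Bool
scanlLaw xs = scanl (+) 0 xs == map (foldl (+) 0) (inits xs)

-- scanr yields the right folds of all suffixes.
scanrLaw :: [Int] -> Bool
scanrLaw xs = scanr (+) 0 xs == map (foldr (+) 0) (tails xs)

-- Left and right scans are duals.
dualLaw :: [Int] -> Bool
dualLaw xs = scanr (-) 0 xs == reverse (scanl (flip (-)) 0 (reverse xs))

-- A fold is the last element of a left scan, or the head of a right scan.
foldLaw :: [Int] -> Bool
foldLaw xs =
    foldl (-) 0 xs == last (scanl (-) 0 xs)
        && foldr (-) 0 xs == head (scanr (-) 0 xs)

main :: IO ()
main = print (all ($ [1, 2, 3, 4]) [scanlLaw, scanrLaw, dualLaw, foldLaw])  -- True
```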

Left scans

scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Strict left scan. Like map, scanl' too is a one to one transformation, however it adds an extra element.

>>> Stream.toList $ Stream.scanl' (+) 0 $ Stream.fromList [1,2,3,4]
[0,1,3,6,10]

>>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
[[],[1],[2,1],[3,2,1],[4,3,2,1]]

The output of scanl' is the initial value of the accumulator followed by all the intermediate steps and the final result of foldl'.

By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.

Consider the following monolithic example, computing the sum and the product of the elements in a stream in one go using a foldl':

>>> Stream.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList [1,2,3,4]
(10,24)

Using scanl' we can make it modular by computing the sum in the first stage and passing it down to the next stage for computing the product:

>>> :{
  Stream.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1)
  $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1)
  $ Stream.fromList [1,2,3,4]
:}
(10,24)

IMPORTANT: scanl' evaluates the accumulator to WHNF. To avoid building lazy expressions inside the accumulator, it is recommended that a strict data structure is used for the accumulator.

See also: usingStateT

Since: 0.2.0

scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like scanl' but with a monadic step function and a monadic seed.

Since: 0.4.0

Since: 0.8.0 (signature change)

postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the initial value of the accumulator.

postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xs
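
The difference between a scan and a postscan can be seen on plain lists. This is an illustrative list model only; postscanlList is a hypothetical name and is not part of Streamly.

```haskell
-- List model: a postscan is a scan without the initial accumulator,
-- so it emits exactly one output per input element.
postscanlList :: (b -> a -> b) -> b -> [a] -> [b]
postscanlList f z = drop 1 . scanl f z

runningSums :: [Int]
runningSums = scanl (+) 0 [1, 2, 3, 4]             -- [0,1,3,6,10]

runningSumsPost :: [Int]
runningSumsPost = postscanlList (+) 0 [1, 2, 3, 4] -- [1,3,6,10]
```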

Since: 0.7.0

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like postscanl' but with a monadic step function and a monadic seed.

Since: 0.7.0

Since: 0.8.0 (signature change)

scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #

Like scanl' but for a non-empty stream. The first element of the stream is used as the initial value of the accumulator. Does nothing if the stream is empty.

>>> Stream.toList $ Stream.scanl1' (+) $ Stream.fromList [1,2,3,4]
[1,3,6,10]

Since: 0.6.0

scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #

Like scanl1' but with a monadic step function.

Since: 0.6.0

Scanning By Fold

scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Scan a stream using the given monadic fold.

>>> Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10])
[0,1,3,6]

Since: 0.7.0

postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Postscan a stream using the given monadic fold.

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

>>> import Data.Maybe (fromJust)
>>> let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>> :{
 Stream.toList
  $ Stream.map (fromJust . fst)
  $ Stream.takeWhile (\(_,x) -> x <= 10)
  $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0)
:}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]

Since: 0.7.0

Filtering

Remove some elements from the stream based on a predicate. In imperative terms a filter over a stream corresponds to a loop with a continue clause for the cases when the predicate fails.

deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

>>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5]
[1,3,5]

Since: 0.6.0

filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Include only those elements that pass a predicate.

Since: 0.1.0

filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as filter but with a monadic predicate.

Since: 0.4.0

uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #

Drop repeated elements that are adjacent to each other.

Since: 0.6.0

Trimming

Take or remove elements from one or both ends of a stream.

take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first n elements from the stream and discard the rest.

Since: 0.1.0

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

Since: 0.1.0

takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as takeWhile but with a monadic predicate.

Since: 0.4.0

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first n elements from the stream and take the rest.

Since: 0.1.0

dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

Since: 0.1.0

dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as dropWhile but with a monadic predicate.

Since: 0.4.0

Inserting Elements

Inserting elements is a special case of interleaving/merging streams.

insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a Source #

insertBy cmp elem stream inserts elem before the first element in stream that is greater than or equal to elem when compared using cmp. If there is no such element, elem is appended at the end.

insertBy cmp x = mergeBy cmp (fromPure x)

>>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5]
[1,2,3,5]

Since: 0.6.0

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"

Since: 0.5.0

intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #

Insert a pure value between successive elements of a stream.

>>> Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello"
"h,e,l,l,o"

Since: 0.7.0

Reordering Elements

reverse :: (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

Since: 0.7.0 (Monad m constraint)

Since: 0.1.1

Indexing

Indexing can be considered as a special type of zipping where we zip a stream with an index stream.

indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #

indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)

Pair each element in a stream with its index, starting from index 0.

>>> Stream.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]

Since: 0.6.0

indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #

indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))

Pair each element in a stream with its index, starting from the given index n and counting down.

>>> Stream.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]
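
The two definitions given for indexed and indexedR (a postscan over an index/value pair, and a zip with an enumeration) can be compared on plain lists. This is a sketch of the idea on lists, not Streamly's implementation; all four function names are hypothetical.

```haskell
-- Indexing as a postscan. The second component of the seed is a
-- placeholder that is never inspected, because the step function
-- ignores it and the seed itself is dropped from the output.
indexedByScan :: [a] -> [(Int, a)]
indexedByScan = drop 1 . scanl (\(i, _) x -> (i + 1, x)) (-1, undefined)

-- Indexing as a zip with an index list.
indexedByZip :: [a] -> [(Int, a)]
indexedByZip = zip [0 ..]

-- The same two formulations, counting down from n.
indexedRByScan :: Int -> [a] -> [(Int, a)]
indexedRByScan n = drop 1 . scanl (\(i, _) x -> (i - 1, x)) (n + 1, undefined)

indexedRByZip :: Int -> [a] -> [(Int, a)]
indexedRByZip n = zip [n, n - 1 ..]
```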

Since: 0.6.0

Searching

Finding the presence or location of an element, a sequence of elements or another stream within a stream.

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

Since: 0.5.0

elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #

Find all the indices where the value of the element in the stream is equal to the given value.

elemIndices a = findIndices (== a)

Since: 0.5.0

Maybe Streams

mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b Source #

Map a Maybe returning function to a stream, filter out the Nothing elements, and return a stream of values extracted from Just.

Equivalent to:

mapMaybe f = Stream.map fromJust . Stream.filter isJust . Stream.map f
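
The same equivalence holds for lists, where it can be checked against Data.Maybe.mapMaybe. This is a list-based sketch; mapMaybeViaFilter and halveEven are hypothetical names used only here.

```haskell
import Data.Maybe (fromJust, isJust, mapMaybe)

-- The three-stage pipeline from the equation, written for lists.
-- fromJust is safe here because filter isJust has removed every Nothing.
mapMaybeViaFilter :: (a -> Maybe b) -> [a] -> [b]
mapMaybeViaFilter f = map fromJust . filter isJust . map f

-- Example function: keep even numbers, halved; reject odd numbers.
halveEven :: Int -> Maybe Int
halveEven n
    | even n    = Just (n `div` 2)
    | otherwise = Nothing

-- Both formulations give the same result.
agree :: [Int] -> Bool
agree xs = mapMaybeViaFilter halveEven xs == mapMaybe halveEven xs
```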

Since: 0.3.0

mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #

Like mapMaybe but maps a monadic function.

Equivalent to:

mapMaybeM f = Stream.map fromJust . Stream.filter isJust . Stream.mapM f

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.3.0

Concurrent Transformation

Concurrent Pipelines

Stream processing functions can be composed in a chain using function application with or without the $ operator, or with the reverse function application operator &. Streamly provides concurrent versions of these operators, which apply stream processing functions such that each stage of the stream can run in parallel. The operators start with a |; we can read |$ as "parallel dollar" to remember that | comes before $.

(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #

Parallel transform application operator; applies a stream transformation function t m a -> t m b to a stream t m a concurrently; the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the transformation function runs in another thread consuming the input from the buffer. |$ is just like regular function application operator $ except that it is concurrent.

If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can look at it as a transformation that converts a transform function to a buffered concurrent transform function.

The following code prints a value every second even though each stage adds a 1 second delay.

>>> :{
Stream.drain $
   Stream.mapM (\x -> threadDelay 1000000 >> print x)
     |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #

Same as |$ but with arguments reversed.

(|&) = flip (|$)

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills. It terminates when the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Concurrency Control

These combinators can be used at any point in a stream composition to set parameters to control the concurrency of the argument stream. A control parameter set at any point remains effective for any concurrent combinators used in the argument stream until it is reset by using the combinator again. These control parameters have no effect on non-concurrent combinators in the stream, or on non-concurrent streams.

Pitfall: Remember that maxBuffer in the following example applies to mapM and any other combinators that may follow it, and it does not apply to the combinators before it:

 ...
 $ Stream.maxBuffer 10
 $ Stream.mapM ...
 ...

If we use & instead of $ the situation reverses: in the following example, maxBuffer does not apply to mapM. It applies to the combinators that come before it, because those are the arguments to maxBuffer:

 ...
 & Stream.maxBuffer 10
 & Stream.mapM ...
 ...
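
The pitfall is purely about which expression ends up as the argument of the control combinator. The sketch below visualises the nesting with plain strings instead of streams; scoped and stage are hypothetical helpers that only build a description, and no Streamly function is called.

```haskell
import Data.Function ((&))

-- Stand-in for a control combinator: marks its argument with braces.
scoped :: String -> String
scoped s = "maxBuffer{" ++ s ++ "}"

-- Stand-in for an ordinary stream stage.
stage :: String -> String -> String
stage name s = name ++ "(" ++ s ++ ")"

-- With $, everything to the right (mapM and the source) is the argument.
withDollar :: String
withDollar = stage "toList" $ scoped $ stage "mapM" $ "src"
-- "toList(maxBuffer{mapM(src)})"

-- With &, everything to the left (only the source) is the argument.
withAmp :: String
withAmp = "src" & scoped & stage "mapM" & stage "toList"
-- "toList(mapM(maxBuffer{src}))"
```

Whatever appears inside the braces is what the setting governs: mapM in the $ form, but only the source in the & form.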

maxThreads :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500. maxThreads does not affect ParallelT streams as they can use unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

Since: 0.4.0 (Streamly)

Since: 0.8.0

maxBuffer :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

Since: 0.4.0 (Streamly)

Since: 0.8.0

Rate Limiting

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Since: 0.5.0 (Streamly)

Since: 0.8.0

Constructors

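Rate

Fields

  • rateLow :: Double (the lower bound on the instantaneous rate)
  • rateGoal :: Double (the target yield rate)
  • rateHigh :: Double (the upper bound on the instantaneous rate)
  • rateBuffer :: Int (the maximum rate gap that is recovered)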

rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream. A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Since: 0.5.0 (Streamly)

Since: 0.8.0

avgRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

Since: 0.5.0 (Streamly)

Since: 0.8.0

minRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go below the specified rate, though it may go above it at times; the upper limit is double the specified rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

maxRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go above the specified rate, though it may go below it at times; the lower limit is half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Since: 0.5.0 (Streamly)

Since: 0.8.0

constRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
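
The four convenience functions differ only in the Rate value they construct. The sketch below places them side by side using a hypothetical stand-in record, RateSpec, whose field order (low, goal, high, buffer) is assumed from the "Same as rate ..." equations; it does not use the real Rate type.

```haskell
-- Hypothetical stand-in for the Rate record, for illustration only.
data RateSpec = RateSpec
    { low    :: Double -- lower bound on the instantaneous rate
    , goal   :: Double -- target rate
    , high   :: Double -- upper bound on the instantaneous rate
    , buffer :: Int    -- largest gap that is recovered
    } deriving (Eq, Show)

avgSpec, minSpec, maxSpec, constSpec :: Double -> RateSpec
avgSpec r   = RateSpec (r / 2) r (2 * r) maxBound -- may dip to r/2, rise to 2r
minSpec r   = RateSpec r r (2 * r) maxBound       -- never below r
maxSpec r   = RateSpec (r / 2) r r maxBound       -- never above r
constSpec r = RateSpec r r r 0                    -- fixed rate, no recovery
```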

Since: 0.5.0 (Streamly)

Since: 0.8.0

Combining Streams

New streams can be constructed by appending, merging or zipping existing streams.

Any exceptions generated by concurrent evaluation are propagated to the consumer of the stream as soon as they occur. Exceptions from a particular stream are guaranteed to arrive in the same order in the output stream as they were generated in the input stream.

See maxThreads and maxBuffer to control the concurrency of the concurrent combinators.

See also: Streamly.Internal.Data.Stream.IsStream.Expand for Pre-release functions.

Linear combinators

These functions have O(n) append performance. They can be used efficiently with concatMapWith et al.

serial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]

This operation can be used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]
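
The interleaving order can be modelled on plain lists. This is a sketch of the behaviour described above, not of how wSerial is implemented; interleave is a hypothetical name.

```haskell
-- Alternate between the two inputs by swapping them at every step.
-- Once one input is exhausted, the rest of the other is emitted as is.
interleave :: [a] -> [a] -> [a]
interleave []       ys = ys
interleave (x : xs) ys = x : interleave ys xs
```

For example, interleave [1,2] [3,4] gives [1,3,2,4], and interleave [1,2,3,4] [9] gives [1,9,2,3,4].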

Note, for singleton streams wSerial and serial are identical.

Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams. Both streams may be evaluated concurrently, but the outputs are used in the same order as the corresponding actions in the original streams; side effects happen in the order in which the streams are evaluated:

>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]

With 2 threads, only two can be scheduled at a time; when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]

Only streams are scheduled for ahead evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently. It may not make much sense to combine serial streams using ahead.

ahead can be safely used to fold an infinite lazy container of streams.

Since: 0.3.0 (Streamly)

Since: 0.8.0

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Merges two streams, both the streams may be evaluated concurrently, outputs from both are used as they arrive:

>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]

With 2 threads, only two can be scheduled at a time; when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]

With a single thread, it becomes serial:

>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]

Only streams are scheduled for async evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]

If total threads are 2, the third stream is scheduled only after one of the first two has finished:

>>> stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]
...
[1,1,3,2,3,2]

Thus async goes deep in first few streams rather than going wide in all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, async can be safely used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

For singleton streams, wAsync is the same as async. See async for singleton stream behavior. For multi-element streams, while async is left biased i.e. it tries to evaluate the left side stream as much as possible, wAsync tries to schedule them both fairly. In other words, async goes deep while wAsync goes wide. However, outputs are always used as they arrive.

With a single thread, async starts behaving like serial while wAsync starts behaving like wSerial.

>>> import Streamly.Prelude (wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]

With two threads available, and combining three streams:

>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]

This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.

Note that WSerialT and single threaded WAsyncT both interleave streams but the exact scheduling is slightly different in both cases.

Since: 0.2.0 (Streamly)

Since: 0.8.0

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Like async except that the execution is much more strict. There is no limit on the number of threads. While async may not schedule a stream if there is no demand from the consumer, parallel always evaluates both the streams immediately. The only limit that applies to parallel is maxBuffer. Evaluation may block if the output buffer becomes full.

>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]

parallel guarantees that all the streams are scheduled for execution immediately, therefore, we could use things like starting timers inside the streams and relying on the fact that all timers were started at the same time.

Unlike async this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.

Since: 0.2.0 (Streamly)

Since: 0.8.0

PairWise combinators

These functions have O(n^2) append performance when used linearly e.g. using concatMapWith. However, they can be combined pair wise using concatPairsWith to give O(n * log n) complexity.
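
The difference between linear and pairwise combination can be sketched with sorted lists. This is a list model under the assumption that the combining operation is a two-way merge; merge, mergeLinear and mergePairs are hypothetical names, and the code is not Streamly's concatPairsWith.

```haskell
-- Two-way merge of sorted lists.
merge :: Ord a => [a] -> [a] -> [a]
merge [] ys = ys
merge xs [] = xs
merge (x : xs) (y : ys)
    | y < x     = y : merge (x : xs) ys
    | otherwise = x : merge xs (y : ys)

-- Linear combination: a right-nested chain of merges. An element of
-- the last list has to pass through every merge in the chain.
mergeLinear :: Ord a => [[a]] -> [a]
mergeLinear = foldr merge []

-- Pairwise combination: merge adjacent pairs, then repeat. The number
-- of lists halves on every pass, so the merge tree is balanced.
mergePairs :: Ord a => [[a]] -> [a]
mergePairs []   = []
mergePairs [xs] = xs
mergePairs xss  = mergePairs (pairUp xss)
  where
    pairUp (a : b : rest) = merge a b : pairUp rest
    pairUp rest           = rest
```

Both functions produce the same output; they differ only in the shape of the merge tree, which is what determines how many merges each element passes through.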

mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

>>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]
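
The merge rule, including the preference for the first stream on ties, can be written out for plain lists. This is a list model of the behaviour described above, not Streamly's implementation; mergeByList, insertByList and tieExample are hypothetical names.

```haskell
import Data.Ord (comparing)

-- Compare the heads; emit the smaller one. On EQ (and LT) the element
-- from the first list is emitted, which makes the merge left-biased.
mergeByList :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
mergeByList _ [] ys = ys
mergeByList _ xs [] = xs
mergeByList cmp (x : xs) (y : ys) =
    case cmp x y of
        GT -> y : mergeByList cmp (x : xs) ys
        _  -> x : mergeByList cmp xs (y : ys)

-- Inserting one element is merging with a singleton list.
insertByList :: (a -> a -> Ordering) -> a -> [a] -> [a]
insertByList cmp x = mergeByList cmp [x]

-- Equal keys: the element from the first list comes out first.
tieExample :: [(Int, String)]
tieExample = mergeByList (comparing fst) [(1, "left")] [(1, "right")]
```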

Since: 0.6.0

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeBy but with a monadic comparison function.

Merge two streams randomly:

> randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]

Merge two streams in a proportion of 2:1:

>>> import Data.IORef (newIORef, readIORef, writeIORef)
>>> :{
do
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2])
 print xs
:}
[1,1,2,1,1,2,1,1,2]

Since: 0.6.0

mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like mergeBy but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeByM but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Zip two streams serially using a pure zipping function.

>>> Stream.toList $ Stream.zipWith (+) (Stream.fromList [1,2,3]) (Stream.fromList [4,5,6])
[5,7,9]

Since: 0.1.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipWith but using a monadic zipping function.

Since: 0.4.0

zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Like zipWith but zips concurrently i.e. both the streams being zipped are generated concurrently.

Since: 0.1.0

zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipWithM but zips concurrently i.e. both the streams being zipped are generated concurrently.

Since: 0.4.0

Nested Unfolds

unfoldMany :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like concatMap but uses an Unfold for stream generation. Unlike concatMap this can fuse the Unfold code with the inner loop and therefore provide many times better performance.

Since: 0.8.0

intercalate :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

intersperse followed by unfold and concat.

intercalate unf a str = unfoldMany unf $ intersperse a str
intersperse = intercalate (Unfold.function id)
unwords = intercalate Unfold.fromList " "

>>> import qualified Streamly.Data.Unfold as Unfold
>>> Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"]
"abc def ghi"

Since: 0.8.0

intercalateSuffix :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

intersperseSuffix followed by unfold and concat.

intercalateSuffix unf a str = unfoldMany unf $ intersperseSuffix a str
intersperseSuffix = intercalateSuffix (Unfold.function id)
unlines = intercalateSuffix Unfold.fromList "\n"

>>> Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]
"abc\ndef\nghi\n"

Since: 0.8.0

Nested Streams

Stream operations like map and filter represent loop processing in imperative programming terms. Similarly, the imperative concept of nested loops is represented by streams of streams. The concatMap operation represents nested looping. A concatMap operation loops over the input stream, generates another stream for each element of the input stream, and then loops over that inner stream as well, producing effects and generating a single output stream. The Monad instances of different stream types provide a more convenient way of writing nested loops. Note that the monad bind operation is just flip concatMap.

One dimension loops are just a special case of nested loops. For example, concatMap can degenerate to a simple map operation:

map f m = S.concatMap (\x -> S.fromPure (f x)) m

Similarly, concatMap can perform filtering by mapping an element to a nil stream:

filter p m = S.concatMap (\x -> if p x then S.fromPure x else S.nil) m
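
The same reductions can be checked on lists, where concatMap and the list monad play the corresponding roles. This is a list-based sketch; the function names are hypothetical.

```haskell
-- map as a degenerate nested loop: every inner "stream" has one element.
mapViaConcatMap :: (a -> b) -> [a] -> [b]
mapViaConcatMap f = concatMap (\x -> [f x])

-- filter as a nested loop: the inner "stream" has zero or one element.
filterViaConcatMap :: (a -> Bool) -> [a] -> [a]
filterViaConcatMap p = concatMap (\x -> if p x then [x] else [])

-- A two-level nested loop written with concatMap ...
pairsConcatMap :: [(Int, Char)]
pairsConcatMap = concatMap (\x -> concatMap (\y -> [(x, y)]) "ab") [1, 2]

-- ... and the same loop written with the monad instance (bind is
-- flip concatMap).
pairsMonadic :: [(Int, Char)]
pairsMonadic = do
    x <- [1, 2]
    y <- "ab"
    return (x, y)
```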

concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

concatMapWith mixer generator stream is a two dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream and the mixer function is used to merge those streams.

Note we can merge streams concurrently by using a concurrent merge function.

Since: 0.7.0

Since: 0.8.0 (signature change)

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

concatMap f = concatMapM (return . f)
concatMap = concatMapWith serial
concatMap f = concat . map f
concatMap f = unfoldMany (Unfold.lmap f Unfold.fromStream)

Since: 0.6.0

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

Since: 0.6.0

Containers of Streams

These are variants of standard Foldable fold functions that use a polymorphic stream sum operation (e.g. async or wSerial) to fold a finite container of streams. Note that these are just special cases of the more general concatMapWith operation.

concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

concatFoldableWith async $ map return [1..3]

Equivalent to:

concatFoldableWith f = Prelude.foldr f S.nil
concatFoldableWith f = S.concatMapFoldableWith f id

Since: 0.8.0 (Renamed foldWith to concatFoldableWith)

Since: 0.1.0 (Streamly)

concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.

concatMapFoldableWith async return [1..3]

Equivalent to:

concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
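
The foldr formulation can be tried on lists, with lists standing in for streams and [] standing in for nil. This is a list model only; both function names are hypothetical.

```haskell
-- Map each element of a Foldable container to a list, then combine the
-- lists right to left with the given binary operation.
concatMapFoldableWithList
    :: Foldable f => ([b] -> [b] -> [b]) -> (a -> [b]) -> f a -> [b]
concatMapFoldableWithList combine gen = foldr (combine . gen) []

-- The container already holds lists, so the generator is id.
concatFoldableWithList :: Foldable f => ([a] -> [a] -> [a]) -> f [a] -> [a]
concatFoldableWithList combine = concatMapFoldableWithList combine id
```

With (++) as the combining operation, concatMapFoldableWithList (++) (replicate 2) [1,2,3] gives [1,1,2,2,3,3].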

Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)

Since: 0.1.0 (Streamly)

concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like concatMapFoldableWith but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to:

concatForFoldableWith f xs g = Prelude.foldr (f . g) S.nil xs
concatForFoldableWith = flip S.concatMapFoldableWith

Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)

Since: 0.1.0 (Streamly)

Reducing

See also: Streamly.Internal.Data.Stream.IsStream.Reduce for Pre-release functions.

Nested Folds

foldMany :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Apply a Fold repeatedly on a stream and emit the fold outputs in the output stream.

To sum every two contiguous elements in a stream:

>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10]
[3,7,11,15,19]

On an empty stream the output is empty:

>>> Stream.toList $ Stream.foldMany f $ Stream.fromList []
[]

Note that Stream.foldMany (Fold.take 0 f) would result in an infinite loop on a non-empty stream, because the fold terminates without consuming any input.

Since: 0.8.0

chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b Source #

Group the input stream into groups of n elements each and then fold each group using the provided fold function.

>>> Stream.toList $ Stream.chunksOf 2 Fold.sum (Stream.enumerateFromTo 1 10)
[3,7,11,15,19]

This can be considered as an n-fold version of take where we apply take repeatedly on the leftover stream until the stream is exhausted.

chunksOf n f = foldMany (Fold.take n f)

Since: 0.7.0
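
The relationship between foldMany, Fold.take and chunksOf can be sketched with plain lists. This is only a model under stated assumptions: a terminating fold is represented as a function that consumes a prefix of a list and returns the leftover, and FoldL, takeL, foldManyL and chunksOfL are hypothetical names, not Streamly APIs.

```haskell
-- List model of a terminating fold: consume a prefix of the input and
-- return the result together with the unconsumed remainder.
type FoldL a b = [a] -> (b, [a])

-- Model of Fold.take n f, where f is an ordinary list function.
takeL :: Int -> ([a] -> b) -> FoldL a b
takeL n f xs = let (chunk, rest) = splitAt n xs in (f chunk, rest)

-- Model of foldMany: apply the fold repeatedly until the input runs out.
-- If the fold consumes nothing (for example takeL 0), the input never
-- shrinks and the output never ends.
foldManyL :: FoldL a b -> [a] -> [b]
foldManyL _ [] = []
foldManyL fld xs = let (b, rest) = fld xs in b : foldManyL fld rest

-- Model of chunksOf n f = foldMany (Fold.take n f).
chunksOfL :: Int -> ([a] -> b) -> [a] -> [b]
chunksOfL n f = foldManyL (takeL n f)
```

With this model, chunksOfL 2 sum [1..10] gives [3,7,11,15,19], and a trailing partial chunk is folded on its own, so chunksOfL 2 sum [1..5] gives [3,7,5].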

intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b Source #

Group the input stream into windows of n seconds each and then fold each group using the provided fold function.

>>> Stream.toList $ Stream.take 5 $ Stream.intervalsOf 1 Fold.sum $ Stream.constRate 2 $ Stream.enumerateFrom 1
[...,...,...,...,...]

Since: 0.7.0

Splitting

In general we can express splitting in terms of parser combinators. These are some commonly used functions provided for convenience and efficiency. While parsers can fail, these functions are designed to transform a stream without failure, with a predefined behavior for all cases.

In general, there are three possible ways of combining stream segments with a separator. The separator could be prefixed to each segment, suffixed to each segment, or it could be infixed between segments. intersperse and intercalate operations are examples of infixed combining whereas unlines is an example of suffixed combining. When we split a stream with separators we can split in three different ways, each being an opposite of the three ways of combining.

Splitting may keep the separator or drop it. Depending on how we split, the separator may be kept attached to the stream segments in prefix or suffix position or as a separate element in infix position. Combinators like splitOn that use On in their names drop the separator and combinators that use With in their names keep the separator. When a segment is missing it is considered as empty, therefore, we never encounter an error in parsing.

splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on an infixed separator element, dropping the separator. The separator elements are determined by the supplied predicate and are considered to be infixed between two segments. The supplied Fold is applied to each split segment:

>>> splitOn' p xs = Stream.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
>>> splitOn' (== '.') "a.b"
["a","b"]

An empty stream is folded to the default value of the fold:

>>> splitOn' (== '.') ""
[""]

If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:

>>> splitOn' (== '.') "."
["",""]
>>> splitOn' (== '.') ".a"
["","a"]
>>> splitOn' (== '.') "a."
["a",""]
>>> splitOn' (== '.') "a..b"
["a","","b"]

splitOn is an inverse of intercalating a single element:

Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id
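
The splitOn behavior and the first inverse law can be checked against a small list model. This is a sketch only: splitOnL and roundTrip are hypothetical names, the fold is fixed to collecting each segment as a list, and Data.List.intercalate stands in for the stream intercalate.

```haskell
import Data.List (intercalate)

-- List model of splitOn with Fold.toList: separators are dropped and a
-- missing segment on either side of a separator becomes an empty list.
splitOnL :: (a -> Bool) -> [a] -> [[a]]
splitOnL p xs = case break p xs of
  (seg, [])       -> [seg]
  (seg, _ : rest) -> seg : splitOnL p rest

-- Round trip for the separator '.': joining the segments with "."
-- rebuilds the original string.
roundTrip :: String -> Bool
roundTrip s = intercalate "." (splitOnL (== '.') s) == s
```

In this model the round trip holds for every input, including the edge cases listed above such as "", "." and "a..b".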

Since: 0.7.0

splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on a suffixed separator element, dropping the separator. The supplied Fold is applied on the split segments.

>>> splitOnSuffix' p xs = Stream.toList $ Stream.splitOnSuffix p Fold.toList (Stream.fromList xs)
>>> splitOnSuffix' (== '.') "a.b."
["a","b"]
>>> splitOnSuffix' (== '.') "a."
["a"]

An empty stream results in an empty output stream:

>>> splitOnSuffix' (== '.') ""
[]

An empty segment consisting of only a suffix is folded to the default output of the fold:

>>> splitOnSuffix' (== '.') "."
[""]
>>> splitOnSuffix' (== '.') "a..b.."
["a","","b",""]

A suffix is optional at the end of the stream:

>>> splitOnSuffix' (== '.') "a"
["a"]
>>> splitOnSuffix' (== '.') ".a"
["","a"]
>>> splitOnSuffix' (== '.') "a.b"
["a","b"]

lines = splitOnSuffix (== '\n')

splitOnSuffix is an inverse of intercalateSuffix with a single element:

Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnSuffix (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOnSuffix (== '.') Fold.toList . Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList === id

Since: 0.7.0

splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like splitOnSuffix but keeps the suffix attached to the resulting splits.

>>> splitWithSuffix' p xs = Stream.toList $ Stream.splitWithSuffix p Fold.toList (Stream.fromList xs)
>>> splitWithSuffix' (== '.') ""
[]
>>> splitWithSuffix' (== '.') "."
["."]
>>> splitWithSuffix' (== '.') "a"
["a"]
>>> splitWithSuffix' (== '.') ".a"
[".","a"]
>>> splitWithSuffix' (== '.') "a."
["a."]
>>> splitWithSuffix' (== '.') "a.b"
["a.","b"]
>>> splitWithSuffix' (== '.') "a.b."
["a.","b."]
>>> splitWithSuffix' (== '.') "a..b.."
["a.",".","b.","."]

Since: 0.7.0
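
The two suffix-based splits can be related through a list model. This is a sketch under the assumption that the fold simply collects each segment; splitWithSuffixL and splitOnSuffixL are hypothetical names, not Streamly functions.

```haskell
-- List model of splitWithSuffix with Fold.toList: each separator stays
-- attached to the end of the segment it terminates.
splitWithSuffixL :: (a -> Bool) -> [a] -> [[a]]
splitWithSuffixL _ [] = []
splitWithSuffixL p xs = case break p xs of
  (seg, [])         -> [seg]
  (seg, sep : rest) -> (seg ++ [sep]) : splitWithSuffixL p rest

-- Model of splitOnSuffix: the same segments with the trailing
-- separator, if any, removed.
splitOnSuffixL :: (a -> Bool) -> [a] -> [[a]]
splitOnSuffixL p = map dropSuffix . splitWithSuffixL p
  where
    dropSuffix seg
      | not (null seg) && p (last seg) = init seg
      | otherwise = seg
```

Because splitWithSuffixL keeps every element, concatenating its output gives back the input unchanged, which is a convenient sanity check for the model.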

wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like splitOn after stripping leading, trailing, and repeated separators. Therefore, ".a..b." with . as the separator would be parsed as ["a","b"]. In other words, it's like parsing words from whitespace-separated text.

>>> wordsBy' p xs = Stream.toList $ Stream.wordsBy p Fold.toList (Stream.fromList xs)
>>> wordsBy' (== ',') ""
[]
>>> wordsBy' (== ',') ","
[]
>>> wordsBy' (== ',') ",a,,b,"
["a","b"]

words = wordsBy isSpace

Since: 0.7.0
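
A list model makes the stripping rule of wordsBy concrete. The sketch assumes the fold collects each word as a list; wordsByL and wordsL are hypothetical names introduced here for illustration.

```haskell
import Data.Char (isSpace)

-- List model of wordsBy with Fold.toList: skip any run of separators,
-- then take elements up to the next separator.
wordsByL :: (a -> Bool) -> [a] -> [[a]]
wordsByL p xs = case dropWhile p xs of
  [] -> []
  ys -> let (word, rest) = break p ys in word : wordsByL p rest

-- Model of: words = wordsBy isSpace
wordsL :: String -> [String]
wordsL = wordsByL isSpace
```

In this model wordsL agrees with the Prelude words function, and an input made only of separators produces no words at all.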

Grouping

Splitting a stream by combining multiple contiguous elements into groups using some criterion.

groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b Source #

groups = groupsBy (==)
groups = groupsByRolling (==)

Groups contiguous spans of equal elements together in individual groups.

>>> Stream.toList $ Stream.groups Fold.toList $ Stream.fromList [1,1,2,2]
[[1,1],[2,2]]

Since: 0.7.0

groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #

groupsBy cmp f $ S.fromList [a,b,c,...] assigns the element a to the first group, if b `cmp` a is True then b is also assigned to the same group. If c `cmp` a is True then c is also assigned to the same group and so on. When the comparison fails a new group is started. Each group is folded using the fold f and the result of the fold is emitted in the output stream.

>>> Stream.toList $ Stream.groupsBy (>) Fold.toList $ Stream.fromList [1,3,7,0,2,5]
[[1,3,7],[0,2,5]]

Since: 0.7.0

groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Unlike groupsBy this function performs a rolling comparison of two successive elements in the input stream. groupsByRolling cmp f $ S.fromList [a,b,c,...] assigns the element a to the first group, if a `cmp` b is True then b is also assigned to the same group. If b `cmp` c is True then c is also assigned to the same group and so on. When the comparison fails a new group is started. Each group is folded using the fold f.

>>> Stream.toList $ Stream.groupsByRolling (\a b -> a + 1 == b) Fold.toList $ Stream.fromList [1,2,3,7,8,9]
[[1,2,3],[7,8,9]]
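
The difference between the two comparison strategies is easier to see in a list model. In this sketch (hypothetical names groupsByL and groupsByRollingL, fold fixed to collecting each group), groupsByL compares every element with the first element of the current group, whereas groupsByRollingL compares each element with its immediate predecessor.

```haskell
-- List model of groupsBy: each new element b is tested as b `cmp` a,
-- where a is the first element of the current group.
groupsByL :: (a -> a -> Bool) -> [a] -> [[a]]
groupsByL _ [] = []
groupsByL cmp (x : xs) =
  let (same, rest) = span (`cmp` x) xs
  in (x : same) : groupsByL cmp rest

-- List model of groupsByRolling: each new element b is tested as
-- a `cmp` b, where a is the element immediately before it.
groupsByRollingL :: (a -> a -> Bool) -> [a] -> [[a]]
groupsByRollingL _ [] = []
groupsByRollingL cmp (x : xs) = go [x] x xs
  where
    go acc _ [] = [reverse acc]
    go acc prev (y : ys)
      | prev `cmp` y = go (y : acc) y ys
      | otherwise    = reverse acc : go [y] y ys
```

For the input [1,3,2,4], groupsByL (>) yields the single group [[1,3,2,4]] because every element is greater than 1, while groupsByRollingL (<) yields [[1,3],[2,4]] because 3 is not less than 2.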

Since: 0.7.0

Exceptions

Most of these combinators inhibit stream fusion, therefore, when possible, they should be called in an outer loop to mitigate the cost. For example, instead of calling them on a stream of chars call them on a stream of arrays before flattening it to a stream of chars.

See also: Streamly.Internal.Data.Stream.IsStream.Exception for Pre-release functions.

before :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Run the action m b before the stream yields its first element.

before action xs = nilM action <> xs

Since: 0.7.0

after :: (IsStream t, MonadIO m, MonadBaseControl IO m) => m b -> t m a -> t m a Source #

Run the action m b whenever the stream t m a stops normally, or if it is garbage collected after a partial lazy evaluation.

The semantics of the action m b are similar to the semantics of cleanup action in bracket.

See also after_

Since: 0.7.0

bracket :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #

Run the alloc action m b with async exceptions disabled but keeping blocking operations interruptible (see mask). Use the output b as input to b -> t m a to generate an output stream.

b is usually a resource under the state of monad m, e.g. a file handle, that requires cleanup after use. The cleanup action b -> m c runs whenever the stream ends normally, ends due to a sync or async exception, or gets garbage collected after a partial lazy evaluation.

bracket only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully cleanup the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, the cleanup action runs immediately in the current thread context. In other cases it runs in the GC context, so cleanup may be delayed until the GC gets to run.

See also: bracket_

Inhibits stream fusion

Since: 0.7.0

onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #

Run the action m b if the stream aborts due to an exception. The exception is not caught, simply rethrown.

Inhibits stream fusion

Since: 0.7.0

finally :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a Source #

Run the action m b whenever the stream t m a stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.

The semantics of running the action m b are similar to the cleanup action semantics described in bracket.

finally release = bracket (return ()) (const release)
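
The identity above has a close analogue for plain IO actions in base's Control.Exception. The sketch below is not Streamly code: finallyIO, cleanupCount and failing are hypothetical names, used only to show that a bracket with a no-op acquire step runs the release action both on normal completion and on a synchronous exception. It says nothing about the garbage collection case, which is specific to streams.

```haskell
import Control.Exception (ErrorCall (..), bracket, throwIO, try)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- IO analogue of: finally release = bracket (return ()) (const release)
finallyIO :: IO b -> IO a -> IO a
finallyIO release action = bracket (return ()) (const release) (const action)

-- Run an action under finallyIO and count how many times cleanup ran.
-- An ErrorCall thrown by the action is caught so the count can be read.
cleanupCount :: IO a -> IO Int
cleanupCount action = do
  ref <- newIORef (0 :: Int)
  result <- try (finallyIO (modifyIORef' ref (+ 1)) action)
  case result of
    Left (ErrorCall _) -> return ()
    Right _            -> return ()
  readIORef ref

-- An action that always fails with a synchronous exception.
failing :: IO ()
failing = throwIO (ErrorCall "boom")
```

Both cleanupCount (return ()) and cleanupCount failing report that the release action ran exactly once.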

See also finally_

Inhibits stream fusion

Since: 0.7.0

handle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a Source #

When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument.

Inhibits stream fusion

Since: 0.7.0

Lifting Inner Monad

See also: Streamly.Internal.Data.Stream.IsStream.Lift for Pre-release functions.

liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m)) => t m a -> t (tr m) a Source #

Lift the inner monad m of a stream t m a to tr m using the monad transformer tr.

Since: 0.8.0

runReaderT :: (IsStream t, Monad m) => m s -> t (ReaderT s m) a -> t m a Source #

Evaluate the inner monad of a stream as ReaderT.

Since: 0.8.0

runStateT :: Monad m => m s -> SerialT (StateT s m) a -> SerialT m (s, a) Source #

Evaluate the inner monad of a stream as StateT and emit the resulting state and value pair after each step.

This is supported only for SerialT as concurrent state updates may not be safe.

Since: 0.8.0
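
The shape of the runStateT output can be illustrated with a pure list model. This is a sketch under assumptions: each stream element is represented as a pure state transition, the emitted state is taken to be the one after the step, and Step, runStateL and tick are hypothetical names rather than Streamly APIs.

```haskell
import Data.List (mapAccumL)

-- List model: each stream element is a pure state transition that
-- yields a value and a new state (the shape of a StateT step).
type Step s a = s -> (a, s)

-- Thread the state through the steps in order and emit the
-- (state, value) pair produced by each step.
runStateL :: s -> [Step s a] -> [(s, a)]
runStateL s0 = snd . mapAccumL run s0
  where
    run s step = let (a, s') = step s in (s', (s', a))

-- A step that returns the current counter and then increments it.
tick :: Step Int Int
tick n = (n, n + 1)
```

In this model runStateL 0 [tick, tick, tick] gives [(1,0),(2,1),(3,2)]. Each step depends on the state left by the previous one, which is why the order of steps must be fixed.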

Stream Types

Stream types that end with a T (e.g. SerialT) are monad transformers.

Serial Streams

Serial streams are spatially ordered: they execute the actions in the stream strictly one after the other.

There are two serial stream types, SerialT and WSerialT. They differ only in the Semigroup and Monad instance behavior.

data SerialT m a Source #

For SerialT streams:

(<>) = serial                       -- Semigroup
(>>=) = flip (concatMapWith serial) -- Monad

A single Monad bind behaves like a for loop:

>>> :{
Stream.toList $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances
MonadTrans SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(MonadBase b m, Monad m) => MonadBase b (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> SerialT m α #

MonadState s m => MonadState s (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

Monad m => Monad (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

Monad m => Functor (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Applicative (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

MonadIO m => MonadIO (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> SerialT m a #

NFData1 (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> SerialT m a #

IsList (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (SerialT Identity a) #

Eq a => Eq (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Read a => Read (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

a ~ Char => IsString (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Semigroup (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

Monoid (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

NFData a => NFData (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: SerialT Identity a -> () #

type Item (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) = a

data WSerialT m a Source #

For WSerialT streams:

(<>) = wSerial                       -- Semigroup
(>>=) = flip (concatMapWith wSerial) -- Monad

Note that <> is associative only if we disregard the ordering of elements in the resulting stream.

A single Monad bind behaves like a for loop:

>>> :{
Stream.toList $ Stream.fromWSerial $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like interleaved nested for loops:

>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]

It is a result of interleaving all the nested iterations corresponding to element 1 in the first stream with all the nested iterations of element 2:

>>> import Streamly.Prelude (wSerial)
>>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
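
The contrast between the SerialT and WSerialT Monad instances can be reproduced with a list model. This sketch treats a stream as a plain list and uses hypothetical names (bindSerial, bindWSerial, interleave, pairsWith); it models only the ordering of results, not effects.

```haskell
-- SerialT-style bind: finish each inner loop before starting the next.
bindSerial :: [a] -> (a -> [b]) -> [b]
bindSerial xs f = foldr ((++) . f) [] xs

-- Hypothetical stand-in for wSerial: alternate between the two inputs.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x : xs) ys = x : interleave ys xs

-- WSerialT-style bind: interleave the inner loops instead.
bindWSerial :: [a] -> (a -> [b]) -> [b]
bindWSerial xs f = foldr (interleave . f) [] xs

-- The nested loop from the examples, parameterised over the bind.
pairsWith :: ([Int] -> (Int -> [(Int, Int)]) -> [(Int, Int)]) -> [(Int, Int)]
pairsWith bind = [1, 2] `bind` \x -> [3, 4] `bind` \y -> [(x, y)]
```

Here pairsWith bindSerial gives [(1,3),(1,4),(2,3),(2,4)] and pairsWith bindWSerial gives [(1,3),(2,3),(1,4),(2,4)], matching the two nested-loop examples.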

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of SerialT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances
MonadTrans WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(MonadBase b m, Monad m) => MonadBase b (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> WSerialT m α #

MonadState s m => MonadState s (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

Monad m => Monad (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

Monad m => Functor (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Applicative (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

MonadIO m => MonadIO (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> WSerialT m a #

NFData1 (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> WSerialT m a #

IsList (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (WSerialT Identity a) #

Eq a => Eq (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Read a => Read (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

a ~ Char => IsString (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Semigroup (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Monoid (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

NFData a => NFData (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: WSerialT Identity a -> () #

type Item (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) = a

Speculative Streams

Speculative streams evaluate some future actions speculatively and concurrently, and keep the results ready for consumption. As in serial streams, results are consumed in the same order as the actions in the stream.

Even though the results of actions are ordered, the side effects are only partially ordered. Therefore, the semigroup operation is not commutative with respect to the pure outputs but is commutative with respect to side effects.

data AheadT m a Source #

For AheadT streams:

(<>) = ahead
(>>=) = flip (concatMapWith ahead)

A single Monad bind behaves like a for loop with iterations executed concurrently, ahead of time, producing side effects of iterations out of order, but results in order:

>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, ahead of time:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using ahead.
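
The idea of "effects out of order, results in order" can be modelled with base concurrency primitives. This is a sketch, not how Streamly implements ahead: runAheadModel and slowly are hypothetical names, the model starts every action at once with no limit on threads or buffering, and it does not propagate exceptions from worker threads.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Start every action in its own thread, then collect the results in the
-- original order. Effects may interleave; the result list does not.
-- If an action throws, its slot is never filled and collection blocks.
runAheadModel :: [IO a] -> IO [a]
runAheadModel actions = do
  slots <- mapM start actions
  mapM takeMVar slots
  where
    start act = do
      slot <- newEmptyMVar
      _ <- forkIO (act >>= putMVar slot)
      return slot

-- Return a value after roughly the given number of milliseconds.
slowly :: Int -> a -> IO a
slowly ms x = threadDelay (ms * 1000) >> return x
```

Running runAheadModel [slowly 200 2, slowly 20 1] returns [2,1] even though the second action finishes first, because results are read back slot by slot.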

Since: 0.3.0 (Streamly)

Since: 0.8.0

Instances
MonadTrans AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

MonadAsync m => Monad (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

Monad m => Functor (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

Asynchronous Streams

Asynchronous streams evaluate some future actions concurrently. The results are given to the consumer as soon as they become available, so they may not be in the same order as the actions in the stream.

The results of the actions as well as their side effects are partially ordered. Since the order of elements does not matter, the Semigroup operation is effectively commutative.

There are two asynchronous stream types, AsyncT and WAsyncT. They differ only in the Semigroup and Monad instance behavior.

data AsyncT m a Source #

For AsyncT streams:

(<>) = async
(>>=) = flip (concatMapWith async)

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.
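
Delivery of results "as soon as they become available" can be modelled with a shared channel from base. This is a sketch, not Streamly's implementation of async: runAsyncModel and slowly are hypothetical names, all actions are started immediately with no thread or buffer limits, and exceptions in worker threads are not handled.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Start every action in its own thread and collect results from a shared
-- channel in completion order, which depends on timing.
-- If an action throws, one result never arrives and collection blocks.
runAsyncModel :: [IO a] -> IO [a]
runAsyncModel actions = do
  out <- newChan
  forM_ actions $ \act -> forkIO (act >>= writeChan out)
  replicateM (length actions) (readChan out)

-- Return a value after roughly the given number of milliseconds.
slowly :: Int -> a -> IO a
slowly ms x = threadDelay (ms * 1000) >> return x
```

Every result appears exactly once in the output, but the order is decided by which action finishes first rather than by the position of the action in the input.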

Since: 0.1.0 (Streamly)

Since: 0.8.0

Instances
MonadTrans AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

data WAsyncT m a Source #

For WAsyncT streams:

(<>) = wAsync
(>>=) = flip . concatMapWith wAsync

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows: all the iterations corresponding to element 1 of the first stream constitute one WAsyncT output stream, all the iterations corresponding to element 2 constitute another, and the two output streams are merged using wAsync.
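As a rough illustration of the (>>=) equation given for WAsyncT, the sketch below writes the same loop twice: once as a do block and once as an explicit concatMapWith wAsync. The names viaBind and viaConcatMap are made up for this example. Since wAsync gives no ordering guarantee, the two results are compared after sorting.

```haskell
import Data.List (sort)
import qualified Streamly.Prelude as Stream

-- Monadic form: each x starts one concurrently scheduled iteration.
viaBind :: IO [Int]
viaBind = Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1, 2, 3]
    Stream.fromPure (x * 10)

-- Explicit form: map each element to a stream, merge the streams with wAsync.
viaConcatMap :: IO [Int]
viaConcatMap =
    Stream.toList
        $ Stream.concatMapWith
              Stream.wAsync
              (\x -> Stream.fromPure (x * 10))
              (Stream.fromList [1, 2, 3])

main :: IO ()
main = do
    a <- viaBind
    b <- viaConcatMap
    -- Output order is not guaranteed, so compare the sorted results.
    print (sort a == sort b)
```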

The W in the name stands for wide, or breadth-wise, scheduling, in contrast to the depth-wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances
MonadTrans WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

data ParallelT m a Source #

For ParallelT streams:

(<>) = parallel
(>>=) = flip . concatMapWith parallel

See AsyncT. ParallelT is similar, except that all iterations are strictly concurrent, whereas in AsyncT concurrency depends on consumer demand and available threads. See parallel for more details.
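A minimal sketch of the Semigroup behavior, assuming a hypothetical helper pause that sleeps for n tenths of a second and returns n. Under ParallelT both actions are started right away, so the shorter pause would normally be expected to finish first; the printed order is therefore timing dependent and is not shown here.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream

-- Hypothetical helper: sleep for n tenths of a second, then return n.
pause :: Int -> IO Int
pause n = threadDelay (n * 100000) >> return n

-- (<>) on ParallelT is parallel, so both pauses run at the same time.
bothAtOnce :: IO [Int]
bothAtOnce =
    Stream.toList $ Stream.fromParallel $
        Stream.fromEffect (pause 3) <> Stream.fromEffect (pause 1)

main :: IO ()
main = bothAtOnce >>= print
```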

Since: 0.1.0 (Streamly)

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

Since: 0.8.0

Instances
MonadTrans ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

Monad m => Functor (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

Zipping Streams

ZipSerialM and ZipAsyncM provide Applicative instances that zip the corresponding elements of two streams together. Note that these types are not monads.

data ZipSerialM m a Source #

For ZipSerialM streams:

(<>) = serial
(<*>) = zipWith id

Applicative evaluates the streams being zipped serially:

>>> s1 = Stream.fromFoldable [1, 2]
>>> s2 = Stream.fromFoldable [3, 4]
>>> s3 = Stream.fromFoldable [5, 6]
>>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]
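To make the relation between the Applicative instance and zipWith concrete, here is a small sketch that builds the same pairs both ways. The names viaApplicative and viaZipWith are illustrative only.

```haskell
import qualified Streamly.Prelude as Stream

-- Applicative form, interpreted as ZipSerialM via fromZipSerial.
viaApplicative :: IO [(Int, Char)]
viaApplicative =
    Stream.toList $ Stream.fromZipSerial $
        (,) <$> Stream.fromList [1, 2, 3] <*> Stream.fromList "ab"

-- The same zip written with zipWith directly on serial streams.
viaZipWith :: IO [(Int, Char)]
viaZipWith =
    Stream.toList $
        Stream.zipWith (,) (Stream.fromList [1, 2, 3]) (Stream.fromList "ab")

main :: IO ()
main = do
    a <- viaApplicative
    b <- viaZipWith
    print a        -- zipping stops at the end of the shorter stream
    print (a == b)
```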

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances
IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

Monad m => Functor (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fmap :: (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

(<$) :: a -> ZipSerialM m b -> ZipSerialM m a #

Monad m => Applicative (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

pure :: a -> ZipSerialM m a #

(<*>) :: ZipSerialM m (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

liftA2 :: (a -> b -> c) -> ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m c #

(*>) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m b #

(<*) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m a #

(Foldable m, Monad m) => Foldable (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fold :: Monoid m0 => ZipSerialM m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldr :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldr' :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldl :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldl' :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldr1 :: (a -> a -> a) -> ZipSerialM m a -> a #

foldl1 :: (a -> a -> a) -> ZipSerialM m a -> a #

toList :: ZipSerialM m a -> [a] #

null :: ZipSerialM m a -> Bool #

length :: ZipSerialM m a -> Int #

elem :: Eq a => a -> ZipSerialM m a -> Bool #

maximum :: Ord a => ZipSerialM m a -> a #

minimum :: Ord a => ZipSerialM m a -> a #

sum :: Num a => ZipSerialM m a -> a #

product :: Num a => ZipSerialM m a -> a #

Traversable (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

traverse :: Applicative f => (a -> f b) -> ZipSerialM Identity a -> f (ZipSerialM Identity b) #

sequenceA :: Applicative f => ZipSerialM Identity (f a) -> f (ZipSerialM Identity a) #

mapM :: Monad m => (a -> m b) -> ZipSerialM Identity a -> m (ZipSerialM Identity b) #

sequence :: Monad m => ZipSerialM Identity (m a) -> m (ZipSerialM Identity a) #

NFData1 (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

liftRnf :: (a -> ()) -> ZipSerialM Identity a -> () #

IsList (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Associated Types

type Item (ZipSerialM Identity a) #

Eq a => Eq (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Ord a => Ord (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Read a => Read (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Show a => Show (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

a ~ Char => IsString (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Semigroup (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

Monoid (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

mempty :: ZipSerialM m a #

mappend :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

mconcat :: [ZipSerialM m a] -> ZipSerialM m a #

NFData a => NFData (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

rnf :: ZipSerialM Identity a -> () #

type Item (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

type Item (ZipSerialM Identity a) = a

data ZipAsyncM m a Source #

For ZipAsyncM streams:

(<>) = serial
(<*>) = zipAsyncWith id

Applicative evaluates the streams being zipped concurrently. The following takes about half the time that serial zipping would take:

>>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
...
[(1,1),(1,1),(1,1)]
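One way to observe the difference is to time both zips. The sketch below is an assumption-laden illustration: tick, ticks and timed are hypothetical helpers, getMonotonicTime comes from GHC.Clock in base, and the measured durations depend on the machine and runtime, so no figures are claimed.

```haskell
import Control.Concurrent (threadDelay)
import GHC.Clock (getMonotonicTime)
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (IsStream)

-- Hypothetical effect: sleep for a tenth of a second, then yield 1.
tick :: IO Int
tick = threadDelay 100000 >> return 1

-- Three ticks, polymorphic in the stream type.
ticks :: IsStream t => t IO Int
ticks = Stream.fromFoldableM (replicate 3 tick)

zipSerially :: IO [(Int, Int)]
zipSerially = Stream.toList $ Stream.fromZipSerial $ (,) <$> ticks <*> ticks

zipConcurrently :: IO [(Int, Int)]
zipConcurrently = Stream.toList $ Stream.fromZipAsync $ (,) <$> ticks <*> ticks

-- Run an action and return its result with the elapsed seconds.
timed :: IO a -> IO (a, Double)
timed act = do
    t0 <- getMonotonicTime
    r <- act
    t1 <- getMonotonicTime
    return (r, t1 - t0)

main :: IO ()
main = do
    (_, serialSecs) <- timed zipSerially
    (_, asyncSecs) <- timed zipConcurrently
    putStrLn ("serial zip: " ++ show serialSecs ++ " s")
    putStrLn ("async zip:  " ++ show asyncSecs ++ " s")
```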

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances
IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

Monad m => Functor (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fmap :: (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

(<$) :: a -> ZipAsyncM m b -> ZipAsyncM m a #

MonadAsync m => Applicative (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

pure :: a -> ZipAsyncM m a #

(<*>) :: ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

liftA2 :: (a -> b -> c) -> ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m c #

(*>) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m b #

(<*) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m a #

Semigroup (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

Monoid (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

mempty :: ZipAsyncM m a #

mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a #

IO Streams

type Serial = SerialT IO Source #

A serial IO stream of elements of type a. See SerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type WSerial = WSerialT IO Source #

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type Ahead = AheadT IO Source #

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0 (Streamly)

Since: 0.8.0

type Async = AsyncT IO Source #

A demand-driven, left-biased IO stream of elements of type a that composes in parallel. See AsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type WAsync = WAsyncT IO Source #

A round-robin IO stream of elements of type a that composes in parallel. See WAsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type Parallel = ParallelT IO Source #

An IO stream of elements of type a that composes in parallel. See ParallelT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type ZipSerial = ZipSerialM IO Source #

An IO stream whose applicative instance zips streams serially.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type ZipAsync = ZipAsyncM IO Source #

An IO stream whose applicative instance zips streams concurrently.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Type Synonyms

type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Stream Type Adapters

You may want to use different stream composition styles at different points in your program. Stream types can be freely converted or adapted from one type to another. The IsStream type class facilitates conversion of one stream type to another. It is not used directly; instead, the type combinators provided below are used for conversions.

To adapt from one monomorphic type (e.g. AsyncT) to another monomorphic type (e.g. SerialT), use the adapt combinator. To give polymorphic code a specific interpretation, or to adapt a specific type to a polymorphic type, use the type specific combinators, e.g. fromAsync or fromWSerial. You cannot adapt polymorphic code to polymorphic code, as the compiler would not know which specific type you are converting from or to. If you see an ambiguous type variable error, then most likely you are using adapt unnecessarily on polymorphic code.
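The two cases can be sketched as follows. The names numbers and asyncNumbers are invented for this example: the first is polymorphic code, the second is a monomorphic AsyncT stream.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (AsyncT, IsStream)

-- Polymorphic code: usable at any stream type t.
numbers :: (IsStream t, Monad m) => t m Int
numbers = Stream.fromList [1, 2, 3]

-- Monomorphic code: the stream type is fixed to AsyncT.
asyncNumbers :: AsyncT IO Int
asyncNumbers = numbers

-- Polymorphic to specific: fromAsync interprets numbers as AsyncT.
viaFromAsync :: IO [Int]
viaFromAsync = Stream.toList $ Stream.fromAsync numbers

-- Specific to specific: adapt converts AsyncT to the SerialT that toList expects.
viaAdapt :: IO [Int]
viaAdapt = Stream.toList $ Stream.adapt asyncNumbers

main :: IO ()
main = do
    viaFromAsync >>= print
    viaAdapt >>= print
```

Writing Stream.adapt numbers in place of Stream.fromAsync numbers would leave the source type undetermined, which is the ambiguous type variable situation described above.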

class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t Source #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Minimal complete definition

toStream, fromStream, consM, (|:)

Instances
IsStream Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

toStream :: forall (m :: Type -> Type) a. Stream m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> Stream m a Source #

consM :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

(|:) :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

fromSerial :: IsStream t => SerialT m a -> t m a Source #

Fix the type of a polymorphic stream as SerialT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromWSerial :: IsStream t => WSerialT m a -> t m a Source #

Fix the type of a polymorphic stream as WSerialT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromAsync :: IsStream t => AsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as AsyncT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromAhead :: IsStream t => AheadT m a -> t m a Source #

Fix the type of a polymorphic stream as AheadT.

Since: 0.3.0 (Streamly)

Since: 0.8.0

fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as WAsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromParallel :: IsStream t => ParallelT m a -> t m a Source #

Fix the type of a polymorphic stream as ParallelT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromZipSerial :: IsStream t => ZipSerialM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipSerialM.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipAsyncM.

Since: 0.2.0 (Streamly)

Since: 0.8.0

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Deprecated

once :: (Monad m, IsStream t) => m a -> t m a Source #

Deprecated: Please use fromEffect instead.

Same as fromEffect

Since: 0.2.0

yield :: IsStream t => a -> t m a Source #

Deprecated: Please use fromPure instead.

Same as fromPure

Since: 0.4.0

yieldM :: (Monad m, IsStream t) => m a -> t m a Source #

Deprecated: Please use fromEffect instead.

Same as fromEffect

Since: 0.4.0

each :: (IsStream t, Foldable f) => f a -> t m a Source #

Deprecated: Please use fromFoldable instead.

Same as fromFoldable.

Since: 0.1.0
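For migration, the replacements named in the deprecation notes above can be used as in this short sketch. The name replacements is made up for the example.

```haskell
import qualified Streamly.Prelude as Stream

replacements :: IO String
replacements = do
    a <- Stream.toList (Stream.fromPure 'x')             -- was: yield
    b <- Stream.toList (Stream.fromEffect (return 'y'))  -- was: once, yieldM
    c <- Stream.toList (Stream.fromFoldable "zw")        -- was: each
    return (a ++ b ++ c)

main :: IO ()
main = replacements >>= putStrLn
```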

scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #

Deprecated: Please use scanl followed by map instead.

Strict left scan with an extraction function. Like scanl', but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.2.0

Since: 0.7.0 (Monad m constraint)
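A minimal sketch of the suggested replacement, assuming a scan (here the strict scanl') followed by map covers the use case. The name scanxLike is hypothetical and only mirrors the argument order of scanx.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- Scan with the step function, then apply the extraction function to each state.
scanxLike :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> SerialT m b
scanxLike step begin extract = Stream.map extract . Stream.scanl' step begin

-- Running sums rendered as strings; the initial state is emitted first.
runningSums :: IO [String]
runningSums = Stream.toList $ scanxLike (+) (0 :: Int) show (Stream.fromList [1, 2, 3])

main :: IO ()
main = runningSums >>= print
```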

foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #

Deprecated: Please use foldl' followed by fmap instead.

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.2.0
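The suggested replacement can be sketched the same way: fold with foldl', then fmap the extraction function over the result. The names foldxLike and mean are illustrative, not part of the library.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- Fold with the step function, then extract the final value.
foldxLike :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
foldxLike step begin extract = fmap extract . Stream.foldl' step begin

-- Arithmetic mean: accumulate (sum, count), divide at the end.
mean :: [Double] -> IO Double
mean xs = foldxLike step (0, 0 :: Int) extract (Stream.fromList xs)
  where
    step (s, n) x = (s + x, n + 1)
    extract (s, n) = s / fromIntegral n

main :: IO ()
main = mean [2, 4, 6] >>= print
```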

foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #

Deprecated: Please use foldlM' followed by fmap instead.

Like foldx, but with a monadic step function.

Since: 0.2.0

foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Deprecated: Use foldrM instead.

Lazy right fold for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

runStream :: Monad m => SerialT m a -> m () Source #

Deprecated: Please use "drain" instead

Run a stream, discarding the results. By default it interprets the stream as SerialT; to run other types of streams, use the type adapting combinators, for example runStream . fromAsync.

Since: 0.2.0

runN :: Monad m => Int -> SerialT m a -> m () Source #

Deprecated: Please use "drainN" instead

runN n = runStream . take n

Run at most n iterations of a stream.

Since: 0.6.0

runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

Deprecated: Please use "drainWhile" instead

runWhile p = runStream . takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.6.0
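The three replacements (drain, drainN and drainWhile) can be tried side by side. In this sketch, counted and effectsRun are hypothetical helpers that count how many elements were actually produced by each runner; the counts are printed rather than asserted, since how far each runner pulls the stream is exactly what the example is meant to let you observe.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- A stream of 1..5 that bumps a counter each time an element is produced.
counted :: IORef Int -> SerialT IO Int
counted ref =
    Stream.mapM (\x -> modifyIORef' ref (+ 1) >> return x) (Stream.fromList [1 .. 5])

-- Run the stream with the given runner and report how many effects ran.
effectsRun :: (SerialT IO Int -> IO ()) -> IO Int
effectsRun runner = do
    ref <- newIORef 0
    runner (counted ref)
    readIORef ref

main :: IO ()
main = do
    effectsRun Stream.drain >>= print               -- replaces runStream
    effectsRun (Stream.drainN 2) >>= print          -- replaces runN 2
    effectsRun (Stream.drainWhile (< 3)) >>= print  -- replaces runWhile (< 3)
```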

fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String Source #

Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)

Read lines from an IO Handle into a stream of Strings.

Since: 0.1.0

toHandle :: MonadIO m => Handle -> SerialT m String -> m () Source #

Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)

toHandle h = S.mapM_ $ hPutStrLn h

Write a stream of Strings to an IO Handle.

Since: 0.1.0

concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Deprecated: Please use unfoldMany instead.
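A short sketch of the replacement, assuming Unfold.fromList from Streamly.Data.Unfold as the unfold. Each list in the input stream is unfolded and the resulting elements are concatenated. The name flattened is made up for the example.

```haskell
import qualified Streamly.Data.Unfold as Unfold
import qualified Streamly.Prelude as Stream

-- Flatten a stream of lists by unfolding each list in turn.
flattened :: IO [Int]
flattened =
    Stream.toList $
        Stream.unfoldMany Unfold.fromList (Stream.fromList [[1, 2], [3, 4]])

main :: IO ()
main = flattened >>= print
```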
