In this tutorial we will show how streamly can be used for idiomatic and declarative concurrent programming. Before you go through this tutorial we recommend that you take a look at the Streamly serial streams tutorial.

Concurrent Streams

Many stream operations can be done concurrently:

  • Streams can be generated concurrently.
  • Streams can be merged concurrently.
  • Multiple stages in a streaming pipeline can run concurrently.
  • Streams can be mapped and zipped concurrently.
  • In monadic composition they combine like a list transformer, providing concurrent non-determinism.

There are three basic concurrent stream styles, Ahead, Async, and Parallel. The Ahead style streams are similar to Serial except that they can speculatively execute multiple stream actions concurrently in advance. Ahead would return exactly the same stream as Serial except that it may execute the actions concurrently. The Async style streams, like Ahead, speculatively execute multiple stream actions in advance but return the results in their finishing order rather than in the stream traversal order. Parallel is like Async except that it provides unbounded parallelism instead of controlled parallelism.

For easy reference, we can classify the stream types based on execution order, consumption order, and bounded or unbounded concurrency. Execution could be serial (i.e. synchronous) or asynchronous. In serial execution we execute the next action in the stream only after the previous one has finished executing. In asynchronous execution multiple actions in the stream can be executed asynchronously i.e. the next action can start executing even before the first one has finished. Consumption order determines the order in which the outputs generated by the composition are consumed. Consumption could be serial or asynchronous. In serial consumption the outputs are consumed in the traversal order; in asynchronous consumption the outputs are consumed as they arrive, i.e. in first come first served order.

+----------+--------------+--------------+-------------+
| Type     | Execution    | Consumption  | Concurrency |
+==========+==============+==============+=============+
| Serial   | Serial       | Serial       | None        |
+----------+--------------+--------------+-------------+
| Ahead    | Asynchronous | Serial       | bounded     |
+----------+--------------+--------------+-------------+
| Async    | Asynchronous | Asynchronous | bounded     |
+----------+--------------+--------------+-------------+
| Parallel | Asynchronous | Asynchronous | unbounded   |
+----------+--------------+--------------+-------------+

All these types can be freely inter-converted using type conversion combinators or type annotations, without any cost, to achieve the desired composition style. To force a particular type of composition, we coerce the stream type using the corresponding type adapting combinator from fromSerial, fromAhead, fromAsync, or fromParallel. The default stream type is inferred as Serial unless you change it by using one of the combinators or by using a type annotation.
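
For example, assuming the imports from the Imports and Supporting Code section below, the following sketch shows two equivalent ways of forcing the Ahead style: using the fromAhead combinator, and using a type annotation together with the generic adapt conversion from Streamly.Prelude:

>>> import Streamly.Prelude (AheadT)
>>> Stream.toList $ Stream.fromAhead $ Stream.fromFoldable [1,2,3]
[1,2,3]
>>> Stream.toList $ Stream.adapt (Stream.fromFoldable [1,2,3] :: AheadT IO Int)
[1,2,3]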

Combining Streams

Streams can be combined using <> or mappend to form a composite. Composite streams can be interpreted in a depth first or breadth first manner using an appropriate type conversion before consumption. Deep (e.g. Serial) stream type variants traverse a composite stream in a depth first manner, such that each stream is traversed fully before traversing the next stream. Wide (e.g. WSerial) stream types traverse it in a breadth first manner, such that one element from each stream is traversed before coming back to the first stream again.

Each stream type has a wide traversal variant prefixed by W. The wide variant differs only in the Semigroup/Monoid, Applicative/Monad compositions of the streams. The following table summarizes the basic types and the corresponding wide variants:

+------------+-----------+
| Deep       | Wide      |
+============+===========+
| Serial     | WSerial   |
+------------+-----------+
| Ahead      | WAhead    |
+------------+-----------+
| Async      | WAsync    |
+------------+-----------+

Other than these types there are also the ZipSerial and ZipAsync types that zip streams serially or concurrently using the Applicative operation. These types are not monads, they are only applicatives, and they do not differ in Semigroup composition.

Imports and Supporting Code

In most of the example snippets we do not repeat the imports. Where imports are not explicitly specified, use the imports shown below.

>>> :m
>>> import Data.Function ((&))
>>> import Streamly.Prelude ((|:), (|&))
>>> import qualified Streamly.Prelude as Stream
>>> import qualified Streamly.Data.Fold as Fold

To illustrate concurrent vs serial composition aspects, we will use the following delay function to introduce a sleep or delay specified in seconds. After the delay it prints the number of seconds it slept.

>>> import Control.Concurrent (threadDelay, myThreadId)
>>> :{
delay n = Stream.fromEffect $ do
    threadDelay (n * 1000000)
    tid <- myThreadId
    putStrLn (show tid ++ ": Delay " ++ show n)
:}

For concurrent examples, use line buffering, otherwise output from different threads may get mixed:

>>> import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
>>> hSetBuffering stdout LineBuffering

Generating Streams Concurrently

Monadic construction and generation functions like consM, unfoldrM, replicateM, repeatM, iterateM and fromFoldableM work concurrently when used with an appropriate stream type combinator. The pure versions of these APIs are not concurrent, however, you can use the monadic versions even for pure computations, by wrapping the pure value in a monad, to get the concurrent generation capability where required.

The following code finishes in 3 seconds (6 seconds when serial):

>>> let p n = threadDelay (n * 1000000) >> return n
>>> Stream.toList $ Stream.fromParallel $ p 3 |: p 2 |: p 1 |: Stream.nil
[1,2,3]
>>> Stream.toList $ Stream.fromAhead $ p 3 |: p 2 |: p 1 |: Stream.nil
[3,2,1]

The following finishes in 10 seconds (100 seconds when serial):

>>> Stream.drain $ Stream.fromAsync $ Stream.replicateM 10 $ p 10
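
fromFoldableM, mentioned above, works in the same way; a minimal sketch using the same p, finishing in about 3 seconds with the results in traversal order because of fromAhead:

>>> Stream.toList $ Stream.fromAhead $ Stream.fromFoldableM [p 3, p 2, p 1]
[3,2,1]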

Concurrent Pipeline Stages

The concurrent function application operators |$ and |& apply a stream argument to a stream function concurrently to compose a concurrent pipeline of stream processing functions.

Because both the stages run concurrently, we would see a delay of only 1 second instead of 2 seconds in the following:

>>> let p n = threadDelay (n * 1000000) >> return n
>>> :{
parApp =
       Stream.repeatM (p 1)
    |& Stream.mapM (\x -> p 1 >> print x)
     & Stream.drain
:}
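
The |$ operator composes the same kind of pipeline with the stream function on the left. A minimal sketch, assuming (|$) is also imported from Streamly.Prelude (parApp2 is just a name used for this example):

>>> import Streamly.Prelude ((|$))
>>> parApp2 = Stream.drain (Stream.mapM (\x -> p 1 >> print x) |$ Stream.repeatM (p 1))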

Mapping Concurrently

Monadic transformation functions mapM and sequence work concurrently when used with appropriate stream type combinators. The pure versions do not work concurrently, however you can use the monadic versions even for pure computations to get the concurrent transformation capability where required.

This would print a value every second (every 2 seconds when serial):

>>> let p n = threadDelay (n * 1000000) >> return n
>>> :{
parMap =
      Stream.repeatM (p 1)
    & Stream.fromSerial   -- repeatM is serial
    & Stream.mapM (\x -> p 1 >> print x)
    & Stream.fromAhead    -- mapM is concurrent using Ahead style
    & Stream.drain
:}
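
Stream.sequence, mentioned above, can be used in the same way to run a stream of actions; a minimal sketch that runs the three actions concurrently in the Ahead style, finishing in about 3 seconds instead of 6:

>>> :{
Stream.toList
    $ Stream.fromAhead
    $ Stream.sequence
    $ Stream.fromFoldable [p 1, p 2, p 3]
:}
[1,2,3]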

Merging Streams

Semigroup Style

Deep Speculative Composition (Ahead)

The Semigroup operation <> of the Ahead type combines two streams in a serial depth first manner with concurrent lookahead. We use the fromAhead type combinator to effect Ahead style of composition. We can also use an explicit Ahead type annotation for the stream to achieve the same effect.

When two streams are combined in this manner, the streams are traversed in a depth first manner just like Serial, however, it can execute the next stream concurrently and keep the results ready for when its turn arrives. Concurrent execution of the next stream(s) is performed if the first stream blocks or if it cannot produce output at a rate sufficient to meet the consumer demand. Multiple streams can be executed concurrently to meet the demand. The following example would print the result in a second even though each action in each stream takes one second:

>>> p n = threadDelay 1000000 >> return n
>>> stream1 = p 1 |: p 2 |: Stream.nil
>>> stream2 = p 3 |: p 4 |: Stream.nil
>>> Stream.toList $ Stream.fromAhead $ stream1 <> stream2
[1,2,3,4]

Here, both the constituent streams and their composition are evaluated in the Ahead style, therefore, all the actions can run concurrently but the results are presented in serial order.

You can also use the polymorphic combinator ahead in place of <> to compose any type of streams in this manner.
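
For example, a minimal sketch using ahead directly; q is a helper defined only for this example, and the printing order shown assumes the speculative worker gets scheduled promptly:

>>> q n = delay n >> return n
>>> Stream.toList $ q 2 `Stream.ahead` q 1
ThreadId ...: Delay 1
ThreadId ...: Delay 2
[2,1]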

Deep Asynchronous Composition (Async)

The Semigroup operation <> of the Async type combines the two streams in a depth first manner with parallel look ahead. We use the fromAsync type combinator to effect Async style of composition. We can also use the Async type annotation for the stream type to achieve the same effect.

When two streams with multiple elements are combined in this manner, the streams are traversed in a depth first manner just like Serial, however, it can execute the next stream concurrently and return its results as they arrive, i.e. the results from the next stream may be yielded even before the results from the first stream. Concurrent execution of the next stream(s) is performed if the first stream blocks or if it cannot produce output at a rate sufficient to meet the consumer demand. Multiple streams can be executed concurrently to meet the demand. In the example below each element in the stream introduces a constant delay of 1 second, however, it takes just one second to produce all the results. The results are not guaranteed to be in any particular order:

>>> p n = threadDelay 1000000 >> return n
>>> stream1 = p 1 |: p 2 |: Stream.nil
>>> stream2 = p 3 |: p 4 |: Stream.nil
>>> Stream.toList $ Stream.fromAsync $ stream1 <> stream2
...

Here, both the constituent streams and their composition are evaluated in the Async style. We can instead compose the constituent streams to run serially; in that case it would take 2 seconds to produce all the results, and the elements of each serial stream would appear in serial order in the results:

>>> p n = threadDelay 1000000 >> return n
>>> stream = (Stream.fromSerial stream1) <> (Stream.fromSerial stream2)
>>> Stream.toList $ Stream.fromAsync stream
...

In the following example we can see that new threads are started when a computation blocks. Notice that the output from the stream with the shortest delay is printed first. The whole computation takes max (3, 2, 1) = 3 seconds:

>>> Stream.drain $ Stream.fromAsync $ delay 3 <> delay 2 <> delay 1
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3

When we have a tree of computations composed using this style, the tree is traversed in DFS style just like the Serial style; the only difference is that here we can move on to executing the next stream if a stream blocks. However, we will not start new threads if we have sufficient output to saturate the consumer. This is why we call it a left-biased, demand driven or adaptive concurrency style: the concurrency tends to stay on the left side of the composition as long as possible, and more threads are started based on the pull rate of the consumer. The following example prints an output every second as all of the actions run concurrently:

>>> Stream.drain $ Stream.fromAsync $ (delay 1 <> delay 2) <> (delay 3 <> delay 4)
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3
ThreadId ...: Delay 4

All the computations may even run in a single thread when more threads are not needed. As you can see, in the following example the computations are run in a single thread one after another, because none of them blocks. However, if the thread consuming the stream were faster than the producer then it would have started parallel threads for each computation to keep up even if none of them blocks:

>>> :{
traced m = Stream.fromEffect (myThreadId >>= print) >> return m
stream = traced (sqrt 9) <> traced (sqrt 16) <> traced (sqrt 25)
main = Stream.drain $ Stream.fromAsync stream
:}

Note that the order of printing in the above examples may change due to variations in scheduling latencies for concurrent threads.

The polymorphic version of the Async binary operation <> is called async. We can use async to join streams in a left biased, adaptively concurrent manner irrespective of the type; notice that we have not used the fromAsync combinator in the following example:

>>> Stream.drain $ delay 3 `Stream.async` delay 2 `Stream.async` delay 1
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3

Since the concurrency provided by this operator is demand driven, it cannot be used when the composed computations start timers that are relative to each other: all the computations may not be started at the same time, and therefore their timers may not start at the same time either. When relative timing among the computations is important, or when we need to start all the computations at once for any reason, the Parallel style must be used instead.

Async style utilizes resources optimally and should be preferred over Parallel or WAsync unless you really need those. Async should be used when we know that the computations can run in parallel but we do not care whether they actually run in parallel or not; that decision can be left to the scheduler based on demand. Also, note that the async operator can be used to fold an infinite number of streams, in contrast to the Parallel or WAsync styles, because it does not require us to run all of them at the same time in a fair manner.
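
For instance, a sketch of folding an unbounded number of streams using concatMapWith with async; take limits the output to three elements, and the printed output is elided here since it depends on scheduling:

>>> :{
Stream.drain
    $ Stream.take 3
    $ Stream.concatMapWith Stream.async delay
    $ Stream.fromFoldable [1..]
:}
...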

Wide Asynchronous Composition (WAsync)

The Semigroup operation <> of the WAsync type combines two streams in a concurrent manner using breadth first traversal. We use the fromWAsync type combinator to effect WAsync style of composition. We can also use the WAsync type annotation for the stream to achieve the same effect.

When streams with multiple elements are combined in this manner, we traverse all the streams concurrently in a breadth first manner i.e. one action from each stream is performed and yielded to the resulting stream before we come back to the first stream again and so on. Even though we execute the actions in a breadth first order the outputs are consumed on a first come first serve basis.

In the following example we can see that outputs are produced in the breadth first traversal order but this is not guaranteed.

>>> stream1 = print 1 |: print 2 |: Stream.nil
>>> stream2 = print 3 |: print 4 |: Stream.nil
>>> Stream.drain $ Stream.fromWAsync $ stream1 <> stream2
1
3
2
4

The polymorphic version of the binary operation <> of the WAsync type is wAsync. We can use wAsync to join streams using a breadth first concurrent traversal irrespective of the type; notice that we have not used the fromWAsync combinator in the following example:

>>> Stream.drain $ delay 3 `Stream.wAsync` delay 2 `Stream.wAsync` delay 1
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3

Since the concurrency provided by this style is demand driven, it may not be usable when the composed computations start timers that are relative to each other: all the computations may not be started at the same time, and therefore their timers may not start at the same time either. When relative timing among the computations is important, or when we need to start all the computations at once for any reason, the Parallel style must be used instead.

Parallel Asynchronous Composition (Parallel)

The Semigroup operation <> of the Parallel type combines the two streams in a fairly concurrent manner with round robin scheduling. We use the fromParallel type combinator to effect Parallel style of composition. We can also use the Parallel type annotation for the stream type to achieve the same effect.

When two streams with multiple elements are combined in this manner, the monadic actions in both the streams are performed concurrently with a fair round robin scheduling. The outputs are yielded in the order in which the actions complete. This is pretty similar to the WAsync type, the difference is that WAsync is adaptive to the consumer demand and may or may not execute all actions in parallel depending on the demand, whereas Parallel runs all the streams in parallel irrespective of the demand.
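
A sketch mirroring the WAsync example above: with fromParallel both the streams start evaluating immediately, irrespective of the consumer demand, and the interleaving of the printed output depends on scheduling, so it is elided here:

>>> stream1 = print 1 |: print 2 |: Stream.nil
>>> stream2 = print 3 |: print 4 |: Stream.nil
>>> Stream.drain $ Stream.fromParallel $ stream1 <> stream2
...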

The polymorphic version of the binary operation <> of the Parallel type is parallel. We can use parallel to join streams in a fairly concurrent manner irrespective of the type; notice that we have not used the fromParallel combinator in the following example:

>>> Stream.drain $ delay 3 `Stream.parallel` delay 2 `Stream.parallel` delay 1
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3

Note that this style of composition cannot be used to combine an infinite number of streams, as it would lead to an infinitely large scheduling queue.

Monoid Style

All of the following are equivalent; each starts ten concurrent tasks with delays of 1 to 10 seconds, resulting in one delay message being printed every second:

>>> :{
main = do
 Stream.drain $ Stream.fromAsync $ foldMap delay [1..10]
 Stream.drain $ Stream.concatFoldableWith Stream.async (map delay [1..10])
 Stream.drain $ Stream.concatMapFoldableWith Stream.async delay [1..10]
 Stream.drain $ Stream.concatForFoldableWith Stream.async [1..10] delay
:}

Nesting Streams

Monad

Deep Speculative Nesting (Ahead)

The Monad composition of Ahead type behaves just like Serial except that it can speculatively perform a bounded number of next iterations of a loop concurrently.

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromFoldable [3,2,1]
    delay x
    return x
:}
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3
[3,2,1]

This code finishes in 3 seconds; Serial would take 6 seconds. As we can see, all three iterations run concurrently in different threads, however, the results are returned in the serial order.

Concurrency is demand driven. When multiple streams are composed using this style, the iterations are executed in a depth first manner just like Serial, i.e. nested iterations are executed before we proceed to the next outer iteration. The only difference is that we may execute multiple future iterations concurrently and keep the results ready.
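
For instance, a sketch of nested loops in the Ahead style; the iterations may run concurrently but the results always come out in the same depth first order as Serial:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromFoldable [1,2]
    y <- Stream.fromFoldable [3,4]
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]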

The fromAhead type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as Ahead.

Deep Asynchronous Nesting (Async)

The Monad composition of Async type can perform the iterations of a loop concurrently.

>>> :{
Stream.drain $ Stream.fromAsync $ do
     x <- Stream.fromFoldable [3,2,1]
     delay x
:}
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3

As we can see, the code after the fromFoldable statement is run three times, once for each value of x. All the three iterations are concurrent and run in different threads. The iteration with the least delay finishes first. When compared to imperative programming, this can be viewed as a for loop with three concurrent iterations.

Concurrency is demand driven i.e. more concurrent iterations are started only if the previous iterations are not able to saturate the consumer of the output stream. This works exactly the same way as the merging of two streams using async works.

The fromAsync type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as Async.

When multiple streams are nested using this style, the iterations are concurrently evaluated in a depth first manner:

>>> :{
Stream.drain $ Stream.fromAsync $ do
    x <- Stream.fromFoldable [1,2]
    y <- Stream.fromFoldable [3,4]
    Stream.fromEffect $ putStrLn $ show (x, y)
:}
(1,3)
(1,4)
(2,3)
(2,4)

Nested iterations are given preference for concurrent evaluation i.e. (1,4) will be scheduled in preference to (2,3).

Wide Asynchronous Nesting (WAsync)

Like Async, the Monad composition of WAsync runs the iterations of a loop concurrently. It differs from Async in the nested loop behavior. Like WSerial, the nested loops in this type are traversed and executed in a breadth first manner rather than the depth first manner of Async style.

>>> :{
Stream.drain $ Stream.fromWAsync $ do
    x <- Stream.fromSerial $ Stream.fromFoldable [1,2]
    y <- Stream.fromSerial $ Stream.fromFoldable [3,4]
    Stream.fromEffect $ putStrLn $ show (x, y)
:}
(1,3)
(1,4)
(2,3)
(2,4)

Note that (2,3) is preferred to (1,4) when evaluating the iterations concurrently. This works exactly the same way as the merging of two streams using wAsync works.

The fromWAsync type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as WAsync.

Parallel Asynchronous Nesting (Parallel)

Just like Async or WAsync the Monad composition of Parallel runs the iterations of a loop concurrently.

>>> :{
Stream.drain $ Stream.fromParallel $ do
   x <- Stream.fromFoldable [3,2,1]
   delay x
:}
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3

It differs from Async and WAsync in the nested loop behavior. All iterations of the loop are run fully concurrently irrespective of the demand. This works exactly the same way as the merging of streams using parallel works.

The fromParallel type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as Parallel.

Applicative
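
The examples below use two streams s1 and s2 built from delayed values. A sketch of suitable definitions, matching the ones used in the zipping section further below (d yields its argument after a delay of that many seconds):

>>> d n = delay n >> return n
>>> s1 = Stream.fromSerial $ d 1 <> d 2
>>> s2 = Stream.fromSerial $ d 3 <> d 4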

The Applicative composition of Async can run the iterations concurrently, therefore, it takes a total of 6 seconds, which is max (1, 2) + max (3, 4):

>>> (Stream.toList $ Stream.fromAsync $ (,) <$> s1 <*> s2) >>= print
...
ThreadId 34: Delay 1
ThreadId 36: Delay 2
ThreadId 35: Delay 3
ThreadId 36: Delay 3
ThreadId 35: Delay 4
ThreadId 36: Delay 4
[(1,3),(2,3),(1,4),(2,4)]

Similarly, WAsync can also run the iterations concurrently, but with a different style of scheduling than Async, as explained in the Monad section; it too takes a total of 6 seconds (2 + 4):

>>> (Stream.toList $ Stream.fromWAsync $ (,) <$> s1 <*> s2) >>= print
...
ThreadId 34: Delay 1
ThreadId 36: Delay 2
ThreadId 35: Delay 3
ThreadId 36: Delay 3
ThreadId 35: Delay 4
ThreadId 36: Delay 4
[(1,3),(2,3),(1,4),(2,4)]

Zipping Streams

Parallel Zipping

The applicative instance of the ZipAsync type zips streams concurrently. The fromZipAsync type combinator can be used to switch to the parallel applicative zip composition.

This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 are produced concurrently, and 2 and 4 are produced concurrently:

>>> d n = delay n >> return n
>>> s1 = Stream.fromSerial $ d 1 <> d 2
>>> s2 = Stream.fromSerial $ d 3 <> d 4
>>> (Stream.toList $ Stream.fromZipAsync $ (,) <$> s1 <*> s2) >>= print
ThreadId ...: Delay 1
ThreadId ...: Delay 2
ThreadId ...: Delay 3
ThreadId ...: Delay 4
[(1,3),(2,4)]

Concurrent Programming

When writing concurrent programs there are two distinct places where the programmer can control the concurrency. First, when composing a stream by merging multiple streams, we can choose appropriate sum style operators to combine them concurrently or serially. Second, when processing a stream in a monadic composition, we can choose one of the monad composition types to get the desired type of concurrency.

In the following example the squares of x and y are computed concurrently using the async operation and the square roots of their sum are computed serially because of the fromSerial combinator. We can choose different combinators for the monadic processing and the stream generation, to control the concurrency. We can also use the fromAsync combinator instead of explicitly folding with async.

>>> import Data.List (sum)
>>> :{
main = do
    z <-   Stream.toList
         $ Stream.fromSerial     -- Serial monadic processing (sqrt below)
         $ do
             x2 <- Stream.concatForFoldableWith Stream.async [1..100] $ -- Concurrent "for" loop
                       \x -> return $ x * x  -- body of the loop
             y2 <- Stream.concatForFoldableWith Stream.async [1..100] $
                       \y -> return $ y * y
             return $ sqrt (x2 + y2)
    print $ sum z
:}

We can see how this directly maps to the imperative style OpenMP model; we use combinators and operators instead of the ugly pragmas.

For more concurrent programming examples see, https://github.com/composewell/streamly-examples.

Writing Concurrent Programs

When writing concurrent programs it is advised not to use the concurrent style stream combinators blindly at the top level. That might create too much concurrency where it is not even required, and can even degrade performance in some cases. It can also lead to surprising behavior because code that is supposed to be serial becomes concurrent. Be aware that all the concurrency capable APIs used under the scope of a concurrent stream combinator become concurrent. For example, if you have a repeatM somewhere in your program and you use fromParallel on top, the repeatM becomes fully parallel, resulting in an infinite parallel execution. Instead, follow the Keep It Serial and Stupid principle: start with the default serial composition and enable concurrent combinators only when and where necessary. When you use a concurrent combinator you can use an explicit fromSerial combinator to suppress any unnecessary concurrency under the scope of that combinator, as sketched below.
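
For example, a minimal sketch of the repeatM case mentioned above; wrapping the generation side in fromSerial (with take to bound it) keeps it serial, while the mapM under the fromParallel scope still runs concurrently:

>>> let p n = threadDelay (n * 1000000) >> return n
>>> :{
Stream.drain
    $ Stream.fromParallel
    $ Stream.mapM (\x -> p 1 >> print x)  -- concurrent map under fromParallel
    $ Stream.fromSerial                   -- keep the serial generation serial
    $ Stream.take 3
    $ Stream.repeatM (p 1)
:}
1
1
1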

Where to go next?