To run examples in this module:

>>> import qualified Streamly.Prelude as Stream
>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}

Documentation

data AheadT m a Source #

For AheadT streams:

(<>) = ahead
(>>=) = flip (concatMapWith ahead)

A single Monad bind behaves like a for loop whose iterations are executed concurrently, ahead of time. The side effects of the iterations may occur out of order, but the results are produced in order:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, ahead of time:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream, all the iterations corresponding to the element 2 constitute another output stream, and these two output streams are merged using ahead.
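The explanation above can be sketched by writing the nested binds out by hand with concatMapWith and ahead. This is a minimal sketch, assuming the streamly-0.8.0 Streamly.Prelude API; the names inner and nested are illustrative and not part of the library, and delay is the helper defined at the top of this module.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- Same helper as at the top of this module.
delay :: Int -> IO Int
delay n = do
    threadDelay (n * 1000000)   -- sleep for n seconds
    putStrLn (show n ++ " sec") -- print "n sec"
    return n

-- Illustrative name: the output stream for one outer element x,
-- i.e. all iterations over y for that x.
inner :: Int -> SerialT IO Int
inner x =
    Stream.concatMapWith Stream.ahead
        (\y -> Stream.fromEffect (delay (x + y)))
        (Stream.fromList [2, 4])

-- Illustrative name: the output streams for x = 1 and x = 2,
-- merged using ahead.
nested :: SerialT IO Int
nested = Stream.concatMapWith Stream.ahead inner (Stream.fromList [1, 2])

main :: IO ()
main = Stream.toList nested >>= print
```

Under these assumptions the list printed by main should match the result of the nested do-block above, since ahead preserves the order of results.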

Since: 0.3.0 (Streamly)

Since: 0.8.0

Instances
MonadTrans AheadT Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

IsStream AheadT Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

MonadAsync m => Monad (AheadT m) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

Monad m => Functor (AheadT m) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) Source # 

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

type Ahead = AheadT IO Source #

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0 (Streamly)

Since: 0.8.0

fromAhead :: IsStream t => AheadT m a -> t m a Source #

Fix the type of a polymorphic stream as AheadT.
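As a hedged illustration of what fixing the type means, the sketch below interprets one polymorphic stream twice, once with fromSerial and once with fromAhead. It assumes the streamly-0.8.0 Streamly.Prelude API; the name loop is illustrative, and delay is the helper from the top of this module.

```haskell
{-# LANGUAGE FlexibleContexts #-}

import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (IsStream)

-- Same helper as at the top of this module.
delay :: Int -> IO Int
delay n = do
    threadDelay (n * 1000000)   -- sleep for n seconds
    putStrLn (show n ++ " sec") -- print "n sec"
    return n

-- Illustrative name: a stream whose type is left polymorphic, so
-- the caller decides how the monadic bind is evaluated.
loop :: (IsStream t, Monad (t IO)) => t IO Int
loop = do
    x <- Stream.fromList [2, 1]
    Stream.fromEffect (delay x)

main :: IO ()
main = do
    -- Interpreted as SerialT: iterations run one after another.
    Stream.toList (Stream.fromSerial loop) >>= print
    -- Interpreted as AheadT: iterations may run concurrently, but
    -- the results are still collected in stream order.
    Stream.toList (Stream.fromAhead loop) >>= print
```

Both interpretations should yield the same list of results; only the evaluation strategy, and therefore the order of the "n sec" messages, is expected to differ.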

Since: 0.3.0 (Streamly)

Since: 0.8.0

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams. Both streams may be evaluated concurrently, but the outputs are used in the same order as the corresponding actions in the original streams. Side effects happen in the order in which the streams are evaluated:

>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]

With 2 threads, only two streams can be scheduled at a time; when one of them finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]

Only streams are scheduled for ahead evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream, they will be evaluated concurrently. It may not make much sense to combine serial streams using ahead.

ahead can be safely used to fold an infinite lazy container of streams.
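A minimal sketch of such a fold, assuming the streamly-0.8.0 Streamly.Prelude API: an infinite lazy list of single-element streams is folded with foldr and ahead, and only a finite prefix of the combined stream is consumed. The names streams and firstThree are illustrative and not part of the library.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- Illustrative name: an infinite lazy list of one-element streams.
streams :: [SerialT IO Int]
streams = map (Stream.fromEffect . return) [1 ..]

-- Illustrative name: fold the infinite list with ahead and consume
-- only the first three elements of the combined stream.
firstThree :: IO [Int]
firstThree =
    Stream.toList $ Stream.take 3 $ foldr Stream.ahead Stream.nil streams

main :: IO ()
main = firstThree >>= print
```

Because ahead yields results in the order of the original streams, the consumed prefix is expected to hold the elements of the first three streams in the list.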

Since: 0.3.0 (Streamly)

Since: 0.8.0

streamly-0.8.0: Streamly.Internal.Data.Stream.Ahead