{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-deprecations #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.Serial
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
--
module Streamly.Internal.Data.Stream.Serial {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream\" from streamly-core package instead." #-}
    (
    -- * Serial appending stream
      SerialT(..)
    , toStreamK
    , fromStreamK
    , Serial
    , serial

    -- * Serial interleaving stream
    , WSerialT(..)
    , WSerial
    , wSerial
    , wSerialFst
    , wSerialMin
    , consMWSerial

    -- * Construction
    , cons
    , consM
    , repeat
    , unfoldrM
    , fromList

    -- * Elimination
    , toList

    -- * Transformation
    , map
    , mapM
    )
where

#if !(MIN_VERSION_base(4,18,0))
import Control.Applicative (liftA2)
#endif
import Control.DeepSeq (NFData(..), NFData1(..))
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
#endif
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Foldable (Foldable(foldl'), fold)
import Data.Functor.Identity (Identity(..), runIdentity)
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
import GHC.Exts (IsList(..), IsString(..), oneShot)
import Text.Read
       ( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
       , readListPrecDefault)
import Streamly.Internal.BaseCompat ((#.))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.StreamK (Stream)

import qualified Streamly.Internal.Data.Stream.Common as P
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K

import Prelude hiding (map, mapM, repeat, filter)

#include "Instances.hs"
#include "inline.hs"

-- $setup
-- >>> :set -fno-warn-deprecations
-- >>> import qualified Streamly.Prelude as IsStream

{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> K.Stream m a -> K.Stream m a
withLocal :: forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
m =
    (forall r.
 State StreamK m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State StreamK m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single = (r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> (a -> m r) -> a -> m r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m r
sng
            yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
r = (r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f (m r -> m r) -> m r -> m r
forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> m r
yld a
a ((r -> r) -> Stream m a -> Stream m a
forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
r)
        in State StreamK m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yieldk a -> m r
single ((r -> r) -> m r -> m r
forall a. (r -> r) -> m a -> m a
forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f m r
stp) Stream m a
m

------------------------------------------------------------------------------
-- SerialT
------------------------------------------------------------------------------

-- | For 'SerialT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.serial'                       -- 'Semigroup'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.serial' -- 'Monad'
-- @
--
-- A single 'Monad' bind behaves like a @for@ loop:
--
-- >>> :{
-- IsStream.toList $ do
--      x <- IsStream.fromList [1,2] -- foreach x in stream
--      return x
-- :}
-- [1,2]
--
-- Nested monad binds behave like nested @for@ loops:
--
-- >>> :{
-- IsStream.toList $ do
--     x <- IsStream.fromList [1,2] -- foreach x in stream
--     y <- IsStream.fromList [3,4] -- foreach y in stream
--     return (x, y)
-- :}
-- [(1,3),(1,4),(2,3),(2,4)]
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype SerialT m a = SerialT {forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT :: Stream m a}
    -- XXX when deriving do we inherit an INLINE?
    deriving (NonEmpty (SerialT m a) -> SerialT m a
SerialT m a -> SerialT m a -> SerialT m a
(SerialT m a -> SerialT m a -> SerialT m a)
-> (NonEmpty (SerialT m a) -> SerialT m a)
-> (forall b. Integral b => b -> SerialT m a -> SerialT m a)
-> Semigroup (SerialT m a)
forall b. Integral b => b -> SerialT m a -> SerialT m a
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
forall (m :: * -> *) a. NonEmpty (SerialT m a) -> SerialT m a
forall (m :: * -> *) a. SerialT m a -> SerialT m a -> SerialT m a
forall (m :: * -> *) a b.
Integral b =>
b -> SerialT m a -> SerialT m a
$c<> :: forall (m :: * -> *) a. SerialT m a -> SerialT m a -> SerialT m a
<> :: SerialT m a -> SerialT m a -> SerialT m a
$csconcat :: forall (m :: * -> *) a. NonEmpty (SerialT m a) -> SerialT m a
sconcat :: NonEmpty (SerialT m a) -> SerialT m a
$cstimes :: forall (m :: * -> *) a b.
Integral b =>
b -> SerialT m a -> SerialT m a
stimes :: forall b. Integral b => b -> SerialT m a -> SerialT m a
Semigroup, Semigroup (SerialT m a)
SerialT m a
Semigroup (SerialT m a)
-> SerialT m a
-> (SerialT m a -> SerialT m a -> SerialT m a)
-> ([SerialT m a] -> SerialT m a)
-> Monoid (SerialT m a)
[SerialT m a] -> SerialT m a
SerialT m a -> SerialT m a -> SerialT m a
forall a.
Semigroup a -> a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
forall (m :: * -> *) a. Semigroup (SerialT m a)
forall (m :: * -> *) a. SerialT m a
forall (m :: * -> *) a. [SerialT m a] -> SerialT m a
forall (m :: * -> *) a. SerialT m a -> SerialT m a -> SerialT m a
$cmempty :: forall (m :: * -> *) a. SerialT m a
mempty :: SerialT m a
$cmappend :: forall (m :: * -> *) a. SerialT m a -> SerialT m a -> SerialT m a
mappend :: SerialT m a -> SerialT m a -> SerialT m a
$cmconcat :: forall (m :: * -> *) a. [SerialT m a] -> SerialT m a
mconcat :: [SerialT m a] -> SerialT m a
Monoid)

-- | A serial IO stream of elements of type @a@. See 'SerialT' documentation
-- for more details.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type Serial = SerialT IO

toStreamK :: SerialT m a -> Stream m a
toStreamK :: forall (m :: * -> *) a. SerialT m a -> Stream m a
toStreamK = SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT

fromStreamK :: Stream m a -> SerialT m a
fromStreamK :: forall (m :: * -> *) a. Stream m a -> SerialT m a
fromStreamK = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT

------------------------------------------------------------------------------
-- Generation
------------------------------------------------------------------------------

infixr 5 `cons`

{-# INLINE cons #-}
cons :: a -> SerialT m a -> SerialT m a
cons :: forall a (m :: * -> *). a -> SerialT m a -> SerialT m a
cons a
x (SerialT Stream m a
ms) = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a) -> Stream m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> Stream m a
forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
x Stream m a
ms

infixr 5 `consM`

{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> SerialT IO a -> SerialT IO a #-}
consM :: Monad m => m a -> SerialT m a -> SerialT m a
consM :: forall (m :: * -> *) a.
Monad m =>
m a -> SerialT m a -> SerialT m a
consM m a
m (SerialT Stream m a
ms) = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a) -> Stream m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
K.consM m a
m Stream m a
ms

-- |
-- Generate an infinite stream by repeating a pure value.
--
{-# INLINE_NORMAL repeat #-}
repeat :: Monad m => a -> SerialT m a
repeat :: forall (m :: * -> *) a. Monad m => a -> SerialT m a
repeat = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a)
-> (a -> Stream m a) -> a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK (Stream m a -> Stream m a) -> (a -> Stream m a) -> a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall (m :: * -> *) a. Monad m => a -> Stream m a
D.repeat

------------------------------------------------------------------------------
-- Combining
------------------------------------------------------------------------------

{-# INLINE serial #-}
serial :: SerialT m a -> SerialT m a -> SerialT m a
serial :: forall (m :: * -> *) a. SerialT m a -> SerialT m a -> SerialT m a
serial = SerialT m a -> SerialT m a -> SerialT m a
forall a. Semigroup a => a -> a -> a
(<>)

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

instance Monad m => Monad (SerialT m) where
    return :: forall a. a -> SerialT m a
return = a -> SerialT m a
forall a. a -> SerialT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure

    -- Benchmarks better with StreamD bind and pure:
    -- toList, filterAllout, *>, *<, >> (~2x)
    --
    -- pure = SerialT . D.fromStreamD . D.fromPure
    -- m >>= f = D.fromStreamD $ D.concatMap (D.toStreamD . f) (D.toStreamD m)

    -- Benchmarks better with CPS bind and pure:
    -- Prime sieve (25x)
    -- n binds, breakAfterSome, filterAllIn, state transformer (~2x)
    --
    {-# INLINE (>>=) #-}
    >>= :: forall a b. SerialT m a -> (a -> SerialT m b) -> SerialT m b
(>>=) (SerialT Stream m a
m) a -> SerialT m b
f = Stream m b -> SerialT m b
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m b -> SerialT m b) -> Stream m b -> SerialT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append Stream m a
m (SerialT m b -> Stream m b
forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT (SerialT m b -> Stream m b)
-> (a -> SerialT m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> SerialT m b
f)

    {-# INLINE (>>) #-}
    >> :: forall a b. SerialT m a -> SerialT m b -> SerialT m b
(>>) = SerialT m a -> SerialT m b -> SerialT m b
forall a b. SerialT m a -> SerialT m b -> SerialT m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
(*>)

instance MonadTrans SerialT where
    {-# INLINE lift #-}
    lift :: forall (m :: * -> *) a. Monad m => m a -> SerialT m a
lift = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a)
-> (m a -> Stream m a) -> m a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

{-# INLINE mapM #-}
mapM :: Monad m => (a -> m b) -> SerialT m a -> SerialT m b
mapM :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> SerialT m a -> SerialT m b
mapM a -> m b
f (SerialT Stream m a
m) = Stream m b -> SerialT m b
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m b -> SerialT m b) -> Stream m b -> SerialT m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK (Stream m b -> Stream m b) -> Stream m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ (a -> m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
D.mapM a -> m b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m a
m

-- |
-- @
-- map = fmap
-- @
--
-- Same as 'fmap'.
--
-- @
-- > S.toList $ S.map (+1) $ S.fromList [1,2,3]
-- [2,3,4]
-- @
--
-- @since 0.4.0
{-# INLINE map #-}
map :: Monad m => (a -> b) -> SerialT m a -> SerialT m b
map :: forall (m :: * -> *) a b.
Monad m =>
(a -> b) -> SerialT m a -> SerialT m b
map a -> b
f = (a -> m b) -> SerialT m a -> SerialT m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> SerialT m a -> SerialT m b
mapM (b -> m b
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> m b) -> (a -> b) -> a -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
f)

{-# INLINE apSerial #-}
apSerial :: Monad m => SerialT m (a -> b) -> SerialT m a -> SerialT m b
apSerial :: forall (m :: * -> *) a b.
Monad m =>
SerialT m (a -> b) -> SerialT m a -> SerialT m b
apSerial (SerialT Stream m (a -> b)
m1) (SerialT Stream m a
m2) =
    Stream m b -> SerialT m b
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m b -> SerialT m b) -> Stream m b -> SerialT m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK (Stream m b -> Stream m b) -> Stream m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m (a -> b) -> Stream m (a -> b)
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m (a -> b)
m1 Stream m (a -> b) -> Stream m a -> Stream m b
forall (f :: * -> *) a b.
Functor f =>
Stream f (a -> b) -> Stream f a -> Stream f b
`D.crossApply` Stream m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m a
m2

{-# INLINE apSequence #-}
apSequence :: Monad m => SerialT m a -> SerialT m b -> SerialT m b
apSequence :: forall (m :: * -> *) a b.
Monad m =>
SerialT m a -> SerialT m b -> SerialT m b
apSequence (SerialT Stream m a
m1) (SerialT Stream m b
m2) =
    Stream m b -> SerialT m b
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m b -> SerialT m b) -> Stream m b -> SerialT m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK (Stream m b -> Stream m b) -> Stream m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m a
m1 Stream m a -> Stream m b -> Stream m b
forall (f :: * -> *) a b.
Functor f =>
Stream f a -> Stream f b -> Stream f b
`D.crossApplySnd` Stream m b -> Stream m b
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m b
m2

{-# INLINE apDiscardSnd #-}
apDiscardSnd :: Monad m => SerialT m a -> SerialT m b -> SerialT m a
apDiscardSnd :: forall (m :: * -> *) a b.
Monad m =>
SerialT m a -> SerialT m b -> SerialT m a
apDiscardSnd (SerialT Stream m a
m1) (SerialT Stream m b
m2) =
    Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a) -> Stream m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m a
m1 Stream m a -> Stream m b -> Stream m a
forall (f :: * -> *) a b.
Functor f =>
Stream f a -> Stream f b -> Stream f a
`D.crossApplyFst` Stream m b -> Stream m b
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m b
m2

-- Note: we need to define all the typeclass operations because we want to
-- INLINE them.
instance Monad m => Applicative (SerialT m) where
    {-# INLINE pure #-}
    pure :: forall a. a -> SerialT m a
pure = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a)
-> (a -> Stream m a) -> a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> StreamK m a
K.fromPure

    {-# INLINE (<*>) #-}
    <*> :: forall a b. SerialT m (a -> b) -> SerialT m a -> SerialT m b
(<*>) = SerialT m (a -> b) -> SerialT m a -> SerialT m b
forall (m :: * -> *) a b.
Monad m =>
SerialT m (a -> b) -> SerialT m a -> SerialT m b
apSerial
    -- (<*>) = K.apSerial

    {-# INLINE liftA2 #-}
    liftA2 :: forall a b c.
(a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c
liftA2 a -> b -> c
f SerialT m a
x = SerialT m (b -> c) -> SerialT m b -> SerialT m c
forall a b. SerialT m (a -> b) -> SerialT m a -> SerialT m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
(<*>) ((a -> b -> c) -> SerialT m a -> SerialT m (b -> c)
forall a b. (a -> b) -> SerialT m a -> SerialT m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b -> c
f SerialT m a
x)

    {-# INLINE (*>) #-}
    *> :: forall a b. SerialT m a -> SerialT m b -> SerialT m b
(*>)  = SerialT m a -> SerialT m b -> SerialT m b
forall (m :: * -> *) a b.
Monad m =>
SerialT m a -> SerialT m b -> SerialT m b
apSequence
    -- (*>)  = K.apSerialDiscardFst

    {-# INLINE (<*) #-}
    <* :: forall a b. SerialT m a -> SerialT m b -> SerialT m a
(<*) = SerialT m a -> SerialT m b -> SerialT m a
forall (m :: * -> *) a b.
Monad m =>
SerialT m a -> SerialT m b -> SerialT m a
apDiscardSnd
    -- (<*)  = K.apSerialDiscardSnd

MONAD_COMMON_INSTANCES(SerialT,)
LIST_INSTANCES(SerialT)
NFDATA1_INSTANCE(SerialT)
FOLDABLE_INSTANCE(SerialT)
TRAVERSABLE_INSTANCE(SerialT)

------------------------------------------------------------------------------
-- WSerialT
------------------------------------------------------------------------------

-- | For 'WSerialT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.wSerial'                       -- 'Semigroup'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.wSerial' -- 'Monad'
-- @
--
-- Note that '<>' is associative only if we disregard the ordering of elements
-- in the resulting stream.
--
-- A single 'Monad' bind behaves like a @for@ loop:
--
-- >>> :{
-- IsStream.toList $ IsStream.fromWSerial $ do
--      x <- IsStream.fromList [1,2] -- foreach x in stream
--      return x
-- :}
-- [1,2]
--
-- Nested monad binds behave like interleaved nested @for@ loops:
--
-- >>> :{
-- IsStream.toList $ IsStream.fromWSerial $ do
--     x <- IsStream.fromList [1,2] -- foreach x in stream
--     y <- IsStream.fromList [3,4] -- foreach y in stream
--     return (x, y)
-- :}
-- [(1,3),(2,3),(1,4),(2,4)]
--
-- It is a result of interleaving all the nested iterations corresponding to
-- element @1@ in the first stream with all the nested iterations of element
-- @2@:
--
-- >>> import Streamly.Prelude (wSerial)
-- >>> IsStream.toList $ IsStream.fromList [(1,3),(1,4)] `IsStream.wSerial` IsStream.fromList [(2,3),(2,4)]
-- [(1,3),(2,3),(1,4),(2,4)]
--
-- The @W@ in the name stands for @wide@ or breadth wise scheduling in
-- contrast to the depth wise scheduling behavior of 'SerialT'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype WSerialT m a = WSerialT {forall (m :: * -> *) a. WSerialT m a -> Stream m a
getWSerialT :: Stream m a}

instance MonadTrans WSerialT where
    {-# INLINE lift #-}
    lift :: forall (m :: * -> *) a. Monad m => m a -> WSerialT m a
lift = Stream m a -> WSerialT m a
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT (Stream m a -> WSerialT m a)
-> (m a -> Stream m a) -> m a -> WSerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect

-- | An interleaving serial IO stream of elements of type @a@. See 'WSerialT'
-- documentation for more details.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type WSerial = WSerialT IO

{-# INLINE consMWSerial #-}
{-# SPECIALIZE consMWSerial :: IO a -> WSerialT IO a -> WSerialT IO a #-}
consMWSerial :: Monad m => m a -> WSerialT m a -> WSerialT m a
consMWSerial :: forall (m :: * -> *) a.
Monad m =>
m a -> WSerialT m a -> WSerialT m a
consMWSerial m a
m (WSerialT Stream m a
ms) = Stream m a -> WSerialT m a
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT (Stream m a -> WSerialT m a) -> Stream m a -> WSerialT m a
forall a b. (a -> b) -> a -> b
$ m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
K.consM m a
m Stream m a
ms

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

infixr 6 `wSerial`

-- | Interleaves two streams, yielding one element from each stream
-- alternately.  When one stream stops the rest of the other stream is used in
-- the output stream.
--
-- This gives exponential priority to earlier streams than the ones joining
-- later. Because of exponential weighting it can be used with 'concatMapWith'.
--
-- /Not fused/

-- NOTE:
--
-- Note that evaluation of @a \`wSerial` b \`wSerial` c@ does not interleave
-- @a@, @b@ and @c@ with equal priority.  This expression is equivalent to @a
-- \`wSerial` (b \`wSerial` c)@, therefore, it fairly interleaves @a@ with the
-- result of @b \`wSerial` c@.  For example, @Stream.fromList [1,2] \`wSerial`
-- Stream.fromList [3,4] \`wSerial` Stream.fromList [5,6]@ would result in
-- [1,3,2,5,4,6].  In other words, the leftmost stream gets the same scheduling
-- priority as the rest of the streams taken together. The same is true for
-- each subexpression on the right.
--
{-# INLINE wSerial #-}
wSerial :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerial :: forall (m :: * -> *) a.
WSerialT m a -> WSerialT m a -> WSerialT m a
wSerial (WSerialT Stream m a
m1) (WSerialT Stream m a
m2) = Stream m a -> WSerialT m a
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT (Stream m a -> WSerialT m a) -> Stream m a -> WSerialT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.interleave Stream m a
m1 Stream m a
m2

{-# INLINE wSerialFst #-}
wSerialFst :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerialFst :: forall (m :: * -> *) a.
WSerialT m a -> WSerialT m a -> WSerialT m a
wSerialFst (WSerialT Stream m a
m1) (WSerialT Stream m a
m2) = Stream m a -> WSerialT m a
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT (Stream m a -> WSerialT m a) -> Stream m a -> WSerialT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.interleaveFst Stream m a
m1 Stream m a
m2

{-# INLINE wSerialMin #-}
wSerialMin :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerialMin :: forall (m :: * -> *) a.
WSerialT m a -> WSerialT m a -> WSerialT m a
wSerialMin (WSerialT Stream m a
m1) (WSerialT Stream m a
m2) = Stream m a -> WSerialT m a
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT (Stream m a -> WSerialT m a) -> Stream m a -> WSerialT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.interleaveMin Stream m a
m1 Stream m a
m2

instance Semigroup (WSerialT m a) where
    <> :: WSerialT m a -> WSerialT m a -> WSerialT m a
(<>) = WSerialT m a -> WSerialT m a -> WSerialT m a
forall (m :: * -> *) a.
WSerialT m a -> WSerialT m a -> WSerialT m a
wSerial

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance Monoid (WSerialT m a) where
    mempty :: WSerialT m a
mempty = Stream m a -> WSerialT m a
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT Stream m a
forall (m :: * -> *) a. StreamK m a
K.nil
    mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a
mappend = WSerialT m a -> WSerialT m a -> WSerialT m a
forall a. Semigroup a => a -> a -> a
(<>)

{-# INLINE apWSerial #-}
apWSerial :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
apWSerial :: forall (m :: * -> *) a b.
WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
apWSerial (WSerialT Stream m (a -> b)
m1) (WSerialT Stream m a
m2) =
    let f :: (a -> b) -> StreamK m b
f a -> b
x1 = (StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> Stream m a -> StreamK m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith StreamK m b -> StreamK m b -> StreamK m b
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.interleave (b -> StreamK m b
forall a (m :: * -> *). a -> StreamK m a
K.fromPure (b -> StreamK m b) -> (a -> b) -> a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
    in Stream m b -> WSerialT m b
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT (Stream m b -> WSerialT m b) -> Stream m b -> WSerialT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.interleave (a -> b) -> Stream m b
forall {b}. (a -> b) -> StreamK m b
f Stream m (a -> b)
m1

instance Monad m => Applicative (WSerialT m) where
    {-# INLINE pure #-}
    pure :: forall a. a -> WSerialT m a
pure = Stream m a -> WSerialT m a
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT (Stream m a -> WSerialT m a)
-> (a -> Stream m a) -> a -> WSerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> StreamK m a
K.fromPure
    {-# INLINE (<*>) #-}
    <*> :: forall a b. WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
(<*>) = WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
forall (m :: * -> *) a b.
WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
apWSerial

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

instance Monad m => Monad (WSerialT m) where
    return :: forall a. a -> WSerialT m a
return = a -> WSerialT m a
forall a. a -> WSerialT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    {-# INLINE (>>=) #-}
    >>= :: forall a b. WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b
(>>=) (WSerialT Stream m a
m) a -> WSerialT m b
f = Stream m b -> WSerialT m b
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT (Stream m b -> WSerialT m b) -> Stream m b -> WSerialT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.interleave Stream m a
m (WSerialT m b -> Stream m b
forall (m :: * -> *) a. WSerialT m a -> Stream m a
getWSerialT (WSerialT m b -> Stream m b)
-> (a -> WSerialT m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> WSerialT m b
f)

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

#if !(MIN_VERSION_transformers(0,6,0))
instance (MonadBase b m, Monad m) => MonadBase b (SerialT m) where
    liftBase :: forall α. b α -> SerialT m α
liftBase = b α -> SerialT m α
forall (t :: (* -> *) -> * -> *) (b :: * -> *) (m :: * -> *) α.
(MonadTrans t, MonadBase b m) =>
b α -> t m α
liftBaseDefault
#endif

MONAD_COMMON_INSTANCES(WSerialT,)
LIST_INSTANCES(WSerialT)
NFDATA1_INSTANCE(WSerialT)
FOLDABLE_INSTANCE(WSerialT)
TRAVERSABLE_INSTANCE(WSerialT)

------------------------------------------------------------------------------
-- Construction
------------------------------------------------------------------------------

-- | Build a stream by unfolding a /monadic/ step function starting from a
-- seed.  The step function returns the next element in the stream and the next
-- seed value. When it is done it returns 'Nothing' and the stream ends. For
-- example,
--
-- @
-- let f b =
--         if b > 3
--         then return Nothing
--         else print b >> return (Just (b, b + 1))
-- in drain $ unfoldrM f 0
-- @
-- @
--  0
--  1
--  2
--  3
-- @
--
-- /Pre-release/
--
{-# INLINE unfoldrM #-}
unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> SerialT m a
unfoldrM :: forall (m :: * -> *) b a.
Monad m =>
(b -> m (Maybe (a, b))) -> b -> SerialT m a
unfoldrM b -> m (Maybe (a, b))
step b
seed = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a) -> Stream m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK ((b -> m (Maybe (a, b))) -> b -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
(s -> m (Maybe (a, s))) -> s -> Stream m a
D.unfoldrM b -> m (Maybe (a, b))
step b
seed)