{-# OPTIONS_GHC -Wno-deprecations #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module Streamly.Internal.Data.Stream.IsStream.Common {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
(
fromPure
, fromEffect
, repeatM
, timesWith
, absTimesWith
, relTimesWith
, foldContinue
, fold
, map
, scanlMAfter'
, postscanlMAfter'
, postscanlM'
, smapM
, foldManyPost
, take
, takeWhile
, takeEndBy
, drop
, findIndices
, intersperseM
, interjectSuffix
, reverse
, reverse'
, mkAsync
, mkParallel
, parallelFst
, concatM
, concatMapM
, concatMap
, splitOnSeq
, zipWithM
, zipWith
, yield
, yieldM
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.Storable (Storable)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Array (Array)
import Streamly.Internal.Data.Fold (Fold (..))
import Streamly.Internal.Data.Stream.IsStream.Combinators (maxYields)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64, addToAbsTime64)
import Streamly.Internal.System.IO (defaultChunkSize)
import Streamly.Data.MutByteArray (Unbox)
import qualified Streamly.Internal.Data.Array as A
import qualified Streamly.Internal.Data.Stream.Async as Async
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.StreamK as K
(fromPure, fromEffect, repeatMWith, reverse)
import qualified Streamly.Internal.Data.Stream as D
(repeatM, timesWith, foldAddLazy, map, scanlMAfter', postscanlMAfter'
, postscanlM', take, takeWhile, takeEndBy, drop, findIndices
, fromStreamK, toStreamK, concatMapM, concatMap, foldManyPost, splitOnSeq
, zipWithM, zipWith, intersperseM, reverse, fold)
import Prelude hiding (take, takeWhile, drop, reverse, concatMap, map, zipWith)
{-# INLINE fromPure #-}
fromPure :: IsStream t => a -> t m a
fromPure :: forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure = StreamK m a -> t m a
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream (StreamK m a -> t m a) -> (a -> StreamK m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> StreamK m a
forall a (m :: * -> *). a -> StreamK m a
K.fromPure
{-# DEPRECATED yield "Please use fromPure instead." #-}
{-# INLINE yield #-}
yield :: IsStream t => a -> t m a
yield :: forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield = a -> t m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure
{-# INLINE fromEffect #-}
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect = StreamK m a -> t m a
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream (StreamK m a -> t m a) -> (m a -> StreamK m a) -> m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> StreamK m a
forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect
{-# DEPRECATED yieldM "Please use fromEffect instead." #-}
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
yieldM = m a -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect
{-# INLINE_EARLY repeatM #-}
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
repeatM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM = (m a -> t m a -> t m a) -> m a -> t m a
forall (m :: * -> *) a (t :: (* -> *) -> * -> *).
(m a -> t m a -> t m a) -> m a -> t m a
K.repeatMWith m a -> t m a -> t m a
forall (m :: * -> *) a. MonadAsync m => m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
IsStream.consM
{-# RULES "repeatM serial" repeatM = repeatMSerial #-}
{-# INLINE repeatMSerial #-}
repeatMSerial :: MonadAsync m => m a -> SerialT m a
repeatMSerial :: forall (m :: * -> *) a. MonadAsync m => m a -> SerialT m a
repeatMSerial = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> SerialT m a)
-> (m a -> Stream m a) -> m a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a
D.repeatM
{-# INLINE timesWith #-}
timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)
timesWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith Double
g = Stream m (AbsTime, RelTime64) -> t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (AbsTime, RelTime64) -> t m (AbsTime, RelTime64))
-> Stream m (AbsTime, RelTime64) -> t m (AbsTime, RelTime64)
forall a b. (a -> b) -> a -> b
$ Double -> Stream m (AbsTime, RelTime64)
forall (m :: * -> *).
MonadIO m =>
Double -> Stream m (AbsTime, RelTime64)
D.timesWith Double
g
{-# INLINE absTimesWith #-}
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m AbsTime
absTimesWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m AbsTime
absTimesWith = ((AbsTime, RelTime64) -> AbsTime)
-> t m (AbsTime, RelTime64) -> t m AbsTime
forall a b. (a -> b) -> t m a -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((AbsTime -> RelTime64 -> AbsTime)
-> (AbsTime, RelTime64) -> AbsTime
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry AbsTime -> RelTime64 -> AbsTime
addToAbsTime64) (t m (AbsTime, RelTime64) -> t m AbsTime)
-> (Double -> t m (AbsTime, RelTime64)) -> Double -> t m AbsTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith
{-# INLINE relTimesWith #-}
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m RelTime64
relTimesWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m RelTime64
relTimesWith = ((AbsTime, RelTime64) -> RelTime64)
-> t m (AbsTime, RelTime64) -> t m RelTime64
forall a b. (a -> b) -> t m a -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (AbsTime, RelTime64) -> RelTime64
forall a b. (a, b) -> b
snd (t m (AbsTime, RelTime64) -> t m RelTime64)
-> (Double -> t m (AbsTime, RelTime64)) -> Double -> t m RelTime64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith
{-# INLINE foldContinue #-}
foldContinue :: Monad m => Fold m a b -> SerialT m a -> Fold m a b
foldContinue :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> Fold m a b
foldContinue Fold m a b
f SerialT m a
s = Fold m a b -> Stream m a -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Fold m a b
D.foldAddLazy Fold m a b
f (Stream m a -> Fold m a b) -> Stream m a -> Fold m a b
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD SerialT m a
s
{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> SerialT m a -> m b
fold :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m b
fold Fold m a b
fl SerialT m a
strm = Fold m a b -> Stream m a -> m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold Fold m a b
fl (Stream m a -> m b) -> Stream m a -> m b
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD SerialT m a
strm
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map a -> b
f = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> (t m a -> Stream m b) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> b) -> Stream m a -> Stream m b
D.map a -> b
f (Stream m a -> Stream m b)
-> (t m a -> Stream m a) -> t m a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE scanlMAfter' #-}
scanlMAfter' :: (IsStream t, Monad m)
=> (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' b -> a -> m b
step m b
initial b -> m b
done t m a
stream =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
D.scanlMAfter' b -> a -> m b
step m b
initial b -> m b
done (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
stream
{-# INLINE postscanlMAfter' #-}
postscanlMAfter' :: (IsStream t, Monad m)
=> (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
postscanlMAfter' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
postscanlMAfter' b -> a -> m b
step m b
initial b -> m b
done t m a
stream =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
D.postscanlMAfter' b -> a -> m b
step m b
initial b -> m b
done (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
stream
{-# INLINE postscanlM' #-}
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
postscanlM' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
postscanlM' b -> a -> m b
step m b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.postscanlM' b -> a -> m b
step m b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE smapM #-}
smapM :: (IsStream t, Monad m) =>
(s -> a -> m (s, b))
-> m s
-> t m a
-> t m b
smapM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) s a b.
(IsStream t, Monad m) =>
(s -> a -> m (s, b)) -> m s -> t m a -> t m b
smapM s -> a -> m (s, b)
step m s
initial t m a
stream =
let r :: t m (s, b)
r = ((s, b) -> a -> m (s, b)) -> m (s, b) -> t m a -> t m (s, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
postscanlM'
(\(s
s, b
_) a
a -> s -> a -> m (s, b)
step s
s a
a)
((s -> (s, b)) -> m s -> m (s, b)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (,b
forall a. HasCallStack => a
undefined) m s
initial)
t m a
stream
in ((s, b) -> b) -> t m (s, b) -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map (s, b) -> b
forall a b. (a, b) -> b
snd t m (s, b)
r
{-# INLINE take #-}
take :: (IsStream t, Monad m) => Int -> t m a -> t m a
take :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> t m a -> t m a
take Int
n t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Applicative m =>
Int -> Stream m a -> Stream m a
D.take Int
n (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
(Maybe Int64 -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Int64 -> t m a -> t m a
maxYields (Int64 -> Maybe Int64
forall a. a -> Maybe a
Just (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)) t m a
m)
{-# INLINE takeWhile #-}
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
takeWhile :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
takeWhile a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
D.takeWhile a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE takeEndBy #-}
takeEndBy :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
takeEndBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
takeEndBy a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
D.takeEndBy a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE drop #-}
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
drop :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> t m a -> t m a
drop Int
n t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Int -> Stream m a -> Stream m a
D.drop Int
n (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE findIndices #-}
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
findIndices :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m Int
findIndices a -> Bool
p t m a
m = Stream m Int -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m Int -> t m Int) -> Stream m Int -> t m Int
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m Int
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m Int
D.findIndices a -> Bool
p (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE intersperseM #-}
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
intersperseM m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
D.intersperseM m a
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE interjectSuffix #-}
interjectSuffix
:: (IsStream t, MonadAsync m)
=> Double -> m a -> t m a -> t m a
interjectSuffix :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Double -> m a -> t m a -> t m a
interjectSuffix Double
n m a
f t m a
xs = t m a
xs t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`parallelFst` m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM m a
timed
where timed :: m a
timed = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
threadDelay (Double -> Int
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000)) m () -> m a -> m a
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
f
{-# INLINE reverse #-}
reverse :: (IsStream t, Monad m) => t m a -> t m a
reverse :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m a
reverse t m a
s = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.reverse (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
s
{-# INLINE reverse' #-}
reverse' :: (IsStream t, MonadIO m, Unbox a) => t m a -> t m a
reverse' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Unbox a) =>
t m a -> t m a
reverse' =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD
(Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m (Array a) -> Stream m a
forall (m :: * -> *) a.
(Monad m, Unbox a) =>
Stream m (Array a) -> Stream m a
A.concatRev
(Stream m (Array a) -> Stream m a)
-> (t m a -> Stream m (Array a)) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamK m (Array a) -> Stream m (Array a)
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK
(StreamK m (Array a) -> Stream m (Array a))
-> (t m a -> StreamK m (Array a)) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamK m (Array a) -> StreamK m (Array a)
forall (m :: * -> *) a. StreamK m a -> StreamK m a
K.reverse
(StreamK m (Array a) -> StreamK m (Array a))
-> (t m a -> StreamK m (Array a)) -> t m a -> StreamK m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m (Array a) -> StreamK m (Array a)
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK
(Stream m (Array a) -> StreamK m (Array a))
-> (t m a -> Stream m (Array a)) -> t m a -> StreamK m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Stream m a -> Stream m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Stream m a -> Stream m (Array a)
A.chunksOf Int
defaultChunkSize
(Stream m a -> Stream m (Array a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE_NORMAL mkAsync #-}
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
mkAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
mkAsync = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
Async.mkAsyncD (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
mkParallel :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
mkParallel = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
Par.mkParallelD (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE parallelFst #-}
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallelFst :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
parallelFst t m a
m1 t m a
m2 = StreamK m a -> t m a
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream (StreamK m a -> t m a) -> StreamK m a -> t m a
forall a b. (a -> b) -> a -> b
$ StreamK m a -> StreamK m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
Par.parallelFstK (t m a -> StreamK m a
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream t m a
m1) (t m a -> StreamK m a
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream t m a
m2)
{-# INLINE concatMapM #-}
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
concatMapM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
concatMapM a -> m (t m b)
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> m (Stream m b)) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Stream m b)) -> Stream m a -> Stream m b
D.concatMapM ((t m b -> Stream m b) -> m (t m b) -> m (Stream m b)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD (m (t m b) -> m (Stream m b))
-> (a -> m (t m b)) -> a -> m (Stream m b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m (t m b)
f) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatMap #-}
concatMap ::(IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
concatMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
concatMap a -> t m b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Stream m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Stream m b) -> Stream m a -> Stream m b
D.concatMap (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD (t m b -> Stream m b) -> (a -> t m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> t m b
f) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatM #-}
concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
concatM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM m (t m a)
generator = (() -> m (t m a)) -> t m () -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
concatMapM (\() -> m (t m a)
generator) (() -> t m ()
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure ())
{-# INLINE foldManyPost #-}
foldManyPost
:: (IsStream t, Monad m)
=> Fold m a b
-> t m a
-> t m b
foldManyPost :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldManyPost Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.foldManyPost Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE splitOnSeq #-}
splitOnSeq
:: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitOnSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
patt Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSeq Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD t m a
m)
{-# INLINE zipWithM #-}
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM a -> b -> m c
f t m a
m1 t m b
m2 =
Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamD
(Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
D.zipWithM a -> b -> m c
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD t m a
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD t m b
m2)
{-# INLINE zipWith #-}
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> c) -> t m a -> t m b -> t m c
zipWith a -> b -> c
f t m a
m1 t m b
m2 =
Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamD
(Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
D.zipWith a -> b -> c
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD t m a
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD t m b
m2)