{-# OPTIONS_GHC -Wno-deprecations #-}
module Streamly.Internal.Data.Stream.IsStream.Transform {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
(
transform
, foldrS
, foldrSShared
, foldrT
, map
, sequence
, mapM
, smapM
, trace
, trace_
, tap
, tapOffsetEvery
, tapAsync
, tapAsyncK
, distributeAsync_
, pollCounts
, scan
, scanMany
, postscan
, scanl'
, scanlM'
, scanlMAfter'
, postscanlMAfter'
, postscanl'
, postscanlM'
, prescanl'
, prescanlM'
, scanl1'
, scanl1M'
, with
, deleteBy
, filter
, filterM
, uniq
, uniqBy
, nubBy
, prune
, repeated
, take
, takeLast
, takeLastInterval
, takeWhile
, takeWhileM
, takeWhileLast
, takeWhileAround
, drop
, dropLast
, dropLastInterval
, dropWhile
, dropWhileM
, dropWhileLast
, dropWhileAround
, intersperse
, intersperseM
, intersperseMWith
, intersperseMSuffix
, intersperseMSuffixWith
, interjectSuffix
, intersperseM_
, delay
, intersperseMSuffix_
, delayPost
, intersperseMPrefix_
, delayPre
, insertBy
, reverse
, reverse'
, reassembleBy
, indexed
, indexedR
, timestamped
, timestampWith
, timeIndexed
, timeIndexWith
, findIndices
, elemIndices
, rollingMapM
, rollingMap
, rollingMap2
, catMaybes
, mapMaybe
, mapMaybeM
, lefts
, rights
, both
, mkAsync
, mkParallel
, applyAsync
, (|$)
, (|&)
, maxThreads
, maxBuffer
, sampleOld
, sampleNew
, sampleRate
, Rate (..)
, rate
, avgRate
, minRate
, maxRate
, constRate
, inspectMode
, scanx
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans(..))
import Data.Either (isLeft, isRight, fromLeft, fromRight)
import Data.Kind (Type)
import Data.Maybe (isJust, fromJust)
import Streamly.Internal.Data.Fold (Fold (..))
import Streamly.Internal.Data.Pipe (Pipe (..))
import Streamly.Internal.Data.Stream.IsStream.Combinators
( inspectMode, maxBuffer, maxThreads, rate, avgRate, minRate
, maxRate, constRate)
import Streamly.Internal.Data.Stream.IsStream.Common
( absTimesWith
, drop
, findIndices
, map
, postscanlM'
, relTimesWith
, reverse
, reverse'
, scanlMAfter'
, postscanlMAfter'
, smapM
, take
, takeWhile
, interjectSuffix
, intersperseM
, mkAsync
, mkParallel
, zipWith
)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamD, toStreamD, toConsK)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.SVar (Rate(..))
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Concurrent as Concur
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream as D
(transform, foldrT, tap, tapOffsetEvery, mapM, scan
, scanMany, postscan, scanlx', scanlM', scanl', postscanl', prescanl'
, prescanlM', scanl1M', scanl1', filter, filterM, uniq, deleteBy, takeWhileM
, dropWhile, dropWhileM, insertBy, intersperse
, intersperseM_, intersperseMSuffix, intersperseMSuffix_
, intersperseMSuffixWith, indexed, indexedR, rollingMap, rollingMapM
, rollingMap2, mapMaybe, mapMaybeM)
import qualified Streamly.Internal.Data.StreamK as K
(foldrS, foldrSShared, mapMWith)
import qualified Prelude
import Prelude hiding
( filter, drop, dropWhile, take, takeWhile, foldr, map, mapM, sequence
, reverse, foldr1 , scanl, scanl1, zipWith)
{-# INLINE transform #-}
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
transform :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Pipe m a b -> t m a -> t m b
transform Pipe m a b
pipe t m a
xs = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Pipe m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Pipe m a b -> Stream m a -> Stream m b
D.transform Pipe m a b
pipe (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE foldrS #-}
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS :: forall (t :: (* -> *) -> * -> *) a (m :: * -> *) b.
IsStream t =>
(a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS a -> t m b -> t m b
f t m b
z t m a
xs =
StreamK m b -> t m b
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream
(StreamK m b -> t m b) -> StreamK m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> StreamK m b -> StreamK m b)
-> StreamK m b -> StreamK m a -> StreamK m b
forall a (m :: * -> *) b.
(a -> StreamK m b -> StreamK m b)
-> StreamK m b -> StreamK m a -> StreamK m b
K.foldrS
(\a
y StreamK m b
ys -> t m b -> StreamK m b
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream (t m b -> StreamK m b) -> t m b -> StreamK m b
forall a b. (a -> b) -> a -> b
$ a -> t m b -> t m b
f a
y (StreamK m b -> t m b
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream StreamK m b
ys))
(t m b -> StreamK m b
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream t m b
z)
(t m a -> StreamK m a
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream t m a
xs)
{-# INLINE foldrSShared #-}
foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
a -> t m b -> t m b
f t m b
z t m a
xs =
StreamK m b -> t m b
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream
(StreamK m b -> t m b) -> StreamK m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> StreamK m b -> StreamK m b)
-> StreamK m b -> StreamK m a -> StreamK m b
forall a (m :: * -> *) b.
(a -> StreamK m b -> StreamK m b)
-> StreamK m b -> StreamK m a -> StreamK m b
K.foldrSShared
(\a
y StreamK m b
ys -> t m b -> StreamK m b
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream (t m b -> StreamK m b) -> t m b -> StreamK m b
forall a b. (a -> b) -> a -> b
$ a -> t m b -> t m b
f a
y (StreamK m b -> t m b
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream StreamK m b
ys))
(t m b -> StreamK m b
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream t m b
z)
(t m a -> StreamK m a
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream t m a
xs)
{-# INLINE foldrT #-}
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s)
=> (a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT :: forall (t :: (* -> *) -> * -> *) (m :: * -> *)
(s :: (* -> *) -> * -> *) a b.
(IsStream t, Monad m, Monad (s m), MonadTrans s) =>
(a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT a -> s m b -> s m b
f s m b
z t m a
s = (a -> s m b -> s m b) -> s m b -> Stream m a -> s m b
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, Monad (t m), MonadTrans t) =>
(a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
D.foldrT a -> s m b -> s m b
f s m b
z (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
s)
{-# INLINE_EARLY mapM #-}
mapM :: forall t m a b. (IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM a -> m b
f = StreamK m b -> t m b
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream (StreamK m b -> t m b) -> (t m a -> StreamK m b) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (m b -> StreamK m b -> StreamK m b)
-> (a -> m b) -> StreamK m a -> StreamK m b
forall (m :: * -> *) b a.
(m b -> StreamK m b -> StreamK m b)
-> (a -> m b) -> StreamK m a -> StreamK m b
K.mapMWith ((m b -> t m b -> t m b) -> m b -> StreamK m b -> StreamK m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(m a -> t m a -> t m a) -> m a -> StreamK m a -> StreamK m a
toConsK (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
consM @t)) a -> m b
f (StreamK m a -> StreamK m b)
-> (t m a -> StreamK m a) -> t m a -> StreamK m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> StreamK m a
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream
{-# RULES "mapM serial" mapM = mapMSerial #-}
{-# INLINE mapMSerial #-}
mapMSerial :: Monad m => (a -> m b) -> SerialT m a -> SerialT m b
mapMSerial :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> SerialT m a -> SerialT m b
mapMSerial = (a -> m b) -> SerialT m a -> SerialT m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> SerialT m a -> SerialT m b
Serial.mapM
{-# INLINE sequence #-}
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m (m a) -> t m a
sequence = (m a -> m a) -> t m (m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM m a -> m a
forall a. a -> a
id
{-# INLINE tap #-}
tap :: (IsStream t, Monad m) => FL.Fold m a b -> t m a -> t m a
tap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m a
tap Fold m a b
f t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m a
D.tap Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE tapOffsetEvery #-}
tapOffsetEvery :: (IsStream t, Monad m)
=> Int -> Int -> FL.Fold m a b -> t m a -> t m a
tapOffsetEvery :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Int -> Int -> Fold m a b -> t m a -> t m a
tapOffsetEvery Int
offset Int
n Fold m a b
f t m a
xs =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
Int -> Int -> Fold m a b -> Stream m a -> Stream m a
D.tapOffsetEvery Int
offset Int
n Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE tapAsync #-}
tapAsync :: (IsStream t, MonadAsync m) => FL.Fold m a b -> t m a -> t m a
tapAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
Fold m a b -> t m a -> t m a
tapAsync Fold m a b
f t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
Fold m a b -> Stream m a -> Stream m a
Par.tapAsyncF Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE tapAsyncK #-}
tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
tapAsyncK :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> m b) -> t m a -> t m a
tapAsyncK t m a -> m b
f t m a
m = StreamK m a -> t m a
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream (StreamK m a -> t m a) -> StreamK m a -> t m a
forall a b. (a -> b) -> a -> b
$ (StreamK m a -> m b) -> StreamK m a -> StreamK m a
forall (m :: * -> *) a b.
MonadAsync m =>
(StreamK m a -> m b) -> StreamK m a -> StreamK m a
Par.tapAsyncK (t m a -> m b
f (t m a -> m b) -> (StreamK m a -> t m a) -> StreamK m a -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamK m a -> t m a
forall (m :: * -> *) a. StreamK m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream) (t m a -> StreamK m a
forall (m :: * -> *) a. t m a -> StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> StreamK m a
toStream t m a
m)
{-# INLINE distributeAsync_ #-}
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m)
=> f (t m a -> m b) -> t m a -> t m a
distributeAsync_ :: forall (f :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(Foldable f, IsStream t, MonadAsync m) =>
f (t m a -> m b) -> t m a -> t m a
distributeAsync_ = (t m a -> f (t m a -> m b) -> t m a)
-> f (t m a -> m b) -> t m a -> t m a
forall a b c. (a -> b -> c) -> b -> a -> c
flip (((t m a -> m b) -> t m a -> t m a)
-> t m a -> f (t m a -> m b) -> t m a
forall a b. (a -> b -> b) -> b -> f a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
Prelude.foldr (t m a -> m b) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> m b) -> t m a -> t m a
tapAsyncK)
{-# INLINE pollCounts #-}
pollCounts ::
(IsStream t, MonadAsync m)
=> (a -> Bool)
-> (t m Int -> m b)
-> t m a
-> t m a
pollCounts :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> Bool) -> (t m Int -> m b) -> t m a -> t m a
pollCounts a -> Bool
predicate t m Int -> m b
f t m a
xs =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD
(Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
Concur.parTapCount a -> Bool
predicate (t m Int -> m b
f (t m Int -> m b)
-> (Stream m Int -> t m Int) -> Stream m Int -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m Int -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD)
(Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs
{-# INLINE trace #-}
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
trace :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m a
trace a -> m b
f = (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM (\a
x -> m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (a -> m b
f a
x) m () -> m a -> m a
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
{-# INLINE trace_ #-}
trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
trace_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
trace_ m b
eff = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> m a) -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
D.mapM (\a
x -> m b
eff m b -> m a -> m a
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x) (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE scan #-}
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
scan :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
scan Fold m a b
fld t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.scan Fold m a b
fld (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanMany #-}
scanMany :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
scanMany :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
scanMany Fold m a b
fld t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.scanMany Fold m a b
fld (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE postscan #-}
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
postscan :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
postscan Fold m a b
fld = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> (t m a -> Stream m b) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.postscan Fold m a b
fld (Stream m a -> Stream m b)
-> (t m a -> Stream m a) -> t m a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# DEPRECATED scanx "Please use scanl followed by map instead." #-}
{-# INLINE scanx #-}
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) x a b.
(IsStream t, Monad m) =>
(x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx x -> a -> x
step x
begin x -> b
done = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> (t m a -> Stream m b) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
D.scanlx' x -> a -> x
step x
begin x -> b
done (Stream m a -> Stream m b)
-> (t m a -> Stream m a) -> t m a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE scanlM' #-}
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
scanlM' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
scanlM' b -> a -> m b
step m b
begin t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.scanlM' b -> a -> m b
step m b
begin (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl' #-}
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
scanl' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> b) -> b -> t m a -> t m b
scanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.scanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE postscanl' #-}
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
postscanl' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> b) -> b -> t m a -> t m b
postscanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.postscanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE prescanl' #-}
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
prescanl' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> b) -> b -> t m a -> t m b
prescanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.prescanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE prescanlM' #-}
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
prescanlM' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
prescanlM' b -> a -> m b
step m b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.prescanlM' b -> a -> m b
step m b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl1M' #-}
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
scanl1M' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> m a) -> t m a -> t m a
scanl1M' a -> a -> m a
step t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> m a) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> m a) -> Stream m a -> Stream m a
D.scanl1M' a -> a -> m a
step (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl1' #-}
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
scanl1' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> a) -> t m a -> t m a
scanl1' a -> a -> a
step t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> a) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> Stream m a -> Stream m a
D.scanl1' a -> a -> a
step (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE with #-}
with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) =>
(t m a -> t m (s, a))
-> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
-> (((s, a) -> b) -> t m a -> t m a)
with :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b s.
Functor (t m) =>
(t m a -> t m (s, a))
-> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
-> ((s, a) -> b)
-> t m a
-> t m a
with t m a -> t m (s, a)
f ((s, a) -> b) -> t m (s, a) -> t m (s, a)
comb (s, a) -> b
g = ((s, a) -> a) -> t m (s, a) -> t m a
forall a b. (a -> b) -> t m a -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (s, a) -> a
forall a b. (a, b) -> b
snd (t m (s, a) -> t m a) -> (t m a -> t m (s, a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((s, a) -> b) -> t m (s, a) -> t m (s, a)
comb (s, a) -> b
g (t m (s, a) -> t m (s, a))
-> (t m a -> t m (s, a)) -> t m a -> t m (s, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m (s, a)
f
{-# INLINE filter #-}
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
filter :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
D.filter a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE filterM #-}
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
filterM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> m Bool) -> t m a -> t m a
filterM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.filterM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE uniqBy #-}
uniqBy :: (IsStream t, Monad m, Functor (t m)) =>
(a -> a -> Bool) -> t m a -> t m a
uniqBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
(a -> a -> Bool) -> t m a -> t m a
uniqBy a -> a -> Bool
eq = t m (Maybe a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
catMaybes (t m (Maybe a) -> t m a)
-> (t m a -> t m (Maybe a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe a -> a -> Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(Maybe a -> a -> b) -> t m a -> t m b
rollingMap Maybe a -> a -> Maybe a
f
where
f :: Maybe a -> a -> Maybe a
f Maybe a
pre a
curr =
case Maybe a
pre of
Maybe a
Nothing -> a -> Maybe a
forall a. a -> Maybe a
Just a
curr
Just a
x -> if a
x a -> a -> Bool
`eq` a
curr then Maybe a
forall a. Maybe a
Nothing else a -> Maybe a
forall a. a -> Maybe a
Just a
curr
{-# INLINE uniq #-}
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
uniq :: forall a (t :: (* -> *) -> * -> *) (m :: * -> *).
(Eq a, IsStream t, Monad m) =>
t m a -> t m a
uniq = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall a (m :: * -> *). (Eq a, Monad m) => Stream m a -> Stream m a
D.uniq (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE prune #-}
prune ::
(a -> Bool) -> t m a -> t m a
prune :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
prune = [Char] -> (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"
repeated ::
t m a -> t m a
repeated :: forall (t :: * -> * -> *) m a. t m a -> t m a
repeated = t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE nubBy #-}
nubBy ::
(a -> a -> Bool) -> t m a -> t m a
nubBy :: forall a (t :: * -> * -> *) m. (a -> a -> Bool) -> t m a -> t m a
nubBy = (a -> a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE deleteBy #-}
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
deleteBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> a -> t m a -> t m a
deleteBy a -> a -> Bool
cmp a
x t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> a -> Stream m a -> Stream m a
D.deleteBy a -> a -> Bool
cmp a
x (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE sampleOld #-}
sampleOld ::
Int -> t m a -> t m a
sampleOld :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
sampleOld = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE sampleNew #-}
sampleNew ::
Int -> t m a -> t m a
sampleNew :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
sampleNew = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE sampleRate #-}
sampleRate ::
Double -> t m a -> t m a
sampleRate :: forall (t :: * -> * -> *) m a. Double -> t m a -> t m a
sampleRate = Double -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeWhileM #-}
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
takeWhileM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> m Bool) -> t m a -> t m a
takeWhileM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.takeWhileM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE takeLast #-}
takeLast ::
Int -> t m a -> t m a
takeLast :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
takeLast = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeLastInterval #-}
takeLastInterval ::
Double -> t m a -> t m a
takeLastInterval :: forall (t :: * -> * -> *) m a. Double -> t m a -> t m a
takeLastInterval = Double -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeWhileLast #-}
takeWhileLast ::
(a -> Bool) -> t m a -> t m a
takeWhileLast :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
takeWhileLast = (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeWhileAround #-}
takeWhileAround ::
(a -> Bool) -> t m a -> t m a
takeWhileAround :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
takeWhileAround = (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE dropWhile #-}
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
dropWhile :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
dropWhile a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
D.dropWhile a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE dropWhileM #-}
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
dropWhileM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> m Bool) -> t m a -> t m a
dropWhileM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.dropWhileM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE dropLast #-}
dropLast ::
Int -> t m a -> t m a
dropLast :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
dropLast = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE dropLastInterval #-}
dropLastInterval ::
Int -> t m a -> t m a
dropLastInterval :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
dropLastInterval = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE dropWhileLast #-}
dropWhileLast ::
(a -> Bool) -> t m a -> t m a
dropWhileLast :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
dropWhileLast = (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE dropWhileAround #-}
dropWhileAround ::
(a -> Bool) -> t m a -> t m a
dropWhileAround :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
dropWhileAround = (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE insertBy #-}
insertBy ::
(IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
insertBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> Ordering) -> a -> t m a -> t m a
insertBy a -> a -> Ordering
cmp a
x t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> a -> Stream m a -> Stream m a
D.insertBy a -> a -> Ordering
cmp a
x (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE intersperse #-}
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
intersperse :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
a -> t m a -> t m a
intersperse a
a = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => a -> Stream m a -> Stream m a
D.intersperse a
a (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseM_ #-}
intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
intersperseM_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseM_ m b
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.intersperseM_ m b
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseMWith #-}
intersperseMWith ::
Int -> m a -> t m a -> t m a
intersperseMWith :: forall (m :: * -> *) a (t :: (* -> *) -> * -> *).
Int -> m a -> t m a -> t m a
intersperseMWith Int
_n m a
_f t m a
_xs = t m a
forall a. HasCallStack => a
undefined
{-# INLINE intersperseMSuffix #-}
intersperseMSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a
intersperseMSuffix :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m a -> t m a -> t m a
intersperseMSuffix m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
D.intersperseMSuffix m a
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseMSuffix_ #-}
intersperseMSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
intersperseMSuffix_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseMSuffix_ m b
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.intersperseMSuffix_ m b
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseMSuffixWith #-}
intersperseMSuffixWith :: (IsStream t, Monad m)
=> Int -> m a -> t m a -> t m a
intersperseMSuffixWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> m a -> t m a -> t m a
intersperseMSuffixWith Int
n m a
eff =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
Int -> m a -> Stream m a -> Stream m a
D.intersperseMSuffixWith Int
n m a
eff (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseMPrefix_ #-}
intersperseMPrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a
intersperseMPrefix_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadAsync m) =>
m b -> t m a -> t m a
intersperseMPrefix_ m b
m = (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM (\a
x -> m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void m b
m m () -> m a -> m a
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
{-# INLINE delay #-}
delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delay :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m) =>
Double -> t m a -> t m a
delay Double
n = m () -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseM_ (m () -> t m a -> t m a) -> m () -> t m a -> t m a
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE delayPost #-}
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delayPost :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m) =>
Double -> t m a -> t m a
delayPost Double
n = m () -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseMSuffix_ (m () -> t m a -> t m a) -> m () -> t m a -> t m a
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE delayPre #-}
delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delayPre :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m) =>
Double -> t m a -> t m a
delayPre Double
n = m () -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
trace_ (m () -> t m a -> t m a) -> m () -> t m a -> t m a
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE reassembleBy #-}
reassembleBy
::
Fold m a b
-> (a -> a -> Int)
-> t m a
-> t m b
reassembleBy :: forall (m :: * -> *) a b (t :: (* -> *) -> * -> *).
Fold m a b -> (a -> a -> Int) -> t m a -> t m b
reassembleBy = Fold m a b -> (a -> a -> Int) -> t m a -> t m b
forall a. HasCallStack => a
undefined
{-# INLINE indexed #-}
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
indexed :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m (Int, a)
indexed = Stream m (Int, a) -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (Int, a) -> t m (Int, a))
-> (t m a -> Stream m (Int, a)) -> t m a -> t m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m (Int, a)
forall (m :: * -> *) a. Monad m => Stream m a -> Stream m (Int, a)
D.indexed (Stream m a -> Stream m (Int, a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE indexedR #-}
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
indexedR :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> t m a -> t m (Int, a)
indexedR Int
n = Stream m (Int, a) -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (Int, a) -> t m (Int, a))
-> (t m a -> Stream m (Int, a)) -> t m a -> t m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Stream m a -> Stream m (Int, a)
forall (m :: * -> *) a.
Monad m =>
Int -> Stream m a -> Stream m (Int, a)
D.indexedR Int
n (Stream m a -> Stream m (Int, a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE timestampWith #-}
timestampWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m a -> t m (AbsTime, a)
timestampWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (AbsTime, a)
timestampWith Double
g t m a
stream = (a -> AbsTime -> (AbsTime, a))
-> t m a -> t m AbsTime -> t m (AbsTime, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> c) -> t m a -> t m b -> t m c
zipWith ((AbsTime -> a -> (AbsTime, a)) -> a -> AbsTime -> (AbsTime, a)
forall a b c. (a -> b -> c) -> b -> a -> c
flip (,)) t m a
stream (Double -> t m AbsTime
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m AbsTime
absTimesWith Double
g)
{-# INLINE timestamped #-}
timestamped :: (IsStream t, MonadAsync m, Functor (t m))
=> t m a -> t m (AbsTime, a)
timestamped :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
t m a -> t m (AbsTime, a)
timestamped = Double -> t m a -> t m (AbsTime, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (AbsTime, a)
timestampWith Double
0.01
{-# INLINE timeIndexWith #-}
timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m a -> t m (RelTime64, a)
timeIndexWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (RelTime64, a)
timeIndexWith Double
g t m a
stream = (a -> RelTime64 -> (RelTime64, a))
-> t m a -> t m RelTime64 -> t m (RelTime64, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> c) -> t m a -> t m b -> t m c
zipWith ((RelTime64 -> a -> (RelTime64, a))
-> a -> RelTime64 -> (RelTime64, a)
forall a b c. (a -> b -> c) -> b -> a -> c
flip (,)) t m a
stream (Double -> t m RelTime64
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m RelTime64
relTimesWith Double
g)
{-# INLINE timeIndexed #-}
timeIndexed :: (IsStream t, MonadAsync m, Functor (t m))
=> t m a -> t m (RelTime64, a)
timeIndexed :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
t m a -> t m (RelTime64, a)
timeIndexed = Double -> t m a -> t m (RelTime64, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (RelTime64, a)
timeIndexWith Double
0.01
{-# INLINE elemIndices #-}
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
elemIndices :: forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
(IsStream t, Eq a, Monad m) =>
a -> t m a -> t m Int
elemIndices a
a = (a -> Bool) -> t m a -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m Int
findIndices (a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
a)
{-# INLINE rollingMap #-}
rollingMap :: (IsStream t, Monad m) => (Maybe a -> a -> b) -> t m a -> t m b
rollingMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(Maybe a -> a -> b) -> t m a -> t m b
rollingMap Maybe a -> a -> b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (Maybe a -> a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(Maybe a -> a -> b) -> Stream m a -> Stream m b
D.rollingMap Maybe a -> a -> b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE rollingMapM #-}
rollingMapM :: (IsStream t, Monad m) => (Maybe a -> a -> m b) -> t m a -> t m b
rollingMapM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(Maybe a -> a -> m b) -> t m a -> t m b
rollingMapM Maybe a -> a -> m b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (Maybe a -> a -> m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(Maybe a -> a -> m b) -> Stream m a -> Stream m b
D.rollingMapM Maybe a -> a -> m b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE rollingMap2 #-}
rollingMap2 :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
rollingMap2 :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> b) -> t m a -> t m b
rollingMap2 a -> a -> b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> b) -> Stream m a -> Stream m b
D.rollingMap2 a -> a -> b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE mapMaybe #-}
mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
mapMaybe :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Maybe b) -> t m a -> t m b
mapMaybe a -> Maybe b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Maybe b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Maybe b) -> Stream m a -> Stream m b
D.mapMaybe a -> Maybe b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE_EARLY mapMaybeM #-}
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m))
=> (a -> m (Maybe b)) -> t m a -> t m b
mapMaybeM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m, Functor (t m)) =>
(a -> m (Maybe b)) -> t m a -> t m b
mapMaybeM a -> m (Maybe b)
f = (Maybe b -> b) -> t m (Maybe b) -> t m b
forall a b. (a -> b) -> t m a -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe b -> b
forall a. HasCallStack => Maybe a -> a
fromJust (t m (Maybe b) -> t m b)
-> (t m a -> t m (Maybe b)) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe b -> Bool) -> t m (Maybe b) -> t m (Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Maybe b -> Bool
forall a. Maybe a -> Bool
isJust (t m (Maybe b) -> t m (Maybe b))
-> (t m a -> t m (Maybe b)) -> t m a -> t m (Maybe b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> m (Maybe b)) -> t m a -> t m (Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM a -> m (Maybe b)
f
{-# RULES "mapMaybeM serial" mapMaybeM = mapMaybeMSerial #-}
{-# INLINE mapMaybeMSerial #-}
mapMaybeMSerial :: Monad m => (a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial :: forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial a -> m (Maybe b)
f SerialT m a
m = Stream m b -> SerialT m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> SerialT m b) -> Stream m b -> SerialT m b
forall a b. (a -> b) -> a -> b
$ (a -> m (Maybe b)) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> Stream m a -> Stream m b
D.mapMaybeM a -> m (Maybe b)
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD SerialT m a
m
{-# INLINE catMaybes #-}
catMaybes :: (IsStream t, Monad m, Functor (t m)) => t m (Maybe a) -> t m a
catMaybes :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
catMaybes = (Maybe a -> a) -> t m (Maybe a) -> t m a
forall a b. (a -> b) -> t m a -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe a -> a
forall a. HasCallStack => Maybe a -> a
fromJust (t m (Maybe a) -> t m a)
-> (t m (Maybe a) -> t m (Maybe a)) -> t m (Maybe a) -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe a -> Bool) -> t m (Maybe a) -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Maybe a -> Bool
forall a. Maybe a -> Bool
isJust
{-# INLINE lefts #-}
lefts :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m a
lefts :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m, Functor (t m)) =>
t m (Either a b) -> t m a
lefts = (Either a b -> a) -> t m (Either a b) -> t m a
forall a b. (a -> b) -> t m a -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a -> Either a b -> a
forall a b. a -> Either a b -> a
fromLeft a
forall a. HasCallStack => a
undefined) (t m (Either a b) -> t m a)
-> (t m (Either a b) -> t m (Either a b))
-> t m (Either a b)
-> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Either a b -> Bool) -> t m (Either a b) -> t m (Either a b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Either a b -> Bool
forall a b. Either a b -> Bool
isLeft
{-# INLINE rights #-}
rights :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m b
rights :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m, Functor (t m)) =>
t m (Either a b) -> t m b
rights = (Either a b -> b) -> t m (Either a b) -> t m b
forall a b. (a -> b) -> t m a -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (b -> Either a b -> b
forall b a. b -> Either a b -> b
fromRight b
forall a. HasCallStack => a
undefined) (t m (Either a b) -> t m b)
-> (t m (Either a b) -> t m (Either a b))
-> t m (Either a b)
-> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Either a b -> Bool) -> t m (Either a b) -> t m (Either a b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Either a b -> Bool
forall a b. Either a b -> Bool
isRight
{-# INLINE both #-}
both :: Functor (t m) => t m (Either a a) -> t m a
both :: forall (t :: * -> * -> *) m a.
Functor (t m) =>
t m (Either a a) -> t m a
both = (Either a a -> a) -> t m (Either a a) -> t m a
forall a b. (a -> b) -> t m a -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((a -> a) -> (a -> a) -> Either a a -> a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either a -> a
forall a. a -> a
id a -> a
forall a. a -> a
id)
{-# INLINE (|$) #-}
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> (t m a -> t m b)
|$ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
(|$) t m a -> t m b
f = t m a -> t m b
f (t m a -> t m b) -> (t m a -> t m a) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
mkParallel
infixr 0 |$
{-# INLINE applyAsync #-}
applyAsync :: (IsStream t, MonadAsync m)
=> (t m a -> t m b) -> (t m a -> t m b)
applyAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
applyAsync = (t m a -> t m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
(|$)
{-# INLINE (|&) #-}
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
t m a
x |& :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
t m a -> (t m a -> t m b) -> t m b
|& t m a -> t m b
f = t m a -> t m b
f (t m a -> t m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
|$ t m a
x
infixl 1 |&