{-# LANGUAGE CPP #-}
{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module      : Streamly.Internal.Data.StreamK.Type
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
-- Continuation passing style (CPS) stream implementation. The symbol 'K' below
-- denotes a function as well as a Kontinuation.
--
module Streamly.Internal.Data.StreamK.Type
    (
    -- * StreamK type
      Stream
    , StreamK (..)

    -- * CrossStreamK type wrapper
    , CrossStreamK
    , unCross
    , mkCross

    -- * foldr/build Fusion
    , mkStream
    , foldStream
    , foldStreamShared
    , foldrM
    , foldrS
    , foldrSShared
    , foldrSM
    , build
    , buildS
    , buildM
    , buildSM
    , augmentS
    , augmentSM
    , unShare

    -- * Construction
    -- ** Primitives
    , fromStopK
    , fromYieldK
    , consK
    , cons
    , (.:)
    , consM
    , consMBy
    , nil
    , nilM

    -- ** Unfolding
    , unfoldr
    , unfoldrMWith
    , unfoldrM

    -- ** From Values
    , fromEffect
    , fromPure
    , repeat
    , repeatMWith
    , replicateMWith

    -- ** From Indices
    , fromIndicesMWith

    -- ** Iteration
    , iterateMWith

    -- ** From Containers
    , fromFoldable
    , fromFoldableM

    -- ** Cyclic
    , mfix

    -- * Elimination
    -- ** Primitives
    , uncons

    -- ** Strict Left Folds
    , Streamly.Internal.Data.StreamK.Type.foldl'
    , foldlx'
    , foldlMx'
    , foldlM'

    -- ** Lazy Right Folds
    , Streamly.Internal.Data.StreamK.Type.foldr

    -- ** Specific Folds
    , drain
    , null
    , tail
    , init

    -- * Mapping
    , map
    , mapMWith
    , mapMSerial

    -- * Combining Two Streams
    -- ** Appending
    , conjoin
    , append

    -- ** Interleave
    , interleave
    , interleaveFst
    , interleaveMin

    -- ** Cross Product
    , crossApplyWith
    , crossApply
    , crossApplySnd
    , crossApplyFst
    , crossWith
    , cross

    -- * Concat
    , before
    , concatEffect
    , concatMapEffect
    , concatMapWith
    , concatMap
    , bindWith
    , concatIterateWith
    , concatIterateLeftsWith
    , concatIterateScanWith

    -- * Merge
    , mergeMapWith
    , mergeIterateWith

    -- * Buffered Operations
    , foldlS
    , reverse
    )
where

#include "inline.hs"

-- import Control.Applicative (liftA2)
import Control.Monad ((>=>))
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.Trans.Class (MonadTrans(lift))
#if !MIN_VERSION_base(4,18,0)
import Control.Applicative (liftA2)
#endif
import Control.Monad.IO.Class (MonadIO(..))
import Data.Foldable (Foldable(foldl'), fold, foldr)
import Data.Function (fix)
import Data.Functor.Identity (Identity(..))
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
import GHC.Exts (IsList(..), IsString(..), oneShot)
import Streamly.Internal.BaseCompat ((#.))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.SVar.Type (State, adaptState, defState)
import Text.Read
       ( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
       , readListPrecDefault)

import qualified Prelude
import Prelude hiding (map, mapM, concatMap, foldr, repeat, null, reverse, tail, init)

#include "DocTestDataStreamK.hs"

------------------------------------------------------------------------------
-- Basic stream type
------------------------------------------------------------------------------

-- It uses stop, singleton and yield continuations equivalent to the following
-- direct style type:
--
-- @
-- data StreamK m a = Stop | Singleton a | Yield a (StreamK m a)
-- @
--
-- To facilitate parallel composition we maintain a local state in an 'SVar'
-- that is shared across and is used for synchronization of the streams being
-- composed.
--
-- The singleton case can be expressed in terms of stop and yield but we have
-- it as a separate case to optimize composition operations for streams with
-- single element.  We build singleton streams in the implementation of 'pure'
-- for Applicative and Monad, and in 'lift' for MonadTrans.

-- XXX remove the State param.

-- | Continuation Passing Style (CPS) version of "Streamly.Data.Stream.Stream".
-- Unlike "Streamly.Data.Stream.Stream", 'StreamK' can be composed recursively
-- without affecting performance.
--
-- Semigroup instance appends two streams:
--
-- >>> (<>) = Stream.append
--
newtype StreamK m a =
    MkStream (forall r.
               State StreamK m a         -- state
            -> (a -> StreamK m a -> m r) -- yield
            -> (a -> m r)               -- singleton
            -> m r                      -- stop
            -> m r
            )

-- The alias is declared after the newtype so that the Haddock comment above
-- attaches to 'StreamK' rather than to the deprecated synonym.
{-# DEPRECATED Stream "Please use StreamK instead." #-}
type Stream = StreamK

-- | Build a 'StreamK' from its CPS representation. This is a plain synonym of
-- the 'MkStream' constructor.
mkStream
    :: (forall r. State StreamK m a
        -> (a -> StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> StreamK m a
mkStream = MkStream

-- | A terminal function that has no continuation to follow.
type StopK m = forall r. m r -> m r

-- | A monadic continuation, it is a function that yields a value of type "a"
-- and calls the argument (a -> m r) as a continuation with that value. We can
-- also think of it as a callback with a handler (a -> m r).  Category
-- theorists call it a codensity type, a special type of right kan extension.
type YieldK m a = forall r. (a -> m r) -> m r

-- Turn a monadic action into a 'YieldK' by partially applying '>>='.
-- Unused in this part of the module; the leading underscore silences the
-- unused-binding warning.
_wrapM :: Monad m => m a -> YieldK m a
_wrapM m = (m >>=)

-- | Make an empty stream from a stop function.
fromStopK :: StopK m -> StreamK m a
fromStopK k = mkStream $ \_ _ _ stp -> k stp

-- | Make a singleton stream from a callback function. The callback function
-- calls the one-shot yield continuation to yield an element.
fromYieldK :: YieldK m a -> StreamK m a
fromYieldK k = mkStream $ \_ _ sng _ -> k sng

-- | Add a yield function at the head of the stream.
consK :: YieldK m a -> StreamK m a -> StreamK m a
consK k r = mkStream $ \_ yld _ _ -> k (`yld` r)

-- XXX Build a stream from a repeating callback function.

------------------------------------------------------------------------------
-- Construction
------------------------------------------------------------------------------

infixr 5 `cons`

-- faster than consM because there is no bind.

-- | A right associative prepend operation to add a pure value at the head of
-- an existing stream::
--
-- >>> s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
-- >>> Stream.fold Fold.toList (StreamK.toStream s)
-- [1,2,3]
--
-- It can be used efficiently with 'Prelude.foldr':
--
-- >>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
--
-- Same as the following but more efficient:
--
-- >>> cons x xs = return x `StreamK.consM` xs
--
{-# INLINE_NORMAL cons #-}
cons :: a -> StreamK m a -> StreamK m a
cons a r = mkStream $ \_ yield _ _ -> yield a r

infixr 5 .:

-- | Operator equivalent of 'cons'.
--
-- @
-- > toList $ 1 .: 2 .: 3 .: nil
-- [1,2,3]
-- @
--
{-# INLINE (.:) #-}
(.:) :: a -> StreamK m a -> StreamK m a
(.:) = cons

-- | A stream that terminates without producing any output or side effect.
--
-- >>> Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
-- []
--
{-# INLINE_NORMAL nil #-}
nil :: StreamK m a
nil = mkStream $ \_ _ _ stp -> stp

-- | A stream that terminates without producing any output, but produces a side
-- effect.
--
-- >>> Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
-- "nil"
-- []
--
-- /Pre-release/
{-# INLINE_NORMAL nilM #-}
nilM :: Applicative m => m b -> StreamK m a
nilM m = mkStream $ \_ _ _ stp -> m *> stp

-- Create a singleton stream from a pure value.
--
-- >>> fromPure a = a `StreamK.cons` StreamK.nil
-- >>> fromPure = pure
-- >>> fromPure = StreamK.fromEffect . pure
--
{-# INLINE_NORMAL fromPure #-}
fromPure :: a -> StreamK m a
fromPure a = mkStream $ \_ _ single _ -> single a

-- Create a singleton stream from a monadic action.
--
-- >>> fromEffect m = m `StreamK.consM` StreamK.nil
--
-- >>> Stream.fold Fold.drain $ StreamK.toStream $ StreamK.fromEffect (putStrLn "hello")
-- hello
--
{-# INLINE_NORMAL fromEffect #-}
fromEffect :: Monad m => m a -> StreamK m a
fromEffect m = mkStream $ \_ _ single _ -> m >>= single

infixr 5 `consM`

-- NOTE: specializing the function outside the instance definition seems to
-- improve performance quite a bit at times, even if we have the same
-- SPECIALIZE in the instance definition.

-- | A right associative prepend operation to add an effectful value at the
-- head of an existing stream::
--
-- >>> s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
-- >>> Stream.fold Fold.drain (StreamK.toStream s)
-- hello
-- world
--
-- It can be used efficiently with 'Prelude.foldr':
--
-- >>> fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil
--
-- Same as the following but more efficient:
--
-- >>> consM x xs = StreamK.fromEffect x `StreamK.append` xs
--
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> StreamK IO a -> StreamK IO a #-}
consM :: Monad m => m a -> StreamK m a -> StreamK m a
consM m r = MkStream $ \_ yld _ _ -> m >>= (`yld` r)

-- Prepend an effect to a stream, joining the singleton effect stream and the
-- tail with the supplied combining function instead of a fixed append.
-- XXX specialize to IO?
{-# INLINE consMBy #-}
consMBy :: Monad m
    => (StreamK m a -> StreamK m a -> StreamK m a)
    -> m a
    -> StreamK m a
    -> StreamK m a
consMBy f m r = fromEffect m `f` r

------------------------------------------------------------------------------
-- Folding a stream
------------------------------------------------------------------------------

-- | Fold a stream by providing an SVar, a stop continuation, a singleton
-- continuation and a yield continuation. The stream would share the current
-- SVar passed via the State.
{-# INLINE_EARLY foldStreamShared #-}
foldStreamShared
    :: State StreamK m a
    -> (a -> StreamK m a -> m r)
    -> (a -> m r)
    -> m r
    -> StreamK m a
    -> m r
foldStreamShared s yield single stop (MkStream k) = k s yield single stop

-- | Fold a stream by providing a State, stop continuation, a singleton
-- continuation and a yield continuation. The stream will not use the SVar
-- passed via State.
{-# INLINE foldStream #-}
foldStream
    :: State StreamK m a
    -> (a -> StreamK m a -> m r)
    -> (a -> m r)
    -> m r
    -> StreamK m a
    -> m r
foldStream s yield single stop (MkStream k) =
    -- The state is passed through 'adaptState' before being handed to the
    -- stream; this is the only difference from 'foldStreamShared'.
    k (adaptState s) yield single stop

-------------------------------------------------------------------------------
-- foldr/build fusion
-------------------------------------------------------------------------------

-- XXX perhaps we can just use foldrSM/buildM everywhere as they are more
-- general and cover foldrS/buildS as well.

-- | The function 'f' decides how to reconstruct the stream. We could
-- reconstruct using a shared state (SVar) or without sharing the state.
--
{-# INLINE foldrSWith #-}
foldrSWith ::
    (forall r. State StreamK m b
        -> (b -> StreamK m b -> m r)
        -> (b -> m r)
        -> m r
        -> StreamK m b
        -> m r)
    -> (a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSWith f step final m = go m

    where

    go m1 = mkStream $ \st yld sng stp ->
        let -- Run a reconstructed output stream with the consumer's
            -- continuations, using the supplied fold function.
            run x = f st yld sng stp x
            stop = run final
            single a = run $ step a final
            yieldk a r = run $ step a (go r)
         -- XXX if type a and b are the same we do not need adaptState, can we
         -- save some perf with that?
         -- XXX since we are using adaptState anyway here we can use
         -- foldStreamShared instead, will that save some perf?
         in foldStream (adaptState st) yieldk single stop m1

-- XXX we can use rewrite rules just for foldrSWith, if the function f is the
-- same we can rewrite it.

-- | Fold sharing the SVar state within the reconstructed stream
{-# INLINE_NORMAL foldrSShared #-}
foldrSShared ::
       (a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSShared = foldrSWith foldStreamShared

-- XXX consM is a typeclass method, therefore rewritten already. Instead maybe
-- we can make consM polymorphic using rewrite rules.
-- {-# RULES "foldrSShared/id"     foldrSShared consM nil = \x -> x #-}
{-# RULES "foldrSShared/nil"
    forall k z. foldrSShared k z nil = z #-}
{-# RULES "foldrSShared/single"
    forall k z x. foldrSShared k z (fromPure x) = k x z #-}
-- {-# RULES "foldrSShared/app" [1]
--     forall ys. foldrSShared consM ys = \xs -> xs `conjoin` ys #-}

-- | Right fold to a streaming monad.
--
-- > foldrS StreamK.cons StreamK.nil === id
--
-- 'foldrS' can be used to perform stateless stream to stream transformations
-- like map and filter in general. It can be coupled with a scan to perform
-- stateful transformations. However, note that the custom map and filter
-- routines can be much more efficient than this due to better stream fusion.
--
-- >>> input = StreamK.fromStream $ Stream.fromList [1..5]
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS StreamK.cons StreamK.nil input
-- [1,2,3,4,5]
--
-- Find if any element in the stream is 'True':
--
-- >>> step x xs = if odd x then StreamK.fromPure True else xs
-- >>> input = StreamK.fromStream (Stream.fromList (2:4:5:undefined)) :: StreamK IO Int
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step (StreamK.fromPure False) input
-- [True]
--
-- Map (+2) on odd elements and filter out the even elements:
--
-- >>> step x xs = if odd x then (x + 2) `StreamK.cons` xs else xs
-- >>> input = StreamK.fromStream (Stream.fromList [1..5]) :: StreamK IO Int
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step StreamK.nil input
-- [3,5,7]
--
-- /Pre-release/
{-# INLINE_NORMAL foldrS #-}
foldrS ::
       (a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrS = foldrSWith foldStream

{-# RULES "foldrS/id"     foldrS cons nil = \x -> x #-}
{-# RULES "foldrS/nil"    forall k z.   foldrS k z nil  = z #-}
-- See notes in GHC.Base about this rule
-- {-# RULES "foldr/cons"
--  forall k z x xs. foldrS k z (x `cons` xs) = k x (foldrS k z xs) #-}
{-# RULES "foldrS/single" forall k z x. foldrS k z (fromPure x) = k x z #-}
-- {-# RULES "foldrS/app" [1]
--  forall ys. foldrS cons ys = \xs -> xs `conjoin` ys #-}

-------------------------------------------------------------------------------
-- foldrS with monadic cons i.e. consM
-------------------------------------------------------------------------------

-- Same shape as 'foldrSWith', except that the step function receives each
-- element wrapped with 'return', i.e. as an @m a@ rather than an @a@.
{-# INLINE foldrSMWith #-}
foldrSMWith :: Monad m
    => (forall r. State StreamK m b
        -> (b -> StreamK m b -> m r)
        -> (b -> m r)
        -> m r
        -> StreamK m b
        -> m r)
    -> (m a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSMWith f step final m = go m

    where

    go m1 = mkStream $ \st yld sng stp ->
        let run x = f st yld sng stp x
            stop = run final
            single a = run $ step (return a) final
            yieldk a r = run $ step (return a) (go r)
         in foldStream (adaptState st) yieldk single stop m1

-- Monadic-step right fold that reconstructs the output with 'foldStream'.
{-# INLINE_NORMAL foldrSM #-}
foldrSM :: Monad m
    => (m a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSM = foldrSMWith foldStream

-- {-# RULES "foldrSM/id"     foldrSM consM nil = \x -> x #-}
{-# RULES "foldrSM/nil"    forall k z.   foldrSM k z nil  = z #-}
{-# RULES "foldrSM/single" forall k z x. foldrSM k z (fromEffect x) = k x z #-}
-- {-# RULES "foldrSM/app" [1]
--  forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}

-- Like foldrSM but sharing the SVar state within the reconstructed stream.
{-# INLINE_NORMAL foldrSMShared #-}
foldrSMShared :: Monad m
    => (m a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSMShared = foldrSMWith foldStreamShared

-- {-# RULES "foldrSM/id"     foldrSM consM nil = \x -> x #-}
{-# RULES "foldrSMShared/nil"
    forall k z. foldrSMShared k z nil = z #-}
{-# RULES "foldrSMShared/single"
    forall k z x. foldrSMShared k z (fromEffect x) = k x z #-}
-- {-# RULES "foldrSM/app" [1]
--  forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}

-------------------------------------------------------------------------------
-- build
-------------------------------------------------------------------------------

-- Build a stream by instantiating a polymorphic build function with 'cons'
-- and 'nil'.
{-# INLINE_NORMAL build #-}
build :: forall m a. (forall b. (a -> b -> b) -> b -> b) -> StreamK m a
build g = g cons nil

{-# RULES "foldrM/build"
    forall k z (g :: forall b. (a -> b -> b) -> b -> b).
    foldrM k z (build g) = g k z #-}

{-# RULES "foldrS/build"
      forall k z (g :: forall b. (a -> b -> b) -> b -> b).
      foldrS k z (build g) = g k z #-}

{-# RULES "foldrS/cons/build"
      forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
      foldrS k z (x `cons` build g) = k x (g k z) #-}

{-# RULES "foldrSShared/build"
      forall k z (g :: forall b. (a -> b -> b) -> b -> b).
      foldrSShared k z (build g) = g k z #-}

{-# RULES "foldrSShared/cons/build"
      forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
      foldrSShared k z (x `cons` build g) = k x (g k z) #-}

-- build a stream by applying cons and nil to a build function
{-# INLINE_NORMAL buildS #-}
buildS ::
       ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a)
    -> StreamK m a
buildS g = g cons nil

{-# RULES "foldrS/buildS"
      forall k z
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
      foldrS k z (buildS g) = g k z #-}

{-# RULES "foldrS/cons/buildS"
      forall k z x
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
      foldrS k z (x `cons` buildS g) = k x (g k z) #-}

{-# RULES "foldrSShared/buildS"
      forall k z
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
      foldrSShared k z (buildS g) = g k z #-}

{-# RULES "foldrSShared/cons/buildS"
      forall k z x
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
      foldrSShared k z (x `cons` buildS g) = k x (g k z) #-}

-- build a stream by applying consM and nil to a build function
{-# INLINE_NORMAL buildSM #-}
buildSM :: Monad m
    => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a)
    -> StreamK m a
buildSM g = g consM nil

{-# RULES "foldrSM/buildSM"
     forall k z
        (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
     foldrSM k z (buildSM g) = g k z #-}

{-# RULES "foldrSMShared/buildSM"
     forall k z
        (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
     foldrSMShared k z (buildSM g) = g k z #-}

-- Disabled because this may not fire as consM is a class Op
{-
{-# RULES "foldrS/consM/buildSM"
      forall k z x (g :: (m a -> t m a -> t m a) -> t m a -> t m a)
    . foldrSM k z (x `consM` buildSM g)
    = k x (g k z)
#-}
-}

-- Build using monadic build functions (continuations) instead of
-- reconstructing a stream.
{-# INLINE_NORMAL buildM #-}
buildM :: Monad m
    => (forall r. (a -> StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )
    -> StreamK m a
buildM g = mkStream $ \st yld sng stp ->
    -- The yield continuation given to 'g' rebuilds a stream from the element
    -- and the remaining stream using 'consM', then folds it with 'foldStream'.
    g (\a r -> foldStream st yld sng stp (return a `consM` r)) sng stp

-- NOTE(review): the definition of 'sharedMWith' continues beyond this point;
-- its fragment is kept verbatim on the next line until the rest is restored.
-- | Like 'buildM' but shares the SVar state across computations. {-# INLINE_NORMAL sharedMWith #-} sharedMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r ) -> StreamK m a m a -> StreamK m a -> StreamK m a cns forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r g = (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a forall (m :: * -> *) a. (forall r.
State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a mkStream ((forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a) -> (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a forall a b. (a -> b) -> a -> b $ \State StreamK m a st a -> StreamK m a -> m r yld a -> m r sng m r stp -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r g (\a a StreamK m a r -> State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r forall (m :: * -> *) a r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r foldStreamShared State StreamK m a st a -> StreamK m a -> m r yld a -> m r sng m r stp (a -> m a forall a. a -> m a forall (m :: * -> *) a. Monad m => a -> m a return a a m a -> StreamK m a -> StreamK m a `cns` StreamK m a r)) a -> m r sng m r stp ------------------------------------------------------------------------------- -- augment ------------------------------------------------------------------------------- {-# INLINE_NORMAL augmentS #-} augmentS :: ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a augmentS :: forall a (m :: * -> *). ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a augmentS (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a g StreamK m a xs = (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a g a -> StreamK m a -> StreamK m a forall a (m :: * -> *). a -> StreamK m a -> StreamK m a cons StreamK m a xs {-# RULES "augmentS/nil" forall (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a). augmentS g nil = buildS g #-} {-# RULES "foldrS/augmentS" forall k z xs (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a). 
foldrS k z (augmentS g xs) = g k (foldrS k z xs) #-} {-# RULES "augmentS/buildS" forall (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) (h :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a). augmentS g (buildS h) = buildS (\c n -> g c (h c n)) #-} {-# INLINE_NORMAL augmentSM #-} augmentSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a augmentSM :: forall (m :: * -> *) a. Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a augmentSM (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a g StreamK m a xs = (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a g m a -> StreamK m a -> StreamK m a forall (m :: * -> *) a. Monad m => m a -> StreamK m a -> StreamK m a consM StreamK m a xs {-# RULES "augmentSM/nil" forall (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a). augmentSM g nil = buildSM g #-} {-# RULES "foldrSM/augmentSM" forall k z xs (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a). foldrSM k z (augmentSM g xs) = g k (foldrSM k z xs) #-} {-# RULES "augmentSM/buildSM" forall (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) (h :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a). augmentSM g (buildSM h) = buildSM (\c n -> g c (h c n)) #-} ------------------------------------------------------------------------------- -- Experimental foldrM/buildM ------------------------------------------------------------------------------- -- | Lazy right fold with a monadic step function. {-# INLINE_NORMAL foldrM #-} foldrM :: (a -> m b -> m b) -> m b -> StreamK m a -> m b foldrM :: forall a (m :: * -> *) b. 
(a -> m b -> m b) -> m b -> StreamK m a -> m b foldrM a -> m b -> m b step m b acc StreamK m a m = StreamK m a -> m b go StreamK m a m where go :: StreamK m a -> m b go StreamK m a m1 = let stop :: m b stop = m b acc single :: a -> m b single a a = a -> m b -> m b step a a m b acc yieldk :: a -> StreamK m a -> m b yieldk a a StreamK m a r = a -> m b -> m b step a a (StreamK m a -> m b go StreamK m a r) in State StreamK m a -> (a -> StreamK m a -> m b) -> (a -> m b) -> m b -> StreamK m a -> m b forall (m :: * -> *) a r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r foldStream State StreamK m a forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a defState a -> StreamK m a -> m b yieldk a -> m b single m b stop StreamK m a m1 {-# INLINE_NORMAL foldrMKWith #-} foldrMKWith :: (State StreamK m a -> (a -> StreamK m a -> m b) -> (a -> m b) -> m b -> StreamK m a -> m b) -> (a -> m b -> m b) -> m b -> ((a -> StreamK m a -> m b) -> (a -> m b) -> m b -> m b) -> m b foldrMKWith :: forall (m :: * -> *) a b. 
(State StreamK m a -> (a -> StreamK m a -> m b) -> (a -> m b) -> m b -> StreamK m a -> m b) -> (a -> m b -> m b) -> m b -> ((a -> StreamK m a -> m b) -> (a -> m b) -> m b -> m b) -> m b foldrMKWith State StreamK m a -> (a -> StreamK m a -> m b) -> (a -> m b) -> m b -> StreamK m a -> m b f a -> m b -> m b step m b acc = ((a -> StreamK m a -> m b) -> (a -> m b) -> m b -> m b) -> m b go where go :: ((a -> StreamK m a -> m b) -> (a -> m b) -> m b -> m b) -> m b go (a -> StreamK m a -> m b) -> (a -> m b) -> m b -> m b k = let stop :: m b stop = m b acc single :: a -> m b single a a = a -> m b -> m b step a a m b acc yieldk :: a -> StreamK m a -> m b yieldk a a StreamK m a r = a -> m b -> m b step a a (((a -> StreamK m a -> m b) -> (a -> m b) -> m b -> m b) -> m b go (\a -> StreamK m a -> m b yld a -> m b sng m b stp -> State StreamK m a -> (a -> StreamK m a -> m b) -> (a -> m b) -> m b -> StreamK m a -> m b f State StreamK m a forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a defState a -> StreamK m a -> m b yld a -> m b sng m b stp StreamK m a r)) in (a -> StreamK m a -> m b) -> (a -> m b) -> m b -> m b k a -> StreamK m a -> m b yieldk a -> m b single m b stop {- {-# RULES "foldrM/buildS" forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a) . foldrM k z (buildS g) = g k z #-} -} -- XXX in which case will foldrM/buildM fusion be useful? {-# RULES "foldrM/buildM" forall step acc (g :: (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r )). foldrM step acc (buildM g) = foldrMKWith foldStream step acc g #-} {- {-# RULES "foldrM/sharedM" forall step acc (g :: (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r )). foldrM step acc (sharedM g) = foldrMKWith foldStreamShared step acc g #-} -} ------------------------------------------------------------------------------ -- Left fold ------------------------------------------------------------------------------ -- | Strict left fold with an extraction function. 
-- Like the standard strict left fold, but applies a user supplied extraction
-- function (the third argument) to the folded value at the end. This is
-- designed to work with the @foldl@ library. The suffix @x@ is a mnemonic for
-- extraction.
--
-- Note that the accumulator is always evaluated including the initial value.
{-# INLINE foldlx' #-}
foldlx' :: forall m a b x. Monad m
    => (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b
foldlx' step begin done m = get $ go m begin
    where
    -- Extract the final accumulator from the stream produced by "go". That
    -- stream only ever invokes its singleton continuation and ignores the
    -- state, so the other arguments are left as 'undefined'.
    {-# NOINLINE get #-}
    get :: StreamK m x -> m b
    get m1 =
        -- XXX we are not strictly evaluating the accumulator here. Is this
        -- okay?
        let single = return . done
        -- XXX this is foldSingleton. why foldStreamShared?
         in foldStreamShared undefined undefined single undefined m1

    -- Note, this can be implemented by making a recursive call to "go",
    -- however that is more expensive because of unnecessary recursion
    -- that cannot be tail call optimized. Unfolding recursion explicitly via
    -- continuations is much more efficient.
    go :: StreamK m a -> x -> StreamK m x
    go m1 !acc = mkStream $ \_ yld sng _ ->
        let stop = sng acc
            single a = sng $ step acc a
            -- XXX this is foldNonEmptyStream
            yieldk a r = foldStream defState yld sng undefined $
                go r (step acc a)
        in foldStream defState yieldk single stop m1

-- | Strict left associative fold.
{-# INLINE foldl' #-}
foldl' :: Monad m => (b -> a -> b) -> b -> StreamK m a -> m b
foldl' step begin = foldlx' step begin id

-- XXX replace the recursive "go" with explicit continuations.
-- | Like 'foldlx'', but with a monadic step function.
--
-- The initial accumulator and the extraction function are monadic as well.
{-# INLINABLE foldlMx' #-}
foldlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b
foldlMx' step begin done = go begin
    where
    go !acc m1 =
        let stop = acc >>= done
            single a = acc >>= \b -> step b a >>= done
            yieldk a r = acc >>= \b -> step b a >>= \x -> go (return x) r
         in foldStream defState yieldk single stop m1

-- | Like 'foldl'' but with a monadic step function.
{-# INLINE foldlM' #-}
foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b
foldlM' step begin = foldlMx' step begin return

------------------------------------------------------------------------------
-- Specialized folds
------------------------------------------------------------------------------

-- XXX use foldrM to implement folds where possible
-- XXX This (commented) definition of drain and mapM_ perform much better on
-- some benchmarks but worse on others. Need to investigate why, maybe there
-- is an optimization opportunity that we can exploit.
-- drain = foldrM (\_ xs -> return () >> xs) (return ())
--
-- > drain = foldl' (\_ _ -> ()) ()
-- > drain = mapM_ (\_ -> return ())
{-# INLINE drain #-}
drain :: Monad m => StreamK m a -> m ()
drain = foldrM (\_ xs -> xs) (return ())
{-
drain = go
    where
    go m1 =
        let stop = return ()
            single _ = return ()
            yieldk _ r = go r
        in foldStream defState yieldk single stop m1
-}

-- Returns 'True' when the stream stops without producing an element and
-- 'False' as soon as it produces one; the rest of the stream is not run.
{-# INLINE null #-}
null :: Monad m => StreamK m a -> m Bool
-- null = foldrM (\_ _ -> return False) (return True)
null m =
    let stop = return True
        single _ = return False
        yieldk _ _ = return False
    in foldStream defState yieldk single stop m

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

infixr 6 `append`

-- Run the first stream to completion and then continue with the second. The
-- state received by the resulting stream is passed on to both streams.
{-# INLINE append #-}
append :: StreamK m a -> StreamK m a -> StreamK m a
-- XXX This doubles the time of toNullAp benchmark, may not be fusing properly
-- serial xs ys = augmentS (\c n -> foldrS c n xs) ys
append m1 m2 = go m1
    where
    go m = mkStream $ \st yld sng stp ->
               let stop       = foldStream st yld sng stp m2
                   single a   = yld a m2
                   yieldk a r = yld a (go r)
               in foldStream st yieldk single stop m

-- join/merge/append streams depending on consM
{-# INLINE conjoin #-}
conjoin :: Monad m => StreamK m a -> StreamK m a -> StreamK m a
conjoin xs = augmentSM (\c n -> foldrSM c n xs)

instance Semigroup (StreamK m a) where
    (<>) = append

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance Monoid (StreamK m a) where
    mempty = nil
    mappend = (<>)

-------------------------------------------------------------------------------
-- Functor
-------------------------------------------------------------------------------

-- IMPORTANT: This is eta expanded on purpose. This should not be eta
-- reduced. This will cause a lot of regressions, probably because of some
-- rewrite rules. Ideally don't run hlint on this file.
{-# INLINE_LATE mapFB #-}
mapFB :: forall b m a.
       (b -> StreamK m b -> StreamK m b)
    -> (a -> b)
    -> a
    -> StreamK m b
    -> StreamK m b
mapFB c f = \x ys -> c (f x) ys

{-# RULES
"mapFB/mapFB" forall c f g. mapFB (mapFB c f) g = mapFB c (f . g)
"mapFB/id"    forall c.     mapFB c (\x -> x)   = c
    #-}

{-# INLINE map #-}
map :: (a -> b) -> StreamK m a -> StreamK m b
map f xs = buildS (\c n -> foldrS (mapFB c f) n xs)

-- XXX This definition might potentially be more efficient, but the cost in the
-- benchmark is dominated by unfoldrM cost so we cannot correctly determine
-- differences in the mapping cost. We should perhaps deduct the cost of
-- unfoldrM from the benchmarks and then compare.
{-
map f m = go m
    where
        go m1 =
            mkStream $ \st yld sng stp ->
            let single     = sng . f
                yieldk a r = yld (f a) (go r)
            in foldStream (adaptState st) yieldk single stp m1
-}

-- Monadic analogue of 'mapFB': binds the effectful element @x@ to @f@ before
-- handing it to the cons operation @c@.
{-# INLINE_LATE mapMFB #-}
mapMFB :: Monad m
    => (m b -> t m b -> t m b) -> (a -> m b) -> m a -> t m b -> t m b
mapMFB c f x = c (x >>= f)

-- Note on the order of composition: by definition
--
-- > mapMFB (mapMFB c f) g x = mapMFB c f (x >>= g) = c ((x >>= g) >>= f)
--
-- so the inner map's function @g@ runs first and @f@ second, i.e. the fused
-- function is @g >=> f@ (the monadic counterpart of @f . g@ in "mapFB/mapFB").
{-# RULES
    "mapMFB/mapMFB" forall c f g. mapMFB (mapMFB c f) g = mapMFB c (g >=> f)
    #-}
-- XXX These rules may never fire because pure/return type class rules will
-- fire first.
{-
"mapMFB/pure"    forall c.     mapMFB c (\x -> pure x)   = c
"mapMFB/return"  forall c.     mapMFB c (\x -> return x) = c
-}

-- This is experimental serial version supporting fusion.
--
-- XXX what if we do not want to fuse two concurrent mapMs?
-- XXX we can combine two concurrent mapM only if the SVar is of the same type
-- So for now we use it only for serial streams.
-- XXX fusion would be easier for monomorphic stream types.
-- {-# RULES "mapM serial" mapM = mapMSerial #-}
{-# INLINE mapMSerial #-}
mapMSerial :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
mapMSerial f xs = buildSM (\c n -> foldrSMShared (mapMFB c f) n xs)

-- Map a monadic function over a stream, attaching each resulting action to
-- the rest of the output with the supplied cons operation @cns@.
{-# INLINE mapMWith #-}
mapMWith ::
       (m b -> StreamK m b -> StreamK m b)
    -> (a -> m b)
    -> StreamK m a
    -> StreamK m b
mapMWith cns f = foldrSShared (\x xs -> f x `cns` xs) nil

{-
-- See note under map definition above.
mapMWith cns f = go
    where
    go m1 = mkStream $ \st yld sng stp ->
        let single a = f a >>= sng
            yieldk a r = foldStreamShared st yld sng stp $ f a `cns` go r
         in foldStream (adaptState st) yieldk single stp m1
-}

-- XXX in fact use the Stream type everywhere and only use polymorphism in the
-- high level modules/prelude.
instance Monad m => Functor (StreamK m) where
    fmap = map

------------------------------------------------------------------------------
-- Lists
------------------------------------------------------------------------------

-- Serial streams can act like regular lists using the Identity monad

-- XXX Show instance is 10x slower compared to read, we can do much better.
-- The list show instance itself is really slow.

-- XXX The default definitions of "<" in the Ord instance etc. do not perform
-- well, because they do not get inlined. Need to add INLINE in Ord class in
-- base?

instance IsList (StreamK Identity a) where
    type (Item (StreamK Identity a)) = a

    {-# INLINE fromList #-}
    fromList = fromFoldable

    {-# INLINE toList #-}
    toList = Data.Foldable.foldr (:) []

-- XXX Fix these
{-
instance Eq a => Eq (StreamK Identity a) where
    {-# INLINE (==) #-}
    (==) xs ys = runIdentity $ eqBy (==) xs ys

instance Ord a => Ord (StreamK Identity a) where
    {-# INLINE compare #-}
    compare xs ys = runIdentity $ cmpBy compare xs ys

    {-# INLINE (<) #-}
    x < y =
        case compare x y of
            LT -> True
            _ -> False

    {-# INLINE (<=) #-}
    x <= y =
        case compare x y of
            GT -> False
            _ -> True

    {-# INLINE (>) #-}
    x > y =
        case compare x y of
            GT -> True
            _ -> False

    {-# INLINE (>=) #-}
    x >= y =
        case compare x y of
            LT -> False
            _ -> True

    {-# INLINE max #-}
    max x y = if x <= y then y else x

    {-# INLINE min #-}
    min x y = if x <= y then x else y
-}

-- Shown as @fromList [..]@, parenthesized when the precedence exceeds 10.
instance Show a => Show (StreamK Identity a) where
    showsPrec p dl = showParen (p > 10) $
        showString "fromList " . shows (toList dl)

-- Parses the @fromList [..]@ form produced by the 'Show' instance.
instance Read a => Read (StreamK Identity a) where
    readPrec = parens $ prec 10 $ do
        Ident "fromList" <- lexP
        fromList <$> readPrec

    readListPrec = readListPrecDefault

instance (a ~ Char) => IsString (StreamK Identity a) where
    {-# INLINE fromString #-}
    fromString = fromList

-------------------------------------------------------------------------------
-- Foldable
-------------------------------------------------------------------------------

-- | Lazy right associative fold.
--
-- Implemented with 'foldrM': the result of folding the rest of the stream is
-- obtained first and the pure step function is then applied to the current
-- element and that result.
{-# INLINE foldr #-}
foldr :: Monad m => (a -> b -> b) -> b -> StreamK m a -> m b
foldr step acc = foldrM (\x xs -> xs >>= \b -> return (step x b)) (return acc)

-- The default Foldable instance has several issues:
-- 1) several definitions do not have INLINE on them, so we provide
--    re-implementations with INLINE pragmas.
-- 2) the definitions of sum/product/maximum/minimum are inefficient as they
-- use right folds, they cannot run in constant memory.
-- We provide implementations using strict left folds here.

instance (Foldable m, Monad m) => Foldable (StreamK m) where

    -- Fold the stream inside the monad @m@ first, then collapse the
    -- resulting @m w@ using the 'Foldable' instance of @m@ itself.
    {-# INLINE foldMap #-}
    foldMap f =
          fold
        . Streamly.Internal.Data.StreamK.Type.foldr (mappend . f) mempty

    {-# INLINE foldr #-}
    foldr f z t = appEndo (foldMap (Endo #. f) t) z

    -- Strict left fold expressed through the right fold: each element
    -- contributes a continuation that forces the accumulator ('$!') before
    -- handing it on.
    {-# INLINE foldl' #-}
    foldl' f z0 xs = Data.Foldable.foldr f' id xs z0

        where

        f' x k = oneShot $ \z -> k $! f z x

    {-# INLINE length #-}
    length = Data.Foldable.foldl' (\n _ -> n + 1) 0

    {-# INLINE elem #-}
    elem = any . (==)

    -- Errors with "maximum: empty stream" when the stream has no elements.
    {-# INLINE maximum #-}
    maximum =
          fromMaybe (errorWithoutStackTrace "maximum: empty stream")
        . toMaybe
        . Data.Foldable.foldl' getMax Nothing'

        where

        getMax Nothing' x = Just' x
        getMax (Just' mx) x = Just' $! max mx x

    -- Errors with "minimum: empty stream" when the stream has no elements.
    {-# INLINE minimum #-}
    minimum =
          fromMaybe (errorWithoutStackTrace "minimum: empty stream")
        . toMaybe
        . Data.Foldable.foldl' getMin Nothing'

        where

        getMin Nothing' x = Just' x
        getMin (Just' mn) x = Just' $! min mn x

    {-# INLINE sum #-}
    sum = Data.Foldable.foldl' (+) 0

    {-# INLINE product #-}
    product = Data.Foldable.foldl' (*) 1

-------------------------------------------------------------------------------
-- Traversable
-------------------------------------------------------------------------------

instance Traversable (StreamK Identity) where
    -- Right fold over the pure stream, rebuilding it inside the applicative
    -- @f@ by consing each mapped element onto the traversed remainder.
    {-# INLINE traverse #-}
    traverse f xs =
        runIdentity
            $ Streamly.Internal.Data.StreamK.Type.foldr
                consA (pure mempty) xs

        where

        consA x ys = liftA2 cons (f x) ys

-------------------------------------------------------------------------------
-- Nesting
-------------------------------------------------------------------------------

-- | Detach a stream from an SVar
{-# INLINE unShare #-}
unShare :: StreamK m a -> StreamK m a
unShare x = mkStream $ \st yld sng stp ->
    foldStream st yld sng stp x

-- XXX the function stream and value stream can run in parallel
--
-- | Apply every function of the first stream to the whole of the second
-- stream. The per-function result streams are combined with each other using
-- the supplied binary stream combinator.
{-# INLINE crossApplyWith #-}
crossApplyWith ::
       (StreamK m b -> StreamK m b -> StreamK m b)
    -> StreamK m (a -> b)
    -> StreamK m a
    -> StreamK m b
crossApplyWith par fstream stream = go1 fstream

    where

    -- Walk the function stream; for each function, map it over the value
    -- stream and combine that with the results for the remaining functions.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ unShare (go2 f stream)
                yieldk f r = foldShared $ unShare (go2 f stream) `par` go1 r
            in foldStream (adaptState st) yieldk single stp m

    -- Map a single function over a stream.
    go2 f m =
        mkStream $ \st yld sng stp ->
            let single a   = sng (f a)
                yieldk a r = yld (f a) (go2 f r)
            in foldStream (adaptState st) yieldk single stp m

-- | Apply a stream of functions to a stream of values and flatten the results.
--
-- Note that the second stream is evaluated multiple times.
--
-- Definition:
--
-- >>> crossApply = StreamK.crossApplyWith StreamK.append
-- >>> crossApply = Stream.crossWith id
--
{-# INLINE crossApply #-}
crossApply ::
       StreamK m (a -> b)
    -> StreamK m a
    -> StreamK m b
crossApply fstream stream = go1 fstream

    where

    -- Walk the function stream.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ go3 f stream
                yieldk f r = foldShared $ go2 f r stream
            in foldStream (adaptState st) yieldk single stp m

    -- Map @f@ over the value stream; once the value stream is exhausted,
    -- continue with the remaining functions @r1@.
    go2 f r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single a   = yld (f a) (go1 r1)
                yieldk a r = yld (f a) (go2 f r1 r)
            in foldStream (adaptState st) yieldk single stop m

    -- Map the last function over the value stream.
    go3 f m =
        mkStream $ \st yld sng stp ->
            let single a   = sng (f a)
                yieldk a r = yld (f a) (go3 f r)
            in foldStream (adaptState st) yieldk single stp m

-- | For every element of the first stream, emit the entire second stream.
-- The values of the first stream are discarded.
{-# INLINE crossApplySnd #-}
crossApplySnd ::
       StreamK m a
    -> StreamK m b
    -> StreamK m b
crossApplySnd fstream stream = go1 fstream

    where

    -- Walk the first stream, ignoring its values.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single _   = foldShared stream
                yieldk _ r = foldShared $ go2 r stream
            in foldStream (adaptState st) yieldk single stp m

    -- Emit the second stream; when it ends, resume the first stream at @r1@.
    go2 r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single a   = yld a (go1 r1)
                yieldk a r = yld a (go2 r1 r)
            in foldStream st yieldk single stop m

-- | For every element of the first stream, emit that element once for each
-- element of the second stream. The values of the second stream are
-- discarded.
{-# INLINE crossApplyFst #-}
crossApplyFst ::
       StreamK m a
    -> StreamK m b
    -> StreamK m a
crossApplyFst fstream stream = go1 fstream

    where

    -- Walk the first stream.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ go3 f stream
                yieldk f r = foldShared $ go2 f r stream
            in foldStream st yieldk single stp m

    -- Emit @f@ for each element of the second stream; when the second stream
    -- ends, resume the first stream at @r1@.
    go2 f r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single _   = yld f (go1 r1)
                yieldk _ r = yld f (go2 f r1 r)
            in foldStream (adaptState st) yieldk single stop m

    -- Emit the last element @f@ for each element of the second stream.
    go3 f m =
        mkStream $ \st yld sng stp ->
            let single _   = sng f
                yieldk _ r = yld f (go3 f r)
            in foldStream (adaptState st) yieldk single stp m

-- |
-- Definition:
--
-- >>> crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2
--
-- Note that the second stream is evaluated multiple times.
--
{-# INLINE crossWith #-}
crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
crossWith f m1 m2 = fmap f m1 `crossApply` m2

-- | Given a @StreamK m a@ and @StreamK m b@ generate a stream with all possible
-- combinations of the tuple @(a, b)@.
--
-- Definition:
--
-- >>> cross = StreamK.crossWith (,)
--
-- The second stream is evaluated multiple times. If that is not desired it can
-- be cached in an 'Data.Array.Array' and then generated from the array before
-- calling this function. Caching may also improve performance if the stream is
-- expensive to evaluate.
--
-- See 'Streamly.Internal.Data.Unfold.cross' for a much faster fused
-- alternative.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE cross #-}
cross :: Monad m => StreamK m a -> StreamK m b -> StreamK m (a, b)
cross = crossWith (,)

-- XXX This is just concatMapWith with arguments flipped. We need to keep this
-- instead of using a concatMap style definition because the bind
-- implementation in Async and WAsync streams show significant perf degradation
-- if the argument order is changed.
{-# INLINE bindWith #-}
bindWith ::
       (StreamK m b -> StreamK m b -> StreamK m b)
    -> StreamK m a
    -> (a -> StreamK m b)
    -> StreamK m b
bindWith par m1 f = go m1

    where

    -- Map each element to a stream and combine it, using @par@, with the
    -- stream generated from the rest of the input.
    go m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single a   = foldShared $ unShare (f a)
                yieldk a r = foldShared $ unShare (f a) `par` go r
            in foldStream (adaptState st) yieldk single stp m

-- XXX express in terms of foldrS?
-- XXX can we use a different stream type for the generated stream being
-- flattened so that we can combine them differently and keep the resulting
-- stream different?
-- XXX do we need specialize to IO?
-- XXX can we optimize when c and a are same, by removing the forall using
-- rewrite rules with type applications?

-- | Perform a 'concatMap' using a specified concat strategy. The first
-- argument specifies a merge or concat function that is used to merge the
-- streams generated by the map function.
--
{-# INLINE concatMapWith #-}
concatMapWith
    ::
       (StreamK m b -> StreamK m b -> StreamK m b)
    -> (a -> StreamK m b)
    -> StreamK m a
    -> StreamK m b
concatMapWith par f xs = bindWith par xs f

-- | Map a stream generating function on every element of the input stream
-- and flatten the generated streams by appending them.
--
-- Definition:
--
-- >>> concatMap = StreamK.concatMapWith StreamK.append
--
{-# INLINE concatMap #-}
concatMap :: (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMap = concatMapWith append

{-
-- Fused version.
-- XXX This fuses but when the stream is nil this performs poorly.
-- The filterAllOut benchmark degrades. Need to investigate and fix that.
{-# INLINE concatMap #-}
concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
concatMap f xs = buildS
    (\c n -> foldrS (\x b -> foldrS c b (f x)) n xs)

-- Stream polymorphic concatMap implementation
-- XXX need to use buildSM/foldrSMShared for parallel behavior
-- XXX unShare seems to degrade the fused performance
{-# INLINE_EARLY concatMap_ #-}
concatMap_ :: IsStream t => (a -> t m b) -> t m a -> t m b
concatMap_ f xs = buildS
     (\c n -> foldrSShared (\x b -> foldrSShared c b (unShare $ f x)) n xs)
-}

-- | Combine streams in pairs using a binary combinator, the resulting streams
-- are then combined again in pairs recursively until we get to a single
-- combined stream. The composition would thus form a binary tree.
--
-- For example, you can sort a stream using merge sort like this:
--
-- >>> s = StreamK.fromStream $ Stream.fromList [5,1,7,9,2]
-- >>> generate = StreamK.fromPure
-- >>> combine = StreamK.mergeBy compare
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.mergeMapWith combine generate s
-- [1,2,5,7,9]
--
-- Note that if the stream length is not a power of 2, the binary tree composed
-- by mergeMapWith would not be balanced, which may or may not be important
-- depending on what you are trying to achieve.
--
-- /Caution: the stream of streams must be finite/
--
-- /Pre-release/
--
{-# INLINE mergeMapWith #-}
mergeMapWith
    ::
       (StreamK m b -> StreamK m b -> StreamK m b)
    -> (a -> StreamK m b)
    -> StreamK m a
    -> StreamK m b
mergeMapWith combine f str = go (leafPairs str)

    where

    -- Reduce a stream of streams to a single stream. A lone stream is used
    -- as is, otherwise the head is handed over to "go1" to be paired up.
    go stream =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single a   = foldShared $ unShare a
                yieldk a r = foldShared $ go1 a r
            in foldStream (adaptState st) yieldk single stp stream

    -- "a1" is a stream waiting for a partner. If the remaining stream of
    -- streams is empty "a1" is the result, if it has one more stream the two
    -- are combined, otherwise the rest is paired up by one level
    -- (nonLeafPairs) and reduced again.
    go1 a1 stream =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop       = foldShared $ unShare a1
                single a   = foldShared $ unShare a1 `combine` a
                yieldk a r =
                    foldShared $ go $ combine a1 a `cons` nonLeafPairs r
            in foldStream (adaptState st) yieldk single stop stream

    -- Exactly the same as "go" except that stop continuation extracts the
    -- stream.
    -- Maps "f" on the input elements and yields the generated streams
    -- combined in adjacent pairs.
    leafPairs stream =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single a   = sng (f a)
                yieldk a r = foldShared $ leafPairs1 a r
            in foldStream (adaptState st) yieldk single stp stream

    -- "a1" is an input element waiting for a partner, an unpaired trailing
    -- element yields "f a1" alone.
    leafPairs1 a1 stream =
        mkStream $ \st yld sng _ ->
            let stop       = sng (f a1)
                single a   = sng (f a1 `combine` f a)
                yieldk a r = yld (f a1 `combine` f a) $ leafPairs r
            in foldStream (adaptState st) yieldk single stop stream

    -- Exactly the same as "leafPairs" except that it does not map "f"
    nonLeafPairs stream =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single a   = sng a
                yieldk a r = foldShared $ nonLeafPairs1 a r
            in foldStream (adaptState st) yieldk single stp stream

    -- Same as "leafPairs1" without mapping "f".
    nonLeafPairs1 a1 stream =
        mkStream $ \st yld sng _ ->
            let stop       = sng a1
                single a   = sng (a1 `combine` a)
                yieldk a r = yld (a1 `combine` a) $ nonLeafPairs r
            in foldStream (adaptState st) yieldk single stop stream

{-
instance Monad m => Applicative (StreamK m) where
    {-# INLINE pure #-}
    pure = fromPure

    {-# INLINE (<*>) #-}
    (<*>) = crossApply

    {-# INLINE liftA2 #-}
    liftA2 f x = (<*>) (fmap f x)

    {-# INLINE (*>) #-}
    (*>) = crossApplySnd

    {-# INLINE (<*) #-}
    (<*) = crossApplyFst

-- NOTE: even though concatMap for StreamD is 3x faster compared to StreamK,
-- the monad instance of StreamD is slower than StreamK after foldr/build
-- fusion.
instance Monad m => Monad (StreamK m) where
    {-# INLINE return #-}
    return = pure

    {-# INLINE (>>=) #-}
    (>>=) = flip concatMap
-}

{-
-- Like concatMap but generates stream using an unfold function. Similar to
-- unfoldMany but for StreamK.
concatUnfoldr :: IsStream t
    => (b -> t m (Maybe (a, b))) -> t m b -> t m a
concatUnfoldr = undefined
-}

------------------------------------------------------------------------------
-- concatIterate - Map and flatten Trees of Streams
------------------------------------------------------------------------------

-- | Yield an input element in the output stream, map a stream generator on it
-- and repeat the process on the resulting stream. Resulting streams are
-- flattened using the 'concatMapWith' combinator. This can be used for a depth
-- first style (DFS) traversal of a tree like structure.
--
-- Example, list a directory tree using DFS:
--
-- >>> f = StreamK.fromStream . either Dir.readEitherPaths (const Stream.nil)
-- >>> input = StreamK.fromPure (Left ".")
-- >>> ls = StreamK.concatIterateWith StreamK.append f input
--
-- Note that 'iterateM' is a special case of 'concatIterateWith':
--
-- >>> iterateM f = StreamK.concatIterateWith StreamK.append (StreamK.fromEffect . f) . StreamK.fromEffect
--
-- /Pre-release/
--
{-# INLINE concatIterateWith #-}
concatIterateWith ::
       (StreamK m a -> StreamK m a -> StreamK m a)
    -> (a -> StreamK m a)
    -> StreamK m a
    -> StreamK m a
concatIterateWith combine f = iterateStream

    where

    iterateStream = concatMapWith combine generate

    -- Emit the element itself, then iterate on the stream generated from it.
    generate x = x `cons` iterateStream (f x)

-- | Like 'concatIterateWith' but uses the pairwise flattening combinator
-- 'mergeMapWith' for flattening the resulting streams. This can be used for a
-- balanced traversal of a tree like structure.
--
-- Example, list a directory tree using balanced traversal:
--
-- >>> f = StreamK.fromStream . either Dir.readEitherPaths (const Stream.nil)
-- >>> input = StreamK.fromPure (Left ".")
-- >>> ls = StreamK.mergeIterateWith StreamK.interleave f input
--
-- /Pre-release/
--
{-# INLINE mergeIterateWith #-}
mergeIterateWith ::
       (StreamK m a -> StreamK m a -> StreamK m a)
    -> (a -> StreamK m a)
    -> StreamK m a
    -> StreamK m a
mergeIterateWith combine f = iterateStream

    where

    iterateStream = mergeMapWith combine generate

    -- Emit the element itself, then iterate on the stream generated from it.
    generate x = x `cons` iterateStream (f x)

------------------------------------------------------------------------------
-- Flattening Graphs
------------------------------------------------------------------------------

-- To traverse graphs we need a state to be carried around in the traversal.
-- For example, we can use a hashmap to store the visited status of nodes.

-- | Like 'iterateMap' but carries a state in the stream generation function.
-- This can be used to traverse graph like structures, we can remember the
-- visited nodes in the state to avoid cycles.
--
-- Note that a combination of 'iterateMap' and 'usingState' can also be used to
-- traverse graphs. However, this function provides a more localized state
-- instead of using a global state.
-- -- See also: 'mfix' -- -- /Pre-release/ -- {-# INLINE concatIterateScanWith #-} concatIterateScanWith :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> (b -> a -> m (b, StreamK m a)) -> m b -> StreamK m a -> StreamK m a concatIterateScanWith :: forall (m :: * -> *) a b. Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> (b -> a -> m (b, StreamK m a)) -> m b -> StreamK m a -> StreamK m a concatIterateScanWith StreamK m a -> StreamK m a -> StreamK m a combine b -> a -> m (b, StreamK m a) f m b initial StreamK m a stream = m (StreamK m a) -> StreamK m a forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a concatEffect (m (StreamK m a) -> StreamK m a) -> m (StreamK m a) -> StreamK m a forall a b. (a -> b) -> a -> b $ do b b <- m b initial (b, StreamK m a) -> m (StreamK m a) iterateStream (b b, StreamK m a stream) where iterateStream :: (b, StreamK m a) -> m (StreamK m a) iterateStream (b b, StreamK m a s) = StreamK m a -> m (StreamK m a) forall a. a -> m a forall (f :: * -> *) a. Applicative f => a -> f a pure (StreamK m a -> m (StreamK m a)) -> StreamK m a -> m (StreamK m a) forall a b. (a -> b) -> a -> b $ (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a forall (m :: * -> *) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b concatMapWith StreamK m a -> StreamK m a -> StreamK m a combine (b -> a -> StreamK m a generate b b) StreamK m a s generate :: b -> a -> StreamK m a generate b b a a = a a a -> StreamK m a -> StreamK m a forall a (m :: * -> *). a -> StreamK m a -> StreamK m a `cons` b -> a -> StreamK m a feedback b b a a feedback :: b -> a -> StreamK m a feedback b b a a = m (StreamK m a) -> StreamK m a forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a concatEffect (m (StreamK m a) -> StreamK m a) -> m (StreamK m a) -> StreamK m a forall a b. 
(a -> b) -> a -> b $ b -> a -> m (b, StreamK m a) f b b a a m (b, StreamK m a) -> ((b, StreamK m a) -> m (StreamK m a)) -> m (StreamK m a) forall a b. m a -> (a -> m b) -> m b forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b >>= (b, StreamK m a) -> m (StreamK m a) iterateStream ------------------------------------------------------------------------------ -- Either streams ------------------------------------------------------------------------------ -- Keep concating either streams as long as rights are generated, stop as soon -- as a left is generated and concat the left stream. -- -- See also: 'handle' -- -- /Unimplemented/ -- {- concatMapEitherWith :: (forall x. t m x -> t m x -> t m x) -> (a -> t m (Either (StreamK m b) b)) -> StreamK m a -> StreamK m b concatMapEitherWith = undefined -} -- XXX We should prefer using the Maybe stream returning signatures over this. -- This API should perhaps be removed in favor of those. -- | In an 'Either' stream iterate on 'Left's. This is a special case of -- 'concatIterateWith': -- -- >>> concatIterateLeftsWith combine f = StreamK.concatIterateWith combine (either f (const StreamK.nil)) -- -- To traverse a directory tree: -- -- >>> input = StreamK.fromPure (Left ".") -- >>> ls = StreamK.concatIterateLeftsWith StreamK.append (StreamK.fromStream . Dir.readEither) input -- -- /Pre-release/ -- {-# INLINE concatIterateLeftsWith #-} concatIterateLeftsWith :: (b ~ Either a c) => (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m b -> StreamK m b concatIterateLeftsWith :: forall b a c (m :: * -> *). (b ~ Either a c) => (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m b -> StreamK m b concatIterateLeftsWith StreamK m b -> StreamK m b -> StreamK m b combine a -> StreamK m b f = (StreamK m b -> StreamK m b -> StreamK m b) -> (b -> StreamK m b) -> StreamK m b -> StreamK m b forall (m :: * -> *) a. 
(StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a concatIterateWith StreamK m b -> StreamK m b -> StreamK m b combine ((a -> StreamK m b) -> (c -> StreamK m b) -> Either a c -> StreamK m b forall a c b. (a -> c) -> (b -> c) -> Either a b -> c either a -> StreamK m b f (StreamK m b -> c -> StreamK m b forall a b. a -> b -> a const StreamK m b forall (m :: * -> *) a. StreamK m a nil)) ------------------------------------------------------------------------------ -- Interleaving ------------------------------------------------------------------------------ infixr 6 `interleave` -- Additionally we can have m elements yield from the first stream and n -- elements yielding from the second stream. We can also have time slicing -- variants of positional interleaving, e.g. run first stream for m seconds and -- run the second stream for n seconds. -- | Note: When joining many streams in a left associative manner earlier -- streams will get exponential priority than the ones joining later. Because -- of exponentially high weighting of left streams it can be used with -- 'concatMapWith' even on a large number of streams. -- {-# INLINE interleave #-} interleave :: StreamK m a -> StreamK m a -> StreamK m a interleave :: forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a interleave StreamK m a m1 StreamK m a m2 = (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a forall (m :: * -> *) a. (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a mkStream ((forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a) -> (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a forall a b. 
(a -> b) -> a -> b $ \State StreamK m a st a -> StreamK m a -> m r yld a -> m r sng m r stp -> do let stop :: m r stop = State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r forall (m :: * -> *) a r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r foldStream State StreamK m a st a -> StreamK m a -> m r yld a -> m r sng m r stp StreamK m a m2 single :: a -> m r single a a = a -> StreamK m a -> m r yld a a StreamK m a m2 yieldk :: a -> StreamK m a -> m r yieldk a a StreamK m a r = a -> StreamK m a -> m r yld a a (StreamK m a -> StreamK m a -> StreamK m a forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a interleave StreamK m a m2 StreamK m a r) State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r forall (m :: * -> *) a r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r foldStream State StreamK m a st a -> StreamK m a -> m r yieldk a -> m r single m r stop StreamK m a m1 infixr 6 `interleaveFst` -- | Like `interleave` but stops interleaving as soon as the first stream stops. -- {-# INLINE interleaveFst #-} interleaveFst :: StreamK m a -> StreamK m a -> StreamK m a interleaveFst :: forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a interleaveFst StreamK m a m1 StreamK m a m2 = (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a forall (m :: * -> *) a. (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a mkStream ((forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a) -> (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a forall a b. 
(a -> b) -> a -> b $ \State StreamK m a st a -> StreamK m a -> m r yld a -> m r sng m r stp -> do let yieldFirst :: a -> StreamK m a -> m r yieldFirst a a StreamK m a r = a -> StreamK m a -> m r yld a a (StreamK m a -> StreamK m a -> StreamK m a forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a yieldSecond StreamK m a r StreamK m a m2) in State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r forall (m :: * -> *) a r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r foldStream State StreamK m a st a -> StreamK m a -> m r yieldFirst a -> m r sng m r stp StreamK m a m1 where yieldSecond :: StreamK m a -> StreamK m a -> StreamK m a yieldSecond StreamK m a s1 StreamK m a s2 = (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a forall (m :: * -> *) a. (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a mkStream ((forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a) -> (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a forall a b. (a -> b) -> a -> b $ \State StreamK m a st a -> StreamK m a -> m r yld a -> m r sng m r stp -> do let stop :: m r stop = State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r forall (m :: * -> *) a r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r foldStream State StreamK m a st a -> StreamK m a -> m r yld a -> m r sng m r stp StreamK m a s1 single :: a -> m r single a a = a -> StreamK m a -> m r yld a a StreamK m a s1 yieldk :: a -> StreamK m a -> m r yieldk a a StreamK m a r = a -> StreamK m a -> m r yld a a (StreamK m a -> StreamK m a -> StreamK m a forall (m :: * -> *) a. 
StreamK m a -> StreamK m a -> StreamK m a interleave StreamK m a s1 StreamK m a r) in State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r forall (m :: * -> *) a r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r foldStream State StreamK m a st a -> StreamK m a -> m r yieldk a -> m r single m r stop StreamK m a s2

infixr 6 `interleaveMin`

-- | Like `interleave` but stops interleaving as soon as any of the two streams
-- stops.
--
{-# INLINE interleaveMin #-}
interleaveMin :: StreamK m a -> StreamK m a -> StreamK m a
interleaveMin m1 m2 = mkStream $ \st yld _ stp ->
    let -- "single a" is defined as "yld a (interleaveMin m2 nil)" instead of
        -- "sng a" to keep the behaviour consistent with the yield
        -- continuation.
        single a = yld a (interleaveMin m2 nil)

        -- Emit the element from the first stream, then swap the roles of
        -- the two streams so that the next element comes from @m2@.
        yieldk a r = yld a (interleaveMin m2 r)
     in foldStream st yieldk single stp m1

-------------------------------------------------------------------------------
-- Generation
-------------------------------------------------------------------------------

-- |
-- >>> :{
-- unfoldr step s =
--     case step s of
--         Nothing -> StreamK.nil
--         Just (a, b) -> a `StreamK.cons` unfoldr step b
-- :}
--
-- Build a stream by unfolding a /pure/ step function @step@ starting from a
-- seed @s@. The step function returns the next element in the stream and the
-- next seed value. When it is done it returns 'Nothing' and the stream ends.
-- For example,
--
-- >>> :{
-- let f b =
--         if b > 2
--         then Nothing
--         else Just (b, b + 1)
-- in StreamK.toList $ StreamK.unfoldr f 0
-- :}
-- [0,1,2]
--
{-# INLINE unfoldr #-}
unfoldr :: (b -> Maybe (a, b)) -> b -> StreamK m a
unfoldr next s0 = build $ \yld stp ->
    let go s =
            case next s of
                Just (a, b) -> yld a (go b)
                Nothing -> stp
     in go s0

-- | Like 'unfoldrM' but takes the stream 'cons' operation as its first
-- argument; that operation is handed over to @sharedMWith@ along with the
-- step continuation. The monadic step function is run on the current seed;
-- 'Nothing' ends the stream, @Just (a, b)@ yields @a@ and continues
-- unfolding from the seed @b@.
{-# INLINE unfoldrMWith #-}
unfoldrMWith :: Monad m =>
       (m a -> StreamK m a -> StreamK m a)
    -> (b -> m (Maybe (a, b)))
    -> b
    -> StreamK m a
unfoldrMWith cns step = go

    where

    go s = sharedMWith cns $ \yld _ stp -> do
        r <- step s
        case r of
            Just (a, b) -> yld a (go b)
            Nothing -> stp

-- | Build a stream by unfolding a /monadic/ step function starting from a
-- seed. The step function returns the next element in the stream and the next
-- seed value. When it is done it returns 'Nothing' and the stream ends. For
-- example,
--
-- >>> :{
-- let f b =
--         if b > 2
--         then return Nothing
--         else return (Just (b, b + 1))
-- in StreamK.toList $ StreamK.unfoldrM f 0
-- :}
-- [0,1,2]
--
{-# INLINE unfoldrM #-}
unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> StreamK m a
unfoldrM = unfoldrMWith consM

-- | Generate an infinite stream by repeating a pure value.
--
-- /Pre-release/
{-# INLINE repeat #-}
repeat :: a -> StreamK m a
repeat a = let x = cons a x in x

-- | Like 'repeatM' but takes a stream 'cons' operation to combine the actions
-- in a stream specific manner. A serial cons would repeat the values serially
-- while an async cons would repeat concurrently.
--
-- /Pre-release/
repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a
repeatMWith cns = go

    where

    go m = m `cns` go m

-- | Build a stream of @n@ copies of the action @m@, combined using the
-- supplied 'cons' operation. A count of zero or less gives 'nil'.
{-# INLINE replicateMWith #-}
replicateMWith ::
    (m a -> StreamK m a -> StreamK m a) -> Int -> m a -> StreamK m a
replicateMWith cns n m = go n

    where

    go cnt =
        if cnt <= 0
        then nil
        else m `cns` go (cnt - 1)

-- | Build an unending stream of the actions @gen 0@, @gen 1@, @gen 2@, ...
-- combined using the supplied 'cons' operation.
{-# INLINE fromIndicesMWith #-}
fromIndicesMWith ::
    (m a -> StreamK m a -> StreamK m a) -> (Int -> m a) -> StreamK m a
fromIndicesMWith cns gen = go 0

    where

    -- The continuations are forwarded unchanged, in the order
    -- yield, single, stop.
    go i = mkStream $ \st yld sng stp ->
        foldStreamShared st yld sng stp (gen i `cns` go (i + 1))

-- | Build a stream by running the seed action, emitting its result, and then
-- repeating the process with @step@ applied to that result. The elements are
-- combined using the supplied 'cons' operation.
{-# INLINE iterateMWith #-}
iterateMWith :: Monad m =>
    (m a -> StreamK m a -> StreamK m a) -> (a -> m a) -> m a -> StreamK m a
iterateMWith cns step = go

    where

    go s = mkStream $ \st yld sng stp -> do
        -- The bang forces the value to WHNF before it is used both as the
        -- current element and as the input of the next step.
        !next <- s
        foldStreamShared st yld sng stp (return next `cns` go (step next))

-- | Return the first element of the stream.
--
-- Partial: the value used for an empty stream is @error "head of nil"@.
{-# INLINE headPartial #-}
headPartial :: Monad m => StreamK m a -> m a
headPartial = foldrM (\x _ -> return x) (error "head of nil")

-- | Drop the first element of the stream.
--
-- Partial: the stop continuation is @error "tail of nil"@, so an empty
-- stream results in an error when the returned stream is evaluated.
{-# INLINE tailPartial #-}
tailPartial :: StreamK m a -> StreamK m a
tailPartial m = mkStream $ \st yld sng stp ->
    let stop = error "tail of nil"
        -- A one element stream has an empty tail.
        single _ = stp
        -- Discard the head and continue with the rest of the stream.
        yieldk _ r = foldStream st yld sng stp r
     in foldStream st yieldk single stop m

-- | We can define cyclic structures using @let@:
--
-- >>> let (a, b) = ([1, b], head a) in (a, b)
-- ([1,1],1)
--
-- The function @fix@ defined as:
--
-- >>> fix f = let x = f x in x
--
-- ensures that the argument of a function and its output refer to the same
-- lazy value @x@ i.e. the same location in memory. Thus @x@ can be defined
-- in terms of itself, creating structures with cyclic references.
--
-- >>> f ~(a, b) = ([1, b], head a)
-- >>> fix f
-- ([1,1],1)
--
-- 'Control.Monad.mfix' is essentially the same as @fix@ but for monadic
-- values.
--
-- Using 'mfix' for streams we can construct a stream in which each element of
-- the stream is defined in a cyclic fashion. The argument of the function
-- being fixed represents the current element of the stream which is being
-- returned by the stream monad. Thus, we can use the argument to construct
-- itself.
--
-- In the following example, the argument @action@ of the function @f@
-- represents the tuple @(x,y)@ returned by it in a given iteration. We define
-- the first element of the tuple in terms of the second.
--
-- >>> import System.IO.Unsafe (unsafeInterleaveIO)
--
-- >>> :{
-- main = Stream.fold (Fold.drainMapM print) $ StreamK.toStream $ StreamK.mfix f
--     where
--     f action = StreamK.unCross $ do
--         let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
--         x <- StreamK.mkCross $ StreamK.fromStream $ Stream.sequence $ Stream.fromList [incr 1 action, incr 2 action]
--         y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [4,5]
--         return (x, y)
-- :}
--
-- Note: you cannot achieve this by just changing the order of the monad
-- statements because that would change the order in which the stream elements
-- are generated.
--
-- Note that the function @f@ must be lazy in its argument, that's why we use
-- 'unsafeInterleaveIO' on @action@ because IO monad is strict.
--
-- /Pre-release/
{-# INLINE mfix #-}
mfix :: Monad m => (m a -> StreamK m a) -> StreamK m a
mfix f = mkStream $ \st yld sng stp ->
    let -- Whether the fixed stream has one element or more, only its head
        -- is used here; the remaining elements come from @ys@.
        single a = foldStream st yld sng stp $ a `cons` ys
        yieldk a _ = foldStream st yld sng stp $ a `cons` ys
     in foldStream st yieldk single stp xs

    where

    -- fix the head element of the stream
    xs = fix (f . headPartial)

    -- now fix the tail recursively
    ys = mfix (tailPartial . f)

-------------------------------------------------------------------------------
-- Conversions
-------------------------------------------------------------------------------

-- |
-- >>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
--
-- Construct a stream from a 'Foldable' containing pure values:
--
{-# INLINE fromFoldable #-}
fromFoldable :: Foldable f => f a -> StreamK m a
fromFoldable = Prelude.foldr cons nil

-- | Construct a stream from a 'Foldable' containing monadic actions, by
-- right folding it with 'consM' and 'nil'.
{-# INLINE fromFoldableM #-}
fromFoldableM :: (Foldable f, Monad m) => f (m a) -> StreamK m a
fromFoldableM = Prelude.foldr consM nil

-------------------------------------------------------------------------------
-- Deconstruction
-------------------------------------------------------------------------------

-- | Split a stream into its first element and the rest of the stream.
-- Returns 'Nothing' if the stream is empty. For a one element stream the
-- rest is 'nil'.
{-# INLINE uncons #-}
uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a))
uncons m =
    let stop = pure Nothing
        single a = pure (Just (a, nil))
        yieldk a r = pure (Just (a, r))
     in foldStream defState yieldk single stop m

-- | Return the stream without its first element, or 'Nothing' if the stream
-- is empty. For a one element stream the result is @Just nil@.
{-# INLINE tail #-}
tail :: Applicative m => StreamK m a -> m (Maybe (StreamK m a))
tail =
    let stop = pure Nothing
        single _ = pure $ Just nil
        yieldk _ r = pure $ Just r
     in foldStream defState yieldk single stop

-- | Extract all but the last element of the stream, if any.
--
-- Note: This will end up buffering the entire stream.
--
-- /Pre-release/
{-# INLINE init #-}
init :: Applicative m => StreamK m a -> m (Maybe (StreamK m a))
init = go1

    where

    go1 m1 = dropLast <$> uncons m1

    -- An empty stream has no init; otherwise start the one element
    -- look-ahead with the head of the stream.
    dropLast Nothing = Nothing
    dropLast (Just (h, t)) = Just $ go h t

    -- @p@ is the element held back. It is emitted only once we know that
    -- another element follows it; the final element is never emitted.
    go p m1 = mkStream $ \_ yld sng stp ->
        let single _ = sng p
            yieldk a x = yld p $ go a x
         in foldStream defState yieldk single stp m1

------------------------------------------------------------------------------
-- Reordering
------------------------------------------------------------------------------

-- | Lazy left fold to a stream.
--
-- The accumulator is itself a stream. Each input element is combined into it
-- using the step function, and the accumulated stream is run only when the
-- input stream ends.
{-# INLINE foldlS #-}
foldlS ::
       (StreamK m b -> a -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldlS step = go

    where

    go acc rest = mkStream $ \st yld sng stp ->
        let -- Run a result stream using the continuations of the caller.
            run x = foldStream st yld sng stp x
            stop = run acc
            single a = run $ step acc a
            yieldk a r = run $ go (step acc a) r
            -- The input stream has a different element type, therefore the
            -- state is adapted before it is used to fold the input.
         in foldStream (adaptState st) yieldk single stop rest

-- | Reverse a stream, by left folding it with @flip cons@ starting from
-- 'nil'.
{-# INLINE reverse #-}
reverse :: StreamK m a -> StreamK m a
reverse = foldlS (flip cons) nil

------------------------------------------------------------------------------
-- Running effects
------------------------------------------------------------------------------

-- | Run an action before evaluating the stream.
{-# INLINE before #-}
before :: Monad m => m b -> StreamK m a -> StreamK m a
before action stream =
    mkStream $ \st yld sng stp ->
        action >> foldStreamShared st yld sng stp stream

-- | Run an action that produces a stream, and then evaluate the stream
-- produced by it.
{-# INLINE concatEffect #-}
concatEffect :: Monad m => m (StreamK m a) -> StreamK m a
concatEffect action =
    mkStream $ \st yld sng stp ->
        action >>= foldStreamShared st yld sng stp

-- | Run an action, map its result to a stream using the supplied function,
-- and then evaluate that stream.
{-# INLINE concatMapEffect #-}
concatMapEffect :: Monad m => (b -> StreamK m a) -> m b -> StreamK m a
concatMapEffect f action =
    mkStream $ \st yld sng stp ->
        action >>= foldStreamShared st yld sng stp . f

------------------------------------------------------------------------------
-- Stream with a cross product style monad instance
------------------------------------------------------------------------------

-- | A newtype wrapper for the 'StreamK' type adding a cross product style
-- monad instance.
-- -- A 'Monad' bind behaves like a @for@ loop: -- -- >>> :{ -- Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do -- x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2] -- -- Perform the following actions for each x in the stream -- return x -- :} -- [1,2] -- -- Nested monad binds behave like nested @for@ loops: -- -- >>> :{ -- Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do -- x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2] -- y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [3,4] -- -- Perform the following actions for each x, for each y -- return (x, y) -- :} -- [(1,3),(1,4),(2,3),(2,4)] -- newtype CrossStreamK m a = CrossStreamK {forall (m :: * -> *) a. CrossStreamK m a -> StreamK m a