{-# LANGUAGE CPP #-} -- | -- Module : Streamly.Internal.Data.Stream.Top -- Copyright : (c) 2020 Composewell Technologies -- License : BSD-3-Clause -- Maintainer : streamly@composewell.com -- Stability : experimental -- Portability : GHC -- -- Top level module that can depend on all other lower level Stream modules. module Streamly.Internal.Data.Stream.Top ( -- * Transformation -- ** Sampling -- | Value agnostic filtering. strideFromThen -- * Nesting -- ** Set like operations -- | These are not exactly set operations because streams are not -- necessarily sets, they may have duplicated elements. These operations -- are generic i.e. they work on streams of unconstrained types, therefore, -- they have quadratic performance characterstics. For better performance -- using Set structures see the Streamly.Internal.Data.Stream.Container -- module. , filterInStreamGenericBy , deleteInStreamGenericBy , unionWithStreamGenericBy -- ** Set like operations on sorted streams , filterInStreamAscBy , deleteInStreamAscBy , unionWithStreamAscBy -- ** Join operations , joinInnerGeneric -- * Joins on sorted stream , joinInnerAscBy , joinLeftAscBy , joinOuterAscBy ) where #include "inline.hs" import Control.Monad.IO.Class (MonadIO(..)) import Data.IORef (newIORef, readIORef, modifyIORef') import Streamly.Internal.Data.Fold.Type (Fold) import Streamly.Internal.Data.Stream.Type (Stream, cross) import qualified Data.List as List import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Stream.Type as Stream import qualified Streamly.Internal.Data.Stream.Nesting as Stream import qualified Streamly.Internal.Data.Stream.Transform as Stream import Prelude hiding (filter, zipWith, concatMap, concat) #include "DocTestDataStream.hs" ------------------------------------------------------------------------------ -- Sampling ------------------------------------------------------------------------------ -- XXX We can implement this using addition instead of "mod" to 
-- make it more efficient.

-- | @strideFromThen offset stride@ emits the element at index @offset@
-- (indices start at 0) and after that every @stride@-th element.
--
-- >>> Stream.fold Fold.toList $ Stream.strideFromThen 2 3 $ Stream.enumerateFromTo 0 10
-- [2,5,8]
--
-- Every element is paired with its index and an element at index @i@ is kept
-- when @i >= offset@ and @i - offset@ is a multiple of @stride@. A zero
-- @stride@ raises a divide-by-zero exception once an element at or beyond
-- @offset@ is reached.
--
{-# INLINE strideFromThen #-}
strideFromThen :: Monad m => Int -> Int -> Stream m a -> Stream m a
strideFromThen offset stride = Stream.with Stream.indexed Stream.filter keep

    where

    -- (&&) short-circuits, so the modulus is only computed for indices that
    -- have already reached the offset.
    keep (idx, _) = idx >= offset && (idx - offset) `mod` stride == 0

------------------------------------------------------------------------------
-- SQL Joins
------------------------------------------------------------------------------
--
-- Some references:
-- * https://en.wikipedia.org/wiki/Relational_algebra
-- * https://en.wikipedia.org/wiki/Join_(SQL)

-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only
-- constraint, the best would be to use an Array with linear search. If the
-- second stream is sorted we can also use a binary search, using Ord
-- constraint or an ordering function.
--
-- For Storables we can cache the second stream into an unboxed array for
-- possibly faster access/compact representation?
--
-- If we do not want to keep the stream in memory but always read it from the
-- source (disk/network) every time we iterate through it then we can do that
-- too by reading the stream every time, the stream must have immutable state
-- in that case and the user is responsible for the behavior if the stream
-- source changes during iterations. We can also use an Unfold instead of
-- stream. We probably need a way to distinguish streams that can be read
-- multiple times without any interference (e.g. unfolding a stream using an
-- immutable handle would work i.e. using pread/pwrite instead of maintaining
-- an offset in the handle).

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array we could use
-- binary search if we have an Ord instance or Ordering returning function. The
-- time complexity would then become (m x log n).

-- | Like 'cross' but emits only those tuples where @a == b@ using the
-- supplied equality predicate.
--
-- Definition:
--
-- >>> joinInnerGeneric eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ Stream.cross s1 s2
--
-- You should almost always prefer @joinInnerOrd@ over 'joinInnerGeneric' if
-- possible. @joinInnerOrd@ is an order of magnitude faster but may take more
-- space for caching the second stream.
--
-- See 'Streamly.Internal.Data.Unfold.joinInnerGeneric' for a much faster fused
-- alternative.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinInnerGeneric #-}
joinInnerGeneric :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b)
joinInnerGeneric eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ cross s1 s2
{-
joinInnerGeneric eq s1 s2 = do
    -- ConcatMap works faster than bind
    Stream.concatMap (\a ->
        Stream.concatMap (\b ->
            if a `eq` b
            then Stream.fromPure (a, b)
            else Stream.nil
            ) s2
        ) s1
-}

-- | A more efficient 'joinInnerGeneric' for streams sorted in ascending
-- order.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/: calling it raises an 'error' naming this function.
{-# INLINE joinInnerAscBy #-}
joinInnerAscBy ::
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
joinInnerAscBy _cmp _s1 _s2 = error "joinInnerAscBy: unimplemented"

-- | A more efficient 'joinLeft' for sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/: calling it raises an 'error' naming this function.
{-# INLINE joinLeftAscBy #-}
joinLeftAscBy :: -- Monad m =>
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
joinLeftAscBy _eq _s1 _s2 = error "joinLeftAscBy: unimplemented"

-- | A more efficient 'joinOuter' for sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/: calling it raises an 'error' naming this function.
{-# INLINE joinOuterAscBy #-}
joinOuterAscBy :: -- Monad m =>
       (a -> b -> Ordering)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
joinOuterAscBy _eq _s1 _s2 = error "joinOuterAscBy: unimplemented"

------------------------------------------------------------------------------
-- Set operations (special joins)
------------------------------------------------------------------------------
--
-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only constraint
-- the best would be to use an Array with linear search. If the second stream
-- is sorted we can also use a binary search, using Ord constraint.

-- | Keep only those elements in the second stream that are present in the
-- first stream too. The first stream is folded to a container using the
-- supplied fold and then the elements in the container are looked up using the
-- supplied lookup function.
--
-- The lookup function is called as @member x container@ where @x@ is an
-- element of the second stream.
--
-- The first stream must be finite and must not block; it is fully folded
-- before the first element of the second stream is examined.
{-# INLINE filterStreamWith #-}
filterStreamWith :: Monad m =>
       Fold m a (f a)
    -> (a -> f a -> Bool)
    -> Stream m a
    -> Stream m a
    -> Stream m a
filterStreamWith fld member s1 s2 =
    Stream.concatEffect $ do
        xs <- Stream.fold fld s1
        return $ Stream.filter (`member` xs) s2

-- | 'filterInStreamGenericBy' retains only those elements in the second stream that
-- are present in the first stream.
--
-- >>> Stream.fold Fold.toList $ Stream.filterInStreamGenericBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
-- [2,1,1]
--
-- >>> Stream.fold Fold.toList $ Stream.filterInStreamGenericBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
-- [1,2,2]
--
-- Similar to the list intersectBy operation but with the stream argument order
-- flipped. The predicate is called with an element of the second stream as
-- its first argument and an element of the first stream as its second.
--
-- The first stream must be finite and must not block. Second stream is
-- processed only after the first stream is fully realized.
--
-- Space: O(m) where @m@ is the number of elements in the first stream (the
-- first stream is buffered in memory, the second is not).
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE filterInStreamGenericBy #-}
filterInStreamGenericBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
filterInStreamGenericBy eq =
    -- XXX Use an (unboxed) array instead.
    filterStreamWith
        (Fold.scanMaybe (Fold.uniqBy eq) Fold.toListRev)
        -- Linear membership scan: (List.any . eq) x ys == List.any (eq x) ys
        (List.any . eq)

-- | Like 'filterInStreamGenericBy' but assumes that the input streams are sorted in
-- ascending order. To use it on streams sorted in descending order pass an
-- inverted comparison function returning GT for less than and LT for greater
-- than.
--
-- Space: O(1)
--
-- Time: O(m+n)
--
-- /Pre-release/
{-# INLINE filterInStreamAscBy #-}
filterInStreamAscBy :: Monad m =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
filterInStreamAscBy cmp s1 s2 = Stream.intersectBySorted cmp s2 s1

-- | Delete all elements of the first stream from the second stream. If an
-- element occurs multiple times in the first stream as many occurrences of it
-- are deleted from the second stream.
--
-- >>> Stream.fold Fold.toList $ Stream.deleteInStreamGenericBy (==) (Stream.fromList [1,2,3]) (Stream.fromList [1,2,2])
-- [2]
--
-- The following laws hold:
--
-- > deleteInStreamGenericBy (==) s1 (s1 `append` s2) === s2
-- > deleteInStreamGenericBy (==) s1 (s1 `interleave` s2) === s2
--
-- Same as the list 'Data.List.//' operation but with argument order flipped.
--
-- Both streams must be finite and must not block. The second stream is fully
-- realized in memory first, then the first stream is consumed; no output is
-- produced until both streams have been fully consumed.
--
-- Space: O(n) where @n@ is the number of elements in the second stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE deleteInStreamGenericBy #-}
deleteInStreamGenericBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
deleteInStreamGenericBy eq s1 s2 =
    Stream.concatEffect $ do
        -- s2 is buffered in full; s1 is streamed through the fold below.
        -- This may work well if s1 is small
        -- If s1 is big we can go through s1, deleting elements from s2 and
        -- not emitting an element if it was successfully deleted from s2.
        -- we will need a deleteBy that can return whether the element was
        -- deleted or not.
        xs <- Stream.fold Fold.toList s2
        -- Each element of s1 removes (at most) the first matching element
        -- from what is left of s2.
        let f = Fold.foldl' (flip (List.deleteBy eq)) xs
        Stream.fromList <$> Stream.fold f s1

-- | A more efficient 'deleteInStreamGenericBy' for streams sorted in ascending order.
--
-- Space: O(1)
--
-- /Unimplemented/: calling it raises an 'error' naming this function.
{-# INLINE deleteInStreamAscBy #-}
deleteInStreamAscBy :: -- (Monad m) =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
deleteInStreamAscBy _eq _s1 _s2 = error "deleteInStreamAscBy: unimplemented"

-- XXX Remove the MonadIO constraint. We can just cache one stream and then
-- implement using differenceEqBy.

-- | Emit the second stream followed by those distinct elements of the first
-- stream that do not occur in the second stream. Duplicates within the first
-- stream are dropped (only the first occurrence of each element is kept);
-- duplicates within the second stream are preserved.
--
-- This is the list 'Data.List.unionBy' operation with the argument order
-- flipped. For finite streams, the result equals the following list
-- expression on the corresponding lists:
--
-- > s2 ++ foldl (flip (deleteBy eq)) (nubBy eq s1) s2
--
-- Example:
--
-- >>> Stream.fold Fold.toList $ Stream.unionWithStreamGenericBy (==) (Stream.fromList [1,1,2,3]) (Stream.fromList [1,2,2,4])
-- [1,2,2,4,3]
--
-- The first stream must be finite; it is fully realized before any output is
-- produced. The elements of the second stream are emitted as they arrive, and
-- the leftover elements of the first stream are emitted after the second
-- stream ends.
--
-- Space: O(m) where @m@ is the number of elements in the first stream.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE unionWithStreamGenericBy #-}
unionWithStreamGenericBy :: MonadIO m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
unionWithStreamGenericBy eq s1 s2 =
    Stream.concatEffect $ do
        xs <- Stream.fold Fold.toList s1
        -- Candidates to append: the first stream with duplicates removed.
        -- XXX we can use postscanlMAfter' instead of IORef
        ref <- liftIO $ newIORef $! List.nubBy eq xs
        let -- Pass each element of s2 through, removing its match (if any)
            -- from the pending candidates.
            f x = do
                liftIO $ modifyIORef' ref (List.deleteBy eq x)
                return x
            -- Read the leftover candidates only once s2 has been exhausted.
            s3 = Stream.concatEffect $ do
                xs1 <- liftIO $ readIORef ref
                return $ Stream.fromList xs1
        return $ Stream.mapM f s2 `Stream.append` s3

-- | A more efficient 'unionWithStreamGenericBy' for sorted streams.
--
-- Space: O(1)
--
-- /Unimplemented/: calling it raises an 'error' naming this function.
{-# INLINE unionWithStreamAscBy #-}
unionWithStreamAscBy :: -- (Monad m) =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
unionWithStreamAscBy _eq _s1 _s2 = error "unionWithStreamAscBy: unimplemented"