{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.Top
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Top level module that can depend on all other lower level Stream modules.
--
-- Design notes:
--
-- The order of arguments in the join operations should ideally be opposite. It
-- should be such that the infinite stream is the last one. The transformation
-- should be on the last argument, so if you curry the functions with all other
-- arguments we get a @Stream -> Stream@ function. The first stream argument
-- may be considered as a config or modifier for the operation.
--
-- Benefit of changing the order is that we get a more intuitive Stream ->
-- Stream transformation after currying all other arguments. The inner loop
-- streams become arguments for the transformation, more like local modifiers
-- for the global outer stream as the last argument. Thus we can continue using
-- transformations on the outer stream in a composed pipeline. Otherwise we can
-- use flip to flip the order.
--
-- The fact that the inner stream can be used in the loop multiple times also
-- tells that this is not the real effectful stream, it is more like a pure
-- stream or an array. In fact, we may consider using Identity streams as
-- inner streams, in which case these functions will not look nice.
--
-- Downsides:
--
-- * Maybe less intuitive to think about, because we usually think the first
--   stream as the outer loop and second as the inner.
-- * Zip and merge operations will continue using the opposite order.
-- * Need to change the order of cross, crossWith operations as well
-- * It will be inconsistent with Data.List. The functions cannot be used as
-- intuitive operators.
--
-- The choice is similar to concatMap vs bind. concatMap is pipeline
-- composition friendly but bind is user intuition friendly. Another option is
-- to have other functions with a different argument order e.g. flippedCross
-- instead of cross.
--
-- If we change the order we have to make sure that we have a consistent
-- convention for set-like and the cross join operations.

module Streamly.Internal.Data.Stream.Top
    (
    -- * Straight Joins
    -- | These are set-like operations but not exactly set operations because
    -- streams are not necessarily sets, they may have duplicated elements.
    -- These operations are generic i.e. they work on streams of unconstrained
    -- types; therefore, they have quadratic performance characteristics. For
    -- better performance using Set or Map structures see the
    -- Streamly.Internal.Data.Stream.Container module.
      intersectBy
    , deleteFirstsBy
    , unionBy

    -- Set like operations on sorted streams
    , sortedIntersectBy
    , sortedDeleteFirstsBy
    , sortedUnionBy

    -- * Cross Joins
    , innerJoin

    -- Joins on sorted stream
    , innerSortedJoin
    , leftSortedJoin
    , outerSortedJoin
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Stream.Type (Stream(..), Step(..), cross)

import qualified Data.List as List
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Scanl as Scanl
import qualified Streamly.Internal.Data.Stream.Type as Stream
import qualified Streamly.Internal.Data.Stream.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.Transform as Stream

import Prelude hiding (filter, zipWith, concatMap, concat)

#include "DocTestDataStream.hs"

------------------------------------------------------------------------------
-- SQL Joins
------------------------------------------------------------------------------
--
-- Some references:
-- * https://en.wikipedia.org/wiki/Relational_algebra
-- * https://en.wikipedia.org/wiki/Join_(SQL)

-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only
-- constraint, the best would be to use an Array with linear search. If the
-- second stream is sorted we can also use a binary search, using Ord
-- constraint or an ordering function.
--
-- For Storables we can cache the second stream into an unboxed array for
-- possibly faster access/compact representation?
--
-- If we do not want to keep the stream in memory but always read it from the
-- source (disk/network) every time we iterate through it then we can do that
-- too by reading the stream every time, the stream must have immutable state
-- in that case and the user is responsible for the behavior if the stream
-- source changes during iterations. We can also use an Unfold instead of
-- stream. We probably need a way to distinguish streams that can be read
-- multiple times without any interference (e.g. unfolding a stream using an
-- immutable handle would work i.e. using pread/pwrite instead of maintaining
-- an offset in the handle).

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array we could use
-- binary search if we have an Ord instance or Ordering returning function. The
-- time complexity would then become (m x log n).

-- | Like 'cross' but emits only those tuples @(a, b)@ for which the supplied
-- predicate @a \`eq\` b@ holds. This is essentially a @cross intersection@ of
-- two streams.
--
-- Definition:
--
-- >>> innerJoin eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ Stream.cross s1 s2
--
-- The second (inner) stream must be finite. Moreover, it must be either pure
-- or capable of multiple evaluations, because it is traversed again for every
-- element of the first stream. If not then the caller should cache it
-- in an 'Data.Array.Array', if the type does not have an 'Unbox' instance then
-- use the Generic 'Data.Array.Generic.Array'. Convert the array to stream
-- before calling this function. Caching may also improve performance if the
-- stream is expensive to evaluate.
--
-- If you care about performance this function should be your last choice among
-- all inner joins. 'Streamly.Internal.Data.Unfold.innerJoin' is a much faster
-- fused alternative. 'innerSortedJoin' is a faster alternative when streams
-- are sorted. 'innerOrdJoin' is an order of magnitude faster alternative when
-- the type has an 'Ord' instance.
--
-- Note: Conceptually, this is a commutative operation. Swapping the two
-- streams (and flipping the predicate) produces the same matching pairs,
-- except that the components of each tuple are swapped and the pairs may be
-- emitted in a different order.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE innerJoin #-}
innerJoin :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b)
innerJoin eq s1 s2 =
    -- Generate every pair and keep those accepted by the predicate.
    Stream.filter (uncurry eq) (cross s1 s2)
{-
-- Alternative formulation kept for reference.
innerJoin eq s1 s2 = do
    -- ConcatMap works faster than bind
    Stream.concatMap (\a ->
        Stream.concatMap (\b ->
            if a `eq` b
            then Stream.fromPure (a, b)
            else Stream.nil
            ) s2
        ) s1
-}

-- | A more efficient 'innerJoin' for sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/: evaluating the result currently raises an 'error' naming
-- this function.
{-# INLINE innerSortedJoin #-}
innerSortedJoin ::
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
innerSortedJoin _cmp _s1 _s2 =
    error "Streamly.Internal.Data.Stream.Top.innerSortedJoin: unimplemented"

-- | A more efficient 'leftJoin' for sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/: evaluating the result currently raises an 'error' naming
-- this function.
{-# INLINE leftSortedJoin #-}
leftSortedJoin :: -- Monad m =>
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
leftSortedJoin _eq _s1 _s2 =
    error "Streamly.Internal.Data.Stream.Top.leftSortedJoin: unimplemented"

-- | A more efficient 'outerJoin' for sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/: evaluating the result currently raises an 'error' naming
-- this function.
{-# INLINE outerSortedJoin #-}
outerSortedJoin :: -- Monad m =>
       (a -> b -> Ordering)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
outerSortedJoin _eq _s1 _s2 =
    error "Streamly.Internal.Data.Stream.Top.outerSortedJoin: unimplemented"

------------------------------------------------------------------------------
-- Set operations (special joins)
------------------------------------------------------------------------------
--
-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only constraint
-- the best would be to use an Array with linear search. If the second stream
-- is sorted we can also use a binary search, using Ord constraint.

-- | Keep only those elements of the first stream that are present in the
-- second stream too. The second stream is folded to a container using the
-- supplied fold, and then each element of the first stream is looked up in
-- that container using the supplied membership function.
--
-- The second stream must be finite and must not block: it is consumed
-- completely by the fold before the first element of the result is emitted.
-- The first stream is filtered lazily and may be infinite.
{-# INLINE filterStreamWith #-}
filterStreamWith :: Monad m =>
       Fold m a (f a)
    -> (a -> f a -> Bool)
    -> Stream m a
    -> Stream m a
    -> Stream m a
filterStreamWith fld member s1 s2 =
    Stream.concatEffect $ do
        -- Build the lookup container from the whole second stream first.
        xs <- Stream.fold fld s2
        return $ Stream.filter (`member` xs) s1

-- XXX instead of folding the second stream to a list we could use it directly.
-- If the user wants they can generate the stream from an array and also call
-- uniq or nub on it. We can provide a convenience Stream -> Stream to cache
-- a finite stream in an array and serve it from the cache. The user can decide
-- what is best based on the context. They can also choose to use a boxed or
-- unboxed array for caching. To force caching we can make the second stream
-- monad type Identity. But that may be less flexible. One option is to use
-- cachedIntersectBy etc for automatic caching.

-- | 'intersectBy' returns a subsequence of the first stream which intersects
-- with the second stream. Unlike a set intersection, this is not a commutative
-- operation: because streams may contain duplicate elements, the order of the
-- streams matters. This is similar to 'Data.List.intersectBy'. Note that
-- intersectBy is a special case of 'innerJoin'.
--
-- >>> f s1 s2 = Stream.fold Fold.toList $ Stream.intersectBy (==) (Stream.fromList s1) (Stream.fromList s2)
-- >>> f [1,3,4,4,5] [2,3,4,5,5]
-- [3,4,4,5]
--
-- First stream can be infinite, the second stream must be finite. The second
-- stream is evaluated only once: it is buffered in memory before the first
-- element of the result is emitted.
--
-- Space: O(n) where @n@ is the number of elements in the second stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE intersectBy #-}
intersectBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
intersectBy eq =
    -- XXX Use an (unboxed) array instead.
    filterStreamWith
        -- Buffer the second stream as a list, passing it through
        -- 'Scanl.uniqBy eq' first to drop repeated elements.
        (Fold.postscanlMaybe (Scanl.uniqBy eq) Fold.toListRev)
        -- An element x of the first stream is kept if @x `eq` y@ holds for
        -- any buffered element y.
        (List.any . eq)

-------------------------------------------------------------------------------
-- Intersection of sorted streams
-------------------------------------------------------------------------------

-- XXX The sort order is not important as long as both streams have the same
-- sort order. We need to move only in one direction in each stream.
-- XXX Fix the argument order to use the same behavior as intersectBy.

-- | Like 'intersectBy' but assumes that both input streams are sorted in
-- ascending order according to the supplied comparison function. To use it on
-- streams sorted in descending order, pass an inverted comparison function,
-- i.e. one returning GT for less than and LT for greater than.
--
-- When an element of the first stream compares 'EQ' to the current element of
-- the second stream, it is emitted and the second stream's element is
-- retained. Therefore, repeated elements of the first stream that match a
-- single element of the second stream are all emitted.
--
-- Both streams can be infinite.
--
-- Space: O(1)
--
-- Time: O(m+n)
--
-- /Pre-release/
{-# INLINE_NORMAL sortedIntersectBy #-}
sortedIntersectBy :: Monad m =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedIntersectBy cmp (Stream stepL stateL) (Stream stepR stateR) =
    -- State: (left stream state, right stream state,
    --         buffered left value, buffered right value)
    Stream step (stateL, stateR, Nothing, Nothing)

    where

    {-# INLINE_LATE step #-}
    step gst (sl, sr, mx, my) =
        case mx of
            -- No buffered left value: pull one from the left stream.
            Nothing -> do
                r <- stepL gst sl
                return $ case r of
                    Yield x sl' -> Skip (sl', sr, Just x, my)
                    Skip sl' -> Skip (sl', sr, Nothing, my)
                    Stop -> Stop
            Just x ->
                case my of
                    -- Left value buffered but no right value: pull from the
                    -- right stream.
                    Nothing -> do
                        r <- stepR gst sr
                        return $ case r of
                            Yield y sr' -> Skip (sl, sr', mx, Just y)
                            Skip sr' -> Skip (sl, sr', mx, Nothing)
                            Stop -> Stop
                    -- Both values buffered: advance the side holding the
                    -- smaller value; on a match emit the left value and keep
                    -- the right one.
                    Just y ->
                        return $ case cmp x y of
                            LT -> Skip (sl, sr, Nothing, my)
                            EQ -> Yield x (sl, sr, Nothing, my)
                            GT -> Skip (sl, sr, mx, Nothing)

-- | Returns a subsequence of the first stream, deleting first occurrences of
-- those elements that are present in the second stream. Note that this is not
-- a commutative operation. This is similar to the 'Data.List.deleteFirstsBy'.
--
-- >>> f xs ys = Stream.fold Fold.toList $ Stream.deleteFirstsBy (==) (Stream.fromList xs) (Stream.fromList ys)
-- >>> f [1,2,2,3,3,5] [1,2,2,3,4]
-- [3,5]
--
-- Each element of the second stream deletes at most one matching element of
-- the first stream, so the following holds for any finite @s2@:
--
-- > deleteFirstsBy (==) (s2 `Stream.append` s1) s2 === s1
--
-- First stream can be infinite, second stream must be finite. The second
-- stream is fully buffered before the first element of the result is
-- emitted.
--
-- Space: O(n) where @n@ is the number of elements in the second stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE deleteFirstsBy #-}
deleteFirstsBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
deleteFirstsBy eq s1 s2 =
    -- XXX s2 can be a sorted mutable array and we can use binary
    -- search to find. Mark the element deleted, count the deletions
    -- and reconsolidate the array when a min number of elements is
    -- deleted.

    -- XXX Use StreamK or list as second argument instead of Stream to avoid
    -- concatEffect?
    Stream.concatEffect $ do
        -- Buffer the elements to be deleted (the second stream).
        xs <- Stream.toList s2
        let
            -- Remove the first buffered element matching x and report
            -- whether a match was found. The remaining buffer is returned in
            -- reverse order.
            del x =
                List.foldl'
                    (\(ys, res) y ->
                        if not res && x `eq` y
                        then (ys, True)
                        else (y : ys, res))
                    ([], False)
            -- Scan step: thread the remaining buffer through the first
            -- stream, tagging surviving elements with Just.
            step (ys, _) x =
                let (ys1, deleted) = del x ys
                 in if deleted
                    then (ys1, Nothing)
                    else (ys1, Just x)
        return
            $ Stream.catMaybes
            $ fmap snd
            $ Stream.postscanl' step (xs, Nothing) s1

-- | A more efficient 'deleteFirstsBy' for streams sorted in ascending order.
--
-- Both streams can be infinite.
--
-- Space: O(1)
--
-- /Unimplemented/: evaluating the result currently raises an 'error' naming
-- this function.
{-# INLINE sortedDeleteFirstsBy #-}
sortedDeleteFirstsBy :: -- (Monad m) =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedDeleteFirstsBy _eq _s1 _s2 =
    error
        "Streamly.Internal.Data.Stream.Top.sortedDeleteFirstsBy: unimplemented"

-- XXX Remove the MonadIO constraint. We can just cache one stream and then
-- implement using differenceEqBy.

-- | Returns the first stream appended with those unique elements from the
-- second stream that are not already present in the first stream. Note that
-- this is not a commutative operation unlike a set union, argument order
-- matters. The behavior is similar to 'Data.List.unionBy'.
--
-- When @eq@ agrees with '==', this is equivalent to the following, except
-- that @s1@ is evaluated only once:
--
-- >>> unionBy eq s1 s2 = s1 `Stream.append` Stream.deleteFirstsBy eq (Stream.ordNub s2) s1
--
-- Example:
--
-- >>> f s1 s2 = Stream.fold Fold.toList $ Stream.unionBy (==) (Stream.fromList s1) (Stream.fromList s2)
-- >>> f [1,2,2,4] [1,1,2,3,3]
-- [1,2,2,4,3]
--
-- First stream can be infinite, but second stream must be finite. Note that if
-- the first stream is infinite the union means just the first stream. Thus
-- union is useful only when both streams are finite. See 'sortedUnionBy' where
-- union can work on infinite streams if they are sorted.
--
-- Space: O(n) where @n@ is the number of elements in the second stream.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE unionBy #-}
unionBy :: MonadIO m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
unionBy eq s1 s2 =
    Stream.concatEffect
        $ do
            -- XXX use a rewrite rule such that if a list converted to stream
            -- is passed to unionBy then this becomes an identity operation.
            xs <- Stream.fold Fold.toList s2
            -- XXX we can use postscanlMAfter' instead of IORef
            -- Unique elements of s2 that have not been seen in s1 yet.
            ref <- liftIO $ newIORef $! List.nubBy eq xs
            let
                -- Pass x through unchanged, removing its match (if any) from
                -- the pending elements.
                f x = do
                    liftIO $ modifyIORef' ref (List.deleteBy eq x)
                    return x
                -- The pending elements, read from the IORef only when this
                -- stream starts, i.e. after s1 has been exhausted by append.
                s3 = Stream.concatEffect $ do
                    xs1 <- liftIO $ readIORef ref
                    return $ Stream.fromList xs1
            return $ Stream.mapM f s1 `Stream.append` s3

-- | A more efficient 'unionBy' for sorted streams.
--
-- Note that the behavior is different from 'unionBy'. In 'unionBy' we append
-- the unique elements from second stream only after exhausting the first one
-- whereas in sorted streams we can determine unique elements early even when
-- we are going through the first stream. Thus the result is an interleaving of
-- the two streams, merging those elements from the second stream that are not
-- present in the first.
--
-- Space: O(1)
--
-- Both streams can be infinite.
--
-- /Unimplemented/: evaluating the result currently raises an 'error' naming
-- this function.
{-# INLINE sortedUnionBy #-}
sortedUnionBy :: -- (Monad m) =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedUnionBy _eq _s1 _s2 =
    error "Streamly.Internal.Data.Stream.Top.sortedUnionBy: unimplemented"