{-# LANGUAGE CPP #-}
module Streamly.Internal.Data.Stream.Top
(
intersectBy
, deleteFirstsBy
, unionBy
, sortedIntersectBy
, sortedDeleteFirstsBy
, sortedUnionBy
, innerJoin
, innerSortedJoin
, leftSortedJoin
, outerSortedJoin
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Stream.Type (Stream(..), Step(..), cross)
import qualified Data.List as List
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Scanl as Scanl
import qualified Streamly.Internal.Data.Stream.Type as Stream
import qualified Streamly.Internal.Data.Stream.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.Transform as Stream
import Prelude hiding (filter, zipWith, concatMap, concat)
#include "DocTestDataStream.hs"
{-# INLINE innerJoin #-}
innerJoin :: Monad m =>
(a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b)
innerJoin :: forall (m :: * -> *) a b.
Monad m =>
(a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b)
innerJoin a -> b -> Bool
eq Stream m a
s1 Stream m b
s2 = ((a, b) -> Bool) -> Stream m (a, b) -> Stream m (a, b)
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
Stream.filter (\(a
a, b
b) -> a
a a -> b -> Bool
`eq` b
b) (Stream m (a, b) -> Stream m (a, b))
-> Stream m (a, b) -> Stream m (a, b)
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m b -> Stream m (a, b)
forall (m :: * -> *) a b.
Monad m =>
Stream m a -> Stream m b -> Stream m (a, b)
cross Stream m a
s1 Stream m b
s2
{-# INLINE innerSortedJoin #-}
innerSortedJoin ::
(a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
innerSortedJoin :: forall a b (m :: * -> *).
(a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
innerSortedJoin = (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
forall a. HasCallStack => a
undefined
{-# INLINE leftSortedJoin #-}
leftSortedJoin ::
(a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
leftSortedJoin :: forall a b (m :: * -> *).
(a -> b -> Ordering)
-> Stream m a -> Stream m b -> Stream m (a, Maybe b)
leftSortedJoin a -> b -> Ordering
_eq Stream m a
_s1 Stream m b
_s2 = Stream m (a, Maybe b)
forall a. HasCallStack => a
undefined
{-# INLINE outerSortedJoin #-}
outerSortedJoin ::
(a -> b -> Ordering)
-> Stream m a
-> Stream m b
-> Stream m (Maybe a, Maybe b)
outerSortedJoin :: forall a b (m :: * -> *).
(a -> b -> Ordering)
-> Stream m a -> Stream m b -> Stream m (Maybe a, Maybe b)
outerSortedJoin a -> b -> Ordering
_eq Stream m a
_s1 Stream m b
_s2 = Stream m (Maybe a, Maybe b)
forall a. HasCallStack => a
undefined
{-# INLINE filterStreamWith #-}
filterStreamWith :: Monad m =>
Fold m a (f a)
-> (a -> f a -> Bool)
-> Stream m a
-> Stream m a
-> Stream m a
filterStreamWith :: forall (m :: * -> *) a (f :: * -> *).
Monad m =>
Fold m a (f a)
-> (a -> f a -> Bool) -> Stream m a -> Stream m a -> Stream m a
filterStreamWith Fold m a (f a)
fld a -> f a -> Bool
member Stream m a
s1 Stream m a
s2 =
m (Stream m a) -> Stream m a
forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
Stream.concatEffect
(m (Stream m a) -> Stream m a) -> m (Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ do
f a
xs <- Fold m a (f a) -> Stream m a -> m (f a)
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
Stream.fold Fold m a (f a)
fld Stream m a
s2
Stream m a -> m (Stream m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a -> m (Stream m a)) -> Stream m a -> m (Stream m a)
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
Stream.filter (a -> f a -> Bool
`member` f a
xs) Stream m a
s1
{-# INLINE intersectBy #-}
intersectBy :: Monad m =>
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
intersectBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
intersectBy a -> a -> Bool
eq =
Fold m a [a]
-> (a -> [a] -> Bool) -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a (f :: * -> *).
Monad m =>
Fold m a (f a)
-> (a -> f a -> Bool) -> Stream m a -> Stream m a -> Stream m a
filterStreamWith
(Scanl m a (Maybe a) -> Fold m a [a] -> Fold m a [a]
forall (m :: * -> *) a b c.
Monad m =>
Scanl m a (Maybe b) -> Fold m b c -> Fold m a c
Fold.postscanlMaybe ((a -> a -> Bool) -> Scanl m a (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> Scanl m a (Maybe a)
Scanl.uniqBy a -> a -> Bool
eq) Fold m a [a]
forall (m :: * -> *) a. Monad m => Fold m a [a]
Fold.toListRev)
((a -> Bool) -> [a] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
List.any ((a -> Bool) -> [a] -> Bool)
-> (a -> a -> Bool) -> a -> [a] -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> a -> Bool
eq)
{-# INLINE_NORMAL sortedIntersectBy #-}
sortedIntersectBy :: Monad m =>
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedIntersectBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedIntersectBy a -> a -> Ordering
cmp (Stream State StreamK m a -> s -> m (Step s a)
stepa s
ta) (Stream State StreamK m a -> s -> m (Step s a)
stepb s
tb) =
(State StreamK m a
-> (s, s, Maybe a, Maybe a) -> m (Step (s, s, Maybe a, Maybe a) a))
-> (s, s, Maybe a, Maybe a) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> (s, s, Maybe a, Maybe a) -> m (Step (s, s, Maybe a, Maybe a) a)
step
( s
ta
, s
tb
, Maybe a
forall a. Maybe a
Nothing
, Maybe a
forall a. Maybe a
Nothing
)
where
{-# INLINE_LATE step #-}
step :: State StreamK m a
-> (s, s, Maybe a, Maybe a) -> m (Step (s, s, Maybe a, Maybe a) a)
step State StreamK m a
gst (s
sa, s
sb, Maybe a
Nothing, Maybe a
b) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
stepa State StreamK m a
gst s
sa
Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a))
-> Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
sa' -> (s, s, Maybe a, Maybe a) -> Step (s, s, Maybe a, Maybe a) a
forall s a. s -> Step s a
Skip (s
sa', s
sb, a -> Maybe a
forall a. a -> Maybe a
Just a
a, Maybe a
b)
Skip s
sa' -> (s, s, Maybe a, Maybe a) -> Step (s, s, Maybe a, Maybe a) a
forall s a. s -> Step s a
Skip (s
sa', s
sb, Maybe a
forall a. Maybe a
Nothing, Maybe a
b)
Step s a
Stop -> Step (s, s, Maybe a, Maybe a) a
forall s a. Step s a
Stop
step State StreamK m a
gst (s
sa, s
sb, a :: Maybe a
a@(Just a
_), Maybe a
Nothing) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
stepb State StreamK m a
gst s
sb
Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a))
-> Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
b s
sb' -> (s, s, Maybe a, Maybe a) -> Step (s, s, Maybe a, Maybe a) a
forall s a. s -> Step s a
Skip (s
sa, s
sb', Maybe a
a, a -> Maybe a
forall a. a -> Maybe a
Just a
b)
Skip s
sb' -> (s, s, Maybe a, Maybe a) -> Step (s, s, Maybe a, Maybe a) a
forall s a. s -> Step s a
Skip (s
sa, s
sb', Maybe a
a, Maybe a
forall a. Maybe a
Nothing)
Step s a
Stop -> Step (s, s, Maybe a, Maybe a) a
forall s a. Step s a
Stop
step State StreamK m a
_ (s
sa, s
sb, Just a
a, Just a
b) = do
let res :: Ordering
res = a -> a -> Ordering
cmp a
a a
b
Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a))
-> Step (s, s, Maybe a, Maybe a) a
-> m (Step (s, s, Maybe a, Maybe a) a)
forall a b. (a -> b) -> a -> b
$ case Ordering
res of
Ordering
GT -> (s, s, Maybe a, Maybe a) -> Step (s, s, Maybe a, Maybe a) a
forall s a. s -> Step s a
Skip (s
sa, s
sb, a -> Maybe a
forall a. a -> Maybe a
Just a
a, Maybe a
forall a. Maybe a
Nothing)
Ordering
LT -> (s, s, Maybe a, Maybe a) -> Step (s, s, Maybe a, Maybe a) a
forall s a. s -> Step s a
Skip (s
sa, s
sb, Maybe a
forall a. Maybe a
Nothing, a -> Maybe a
forall a. a -> Maybe a
Just a
b)
Ordering
EQ -> a -> (s, s, Maybe a, Maybe a) -> Step (s, s, Maybe a, Maybe a) a
forall s a. a -> s -> Step s a
Yield a
a (s
sa, s
sb, Maybe a
forall a. Maybe a
Nothing, a -> Maybe a
forall a. a -> Maybe a
Just a
b)
{-# INLINE deleteFirstsBy #-}
deleteFirstsBy :: Monad m =>
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
deleteFirstsBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
deleteFirstsBy a -> a -> Bool
eq Stream m a
s2 Stream m a
s1 =
m (Stream m a) -> Stream m a
forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
Stream.concatEffect (m (Stream m a) -> Stream m a) -> m (Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ do
[a]
xs <- Stream m a -> m [a]
forall (m :: * -> *) a. Monad m => Stream m a -> m [a]
Stream.toList Stream m a
s1
let del :: a -> t a -> ([a], Bool)
del a
x =
(([a], Bool) -> a -> ([a], Bool))
-> ([a], Bool) -> t a -> ([a], Bool)
forall b a. (b -> a -> b) -> b -> t a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
List.foldl' (\([a]
ys,Bool
res) a
y ->
if Bool -> Bool
not Bool
res Bool -> Bool -> Bool
&& a
x a -> a -> Bool
`eq` a
y
then ([a]
ys, Bool
True)
else (a
ya -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
ys, Bool
res)) ([], Bool
False)
g :: (t a, b) -> a -> ([a], Maybe a)
g (t a
ys,b
_) a
x =
let ([a]
ys1, Bool
deleted) = a -> t a -> ([a], Bool)
forall {t :: * -> *}. Foldable t => a -> t a -> ([a], Bool)
del a
x t a
ys
in if Bool
deleted
then ([a]
ys1, Maybe a
forall a. Maybe a
Nothing)
else ([a]
ys1, a -> Maybe a
forall a. a -> Maybe a
Just a
x)
in Stream m a -> m (Stream m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
(Stream m a -> m (Stream m a)) -> Stream m a -> m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Stream m (Maybe a) -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
Stream.catMaybes
(Stream m (Maybe a) -> Stream m a)
-> Stream m (Maybe a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ (([a], Maybe a) -> Maybe a)
-> Stream m ([a], Maybe a) -> Stream m (Maybe a)
forall a b. (a -> b) -> Stream m a -> Stream m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([a], Maybe a) -> Maybe a
forall a b. (a, b) -> b
snd
(Stream m ([a], Maybe a) -> Stream m (Maybe a))
-> Stream m ([a], Maybe a) -> Stream m (Maybe a)
forall a b. (a -> b) -> a -> b
$ (([a], Maybe a) -> a -> ([a], Maybe a))
-> ([a], Maybe a) -> Stream m a -> Stream m ([a], Maybe a)
forall (m :: * -> *) a b.
Monad m =>
(a -> b -> a) -> a -> Stream m b -> Stream m a
Stream.postscanl' ([a], Maybe a) -> a -> ([a], Maybe a)
forall {t :: * -> *} {b}.
Foldable t =>
(t a, b) -> a -> ([a], Maybe a)
g ([a]
xs, Maybe a
forall a. Maybe a
Nothing) Stream m a
s2
{-# INLINE sortedDeleteFirstsBy #-}
sortedDeleteFirstsBy ::
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedDeleteFirstsBy :: forall a (m :: * -> *).
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedDeleteFirstsBy a -> a -> Ordering
_eq Stream m a
_s1 Stream m a
_s2 = Stream m a
forall a. HasCallStack => a
undefined
{-# INLINE unionBy #-}
unionBy :: MonadIO m =>
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
unionBy :: forall (m :: * -> *) a.
MonadIO m =>
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
unionBy a -> a -> Bool
eq Stream m a
s2 Stream m a
s1 =
m (Stream m a) -> Stream m a
forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
Stream.concatEffect
(m (Stream m a) -> Stream m a) -> m (Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ do
[a]
xs <- Fold m a [a] -> Stream m a -> m [a]
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
Stream.fold Fold m a [a]
forall (m :: * -> *) a. Monad m => Fold m a [a]
Fold.toList Stream m a
s1
IORef [a]
ref <- IO (IORef [a]) -> m (IORef [a])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef [a]) -> m (IORef [a]))
-> IO (IORef [a]) -> m (IORef [a])
forall a b. (a -> b) -> a -> b
$ [a] -> IO (IORef [a])
forall a. a -> IO (IORef a)
newIORef ([a] -> IO (IORef [a])) -> [a] -> IO (IORef [a])
forall a b. (a -> b) -> a -> b
$! (a -> a -> Bool) -> [a] -> [a]
forall a. (a -> a -> Bool) -> [a] -> [a]
List.nubBy a -> a -> Bool
eq [a]
xs
let f :: a -> m a
f a
x = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef [a] -> ([a] -> [a]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [a]
ref ((a -> a -> Bool) -> a -> [a] -> [a]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
List.deleteBy a -> a -> Bool
eq a
x)
a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
s3 :: Stream m a
s3 = m (Stream m a) -> Stream m a
forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
Stream.concatEffect
(m (Stream m a) -> Stream m a) -> m (Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ do
[a]
xs1 <- IO [a] -> m [a]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [a] -> m [a]) -> IO [a] -> m [a]
forall a b. (a -> b) -> a -> b
$ IORef [a] -> IO [a]
forall a. IORef a -> IO a
readIORef IORef [a]
ref
Stream m a -> m (Stream m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a -> m (Stream m a)) -> Stream m a -> m (Stream m a)
forall a b. (a -> b) -> a -> b
$ [a] -> Stream m a
forall (m :: * -> *) a. Applicative m => [a] -> Stream m a
Stream.fromList [a]
xs1
Stream m a -> m (Stream m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a -> m (Stream m a)) -> Stream m a -> m (Stream m a)
forall a b. (a -> b) -> a -> b
$ (a -> m a) -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
Stream.mapM a -> m a
forall {m :: * -> *}. MonadIO m => a -> m a
f Stream m a
s2 Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
Stream m a -> Stream m a -> Stream m a
`Stream.append` Stream m a
s3
{-# INLINE sortedUnionBy #-}
sortedUnionBy ::
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedUnionBy :: forall a (m :: * -> *).
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
sortedUnionBy a -> a -> Ordering
_eq Stream m a
_s1 Stream m a
_s2 = Stream m a
forall a. HasCallStack => a
undefined