module Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
(
readOutputQPaced
, readOutputQBounded
, postProcessPaced
, postProcessBounded
)
where
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (readIORef)
import Streamly.Internal.Control.Concurrent (MonadRunInIO)
import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Dispatcher
import Streamly.Internal.Data.Channel.Types
{-# INLINE readOutputQChan #-}
readOutputQChan :: Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan :: forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv = do
let ss :: Maybe SVarStats
ss = if Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv then SVarStats -> Maybe SVarStats
forall a. a -> Maybe a
Just (Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv) else Maybe SVarStats
forall a. Maybe a
Nothing
in IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) Maybe SVarStats
ss
readOutputQBounded :: MonadRunInIO m => Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded :: forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval Channel m a
sv = do
([ChildEvent a]
list, Int
len) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ Channel m a -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv
if Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
then m [ChildEvent a]
blockingRead
else do
m ()
sendOneWorker
[ChildEvent a] -> m [ChildEvent a]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [ChildEvent a]
list
where
sendOneWorker :: m ()
sendOneWorker = do
Int
cnt <- IO Int -> m Int
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
cnt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Bool
done <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> IO Bool
forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone Channel m a
sv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) (Count -> Channel m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
0 Channel m a
sv)
{-# INLINE blockingRead #-}
blockingRead :: m [ChildEvent a]
blockingRead = do
Bool
-> (Channel m a -> IO ())
-> (Channel m a -> m Bool)
-> Channel m a
-> m ()
forall (m :: * -> *) a.
MonadIO m =>
Bool
-> (Channel m a -> IO ())
-> (Channel m a -> m Bool)
-> Channel m a
-> m ()
sendWorkerWait Bool
eagerEval Channel m a -> IO ()
forall (m :: * -> *) a. Channel m a -> IO ()
sendWorkerDelay (Count -> Channel m a -> m Bool
forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
0) Channel m a
sv
IO [ChildEvent a] -> m [ChildEvent a]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (([ChildEvent a], Int) -> [ChildEvent a]
forall a b. (a, b) -> a
fst (([ChildEvent a], Int) -> [ChildEvent a])
-> IO ([ChildEvent a], Int) -> IO [ChildEvent a]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Channel m a -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv)
readOutputQPaced :: MonadRunInIO m => Channel m a -> m [ChildEvent a]
readOutputQPaced :: forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced Channel m a
sv = do
([ChildEvent a]
list, Int
len) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ Channel m a -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv
if Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
then m [ChildEvent a]
blockingRead
else do
m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
dispatchWorkerPaced Channel m a
sv
[ChildEvent a] -> m [ChildEvent a]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [ChildEvent a]
list
where
{-# INLINE blockingRead #-}
blockingRead :: m [ChildEvent a]
blockingRead = do
Bool
-> (Channel m a -> IO ())
-> (Channel m a -> m Bool)
-> Channel m a
-> m ()
forall (m :: * -> *) a.
MonadIO m =>
Bool
-> (Channel m a -> IO ())
-> (Channel m a -> m Bool)
-> Channel m a
-> m ()
sendWorkerWait Bool
False Channel m a -> IO ()
forall (m :: * -> *) a. Channel m a -> IO ()
sendWorkerDelayPaced Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
dispatchWorkerPaced Channel m a
sv
IO [ChildEvent a] -> m [ChildEvent a]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (([ChildEvent a], Int) -> [ChildEvent a]
forall a b. (a, b) -> a
fst (([ChildEvent a], Int) -> [ChildEvent a])
-> IO ([ChildEvent a], Int) -> IO [ChildEvent a]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Channel m a -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv)
postProcessPaced :: MonadRunInIO m => Channel m a -> m Bool
postProcessPaced :: forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced Channel m a
sv = do
Bool
workersDone <- IORef (Set ThreadId) -> m Bool
forall (m :: * -> *). MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDone (Channel m a -> IORef (Set ThreadId)
forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
if Bool
workersDone
then do
Bool
r <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> IO Bool
forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone Channel m a
sv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
r) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
dispatchWorkerPaced Channel m a
sv
Bool
noWorker <- IORef (Set ThreadId) -> m Bool
forall (m :: * -> *). MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDone (Channel m a -> IORef (Set ThreadId)
forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
noWorker (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Count -> Channel m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
0 Channel m a
sv
Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
r
else Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
postProcessBounded :: MonadRunInIO m => Channel m a -> m Bool
postProcessBounded :: forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded Channel m a
sv = do
Bool
workersDone <- IORef (Set ThreadId) -> m Bool
forall (m :: * -> *). MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDone (Channel m a -> IORef (Set ThreadId)
forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
if Bool
workersDone
then do
Bool
r <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> IO Bool
forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone Channel m a
sv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
r) (Count -> Channel m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
0 Channel m a
sv)
Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
r
else Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False