module Streamly.Internal.Data.Stream.Concurrent.Channel.Type
(
Channel(..)
, yield
, stop
, stopChannel
, dumpSVar
)
where
import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef)
import Data.List (intersperse)
import Data.Set (Set)
import Streamly.Internal.Control.Concurrent (RunInIO)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker
(sendYield, sendStop, sendWithDoorBell)
import Streamly.Internal.Data.StreamK (StreamK)
import Streamly.Internal.Data.Channel.Types
-- | State shared between the consumer of a concurrent stream and the worker
-- threads evaluating it.
--
-- Field notes marked \"(assumed)\" are inferred from names and types only and
-- should be confirmed against the code that constructs and drives the channel.
data Channel m a = Channel
    {
      -- | Runner of type 'RunInIO' for the channel's monad @m@.
      svarMrun :: RunInIO m

      -- | Output events paired with an 'Int'. 'yield', 'stop' and
      -- 'stopChannel' pass this to the send functions. (assumed) The 'Int'
      -- is the number of queued events.
    , outputQueue :: IORef ([ChildEvent a], Int)

      -- | 'MVar' handed to the send functions together with 'outputQueue'.
      -- (assumed) Used to wake up the consumer when output is available.
    , outputDoorBell :: MVar ()

      -- | Action returning a list of output events. (assumed) Drains
      -- 'outputQueue'.
    , readOutputQ :: m [ChildEvent a]

      -- | Action returning a 'Bool'. (assumed) Run by the consumer after
      -- processing a batch of output events.
    , postProcess :: m Bool

      -- | Worker limit; passed to 'sendYield' by 'yield'.
    , maxWorkerLimit :: Limit

      -- | Buffer limit; passed to 'sendYield' by 'yield'.
    , maxBufferLimit :: Limit

      -- | Optional counter. (assumed) Remaining number of yields allowed.
    , remainingWork :: Maybe (IORef Count)

      -- | Optional rate-control state; passed to 'sendYield' and 'sendStop'
      -- and reported by 'dumpSVar'.
    , yieldRateInfo :: Maybe YieldRateInfo

      -- | Takes a 'Bool' flag and a (runner, stream) pair. (assumed) Queues
      -- the stream as work for the workers.
    , enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()

      -- | (assumed) Dispatches workers eagerly.
    , eagerDispatch :: m ()

      -- | (assumed) Reports whether all work has completed.
    , isWorkDone :: IO Bool

      -- | (assumed) Reports whether the work queue is exhausted.
    , isQueueDone :: IO Bool

      -- | Flag reported by 'dumpSVar' as the \"need door bell\" state.
    , doorBellOnWorkQ :: IORef Bool

      -- | (assumed) The loop executed by each worker thread.
    , workLoop :: Maybe WorkerInfo -> m ()

      -- | Set of thread ids; reported by 'dumpSVar' as the running threads.
    , workerThreads :: IORef (Set ThreadId)

      -- | Worker counter; passed to 'sendYield' and 'sendStop', and
      -- decremented by 'stopChannel'.
    , workerCount :: IORef Int

      -- | (assumed) Registers a worker thread id with the channel.
    , accountThread :: ThreadId -> m ()

      -- | (assumed) Signalled when a worker stops.
    , workerStopMVar :: MVar ()

      -- | Optional unit 'IORef'. (assumed) Used to detect garbage collection
      -- of the channel.
    , svarRef :: Maybe (IORef ())

      -- | Statistics; reported by 'dumpSVar' via 'dumpSVarStats'.
    , svarStats :: SVarStats

      -- | Inspection flag; passed to 'dumpSVarStats' by 'dumpSVar'.
    , svarInspectMode :: Bool

      -- | Thread id reported by 'dumpSVar' as the channel's creator.
    , svarCreator :: ThreadId
    }
-- | Send a value produced by a worker to the channel's output queue.
--
-- The value is wrapped in 'ChildYield' and handed to 'sendYield' together
-- with the channel's buffer limit, worker limit, worker count, rate-control
-- info, output queue and door bell. The result is whatever 'sendYield'
-- returns.
--
-- NOTE(review): the 'Bool' presumably tells the worker whether it may keep
-- producing; confirm against 'sendYield'.
{-# INLINE yield #-}
yield :: Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield sv winfo x =
    sendYield
        (maxBufferLimit sv)
        (maxWorkerLimit sv)
        (workerCount sv)
        winfo
        (yieldRateInfo sv)
        (outputQueue sv)
        (outputDoorBell sv)
        (ChildYield x)
-- | Report that a worker is stopping.
--
-- Delegates to 'sendStop', passing the channel's worker count, the worker's
-- optional 'WorkerInfo', the rate-control info, the output queue and the
-- output door bell.
{-# INLINE stop #-}
stop :: Channel m a -> Maybe WorkerInfo -> IO ()
stop sv winfo =
    sendStop
        (workerCount sv)
        winfo
        (yieldRateInfo sv)
        (outputQueue sv)
        (outputDoorBell sv)
-- | Ask the channel to stop.
--
-- Performs two steps in 'IO', in this order:
--
-- 1. Decrements 'workerCount' by one using 'atomicModifyIORefCAS_'.
-- 2. Pushes a 'ChildStopChannel' event on the output queue with
--    'sendWithDoorBell'; the 'Int' it returns is discarded.
--
-- NOTE(review): the decrement presumably balances an increment that
-- processing the stop event performs elsewhere; confirm against the consumer
-- side.
{-# INLINABLE stopChannel #-}
stopChannel :: MonadIO m => Channel m a -> m ()
stopChannel chan = liftIO $ do
    atomicModifyIORefCAS_ (workerCount chan) $ \n -> n - 1
    void
        $ sendWithDoorBell
            (outputQueue chan)
            (outputDoorBell chan)
            ChildStopChannel
-- | Render a textual snapshot of the channel for debugging.
--
-- The report consists of the following sections, each separated from the
-- next by a newline: the creator thread, a \"CURRENT STATE\" banner, the
-- output queue, the output door bell, the need-door-bell flag, the running
-- worker threads, the worker count, a \"STATS\" banner and the statistics
-- produced by 'dumpSVarStats'.
{-# NOINLINE dumpSVar #-}
dumpSVar :: Channel m a -> IO String
dumpSVar sv = concat <$> sequence (intersperse (return "\n") sections)

    where

    -- Each section is an IO action because most of them read mutable state.
    sections =
        [ return (dumpCreator (svarCreator sv))
        , return "---------CURRENT STATE-----------"
        , dumpOutputQ (outputQueue sv)
        , dumpDoorBell (outputDoorBell sv)
        , dumpNeedDoorBell (doorBellOnWorkQ sv)
        , dumpRunningThreads (workerThreads sv)
        , dumpWorkerCount (workerCount sv)
        , return "---------STATS-----------\n"
        , dumpSVarStats (svarInspectMode sv) (yieldRateInfo sv) (svarStats sv)
        ]