#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif

#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif

-- |
-- Module      : Streamly.Internal.Data.Stream.Channel.Operations
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Channel.Operations
    (
    -- *** Reading Stream
      fromChannelK
    , fromChannel

    -- ** Enqueuing Work
    , toChannelK
    , toChannel
    )
where

#include "inline.hs"

import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.Catch (throwM, MonadThrow)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Control.Concurrent
    (MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)

import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K

import Streamly.Internal.Data.Channel.Types hiding (inspect)
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Type hiding (inspect)

import Prelude hiding (map, concat, concatMap)

#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif

------------------------------------------------------------------------------
-- Generating streams from a channel
------------------------------------------------------------------------------

-- $concurrentEval
--
-- Usually a channel is used to concurrently evaluate multiple actions in a
-- stream using many worker threads that push the results to the channel and a
-- single puller that pulls them from the channel, generating the evaluated
-- stream.
--
-- @
--                  input stream
--                       |
--     <-----------------|<--------worker
--     |  exceptions     |
-- output stream <---Channel<------worker
--                       |
--                       |<--------worker
--
-- @
--
-- The puller itself schedules the worker threads based on demand.
-- Exceptions are propagated from the worker threads to the puller.

-------------------------------------------------------------------------------
-- Write a stream to a channel
-------------------------------------------------------------------------------

-- XXX Should be a Fold, singleton API could be called joinChannel, or the fold
-- can be called joinChannel.
-- XXX If we use toChannelK multiple times on a channel make sure the channel
-- does not go away before we use the subsequent ones.

-- | High level function to enqueue a work item on the channel. The fundamental
-- unit of work is a stream. Each stream enqueued on the channel is picked up
-- and evaluated by a worker thread. The worker evaluates the stream it picked
-- up serially. When multiple streams are queued on the channel each stream can
-- be evaluated concurrently by different workers.
--
-- Note that the items in each stream are not concurrently evaluated, streams
-- are fundamentally serial, therefore, elements in one particular stream will
-- be generated serially one after the other. Only two or more streams can be
-- run concurrently with each other.
--
-- See 'chanConcatMapK' for concurrent evaluation of each element of a stream.
-- Alternatively, you can wrap each element of the original stream into a
-- stream generating action and queue all those streams on the channel. Then
-- all of them would be evaluated concurrently. However, that would not be
-- streaming in nature, it would require buffering space for the entire
-- original stream. Prefer 'chanConcatMapK' for larger streams.
--
-- Items from each evaluated stream are queued to the same output queue of the
-- channel which can be read using 'fromChannelK'. 'toChannelK' can be called
-- multiple times to enqueue multiple streams on the channel.
--
{-# INLINE toChannelK #-}
toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m ()
toChannelK chan m = do
    -- Capture the runner for the current monad; it is enqueued together with
    -- the stream (presumably so that the worker can run the stream in this
    -- monadic context -- see 'enqueue').
    runIn <- askRunInIO
    liftIO $ enqueue chan (runIn, m)

-- INLINE for fromStreamK/toStreamK fusion

-- | A wrapper over 'toChannelK' for 'Stream' type. The stream is converted
-- with 'Stream.toStreamK' and then enqueued on the channel.
{-# INLINE toChannel #-}
toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m ()
toChannel chan = toChannelK chan . Stream.toStreamK

{-
-- | Send a stream of streams to a concurrent channel for evaluation.
{-# INLINE joinChannel #-}
joinChannel :: Channel m a -> Fold m (Stream m a) ()
joinChannel = undefined
-}

-------------------------------------------------------------------------------
-- Read a stream from a channel
-------------------------------------------------------------------------------

-- | Pull a stream from a channel.
--
-- Each step reads a batch of events from the channel's output queue using
-- 'readOutputQ' and processes them one by one:
--
-- * 'ChildYield' yields the value downstream.
-- * 'ChildStopChannel' cleans up the worker threads and stops the stream.
-- * 'ChildStop' accounts for the stopped worker thread; if the worker stopped
--   with an exception, the worker threads are cleaned up and the exception is
--   rethrown using 'throwM'.
--
-- When a batch is exhausted 'postProcess' decides whether the stream is done
-- or another batch should be read.
{-# NOINLINE fromChannelRaw #-}
fromChannelRaw :: (MonadIO m, MonadThrow m) => Channel m a -> K.StreamK m a
fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do
    list <- readOutputQ sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    K.foldStream st yld sng stp $ processEvents $ reverse list

    where

    -- In inspect mode, record the stop time in the channel stats and print a
    -- dump of the channel. Does nothing otherwise.
    cleanup =
        when (svarInspectMode sv) $ liftIO $ do
            t <- getTime Monotonic
            writeIORef (svarStopTime (svarStats sv)) (Just t)
            printSVar (dumpChannel sv) "SVar Done"

    {-# INLINE processEvents #-}
    -- The current batch is exhausted: either finish or go back to reading the
    -- output queue.
    processEvents [] = K.MkStream $ \st yld sng stp -> do
        done <- postProcess sv
        if done
        then cleanup >> stp
        else K.foldStream st yld sng stp $ fromChannelRaw sv

    processEvents (ev : es) = K.MkStream $ \st yld sng stp -> do
        let rest = processEvents es
        case ev of
            ChildYield a -> yld a rest
            ChildStopChannel -> do
                liftIO (cleanupSVar (workerThreads sv))
                cleanup >> stp
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> K.foldStream st yld sng stp rest
                    Just ex ->
                        case fromException ex of
                            Just ThreadAbort ->
                                -- We terminate the loop after sending
                                -- ThreadAbort to workers so we should never
                                -- get it unless it is thrown from inside a
                                -- worker thread or by someone else to our
                                -- thread.
                                error "processEvents: got ThreadAbort"
                                -- K.foldStream st yld sng stp rest
                            Nothing -> do
                                liftIO (cleanupSVar (workerThreads sv))
                                cleanup >> throwM ex

#ifdef INSPECTION
-- Use of GHC constraint tuple (GHC.Classes.(%,,%)) in fromChannelRaw leads to
-- space leak because the tuple gets allocated in every recursive call and each
-- allocation holds on to the previous allocation. This test is to make sure
-- that we do not use the constraint tuple type class.
--
inspect $ hasNoTypeClassesExcept 'fromChannelRaw
    [ ''Monad
    , ''Applicative
    , ''MonadThrow
    , ''Exception
    , ''MonadIO
    , ''MonadBaseControl
    , ''Typeable
    , ''Functor
    ]
#endif

-- XXX Add a lock in the channel so that fromChannel cannot be called multiple
-- times.
--
-- XXX Add an option to block the consumer rather than stopping the stream
-- when the work queue becomes empty.

-- | Cleanup action for a channel, installed by 'fromChannelK' as the
-- finalizer of a weak reference.
--
-- It calls 'cleanupSVar' on the channel's worker threads. Additionally, when
-- the channel is in inspect mode:
--
-- * if no stop time has been recorded in the channel stats, a dump of the
--   channel is printed with the label "Channel Garbage Collected";
-- * a major GC is requested after the cleanup.
chanCleanupOnGc :: Channel m a -> IO ()
chanCleanupOnGc chan = do
    when (svarInspectMode chan) $ do
        -- A recorded stop time means the stream finished normally, in which
        -- case the stats have already been printed by the reader.
        r <- readIORef (svarStopTime (svarStats chan))
        when (isNothing r) $
            printSVar (dumpChannel chan) "Channel Garbage Collected"
    cleanupSVar (workerThreads chan)
    -- If there are any other channels referenced by this channel a GC will
    -- prompt them to be cleaned up quickly.
    when (svarInspectMode chan) performMajorGC

-- | Draw a stream from a concurrent channel. The stream consists of the
-- evaluated values from the input streams that were enqueued on the channel
-- using 'toChannelK'.
--
-- This is the event processing loop for the channel which does two
-- things, (1) dispatch workers, (2) process the events sent by the workers.
-- Workers are dispatched based on the channel's configuration settings.
--
-- The stream stops and the channel is shutdown if any of the following occurs:
--
-- * the work queue becomes empty
-- * channel's max yield limit is reached
-- * an exception is thrown by a worker
-- * 'shutdown' is called on the channel
--
-- Before the channel stops, all the workers are drained and no more workers
-- are dispatched. When the channel is garbage collected a 'ThreadAbort'
-- exception is thrown to all pending workers. If 'inspect' option is enabled
-- then channel's stats are printed on stdout when the channel stops.
--
-- CAUTION! This API must not be called more than once on a channel.
{-# INLINE fromChannelK #-}
fromChannelK :: MonadAsync m => Channel m a -> K.StreamK m a
fromChannelK chan =
    K.mkStream $ \st yld sng stp -> do
        -- "ref" exists only to carry a finalizer: when it is garbage
        -- collected 'chanCleanupOnGc' is run for the channel.
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref (chanCleanupOnGc chan)

        startChannel chan
        -- We pass a copy of the channel to fromChannelRaw, so that we know
        -- that it has no other references, when that copy gets garbage
        -- collected "ref" will get garbage collected and our hook will be
        -- called.
        K.foldStreamShared st yld sng stp $
            fromChannelRaw chan{svarRef = Just ref}

-- | A wrapper over 'fromChannelK' for 'Stream' type. The resulting 'StreamK'
-- is converted using 'Stream.fromStreamK'.
{-# INLINE fromChannel #-}
fromChannel :: MonadAsync m => Channel m a -> Stream m a
fromChannel = Stream.fromStreamK . fromChannelK

-- State of the step function used by '_fromChannelD'. The type parameter "t"
-- is not used by any of the constructors.
data FromSVarState t m a =
      -- Initial state: the GC hook has not been installed yet.
      FromSVarInit
      -- Read the next batch of events from the channel's output queue.
    | FromSVarRead (Channel m a)
      -- Process the remaining events of the batch that was read.
    | FromSVarLoop (Channel m a) [ChildEvent a]
      -- No more output: finalize (stats are printed in inspect mode) and
      -- stop the stream.
    | FromSVarDone (Channel m a)

-- | Like 'fromChannelK' but generates a StreamD style stream instead of CPS.
--
-- The stream is driven by a state machine over 'FromSVarState':
--
-- * @FromSVarInit@ installs a GC hook on a fresh 'IORef' and attaches that
--   reference to a copy of the channel.
-- * @FromSVarRead@ reads a batch of events from the output queue.
-- * @FromSVarLoop@ processes the batch one event at a time.
-- * @FromSVarDone@ records the stop time and prints the channel dump in
--   inspect mode, and then stops the stream.
--
-- NOTE(review): unlike 'fromChannelK' this function does not call
-- 'startChannel', and a 'ThreadAbort' reported by a worker is skipped here
-- whereas 'fromChannelRaw' treats it as an error -- confirm whether these
-- differences are intended before using this function.
--
{-# INLINE_NORMAL _fromChannelD #-}
_fromChannelD :: (MonadIO m, MonadThrow m) => Channel m a -> D.Stream m a
_fromChannelD svar = D.Stream step FromSVarInit
    where

    {-# INLINE_LATE step #-}
    step _ FromSVarInit = do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook
        -- when this copy of svar gets garbage collected "ref" will get
        -- garbage collected and our GC hook will be called.
        let sv = svar{svarRef = Just ref}
        return $ D.Skip (FromSVarRead sv)

        where

        {-# NOINLINE hook #-}
        hook = do
            when (svarInspectMode svar) $ do
                -- A recorded stop time means that the stream finished
                -- normally (see the FromSVarDone case).
                r <- readIORef (svarStopTime (svarStats svar))
                when (isNothing r) $
                    printSVar (dumpChannel svar) "SVar Garbage Collected"
            cleanupSVar (workerThreads svar)
            -- If there are any SVars referenced by this SVar a GC will prompt
            -- them to be cleaned up quickly.
            when (svarInspectMode svar) performMajorGC

    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    step _ (FromSVarLoop sv []) = do
        done <- postProcess sv
        return $ D.Skip $ if done
                      then FromSVarDone sv
                      else FromSVarRead sv

    step _ (FromSVarLoop sv (ev : es)) = do
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStopChannel -> do
                liftIO (cleanupSVar (workerThreads sv))
                return $ D.Skip (FromSVarDone sv)
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> return $ D.Skip (FromSVarLoop sv es)
                    Just ex ->
                        case fromException ex of
                            Just ThreadAbort ->
                                return $ D.Skip (FromSVarLoop sv es)
                            Nothing -> do
                                liftIO (cleanupSVar (workerThreads sv))
                                throwM ex

    step _ (FromSVarDone sv) = do
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar (dumpChannel sv) "SVar Done"
        return D.Stop