{-# OPTIONS_GHC -Wno-deprecations #-}
-- |
-- Module      : Streamly.Internal.Data.Unfold.SVar
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Unfold.SVar
    (
      fromSVar
    , fromProducer
    )
where

#include "inline.hs"

import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream (Step(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.SVar (printSVar, cleanupSVar, sendStopToProducer)
import Streamly.Internal.Data.Unfold (Unfold(..))
import System.Mem (performMajorGC)

import qualified Control.Monad.Catch as MC

import Streamly.Internal.Data.SVar.Type
-------------------------------------------------------------------------------
-- Generation from SVar
-------------------------------------------------------------------------------

data FromSVarState t m a =
      FromSVarInit (SVar t m a)
    | FromSVarRead (SVar t m a)
    | FromSVarLoop (SVar t m a) [ChildEvent a]
    | FromSVarDone (SVar t m a)

-- | /Internal/
--
{-# INLINE_NORMAL fromSVar #-}
fromSVar :: MonadAsync m => Unfold m (SVar t m a) a
fromSVar :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Unfold m (SVar t m a) a
fromSVar = (FromSVarState t m a -> m (Step (FromSVarState t m a) a))
-> (SVar t m a -> m (FromSVarState t m a))
-> Unfold m (SVar t m a) a
forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold FromSVarState t m a -> m (Step (FromSVarState t m a) a)
forall {m :: * -> *} {t :: (* -> *) -> * -> *} {a}.
(MonadIO m, MonadThrow m) =>
FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (FromSVarState t m a -> m (FromSVarState t m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (FromSVarState t m a -> m (FromSVarState t m a))
-> (SVar t m a -> FromSVarState t m a)
-> SVar t m a
-> m (FromSVarState t m a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarInit)
    where

    {-# INLINE_LATE step #-}
    step :: FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (FromSVarInit SVar t m a
svar) = do
        IORef ()
ref <- IO (IORef ()) -> m (IORef ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ()) -> m (IORef ())) -> IO (IORef ()) -> m (IORef ())
forall a b. (a -> b) -> a -> b
$ () -> IO (IORef ())
forall a. a -> IO (IORef a)
newIORef ()
        Weak (IORef ())
_ <- IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Weak (IORef ())) -> m (Weak (IORef ())))
-> IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall a b. (a -> b) -> a -> b
$ IORef () -> IO () -> IO (Weak (IORef ()))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref IO ()
hook
        -- when this copy of svar gets garbage collected "ref" will get
        -- garbage collected and our GC hook will be called.
        let sv :: SVar t m a
sv = SVar t m a
svar{svarRef :: Maybe (IORef ())
svarRef = IORef () -> Maybe (IORef ())
forall a. a -> Maybe a
Just IORef ()
ref}
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv)

        where

        {-# NOINLINE hook #-}
        hook :: IO ()
hook = do
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
svar) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                Maybe AbsTime
r <- IO (Maybe AbsTime) -> IO (Maybe AbsTime)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe AbsTime) -> IO (Maybe AbsTime))
-> IO (Maybe AbsTime) -> IO (Maybe AbsTime)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> IO (Maybe AbsTime)
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
svar))
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe AbsTime -> Bool
forall a. Maybe a -> Bool
isNothing Maybe AbsTime
r) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                    SVar t m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
svar String
"SVar Garbage Collected"
            SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
svar
            -- If there are any SVars referenced by this SVar a GC will prompt
            -- them to be cleaned up quickly.
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
svar) IO ()
performMajorGC

    step (FromSVarRead SVar t m a
sv) = do
        [ChildEvent a]
list <- SVar t m a -> m [ChildEvent a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step (FromSVarLoop SVar t m a
sv []) = do
        Bool
done <- SVar t m a -> m Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m Bool
postProcess SVar t m a
sv
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ if Bool
done
                      then SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv
                      else SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv

    step (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            ChildYield a
a -> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. a -> s -> Step s a
Yield a
a (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                SVar t m a -> ThreadId -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> do
                        Bool
stop <- ThreadId -> m Bool
forall {m :: * -> *}. MonadIO m => ThreadId -> m Bool
shouldStop ThreadId
tid
                        if Bool
stop
                        then do
                            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
sv)
                            Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
                        else Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
                    Just SomeException
ex ->
                        case SomeException -> Maybe ThreadAbort
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
                            Just ThreadAbort
ThreadAbort ->
                                Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
                            Maybe ThreadAbort
Nothing -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
sv) m ()
-> m (Step (FromSVarState t m a) a)
-> m (Step (FromSVarState t m a) a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m (Step (FromSVarState t m a) a)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
ex
        where

        shouldStop :: ThreadId -> m Bool
shouldStop ThreadId
tid =
            case SVar t m a -> SVarStopStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar t m a
sv of
                SVarStopStyle
StopNone -> Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
                SVarStopStyle
StopAny -> Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
                SVarStopStyle
StopBy -> do
                    ThreadId
sid <- IO ThreadId -> m ThreadId
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ThreadId -> m ThreadId) -> IO ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$ IORef ThreadId -> IO ThreadId
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef ThreadId
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar t m a
sv)
                    Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ ThreadId
tid ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
== ThreadId
sid

    step (FromSVarDone SVar t m a
sv) = do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- IO AbsTime -> m AbsTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState t m a) a
forall s a. Step s a
Stop

-------------------------------------------------------------------------------
-- Process events received by a fold consumer from a stream producer
-------------------------------------------------------------------------------

-- XXX Have an error for "FromSVarInit" instead of undefined. The error can
-- redirect the user to report the failure to the developers.
-- | /Internal/
--
{-# INLINE_NORMAL fromProducer #-}
fromProducer :: MonadAsync m => Unfold m (SVar t m a) a
fromProducer :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Unfold m (SVar t m a) a
fromProducer = (FromSVarState t m a -> m (Step (FromSVarState t m a) a))
-> (SVar t m a -> m (FromSVarState t m a))
-> Unfold m (SVar t m a) a
forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold FromSVarState t m a -> m (Step (FromSVarState t m a) a)
forall {m :: * -> *} {t :: (* -> *) -> * -> *} {a}.
MonadIO m =>
FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (FromSVarState t m a -> m (FromSVarState t m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (FromSVarState t m a -> m (FromSVarState t m a))
-> (SVar t m a -> FromSVarState t m a)
-> SVar t m a
-> m (FromSVarState t m a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead)
    where

    {-# INLINE_LATE step #-}
    step :: FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (FromSVarRead SVar t m a
sv) = do
        [ChildEvent a]
list <- SVar t m a -> m [ChildEvent a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step (FromSVarLoop SVar t m a
sv []) = Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv
    step (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            ChildYield a
a -> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. a -> s -> Step s a
Yield a
a (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                SVar t m a -> ThreadId -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> do
                        SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar t m a
sv
                        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
Skip (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
                    Just SomeException
_ -> String -> m (Step (FromSVarState t m a) a)
forall a. HasCallStack => String -> a
error String
"Bug: fromProducer: received exception"

    step (FromSVarDone SVar t m a
sv) = do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- IO AbsTime -> m AbsTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState t m a) a
forall s a. Step s a
Stop

    step (FromSVarInit SVar t m a
_) = m (Step (FromSVarState t m a) a)
forall a. HasCallStack => a
undefined