module Streamly.Internal.Data.Fold.Async
(
takeInterval
, intervalsOf
)
where
import Control.Concurrent (threadDelay, forkIO, killThread)
import Control.Concurrent.MVar (MVar, newMVar, swapMVar, readMVar)
import Control.Exception (SomeException(..), catch, mask)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Streamly.Data.Fold (many)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Control.Concurrent (MonadAsync, withRunInIO)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))
-- | @takeInterval n fold@ runs @fold@ on the input for a time window of @n@
-- seconds and then terminates it.
--
-- When the fold is initialized a timer thread is forked which sleeps for
-- @n * 1000000@ microseconds and then sets a shared flag. Each step reads the
-- flag before feeding the element to the inner fold:
--
-- * flag not set: the element is passed to the inner fold and folding
--   continues (or stops if the inner fold itself is done);
-- * flag set: the element is still passed to the inner fold, after which the
--   inner fold is finalized and its result is returned as 'Done'.
--
-- Expiry is therefore only noticed when the next input element arrives; the
-- element that observes the expiry is included in the result.
--
-- The timer thread is killed whenever the fold ends before the window has
-- elapsed, whether because the inner fold finished on its own or because the
-- driver finalized this fold early, so no sleeping thread is left behind.
{-# INLINE takeInterval #-}
takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b
takeInterval n (Fold step initial done final) =
    Fold step' initial' done' final'

    where

    -- Initialize the inner fold; if it wants input, create the expiry flag
    -- (initially False) and fork the timer thread.
    initial' = do
        res <- initial
        case res of
            Partial s -> do
                mv <- liftIO $ newMVar False
                t <-
                    withRunInIO $ \run ->
                        -- Fork with async exceptions masked so that the
                        -- exception handler is installed before any
                        -- exception can be delivered to the child; the timer
                        -- itself runs with the original masking state.
                        mask $ \restore -> do
                            tid <-
                                forkIO
                                  $ catch
                                        (restore $ void $ run (timerThread mv))
                                        (handleChildException mv)
                            run (return tid)
                return $ Partial $ Tuple3' s mv t
            Done b -> return $ Done b

    -- The state carries the inner fold state, the expiry flag and the timer
    -- thread id.
    step' (Tuple3' s mv t) a = do
        expired <- liftIO $ readMVar mv
        res <- step s a
        case res of
            Partial s1
                -- Window is over: finalize the inner fold and stop.
                | expired -> Done <$> final s1
                | otherwise -> return $ Partial $ Tuple3' s1 mv t
            Done b
                -- The timer thread has already set the flag, nothing to
                -- clean up.
                | expired -> return $ Done b
                -- Inner fold finished before the window elapsed; the timer
                -- is no longer needed.
                | otherwise -> liftIO (killThread t) >> return (Done b)

    -- Extracting an intermediate result does not end the fold, so the timer
    -- keeps running.
    done' (Tuple3' s _ _) = done s

    -- The fold is being terminated by the driver. Stop the timer first so
    -- that it is released even if the inner finalizer throws; killing a
    -- thread that has already finished is a no-op.
    final' (Tuple3' s _ t) = liftIO (killThread t) >> final s

    -- Sleep for n seconds (threadDelay takes microseconds), then mark the
    -- window as expired.
    timerThread mv = do
        liftIO $ threadDelay (round $ n * 1000000)
        liftIO $ void $ swapMVar mv True

    -- Any exception in the timer thread (including the ThreadKilled raised
    -- by killThread above) marks the window as expired so that the fold
    -- cannot keep waiting on a timer that will never fire.
    handleChildException :: MVar Bool -> SomeException -> IO ()
    handleChildException mv _ = void $ swapMVar mv True
-- | @intervalsOf n split collect@ applies @takeInterval n split@ repeatedly
-- to the input using 'many', passing the result of each time-limited run of
-- @split@ as an input element to @collect@.
--
-- @n@ is the window length in seconds, as interpreted by 'takeInterval'.
{-# INLINE intervalsOf #-}
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
intervalsOf secs chunk = many timedChunk

    where

    -- The splitting fold restricted to a single time window.
    timedChunk = takeInterval secs chunk