{-# OPTIONS_GHC -Wno-deprecations #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Combinators
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Combinators that adjust the concurrency-control settings (thread limit,
-- buffer limit, yield limit, rate, inspect mode) carried in the stream
-- 'State' before the wrapped stream is folded.
--
module Streamly.Internal.Data.Stream.IsStream.Combinators
    {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
    ( maxThreads
    , maxBuffer
    , maxYields
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate
    , inspectMode
    , printState
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Int (Int64)

import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream, mkStream, foldStreamShared)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.StreamK (Stream)

import Streamly.Internal.Data.SVar

-------------------------------------------------------------------------------
-- Concurrency control
-------------------------------------------------------------------------------
--
-- XXX need to write these in direct style otherwise they will break fusion.
--
-- Every combinator below has the same shape: build a new stream with
-- 'mkStream' whose body folds the original stream with 'foldStreamShared',
-- passing a modified 'State' and forwarding the three continuations
-- untouched. The continuations are named after their shapes: @yieldk@
-- receives an element and the remaining stream, @single@ receives one
-- element, and @stop@ receives nothing.

-- | Specify the maximum number of threads that can be spawned concurrently for
-- any concurrent combinator in a stream.
-- A value of 0 resets the thread limit to default, a negative value means
-- there is no limit. The default value is 1500. 'maxThreads' does not affect
-- 'ParallelT' streams as they can use unbounded number of threads.
--
-- When the actions in a stream are IO bound, having blocking IO calls, this
-- option can be used to control the maximum number of in-flight IO requests.
-- When the actions are CPU bound this option can be used to
-- control the amount of CPU used by the stream.
--
-- /Since: 0.4.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads n m =
    mkStream $ \st yieldk single stop ->
        foldStreamShared (setMaxThreads n st) yieldk single stop m

{-
{-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-}
maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
maxThreadsSerial _ = id
-}

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- presence of infinite streams, or very large streams.  Especially, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in
-- applicative zip streams generates an infinite stream causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
-- /Since: 0.4.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer n m =
    mkStream $ \st yieldk single stop ->
        foldStreamShared (setMaxBuffer n st) yieldk single stop m

{-
{-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-}
maxBufferSerial :: Int -> SerialT m a -> SerialT m a
maxBufferSerial _ = id
-}

-- | Specify the pull rate of a stream.
-- A 'Nothing' value resets the rate to default which is unlimited.  When the
-- rate is specified, concurrent production may be ramped up or down
-- automatically to achieve the specified yield rate. The specific behavior for
-- different styles of 'Rate' specifications is documented under 'Rate'.  The
-- effective maximum production rate achieved by a stream is governed by:
--
-- * The 'maxThreads' limit
-- * The 'maxBuffer' limit
-- * The maximum rate that the stream producer can achieve
-- * The maximum rate that the stream consumer can achieve
--
-- The bounds of a supplied 'Rate' are validated inside the stream body, so
-- an inconsistent specification (goal below the minimum, goal above the
-- maximum, or minimum above the maximum) calls 'error' when the resulting
-- stream is evaluated rather than when 'rate' is applied.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE_NORMAL rate #-}
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate r m =
    mkStream $ \st yieldk single stop ->
        case r of
            Just (Rate low goal _ _)
                | goal < low ->
                    error "rate: Target rate cannot be lower than minimum rate."
            Just (Rate _ goal high _)
                | goal > high ->
                    error "rate: Target rate cannot be greater than maximum rate."
            Just (Rate low _ high _)
                | low > high ->
                    error "rate: Minimum rate cannot be greater than maximum rate."
            -- 'Nothing', or a 'Rate' that passed all three checks above.
            _ -> foldStreamShared (setStreamRate r st) yieldk single stop m

-- XXX implement for serial streams as well, as a simple delay

{-
{-# RULES "rate serial" rate = yieldRateSerial #-}
yieldRateSerial :: Double -> SerialT m a -> SerialT m a
yieldRateSerial _ = id
-}

-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
--
-- Specifies the average production rate of a stream in number of yields
-- per second (i.e.  @Hertz@).  Concurrent production is ramped up or down
-- automatically to achieve the specified average yield rate. The rate can
-- go down to half of the specified rate on the lower side and double of
-- the specified rate on the higher side.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate r = rate (Just $ Rate (r / 2) r (2 * r) maxBound)

-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
--
-- Specifies the minimum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go below the
-- specified rate, even though it may possibly go above it at times, the
-- upper limit is double of the specified rate.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
minRate :: IsStream t => Double -> t m a -> t m a
minRate r = rate (Just $ Rate r r (2 * r) maxBound)

-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
--
-- Specifies the maximum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go above the
-- specified rate, even though it may possibly go below it at times, the
-- lower limit is half of the specified rate. This can be useful in
-- applications where certain resource usage must not be allowed to go
-- beyond certain limits.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate r = rate (Just $ Rate (r / 2) r r maxBound)

-- | Same as @rate (Just $ Rate r r r 0)@
--
-- Specifies a constant yield rate. If for some reason the actual rate
-- goes above or below the specified rate we do not try to recover it by
-- increasing or decreasing the rate in future.  This can be useful in
-- applications like graphics frame refresh where we need to maintain a
-- constant refresh rate.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
constRate :: IsStream t => Double -> t m a -> t m a
constRate r = rate (Just $ Rate r r r 0)

-- | Specify the average latency, in nanoseconds, of a single threaded action
-- in a concurrent composition. Streamly can measure the latencies, but that is
-- possible only after at least one task has completed. This combinator can be
-- used to provide a latency hint so that rate control using 'rate' can take
-- that into account right from the beginning. When not specified then a
-- default behavior is chosen which could be too slow or too fast, and would be
-- restricted by any other control parameters configured.
-- A value of 0 indicates default behavior, a negative value means there is no
-- limit i.e. zero latency.
-- This would normally be useful only in high latency and high throughput
-- cases.
--
{-# INLINE_NORMAL _serialLatency #-}
_serialLatency :: IsStream t => Int -> t m a -> t m a
_serialLatency n m =
    mkStream $ \st yieldk single stop ->
        foldStreamShared (setStreamLatency n st) yieldk single stop m

{-
{-# RULES "serialLatency serial" _serialLatency = serialLatencySerial #-}
serialLatencySerial :: Int -> SerialT m a -> SerialT m a
serialLatencySerial _ = id
-}

-- Stop concurrent dispatches after this limit. This is useful in API's like
-- "take" where we want to dispatch only upto the number of elements "take"
-- needs.  This value applies only to the immediate next level and is not
-- inherited by everything in enclosed scope.
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields n m =
    mkStream $ \st yieldk single stop ->
        foldStreamShared (setYieldLimit n st) yieldk single stop m

{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
-- Target of the "maxYields serial" rewrite rule: at type 'SerialT' the limit
-- is ignored and the stream is returned unchanged.
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial _ = id

-- | Print the dump of the 'SVar' attached to the given stream state, as
-- produced by 'dumpSVar', or the line @No SVar@ when the state has none.
printState :: MonadIO m => State Stream m a -> m ()
printState st =
    liftIO $
        case streamVar st of
            Just sv -> dumpSVar sv >>= putStrLn
            Nothing -> putStrLn "No SVar"

-- | Print debug information about an SVar when the stream ends
--
-- /Pre-release/
--
inspectMode :: IsStream t => t m a -> t m a
inspectMode m =
    mkStream $ \st yieldk single stop ->
        foldStreamShared (setInspectMode st) yieldk single stop m