{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Scanl.Window
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : Apache-2.0
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Simple incremental statistical measures over a stream of data. All
-- operations use numerically stable floating point arithmetic.
--
-- Measurements can be performed over the entire input stream or on a sliding
-- window of fixed or variable size.  Where possible, measures are computed
-- online without buffering the input stream.
--
-- Currently there is no overflow detection.
--
-- For more advanced statistical measures see the @streamly-statistics@
-- package.

-- XXX A window fold can be driven either using the RingArray.slidingWindow
-- combinator or by zipping nthLast fold and last fold.

module Streamly.Internal.Data.Scanl.Window
    (
    -- * Types
      Incr (..)

    -- * Running Incremental Scans
    -- | Scans of type @Scanl m (Incr a) b@ are incremental sliding-window
    -- scans. Names of such scans are prefixed with @incr@. An input of type
    -- @(Insert a)@ indicates that the input element @a@ is being inserted in
    -- the window without ejecting an old value, increasing the window size by
    -- 1. An input of type @(Replace a a)@ indicates that the first argument of
    -- Replace is being removed from the window and the second argument is being
    -- inserted in the window, the window size remains the same. The window
    -- size can only increase and never decrease.
    --
    -- You can compute the statistics over the entire stream using incremental
    -- scans by always supplying input of type @Insert a@.
    --
    -- The incremental scans are converted into scans over a window using the
    -- 'incrScan' operation which maintains a sliding window and supplies the
    -- new and/or exiting element of the window to the window scan in the form
    -- of an incremental operation. The names of window scans are prefixed with
    -- @window@.
    --
    , cumulativeScan
    , incrScan
    , incrScanWith

    -- * Incremental Scans
    , incrRollingMap -- XXX remove?
    , incrRollingMapM -- XXX remove?

    -- ** Sums
    , incrCount
    , incrSum
    , incrSumInt
    , incrPowerSum
    , incrPowerSumFrac

    -- ** Location
    , windowRange
    , windowMinimum
    , windowMaximum
    , incrMean
    )
where

import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Proxy (Proxy(..))
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.RingArray (RingArray(..))
import Streamly.Internal.Data.Scanl.Type (Scanl(..), Step(..))
import Streamly.Internal.Data.Tuple.Strict
    (Tuple'(..), Tuple3Fused' (Tuple3Fused'))
import Streamly.Internal.Data.Unbox (Unbox(..))

import qualified Streamly.Internal.Data.MutArray.Type as MutArray
import qualified Streamly.Internal.Data.RingArray as RingArray
import qualified Streamly.Internal.Data.Scanl.Type as Scanl

import Prelude hiding (length, sum, minimum, maximum)

#include "ArrayMacros.h"
#include "DocTestDataScanl.hs"

-------------------------------------------------------------------------------
-- Incremental operations
-------------------------------------------------------------------------------

-- The delete operation could be quite useful e.g. if we are computing stats
-- over last one hour of trades. The window would be growing when trade
-- frequency is increasing, the window would remain constant when the trade
-- frequency is steady, but it would shrink when the trade frequency reduces.
-- If no trades are happening our clock would still be ticking and to maintain
-- a 1 hour window we would be ejecting the oldest elements from the window
-- even without any other elements entering the window. In fact, it is required
-- for time based windows.
--
-- Replace can be implemented using Insert and Delete.
--

-- | Represents incremental input for a scan. 'Insert' means a new element is
-- being added to the collection, 'Replace' means an old value in the
-- collection is being replaced with a new value.
data Incr a =
      Insert !a
    --  | Delete !a
    | Replace !a !a -- ^ Replace old new

-- | Maps the function over every element carried by the operation: the
-- inserted element for 'Insert', and both the old and the new element for
-- 'Replace'.
instance Functor Incr where
    fmap f (Insert x) = Insert (f x)
    -- fmap f (Delete x) = Delete (f x)
    fmap f (Replace x y) = Replace (f x) (f y)

-------------------------------------------------------------------------------
-- Utilities
-------------------------------------------------------------------------------

{-# ANN type SlidingWindow Fuse #-}
-- Internal state of the sliding window driver ('incrScanWith').
--
-- SWArray: the window is still filling up. Carries the buffer being
-- appended to, the number of elements inserted so far and the state of the
-- wrapped scan.
--
-- SWRing: the window has reached its full size. Carries the ring and the
-- state of the wrapped scan.
data SlidingWindow a r s
    = SWArray !a !Int !s
    | SWRing !r !s
-- data SlidingWindow a s = SWArray !a !Int !s !Int | SWRing !a !Int !s

-- | Like 'incrScan' but also provides the ring array to the scan. The ring
-- array reflects the state of the ring after inserting the incoming element.
--
-- The first argument is the window size. While fewer than that many elements
-- have been consumed, the supplied scan receives @Insert x@; afterwards it
-- receives @Replace old x@ where @old@ is the element that was replaced in
-- the ring by @x@.
--
-- Calls 'error' when the scan is initialized if the window size is @<= 0@.
--
-- IMPORTANT NOTE: The ring is mutable, therefore, references to it should not
-- be stored and used later, the state would have changed by then. If you need
-- to store it then copy it to an array or another ring and store it.
{-# INLINE incrScanWith #-}
incrScanWith :: forall m a b. (MonadIO m, Unbox a)
    => Int -> Scanl m (Incr a, RingArray a) b -> Scanl m a b
incrScanWith n (Scanl step1 initial1 extract1 final1) =
    Scanl step initial extract final

    where

    -- Validate the window size, initialize the wrapped scan and allocate a
    -- buffer with capacity for n elements.
    initial = do
        if n <= 0
        then error "Window size must be > 0"
        else do
            r <- initial1
            arr <- liftIO $ MutArray.emptyOf n
            return $
                case r of
                    Partial s -> Partial $ SWArray arr (0 :: Int) s
                    Done b -> Done b

    -- Filling phase: append the element to the array and report it to the
    -- wrapped scan as an Insert.
    step (SWArray arr i st) a = do
        -- XXX compare this with the slidingWindow impl
        arr1 <- liftIO $ MutArray.unsafeSnoc arr a
        r <- step1 st (Insert a, RingArray.unsafeCastMutArray arr1)
        return $ case r of
            Partial s ->
                let i1 = i + 1
                in if i1 < n
                   then Partial $ SWArray arr1 i1 s
                   -- n elements collected, switch to the ring phase.
                   else Partial $ SWRing (RingArray.unsafeCastMutArray arr1) s
            Done b -> Done b

    -- Sliding phase: replace an element in the ring and report the replaced
    -- and the incoming elements to the wrapped scan as a Replace.
    step (SWRing rb st) a = do
        (rb1, old) <- RingArray.replace rb a
        r <- step1 st (Replace old a, rb1)
        return $
            case r of
                Partial s -> Partial $ SWRing rb1 s
                Done b -> Done b

    extract (SWArray _ _ st) = extract1 st
    extract (SWRing _ st) = extract1 st

    final (SWArray _ _ st) = final1 st
    final (SWRing _ st) = final1 st

    -- Alternative implementation flattening the constructors
    -- Improves some benchmarks, worsens some others, need more investigation.
    {-
    initial = do
        if n <= 0
        then error "Window size must be > 0"
        else do
            r <- initial1
            arr :: MutArray.MutArray a <- liftIO $ MutArray.emptyOf n
            return $
                case r of
                    Partial s -> Partial
                        $ SWArray (MutArray.arrContents arr) 0 s n
                    Done b -> Done b

    step (SWArray mba rh st i) a = do
        RingArray _ _ rh1 <- RingArray.insert_ (RingArray mba (n * SIZE_OF(a)) rh) a
        r <- step1 st (Insert a, RingArray mba ((n - i) * SIZE_OF(a)) rh1)
        return $
            case r of
                Partial s ->
                    if i > 0
                    then Partial $ SWArray mba rh1 s (i-1)
                    else Partial $ SWRing mba rh1 s
                Done b -> Done b

    step (SWRing mba rh st) a = do
        (rb1@(RingArray _ _ rh1), old) <-
            RingArray.insert (RingArray mba (n * SIZE_OF(a)) rh) a
        r <- step1 st (Replace old a, rb1)
        return $
            case r of
                Partial s -> Partial $ SWRing mba rh1 s
                Done b -> Done b

    extract (SWArray _ _ st _) = extract1 st
    extract (SWRing _ _ st) = extract1 st

    final (SWArray _ _ st _) = final1 st
    final (SWRing _ _ st) = final1 st
    -}

-- | @incrScan n collector@ is an incremental sliding window scan that does not
-- require all the intermediate elements in each step of the scan computation.
-- This maintains @n@ elements in the window, when a new element comes it
-- slides out the oldest element. The new element along with the old element
-- are supplied to the collector scan.
--
-- This is 'incrScanWith' with the ring array argument dropped before the
-- input reaches the collector.
--
{-# INLINE incrScan #-}
incrScan :: forall m a b. (MonadIO m, Unbox a)
    => Int -> Scanl m (Incr a) b -> Scanl m a b
incrScan n f = incrScanWith n (Scanl.lmap fst f)

-- | Convert an incremental scan to a cumulative scan using the entire input
-- stream as a single window. Every input element is supplied to the
-- incremental scan as an 'Insert'.
--
-- >>> cumulativeScan = Scanl.lmap Scanl.Insert
--
{-# INLINE cumulativeScan #-}
cumulativeScan :: Scanl m (Incr a) b -> Scanl m a b
cumulativeScan = Scanl.lmap Insert

-- | Apply an effectful function on the entering and the exiting element of the
-- window. The first argument of the mapped function is the exiting element and
-- the second argument is the entering element. On an 'Insert' there is no
-- exiting element, so the first argument is 'Nothing'.
--
-- The initial output of the scan is 'Nothing'; the previous output is not
-- consulted when computing the next one.
{-# INLINE incrRollingMapM #-}
incrRollingMapM :: Monad m =>
    (Maybe a -> a -> m (Maybe b)) -> Scanl m (Incr a) (Maybe b)
incrRollingMapM f = Scanl.mkScanlM f1 initial

    where

    initial = return Nothing

    f1 _ (Insert a) = f Nothing a
    -- f1 _ (Delete _) = return Nothing
    f1 _ (Replace old new) = f (Just old) new

-- | Apply a pure function on the exiting and the entering element of the
-- window. The first argument of the mapped function is the exiting element
-- ('Nothing' on an 'Insert') and the second argument is the entering element.
--
-- >>> incrRollingMap f = Scanl.incrRollingMapM (\x y -> return $ f x y)
--
{-# INLINE incrRollingMap #-}
incrRollingMap :: Monad m =>
    (Maybe a -> a -> Maybe b) -> Scanl m (Incr a) (Maybe b)
incrRollingMap f = Scanl.mkScanl f1 initial

    where

    initial = Nothing

    f1 _ (Insert a) = f Nothing a
    -- f1 _ (Delete _) = Nothing
    f1 _ (Replace old new) = f (Just old) new

-------------------------------------------------------------------------------
-- Sum
-------------------------------------------------------------------------------

-- XXX Overflow.

-- | The sum of all the elements in a rolling window. The input elements are
-- required to be integral numbers.
--
-- This was written in the hope that it would be a tiny bit faster than 'incrSum'
-- for 'Integral' values. But turns out that 'incrSum' is 2% faster than this even
-- for integral values!
--
-- /Internal/
--
{-# INLINE incrSumInt #-}
incrSumInt :: forall m a. (Monad m, Integral a) => Scanl m (Incr a) a
incrSumInt = Scanl step initial extract extract

    where

    -- The scan state is the running sum itself.
    initial = return $ Partial (0 :: a)

    step s (Insert a) = return $ Partial (s + a)
    -- step s (Delete a) = return $ Partial (s - a)
    step s (Replace old new) = return $ Partial (s + new - old)

    extract = return

-- XXX Overflow.

-- | Sum of all the elements in a rolling window:
--
-- \(S = \sum_{i=1}^n x_{i}\)
--
-- This is the first power sum.
--
-- >>> incrSum = Scanl.incrPowerSum 1
--
-- Uses Kahan-Babuska-Neumaier style summation for numerical stability of
-- floating precision arithmetic.
--
-- /Space/: \(\mathcal{O}(1)\)
--
-- /Time/: \(\mathcal{O}(n)\)
--
{-# INLINE incrSum #-}
incrSum :: forall m a. (Monad m, Num a) => Scanl m (Incr a) a
incrSum = Scanl step initial extract extract

    where

    initial =
        return
            $ Partial
            $ Tuple'
                (0 :: a) -- running sum
                (0 :: a) -- accumulated rounding error

    -- Add the (error compensated) increment to the running total and compute
    -- the rounding error introduced by this addition.
    add total incr =
        let
            -- total is large and incr may be small, we may round incr here but
            -- we will accumulate the rounding error in err1 in the next step.
            total1 = total + incr
            -- Accumulate any rounding error in err1
            -- XXX In the Insert case we may lose err, therefore we
            -- should use ((total1 - total) - new) + err here.
            -- Or even in the Replace case if (new - old) is large we may lose
            -- err, so we should use ((total1 - total) + (old - new)) + err.
            err1 = (total1 - total) - incr
        in return $ Partial $ Tuple' total1 err1

    step (Tuple' total err) (Insert new) =
        -- XXX if new is large we may lose err
        let incr = new - err
         in add total incr
    {-
    step (Tuple' total err) (Delete new) =
        -- XXX if new is large we may lose err
        let incr = -new - err
         in add total incr
    -}
    step (Tuple' total err) (Replace old new) =
        -- XXX if (new - old) is large we may lose err
        let incr = (new - old) - err
         in add total incr

    -- Only the running sum is reported; the error term is internal.
    extract (Tuple' total _) = return total

-- | The number of elements in the rolling window.
--
-- This is the \(0\)th power sum.
--
-- >>> incrCount = Scanl.incrPowerSum 0
--
-- An 'Insert' increases the count by one, a 'Replace' leaves it unchanged.
--
{-# INLINE incrCount #-}
incrCount :: (Monad m, Num b) => Scanl m (Incr a) b
incrCount = Scanl.mkScanl step 0

    where

    step w (Insert _) = w + 1
    -- step w (Delete _) = w - 1
    step w (Replace _ _) = w

-- | Sum of the \(k\)th power of all the elements in a rolling window:
--
-- \(S_k = \sum_{i=1}^n x_{i}^k\)
--
-- >>> incrPowerSum k = Scanl.lmap (fmap (^ k)) Scanl.incrSum
--
-- The power is applied to every element carried by the incremental
-- operation (both the old and the new element of a 'Replace') before it is
-- passed on to 'incrSum'.
--
-- /Space/: \(\mathcal{O}(1)\)
--
-- /Time/: \(\mathcal{O}(n)\)
{-# INLINE incrPowerSum #-}
incrPowerSum :: (Monad m, Num a) => Int -> Scanl m (Incr a) a
incrPowerSum k = Scanl.lmap (fmap (^ k)) incrSum

-- | Like 'incrPowerSum' but powers can be negative or fractional. This is
-- slower than 'incrPowerSum' for positive integral powers.
--
-- >>> incrPowerSumFrac p = Scanl.lmap (fmap (** p)) Scanl.incrSum
--
{-# INLINE incrPowerSumFrac #-}
incrPowerSumFrac :: (Monad m, Floating a) => a -> Scanl m (Incr a) a
incrPowerSumFrac p = Scanl.lmap (fmap (** p)) incrSum

-------------------------------------------------------------------------------
-- Location
-------------------------------------------------------------------------------

-- Compute the minimum and maximum of the elements in the ring by folding
-- over it. Returns Nothing when the ring size is 0, otherwise
-- Just (minimum, maximum).
{-# INLINE ringRange #-}
ringRange :: (MonadIO m, Unbox a, Ord a) => RingArray a -> m (Maybe (a, a))
-- Ideally this should perform the same as the implementation below, but it is
-- 2x worse, need to investigate why.
-- ringRange = RingArray.fold (Fold.fromScanl Scanl.range)
ringRange rb@RingArray{..} = do
    if ringSize == 0
    then return Nothing
    else do
        -- Seed both the minimum and the maximum with the element stored at
        -- offset 0 of the ring contents, then fold over the ring.
        x <- liftIO $ peekAt 0 ringContents
        let accum (mn, mx) a = return (min mn a, max mx a)
         in fmap Just $ RingArray.foldlM' accum (x, x) rb

-- | Determine the maximum and minimum in a rolling window.
--
-- This implementation traverses the entire window buffer to compute the
-- range whenever we demand it.  It performs better than the dequeue based
-- implementation in @streamly-statistics@ package when the window size is
-- small (< 30).
--
-- If you want to compute the range of the entire stream
-- 'Streamly.Data.Scanl.range' would be much faster.
--
-- /Space/: \(\mathcal{O}(w)\) where \(w\) is the window size.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(n\) is the number of input elements
-- and \(w\) is the window size.
--
{-# INLINE windowRange #-}
windowRange :: forall m a. (MonadIO m, Unbox a, Ord a) =>
    Int -> Scanl m a (Maybe (a, a))
-- windowRange = RingArray.scanFoldRingsBy (Fold.fromScanl Scanl.range)

-- Ideally this should perform the same as the implementation below which is
-- just expanded form of this. Some inlining/exitify optimization makes this
-- perform much worse. Need to investigate and fix that.
-- windowRange = RingArray.scanCustomFoldRingsBy ringRange

-- The scan state is @Tuple3Fused' mba rh i@:
--
-- * @mba@: contents of the buffer allocated for @n@ elements,
-- * @rh@: the third field of the ring returned by the last
--   'RingArray.replace_' call (0 initially) - presumably the ring head
--   offset, confirm against the RingArray module,
-- * @i@: the number of input elements consumed so far.
--
-- The same @extract@ is used for both the extract and the final action of
-- the scan.
windowRange n = Scanl step initial extract extract

    where

    -- A non-positive window size is rejected with 'error'. Otherwise a
    -- buffer with room for @n@ elements is allocated and the scan starts
    -- with head 0 and count 0.
    initial =
        if n <= 0
        then error "windowRange: window size must be > 0"
        else do
            arr :: MutArray.MutArray a <- liftIO $ MutArray.emptyOf n
            return $ Partial $ Tuple3Fused' (MutArray.arrContents arr) 0 0

    -- Put the new element into a ring spanning the whole buffer
    -- (@n * SIZE_OF(a)@ bytes), remember the head of the resulting ring and
    -- bump the element count. The scan never terminates by itself.
    step (Tuple3Fused' mba rh i) a = do
        RingArray _ _ rh1 <-
            RingArray.replace_ (RingArray mba (n * SIZE_OF(a)) rh) a
        return $ Partial $ Tuple3Fused' mba rh1 (i + 1)

    -- XXX exitify optimization causes a problem here when modular folds are
    -- used. Sometimes inlining "extract" is helpful.
    -- {-# INLINE extract #-}
    extract (Tuple3Fused' mba rh i) =
    -- XXX If newest is lower than the current min than new is the min.
    -- XXX Otherwise if exiting one was equal to min only then we need to find
    -- new min
        let -- Only the elements seen so far take part in the range: the
            -- ring handed to 'ringRange' is limited to @min i n@ elements.
            rs = min i n * SIZE_OF(a)
            -- Until more than @n@ elements have been consumed the ring is
            -- taken to start at offset 0, afterwards at the stored head.
            rh1 = if i <= n then 0 else rh
         in ringRange $ RingArray mba rs rh1

-- | Smallest element of a rolling window of the given size.
--
-- This is the first component of the pair produced by 'windowRange'; see the
-- performance related comments there.
--
-- To compute the minimum of the entire stream use
-- 'Streamly.Data.Scanl.minimum' instead, it is much faster.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowMinimum #-}
windowMinimum :: (MonadIO m, Unbox a, Ord a) => Int -> Scanl m a (Maybe a)
windowMinimum n = fmap lowerBound (windowRange n)

    where

    -- Keep only the minimum from the optional (minimum, maximum) pair.
    lowerBound bounds = fmap fst bounds
-- windowMinimum = RingArray.scanFoldRingsBy (Fold.fromScanl Scanl.minimum)

-- | Largest element of a rolling window of the given size.
--
-- This is the second component of the pair produced by 'windowRange'; see the
-- performance related comments there.
--
-- To compute the maximum of the entire stream use
-- 'Streamly.Data.Scanl.maximum' instead, it would be much faster.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowMaximum #-}
windowMaximum :: (MonadIO m, Unbox a, Ord a) => Int -> Scanl m a (Maybe a)
windowMaximum n = fmap upperBound (windowRange n)

    where

    -- Keep only the maximum from the optional (minimum, maximum) pair.
    upperBound bounds = fmap snd bounds
-- windowMaximum = RingArray.scanFoldRingsBy (Fold.fromScanl Scanl.maximum)

-- XXX Returns NaN on empty stream.
-- XXX remove teeWith for better fusion?

-- | Arithmetic mean of the elements in a sliding window:
--
-- \(\mu = \frac{\sum_{i=1}^n x_{i}}{n}\)
--
-- Over a sliding window this is known as the Simple Moving Average (SMA),
-- over the entire stream as the Cumulative Moving Average (CMA).
--
-- It is the running sum divided by the running count:
--
-- >>> incrMean = Scanl.teeWith (/) Scanl.incrSum Scanl.incrCount
--
-- /Space/: \(\mathcal{O}(1)\)
--
-- /Time/: \(\mathcal{O}(n)\)
{-# INLINE incrMean #-}
incrMean :: forall m a. (Monad m, Fractional a) => Scanl m (Incr a) a
incrMean = Scanl.teeWith average incrSum incrCount

    where

    -- Combine the outputs of the two scans: sum over count.
    average total count = total / count