Reactive Programming
Reactive programming is modeled beautifully using concurrent streaming in streamly. It involves generation of streams of events, merging concurrent streams and processing events concurrently. Streamly provides native high-level facilities to do all this easily.
We will illustrate the basics below using a few trivial examples.
Acid Rain Game
Objective of the Game
The game starts with the player at a certain level of health. As time passes, the player’s health keeps deteriorating because of the acid rain. If the health reaches 0, the player dies and the game is over. If the player types “potion” on the CLI, the health improves; the game continues as long as the player keeps typing “potion” rapidly enough. If the player types “harm” instead, the health deteriorates and the player dies sooner. If the player types “quit”, the game ends.
Importing Required Modules
Let’s first import the required modules from streamly and base.
{-# LANGUAGE FlexibleContexts #-}
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.State (MonadState, get, modify)
import Data.Function ((&))
import Streamly.Data.Stream.Prelude (MonadAsync, Stream)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
Events
The possible events in the system are represented by the Event data type.
Acid rain generates the Harm event, typing “potion” on the CLI generates the
Heal event, typing “harm” generates the Harm event, and typing “quit”
generates the Quit event. Harm and Heal events carry an integer that
represents the degree of harm or healing.
data Event = Quit | Harm Int | Heal Int deriving (Eq, Show)
This application has two independent and concurrent sources of event
streams, acidRain and userAction.
Acid Rain Stream
Now let’s simulate acid rain. The acidRain function below generates a stream of
Harm 1 events, one event per second.
acidRain :: MonadAsync m => Stream m Event
acidRain = Stream.parRepeatM (Stream.constRate 1) (return $ Harm 1)
User Event Stream
The second stream is the stream of events generated by the user typing
commands on the CLI. The userAction function reads the standard input,
interprets the command typed and generates the appropriate event. It keeps
doing this forever, so this is an infinite stream.
userAction :: MonadAsync m => Stream m Event
userAction = Stream.repeatM $ liftIO askUser

    where

    -- Read one command; on unrecognized input, print a hint and ask again.
    askUser = do
        command <- getLine
        case command of
            "potion" -> return (Heal 10)
            "harm" -> return (Harm 10)
            "quit" -> return Quit
            _ -> putStrLn "Type potion or harm or quit" >> askUser
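The command-to-event mapping inside askUser can also be read as a pure function, which makes it easy to check separately from the console I/O. The following is only a sketch of that idea: parseCommand is a hypothetical helper introduced here for illustration, not part of the example source or of streamly.

```haskell
-- Same Event type as in the example.
data Event = Quit | Harm Int | Heal Int deriving (Eq, Show)

-- Hypothetical helper (not in the original example): map a typed command
-- to an event, or Nothing when the command is not recognized.
parseCommand :: String -> Maybe Event
parseCommand "potion" = Just (Heal 10)
parseCommand "harm"   = Just (Harm 10)
parseCommand "quit"   = Just Quit
parseCommand _        = Nothing
```

With such a helper, askUser would only need to read a line, call parseCommand, and print the hint and retry when the result is Nothing.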
Combined Stream
Now let’s combine the stream generated by acid rain and the stream generated
by the CLI. Both streams should be generated concurrently, so we use the
parList function, which combines a list of streams concurrently. We use the
eager True option to ensure that both streams are evaluated as soon as
possible.
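parallel :: MonadAsync m => [Stream m a] -> Stream m a
parallel = Stream.parList (Stream.eager True)

-- The MonadAsync constraint is required because both component streams,
-- and parallel itself, need it.
eventStream :: MonadAsync m => Stream m Event
eventStream = parallel [userAction, acidRain]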
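As a rough analogy for what merging sources concurrently means, the sketch below runs each source in its own thread and funnels all items into a single channel, using only Control.Concurrent from base. It is not how streamly implements parList, and mergeLists is a hypothetical name introduced here for illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Run each producer in its own thread, all writing into one shared channel,
-- then read back exactly as many items as were produced. Items from one
-- producer keep their relative order, but the interleaving between
-- different producers is not deterministic.
mergeLists :: [[a]] -> IO [a]
mergeLists sources = do
    chan <- newChan
    forM_ sources $ \src -> forkIO (mapM_ (writeChan chan) src)
    replicateM (sum (map length sources)) (readChan chan)
```

Unlike this sketch, which needs finite lists so that it knows how many items to read, the streams in the game are infinite and are consumed incrementally.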
Process Health Events
The runEvents function below maintains the health of the player as an integer
value in the State monad. It maps the processEvents function over the event
stream. The Harm and Heal events decrement or increment the player’s health
value respectively. If we encounter a Quit event the function returns Done,
otherwise it returns Continue. The resulting stream is a stream of Result
values.
data Result = Continue | Done

runEvents :: (MonadAsync m, MonadState Int m) => Stream m Result
runEvents = Stream.mapM processEvents eventStream

    where

    -- Update the health held in the State monad and report whether to go on.
    processEvents event =
        case event of
            Harm n -> modify (\h -> h - n) >> return Continue
            Heal n -> modify (\h -> h + n) >> return Continue
            Quit -> return Done
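To see how the health evolves without running any streams, the state update can be modeled as a pure function over a list of events. This is a minimal sketch only; applyEvent and healthTrace are hypothetical names that do not appear in the example, and the health is passed explicitly instead of being kept in the State monad.

```haskell
-- Same Event type as in the example.
data Event = Quit | Harm Int | Heal Int deriving (Eq, Show)

-- Apply one event to the current health. Quit leaves the health unchanged.
applyEvent :: Int -> Event -> Int
applyEvent h (Harm n) = h - n
applyEvent h (Heal n) = h + n
applyEvent h Quit     = h

-- The initial health followed by the health after each event in turn.
healthTrace :: Int -> [Event] -> [Int]
healthTrace = scanl applyEvent
```

For example, healthTrace 60 [Harm 1, Harm 1, Heal 10] evaluates to [60, 59, 58, 68].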
Check the Player Status
The runEvents function above returns a stream of results after processing the
health events. The stream consists of results indicating whether the game
should continue or end; the State monad supplies the current health of the
player.
Now we map the getStatus function over the Result stream. If we encounter a
Done in the result stream, the user has quit the game, so we return GameOver.
If the health of the player is 0 or less, the player has died and we also
return GameOver. Otherwise we return Alive.
data Status = Alive | GameOver deriving Eq

getStatus :: (MonadAsync m, MonadState Int m) => Result -> m Status
getStatus result =
    case result of
        Done -> liftIO $ putStrLn "You quit!" >> return GameOver
        Continue -> do
            -- Read the current health from the State monad.
            h <- get
            liftIO $ if (h <= 0)
                     then putStrLn "You die!" >> return GameOver
                     else putStrLn ("Health = " <> show h) >> return Alive
Tying it all Together
We start with the Result stream produced by the runEvents function. Then we
map the getStatus function over this stream to turn it into a Status stream.
Then we run the State monad using runStateT, supplying an initial health of
60; the result is a stream of (health, status) tuples in the IO monad. We then
discard the health and keep just the status, resulting in a Status stream.
Finally, we fold this Status stream using the takeEndBy fold, which terminates
as soon as a GameOver value is encountered in the stream.
main :: IO ()
main = do
    putStrLn "Your health is deteriorating due to acid rain,\
             \ type \"potion\" or \"quit\""
    runEvents                           -- Stream (StateT Int IO) Result
        & Stream.mapM getStatus         -- Stream (StateT Int IO) Status
        & Stream.runStateT (pure 60)    -- Stream IO (Int, Status)
        & fmap snd                      -- Stream IO Status
        & Stream.fold (Fold.takeEndBy (== GameOver) Fold.drain) -- IO ()
    return ()
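The whole pipeline can also be modeled on plain lists, which may help in reasoning about when the game ends. The sketch below is an illustration under simplifying assumptions, not the streamly implementation: it threads the health explicitly, prints nothing, and the names step, check and play are hypothetical. Like the fold in main, play stops at the first GameOver and includes it in the output.

```haskell
-- Same Event and Status types as in the example (Show added for display).
data Event = Quit | Harm Int | Heal Int deriving (Eq, Show)
data Status = Alive | GameOver deriving (Eq, Show)

-- Classify a health value: 0 or less means the player has died.
check :: Int -> (Int, Status)
check h = (h, if h <= 0 then GameOver else Alive)

-- One step of the game: update the health, then decide the status.
step :: Int -> Event -> (Int, Status)
step h Quit     = (h, GameOver)
step h (Harm n) = check (h - n)
step h (Heal n) = check (h + n)

-- Statuses up to and including the first GameOver.
play :: Int -> [Event] -> [Status]
play _ [] = []
play h (e : es) =
    case step h e of
        (_, GameOver) -> [GameOver]
        (h', Alive)   -> Alive : play h' es
```

For example, play 3 [Harm 1, Harm 1, Harm 1, Heal 10] evaluates to [Alive, Alive, GameOver]; the trailing Heal 10 is never processed. Because play is lazy, it also terminates on an endless rain such as repeat (Harm 1).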
Complete Working Example
You can find a complete working source of this example in the streamly-examples repo as AcidRain.hs. The idea of this game example has been taken from Gabriella Gonzalez’s pipes-concurrency package.
Circling Square
For a simple graphical example where we generate an animation by rendering a graphics frame periodically, see the SDL based circling square example adapted from Yampa in CirclingSquare.hs.