These are my notes from reading an excellent paper by Mario Blazevic on Haskell coroutines, "Coroutine Pipelines". The paper can be found here. It is the last paper in issue 19 of The Monad.Reader.
Monad Trampolines
As we know, monads are a great help when you want to sequence computations (this is not all that monads are about, but it is probably the biggest motivation for introducing them in the first place). Now, when we run a monadic computation, what if we want to be able to pause it for whatever reason? A Trampoline monad lets us do this by wrapping the monad in a new type that, each time it is run, either finishes with a result or returns the current continuation and lets the caller decide when to continue. I highlighted either to emphasize that we are calling for a... well, Either type to wrap around the underlying monad.
Let’s define such a type:
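{-# LANGUAGE RankNTypes, ScopedTypeVariables #-}
-- extensions and imports used by the code in these notes
import Control.Monad (ap, liftM)
import Control.Monad.Trans.Class (MonadTrans (..))
import Data.Functor.Identity (Identity (..))

newtype Trampoline m r = Trampoline { bounce :: m (Either (Trampoline m r) r) }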
Trampoline m r wraps an underlying monad m. Its bounce function runs the computation in m until it either finishes with a result (Right r) or reaches a pause, in which case it returns the rest of the computation, itself a Trampoline, wrapped in Left.
It is of course itself a monad:
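-- GHC 7.10 and later also require Functor and Applicative instances
instance Monad m => Functor (Trampoline m) where
  fmap = liftM

instance Monad m => Applicative (Trampoline m) where
  pure r = Trampoline $ return (Right r)
  (<*>) = ap

instance Monad m => Monad (Trampoline m) where
  return = pure
  t >>= f = Trampoline (bounce t >>= either (return . Left . (>>= f)) (bounce . f))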
Note that in the definition of the bind function, we first bounce t. If we get a continuation back, we attach f to it with (>>= f), wrap the result in the Left constructor and return it; otherwise, we continue by bouncing the next Trampoline, the one returned by the monadic function f.
Also note that both return and bind build their result with the Trampoline constructor, so bounce is well defined for every Trampoline they create.
It is also a monad transformer, so we can easily lift actions of the underlying monad into Trampoline.
instance MonadTrans Trampoline where lift = Trampoline . liftM Right
We then define pause, a trampoline that suspends immediately and hands back the rest of the computation as its continuation, and run, a function that runs a trampoline to completion, resuming after every pause.
pause :: Monad m => Trampoline m ()
pause = Trampoline (return $ Left $ return ())

run :: Monad m => Trampoline m a -> m a
run t = bounce t >>= either run return
Now we have everything we need for a trampoline computation that can pause anywhere we want:
*Main> let hello = do{lift (putStr "Hello, "); pause; lift (putStr "World! ")}
*Main> Left cont <- bounce hello
Hello, *Main> putStr "wonderful "
wonderful *Main> run cont
"World!" ()
Thanks to this pausing (suspension) feature, we can now easily interleave two trampolines. See the original paper for an example where multiple trampolines are synchronized (the ones that reach a pause first are held until all the others have reached their next pause). Notice how this approach runs a list of actions that cooperate with each other without introducing any threads (and the logic to coordinate those threads!).
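The paper's own interleaving code is not reproduced in these notes. The sketch below is my own approximation of the synchronized behaviour just described, assuming the transformers package (Control.Monad.Trans.Class and Control.Monad.Trans.Writer). The names interleave and worker are hypothetical, and Writer is used only to record the order in which the effects happen.

```haskell
import Control.Monad (ap, liftM)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Writer (Writer, runWriter, tell)

newtype Trampoline m r = Trampoline { bounce :: m (Either (Trampoline m r) r) }

instance Monad m => Functor (Trampoline m) where
  fmap = liftM

instance Monad m => Applicative (Trampoline m) where
  pure r = Trampoline (return (Right r))
  (<*>) = ap

instance Monad m => Monad (Trampoline m) where
  t >>= f = Trampoline (bounce t >>= either (return . Left . (>>= f)) (bounce . f))

instance MonadTrans Trampoline where
  lift = Trampoline . liftM Right

pause :: Monad m => Trampoline m ()
pause = Trampoline (return (Left (return ())))

run :: Monad m => Trampoline m a -> m a
run t = bounce t >>= either run return

-- Hypothetical helper: advance every trampoline to its next pause, then pause
-- once ourselves. Finished trampolines are replaced by "return result".
interleave :: Monad m => [Trampoline m r] -> Trampoline m [r]
interleave ts = do
  steps <- lift (mapM bounce ts)
  case traverse (either (const Nothing) Just) steps of
    Just results -> return results  -- every trampoline has finished
    Nothing -> do
      pause                         -- end of one synchronized round
      interleave (map (either id return) steps)

-- Illustrative worker: logs "<name><i>" and pauses after each of its n steps.
worker :: String -> Int -> Trampoline (Writer [String]) Int
worker name n = go 1
  where
    go i
      | i > n = return n
      | otherwise = do
          lift (tell [name ++ show i])
          pause
          go (i + 1)
```

With these definitions, runWriter (run (interleave [worker "a" 2, worker "b" 3])) gives ([2,3], ["a1","b1","a2","b2","b3"]): in each round every unfinished worker advances to its next pause, and a worker that has finished simply waits as return r.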
Suspension Functors
We have seen that a trampoline pauses the program to let the caller do whatever it wants during the pause. We can introduce two more types that also pause, but exchange data with the caller at each pause: a Generator hands a value to the caller when it pauses, and an Iteratee expects a value from the caller before it can proceed after the pause.
Naturally, a Generator is a type that, when bounced, will either produce a value together with a continuation, (a, Generator a m x), or return a result directly.
newtype Generator a m x = Generator { bounceGen :: m (Either (a, Generator a m x) x) }

instance Monad m => Functor (Generator a m) where
  fmap = liftM

instance Monad m => Applicative (Generator a m) where
  pure r = Generator $ return (Right r)
  (<*>) = ap

instance Monad m => Monad (Generator a m) where
  return = pure
  g >>= f = Generator (bounceGen g >>= either (\(a, cont) -> return $ Left (a, cont >>= f))
                                              (bounceGen . f))

instance MonadTrans (Generator a) where
  lift = Generator . liftM Right

yield :: Monad m => a -> Generator a m ()
yield a = Generator (return $ Left (a, return ()))

runGenerator :: Monad m => Generator a m x -> m ([a], x)
runGenerator = run' id
  where run' f g = bounceGen g >>= either (\(a, cont) -> run' (f . (a:)) cont)
                                          (\x -> return (f [], x))
For Iteratee, we just replace the tuple with a function that takes the expected value and returns a new Iteratee.
newtype Iteratee a m x = Iteratee { bounceIter :: m (Either (a -> Iteratee a m x) x) }

instance Monad m => Functor (Iteratee a m) where
  fmap = liftM

instance Monad m => Applicative (Iteratee a m) where
  pure r = Iteratee $ return (Right r)
  (<*>) = ap

instance Monad m => Monad (Iteratee a m) where
  return = pure
  t >>= f = Iteratee (bounceIter t >>= either (\cont -> return $ Left ((>>= f) . cont))
                                             (bounceIter . f))

instance MonadTrans (Iteratee a) where
  lift = Iteratee . liftM Right

await :: Monad m => Iteratee a m a
await = Iteratee (return $ Left return)

runIteratee :: Monad m => [a] -> Iteratee a m x -> m x
runIteratee (a:rest) i = bounceIter i >>= either (\cont -> runIteratee rest (cont a)) return
runIteratee [] i = bounceIter i >>= either (\cont -> runIteratee [] (cont $ error "No more values to feed")) return
Trampoline, Generator and Iteratee all have a very similar structure. In fact, they differ only in what the Left constructor of the Either type holds: for Trampoline it is just the continuation itself, for Generator it is a pair (a, continuation), and for Iteratee it is a function from the awaited value to the continuation. The implementations differ only in how they handle these different payloads.
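To make this observation concrete, here is a small sketch in which the continuation is abstracted as a type parameter k (the synonyms and names are mine, not the paper's). The Left branch of each bind above only post-composes the continuation with (>>= f), and for all three payload shapes that operation is just fmap.

```haskell
import Data.Functor.Identity (Identity (..))

-- The Left payloads of the three hand-written types, with the continuation
-- abstracted as a type parameter k (illustrative synonyms only).
type TrampolineStep k = Identity k   -- only the continuation
type GeneratorStep a k = (a, k)      -- a yielded value plus the continuation
type IterateeStep a k = a -> k       -- the continuation, waiting for a value

-- Changing only the continuation inside a payload: for all three shapes
-- this is exactly fmap.
extendStep :: Functor s => (k -> k') -> s k -> s k'
extendStep = fmap

trampolineStep :: TrampolineStep Int
trampolineStep = Identity 1

generatorStep :: GeneratorStep Char Int
generatorStep = ('x', 1)

iterateeStep :: IterateeStep Int Int
iterateeStep = (* 2)
```

In each case fmap touches only the continuation: the yielded 'x' is kept as it is, and the function still waits for its input (extendStep (+ 1) iterateeStep 5 is 11).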
The key idea behind the Coroutine type is the realization that all three of these payload shapes (identity, (a,) and a function from a) are functors. So we define a single Coroutine type that is parameterized by such a functor, called the suspension functor.
newtype Coroutine s m r = Coroutine { resume :: m (Either (s (Coroutine s m r)) r) }

instance (Functor s, Monad m) => Functor (Coroutine s m) where
  fmap = liftM

instance (Functor s, Monad m) => Applicative (Coroutine s m) where
  pure r = Coroutine $ return (Right r)
  (<*>) = ap

instance (Functor s, Monad m) => Monad (Coroutine s m) where
  return = pure
  t >>= f = Coroutine (resume t >>= either (return . Left . fmap (>>= f))
                                           -- same implementation, but less elegant:
                                           -- (\fcont -> return $ Left (fmap (>>= f) fcont))
                                           (resume . f))

instance Functor s => MonadTrans (Coroutine s) where
  lift = Coroutine . liftM Right

suspend :: (Monad m, Functor s) => s (Coroutine s m x) -> Coroutine s m x
suspend s = Coroutine (return (Left s))
Now Trampoline, Generator and Iteratee can all be defined in terms of the Coroutine type.
type Trampoline' m x = Coroutine Identity m x
type Generator' a m x = Coroutine ((,) a) m x
type Iteratee' a m x = Coroutine ((->) a) m x
Because each of them is a Coroutine, the Monad and MonadTrans instances above already tell us how each one composes as a monad and works as a monad transformer.
We now define the pausing and running functions for each of them. Note how similar they look to the original definitions.
pause' :: Monad m => Trampoline' m ()
pause' = suspend (Identity $ return ())

yield' :: Monad m => x -> Generator' x m ()
yield' x = suspend (x, return ())

await' :: Monad m => Iteratee' x m x
await' = suspend return

run' :: Monad m => Trampoline' m x -> m x
run' t = resume t >>= either (run' . runIdentity) return

runGenerator' :: Monad m => Generator' x m r -> m ([x], r)
runGenerator' = go id
  where go f g = resume g >>= either (\(x, cont) -> go (f . (x:)) cont)
                                     (\r -> return (f [], r))

runIteratee' :: Monad m => [x] -> Iteratee' x m r -> m r
runIteratee' (x:xs) i = resume i >>= either (\cont -> runIteratee' xs (cont x)) return
runIteratee' [] i = resume i >>= either (\cont -> runIteratee' [] (cont $ error "No more values to feed")) return
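For readers who want to run this part on its own, here is a self-contained sketch that gathers the Coroutine definitions above into one file for a current GHC. It assumes the transformers package for MonadTrans; the local helper names go and feed are mine.

```haskell
import Control.Monad (ap, liftM)
import Control.Monad.Trans.Class (MonadTrans (..))
import Data.Functor.Identity (Identity (..))

newtype Coroutine s m r = Coroutine { resume :: m (Either (s (Coroutine s m r)) r) }

instance (Functor s, Monad m) => Functor (Coroutine s m) where
  fmap = liftM

instance (Functor s, Monad m) => Applicative (Coroutine s m) where
  pure r = Coroutine (return (Right r))
  (<*>) = ap

instance (Functor s, Monad m) => Monad (Coroutine s m) where
  t >>= f = Coroutine (resume t >>= either (return . Left . fmap (>>= f)) (resume . f))

instance Functor s => MonadTrans (Coroutine s) where
  lift = Coroutine . liftM Right

suspend :: (Monad m, Functor s) => s (Coroutine s m x) -> Coroutine s m x
suspend s = Coroutine (return (Left s))

type Trampoline' m x = Coroutine Identity m x
type Generator' a m x = Coroutine ((,) a) m x
type Iteratee' a m x = Coroutine ((->) a) m x

pause' :: Monad m => Trampoline' m ()
pause' = suspend (Identity (return ()))

yield' :: Monad m => a -> Generator' a m ()
yield' x = suspend (x, return ())

await' :: Monad m => Iteratee' a m a
await' = suspend return

run' :: Monad m => Trampoline' m x -> m x
run' t = resume t >>= either (run' . runIdentity) return

runGenerator' :: Monad m => Generator' a m r -> m ([a], r)
runGenerator' = go id
  where
    -- go accumulates the yielded values in a difference list
    go acc g = resume g >>= either (\(x, cont) -> go (acc . (x :)) cont)
                                   (\r -> return (acc [], r))

runIteratee' :: Monad m => [a] -> Iteratee' a m r -> m r
runIteratee' xs i = resume i >>= either feed return
  where
    -- the error is only raised if the iteratee actually uses the missing value
    feed cont = case xs of
      (x : rest) -> runIteratee' rest (cont x)
      []         -> runIteratee' [] (cont (error "No more values to feed"))
```

For example, runIdentity (runGenerator' (yield' 'a' >> yield' 'b' >> return 3)) evaluates to ("ab",3).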
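We also define a utility function that turns a Coroutine with suspension functor s into a Coroutine with suspension functor s', given a function that converts any s a into an s' a (a natural transformation).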
-- the forall in the argument type needs {-# LANGUAGE RankNTypes #-}
mapSuspensions :: (Functor s, Monad m) => (forall a. s a -> s' a) -> Coroutine s m x -> Coroutine s' m x
mapSuspensions f cort = Coroutine { resume = liftM map' (resume cort) }
  where map' (Right r) = Right r
        map' (Left s) = Left $ f (fmap (mapSuspensions f) s)
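As a hypothetical illustration of mapSuspensions, the sketch below relabels the values of a Generator'. The natural transformation (\(n, k) -> (show n, k)) turns the yielded Int into a String and passes the continuation through untouched, which is exactly the shape forall a. s a -> s' a demands. It needs the RankNTypes extension; showYields is an illustrative name.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity (..))

newtype Coroutine s m r = Coroutine { resume :: m (Either (s (Coroutine s m r)) r) }

instance (Functor s, Monad m) => Functor (Coroutine s m) where
  fmap = liftM

instance (Functor s, Monad m) => Applicative (Coroutine s m) where
  pure r = Coroutine (return (Right r))
  (<*>) = ap

instance (Functor s, Monad m) => Monad (Coroutine s m) where
  t >>= f = Coroutine (resume t >>= either (return . Left . fmap (>>= f)) (resume . f))

suspend :: (Monad m, Functor s) => s (Coroutine s m x) -> Coroutine s m x
suspend s = Coroutine (return (Left s))

mapSuspensions :: (Functor s, Monad m) => (forall a. s a -> s' a) -> Coroutine s m x -> Coroutine s' m x
mapSuspensions f cort = Coroutine (liftM map' (resume cort))
  where
    map' (Right r) = Right r
    map' (Left s) = Left (f (fmap (mapSuspensions f) s))

type Generator' a m x = Coroutine ((,) a) m x

yield' :: Monad m => a -> Generator' a m ()
yield' x = suspend (x, return ())

runGenerator' :: Monad m => Generator' a m r -> m ([a], r)
runGenerator' g = resume g >>= either (\(x, k) -> liftM (\(xs, r) -> (x : xs, r)) (runGenerator' k))
                                      (\r -> return ([], r))

-- Hypothetical example: turn every yielded Int into a String.
-- The lambda rewrites only the yielded value and passes the continuation k through.
showYields :: Monad m => Generator' Int m x -> Generator' String m x
showYields = mapSuspensions (\(n, k) -> (show n, k))
```

For a generator that yields 1 and 2 and then returns 'z', runIdentity (runGenerator' (showYields gen)) gives (["1","2"],'z').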
By now, the idea of a coroutine should be clear: it is a computation with predefined pausing actions. The code between two pauses is “atomic”, meaning it runs as one single computation; when it hits a pause, it stops and returns a continuation instead of the value of the computation. The continuation may exist just for the pausing effect, or it can do more, like carrying a generated value or expecting a value. With these pausing continuations, it is possible to combine many coroutines so that they work cooperatively (hence the name “co-routine”). Each one is “continued” by its neighbors, and it “continues” its neighbors once it has completed its part and it is their turn, as we will show below.
Communicating coroutines
Before we dive into using coroutines as pipes, which is the core idea behind the Haskell libraries pipes and conduit, there is one thing I want to point out: if we make a coroutine whose suspension functor simply pauses and asks a global scheduler to switch to another coroutine (possibly chosen at random), then we have a non-preemptive green-thread system (close in spirit to symmetric coroutines), where the threads themselves decide when to give up the CPU and let others run. This is only one of many possible uses of coroutines. We could further improve the scheduler to run coroutines on different OS threads to take advantage of multiple CPUs. All of this can be done relatively easily!
Now, let’s chain together two coroutines that are dual to each other: Generator and Iteratee. Naturally, one produces values and the other consumes them.
bindM2 :: Monad m => (a -> b -> m c) -> m a -> m b -> m c
bindM2 f ma mb = do { a <- ma; b <- mb; f a b }

pipe1 :: Monad m => Generator' a m x -> Iteratee' a m y -> m (x, y)
pipe1 g i = bindM2 proceed (resume g) (resume i)
  where
    proceed (Left (a, c)) (Left f) = pipe1 c (f a)
    proceed (Left (a, c)) (Right y) = pipe1 c (return y) -- more generated values
    proceed (Right x) (Left f) = error "The producer ended too soon"
    proceed (Right x) (Right y) = return (x, y)

-- better error handling with the Maybe type
pipe2 :: Monad m => Generator' a m x -> Iteratee' (Maybe a) m y -> m (x, y)
pipe2 g i = bindM2 proceed (resume g) (resume i)
  where
    proceed (Left (a, c)) (Left f) = pipe2 c (f $ Just a)
    proceed (Left (a, c)) (Right y) = pipe2 c (return y) -- more generated values
    proceed (Right x) (Left f) = pipe2 (return x) (f Nothing)
    proceed (Right x) (Right y) = return (x, y)
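pipe1 and pipe2 do the same thing: they resume both the Generator and the Iteratee, feeding each generated value to the iteratee, until both have finished. If the iteratee finishes first, the rest of the generator still runs and its values are discarded. The difference is what happens when the iteratee demands more values than the generator produces: pipe1 reports a run-time error, while pipe2 feeds the iteratee Nothing, using the Maybe type to let the user handle the end of input gracefully.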
Example:
*Main> let gen = do {lift $ putStrLn "yielding one"; yield' 1; lift $ putStrLn "then two"; yield' 2; lift $ putStrLn "returning 3"; return 3}
*Main> runGenerator' gen
yielding one
then two
returning 3
([1,2],3)
*Main> let iter = do {lift $ putStrLn "Enter two numbers"; x <- await' ; y <- await'; lift $ putStrLn $ "sum is " ++ show (x+y)}
*Main> pipe1 gen iter
yielding one
Enter two numbers
then two
returning 3
sum is 3
(3,())
*Main> let iter2 s = do {lift $ putStrLn "Enter two numbers"; x <- await' ; maybe (lift $ putStrLn $ "sum is " ++ show s) (\n -> iter2 (s + n)) x}
*Main> pipe2 gen (iter2 0)
yielding one
Enter two numbers
then two
Enter two numbers
returning 3
Enter two numbers
sum is 3
(3,())
*Main> pipe2 (return ()) (iter2 0)
Enter two numbers
sum is 0
((),())
*Main> pipe2 (gen >> gen) (iter2 0)
yielding one
Enter two numbers
then two
Enter two numbers
returning 3
yielding one
Enter two numbers
then two
Enter two numbers
returning 3
Enter two numbers
sum is 6
(3,())
*Main> pipe1 (gen >> gen) iter
yielding one
Enter two numbers
then two
returning 3
yielding one
sum is 3
then two
returning 3
(3,())
*Main>
Note the difference between the last two outputs: iter2 keeps asking for values until it receives Nothing, so it consumes every generated value, while iter stops after two values, and the remaining generated values are still produced but discarded.
The above example is quite limited and manual. We now generalize this process. To do that, we introduce a new kind of Coroutine: the Transducer, also known as an Enumeratee.
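A Transducer is also a Coroutine, one whose suspension functor (s in the definition above) can be either the Iteratee functor or the Generator functor, so that at each suspension it either awaits a value or yields one. First, we need a type EitherFunctor that takes two functors and combines them into a new functor. (The transformers library, and base since version 4.9, provide an equivalent type, Data.Functor.Sum, with constructors InL and InR.)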
data EitherFunctor l r x = LeftF (l x) | RightF (r x)

instance (Functor l, Functor r) => Functor (EitherFunctor l r) where
  fmap f (LeftF l) = LeftF (fmap f l)
  fmap f (RightF r) = RightF (fmap f r)
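Since EitherFunctor has the same shape as Data.Functor.Sum from base, the two can be converted back and forth without losing anything. The short sketch below shows both conversions; the names toSum and fromSum are illustrative.

```haskell
import Data.Functor.Sum (Sum (..))

data EitherFunctor l r x = LeftF (l x) | RightF (r x)

instance (Functor l, Functor r) => Functor (EitherFunctor l r) where
  fmap f (LeftF l) = LeftF (fmap f l)
  fmap f (RightF r) = RightF (fmap f r)

-- The two types are isomorphic: these conversions are inverse to each other.
toSum :: EitherFunctor l r x -> Sum l r x
toSum (LeftF l) = InL l
toSum (RightF r) = InR r

fromSum :: Sum l r x -> EitherFunctor l r x
fromSum (InL l) = LeftF l
fromSum (InR r) = RightF r
```

Both Functor instances map over whichever side is present, so fmap gives the same result before or after converting.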
Now we define Transducer.
type Transducer a b m x = Coroutine (EitherFunctor ((->) (Maybe a)) ((,) b)) m x

awaitT :: Monad m => Transducer a b m (Maybe a)
awaitT = suspend (LeftF return)

yieldT :: Monad m => b -> Transducer a b m ()
yieldT x = suspend (RightF (x, return ()))
We can easily lift pure or stateful computations into a Transducer:
-- lift a pure function to a Transducer that awaits a value of type a, yields f a,
-- and repeats until the input ends
lift121 :: Monad m => (a -> b) -> Transducer a b m ()
lift121 f = awaitT >>= maybe (return ()) (\a -> yieldT (f a) >> lift121 f)

-- lift a pure function that maps each a to a list of b values: the Transducer
-- awaits an a, yields every value in the list, then repeats
liftStateless :: Monad m => (a -> [b]) -> Transducer a b m ()
liftStateless f = awaitT >>= maybe (return ()) (\a -> mapM_ yieldT (f a) >> liftStateless f)

-- lift a stateful computation: the new state is passed to the resumed computation,
-- and eof turns the final state into the last values to yield
liftStateful :: Monad m => (state -> a -> (state, [b])) -> (state -> [b]) -> state -> Transducer a b m ()
liftStateful f eof s = awaitT >>= maybe (mapM_ yieldT (eof s))
                                        (\a -> let (s', bs) = f s a
                                               in mapM_ yieldT bs >> liftStateful f eof s')
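The GHCi examples later in these notes use a transducer named double without defining it; judging from their output, it yields every input twice. Below is a guessed definition built with liftStateless, plus a hypothetical runningSum built with liftStateful and a hypothetical runTransducer helper that feeds a list of inputs (and then Nothing on every further request) and collects the outputs. Note that runTransducer would loop forever on a transducer that keeps awaiting after it has received Nothing.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity (..))

newtype Coroutine s m r = Coroutine { resume :: m (Either (s (Coroutine s m r)) r) }

instance (Functor s, Monad m) => Functor (Coroutine s m) where
  fmap = liftM

instance (Functor s, Monad m) => Applicative (Coroutine s m) where
  pure r = Coroutine (return (Right r))
  (<*>) = ap

instance (Functor s, Monad m) => Monad (Coroutine s m) where
  t >>= f = Coroutine (resume t >>= either (return . Left . fmap (>>= f)) (resume . f))

suspend :: (Monad m, Functor s) => s (Coroutine s m x) -> Coroutine s m x
suspend s = Coroutine (return (Left s))

data EitherFunctor l r x = LeftF (l x) | RightF (r x)

instance (Functor l, Functor r) => Functor (EitherFunctor l r) where
  fmap f (LeftF l) = LeftF (fmap f l)
  fmap f (RightF r) = RightF (fmap f r)

type Transducer a b m x = Coroutine (EitherFunctor ((->) (Maybe a)) ((,) b)) m x

awaitT :: Monad m => Transducer a b m (Maybe a)
awaitT = suspend (LeftF return)

yieldT :: Monad m => b -> Transducer a b m ()
yieldT x = suspend (RightF (x, return ()))

liftStateless :: Monad m => (a -> [b]) -> Transducer a b m ()
liftStateless f = awaitT >>= maybe (return ()) (\a -> mapM_ yieldT (f a) >> liftStateless f)

liftStateful :: Monad m => (state -> a -> (state, [b])) -> (state -> [b]) -> state -> Transducer a b m ()
liftStateful f eof s = awaitT >>= maybe (mapM_ yieldT (eof s)) step
  where step a = let (s', bs) = f s a in mapM_ yieldT bs >> liftStateful f eof s'

-- Guessed definition of the double transducer: yield every input twice.
double :: Monad m => Transducer a a m ()
double = liftStateless (\a -> [a, a])

-- Hypothetical example of liftStateful: emit the running total after each input.
runningSum :: Monad m => Transducer Int Int m ()
runningSum = liftStateful (\s a -> (s + a, [s + a])) (const []) 0

-- Hypothetical runner: feed the list, then Nothing, and collect every yielded value.
runTransducer :: Monad m => [a] -> Transducer a b m x -> m ([b], x)
runTransducer = go id
  where
    go acc inputs t = resume t >>= \step -> case step of
      Right x -> return (acc [], x)
      Left (RightF (b, k)) -> go (acc . (b :)) inputs k
      Left (LeftF f) -> case inputs of
        (a : rest) -> go acc rest (f (Just a))
        [] -> go acc [] (f Nothing)
```

For instance, runIdentity (runTransducer [1,2,3] double) gives ([1,1,2,2,3,3],()), and runningSum over the same input yields [1,3,6].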
We could now pipe a Generator into a Transducer, a Transducer into an Iteratee, or all three together. However, if that were all we could do after all these careful type and function definitions, it would be quite limited. The real reason we went to the trouble of defining Transducer is that it generalizes both Generator and Iteratee. Recall that we defined mapSuspensions earlier, which converts between different suspension functors. Using it, we can convert every Coroutine type we have seen into a Transducer, and once we define a function to pipe two Transducers together, we have a general framework for piping any of these Coroutine types!
Here we go:
-- an empty type: the input type of a pure source and the output type of a pure sink
data Naught

fromGenerator :: Monad m => Generator' a m x -> Transducer Naught a m x
fromGenerator = mapSuspensions RightF

fromIteratee :: Monad m => Iteratee' (Maybe a) m x -> Transducer a Naught m x
fromIteratee = mapSuspensions LeftF

toGenerator :: Monad m => Transducer Naught a m x -> Generator' a m x
toGenerator = mapSuspensions (\(RightF a) -> a)

toIteratee :: Monad m => Transducer a Naught m x -> Iteratee' (Maybe a) m x
toIteratee = mapSuspensions (\(LeftF a) -> a)

-- the type annotations in the where clause need {-# LANGUAGE ScopedTypeVariables #-}
(=>=) :: forall a b c m x y . Monad m => Transducer a b m x -> Transducer b c m y -> Transducer a c m (x, y)
t1 =>= t2 = Coroutine (bindM2 proceed (resume t1) (resume t2))
  where
    -- t1 is demanding a value: return a Transducer demanding a value too
    proceed (Left (LeftF s)) c = return (Left $ LeftF $ fmap (=>= Coroutine (return c)) s)
    -- t2 is generating a value: return a Transducer generating that value too
    proceed c (Left (RightF s)) = return (Left $ RightF $ fmap (Coroutine (return c) =>=) s)
    -- t1 is generating and t2 is consuming: happy!
    proceed (Left (RightF (b, c1))) (Left (LeftF f)) = resume (c1 =>= f (Just b))
    -- t1 is generating and t2 has finished: keep running t1
    proceed (Left (RightF (b, c1))) (Right y) = resume (c1 =>= (return y :: Transducer b c m y))
    -- t1 has finished and t2 is still demanding: feed Nothing to t2
    proceed (Right x) (Left (LeftF f)) = resume ((return x :: Transducer a b m x) =>= f Nothing)
    -- both have finished!
    proceed (Right x) (Right y) = return $ Right (x, y)
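To check that =>= behaves as described without any console output, here is a self-contained sketch that bundles just the pieces needed for two small pipelines. It inlines bindM2 as a do block and drops the annotations inside the where clause, which as far as I can tell are not needed for type inference here, so only RankNTypes is required. double is again a guessed definition, and sumAll is a hypothetical pure stand-in for iter2.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity (..))

newtype Coroutine s m r = Coroutine { resume :: m (Either (s (Coroutine s m r)) r) }

instance (Functor s, Monad m) => Functor (Coroutine s m) where
  fmap = liftM

instance (Functor s, Monad m) => Applicative (Coroutine s m) where
  pure r = Coroutine (return (Right r))
  (<*>) = ap

instance (Functor s, Monad m) => Monad (Coroutine s m) where
  t >>= f = Coroutine (resume t >>= either (return . Left . fmap (>>= f)) (resume . f))

suspend :: (Monad m, Functor s) => s (Coroutine s m x) -> Coroutine s m x
suspend s = Coroutine (return (Left s))

mapSuspensions :: (Functor s, Monad m) => (forall a. s a -> s' a) -> Coroutine s m x -> Coroutine s' m x
mapSuspensions f c = Coroutine (liftM (either (Left . f . fmap (mapSuspensions f)) Right) (resume c))

data EitherFunctor l r x = LeftF (l x) | RightF (r x)

instance (Functor l, Functor r) => Functor (EitherFunctor l r) where
  fmap f (LeftF l) = LeftF (fmap f l)
  fmap f (RightF r) = RightF (fmap f r)

type Generator' a m x = Coroutine ((,) a) m x
type Iteratee' a m x = Coroutine ((->) a) m x
type Transducer a b m x = Coroutine (EitherFunctor ((->) (Maybe a)) ((,) b)) m x

data Naught

yield' :: Monad m => a -> Generator' a m ()
yield' x = suspend (x, return ())

await' :: Monad m => Iteratee' a m a
await' = suspend return

awaitT :: Monad m => Transducer a b m (Maybe a)
awaitT = suspend (LeftF return)

yieldT :: Monad m => b -> Transducer a b m ()
yieldT x = suspend (RightF (x, return ()))

fromGenerator :: Monad m => Generator' a m x -> Transducer Naught a m x
fromGenerator = mapSuspensions RightF

fromIteratee :: Monad m => Iteratee' (Maybe a) m x -> Transducer a Naught m x
fromIteratee = mapSuspensions LeftF

-- Partial by design: a source built from a Generator' never awaits.
toGenerator :: Monad m => Transducer Naught a m x -> Generator' a m x
toGenerator = mapSuspensions (\(RightF p) -> p)

-- Safe in practice: Naught has no values, so nothing is ever yielded.
toIteratee :: Monad m => Transducer a Naught m x -> Iteratee' (Maybe a) m x
toIteratee = mapSuspensions (\(LeftF f) -> f)

-- The pipe from the notes, with bindM2 inlined as a do block.
(=>=) :: Monad m => Transducer a b m x -> Transducer b c m y -> Transducer a c m (x, y)
t1 =>= t2 = Coroutine (do { s1 <- resume t1; s2 <- resume t2; proceed s1 s2 })
  where
    proceed (Left (LeftF s)) c = return (Left (LeftF (fmap (=>= Coroutine (return c)) s)))
    proceed c (Left (RightF s)) = return (Left (RightF (fmap (Coroutine (return c) =>=) s)))
    proceed (Left (RightF (b, c1))) (Left (LeftF f)) = resume (c1 =>= f (Just b))
    proceed (Left (RightF (_, c1))) (Right y) = resume (c1 =>= return y)
    proceed (Right x) (Left (LeftF f)) = resume (return x =>= f Nothing)
    proceed (Right x) (Right y) = return (Right (x, y))

-- Guessed definition of double: yield every input twice.
double :: Monad m => Transducer a a m ()
double = awaitT >>= maybe (return ()) (\a -> yieldT a >> yieldT a >> double)

runGenerator' :: Monad m => Generator' a m r -> m ([a], r)
runGenerator' g = resume g >>= either (\(x, k) -> liftM (\(xs, r) -> (x : xs, r)) (runGenerator' k))
                                      (\r -> return ([], r))

runIteratee' :: Monad m => [a] -> Iteratee' a m r -> m r
runIteratee' xs i = resume i >>= either feed return
  where
    feed k = case xs of
      (x : rest) -> runIteratee' rest (k x)
      [] -> runIteratee' [] (k (error "No more values to feed"))

-- Hypothetical pure stand-in for iter2: sum the inputs until Nothing arrives.
sumAll :: Monad m => Int -> Iteratee' (Maybe Int) m Int
sumAll s = await' >>= maybe (return s) (\n -> sumAll (s + n))
```

Both pipelines mirror the GHCi session that follows, minus the printing: a generator yielding 1 and 2 and returning 'z', piped through double, yields [1,1,2,2] and returns ('z',()), while feeding [Just 3, Nothing] to double piped into sumAll 0 returns ((),6).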
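Let’s play around. The session also uses double, a transducer that (judging from the output) yields every input value twice, and toTrampoline, which converts a Transducer Naught Naught m x into a Trampoline' m x; neither is defined in these notes: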
*Main> let iter2 s = do {lift $ putStrLn "Enter a number"; x <- await' ; maybe (lift $ putStrLn $ "sum is " ++ show s) (\n -> iter2 (s + n)) x}
*Main> runIteratee' [Just 3, Nothing] (toIteratee $ double =>= fromIteratee (iter2 0))
Enter a number
Enter a number
Enter a number
sum is 6
((),())
*Main> let gen = do {lift $ putStrLn "yielding one"; yield' 1; lift $ putStrLn "then two"; yield' 2; lift $ putStrLn "returning 3"; return 3}
*Main> runGenerator' (toGenerator $ fromGenerator gen =>= double)
yielding one
then two
returning 3
([1,1,2,2],(3,()))
*Main> run' (toTrampoline $ fromGenerator (yield' 3) =>= double =>= fromIteratee (iter2 0))
Enter a number
Enter a number
Enter a number
sum is 6
(((),()),())
*Main> run' (toTrampoline $ fromGenerator (yield' 3) =>= double =>= double =>= fromIteratee (iter2 0))
Enter a number
Enter a number
Enter a number
Enter a number
Enter a number
sum is 12
((((),()),()),())
Note how the converted Transducers are chained together, and how they behave exactly as specified by the general pipe function =>=.
More generalizations
OK, we have done some really cool generalization with Transducer. Now we are ready to move on to the next level.
We can define a Coroutine that yields values to two different downstream Coroutines; in other words, it splits its generated values. Similarly, we can have a Coroutine that awaits values from two data sources and joins them together.
With Splitter and Join, we can build non-linear pipelines and define conditional Coroutines.
type Splitter a m x = Coroutine (EitherFunctor ((->) (Maybe a)) (EitherFunctor ((,) a) ((,) a))) m x
type Join a m x = Coroutine (EitherFunctor (EitherFunctor ((->) (Maybe a)) ((->) (Maybe a))) ((,) a)) m x
I’ll stop here; please refer to the original paper for the yield and await definitions of these types. The paper also gives the type signatures of conditional combinators such as ifThenElse, not, and (the conjunction) and others, and it sketches a possible grep implementation using these combinators together with Splitter, Join and Transducer.
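The paper's Splitter operations are not reproduced here. Purely to illustrate how the nested EitherFunctor in the Splitter type can be used, here is my own sketch with hypothetical names awaitS, yieldLeft, yieldRight, splitBy and runSplitter; it is not the paper's API.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity (..))

newtype Coroutine s m r = Coroutine { resume :: m (Either (s (Coroutine s m r)) r) }

instance (Functor s, Monad m) => Functor (Coroutine s m) where
  fmap = liftM

instance (Functor s, Monad m) => Applicative (Coroutine s m) where
  pure r = Coroutine (return (Right r))
  (<*>) = ap

instance (Functor s, Monad m) => Monad (Coroutine s m) where
  t >>= f = Coroutine (resume t >>= either (return . Left . fmap (>>= f)) (resume . f))

suspend :: (Monad m, Functor s) => s (Coroutine s m x) -> Coroutine s m x
suspend s = Coroutine (return (Left s))

data EitherFunctor l r x = LeftF (l x) | RightF (r x)

instance (Functor l, Functor r) => Functor (EitherFunctor l r) where
  fmap f (LeftF l) = LeftF (fmap f l)
  fmap f (RightF r) = RightF (fmap f r)

type Splitter a m x = Coroutine (EitherFunctor ((->) (Maybe a)) (EitherFunctor ((,) a) ((,) a))) m x

-- Hypothetical operations: await an input, or yield to the left or right output.
awaitS :: Monad m => Splitter a m (Maybe a)
awaitS = suspend (LeftF return)

yieldLeft, yieldRight :: Monad m => a -> Splitter a m ()
yieldLeft x = suspend (RightF (LeftF (x, return ())))
yieldRight x = suspend (RightF (RightF (x, return ())))

-- Send each input to the left output if it satisfies p, else to the right one.
splitBy :: Monad m => (a -> Bool) -> Splitter a m ()
splitBy p = awaitS >>= maybe (return ()) route
  where
    route a = (if p a then yieldLeft a else yieldRight a) >> splitBy p

-- Hypothetical runner: feed a list (then Nothing) and collect both outputs.
runSplitter :: Monad m => [a] -> Splitter a m x -> m ([a], [a], x)
runSplitter = go [] []
  where
    go ls rs inputs s = resume s >>= \step -> case step of
      Right x -> return (reverse ls, reverse rs, x)
      Left (RightF (LeftF (a, k))) -> go (a : ls) rs inputs k
      Left (RightF (RightF (a, k))) -> go ls (a : rs) inputs k
      Left (LeftF f) -> case inputs of
        (a : rest) -> go ls rs rest (f (Just a))
        [] -> go ls rs [] (f Nothing)
```

For instance, runIdentity (runSplitter [1..6] (splitBy even)) gives ([2,4,6],[1,3,5],()).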
Summary
This post tries to show how Iteratee and Enumeratee can be implemented in Haskell. It gives a very generic, unoptimized implementation compared to the libraries that currently exist. Nevertheless, it helps to gain a deep understanding of Coroutine and its application to pipelined I/O (as in pipes and conduit).
Under the hood, Coroutine provides a way to pause and resume a computation. Exactly what happens when it is paused is controlled by the functor argument used to construct the Coroutine. With different functors, we get Trampoline, Generator, Iteratee, Transducer, Splitter and Join. With these types, it is possible to construct stream processors that consume incoming values and generate new ones; in other words, we can pipe a list of these coroutines together to create a single resulting coroutine that knows how to process incoming values.
The benefit is that all these Coroutine types are piped together using continuations: each one knows how to pass a value to the next Coroutine, and each is resumed automatically when the value it awaits is ready. There are no thread context switches as in multithreaded I/O programming, yet the programmer can work as if each Coroutine ran in a separate thread. Thanks to the underlying continuations, this gets much of the efficiency of asynchronous I/O without all the callbacks.
As for the concern that it only uses one OS thread when it runs: there is not much point in running one pipeline on many threads, because its stages depend on each other and would spend most of their time waiting, causing unnecessary context switches. In applications, a Coroutine, or pipe/conduit, is typically constructed from many small Coroutines that cooperate within one OS thread, minimizing context switches. When you have two separate streams, you construct two pipelines and run them on two OS threads. Each of them is efficient on its own, and we still achieve parallel I/O this way.