Building pipes with monad transformers

In this post I show another way to implement pipes, by combining a producer and consumer monad transformer. This implementation is for educational and entertainment purposes only: you probably shouldn't try to use it in production software. To quote Donald Knuth: I have only proved it correct, not tried it. One obvious thing that is missing is finalization, but that could be added by passing along a finalizer with each call to yield, as described by Gabriel Gonzalez.

Producers

Let's start with producers. A producer can produce a stream of values of type o, and then ends with a value of type a. At each step, it performs a monad action.

data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

This monad transformer is similar to the ListT-done-right monad transformer. The difference is that the producer has a value at the end, while ListT ends with an empty list. More importantly, ListT is a monad over the list items, while ProducerT is a monad over the end value. It can't be a monad over the stream values, because then return would have to conjure a value of type a out of nowhere.

The Monad and MonadTrans instances are straightforward:

instance Monad m => Monad (ProducerT o m) where
    return = ProducerT . return . Done
    a >>= b = ProducerT $ runProducerT a >>= bind'
      where
        bind' (Done x) = runProducerT (b x)
        bind' (More o k) = return (More o (k >>= b))
instance MonadTrans (ProducerT o) where lift = ProducerT . liftM Done

The point of producers is that they can produce values. So, let's make a function for that

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

Given a producer, we can try to extract the first value. This succeeds if the stream is not empty, otherwise it returns the end value. In both cases we also return a the remaining producer:

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x) = (Left x, return x)
    step (More o k) = (Right o, k)

This head function will form the building block for building consumers. If you look at the function's type, you might notice that it is very similar to that of a state monad. We could imagine a consumer as something that keeps track of the input producer, and repeatedly takes the head of it. So, a first idea might be

type ConsumerT' i t m = StateT (ProducerT i m t) m
await' :: Monad m => ConsumerT' i t m (Either t i)
await' = StateT headProducerT

This seems to work. We can compose a producer and a consumer very easily with evalStateT:

composepc :: Monad m => ProducerT i m t -> ConsumerT' i t m a -> m a
composepc = flip evalStateT

In terms of pipes, we have composed a pipe with no input together with a pipe that produces no output, to give a 'pipe' with neither input nor output.

Pipes

A general pipe is a computation that is both a producer and a consumer. There are two obvious ways of building one: with the consumer on the outside, or with the producer on the outside.

type PipeCP i a o m b = ConsumerT' i a (ProducerT o m) b
type PipePC i a o m b = ProducerT o (ConsumerT' i a m) b

These types are not the same. PipeCP first consumes a whole bunch of input, and then produces a whole bunch of output. In particular, it is impossible to stop early. In PipePC, the operations are interleaved; before each output there can be more consuming. This second formulation is therefore the one that we want.

Before doing fully general composition, let's first compose a producer with a pipe,

composep :: Monad m => ProducerT b m s -> PipePC b s c m t -> ProducerT c m t

Remember that a value p1 of type PipePC can look something like this:

p1 = ProducerT $ StateT $ \s1 -> act >> return (More o1 p2, s2)

As before, the upstream producer is the state for the downstream consumer. So, we can fill in the upstream producer for s1. Once we do so, we get access to s2, which should be filled in into p2, etc. In this way we turn the pipe from a ProducerT o (StateT .. m) a into a ProducerT o m a. So more generally, we change the base monad of a monad transformer.

class MonadTransRebase t where
    rebase :: NestTrans m n -> t m a -> t n a
instance MonadTransRebase (ProducerT o) where rebase f = ProducerT . runNestTrans f rebase' . runProducerT where rebase' f' (Done d) = Done d rebase' f' (More x k) = More x (rebase f' k)

The type NestTrans is a function from m a to n b, where the transformation inside the monadic values can use a different NestTrans. Hence the 'nested' part of the name.

newtype NestTrans m n = NestTrans
    { runNestTrans :: forall a b. (NestTrans m n -> a -> b) -> (m a -> n b) }

As said above, given the initial state, we can pass this state through the transfomer. Then the new state is used for nested StateT computations.

nestTransStateT :: Monad m => s -> NestTrans (StateT s m) m
nestTransStateT s = NestTrans $ \f m ->
    liftM (\(a,s') -> f (nestTransStateT s') a) (runStateT m s)

This is all we need to define the composition:

composep u v = rebase (nestTransStateT u) v

Note that it is possible to write all this without the NestTrans newtype, but to do so generically requires rank 3 types (the first time that I have ever needed those). I leave that solution as an exercise to the reader.

Consumers, take 2

Now let's also try to do this the other way around, and compose a pipe with a consumer,

composec :: Monad m
         => PipePC a r b m s -> ConsumerT' b s m t -> ConsumerT' a r m t

But immediately we hit a problem. The downstream consumer expects its state to be of type ProducerT b m s, but the upstream has type ProducerT b (ConsumerT .. m) s. This is still a producer, but over a different base monad. In fact, the upstream producer's base monad is of the form (t m), where t is another monad transformer. We can't just get rid of the ConsumerT, like we did on the downstream side, because we still need to be able to pass in the state later on.

The solution is to make the state type more general, and allow it to be ProducerT over any transformation of a given base monad. Effectively we replace the state type s m a by forall t. s (t m) a. This gives us the transformed state monad:

data TStateT s a m b = TStateT
    { runTStateT :: forall t. (MonadTrans t, Monad (t m))
                 => s (t m) a -> t m (b, s (t m) a) }

Note that s is not a state type, but a state monad transformer. The instances are straightforward, and look identical to the instances for StateT, with the exception of an extra lift in the MonadTrans instance.

instance Monad (TStateT s t m) where
    return a = TStateT $ \s -> return (a, s)
    m >>= k = TStateT $ \s -> do
        (a,s') <- runTStateT m s
        runTStateT (k a) s'
instance MonadTrans (TStateT s t) where lift mx = TStateT $ \s -> lift $ liftM (\x -> (x,s)) mx

The new consumer type is just a TState with ProducerT as the state:

type ConsumerT i = TStateT (ProducerT i)
type GPipe i o m = o (i m)
type Pipe i a o m = GPipe (ConsumerT i a) (ProducerT o) m

Awaiting looks much like before,

await :: Monad m => Pipe i t o m (Either t i)
await = lift $ TStateT headProducerT

All we need to do now to define composition is to make a NestTrans for TStateT. The function to do this is essentially the same as nestTransStateT above:

nestTransTStateT :: (Monad (t m), MonadTrans t)
                 => s (t m) a -> NestTrans (TStateT s a m) (t m)
nestTransTStateT s = NestTrans $ \f m ->
    liftM (\(a,s') -> f (nestTransTStateT s') a) (runTStateT m s)

and by magic, we get composition:

compose :: Monad m => Pipe a r b m s -> Pipe b s c m t -> Pipe a r c m t
compose = rebase . nestTransTStateT

General consumers and producers

There is nothing specific to ConsumerT or ProducerT in the composition function. All we require is that the 'consumer' on the left is a monad transformer, and that 'producer' on the right can be rebased. This leads to the more general type of compose:

compose :: (MonadTransRebase t, MonadTrans r, Monad (r m))
        => GPipe r s m a -> GPipe (TStateT s a) t m b -> GPipe r t m b

There are some interesting choices for r, s and t here. By picking r = IdentityT, we get an upstream 'pipe' with no input, i.e. a producer. By picking t = IdentityT, we get a downstream 'pipe' with no output, i.e. a consumer.

Finally, the transformer s determines what information is based between the two pipes. By using ProducerT o you get a stream of os followed by an a at the end. If you use ListT, there is a stream of as with no value at the end. If you use IdentityT, just a single value is passed, so you get function composition. If you use InfiniteListT you get a producer that guarantees that it gives an infinite stream of values. And I believe it should also be possible to define more complex protocols, such as "first give 10 values of type a, then an unlimited number of b, and end with a c". However, you do need a different await function for all of these.

To close things off, here are the producers and consumers based on IdentityT.

instance MonadTransRebase IdentityT where
    rebase f = IdentityT . runNestTrans f (const id) . runIdentityT
type ProducerPipe o m = GPipe IdentityT (ProducerT o) m type ConsumerPipe i a m = GPipe (ConsumerT i a) IdentityT m type Pipeline m = GPipe IdentityT IdentityT m
runPipeline :: Pipeline m a -> m a runPipeline = runIdentityT . runIdentityT

Comment

(optional)
(optional, will not be revealed)
6 + 14 =
Use > code for code blocks, @code@ for inline code. Some html is also allowed.