In this post I show another way to implement pipes, by combining a producer and a consumer monad transformer. This implementation is for educational and entertainment purposes only: you probably shouldn't try to use it in production software. To quote Donald Knuth: "I have only proved it correct, not tried it."
One obvious thing that is missing is finalization, but that could be added by passing along a finalizer with each call to `yield`, as described by Gabriel Gonzalez.

Let's start with producers. A producer can produce a stream of values of type `o`, and then ends with a value of type `a`. At each step, it performs a monadic action.

    data ProducerT' o m a = Done a | More o (ProducerT o m a)
    newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

This monad transformer is similar to the ListT-done-right monad transformer. The difference is that the producer has a value at the end, while `ListT` ends with an empty list. More importantly, `ListT` is a monad over the list items, while `ProducerT` is a monad over the end value. It can't be a monad over the stream values, because then `return` would have to conjure a value of type `a` out of nowhere.

The `Monad` and `MonadTrans` instances are straightforward:

    instance Monad m => Monad (ProducerT o m) where
      return = ProducerT . return . Done
      a >>= b = ProducerT $ runProducerT a >>= bind'
        where
          bind' (Done x)   = runProducerT (b x)
          bind' (More o k) = return (More o (k >>= b))

    instance MonadTrans (ProducerT o) where
      lift = ProducerT . liftM Done

The point of producers is that they can produce values. So, let's make a function for that:

    yield :: Monad m => o -> ProducerT o m ()
    yield x = ProducerT $ return $ More x (return ())
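To try the producer out on a current compiler, here is a minimal sketch. GHC has required `Functor` and `Applicative` instances for every `Monad` since version 7.10, so the sketch adds them in terms of the `Monad` instance. The helper `drain` and the value `example` are hypothetical names introduced only for this illustration; they are not part of the implementation described in this post.

```haskell
import Control.Monad (ap, liftM)
import Control.Monad.Trans.Class (MonadTrans (..))
import Data.Functor.Identity (Identity, runIdentity)

data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

-- Superclass instances required by current GHC, derived from the Monad instance.
instance Monad m => Functor (ProducerT o m) where
  fmap = liftM

instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap

instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))

instance MonadTrans (ProducerT o) where
  lift = ProducerT . liftM Done

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

-- Hypothetical helper: run a producer to the end, collecting every
-- yielded value together with the final result.
drain :: Monad m => ProducerT o m a -> m ([o], a)
drain p = runProducerT p >>= step
  where
    step (Done x)   = return ([], x)
    step (More o k) = do
      (os, x) <- drain k
      return (o : os, x)

-- Example producer: yields 1, 2, 3 and then ends with a string.
example :: ProducerT Int Identity String
example = mapM_ yield [1, 2, 3] >> return "end"

-- Running it in the Identity monad gives ([1,2,3],"end").
collected :: ([Int], String)
collected = runIdentity (drain example)
```

Loading this into GHCi and evaluating `collected` shows both the stream and the end value, which is the distinction from `ListT` discussed above.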

Given a producer, we can try to extract the first value. This succeeds if the stream is not empty; otherwise it returns the end value. In both cases we also return the remaining producer:

    headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
    headProducerT = liftM step . runProducerT
      where
        step (Done x)   = (Left x, return x)
        step (More o k) = (Right o, k)
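As a small illustration of how `headProducerT` behaves at the end of a stream, the following sketch pulls several heads in a row. The helper `takeHeads` and the values `example` and `firstFour` are hypothetical names made up for this example, and the `Functor` and `Applicative` instances are added only because current GHC requires them.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity, runIdentity)

data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

instance Monad m => Functor (ProducerT o m) where
  fmap = liftM
instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap
instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x)   = (Left x, return x)
    step (More o k) = (Right o, k)

-- Hypothetical helper: take the head n times, threading the remaining
-- producer through by hand.
takeHeads :: Monad m => Int -> ProducerT o m a -> m [Either a o]
takeHeads n p
  | n <= 0    = return []
  | otherwise = do
      (h, rest) <- headProducerT p
      hs <- takeHeads (n - 1) rest
      return (h : hs)

-- Two stream values, then the end value True.
example :: ProducerT Char Identity Bool
example = yield 'a' >> yield 'b' >> return True

firstFour :: [Either Bool Char]
firstFour = runIdentity (takeHeads 4 example)
```

Here `firstFour` evaluates to `[Right 'a', Right 'b', Left True, Left True]`: once the stream is exhausted, the remaining producer is `return x`, so every later head returns the same end value again.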

This head function will form the building block for building consumers. If you look at the function's type, you might notice that it is very similar to that of a state monad. We could imagine a consumer as something that keeps track of the input producer, and repeatedly takes the head of it. So, a first idea might be

    type ConsumerT' i t m = StateT (ProducerT i m t) m

    await' :: Monad m => ConsumerT' i t m (Either t i)
    await' = StateT headProducerT

This seems to work. We can compose a producer and a consumer very easily with `evalStateT`:

    compose_pc :: Monad m => ProducerT i m t -> ConsumerT' i t m a -> m a
    compose_pc = flip evalStateT
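A short sketch of this producer/consumer composition in action, assuming `StateT` from the `transformers` package. The consumer `sumAll` and the values `source` and `total` are hypothetical names for this example only; the `Functor` and `Applicative` instances are there because current GHC requires them.

```haskell
import Control.Monad (ap, liftM)
import Control.Monad.Trans.State (StateT (..), evalStateT)
import Data.Functor.Identity (Identity, runIdentity)

data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

instance Monad m => Functor (ProducerT o m) where
  fmap = liftM
instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap
instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x)   = (Left x, return x)
    step (More o k) = (Right o, k)

-- The consumer keeps the upstream producer as its state.
type ConsumerT' i t m = StateT (ProducerT i m t) m

await' :: Monad m => ConsumerT' i t m (Either t i)
await' = StateT headProducerT

compose_pc :: Monad m => ProducerT i m t -> ConsumerT' i t m a -> m a
compose_pc = flip evalStateT

-- Hypothetical consumer: add up all inputs, then return the upstream
-- end value together with the total.
sumAll :: Monad m => ConsumerT' Int t m (t, Int)
sumAll = go 0
  where
    go acc = do
      x <- await'
      case x of
        Left end -> return (end, acc)
        Right i  -> go (acc + i)

source :: ProducerT Int Identity String
source = mapM_ yield [1, 2, 3] >> return "done"

total :: (String, Int)
total = runIdentity (compose_pc source sumAll)
```

With these definitions `total` is `("done", 6)`: the consumer sees the three yielded values as `Right` and the producer's end value as `Left`.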

In terms of pipes, we have composed a pipe with no input together with a pipe that produces no output, to give a 'pipe' with neither input nor output.

A general pipe is a computation that is both a producer and a consumer. There are two obvious ways of building one: with the consumer on the outside, or with the producer on the outside.

    type Pipe_CP i a o m b = ConsumerT' i a (ProducerT o m) b
    type Pipe_PC i a o m b = ProducerT o (ConsumerT' i a m) b

These types are *not* the same. `Pipe_CP` first consumes a whole bunch of input, and then produces a whole bunch of output. In particular, it is impossible to stop early. In

Before doing fully general composition, let's first compose a producer with a pipe,

    compose_p :: Monad m => ProducerT b m s -> Pipe_PC b s c m t -> ProducerT c m t

Remember that a value `p_1` of this pipe type has the form

    p_1 = ProducerT $ StateT $ \s_1 -> act >> return (More o_1 p_2, s_2)

As before, the upstream producer is the state for the downstream consumer.
So, we can fill in the upstream producer for `s_1`. Once we do so, we get access to

    class MonadTransRebase t where
      rebase :: NestTrans m n -> t m a -> t n a

    instance MonadTransRebase (ProducerT o) where
      rebase f = ProducerT . runNestTrans f rebase' . runProducerT
        where
          rebase' f' (Done d)   = Done d
          rebase' f' (More x k) = More x (rebase f' k)

The type `NestTrans` is a function from `m a` to `n b`, where the transformation inside the monadic values can use a different `NestTrans`. Hence the 'nested' part of the name.

    newtype NestTrans m n = NestTrans
      { runNestTrans :: forall a b. (NestTrans m n -> a -> b) -> (m a -> n b) }

As said above, given the initial state, we can pass this state through the transformer. Then the new state is used for nested `StateT` computations.

    nestTransStateT :: Monad m => s -> NestTrans (StateT s m) m
    nestTransStateT s = NestTrans $ \f m ->
      liftM (\(a,s') -> f (nestTransStateT s') a) (runStateT m s)

This is all we need to define the composition:

    compose_p u v = rebase (nestTransStateT u) v
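The pieces above can be assembled into one module to see a producer composed with a pipe. This is a sketch under a few assumptions: `StateT` comes from the `transformers` package, the `Functor` and `Applicative` instances are added for current GHC, and `doubler`, `source`, `drain` and `doubled` are hypothetical names used only for this illustration.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (ap, liftM)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.State (StateT (..))
import Data.Functor.Identity (runIdentity)

data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

instance Monad m => Functor (ProducerT o m) where fmap = liftM
instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap
instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))
instance MonadTrans (ProducerT o) where
  lift = ProducerT . liftM Done

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x)   = (Left x, return x)
    step (More o k) = (Right o, k)

type ConsumerT' i t m = StateT (ProducerT i m t) m
type Pipe_PC i a o m b = ProducerT o (ConsumerT' i a m) b

await' :: Monad m => ConsumerT' i t m (Either t i)
await' = StateT headProducerT

newtype NestTrans m n = NestTrans
  { runNestTrans :: forall a b. (NestTrans m n -> a -> b) -> (m a -> n b) }

class MonadTransRebase t where
  rebase :: NestTrans m n -> t m a -> t n a

instance MonadTransRebase (ProducerT o) where
  rebase f = ProducerT . runNestTrans f rebase' . runProducerT
    where
      rebase' _  (Done d)   = Done d
      rebase' f' (More x k) = More x (rebase f' k)

-- Run one StateT step with the current state, then continue with the new one.
nestTransStateT :: Monad m => s -> NestTrans (StateT s m) m
nestTransStateT s = NestTrans (\f m ->
  liftM (\(a, s') -> f (nestTransStateT s') a) (runStateT m s))

compose_p :: Monad m => ProducerT b m s -> Pipe_PC b s c m t -> ProducerT c m t
compose_p u v = rebase (nestTransStateT u) v

-- Hypothetical pipe: doubles every input and passes on the end value.
doubler :: Monad m => Pipe_PC Int s Int m s
doubler = do
  x <- lift await'
  case x of
    Left end -> return end
    Right i  -> yield (2 * i) >> doubler

source :: Monad m => ProducerT Int m String
source = mapM_ yield [1, 2, 3] >> return "end"

-- Hypothetical helper: collect all outputs and the end value.
drain :: Monad m => ProducerT o m a -> m ([o], a)
drain p = runProducerT p >>= step
  where
    step (Done x)   = return ([], x)
    step (More o k) = liftM (\(os, x) -> (o : os, x)) (drain k)

doubled :: ([Int], String)
doubled = runIdentity (drain (compose_p source doubler))
```

Evaluating `doubled` gives `([2,4,6],"end")`. The result of `compose_p` is an ordinary `ProducerT` over the base monad, so the `StateT` layer has disappeared from the type.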

Note that it is possible to write all this without the `NestTrans` newtype, but to do so generically requires rank-3 types (the first time that I have ever needed those). I leave that solution as an exercise for the reader.

Now let's also try to do this the other way around, and compose a pipe with a consumer,

    compose_c :: Monad m => Pipe_PC a r b m s -> ConsumerT' b s m t -> ConsumerT' a r m t

But immediately we hit a problem. The downstream consumer expects its state to be of type `ProducerT b m s`, but the upstream has type `ProducerT b (ConsumerT .. m) s`. This is still a producer, but over a different base monad. In fact, the upstream producer's base monad is of the form `(t m)`, where `t` is another monad transformer.
We can't just get rid of the `ConsumerT`, like we did on the downstream side, because we still need to be able to pass in the state later on.

The solution is to make the state type more general, and allow it to be `ProducerT` over any transformation of a given base monad. Effectively we replace the state type `s m a` by `forall t. s (t m) a`. This gives us the *transformed state monad*:

    data TStateT s a m b = TStateT
      { runTStateT :: forall t. (MonadTrans t, Monad (t m))
                   => s (t m) a -> t m (b, s (t m) a) }

Note that `s` is not a state *type*, but a state *monad transformer*.
The instances are straightforward, and look identical to the instances for `StateT`, with the exception of an extra `lift` in the `MonadTrans` instance.

    instance Monad (TStateT s t m) where
      return a = TStateT $ \s -> return (a, s)
      m >>= k = TStateT $ \s -> do
        (a,s') <- runTStateT m s
        runTStateT (k a) s'

    instance MonadTrans (TStateT s t) where
      lift mx = TStateT $ \s -> lift $ liftM (\x -> (x,s)) mx

The new consumer type is just a `TStateT` with `ProducerT` as the state:

    type ConsumerT i = TStateT (ProducerT i)
    type GPipe i o m = o (i m)
    type Pipe i a o m = GPipe (ConsumerT i a) (ProducerT o) m

Awaiting looks much like before,

    await :: Monad m => Pipe i t o m (Either t i)
    await = lift $ TStateT headProducerT

All we need to do now to define composition is to make a `NestTrans` for `TStateT`.
The function to do this is essentially the same as `nestTransStateT` above:

    nestTransTStateT :: (Monad (t m), MonadTrans t)
                     => s (t m) a -> NestTrans (TStateT s a m) (t m)
    nestTransTStateT s = NestTrans $ \f m ->
      liftM (\(a,s') -> f (nestTransTStateT s') a) (runTStateT m s)

and by magic, we get composition:

    compose :: Monad m => Pipe a r b m s -> Pipe b s c m t -> Pipe a r c m t
    compose = rebase . nestTransTStateT

There is nothing specific to `ConsumerT` or `ProducerT` in the composition function.
All we require is that the 'consumer' on the left is a monad transformer, and that the 'producer' on the right can be rebased. This leads to the more general type of compose:

    compose :: (MonadTransRebase t, MonadTrans r, Monad (r m))
            => GPipe r s m a -> GPipe (TStateT s a) t m b -> GPipe r t m b

There are some interesting choices for `r`, `s` and `t` here.
By picking `r = IdentityT`, we get an upstream 'pipe' with no input, i.e. a producer.
By picking `t = IdentityT`, we get a downstream 'pipe' with no output, i.e. a consumer.

Finally, the transformer `s` determines what information is passed between the two pipes.
By using `ProducerT o` you get a stream of `o`s followed by an `a` at the end.
If you use `ListT`, there is a stream of `a`s with no value at the end.
If you use `IdentityT`, just a single value is passed, so you get function composition.
If you use `InfiniteListT` you get a producer that guarantees that it gives an infinite stream of values.
And I believe it should also be possible to define more complex protocols, such as "first give 10 values of type `a`, then an unlimited number of `b`, and end with a `c`".
However, you do need a different `await` function for all of these.

To close things off, here are the producers and consumers based on `IdentityT`.

    instance MonadTransRebase IdentityT where
      rebase f = IdentityT . runNestTrans f (const id) . runIdentityT

    type ProducerPipe o m = GPipe IdentityT (ProducerT o) m
    type ConsumerPipe i a m = GPipe (ConsumerT i a) IdentityT m
    type Pipeline m = GPipe IdentityT IdentityT m

    runPipeline :: Pipeline m a -> m a
    runPipeline = runIdentityT . runIdentityT
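To see everything fit together, here is a sketch of a complete pipeline: a producer, a pipe and a consumer, joined with the general `compose` and then unwrapped. Several things in it are assumptions rather than part of the post: `IdentityT` is taken from the `transformers` package, `Functor` and `Applicative` instances are added for current GHC, `GPipe` gets explicit kind annotations so that it does not depend on `PolyKinds`, and the names `awaitC`, `source`, `doubler`, `sumAll` and `result` are hypothetical. As noted above, the consumer needs its own await function because its transformer stack differs from that of a pipe.

```haskell
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RankNTypes #-}
import Control.Monad (ap, liftM)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Identity (IdentityT (..))
import Data.Functor.Identity (runIdentity)
import Data.Kind (Type)

-- Producer, as in the post, plus the superclass instances current GHC needs.
data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }
instance Monad m => Functor (ProducerT o m) where fmap = liftM
instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap
instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))
instance MonadTrans (ProducerT o) where lift = ProducerT . liftM Done

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x)   = (Left x, return x)
    step (More o k) = (Right o, k)

-- Transformed state monad, as in the post.
data TStateT s a m b = TStateT
  { runTStateT :: forall t. (MonadTrans t, Monad (t m))
               => s (t m) a -> t m (b, s (t m) a) }
instance Functor (TStateT s a m) where fmap = liftM
instance Applicative (TStateT s a m) where
  pure x = TStateT (\s -> return (x, s))
  (<*>) = ap
instance Monad (TStateT s a m) where
  m >>= k = TStateT (\s -> runTStateT m s >>= \(x, s') -> runTStateT (k x) s')
instance MonadTrans (TStateT s a) where
  lift mx = TStateT (\s -> lift (liftM (\x -> (x, s)) mx))

-- Rebasing machinery, as in the post.
newtype NestTrans m n = NestTrans
  { runNestTrans :: forall a b. (NestTrans m n -> a -> b) -> (m a -> n b) }
class MonadTransRebase t where
  rebase :: NestTrans m n -> t m a -> t n a
instance MonadTransRebase (ProducerT o) where
  rebase f = ProducerT . runNestTrans f rebase' . runProducerT
    where
      rebase' _  (Done d)   = Done d
      rebase' f' (More x k) = More x (rebase f' k)
instance MonadTransRebase IdentityT where
  rebase f = IdentityT . runNestTrans f (const id) . runIdentityT

nestTransTStateT :: (Monad (t m), MonadTrans t)
                 => s (t m) a -> NestTrans (TStateT s a m) (t m)
nestTransTStateT s = NestTrans (\f m ->
  liftM (\(x, s') -> f (nestTransTStateT s') x) (runTStateT m s))

-- Kind annotations are an addition, so that GPipe works without PolyKinds.
type GPipe (i :: (Type -> Type) -> Type -> Type)
           (o :: (Type -> Type) -> Type -> Type)
           (m :: Type -> Type) = o (i m)
type ConsumerT i = TStateT (ProducerT i)
type Pipe i a o m = GPipe (ConsumerT i a) (ProducerT o) m
type ProducerPipe o m = GPipe IdentityT (ProducerT o) m
type ConsumerPipe i a m = GPipe (ConsumerT i a) IdentityT m

compose :: (MonadTransRebase t, MonadTrans r, Monad (r m))
        => GPipe r s m a -> GPipe (TStateT s a) t m b -> GPipe r t m b
compose = rebase . nestTransTStateT

await :: Monad m => Pipe i t o m (Either t i)
await = lift (TStateT headProducerT)

-- Hypothetical consumer-side await for the IdentityT-based stack.
awaitC :: ConsumerPipe i t m (Either t i)
awaitC = IdentityT (TStateT headProducerT)

source :: Monad m => ProducerPipe Int m String
source = mapM_ yield [1, 2, 3] >> return "end"

doubler :: Monad m => Pipe Int s Int m s
doubler = await >>= either return (\i -> yield (2 * i) >> doubler)

sumAll :: ConsumerPipe Int s m (s, Int)
sumAll = go 0
  where
    go acc = awaitC >>= either (\end -> return (end, acc)) (\i -> go (acc + i))

-- Pipeline m a is IdentityT (IdentityT m) a, so unwrap twice.
result :: (String, Int)
result = runIdentity (runIdentityT (runIdentityT (compose source (compose doubler sumAll))))
```

In this sketch `result` works out to `("end", 12)`: the source yields 1, 2 and 3, the pipe doubles them, and the consumer sums the doubled values and returns them alongside the end value that was passed down the chain.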
