cro zef:cro last updated on 2022-11-30

# The Cro approach

At its heart, Cro is all about building up chains of Raku supplies (streams of
asynchronous values) that process messages that arrive from the network and
produce messages to be sent over the network.
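
As a rough illustration of what a "chain of supplies" means, the following sketch uses only core Raku and no Cro classes. The strings stand in for network messages, and all variable names are made up for this example.

```raku
# A source supply standing in for "messages arriving from the network".
my $incoming = Supply.from-list('hello', 'cro');

# First stage: a supply block that transforms each value it receives.
my $upcased = supply {
    whenever $incoming -> $text {
        emit $text.uc;
    }
}

# Second stage: another supply block chained onto the first.
my $framed = supply {
    whenever $upcased -> $text {
        emit '[' ~ $text ~ ']';
    }
}

# Nothing runs until the end of the chain is tapped; `react` taps it and
# waits until the chain has finished.
my @results;
react {
    whenever $framed -> $text {
        @results.push($text);
    }
}
say @results;
```

Each stage only knows about the supply it is handed, which is what makes it possible to assemble stages in different orders.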

## Key roles

Messages are represented by `Cro::Message`. Concrete implementations include:

* `Cro::TCP::Message`
* `Cro::HTTP::Request`
* `Cro::HTTP::Response`

Incoming connections are represented by `Cro::Connection`; implementations
include:

* `Cro::TCP::ServerConnection`
* `Cro::TLS::ServerConnection`

A `Cro::Source` is a source of either messages or connections. For example,
`Cro::TCP::Listener` produces `Cro::TCP::ServerConnection` objects.

A `Cro::Transform` transforms one connection or message into another. For
example, `Cro::HTTP::RequestParser` will transform `Cro::TCP::Message`s into
`Cro::HTTP::Request`s, while a `Cro::HTTP::ResponseSerializer` transforms
`Cro::HTTP::Response`s into `Cro::TCP::Message`s. This means that in Cro,
HTTP applications are simply transforms from `Cro::HTTP::Request` into
`Cro::HTTP::Response`.

A `Cro::Sink` consumes messages. It doesn't produce anything. A sink comes at
the end of a message processing pipeline. A sink in a TCP server would consume
`Cro::TCP::Message`s and send them over the network.

Some messages or connections can be replied to with one or more messages. These
do the `Cro::Replyable` role. Anything that produces a replyable is also
responsible for providing something that can process the reply messages. This
"something" may either be a transform or a sink. Examples of replyables include
`Cro::TCP::ServerConnection` and `Cro::TLS::ServerConnection`, which give a
`Cro::Sink` replier that sends `Cro::TCP::Message` objects back to the client.

## Composition

Cro components (sources, transforms, and sinks) can be put together to form
pipelines. This process is called pipeline composition. Perhaps the simplest
possible example is setting up an echo server:

    use Cro;
    use Cro::TCP;

    class Echo does Cro::Transform {
        method consumes() { Cro::TCP::Message }
        method produces() { Cro::TCP::Message }

        method transformer(Supply $source) {
            # We could actually just `return $source` here, but the identity
            # supply is written out here to illustrate what a transform will
            # often look like.
            supply {
                whenever $source -> $message {
                    emit $message;
                }
            }
        }
    }

Which can then be composed into a service and started as follows:

    my Cro::Service $echo-server = Cro.compose(
        Cro::TCP::Listener.new(port => 8000),
        Echo
    );
    $echo-server.start;

Note that `Cro.compose(...)` only returns a `Cro::Service` when the pipeline
it is given starts with a source and ends with a sink. So where did
the sink come from in this case? From the fact that a connection is replyable,
and so it provided the sink. It's also worth noting that a stream of connections
magically turned into a stream of messages. If the composer spots that something
producing connections is followed by something consuming messages, it will
pass the rest of the pipeline to a `Cro::ConnectionManager` instance, so the
processing of the remainder of the pipeline will be per connection.
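
The idea of lining components up by what they consume and produce can be modeled in plain Raku, without Cro installed. The sketch below is a toy model and not Cro's implementation: `Stage`, `Upcase`, `Length`, and `compose-stages` are hypothetical names, and it assumes that composing means checking each component's `produces` against the next component's `consumes` before chaining their supplies.

```raku
# Toy model only: these names are hypothetical and are not part of Cro.
role Stage {
    method consumes() { ... }
    method produces() { ... }
    method transformer(Supply $in --> Supply) { ... }
}

class Upcase does Stage {
    method consumes() { Str }
    method produces() { Str }
    method transformer(Supply $in --> Supply) {
        supply { whenever $in -> $s { emit $s.uc } }
    }
}

class Length does Stage {
    method consumes() { Str }
    method produces() { Int }
    method transformer(Supply $in --> Supply) {
        supply { whenever $in -> $s { emit $s.chars } }
    }
}

# Check that each stage accepts what the previous one produces, then chain
# the transformers so that each supply feeds the next.
sub compose-stages(Supply $source, *@stages --> Supply) {
    my $pipeline = $source;
    my $previous;
    for @stages -> $stage {
        if $previous.defined && !($previous.produces ~~ $stage.consumes) {
            die "{$previous.^name} produces {$previous.produces.^name}, "
              ~ "but {$stage.^name} consumes {$stage.consumes.^name}";
        }
        $pipeline = $stage.transformer($pipeline);
        $previous = $stage;
    }
    $pipeline
}

my $out = compose-stages(Supply.from-list('hello', 'cro'), Upcase.new, Length.new);
my @seen;
react {
    whenever $out -> $n { @seen.push($n) }
}
say @seen;
```

Swapping the two stages makes `compose-stages` die before any value flows, because `Length` produces `Int` while `Upcase` consumes `Str`.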

## An HTTP server example

Most Cro HTTP services will be assembled using high-level modules such as
`Cro::HTTP::Router` and `Cro::HTTP::Server`. However, it is possible to use
`Cro.compose(...)` to piece together an HTTP processing pipeline without these
conveniences. First, various components and message types are needed:

    use Cro;
    use Cro::HTTP::Request;
    use Cro::HTTP::RequestParser;
    use Cro::HTTP::Response;
    use Cro::HTTP::ResponseSerializer;
    use Cro::TCP;

The HTTP application itself - a simple "Hello, world" - is a `Cro::Transform`
that turns a request into a response:

    class HTTPHello does Cro::Transform {
        method consumes() { Cro::HTTP::Request }
        method produces() { Cro::HTTP::Response }

        method transformer($request-stream) {
            supply {
                whenever $request-stream -> $request {
                    # Build a 200 response tied to the request being answered.
                    given Cro::HTTP::Response.new(:status(200), :$request) {
                        .append-header('Content-type', 'text/html');
                        .set-body("<strong>Hello from Cro!</strong>");
                        .emit;
                    }
                }
            }
        }
    }

These are composed into a service:

    my Cro::Service $http-service = Cro.compose(
        Cro::TCP::Listener.new( :host('localhost'), :port(8181) ),
        Cro::HTTP::RequestParser,
        HTTPHello,
        Cro::HTTP::ResponseSerializer
    );

Which can then be used like this:

    $http-service.start;
    signal(SIGINT).tap: {
        note "Shutting down...";
        $http-service.stop;
        exit;
    }
    sleep;

## Client pipelines

Clients, such as HTTP clients, are also expressed as pipelines. Unlike with
server pipelines, where the application is at the center of the pipeline and
the network at either end, a client pipeline has the network at the center and
the application at either end.

The component at the center of a client pipeline will be a `Cro::Connector`,
which establishes a connection. A `Cro::Connector` is able to establish a
connection and produce a `Cro::Transform` that will send messages it consumes
using the connection and emit messages received from the network connection.
Pipelines featuring a connector must have neither a `Cro::Source` nor a
`Cro::Sink`.

A minimal TCP client that connects, sends a message, and disconnects as soon
as it has received something, could be expressed as:

    my Cro::Connector $conn = Cro.compose(Cro::TCP::Connector);
    my Supply $responses = $conn.establish(
        host => 'localhost',
        port => 4242,
        supply {
            emit Cro::TCP::Message.new( :data('hello'.encode('ascii')) )
        }
    );
    react {
        whenever $responses {
            say .data;
            done;   # Disconnect as soon as something has been received
        }
    }

The `establish` method on a client establishes a connection. It takes a single
positional argument with a `Supply`, which will be tapped to receive messages
to send; all named arguments will be passed along to the `connect` method,
which makes a connection and returns a transform. The `establish` method
will return a `Supply` that the response messages will be emitted on.

More complex pipelines are possible. For example, a (not entirely convenient,
but functional) HTTP client would look like:

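    # Request serializer, then the connector, then a response parser
    my Cro::Connector $conn = Cro.compose(
        Cro::HTTP::RequestSerializer,
        Cro::TCP::Connector,
        Cro::HTTP::ResponseParser
    );

    my $req = supply {
        my Cro::HTTP::Request $req .= new(:method<GET>, :target</>);
        $req.add-header('Host', '');
        emit $req;
    }

    react {
        whenever $conn.establish($req, :host<>, :port(80)) -> $response {
            say ~$response; # Dump headers
        }
    }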