Unfolding a Stream of paginated items

Posted on Wed 24 January 2018 in Code • Tagged with Rust, Tokio, streams, HTTP

My most recent Rust crate is an API client for the Path of Exile’s public stash tabs. One problem that I had to solve while writing it was to turn a sequence of paginated items (in this case, player stash tabs) into a single, asynchronous Stream.

In this post, I’ll explain how to use the Stream interface, along with functions from the futures crate, to create a single Stream from multiple batches of entities.

Pagination 101

Dividing a long list of items into pages is a very common pattern in HTTP-based APIs.

If the client requests a sequence of entities that would be too large to serve as a single response, there has to be some way to split it over multiple HTTP roundtrips. To accomplish that, API servers will often return a constant number of items at first (like 50), followed by some form of continuation token:

$ curl http://api.example.com/items
{
    "items": [
        {...},
        {...},
        {...}
    ],
    "continuationToken": "e53c68db0ee412ac239173db147a02a0"
}

Such a token is preferably an opaque sequence of bytes, though sometimes it can be an explicit offset (index) into the list of results[1]. Regardless of its exact nature, clients need to pass the token with their next request in order to obtain another batch of results:

$ curl 'http://api.example.com/items?after=e53c68db0ee412ac239173db147a02a0'
{
    "items": [
        {...},
        {...}
    ],
    "continuationToken": "4e3986e4c7f591b8cb17cf14addd40a6"
}

Repeat this procedure for as long as the response contains a continuation token, and you will eventually go through the entire sequence. If it’s really, really long (e.g. it’s a Twitter firehose for a popular hashtag), then you may of course hit some problems due to the sheer number of requests. For many datasets, however, this pagination scheme is absolutely sufficient while remaining relatively simple for clients to implement.
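To make the procedure concrete, here is a minimal synchronous sketch of the loop. Everything in it is an assumption made for illustration: `fetch_page` is a hypothetical in-memory stand-in for the HTTP call, and the token is simply the next offset rendered as a string (a real API would more likely hand out something opaque).

```rust
/// One page of results, mirroring the JSON shape shown above.
struct Page {
    items: Vec<u32>,
    continuation_token: Option<String>,
}

/// Hypothetical stand-in for the HTTP request: serves `data` in pages of
/// `page_size`, using the next offset (as a string) as the continuation token.
fn fetch_page(data: &[u32], page_size: usize, after: Option<&str>) -> Page {
    assert!(page_size > 0, "page size must be positive");
    let start: usize = after.map_or(0, |t| t.parse().expect("invalid token"));
    let end = (start + page_size).min(data.len());
    Page {
        items: data[start..end].to_vec(),
        // No token in the response means this was the last page.
        continuation_token: if end < data.len() {
            Some(end.to_string())
        } else {
            None
        },
    }
}

/// Follow continuation tokens until the "server" stops returning one.
fn fetch_all(data: &[u32], page_size: usize) -> Vec<u32> {
    let mut all = Vec::new();
    let mut token: Option<String> = None;
    loop {
        let page = fetch_page(data, page_size, token.as_deref());
        all.extend(page.items);
        token = page.continuation_token;
        if token.is_none() {
            break;
        }
    }
    all
}
```

Note that the first request is made without any token at all, and the loop only ends once a response comes back without one.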

Stream it in Rust

What client code would typically do, however, is hide the pagination details completely and present only the final, unified sequence of items. Such an abstraction is useful even for end-user applications, but it’s definitely expected from any shared library that wraps a third-party API.

Depending on your programming language of choice, this abstraction layer may be very simple to implement. Here’s how it could be done in Python, whose concepts of iterables and generators are a perfect fit for this task[2]:

import requests

def iter_items(after=None):
    """Yield items from an example API.
    :param after: Optional continuation token
    """
    while True:
        url = "http://api.example.com/items"
        if after is not None:
            url += "?after=%s" % after
        response = requests.get(url)
        response.raise_for_status()
        for item in response.json()['items']:
            yield item
        after = response.json().get("continuationToken")
        if after is None:
            break

# consumer
for item in iter_items():
    print(item)

In Rust, you can find their analogues in the Iterator and Stream traits, so we’re off to a pretty good start. What’s missing, however, is the equivalent of yield: something to tell the consumer “Here, have the next item!”, and then go back to the exact same place in the producer function.

This ability to jump back and forth between two (or more) functions requires language support for coroutines. Not many mainstream languages meet this requirement, although Python and C# readily come to mind. In the case of Rust, there have been some nightly proposals and experiments, but nothing seems to be stabilizing anytime soon.

DIY streaming

But of course, if you do want a Stream of paginated items, there is at least one straightforward solution: just implement the Stream trait directly.

This is actually quite a viable approach, very similar to rolling out a custom Iterator. The minor differences stem mostly from more complicated state management in Stream::poll compared to Iterator::next. While an iterator is either exhausted or not, a stream can also be waiting for the next item to “arrive” (Ok(Async::NotReady)), or have errored out permanently (Err(e)). As a consequence, the return value of Stream::poll is slightly more complex than a plain Option, but nevertheless quite manageable.
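To illustrate the four possible outcomes of polling, here is a rough, std-only sketch. The `Async` and `Poll` types below are local stand-ins that mirror the shape of the ones in futures 0.1, and `Countdown` is a hypothetical toy type; it does not implement the real Stream trait, and a real stream would also have to arrange for a wake-up before returning NotReady.

```rust
/// Local stand-ins mirroring the shape of `Async` and `Poll` in futures 0.1.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}
type Poll<T, E> = Result<Async<T>, E>;

/// A toy "stream" that counts down to zero. Before each item it reports
/// NotReady once, pretending the data has not arrived yet.
struct Countdown {
    remaining: u32,
    waiting: bool,
    /// If set, polling fails once the countdown reaches this value.
    fail_at: Option<u32>,
}

impl Countdown {
    fn poll(&mut self) -> Poll<Option<u32>, String> {
        if self.remaining == 0 {
            return Ok(Async::Ready(None)); // exhausted, like Iterator's None
        }
        if Some(self.remaining) == self.fail_at {
            return Err(format!("failed at {}", self.remaining)); // errored out
        }
        if self.waiting {
            self.waiting = false;
            return Ok(Async::NotReady); // item not available yet
        }
        self.waiting = true;
        let item = self.remaining;
        self.remaining -= 1;
        Ok(Async::Ready(Some(item))) // next item
    }
}
```

Compared to Iterator::next, the extra bookkeeping is mostly the `waiting` flag and the error branch.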

Irrespective of difficulty, writing a custom Stream from scratch would inevitably involve a lot of boilerplate. You may find it necessary in more complicated applications, of course, but for something that’s basically a glorified while loop, it doesn’t seem like a big ask to have a more concise solution.

The stream unfolds

Fortunately there is one! Its crucial element is the standalone stream::unfold function from the futures crate:

pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut> where
    F: FnMut(T) -> Option<Fut>,
    Fut: IntoFuture<Item = (It, T)>,

Reading through the signature of this function can be a little intimidating at first. Part of it is Rust’s verbose syntax for anything that involves both trait bounds and closures[3], making stream::unfold seem more complicated than it actually is. Indeed, if you’ve ever used Iterator adapters like .filter_map or .fold, the unfold function will be pretty easy to understand. (And if you haven’t, don’t worry! It’s really quite simple :))

If you look closely, you’ll see that stream::unfold takes the following two arguments:

  • first one is essentially an arbitrary initial value, called a seed
  • second one is a closure that receives the seed and returns an optional pair of values

What are those values?… Well, the entire purpose of the unfold function is to create a Stream, and a stream should inevitably produce some items. Consequently, the first value in the returned pair will be the next item in the stream.

And what about the second value? That’s just the next state of the seed! It will be received by the very same closure when someone asks the Stream to produce its next item. By passing around a useful value — say, a continuation token — you can create something that’s effectively a while loop from the Python example above.

The last important bit about this pair of values is its wrapping.

First, it is actually a Future, allowing your stream to yield objects that it doesn’t quite have yet — for example, those which ultimately come from an HTTP response.

Secondly, its outermost layer is an Option. This enables you to terminate the stream when the underlying source is exhausted by simply returning None. Until then, however, you should return Some with the (future of) aforementioned pair of values.
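If the Future layer gets in the way of seeing the core idea, it may help to look at a synchronous analogue. The sketch below is my own std-only `unfold` for plain iterators, not the function from the futures crate: the closure returns the pair directly instead of a future of it, but the seed threading and the None-terminates rule are the same.

```rust
/// Iterator analogue of `stream::unfold`: `f` receives the current seed and
/// returns `Some((item, next_seed))`, or `None` to end the sequence.
fn unfold<T, It, F>(init: T, mut f: F) -> impl Iterator<Item = It>
where
    F: FnMut(T) -> Option<(It, T)>,
{
    // The seed lives in an Option so it can be moved into `f` on each call.
    let mut seed = Some(init);
    std::iter::from_fn(move || {
        // Once `f` has returned None the seed is gone, so we stay finished.
        let (item, next) = f(seed.take()?)?;
        seed = Some(next);
        Some(item)
    })
}
```

For example, `unfold(1u32, |n| if n <= 16 { Some((n, n * 2)) } else { None })` produces the powers of two up to 16, with the seed doubling on every step.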

Paginate! Paginate!

If you have doubts about how all those pieces of stream::unfold fit in, then looking at the usage example in the docs may give you some idea of what it enables you to do. It’s a very artificial example, though: the resulting Stream isn’t waiting for any asynchronous Futures, which is the very reason you’d use a Stream over an Iterator in the first place[4].

We can find a more natural application for unfold if we go back to our original problem. To reiterate, we want to repeatedly query an HTTP API for a long list of items, giving our callers a Stream of such items they can process at their leisure. At the same time, all details about pagination and handling of continuation tokens or offsets should be completely hidden from the user.

To use stream::unfold for this task, we need the initial seed and an appropriate closure. I have hinted already at using the continuation token as our seed, or the state that we pass around from one closure invocation to another. What remains is mostly making the actual HTTP request and interpreting the JSON response, for which we’ll use the de facto standard Rust crates: hyper, Serde, and serde_json:

use std::error::Error;

use futures::{future, Future, stream, Stream};
use hyper::{Client, Method};
use hyper::client::Request;
use serde_json;
use tokio_core::reactor::Handle;

const URL: &str = "http://api.example.com/items";

fn items(
    handle: &Handle, after: Option<String>
) -> Box<Stream<Item=Item, Error=Box<Error>>>
{
    let client = Client::new(handle);
    Box::new(stream::unfold(after, move |cont_token| {
        let url = match cont_token {
            Some(ct) => format!("{}?after={}", URL, ct),
            None => return None,
        };
        let req = Request::new(Method::Get, url.parse().unwrap());
        Some(client.request(req).from_err().and_then(move |resp| {
            let status = resp.status();
            resp.body().concat2().from_err().and_then(move |body| {
                if status.is_success() {
                    serde_json::from_slice::<ItemsResponse>(&body)
                        .map_err(Box::<Error>::from)
                } else {
                    Err(format!("HTTP status: {}", status).into())
                }
            })
            .map(move |items_resp| {
                (stream::iter_ok(items_resp.items), items_resp.continuation_token)
            })
        }))
    })
    .flatten())
}

#[derive(Deserialize)]
struct ItemsResponse {
    items: Vec<Item>,
    #[serde(rename = "continuationToken")]
    continuation_token: Option<String>,
}

While this code may be a little challenging to decipher at first, it’s not out of line with what working with Futures and Streams looks like in general. In either case, you can expect a lot of .and_then callbacks :)

There is one detail here that I haven’t mentioned previously, though. It relates to the stream::iter_ok and Stream::flatten calls which you may have already noticed.

The issue with stream::unfold is that it only allows yielding a single item per closure invocation. For us, this is too limiting: a single batch response from the API will contain many such items, but we have no way of “splitting” them.

What we can do instead is to produce a Stream of entire batches of items, at least at first, and then flatten it. What Stream::flatten does here is to turn a nested Stream<Stream<Item>> into a flat Stream<Item>. The latter is what we eventually want to return, so all we need now is to create this nested stream of streams.

How? Well, that’s actually pretty easy.

We can already deserialize a Vec<Item> from the JSON response (that’s our item batch!), which is essentially an iterable of Items[5]. Another utility function from the stream module, namely stream::iter_ok, can readily turn such an iterable into an “immediate” Stream. Such a Stream won’t be asynchronous at all, since its items will have been ready from the very beginning, but it will still conform to the Stream interface, enabling it to be flattened as we request.
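The same batch-then-flatten shape can be shown with ordinary iterators. This is only a synchronous analogue using std (the function name is made up for the example): `into_iter` plays the role of stream::iter_ok, and Iterator::flatten plays the role of Stream::flatten.

```rust
/// Yield whole batches first, then flatten them into individual items.
fn items_from_batches(batches: Vec<Vec<u32>>) -> impl Iterator<Item = u32> {
    batches
        .into_iter() // a sequence of batches
        .map(|batch| batch.into_iter()) // each batch becomes an "immediate" iterator
        .flatten() // nested sequences -> one flat sequence of items
}
```

Empty batches simply contribute nothing to the flattened sequence, which is also what you would want from an API page that happens to contain no items.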

But wait! There is a bug!

So in the end, is this the solution we’re looking for?…

Well, almost. First, here’s the expected usage of the function we just wrote:

let mut core = tokio_core::reactor::Core::new().unwrap();
core.run({
    let continuation_token = None;  // start from the beginning
    items(&core.handle(), continuation_token).for_each(|item| {
        println!("{:?}", item);
        Ok(())
    })
}).unwrap();

While this is more complicated than the plain for loop in Python, most of it is just Tokio boilerplate. The notable part is the invocation of items(), where we pass None as a continuation token to indicate that we want the entire sequence, right from its beginning.

And since we’re talking about fetching long sequences, we would indeed expect a lot of items. So it is probably quite surprising to hear that the stream we’ve created here will be completely empty.

…What? How?!

If you look again at the source code of items(), the direct reason should be pretty easy to find. The culprit lies in the return None branch of the first match. If we don’t pass Some(continuation_token) as a parameter to items(), this branch will be hit immediately, terminating the stream before it had a chance to produce anything.

It may not be very clear how to fix this problem. After all, the purpose of the match was to detect the end of the sequence, but it apparently prevents us from starting it in the first place!

Looking at the problem from another angle, we can see we’ve conflated two distinct states of our stream — “before it has started” and “after it’s ended” — into a single one (“no continuation token”). Since we obviously don’t want to make the after parameter mandatory — users should be able to say “Give me everything!” — we need another way of telling those two states apart.

In terms of Rust types, it seems that Option<String> is no longer sufficient for encoding all possible states of our Stream. Although we could try to fix that in some ad-hoc way (e.g. by adding another bool flag), it feels cleaner to just define a new, dedicated type. For one, this allows us to designate a name for each of the states in question, improving the readability and maintainability of our code:

enum State {
    Start(Option<String>),
    Next(String),
    End,
}

Note that we can put this definition directly inside the items() function, without cluttering the module namespace. All the relevant details of our Stream are thus nicely contained within a single function:

fn items(
    handle: &Handle, after: Option<String>
) -> Box<Stream<Item=Item, Error=Box<Error>>>
{
    // (definition of State enum can go here)

    let client = Client::new(handle);
    Box::new(stream::unfold(State::Start(after), move |state| {
        let cont_token = match state {
            State::Start(opt_ct) => opt_ct,
            State::Next(ct) => Some(ct),
            State::End => return None,
        };
        let url = match cont_token {
            Some(ct) => format!("{}?after={}", URL, ct),
            None => URL.into(),
        };
        let req = Request::new(Method::Get, url.parse().unwrap());
        Some(client.request(req).from_err().and_then(move |resp| {
            let status = resp.status();
            resp.body().concat2().from_err().and_then(move |body| {
                if status.is_success() {
                    serde_json::from_slice::<ItemsResponse>(&body)
                        .map_err(Box::<Error>::from)
                } else {
                    Err(format!("HTTP status: {}", status).into())
                }
            })
            .map(move |items_resp| {
                let next_state = match items_resp.continuation_token {
                    Some(ct) => State::Next(ct),
                    None => State::End,
                };
                (stream::iter_ok(items_resp.items), next_state)
            })
        }))
    })
    .flatten())
}

Sure, there is a little more bookkeeping required now, but at least all the items are being emitted by the Stream as intended.
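Since the real thing needs an HTTP server to talk to, here is a synchronous, std-only sketch of the same state machine that can be checked in isolation. The `fetch` function is a hypothetical in-memory "API" that serves two items per page and uses offsets as tokens; the three states are handled the same way as in the Stream version.

```rust
/// The same three states as above, with an offset standing in for the token.
enum State {
    Start(Option<usize>),
    Next(usize),
    End,
}

/// Hypothetical in-memory "API": returns up to two items starting at `after`,
/// plus the offset of the next page if there is one.
fn fetch(data: &[u32], after: Option<usize>) -> (Vec<u32>, Option<usize>) {
    let start = after.unwrap_or(0).min(data.len());
    let end = (start + 2).min(data.len());
    let next = if end < data.len() { Some(end) } else { None };
    (data[start..end].to_vec(), next)
}

fn items(data: Vec<u32>, after: Option<usize>) -> impl Iterator<Item = u32> {
    let mut state = State::Start(after);
    std::iter::from_fn(move || {
        // Take the current state out, leaving End behind as a safe default.
        let token = match std::mem::replace(&mut state, State::End) {
            State::Start(opt) => opt, // first request: token is optional
            State::Next(t) => Some(t), // later requests: token is required
            State::End => return None, // the previous response had no token
        };
        let (batch, next) = fetch(&data, token);
        state = match next {
            Some(t) => State::Next(t),
            None => State::End,
        };
        Some(batch)
    })
    .flatten()
}
```

With this version, passing None as the starting token yields every item, which is exactly the case that came out empty before.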


You can see the complete source in the playground here.


  1. Furthermore, the token doesn’t have to come as part of the HTTP response body. Some API providers (such as GitHub) may use the Link: header to point directly to the next URL to query. 

  2. This example uses “traditional”, synchronous Python code. However, it should be easy to convert it to the asynchronous equivalent that works in Python 3.5 and above, provided you can replace requests with some async HTTP library. 

  3. If you are curious whether other languages could express it better, you can check the Data.Conduit.List.unfold function from Haskell’s conduit package. For most intents and purposes, it is their equivalent of stream::unfold.

  4. Coincidentally, you can create iterators in the very same manner through the itertools::unfold function from the itertools crate.

  5. In more technical Rust terms, it means Vec implements the IntoIterator trait, allowing anyone to get an Iterator from it. 


Terminating a Stream in Rust

Posted on Sat 16 December 2017 in Code • Tagged with Rust, streams, Tokio, async

Here’s a little trick that may be useful in dealing with asynchronous Streams in Rust.

When you consume a Stream using the for_each method, its default behavior is to finish early should an error be produced by the stream:

use futures::prelude::*;
use futures::stream;
use tokio_core::reactor::Core;

let s = stream::iter_result(vec![Ok(1), Ok(2), Err(false), Ok(3)]);
let fut = s.for_each(|n| {
    println!("{}", n);
    Ok(())
});

In more precise terms, it means that the Future returned by for_each will resolve with the first error from the underlying stream:

// Prints 1, 2, and then panics with "false".
Core::new().unwrap().run(fut).unwrap();

For most purposes, this is perfectly alright; errors are generally meant to propagate, after all.

Certain kinds of errors, however, are better off silenced. Perhaps they are expected to pop up during normal program operation, or maybe their occurrence should merely affect program execution in a particular way, and not halt it outright. In a simple case like above, you can of course check what for_each itself has returned, but that doesn’t scale to building larger Stream pipelines.

I encountered a situation like this myself when using the hubcaps library. The code I was writing was meant to search for GitHub issues within a specific repository. In GitHub API, this is accomplished by sending a search query like repo:$OWNER/$NAME, which may result in a rather obscure HTTP error (422 Unprocessable Entity) if the given repository doesn’t actually exist. But I didn’t care about this error; should it occur, I’d simply return an empty stream, because doing so was more convenient for the larger bit of logic that was consuming it.

Unfortunately, the Stream trait offers no interface that’d target this use case. There are only a few methods that even allow looking at errors mid-stream, and even fewer that can end it prematurely. On the flip side, at least we don’t have to consider too many combinations when looking for the solution ;)

Indeed, it seems there are only two Stream methods that are worthy of our attention:

  • Stream::then, because it allows for a closure to receive all stream values (items and errors)
  • Stream::take_while, because it accepts a closure that can end the stream early (but only based on items, not errors)

Combining them both, we arrive at the following recipe:

  • Inside a .then closure, look for Errors that you consider non-fatal and replace them with a special item value. The natural choice for such a value is None. As a side effect, this forces us to convert the regular (“successful”) items into Some(item), effectively transforming a Stream<Item=T> into Stream<Item=Option<T>>.

  • Look for the special value (i.e. None) in the .take_while closure and terminate the stream when it’s been found.

  • Finally, convert the wrapped items back into their original form using .map, thus giving us back a Stream of Ts.

Applying this technique to our initial example, we get something that looks like this:

use futures::future;  // needed for future::ok below

let s = stream::iter_result(vec![Ok(1), Ok(2), Err(false), Ok(3)])
    .then(|r| match r {
        Ok(r) => Ok(Some(r)),    // no-op passthrough of items
        Err(false) => Ok(None),  // non-fatal error, terminate the stream
        Err(e) => Err(e),        // no-op passthrough of other errors
    })
    .take_while(|x| future::ok(x.is_some()))
    .map(Option::unwrap);

If we now try to consume this stream like before:

Core::new().unwrap().run(
    s.for_each(|n| { println!("{}", n); Ok(()) })
).unwrap();

it will still end after the first two items, but without producing any errors afterwards.


For a more reusable version of the trick, you can check this gist; it adds a Stream::take_while_err method through an extension trait.

This isn’t a perfect solution, however, because it requires Boxing even on nightly Rust[1]. We can fix that by introducing a dedicated TakeWhileErr stream type, similarly to what native Stream methods do. I leave that as an exercise for the reader ;-)
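As a hint for that exercise, here is roughly what a dedicated adapter type looks like in the simpler world of iterators over Results. To be clear, this is not the gist's implementation, nor a Stream: `EndOnErr` and `end_on_err` are hypothetical names, and the semantics (end quietly at the first error the predicate flags as non-fatal, pass every other error through) are my assumption of what such an adapter should do.

```rust
/// Iterator adapter that ends the sequence quietly at the first error
/// matching `is_non_fatal`; other errors are passed through unchanged.
struct EndOnErr<I, P> {
    inner: I,
    is_non_fatal: P,
    done: bool,
}

impl<I, T, E, P> Iterator for EndOnErr<I, P>
where
    I: Iterator<Item = Result<T, E>>,
    P: FnMut(&E) -> bool,
{
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        match self.inner.next()? {
            Err(ref e) if (self.is_non_fatal)(e) => {
                // Swallow the error and stop for good.
                self.done = true;
                None
            }
            other => Some(other),
        }
    }
}

/// Convenience constructor, so no extension trait is needed for the sketch.
fn end_on_err<I, T, E, P>(inner: I, is_non_fatal: P) -> EndOnErr<I, P>
where
    I: Iterator<Item = Result<T, E>>,
    P: FnMut(&E) -> bool,
{
    EndOnErr { inner, is_non_fatal, done: false }
}
```

Because the adapter is a named, concrete type, a function can return it directly without any Boxing; a Stream version would follow the same outline, with poll in place of next.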


  1. This is due to a limitation in the impl Trait feature which prevents it from being used as a return type of trait methods. 


Asynchronous Rust for fun & profit

Posted on Fri 28 April 2017 in Programming • Tagged with Rust, async, Tokio, futures, HTTP

…or: Is Rust webscale?

In this day and age, no language can really make an impact anymore unless it enables its programmers to harness the power of the Internet. Rust is no different here. Despite posing as a true systems language (as opposed to those only marketed as such), it includes highly scalable servers as a prominent objective in its 2017 agenda.

Presumably to satisfy this very objective, the Rust ecosystem has recently seen some major developments in the space of asynchronous I/O. Given the pace of those improvements, it may seem that production quality async services are quite possible already.

But is that so? How exactly do you write async Rust servers in early to mid 2017?

To find out, I set out to code up a toy application. Said program was a small intermediary/API server (a “microservice”, if you will) that tries to hit many of the typical requirements that arise in such projects. The main objective was to test the limits of asynchronous Rust, and see how easily (or with how much difficulty) they can be pushed.

This post is a summary of all the lessons I’ve learned from that.

It is necessarily quite long, so if you’re looking for a TL;DR, scroll straight down to Conclusions.

Asynchro-what?

Before we dive in, I have to clarify what “asynchronous” means in this context. Those familiar with async concepts can freely skip this section.

Pulling some threads

Asynchronous processing (or async for short) is brought up most often in the context of I/O operations: disk reads, network calls, database queries, and so on.

Relatively speaking, all those tasks tend to be slow: they take orders of magnitude longer than just executing code or even accessing RAM. The “traditional” (synchronous) approach to dealing with them is to relegate those tasks to separate threads.

When one thread has to wait for a lengthy I/O operation to complete, the operating system (its scheduler, to be precise) can suspend that thread. This lets others execute their code in the meantime and not waste CPU cycles.

This is the essence of concurrency[1].
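As a small illustration of the threaded approach, the sketch below runs each "request" on its own thread. It uses only the standard library, and the `slow_read` function is a made-up stand-in in which a sleep plays the part of a slow disk or network read.

```rust
use std::thread;
use std::time::Duration;

/// Simulated blocking I/O call: the sleep stands in for a slow read.
fn slow_read(id: u32) -> String {
    thread::sleep(Duration::from_millis(50));
    format!("response {}", id)
}

/// Run each "request" on its own thread and collect results in request order.
fn handle_concurrently(ids: &[u32]) -> Vec<String> {
    // Spawn everything first, so that all the waits overlap...
    let handles: Vec<_> = ids
        .iter()
        .map(|&id| thread::spawn(move || slow_read(id)))
        .collect();
    // ...and only then block on the individual results.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}
```

While any one thread sleeps, the OS scheduler is free to run the others, so the waits overlap rather than add up.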

Schedule yourself

But threads are not the only option when dealing with many things (i.e. requests) at once.

The alternative approach is one where no threads are automatically suspended or resumed by the OS. Instead, a special version of I/O subroutines allows the program to continue execution immediately after issuing an I/O call. While the operation happens in the background[2], the code is given an opaque handle (usually called a promise, a future, or an async result) that will eventually resolve to the actual return value.

The program can wait for the handle synchronously, but it would typically hand it over to an event loop, an abstraction provided by a dedicated async framework. Such a framework (among which node.js is probably the best known example) maintains a list of all I/O “descriptors” (fds in Unix) that are associated with pending I/O operations.

Then, in the loop, it simply waits on all of them, usually via the epoll system call. Whenever an I/O task completes, the loop would execute a callback associated with its result (or promise, or future). Through this callback, the application is able to process it.

In a sense, we can treat the event loop as a dedicated scheduler for its program.
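To show the bare mechanics, here is a toy event loop using only the standard library. It is a deliberately simplified model and every name in it is invented for the example: completions are pushed into an in-memory queue by hand, whereas a real loop would block on something like epoll to learn about them.

```rust
use std::collections::{HashMap, VecDeque};

/// A toy event loop: callbacks are registered per operation id and run when
/// the matching completion is taken off the queue.
struct EventLoop {
    callbacks: HashMap<u32, Box<dyn FnMut(&str)>>,
    completed: VecDeque<(u32, String)>,
}

impl EventLoop {
    fn new() -> Self {
        EventLoop { callbacks: HashMap::new(), completed: VecDeque::new() }
    }

    /// Associate a callback with a pending operation.
    fn register(&mut self, id: u32, callback: impl FnMut(&str) + 'static) {
        self.callbacks.insert(id, Box::new(callback));
    }

    /// Simulate the OS reporting that operation `id` finished with `result`.
    fn complete(&mut self, id: u32, result: &str) {
        self.completed.push_back((id, result.to_string()));
    }

    /// Dispatch completions, in arrival order, to their callbacks.
    fn run(&mut self) {
        while let Some((id, result)) = self.completed.pop_front() {
            // Each callback fires at most once; unknown ids are ignored.
            if let Some(mut callback) = self.callbacks.remove(&id) {
                callback(&result);
            }
        }
    }
}
```

Callbacks run in the order the operations complete, not the order they were registered, and all of it happens on a single thread.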

But why?

So, what exactly is the benefit of asynchronous I/O? If anything, it definitely sounds more complicated for the programmer. (Spoiler alert: it is.)

The impetus for the development of async techniques most likely came from the C10K problem. The short version of it is that computers are nowadays very fast and should therefore be able to serve thousands of requests simultaneously, especially when those requests are mostly I/O, which translates to waiting time for the CPU.

And if “serving” queries is indeed almost all waiting, then handling thousands of clients should be very possible.

In some cases, however, it was found that when the OS is scheduling the threads, it introduces too much overhead on the frequent pause/resume state changes (context switching). Like I mentioned above, the asynchronous alternative does away with all that, and indeed lets the CPU just wait (on epoll) until something interesting happens. Once it does, the application can deal with it quickly, issue another I/O call, and let the server go back to waiting.

With today’s processing power we can theoretically handle a lot of concurrent clients this way: up to hundreds of thousands or even millions.

Reality check

Well, ain’t that grand? No wonder everyone is writing everything in node.js now!

Jokes aside, the actual benefits of asynchronous I/O (especially when weighed against its inconvenience for developers) are a bit harder to quantify. For one, they rely heavily on the assumption of fast code & slow I/O being valid in all situations.

But this isn’t really self-evident, and becomes increasingly dubious as time goes on and code complexity grows. It should be obvious, for example, that a Python web frontend talking mostly to in-memory caches in the same datacenter will have radically different performance characteristics than a C++ proxy server calling HTTP APIs over public Internet. Those nuances are often lost in translation between simplistic benchmarks and exaggerated blog posts[3].

Upon a closer look, however, these details point quite clearly in favor of asynchronous Rust. Being a language that compiles to native code, it should usually run faster than interpreted (Python, Ruby) or even JITed (JVM & .NET) languages, very close to what is typically referred to as “bare metal” speed. For async I/O, it means the event loop won’t be disturbed for a (relatively) long time to do some trivial processing, leading to higher potential throughput of requests.

All in all, it would seem that Rust is one of the few languages where async actually makes sense.

Rust: the story so far

Obviously, this means it’s been built into the language right from the start… right?

Well, not really. It was always possible to use native epoll through FFI, of course, but that’s not exactly the level of abstraction we’d like to work with. Still, the upper layers of the async I/O stack have been steadily growing at least since Rust 1.0.

The major milestones here include mio, a comparatively basic building block that provides an asynchronous version of TCP/IP. It also offers idiomatic wrappers over epoll, allowing us to write our own event loop.

On the application side, the futures crate abstracts the notion of a potentially incomplete operation into, well, a future. Manipulating those futures is how one can now write asynchronous code in Rust.

More recently, Tokio has been emerging as the de facto framework for async I/O in Rust. It essentially combines the two previously mentioned crates, and provides additional abstractions specifically for network clients and servers.

And finally, the popular HTTP framework Hyper now also supports asynchronous request handling via Tokio. What this means is that the bread and butter of the Internet’s application layer (API servers talking JSON over HTTP) should now be fully supported by the ecosystem of asynchronous Rust.

Let’s take it for a spin then, shall we?

The Grand Project

Earlier on, we have established that the main use case for asynchronous I/O is intermediate microservices. They often sit somewhere between a standard web frontend and a storage server or a database. Because of their typical role within a bigger system, these kinds of projects don’t tend to be particularly exciting on their own.

But perhaps we can liven them up a little.

In the end, it is the Internet that we’re talking about here, and everything on the Internet can usually be improved by one simple addition.



…Okay, two possible additions — the other one being:

Memes!

If you’re really pedantic, you may call them image macros. But regardless of the name, the important part is putting text on pictures, preferably in a funny way.

The microservice I wrote is doing just that. Though it won’t ensure your memes are sufficiently hilarious, it will try to deliver them exactly to your specifications. You may thus think of it as a possible backend for an image site like this one.

Flimsy excuses & post-hoc justifications

It is, of course, a complete coincidence, lacking any premeditation on my part, that when it comes to evaluating an async platform, a service like this fits the bill very well.

And especially when said platform is async Rust.

Why, though, is it such a happy, er, accident?

  • It’s a simple, well-defined application. There is basically a single endpoint, accepting simple input (JSON or query string) and producing a straightforward result (an image). Not needing to persist any state made creating an MVP significantly easier.

  • Caching can be used for meme templates and fonts. Besides being an inherent part of most network services, a cache also represents a point of contention for Rust programs. The language is widely known for its allergy to global mutable state, which is exactly what programmatic caches boil down to.

  • Image captioning is a CPU-intensive operation. While the “async” part of async I/O may sometimes go all the way down, many practical services either evolve some important CPU-bound code, or require it right from the start. For this reason, I wanted to check if & how async Rust can mix with threaded concurrency.

  • Configuration knobs can be added. Unlike trivial experiments in the vein of an echo or “Hello world” server, this kind of service warrants some flags that the user could tweak, like the number of image captioning threads, or the size of the template cache. We can see how easy (or how hard) it is to make them applicable across all future-based requests.
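To give a flavor of the caching point from the list above, here is one minimal way shared mutable state tends to look in Rust. This is a std-only sketch rather than the code from the actual project; `TemplateCache` and `get_or_load` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// A minimal thread-safe cache. Clones share the same underlying map, and the
/// Mutex makes every mutation of that shared state explicit.
#[derive(Clone, Default)]
struct TemplateCache {
    inner: Arc<Mutex<HashMap<String, Arc<Vec<u8>>>>>,
}

impl TemplateCache {
    /// Return the cached bytes for `name`, calling `load` only on a miss.
    /// Note that the lock is held while loading, which keeps the sketch
    /// simple but would serialize slow loads in a real service.
    fn get_or_load(&self, name: &str, load: impl FnOnce() -> Vec<u8>) -> Arc<Vec<u8>> {
        let mut map = self.inner.lock().expect("cache lock poisoned");
        map.entry(name.to_string())
            .or_insert_with(|| Arc::new(load()))
            .clone()
    }
}
```

Handing out `Arc<Vec<u8>>` means that a cache hit only bumps a reference count instead of copying the template bytes.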

All in all, and despite its frivolous subject matter, a meme server is actually hitting quite a few notable spots in the microservice domain.

Learnings

As you may glean from its GitHub repo, it would seem that the experiment was successful. Sure, you could implement some features in the captioning department (supporting animated GIFs comes to mind), but none of these are pertinent to the async mechanics of the server.

And since it’s the async (I/O) in Rust that we’re interested in, let me now present you with an assorted collection of practical experiences with it.

>0-cost futures

If you read the docs’ preamble to the futures crate, you will see it mentioning the “zero-cost” aspect of the library. Consistent with the philosophy behind Rust, it proclaims to deliver its abstractions without any overhead.

Thing is, I’m not sure how this promise can be delivered on in practice.

Flip through the introductory tutorial to Tokio, for example, and you will already find plenty of compromises. Without the crucial (but nightly-only) impl Trait feature, you are basically required to put all your futures in a Box[4]. They even encourage it themselves, offering a convenient Future::boxed method exactly for this purpose, as well as the matching BoxFuture typedef right in the crate.
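The trade-off is easier to see with iterators, which face the same problem of unnameable, closure-laden types. The sketch below is an iterator analogue and not futures code. It is written in today's syntax: `dyn` is the newer spelling of trait objects, and impl Trait in return position has since been stabilized (in Rust 1.26), so both functions compile on stable.

```rust
/// Boxing erases the adapter type behind a trait object, at the cost of a
/// heap allocation and dynamic dispatch on every `next` call.
fn evens_boxed(limit: u32) -> Box<dyn Iterator<Item = u32>> {
    Box::new((0..limit).filter(|n| n % 2 == 0))
}

/// With `impl Trait`, the concrete type stays hidden from the caller but is
/// still statically known to the compiler, so no allocation is needed.
fn evens_unboxed(limit: u32) -> impl Iterator<Item = u32> {
    (0..limit).filter(|n| n % 2 == 0)
}
```

Both functions behave identically from the caller's perspective; the difference lies only in what the compiler knows about the returned value.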

But hey, you can always just use nightly Rust, right? impl Trait will stabilize eventually, so your code should be, ahem, future-proof either way.

Unfortunately, this assumes that none of the futures you’re building your request handlers from will ever cross a thread boundary. (BoxFuture, for example, automatically constrains them to be Send.) As you’ve likely guessed, this doesn’t jibe very well with computationally intensive tasks, which are best relegated to a separate thread.

To deal with them properly, you’re going to need a thread pool-based executor, which is currently implemented in the futures_cpupool crate. Using it requires a lot of care, though, and a deep understanding of both types of concurrency involved.
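The underlying idea can be sketched without any futures at all. The following is a plain std illustration and not the futures_cpupool API: it spawns a single dedicated thread rather than using a pool, and the returned channel receiver plays the role of a (very poor man's) future. Both function names are invented for the example.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for CPU-heavy work such as image captioning.
fn expensive(n: u64) -> u64 {
    (1..=n).product()
}

/// Offload the computation to a worker thread and hand back a receiver that
/// will eventually yield the result.
fn spawn_cpu_task(n: u64) -> mpsc::Receiver<u64> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore send errors: the caller may have dropped the receiver.
        let _ = tx.send(expensive(n));
    });
    rx
}
```

Everything the closure captures has to be Send in order to be moved into the worker thread, which is the same constraint that surfaces when futures are run on a thread pool.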

Evidently, this was something that I lacked at the time, which is why I encountered problems ensuring that my futures were properly Send. In the end, I settled on making them Send in the most straightforward (and completely unnecessary) manner: by wrapping them in Arc/Mutex. That in itself wasn’t without its perils, but it at least allowed me to move forward.
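For illustration, the general shape of that workaround looks roughly like the sketch below. It is not the code from the meme server: TemplateCache and render_on_worker are hypothetical names, a plain std::thread stands in for the futures_cpupool executor, and the shared value is ordinary state rather than a future. The point is only that Arc<Mutex<T>> is Send whenever T is, so the handle can be moved onto another thread.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Compile-time check that a value may cross a thread boundary.
fn assert_send<T: Send>(_: &T) {}

// Hypothetical piece of state shared by request handlers.
struct TemplateCache {
    hits: u32,
}

// Hands a clone of the shared handle to a worker thread (standing in
// for a thread pool) and returns the hit count the worker observed.
fn render_on_worker(cache: Arc<Mutex<TemplateCache>>) -> u32 {
    let worker_cache = Arc::clone(&cache);
    // An Rc<RefCell<TemplateCache>> would be rejected here by the
    // compiler, because Rc is not Send.
    assert_send(&worker_cache);

    let handle = thread::spawn(move || {
        // The lock serializes access; this is the overhead you pay
        // for the "straightforward" route.
        let mut guard = worker_cache.lock().unwrap();
        guard.hits += 1;
        guard.hits
    });
    handle.join().unwrap()
}

fn main() {
    let cache = Arc::new(Mutex::new(TemplateCache { hits: 0 }));
    let hits = render_on_worker(Arc::clone(&cache));
    println!("hits seen by worker: {}", hits);
}
```

The lock is what makes this approach wasteful when the data is never actually accessed concurrently, which is why I call it unnecessary above.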

Ironically, this also shows an important, pragmatic property of the futures’ system: sub-par hacks around it are possible — a fact you’ll be glad to know about on the proverbial day before a deadline.

Templates-worthy error messages

Other significant properties of the futures’ abstraction shall include telling the programmer what’s wrong with his code in the simplest, most straightforward, and concise manner possible.

Here, let me show you an example:


…which you can also behold in its gist form.

The reason you will encounter such incomprehensible messages stems from the very building blocks of async code.

Right now, each chained operation on a future — map, and_then, or_else, and so on — produces a nested type. Every subsequent application of those methods “contains” (in terms of the type system) all the previous ones. Keep going, and it will eventually balloon into one big onion of Chain<Map<OrElse<Chain<Map<...etc...>>>>>.


Futures are like ogres.
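The same kind of stacking happens with the standard library’s iterator adapters, which makes them a handy, std-only stand-in for future combinators. The sketch below is an analogy rather than actual futures code, and type_name_of and nested_type_names are names made up for this example. It records the type name after each chained call, and then once more after boxing the result into a trait object. The exact strings returned by std::any::type_name are not guaranteed by Rust, so treat them as diagnostic output only.

```rust
use std::any::type_name;

// Returns the name of a value's static type.
fn type_name_of<T>(_: &T) -> &'static str {
    type_name::<T>()
}

// Chains a few adapters and records the type after each step.
fn nested_type_names() -> Vec<&'static str> {
    let mut names = Vec::new();

    let one = (1..10).map(|x| x * 2);
    names.push(type_name_of(&one));

    // Each adapter wraps the previous type in a new layer.
    let two = one.filter(|x| x % 3 == 0);
    names.push(type_name_of(&two));

    let three = two.chain(0..1);
    names.push(type_name_of(&three));

    // Boxing into a trait object erases all the accumulated layers.
    let boxed: Box<dyn Iterator<Item = i32>> = Box::new(three);
    names.push(type_name_of(&boxed));

    names
}

fn main() {
    for name in nested_type_names() {
        println!("{} chars: {}", name.len(), name);
    }
}
```

With only three adapters the name is already unwieldy; a request handler built from a dozen combinators fares correspondingly worse.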

I haven’t personally hit any compiler limits in this regard, but I’m sure it is plausible for a complicated, real-world program.

It also gets worse if you use nightly Rust with impl Trait. In this case, function boundaries no longer “break” type stacking via Boxing the results into trait objects. Indeed, you can very well end up with some truly gigantic constructs as the compiler tries to represent the return types of your most complex handlers.

But even if rustc is up to snuff and can deal with those fractals just fine, it doesn’t necessarily mean the programmer can. Looking at those error messages, I had vivid flashbacks to hacking on C++ templates with ancient compilers like VS2005. The difference is, of course, that we’re not trying any arcane metaprogramming here; we just want to do some relatively mundane I/O.

I have no doubt the messaging will eventually improve, and the mile-long types will at least get pretty-printed. At the moment, however, prepare for some serious squinting and bracket-counting.

Where is my (language) support?

Sadly, those long, cryptic error messages are not the only way in which the Rust compiler disappoints us.

I keep mentioning impl Trait as a generally desirable language feature for writers of asynchronous code. This improvement is still a relatively long way from getting precisely defined, much less stabilized. And it is only a somewhat minor improvement in the async ergonomics.

The wishlist is vastly longer and even more inchoate.

Saying it bluntly, right now Rust doesn’t really support the async style at all. All the combined API surface of futures/Tokio/Hyper/etc. is a clever, but ultimately contrived design, and it has no intentional backing in the Rust language itself.

This stands in stark contrast to numerous other languages. They often support asynchronous I/O as something of a first-class feature. The list includes at least C#, Python 3.5+, Hack/PHP, ES8 / JavaScript, and basically all the functional languages. They all have dedicated async, await, or equivalent constructs that make the callback-based nature of asynchronous code essentially transparent.

The absence of similar support puts Rust in the same bucket as frontend JavaScript circa 2010, where .then-chaining of promises reigned supreme. This is of course better than the callback hell of early Node, but I wouldn’t think that’s a particularly high bar. In this regard, Rust leaves plenty to be desired.

There are proposals, obviously, to bring async coroutines into Rust. There is an even broader wish to make the language cross the OOP/FP fence already and commit to the functional way; this would mean adding an equivalent of Haskell’s do notation.

Either development could be sufficient. Both, however, require a significant amount of design and implementation work. If solved now, this would easily be the most significant addition to the language since its 1.0 release, but the solution is currently in the RFC stages at best.

Future<Ecosystem>

While the core language support is lacking, the Rust community, great as usual, has been picking up some of the slack by establishing and cultivating a steadily growing ecosystem.

The constellation of async-related crates clusters mostly around the two core libraries: the futures crate itself and Tokio. Any functionality you may need while writing asynchronous code should be quite easy to find by searching for one of those two keywords (plus Rust, of course). Another way of finding what you need is to look at the list of Tokio-related crates directly.

To be fair, I can’t really say much about the completeness of this ecosystem. The project didn’t really require too many external dependencies — the only relevant ones were:

  • futures_cpupool mentioned before
  • tokio-timer for imposing a timeout on caption requests
  • tokio-signal which handles SIGINT/Ctrl+C and allows for a graceful shutdown

Normally, you’d also want to research the async database drivers for your storage system of choice. I would not expect anything resembling the Diesel ORM crate, though, nor a web framework comparable to Iron, Pencil, or Rocket.

Conclusions

Alright, so what can we get from this overall analysis?

Given the rapid development of the async Rust ecosystem so far, it is clear the technology is very promising. If the community maintains its usual enthusiasm and keeps funneling it into Tokio et al., it won’t be long before it matures into something remarkable.

Right now, however, it exposes way too many rough edges to fully bet on it. Still, there may be some applications where you could get away with an async Rust backend even in production. But personally, I wouldn’t recommend it outside of non-essential services, or tools internal to your organization.

If you do use async Rust for microservices, I’d also advise taking steps to ensure they remain “micro”. As I’ve elaborated in the earlier sections, there are several issues that make future-based Rust code scale poorly with respect to maintainability. Keeping it simple is therefore essential.

To sum up, async Rust is currently an option only for the adventurous and/or small. Others should stick to a tried & tested solution: something like Java (with Quasar), .NET, Go, or perhaps node.js at the very least.


  1. It is also the crux of parallelism, but that’s different and is not the focus here. 

  2. “Background” here refers to the low level, innate concurrency of the OS kernel (mediated with hardware interrupts), not the epoll-based event loops on the application side. 

  3. There is a great parallel to be drawn between a trivial echo/Hello world server, and a 3D graphics program that only redraws an empty screen. Both may start at some very high performance numbers (requests/frames per second) but once you start adding practical stuff, those metrics must drop hyperbolically.

  4. Technically, you are not, but the alternative is extremely cumbersome.
    In short, you’d have to follow an approach similar to custom Iterators: define a new struct for each individual case (possibly just newtype’ing an existing one), and then implement the necessary trait for it.
    For iterators, this works reasonably well, and you don’t need custom ones that often anyway. But futures, by their very nature, are meant to encapsulate any computation. For them, “each individual case” is literally every asynchronous function in your code. 
