Unfolding a Stream of paginated items

Posted on Wed 24 January 2018 in Code • Tagged with Rust, Tokio, streams, HTTP

My most recent Rust crate is an API client for Path of Exile’s public stash tabs. One problem I had to solve while writing it was turning a sequence of paginated items (in this case, player stash tabs) into a single, asynchronous Stream.

In this post, I’ll explain how to use the Stream interface, along with functions from the futures crate, to create a single Stream from multiple batches of entities.

Pagination 101

Dividing a long list of items into pages is a very common pattern in HTTP-based APIs.

If the client requests a sequence of entities that would be too large to serve as a single response, there has to be some way to split it over multiple HTTP roundtrips. To accomplish that, API servers will often return a fixed number of items at a time (50, say), followed by some form of continuation token:

$ curl http://api.example.com/items
{
    "items": [
        {...},
        {...},
        {...}
    ],
    "continuationToken": "e53c68db0ee412ac239173db147a02a0"
}

Such a token is preferably an opaque sequence of bytes, though sometimes it can be an explicit offset (index) into the list of results1. Regardless of its exact nature, clients need to pass the token with their next request in order to obtain another batch of results:

$ curl 'http://api.example.com/items?after=e53c68db0ee412ac239173db147a02a0'
{
    "items": [
        {...},
        {...}
    ],
    "continuationToken": "4e3986e4c7f591b8cb17cf14addd40a6"
}

Repeat this procedure for as long as the response contains a continuation token, and you will eventually go through the entire sequence. If it’s really, really long (e.g. it’s a Twitter firehose for a popular hashtag), then you may of course hit some problems due to the sheer number of requests. For many datasets, however, this pagination scheme is absolutely sufficient while remaining relatively simple for clients to implement.

Stream it in Rust

What the client code would typically do, however, is hide the pagination details completely and present only the final, unified sequence of items. Such an abstraction is useful even for end-user applications, but it’s definitely expected from any shared library that wraps a third-party API.

Depending on your programming language of choice, this abstraction layer may be very simple to implement. Here’s how it could be done in Python, whose concepts of iterables and generators are a perfect fit for this task2:

import requests

def iter_items(after=None):
    """Yield items from an example API.
    :param after: Optional continuation token
    """
    while True:
        url = "http://api.example.com/items"
        if after is not None:
            url += "?after=%s" % after
        response = requests.get(url)
        response.raise_for_status()
        for item in response.json()['items']:
            yield item
        after = response.json().get("continuationToken")
        if after is None:
            break

# consumer
for item in iter_items():
    print(item)

In Rust, you can find their analogues in the Iterator and Stream traits, so we’re off to a pretty good start. What’s missing, however, is the equivalent of yield: something to tell the consumer “Here, have the next item!”, and then go back to the exact same place in the producer function.

This ability to jump back and forth between two (or more) functions requires language support for coroutines. Not many mainstream languages meet this requirement, although Python and C# readily come to mind. In the case of Rust, there have been some nightly proposals and experiments, but nothing seems to be stabilizing anytime soon.

DIY streaming

But of course, if you do want a Stream of paginated items, there is at least one straightforward solution: just implement the Stream trait directly.

This is actually quite a viable approach, very similar to rolling your own custom Iterator. The minor differences stem mostly from the more complicated state management in Stream::poll compared to Iterator::next. While an iterator is either exhausted or not, a stream can also be waiting for the next item to “arrive” (Ok(Async::NotReady)), or have errored out permanently (Err(e)). As a consequence, the return value of Stream::poll is slightly more complex than a plain Option, but nevertheless quite manageable.
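
To get a feel for its shape, here is a minimal sketch (against the futures 0.1 API) of a hand-rolled Stream that merely counts up to a limit; the Counter type is, of course, made up for illustration:

use futures::{Async, Poll, Stream};

/// A trivial Stream that yields the numbers 1 through `limit`.
struct Counter {
    current: u64,
    limit: u64,
}

impl Stream for Counter {
    type Item = u64;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.current < self.limit {
            self.current += 1;
            Ok(Async::Ready(Some(self.current)))  // here, have the next item
        } else {
            Ok(Async::Ready(None))  // the stream is exhausted
        }
    }
}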

Irrespective of difficulty, writing a custom Stream from scratch would inevitably involve a lot of boilerplate. You may find it necessary in more complicated applications, of course, but for something that’s basically a glorified while loop, it doesn’t seem like a big ask to have a more concise solution.

The stream unfolds

Fortunately there is one! Its crucial element is the standalone stream::unfold function from the futures crate:

pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut> where
    F: FnMut(T) -> Option<Fut>,
    Fut: IntoFuture<Item = (It, T)>,

Reading through the signature of this function can be a little intimidating at first. Part of the blame lies with Rust’s verbose syntax for anything that involves both trait bounds and closures3, which makes stream::unfold seem more complicated than it actually is. In fact, if you’ve ever used Iterator adapters like .filter_map or .fold, the unfold function will be pretty easy to understand. (And if you haven’t, don’t worry! It’s really quite simple :))

If you look closely, you’ll see that stream::unfold takes the following two arguments:

  • first one is essentially an arbitrary initial value, called a seed
  • second one is a closure that receives the seed and returns an optional pair of values

What are those values?… Well, the entire purpose of the unfold function is to create a Stream, and a stream should inevitably produce some items. Consequently, the first value in the returned pair will be the next item in the stream.

And what about the second value? That’s just the next state of the seed! It will be received by the very same closure when someone asks the Stream to produce its next item. By passing around a useful value — say, a continuation token — you can create something that’s effectively a while loop from the Python example above.

The last important bit about this pair of values is its wrapping.

First, the pair is actually produced by a Future, allowing your stream to yield objects that it doesn’t quite have yet — for example, those which ultimately come from an HTTP response.

Secondly, its outermost layer is an Option. This enables you to terminate the stream when the underlying source is exhausted by simply returning None. Until then, however, you should return Some with the (future of) aforementioned pair of values.
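
Putting those pieces together, here is roughly what a toy invocation of stream::unfold could look like. This is just a sketch against the futures 0.1 API, with a plain number serving as the seed:

use futures::{future, stream, Future, Stream};

fn main() {
    // A stream of the numbers 0, 1, 2, 3, 4.
    let numbers = stream::unfold(0u32, |n| {
        if n < 5 {
            // Yield `n` as the next item, and pass `n + 1` along as the next seed.
            Some(future::ok::<_, ()>((n, n + 1)))
        } else {
            None  // no more items; terminate the stream
        }
    });
    assert_eq!(numbers.collect().wait().unwrap(), vec![0, 1, 2, 3, 4]);
}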

Paginate! Paginate!

If you have doubts about how all those pieces of stream::unfold fit together, the usage example in the docs may give you some idea of what it enables you to do. It’s a very artificial example, though: the resulting Stream isn’t waiting for any asynchronous Futures, which is the very reason you’d use a Stream over an Iterator in the first place4.

We can find a more natural application for unfold if we go back to our original problem. To reiterate, we want to repeatedly query an HTTP API for a long list of items, giving our callers a Stream of such items they can process at their leisure. At the same time, all the details about pagination and handling of continuation tokens or offsets should be completely hidden from the caller.

To employ stream::unfold for this task, we need two things: the initial seed, and an appropriate closure.

I have already hinted at using the continuation token as our seed, i.e. the state that we pass around from one closure invocation to another. What remains is mostly making the actual HTTP request and interpreting the JSON response, for which we’ll use the de facto standard Rust crates: hyper, Serde, and serde_json:

use std::error::Error;

use futures::{future, Future, stream, Stream};
use hyper::{Client, Method};
use hyper::client::Request;
use serde_json;
use tokio_core::reactor::Handle;

const URL: &str = "http://api.example.com/items";

fn items(
    handle: &Handle, after: Option<String>
) -> Box<Stream<Item=Item, Error=Box<Error>>>
{
    let client = Client::new(handle);
    Box::new(stream::unfold(after, move |cont_token| {
        let url = match cont_token {
            Some(ct) => format!("{}?after={}", URL, ct),
            None => return None,
        };
        let req = Request::new(Method::Get, url.parse().unwrap());
        Some(client.request(req).from_err().and_then(move |resp| {
            let status = resp.status();
            resp.body().concat2().from_err().and_then(move |body| {
                if status.is_success() {
                    serde_json::from_slice::<ItemsResponse>(&body)
                        .map_err(Box::<Error>::from)
                } else {
                    Err(format!("HTTP status: {}", status).into())
                }
            })
            .map(move |items_resp| {
                (stream::iter_ok(items_resp.items), items_resp.continuation_token)
            })
        }))
    })
    .flatten())
}

#[derive(Deserialize)]
struct ItemsResponse {
    items: Vec<Item>,
    #[serde(rename = "continuationToken")]
    continuation_token: Option<String>,
}

While this code may be a little challenging to decipher at first, it’s not out of line with what working with Futures and Streams looks like in general. In either case, you can expect a lot of .and_then callbacks :)

There is one detail here that I haven’t mentioned previously, though. It relates to the stream::iter_ok and Stream::flatten calls which you may have already noticed. The issue with stream::unfold is that it only lets us yield a single item per closure invocation. For us, this is too limiting: a single batch response from the API will contain many such items, but we would have no way of emitting them one by one.

What we can do instead is to produce a Stream of entire batches of items, at least at first, and then flatten it. What Stream::flatten does here is to turn a nested Stream<Stream<Item>> into a flat Stream<Item>. The latter is what we eventually want to return, so all we need now is to create this nested stream of streams.

How? Well, that’s actually pretty easy.

We can already deserialize a Vec<Item> from the JSON response — that’s our item batch! — which is essentially an iterable of Items5. Another utility function from the stream module, namely stream::iter_ok, can readily turn such an iterable into an “immediate” Stream. Such a Stream won’t be asynchronous at all — its items will have been ready from the very beginning — but it will still conform to the Stream interface, allowing it to be flattened as we intend.
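
To see this trick in isolation, consider the following sketch (again using the futures 0.1 API), where each “batch” is just a Vec that gets turned into an immediate Stream and then flattened:

use futures::{stream, Future, Stream};

fn main() {
    // Two "batches" of items, like two pages from the API.
    let batches = stream::iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5]]);
    let items = batches
        .map(|batch| stream::iter_ok::<_, ()>(batch))  // a Stream of Streams, one per batch
        .flatten();  // ...flattened into a single Stream of items
    assert_eq!(items.collect().wait().unwrap(), vec![1, 2, 3, 4, 5]);
}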

But wait! There is a bug!

So in the end, is this the solution we’re looking for?…

Well, almost. First, here’s the expected usage of the function we just wrote:

let mut core = tokio_core::reactor::Core::new().unwrap();
core.run({
    let continuation_token = None;  // start from the beginning
    items(&core.handle(), continuation_token).for_each(|item| {
        println!("{:?}", item);
        Ok(())
    })
}).unwrap();

While this is more complicated than the plain for loop in Python, most of it is just Tokio boilerplate. The notable part is the invocation of items(), where we pass None as a continuation token to indicate that we want the entire sequence, right from its beginning.

And since we’re talking about fetching long sequences, we would indeed expect a lot of items. So it is probably quite surprising to hear that the stream we’ve created here will be completely empty.

…What? How?!

If you look again at the source code of items(), the direct reason should be pretty easy to find. The culprit lies in the return None branch of the first match. If we don’t pass Some(continuation_token) as a parameter to items(), this branch will be hit immediately, terminating the stream before it has had a chance to produce anything.

It may not be very clear how to fix this problem. After all, the purpose of the match was to detect the end of the sequence, but it apparently prevents us from starting it in the first place!

Looking at the problem from another angle, we can see we’ve conflated two distinct states of our stream — “before it has started” and “after it’s ended” — into a single one (“no continuation token”). Since we obviously don’t want to make the after parameter mandatory — users should be able to say “Give me everything!” — we need another way of telling those two states apart.

In terms of Rust types, it seems that Option<String> is no longer sufficient for encoding all possible states of our Stream. Although we could try to fix that in some ad-hoc way (e.g. by adding another bool flag), it feels cleaner to just define a new, dedicated type. For one, this allows us to designate a name for each of the states in question, improving the readability and maintainability of our code:

enum State {
    Start(Option<String>),
    Next(String),
    End,
}

Note that we can put this definition directly inside the items() function, without cluttering the module namespace. All the relevant details of our Stream are thus nicely contained within a single function:

fn items(
    handle: &Handle, after: Option<String>
) -> Box<Stream<Item=Item, Error=Box<Error>>>
{
    // (definition of State enum can go here)

    let client = Client::new(handle);
    Box::new(stream::unfold(State::Start(after), move |state| {
        let cont_token = match state {
            State::Start(opt_ct) => opt_ct,
            State::Next(ct) => Some(ct),
            State::End => return None,
        };
        let url = match cont_token {
            Some(ct) => format!("{}?after={}", URL, ct),
            None => URL.into(),
        };
        let req = Request::new(Method::Get, url.parse().unwrap());
        Some(client.request(req).from_err().and_then(move |resp| {
            let status = resp.status();
            resp.body().concat2().from_err().and_then(move |body| {
                if status.is_success() {
                    serde_json::from_slice::<ItemsResponse>(&body)
                        .map_err(Box::<Error>::from)
                } else {
                    Err(format!("HTTP status: {}", status).into())
                }
            })
            .map(move |items_resp| {
                let next_state = match items_resp.continuation_token {
                    Some(ct) => State::Next(ct),
                    None => State::End,
                };
                (stream::iter_ok(items_resp.items), next_state)
            })
        }))
    })
    .flatten())
}

Sure, there is a little more bookkeeping required now, but at least all the items are being emitted by the Stream as intended.


You can see the complete source in the playground here.


  1. Furthermore, the token doesn’t have to come as part of the HTTP response body. Some API providers (such as GitHub) may use the Link: header to point directly to the next URL to query. 

  2. This example uses “traditional”, synchronous Python code. However, it should be easy to convert it to the asynchronous equivalent that works in Python 3.5 and above, provided you can replace requests with some async HTTP library. 

  3. If you are curious whether other languages could express it better, you can check the Data.Conduit.List.unfold function from Haskell’s conduit package. For most intents and purposes, it is their equivalent of stream::unfold. 

  4. Coincidentally, you can create iterators in the very same manner through the itertools::unfold function from the itertools crate

  5. In more technical Rust terms, it means Vec implements the IntoIterator trait, allowing anyone to get an Iterator from it. 


Asynchronous Rust for fun & profit

Posted on Fri 28 April 2017 in Programming • Tagged with Rust, async, Tokio, futures, HTTP

…or: Is Rust webscale?

In this day and age, no language can really make an impact anymore unless it enables its programmers to harness the power of the Internet. Rust is no different here. Despite posing as a true systems language (as opposed to those only marketed as such), it includes highly scalable servers as a prominent objective in its 2017 agenda.

Presumably to satisfy this very objective, the Rust ecosystem has recently seen some major developments in the space of asynchronous I/O. Given the pace of those improvements, it may seem that production quality async services are quite possible already.

But is that so? How exactly do you write async Rust servers in the early to mid 2017?

To find out, I set out to code up a toy application. Said program was a small intermediary/API server (a “microservice”, if you will) that tries to hit many of the typical requirements that arise in such projects. The main objective was to test the limits of asynchronous Rust, and see how easily (or with how much difficulty) they can be pushed.

This post is a summary of all the lessons I’ve learned from that.

It is necessarily quite long, so if you look for some TL;DR, scroll down straight to Conclusions.

Asynchro-what?

Before we dive in, I have to clarify what “asynchronous” means in this context. Those familiar with async concepts can freely skip this section.

Pulling some threads

Asynchronous processing (or async for short) is brought up most often in the context of I/O operations: disk reads, network calls, database queries, and so on.

Relatively speaking, all those tasks tend to be slow: they take orders of magnitude longer than just executing code or even accessing RAM. The “traditional” (synchronous) approach to dealing with them is to relegate those tasks to separate threads.

When one thread has to wait for a lengthy I/O operation to complete, the operating system (its scheduler, to be precise) can suspend that thread. This lets others execute their code in the mean time and not waste CPU cycles.

This is the essence of concurrency1.

Schedule yourself

But threads are not the only option when dealing with many things (i.e. requests) at once.

The alternative approach is one where no threads are automatically suspended or resumed by the OS. Instead, a special version of I/O subroutines allows the program to continue execution immediately after issuing an I/O call. While the operation happens in the background2, the code is given an opaque handle — usually called a promise, a future, or an async result — that will eventually resolve to the actual return value.

The program can wait for the handle synchronously, but it would typically hand it over to an event loop, an abstraction provided by a dedicated async framework. Such a framework (among which node.js is probably the best known example) maintains a list of all I/O “descriptors” (fds in Unix) that are associated with pending I/O operations.

Then, in the loop, it simply waits on all of them, usually via the epoll system call. Whenever an I/O task completes, the loop would execute a callback associated with its result (or promise, or future). Through this callback, the application is able to process it.

In a sense, we can treat the event loop as a dedicated scheduler for its program.
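
Jumping ahead a little to the Rust libraries discussed below, here is a minimal sketch of this model with tokio_core and futures 0.1, where core.run plays the role of the event loop:

use futures::{future, Future};
use tokio_core::reactor::Core;

fn main() {
    // The event loop ("reactor") that will drive our futures to completion.
    let mut core = Core::new().unwrap();

    // A stand-in for an asynchronous I/O call, with a callback chained onto it.
    let work = future::ok::<u32, ()>(42).map(|n| n + 1);

    // Hand the future over to the event loop and wait for its result.
    let result = core.run(work).unwrap();
    assert_eq!(result, 43);
}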

But why?

So, what exactly is the benefit of asynchronous I/O? If anything, it definitely sounds more complicated for the programmer. (Spoiler alert: it is).

The impetus for the development of async techniques most likely came from the C10K problem. The short version of it is that computers are nowadays very fast and should therefore be able to serve thousands of requests simultaneously, especially when those requests are mostly I/O, which translates into waiting time for the CPU.

And if “serving” queries is indeed almost all waiting, then handling thousands of clients should be very possible.

In some cases, however, it was found that letting the OS schedule the threads introduces too much overhead, due to the frequent pause/resume state changes (context switching). Like I mentioned above, the asynchronous alternative does away with all that, and indeed lets the CPU just wait (on epoll) until something interesting happens. Once it does, the application can deal with it quickly, issue another I/O call, and let the server go back to waiting.

With today’s processing power we can theoretically handle a lot of concurrent clients this way: up to hundreds of thousands or even millions.

Reality check

Well, ain’t that grand? No wonder everyone is writing everything in node.js now!

Jokes aside, the actual benefits of asynchronous I/O (especially when weighed against its inconvenience for developers) are a bit harder to quantify. For one, they rely heavily on the assumption of fast code & slow I/O being valid in all situations.

But this isn’t really self-evident, and becomes increasingly dubious as time goes on and code complexity grows. It should be obvious, for example, that a Python web frontend talking mostly to in-memory caches in the same datacenter will have radically different performance characteristics than a C++ proxy server calling HTTP APIs over public Internet. Those nuances are often lost in translation between simplistic benchmarks and exaggerated blog posts3.

Upon a closer look, however, these details point quite clearly in favor of asynchronous Rust. Being a language that compiles to native code, it should usually run faster than interpreted (Python, Ruby) or even JITed (JVM & .NET) languages, very close to what is typically referred to as “bare metal” speed. For async I/O, it means the event loop won’t be disturbed for a (relatively) long time to do some trivial processing, leading to higher potential throughput of requests.

All in all, it would seem that Rust is one of the few languages where async actually makes sense.

Rust: the story so far

Obviously, this means it’s been built into the language right from the start… right?

Well, not really. It was always possible to use native epoll through FFI, of course, but that’s not exactly the level of abstraction we’d like to work with. Still, the upper layers of the async I/O stack have been steadily growing at least since Rust 1.0.

The major milestones here include mio, a comparatively basic building block that provides an asynchronous version of TCP/IP. It also offers idiomatic wrappers over epoll, allowing us to write our own event loop.

On the application side, the futures crate abstracts the notion of a potentially incomplete operation into, well, a future. Manipulating those futures is how one can now write asynchronous code in Rust.

More recently, Tokio has been emerging as the de facto framework for async I/O in Rust. It essentially combines the two previously mentioned crates, and provides additional abstractions specifically for network clients and servers.

And finally, the popular HTTP framework Hyper now also supports asynchronous request handling via Tokio. What this means is that the bread and butter of the Internet’s application layer — API servers talking JSON over HTTP — should now be fully supported by the ecosystem of asynchronous Rust.

Let’s take it for a spin then, shall we?

The Grand Project

Earlier on, we have established that the main use case for asynchronous I/O is intermediate microservices. They often sit somewhere between a standard web frontend and a storage server or a database. Because of their typical role within a bigger system, these kinds of projects don’t tend to be particularly exciting on their own.

But perhaps we can liven them up a little.

In the end, it is all about the Internet that we’re talking here, and everything on the Internet can usually be improved by one simple addition.



…Okay, two possible additions — the other one being:

Memes!

If you’re really pedantic, you may call them image macros. But regardless of the name, the important part is putting text on pictures, preferably in a funny way.

The microservice I wrote is doing just that. Though it won’t ensure your memes are sufficiently hilarious, it will try to deliver them exactly to your specifications. You may thus think of it as a possible backend for an image site like this one.

Flimsy excuses & post-hoc justifications

It is, of course, a complete coincidence, lacking any premeditation on my part, that when it comes to evaluating an async platform, a service like this fits the bill very well.

And especially when said platform is async Rust.

Why, though, is it such a happy, er, accident?

  • It’s a simple, well-defined application. There is basically a single endpoint, accepting simple input (JSON or a query string) and producing a straightforward result (an image). Having no state to persist made creating an MVP significantly easier.

  • Caching can be used for meme templates and fonts. Besides being an inherent part of most network services, a cache also represents a point of contention for Rust programs. The language is widely known for its allergy to global mutable state, which is exactly what programmatic caches boil down to.

  • Image captioning is a CPU-intensive operation. While the “async” part of async I/O may sometimes go all the way down, many practical services either grow some important CPU-bound code over time, or require it right from the start. For this reason, I wanted to check if & how async Rust can mix with threaded concurrency.

  • Configuration knobs can be added. Unlike trivial experiments in the vein of an echo or “Hello world” server, this kind of service warrants some flags that the user could tweak, like the number of image captioning threads, or the size of the template cache. We can see how easy (or how hard) it is to make them applicable across all future-based requests.

All in all, and despite its frivolous subject matter, a meme server is actually hitting quite a few notable spots in the microservice domain.

Learnings

As you may glean from its GitHub repo, it would seem that the experiment was successful. Sure, a few more features could be implemented in the captioning department (supporting animated GIFs comes to mind), but none of them are pertinent to the async mechanics of the server.

And since it’s the async (I/O) in Rust that we’re interested in, let me now present you with an assorted collection of practical experiences with it.

>0-cost futures

If you read the docs’ preamble to the futures crate, you will see it mentioning the “zero-cost” aspect of the library. Consistent with the philosophy behind Rust, it proclaims to deliver its abstractions without any overhead.

Thing is, I’m not sure how this promise can be delivered on in practice.

Flip through the introductory tutorial to Tokio, for example, and you will already find plenty of compromises. Without the crucial (but nightly-only) impl Trait feature, you are basically required to put all your futures in a Box4. They even encourage it themselves, offering a convenient Future::boxed method exactly for this purpose, as well as the matching BoxFuture typedef right in the crate.

But hey, you can always just use nightly Rust, right? impl Trait will stabilize eventually, so your code should be, ahem, future-proof either way.

Unfortunately, this assumes that the futures you’re building your request handlers from will never cross any thread boundaries. (BoxFuture, for example, automatically constrains them to be Send). As you’ve likely guessed, this doesn’t jibe very well with computationally intensive tasks, which are best relegated to a separate thread.

To deal with them properly, you’re going to need a thread pool-based executor, which is currently implemented in the futures_cpupool crate. Using it requires a lot of care, though, and a deep understanding of both types of concurrency involved.
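
For reference, the basic usage on its own is not too bad. Below is a rough sketch against futures_cpupool 0.1; the captioning part is obviously stubbed out:

use futures::Future;
use futures_cpupool::CpuPool;

fn main() {
    // A pool of worker threads for CPU-bound tasks.
    let pool = CpuPool::new(4);

    // spawn_fn runs the closure on one of the workers and hands back a future
    // of its result, which can then be composed with the rest of the
    // (I/O-bound) request handling.
    let caption = pool.spawn_fn(|| -> Result<String, ()> {
        // ...imagine some expensive image captioning happening here...
        Ok("such async, very wow".to_owned())
    });

    assert_eq!(caption.wait().unwrap(), "such async, very wow");
}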

Evidently, this was something that I lacked at the time, which is why I encountered problems ensuring that my futures are properly Send. In the end, I settled on making them Send in the most straightforward (and completely unnecessary) manner: by wrapping them in Arc/Mutex. That in itself wasn’t without its perils, but at least allowed me to move forward.

Ironically, this also shows an important, pragmatic property of the futures’ system: sub-par hacks around it are possible — a fact you’ll be glad to know about on the proverbial day before a deadline.

Templates-worthy error messages

Other desirable properties of the futures abstraction would include telling the programmer what’s wrong with their code in the simplest, most straightforward, and concise manner possible.

Here, let me show you an example:


…which you can also behold in its gist form.

The reason you will encounter such incomprehensible messages stems from the very building blocks of async code.

Right now, each chained operation on a future — map, and_then, or_else, and so on — produces a nested type. Every subsequent application of those methods “contains” (in terms of the type system) all the previous ones. Keep going, and it will eventually balloon into one big onion of Chain<Map<OrElse<Chain<Map<...etc...>>>>>.


Futures are like ogres.

I haven’t personally hit any compiler limits in this regard, but I’m sure it is plausible for a complicated, real-world program.

It also gets worse if you use nightly Rust with impl Trait. In this case, function boundaries no longer “break” the type stacking by Boxing the results into trait objects. Indeed, you can very well end up with some truly gigantic constructs as the compiler tries to represent the return types of your most complex handlers.

But even if rustc is up to snuff and can deal with those fractals just fine, it doesn’t necessarily mean the programmer can. Looking at those error messages, I had vivid flashbacks to hacking on C++ templates with ancient compilers like VS2005. The difference is, of course, that we’re not trying any arcane metaprogramming here; we just want to do some relatively mundane I/O.

I have no doubt the messaging will eventually improve, and the mile-long types will at least get pretty-printed. At the moment, however, prepare for some serious squinting and bracket-counting.

Where is my (language) support?

Sadly, those long, cryptic error messages are not the only way in which the Rust compiler disappoints us.

I keep mentioning impl Trait as a generally desirable language feature for writers of asynchronous code. This improvement is still a relatively long way from getting precisely defined, much less stabilized. And it is only a somewhat minor improvement in the async ergonomics.

The wishlist is vastly longer and even more inchoate.

To put it bluntly, right now Rust doesn’t really support the async style at all. The combined API surface of futures/Tokio/Hyper/etc. is a clever but ultimately contrived design, and it has no intentional backing in the Rust language itself.

This is a stark contrast with numerous other languages. They often support asynchronous I/O as something of a first class feature. The list includes at least C#, Python 3.5+, Hack/PHP, ES8 / JavaScript, and basically all the functional languages. They all have dedicated async, await, or equivalent constructs that make the callback-based nature of asynchronous code essentially transparent.

The absence of similar support puts Rust in the same bucket as frontend JavaScript circa 2010, where .then-chaining of promises reigned supreme. This is of course better than the callback hell of early Node, but I wouldn’t think that’s a particularly high bar. In this regard, Rust leaves plenty to be desired.

There are proposals, obviously, to bring async coroutines into Rust. There is an even broader wish to make the language cross the OOP/FP fence already and commit to the functional way; this would mean adding an equivalent of Haskell’s do notation.

Either development could be sufficient. Both, however, require a significant amount of design and implementation work. If solved now, this would easily be the most significant addition to the language since its 1.0 release — but the solution is currently in the RFC stages at best.

Future<Ecosystem>

While the core language support is lacking, the (great as usual) Rust community has been picking up some of the slack by establishing and cultivating a steadily growing ecosystem.

The constellation of async-related crates clusters mostly around the two core libraries: the futures crate itself and Tokio. Any functionality you may need while writing asynchronous code can likely be found quite easily by searching for one of those two keywords (plus Rust, of course). Another way of finding what you need is to look at the list of Tokio-related crates directly.

To be fair, I can’t really say much about the completeness of this ecosystem. The project didn’t really require too many external dependencies — the only relevant ones were:

  • futures_cpupool mentioned before
  • tokio-timer for imposing a timeout on caption requests
  • tokio-signal which handles SIGINT/Ctrl+C and allows for a graceful shutdown

Normally, you’d also want to research the async database drivers for your storage system of choice. I would not expect anything resembling the Diesel ORM crate, though, nor a web framework comparable to Iron, Pencil, or Rocket.

Conclusions

Alright, so what can we get from this overall analysis?

Given the rapid development of async Rust ecosystem so far, it is clear the technology is very promising. If the community maintains its usual enthusiasm and keeps funneling it into Tokio et al., it won’t be long before it matures into something remarkable.

Right now, however, it exposes way too many rough edges to fully bet on it. Still, there may be some applications where you could get away with an async Rust backend even in production. But personally, I wouldn’t recommend it outside of non-essential services, or tools internal to your organization.

If you do use async Rust for microservices, I’d also advise to take steps to ensure they remain “micro”. Like I’ve elaborated in the earlier sections, there are several issues that make future-based Rust code scale poorly with respect to maintainability. Keeping it simple is therefore essential.

To sum up, async Rust is currently an option only for the adventurous and/or small. Others should stick to a tried & tested solution: something like Java (with Quasar), .NET, Go, or perhaps node.js at the very least.


  1. It is also the crux of parallelism, but that’s different and is not the focus here. 

  2. “Background” here refers to the low level, innate concurrency of the OS kernel (mediated with hardware interrupts), not the epoll-based event loops on the application side. 

  3. There is a great parallel to be drawn between a trivial echo/Hello world server, and a 3D graphics program that only redraws an empty screen. Both may start at some very high performance numbers (requests/frames per second) but once you start adding practical stuff, those metrics must drop hyperbolically

  4. Technically, you are not, but the alternative is extremely cumbersome.
    In short, you’d have to follow an approach similar to custom Iterators: define a new struct for each individual case (possibly just newtype‘ing an existing one), and then implement the necessary trait for it.
    For iterators, this works reasonably well, and you don’t need custom ones that often anyway. But futures, by their very nature, are meant to encapsulate any computation. For them, “each individual case” is literally every asynchronous function in your code. 


Query string authentication in Requests

Posted on Fri 11 September 2015 in Code • Tagged with Python, Requests, HTTP, authentication

Requests is a widely used Python library that provides a nicer API for writing HTTP clients than the standard urllib2 module does. It deals with authentication in an especially concise way: through a simple auth= argument, rather than a separate password manager & authentication handler or other such nonsense.

There are several possible ways to authenticate an HTTP call with Requests, and it’s pretty easy to implement our own approach if the server requires it. All the built-in ways, however, as well as the examples of custom implementations, are heavily biased towards using HTTP headers to transmit the necessary credentials (such as username/password or some kind of opaque token).

Non-standard auth

This is actually quite reasonable: the most popular authentication methods, including OAuth 1.0 & 2.0, use HTTP headers either primarily or exclusively.

Not every HTTP API follows this convention, though. Sometimes, credentials are put in other parts of the request, commonly the URL itself. It may seem like a bad idea at first but it can also be perfectly acceptable: credentials don’t have to expose secrets of any particular user of the remote system.

Steam API is a good example here. Calling any of its endpoints requires providing an API key but it grants no special rights to access data of any particular Steam user. All the information it returns is already visible on their public profile1.

For those special authentication schemes, Requests requires us to roll our own implementation. Thankfully, doing so is mostly pretty straightforward.

Simple example

All Requests’ authenticators are callable objects inheriting from requests.auth.AuthBase class. Writing your own is hence a matter of defining a subclass of AuthBase with at least a __call__ method:

import requests
from requests.auth import AuthBase


class SillyAuth(AuthBase):
    def __call__(self, request):
        request.headers['X-ID'] = 'im valid so auth is yes'
        return request

# usage
requests.get('http://example.com', auth=SillyAuth())

The job of an authenticator is to modify the request so that it includes appropriate credentials in whatever form necessary to have them accepted by the remote server. Like I’ve mentioned before, HTTP headers are the most common option, but the request can be modified in other ways as well.

Query string parameters

One problem with modifying the query string, though, is that it’s part of the request URL. By the time it reaches authenticators, the Requests library has already merged any additional query params into it2. Including more params will thus require modifying the URL.

Though it may sound like a risky endeavour involving string manipulations that are fraught with security issues, it’s not really that bad at all. In fact, the Requests library provides an API to do exactly this:

class QueryStringAuth(AuthBase):
    """Authenticator that attaches a set of query string parameters
    (e.g. an API key) to the request.
    """
    def __init__(self, **params):
        self.params = params

    def __call__(self, request):
        if self.params:
            request.prepare_url(request.url, self.params)
        return request

Albeit scantily documented, the prepare_url method will take an existing URL and a dictionary of query string params, and outfit the request with a brand new URL that contains those params neatly encoded.

A full implementation of QueryStringAuth is a little more involved than the snippet above, because we would like to replicate all the idiosyncrasies of how the regular Requests API handles query string params. Some of them — like allowing both strings and lists as param values — are taken care of by prepare_url itself, but the rest should be dealt with on our own.

Usage

To finish up, let’s use this authenticator to call Steam API and return a list of games that a given user owns but hasn’t played yet:

STEAM_API_KEY = 'a1b2c3d4e5f6g7h8i9j'  # not a real one, of course


def get_steam_backlog(steam_id):
    url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/'
    params = {
        'steamid': steam_id,
        'include_appinfo': 1,
    }

    response = requests.get(
        url, params=params, auth=QueryStringAuth(key=STEAM_API_KEY))
    games = response.json().get('response', {}).get('games', ())

    for game in games:
        if game.get('playtime_forever', 0):
            continue
        yield game['name']

We could’ve put STEAM_API_KEY directly in params, of course. Singling it out explicitly as an authentication detail, however, makes the code clearer and plays nicely with more advanced features of Requests, such as sessions.
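
For instance, the authenticator can be attached to a requests.Session once, after which every call made through that session carries the API key (the Steam ID below is just a placeholder):

import requests

session = requests.Session()
session.auth = QueryStringAuth(key=STEAM_API_KEY)

# Every request made through the session is now authenticated,
# without having to repeat the key at each call site.
response = session.get(
    'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/',
    params={'steamid': '<steam_id>', 'include_appinfo': 1})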


  1. It can be said that only in this case we’re dealing with exclusively authentication, whereas the others also perform authorization. I wouldn’t quibble too much about those details. The fact that both terms are often shortened to “auth” doesn’t exactly help with distinguishing them anyway. 

  2. In fact, what AuthBase.__call__ receives is a special PreparedRequest object which contains the exact bytes that’ll be sent to the server. Most of the higher level abstractions offered by the Requests library (like form data or JSON request body) have been compiled to raw octets at this point. This is done to allow some authenticators (like OAuth) to analyze the full request payload and sign it cryptographically as a part of their flow. 


Automatic error pages in Flask

Posted on Tue 08 September 2015 in Code • Tagged with Flask, Python, HTTP, errors

In Flask, a popular web framework for Python, it’s pretty easy to implement custom handlers for specific HTTP errors. All you have to do is to write a function and annotate it with the @errorhandler decorator:

@app.errorhandler(404)
def not_found(error):
    return render_template('errors/404.html')

Albeit simple, the above example is actually very realistic. There’s rarely anything else to do in response to a serious request error than to send back some HTML with an appropriate message. If you have more handlers like that, though, you’ll notice they get pretty repetitive: for each erroneous HTTP status code (404, 403, 400, etc.), pick a corresponding template and simply render it.

Which is, of course, why we’d want to deal with them all in a smarter way and with less boilerplate.

Just add a template

Ideally, we would like to avoid writing any Python code at all for each individual error handler. Since all we’re doing revolves around predefined templates, let’s just define the handlers automatically based on the HTML files themselves.

Assuming we store them in the same template directory — say, errors/ — and name their files after numeric status codes (e.g. 404.html), getting all those codes is quite simple1:

from pathlib import Path, PurePath

from mywebapp import app


#: Path to the directory where HTML templates for error pages are stored.
ERRORS_DIR = PurePath('errors')


def get_supported_error_codes():
    """Returns an iterable of HTTP status codes for which we have
    custom error page templates defined.
    """
    error_templates_dir = Path(app.root_path, app.template_folder, ERRORS_DIR)

    potential_error_templates = (
        entry for entry in error_templates_dir.glob('*.html')
        if entry.is_file())
    for template in potential_error_templates:
        try:
            code = int(template.stem)  # e.g. 404.html
        except ValueError:
            pass  # could be some base.html template, or similar
        else:
            if code < 400:
                app.logger.warning(
                    "Found error template for non-error HTTP status %s", code)
                continue
            yield code

Once we have them, we can try wiring up the handlers programmatically and making them do the right thing.

One function to handle them all

Although I’ve used a plural here, in reality we only need a single handler, as it’ll be perfectly capable of dealing with any HTTP error we bind it to. To help distinguish between the different status codes, Flask by default invokes our error handlers with an HTTPException argument that has the code as an attribute:

from flask import render_template
from jinja2 import TemplateNotFound


def error_handler(error):
    """Universal handler for all HTTP errors.

    :param error: :class:`~werkzeug.exceptions.HTTPException`
                  representing the HTTP error.

    :return: HTTP response to be returned
    """
    code = getattr(error, 'code', None)
    if not code:
        app.logger.warning(
            "Got spurious argument to HTTP error handler: %r", error)
        return FATAL_ERROR_RESPONSE

    app.logger.debug("HTTP error %s, rendering error page", code)
    template = ERRORS_DIR / ('%s.html' % code)
    try:
        return render_template(str(template)), code
    except TemplateNotFound:
        # shouldn't happen if the handler has been wired up properly
        app.logger.fatal("Missing template for HTTP error page: %s", template)
        return FATAL_ERROR_RESPONSE

#: Response emitted when an error occurs in the error handler itself.
FATAL_ERROR_RESPONSE = ("Internal Server Error", 500)

Catching the TemplateNotFound exception is admittedly a little paranoid here, as it can occur pretty much exclusively due to a programming error elsewhere. Feel free to treat it as a failed assertion about the application’s internal state and e.g. convert it to an AssertionError if desirable.

The setup

The final step is to actually set up the handler(s):

for code in get_supported_error_codes():
    app.errorhandler(code)(error_handler)

It may look a bit wonky, but it’s just a simple desugaring of the standard Python decorator syntax from the first example. There exists a more direct approach of putting the handler inside app.error_handler_spec dictionary, but it is discouraged by Flask documentation.

Where to put the above code, though? I posit it’s perfectly fine to place it in the module’s global scope, because error handlers (and other request handlers) are traditionally defined at import time anyway.

Also notice that the default error handling we’ve defined here doesn’t preclude more specialized variants for select HTTP codes. All you have to do is ensure that your custom @app.errorhandler definition occurs after the above loop.
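
For instance, a dedicated 500 handler that also pings the maintainers could be registered after the loop and take precedence over the generic one (notify_maintainers is a hypothetical helper here):

@app.errorhandler(500)
def internal_error(error):
    notify_maintainers(error)  # hypothetical alerting helper
    return render_template(str(ERRORS_DIR / '500.html')), 500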


  1. Note that I’m using the pathlib module here — and you should, too, since it’s all around awesome. However, it is only a part of the standard library since Python 3.4, so you will most likely need to get the backport first. 
