Unfolding a Stream of paginated items

Posted on Wed 24 January 2018 in Code • Tagged with Rust, Tokio, streams, HTTP

My most recent Rust crate is an API client for the Path of Exile’s public stash tabs. One problem that I had to solve while writing it was to turn a sequence of paginated items (in this case, player stash tabs) into a single, asynchronous Stream.

In this post, I’ll explain how to use the Stream interface, along with functions from the futures crate, to create a single Stream from multiple batches of entities.

Pagination 101

Dividing a long list of items into pages is a very common pattern in HTTP-based APIs.

If the client requests a sequence of entities that would be too large to serve as a single response, there has to be some way to split it over multiple HTTP roundtrips. To accomplish that, API servers will often return a constant number of items at first (like 50), followed by some form of continuation token:

$ curl http://api.example.com/items
{
    "items": [
        {...},
        {...},
        {...}
    ],
    "continuationToken": "e53c68db0ee412ac239173db147a02a0"
}

Such a token is preferably an opaque sequence of bytes, though sometimes it can be an explicit offset (index) into the list of results[1]. Regardless of its exact nature, clients need to pass the token with their next request in order to obtain another batch of results:

$ curl 'http://api.example.com/items?after=e53c68db0ee412ac239173db147a02a0'
{
    "items": [
        {...},
        {...}
    ],
    "continuationToken": "4e3986e4c7f591b8cb17cf14addd40a6"
}

Repeat this procedure for as long as the response contains a continuation token, and you will eventually go through the entire sequence. If it’s really, really long (e.g. it’s a Twitter firehose for a popular hashtag), then you may of course hit some problems due to the sheer number of requests. For many datasets, however, this pagination scheme is absolutely sufficient while remaining relatively simple for clients to implement.
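To make the procedure concrete, here's a minimal sketch of that loop in Rust. It doesn't touch the network: fetch_page is a made-up stand-in for the HTTP call that serves seven numbers in pages of three and uses the next offset as its continuation token. Only the shape of the loop is the point here.

```rust
/// One page of results, mirroring the JSON shape shown above.
struct Page {
    items: Vec<u32>,
    continuation_token: Option<String>,
}

/// Hypothetical stand-in for the HTTP call: serves the numbers 1..=7
/// in pages of three, using the next offset as the continuation token.
fn fetch_page(after: Option<&str>) -> Page {
    const TOTAL: u32 = 7;
    const PAGE_SIZE: u32 = 3;
    let start: u32 = after.map_or(0, |token| token.parse().expect("valid token"));
    let end = (start + PAGE_SIZE).min(TOTAL);
    Page {
        items: (start + 1..=end).collect(),
        continuation_token: if end < TOTAL { Some(end.to_string()) } else { None },
    }
}

/// Follows continuation tokens until the server stops returning one.
fn fetch_all() -> Vec<u32> {
    let mut all = Vec::new();
    let mut token: Option<String> = None;
    loop {
        let page = fetch_page(token.as_deref());
        all.extend(page.items);
        token = page.continuation_token;
        if token.is_none() {
            break;
        }
    }
    all
}
```

Calling fetch_all() makes three "requests" and returns all seven numbers in order.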

Stream it in Rust

What the client code would typically do, however, is to hide the pagination details completely and present only the final, unified sequence of items. Such an abstraction is useful even for end-user applications, but it’s definitely expected from any shared library that wraps a third-party API.

Depending on your programming language of choice, this abstraction layer may be very simple to implement. Here’s how it could be done in Python, whose concepts of iterables and generators are a perfect fit for this task[2]:

import requests

def iter_items(after=None):
    """Yield items from an example API.
    :param after: Optional continuation token
    """
    while True:
        url = "http://api.example.com/items"
        if after is not None:
            url += "?after=%s" % after
        response = requests.get(url)
        response.raise_for_status()
        for item in response.json()['items']:
            yield item
        after = response.json().get("continuationToken")
        if after is None:
            break

# consumer
for item in iter_items():
    print(item)

In Rust, you can find their analogues in the Iterator and Stream traits, so we’re off to a pretty good start. What’s missing, however, is the equivalent of yield: something to tell the consumer “Here, have the next item!”, and then go back to the exact same place in the producer function.

This ability to jump back and forth between two (or more) functions requires language support for coroutines. Not many mainstream languages meet this requirement, although Python and C# readily come to mind. In the case of Rust, there have been some nightly proposals and experiments, but nothing seems to be stabilizing anytime soon.

DIY streaming

But of course, if you do want a Stream of paginated items, there is at least one straightforward solution: just implement the Stream trait directly.

This is actually quite a viable approach, very similar to rolling out a custom Iterator. The minor differences stem mostly from the more complicated state management in Stream::poll compared to Iterator::next. While an iterator is either exhausted or not, a stream can also be waiting for the next item to “arrive” (Ok(Async::NotReady)), or have errored out permanently (Err(e)). As a consequence, the return value of Stream::poll is slightly more complex than just a plain Option, but nevertheless quite manageable.
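To get a feel for the bookkeeping involved, here's a sketch of the simpler, synchronous case: a hand-written Iterator over paginated items. The PagedItems type, the fetch_page function and its offset-style tokens are all invented for illustration, with no real HTTP involved. A Stream::poll implementation would follow the same outline, but would additionally have to deal with NotReady and Err at every step.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for one HTTP round trip: returns a batch of items
/// plus the continuation token for the next batch, if there is one.
fn fetch_page(after: Option<u32>) -> (Vec<u32>, Option<u32>) {
    let start = after.unwrap_or(0);
    let end = (start + 3).min(7);
    let next = if end < 7 { Some(end) } else { None };
    ((start + 1..=end).collect(), next)
}

/// Iterator that hides pagination behind `Iterator::next`.
struct PagedItems {
    buffer: VecDeque<u32>,   // items fetched but not yet handed out
    next_token: Option<u32>, // where the next request should continue
    exhausted: bool,         // set once the last page has been fetched
}

impl PagedItems {
    fn new() -> Self {
        PagedItems { buffer: VecDeque::new(), next_token: None, exhausted: false }
    }
}

impl Iterator for PagedItems {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        // Keep fetching until there is something to return or nothing left;
        // the loop also copes with a page that happens to be empty.
        while self.buffer.is_empty() && !self.exhausted {
            let (items, token) = fetch_page(self.next_token);
            self.buffer.extend(items);
            self.exhausted = token.is_none();
            self.next_token = token;
        }
        self.buffer.pop_front()
    }
}
```

Even in this stripped-down form, three pieces of state have to be kept in sync by hand.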

Irrespective of difficulty, writing a custom Stream from scratch would inevitably involve a lot of boilerplate. You may find it necessary in more complicated applications, of course, but for something that’s basically a glorified while loop, it doesn’t seem like a big ask to have a more concise solution.

The stream unfolds

Fortunately there is one! Its crucial element is the standalone stream::unfold function from the futures crate:

pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut> where
    F: FnMut(T) -> Option<Fut>,
    Fut: IntoFuture<Item = (It, T)>,

Reading through the signature of this function can be a little intimidating at first. Part of it is Rust’s verbose syntax for anything that involves both trait bounds and closures[3], making stream::unfold seem more complicated than it actually is. Indeed, if you’ve ever used Iterator adapters like .filter_map or .fold, the unfold function will be pretty easy to understand. (And if you haven’t, don’t worry! It’s really quite simple :))

If you look closely, you’ll see that stream::unfold takes the following two arguments:

  • first one is essentially an arbitrary initial value, called a seed
  • second one is a closure that receives the seed and returns an optional pair of values

What are those values?… Well, the entire purpose of the unfold function is to create a Stream, and a stream should inevitably produce some items. Consequently, the first value in the returned pair will be the next item in the stream.

And what about the second value? That’s just the next state of the seed! It will be received by the very same closure when someone asks the Stream to produce its next item. By passing around a useful value — say, a continuation token — you can create something that’s effectively a while loop from the Python example above.

The last important bit about this pair of values is its wrapping.

First, it is actually a Future, allowing your stream to yield objects that it doesn’t quite have yet — for example, those which ultimately come from an HTTP response.

Secondly, its outermost layer is an Option. This enables you to terminate the stream when the underlying source is exhausted by simply returning None. Until then, however, you should return Some with the (future of) aforementioned pair of values.
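To see the seed-and-closure mechanics in isolation, here's a rough synchronous sketch. The unfold helper below is my own illustration built on std::iter::from_fn, not the futures API: it keeps the Option layer and the (item, next seed) pair, but drops the Future wrapping, so it produces a plain Iterator instead of a Stream. The powers_of_two function is just a toy consumer.

```rust
use std::iter;

/// Iterator-flavoured sketch of `unfold`: the closure receives the current
/// seed and returns `Some((item, next_seed))`, or `None` to finish.
fn unfold<T, It, F>(init: T, mut f: F) -> impl Iterator<Item = It>
where
    F: FnMut(T) -> Option<(It, T)>,
{
    // The seed lives in an Option so it can be handed to the closure by value.
    let mut seed = Some(init);
    iter::from_fn(move || {
        // `take()` leaves None behind, so the iterator stays finished
        // once the closure has returned None.
        let (item, next_seed) = f(seed.take()?)?;
        seed = Some(next_seed);
        Some(item)
    })
}

/// Powers of two below `limit`, produced by doubling the seed each time.
fn powers_of_two(limit: u32) -> Vec<u32> {
    unfold(1u32, |n| if n < limit { Some((n, n * 2)) } else { None }).collect()
}
```

Here the seed is the next number to emit; in the pagination case it would be whatever the next request needs, such as a continuation token.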

Paginate! Paginate!

If you have doubts about how all those pieces of stream::unfold fit in, then looking at the usage example in the docs may give you some idea of what it enables you to do. It’s a very artificial example, though: the resulting Stream isn’t waiting for any asynchronous Futures, which is the very reason you’d use a Stream over an Iterator in the first place[4].

We can find a more natural application for unfold if we go back to our original problem. To reiterate, we want to repeatedly query an HTTP API for a long list of items, giving our callers a Stream of such items they can process at their leisure. At the same time, all the details about pagination and handling of continuation tokens or offsets should be completely hidden from the caller.

To employ stream::unfold for this task, we need two things: the initial seed, and an appropriate closure.

I have hinted already at using the continuation token as our seed, or the state that we pass around from one closure invocation to another. What remains is mostly making the actual HTTP request and interpreting the JSON response, for which we’ll use the de facto standard Rust crates: hyper, Serde, and serde_json:

use std::error::Error;

use futures::{future, Future, stream, Stream};
use hyper::{Client, Method};
use hyper::client::Request;
use serde_json;
use tokio_core::reactor::Handle;

const URL: &str = "http://api.example.com/items";

fn items(
    handle: &Handle, after: Option<String>
) -> Box<Stream<Item=Item, Error=Box<Error>>>
{
    let client = Client::new(handle);
    Box::new(stream::unfold(after, move |cont_token| {
        let url = match cont_token {
            Some(ct) => format!("{}?after={}", URL, ct),
            None => return None,
        };
        let req = Request::new(Method::Get, url.parse().unwrap());
        Some(client.request(req).from_err().and_then(move |resp| {
            let status = resp.status();
            resp.body().concat2().from_err().and_then(move |body| {
                if status.is_success() {
                    serde_json::from_slice::<ItemsResponse>(&body)
                        .map_err(Box::<Error>::from)
                } else {
                    Err(format!("HTTP status: {}", status).into())
                }
            })
            .map(move |items_resp| {
                (stream::iter_ok(items_resp.items), items_resp.continuation_token)
            })
        }))
    })
    .flatten())
}

#[derive(Deserialize)]
struct ItemsResponse {
    items: Vec<Item>,
    #[serde(rename = "continuationToken")]
    continuation_token: Option<String>,
}

While this code may be a little challenging to decipher at first, it’s not out of line with what working with Futures and Streams looks like in general. In either case, you can expect a lot of .and_then callbacks :)

There is one detail here that I haven’t mentioned previously, though. It relates to the stream::iter_ok and Stream::flatten calls which you may have already noticed.

The issue with stream::unfold is that it only allows yielding a single item per closure invocation. For us, this is too limiting: a single batch response from the API will contain many such items, but we have no way of “splitting” them.

What we can do instead is to produce a Stream of entire batches of items, at least at first, and then flatten it. What Stream::flatten does here is to turn a nested Stream<Stream<Item>> into a flat Stream<Item>. The latter is what we eventually want to return, so all we need now is to create this nested stream of streams.

How? Well, that’s actually pretty easy.

We can already deserialize a Vec<Item> from the JSON response (that’s our item batch!), which is essentially an iterable of Items[5]. Another utility function from the stream module, namely stream::iter_ok, can readily turn such an iterable into an “immediate” Stream. Such a Stream won’t be asynchronous at all (its items will have been ready from the very beginning), but it will still conform to the Stream interface, enabling it to be flattened as we request.
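The same batches-then-flatten idea can be tried out with plain iterators, where Iterator::flatten plays the role of Stream::flatten and a Vec needs no conversion because it already implements IntoIterator. The flatten_batches function is a hypothetical helper, written only to show the shape of the transformation:

```rust
/// Flattens a sequence of batches into a single sequence of items.
fn flatten_batches<T>(batches: Vec<Vec<T>>) -> Vec<T> {
    batches
        .into_iter() // iterates over whole batches, one Vec<T> at a time
        .flatten()   // each Vec<T> is IntoIterator, so this yields single items
        .collect()
}
```

Note that empty batches simply disappear in the flattened result, which is what we'd want from an API that occasionally returns an empty page.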

But wait! There is a bug!

So in the end, is this the solution we’re looking for?…

Well, almost. First, here’s the expected usage of the function we just wrote:

let mut core = tokio_core::reactor::Core::new().unwrap();
core.run({
    let continuation_token = None;  // start from the beginning
    items(&core.handle(), continuation_token).for_each(|item| {
        println!("{:?}", item);
        Ok(())
    })
}).unwrap();

While this is more complicated than the plain for loop in Python, most of it is just Tokio boilerplate. The notable part is the invocation of items(), where we pass None as a continuation token to indicate that we want the entire sequence, right from its beginning.

And since we’re talking about fetching long sequences, we would indeed expect a lot of items. So it is probably quite surprising to hear that the stream we’ve created here will be completely empty.

…What? How?!

If you look again at the source code of items(), the direct reason should be pretty easy to find. The culprit lies in the return None branch of the first match. If we don’t pass Some(continuation_token) as a parameter to items(), this branch will be hit immediately, terminating the stream before it had a chance to produce anything.

It may not be very clear how to fix this problem. After all, the purpose of the match was to detect the end of the sequence, but it apparently prevents us from starting it in the first place!

Looking at the problem from another angle, we can see we’ve conflated two distinct states of our stream — “before it has started” and “after it’s ended” — into a single one (“no continuation token”). Since we obviously don’t want to make the after parameter mandatory — users should be able to say “Give me everything!” — we need another way of telling those two states apart.

In terms of Rust types, it seems that Option<String> is no longer sufficient for encoding all possible states of our Stream. Although we could try to fix that in some ad-hoc way (e.g. by adding another bool flag), it feels cleaner to just define a new, dedicated type. For one, this allows us to designate a name for each of the states in question, improving the readability and maintainability of our code:

enum State {
    Start(Option<String>),
    Next(String),
    End,
}

Note that we can put this definition directly inside the items() function, without cluttering the module namespace. All the relevant details of our Stream are thus nicely contained within a single function:

fn items(
    handle: &Handle, after: Option<String>
) -> Box<Stream<Item=Item, Error=Box<Error>>>
{
    // (definition of State enum can go here)

    let client = Client::new(handle);
    Box::new(stream::unfold(State::Start(after), move |state| {
        let cont_token = match state {
            State::Start(opt_ct) => opt_ct,
            State::Next(ct) => Some(ct),
            State::End => return None,
        };
        let url = match cont_token {
            Some(ct) => format!("{}?after={}", URL, ct),
            None => URL.into(),
        };
        let req = Request::new(Method::Get, url.parse().unwrap());
        Some(client.request(req).from_err().and_then(move |resp| {
            let status = resp.status();
            resp.body().concat2().from_err().and_then(move |body| {
                if status.is_success() {
                    serde_json::from_slice::<ItemsResponse>(&body)
                        .map_err(Box::<Error>::from)
                } else {
                    Err(format!("HTTP status: {}", status).into())
                }
            })
            .map(move |items_resp| {
                let next_state = match items_resp.continuation_token {
                    Some(ct) => State::Next(ct),
                    None => State::End,
                };
                (stream::iter_ok(items_resp.items), next_state)
            })
        }))
    })
    .flatten())
}

Sure, there is a little more bookkeeping required now, but at least all the items are being emitted by the Stream as intended.
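If you'd like to poke at the difference between the two seeds without setting up Tokio, here's a synchronous sketch that puts them side by side. Everything in it is an assumption made for illustration: unfold is a small Iterator-based stand-in for stream::unfold, fetch_page fakes the HTTP request by serving the numbers 1 to 7 in pages of three, and tokens are plain offsets.

```rust
use std::iter;

/// Iterator-flavoured stand-in for `stream::unfold` (no futures involved).
fn unfold<T, It, F>(init: T, mut f: F) -> impl Iterator<Item = It>
where
    F: FnMut(T) -> Option<(It, T)>,
{
    let mut seed = Some(init);
    iter::from_fn(move || {
        let (item, next_seed) = f(seed.take()?)?;
        seed = Some(next_seed);
        Some(item)
    })
}

/// Hypothetical stand-in for the HTTP request: numbers 1..=7 in pages of three.
fn fetch_page(after: Option<u32>) -> (Vec<u32>, Option<u32>) {
    let start = after.unwrap_or(0);
    let end = (start + 3).min(7);
    ((start + 1..=end).collect(), if end < 7 { Some(end) } else { None })
}

/// First attempt: the bare token is the seed, so `None` means both
/// "not started yet" and "finished".
fn items_buggy(after: Option<u32>) -> Vec<u32> {
    unfold(after, |token| {
        let token = token?; // ends the sequence, even on the very first call
        Some(fetch_page(Some(token)))
    })
    .flatten()
    .collect()
}

enum State {
    Start(Option<u32>),
    Next(u32),
    End,
}

/// Fixed version: the seed distinguishes all three states.
fn items(after: Option<u32>) -> Vec<u32> {
    unfold(State::Start(after), |state| {
        let token = match state {
            State::Start(opt_token) => opt_token,
            State::Next(token) => Some(token),
            State::End => return None,
        };
        let (batch, next_token) = fetch_page(token);
        let next_state = next_token.map_or(State::End, State::Next);
        Some((batch, next_state))
    })
    .flatten()
    .collect()
}
```

With this mock, items_buggy(None) comes back empty while items(None) returns all seven numbers; both agree when given an explicit starting token.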


You can see the complete source in the playground here.


  1. Furthermore, the token doesn’t have to come as part of the HTTP response body. Some API providers (such as GitHub) may use the Link: header to point directly to the next URL to query. 

  2. This example uses “traditional”, synchronous Python code. However, it should be easy to convert it to the asynchronous equivalent that works in Python 3.5 and above, provided you can replace requests with some async HTTP library. 

  3. If you are curious whether other languages could express it better, you can check the Data.Conduit.List.unfold function from Haskell’s conduit package. For most intents and purposes, it is their equivalent of stream::unfold.

  4. Coincidentally, you can create iterators in the very same manner through the itertools::unfold function from the itertools crate.

  5. In more technical Rust terms, it means Vec implements the IntoIterator trait, allowing anyone to get an Iterator from it. 


Terminating a Stream in Rust

Posted on Sat 16 December 2017 in Code • Tagged with Rust, streams, Tokio, async

Here’s a little trick that may be useful in dealing with asynchronous Streams in Rust.

When you consume a Stream using the for_each method, its default behavior is to finish early should an error be produced by the stream:

use futures::prelude::*;
use futures::stream;
use tokio_core::reactor::Core;

let s = stream::iter_result(vec![Ok(1), Ok(2), Err(false), Ok(3)]);
let fut = s.for_each(|n| {
    println!("{}", n);
    Ok(())
});

In more precise terms, it means that the Future returned by for_each will resolve with the first error from the underlying stream:

// Prints 1, 2, and then panics with "false".
Core::new().unwrap().run(fut).unwrap();

For most purposes, this is perfectly alright; errors are generally meant to propagate, after all.
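Plain iterators have a close cousin of this behavior in Iterator::try_for_each, which also stops at the first error and hands it back to the caller. The sketch below is only a synchronous analogy with a made-up consume function; it says nothing about how for_each is implemented in futures.

```rust
/// Visits values until the first error, recording what was seen so far.
fn consume(values: Vec<Result<i32, bool>>) -> (Vec<i32>, Result<(), bool>) {
    let mut seen = Vec::new();
    let outcome = values.into_iter().try_for_each(|r| -> Result<(), bool> {
        let n = r?; // the first Err stops the iteration right here
        seen.push(n);
        Ok(())
    });
    (seen, outcome)
}
```

Given the same input as above, the items after the error are never visited.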

Certain kinds of errors, however, are better off silenced. Perhaps they are expected to pop up during normal program operation, or maybe their occurrence should merely affect program execution in a particular way, and not halt it outright. In a simple case like above, you can of course check what for_each itself has returned, but that doesn’t scale to building larger Stream pipelines.

I encountered a situation like this myself when using the hubcaps library. The code I was writing was meant to search for GitHub issues within a specific repository. In the GitHub API, this is accomplished by sending a search query like repo:$OWNER/$NAME, which may result in a rather obscure HTTP error (422 Unprocessable Entity) if the given repository doesn’t actually exist. But I didn’t care about this error; should it occur, I’d simply return an empty stream, because doing so was more convenient for the larger bit of logic that was consuming it.

Unfortunately, the Stream trait offers no interface that’d target this use case. There are only a few methods that even allow looking at errors mid-stream, and even fewer that can end it prematurely. On the flip side, at least we don’t have to consider too many combinations when looking for the solution ;)

Indeed, it seems there are only two Stream methods that are worthy of our attention:

  • Stream::then, because it allows for a closure to receive all stream values (items and errors)
  • Stream::take_while, because it accepts a closure that can end the stream early (but only based on items, not errors)

Combining them both, we arrive at the following recipe:

  • Inside a .then closure, look for Errors that you consider non-fatal and replace them with a special item value. The natural choice for such a value is None. As a side effect, this forces us to convert the regular (“successful”) items into Some(item), effectively transforming a Stream<Item=T> into Stream<Item=Option<T>>.

  • Look for the special value (i.e. None) in the .take_while closure and terminate the stream when it’s been found.

  • Finally, convert the wrapped items back into their original form using .map, thus giving us back a Stream of Ts.

Applying this technique to our initial example, we get something that looks like this:

use futures::future;  // needed for future::ok below

let s = stream::iter_result(vec![Ok(1), Ok(2), Err(false), Ok(3)])
    .then(|r| match r {
        Ok(r) => Ok(Some(r)),   // no-op passthrough of items
        Err(false) => Ok(None), // non-fatal error, terminate the stream
        Err(e) => Err(e),       // no-op passthrough of other errors
    })
    .take_while(|x| future::ok(x.is_some()))
    .map(Option::unwrap);

If we now try to consume this stream like before:

Core::new().unwrap().run(
    s.for_each(|n| { println!("{}", n); Ok(()) })
).unwrap();

it will still end after the first two items, but without producing any errors afterwards.
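The three steps of the recipe can also be tried without an event loop. Below is a synchronous analogy over an iterator of Results, where map stands in for both .then and the final .map; until_benign_error is a hypothetical name, and treating Err(false) as the non-fatal error is simply carried over from the example above.

```rust
/// Ends the sequence quietly at the first `Err(false)`; other errors pass through.
fn until_benign_error<I>(results: I) -> impl Iterator<Item = Result<i32, bool>>
where
    I: Iterator<Item = Result<i32, bool>>,
{
    results
        // Step 1: wrap items in Some and turn the non-fatal error into Ok(None).
        .map(|r| match r {
            Ok(n) => Ok(Some(n)),
            Err(false) => Ok(None),
            Err(e) => Err(e),
        })
        // Step 2: stop as soon as the marker value shows up.
        .take_while(|r| !matches!(r, Ok(None)))
        // Step 3: unwrap the remaining items back to their original form.
        .map(|r| r.map(|opt| opt.expect("None is cut off by take_while")))
}
```

One difference from the Stream version is worth keeping in mind: an iterator of Results keeps going after a fatal Err unless the consumer stops, whereas for_each on a stream would bail out at that point.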


For a more reusable version of the trick, you can check this gist; it adds a Stream::take_while_err method through an extension trait.

This isn’t a perfect solution, however, because it requires Boxing even on nightly Rust[1]. We can fix that by introducing a dedicated TakeWhileErr stream type, similarly to what native Stream methods do. I leave that as an exercise for the reader ;-)


  1. This is due to a limitation in the impl Trait feature which prevents it from being used as a return type of trait methods. 
