My most recent Rust crate
is an API client for Path of Exile’s public stash tabs.
One problem that I had to solve while writing it was turning a sequence of paginated items
(in this case, player stash tabs) into a single, asynchronous Stream.
In this post, I’ll explain how to use the Stream interface,
along with functions from the futures crate,
to create a single Stream from multiple batches of entities.
Pagination 101
Dividing a long list of items into pages
is a very common pattern in many HTTP-based APIs.
If the client requests a sequence of entities
that would be too large to serve as a single response,
there has to be some way to split it over multiple HTTP roundtrips.
To accomplish that, API servers will often return a constant number of items at first (like 50),
followed by some form of continuation token:
$ curl http://api.example.com/items
{
    "items": [
        {...},
        {...},
        {...}
    ],
    "continuationToken": "e53c68db0ee412ac239173db147a02a0"
}
Such a token is preferably an opaque sequence of bytes,
though sometimes it can be an explicit offset (index) into the list of results.
Regardless of its exact nature, clients need to pass the token with their next request
in order to obtain another batch of results:
$ curl 'http://api.example.com/items?after=e53c68db0ee412ac239173db147a02a0'
{
    "items": [
        {...},
        {...}
    ],
    "continuationToken": "4e3986e4c7f591b8cb17cf14addd40a6"
}
Repeat this procedure for as long as the response contains a continuation token,
and you will eventually go through the entire sequence.
If it’s really, really long (e.g. it’s a Twitter firehose for a popular hashtag),
then you may of course hit some problems due to the sheer number of requests.
For many datasets, however, this pagination scheme is absolutely sufficient
while remaining relatively simple for clients to implement.
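To make the procedure concrete, here is a minimal Rust sketch of a client that keeps following continuation tokens. It does not talk to a real server: `fetch_page` is a hypothetical stand-in that slices an in-memory list and uses the next offset as the token, and the `Page` struct and the `fetch_all` name are also just assumptions made for illustration.

```rust
/// One page of results, mirroring the JSON shape shown above.
struct Page {
    items: Vec<u32>,
    continuation_token: Option<String>,
}

/// Hypothetical stand-in for the HTTP call: serves `data` in pages of
/// `page_size` items, using the next offset (as a string) as the token.
fn fetch_page(data: &[u32], page_size: usize, after: Option<&str>) -> Page {
    assert!(page_size > 0, "a page must hold at least one item");
    let start = after.map_or(0, |t| t.parse::<usize>().expect("token is an offset"));
    let end = (start + page_size).min(data.len());
    Page {
        items: data[start..end].to_vec(),
        // No token on the last page: that is how the client knows to stop.
        continuation_token: if end < data.len() { Some(end.to_string()) } else { None },
    }
}

/// Follow continuation tokens until the "server" stops returning one.
fn fetch_all(data: &[u32], page_size: usize) -> Vec<u32> {
    let mut all = Vec::new();
    let mut after: Option<String> = None;
    loop {
        let page = fetch_page(data, page_size, after.as_deref());
        all.extend(page.items);
        match page.continuation_token {
            Some(token) => after = Some(token),
            None => break,
        }
    }
    all
}
```

Calling `fetch_all(&[1, 2, 3, 4, 5], 2)` in this sketch makes three "requests" and returns all five items in order.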
Stream it in Rust
What the client code would typically do, however,
is to hide the pagination details completely
and present only the final, unified sequence of items.
Such an abstraction is useful even for end-user applications,
but it’s definitely expected from any shared library that wraps a third-party API.
Depending on your programming language of choice,
this abstraction layer may be very simple to implement.
Here’s how it could be done in Python,
whose concepts of iterables and generators are a perfect fit for this task:
import requests

def iter_items(after=None):
    """Yield items from an example API.

    :param after: Optional continuation token
    """
    while True:
        url = "http://api.example.com/items"
        if after is not None:
            url += "?after=%s" % after
        response = requests.get(url)
        response.raise_for_status()
        for item in response.json()['items']:
            yield item
        after = response.json().get("continuationToken")
        if after is None:
            break

# consumer
for item in iter_items():
    print(item)
In Rust, you can find their analogues in the Iterator and Stream traits,
so we’re off to a pretty good start.
What’s missing, however, is the equivalent of yield:
something to tell the consumer “Here, have the next item!”,
and then go back to the exact same place in the producer function.
This ability to jump back and forth between two (or more) functions
requires language support for coroutines.
Not many mainstream languages meet this requirement,
although Python and C# readily come to mind.
In the case of Rust, there have been some nightly proposals and experiments,
but nothing seems to be stabilizing anytime soon.
DIY streaming
But of course, if you do want a Stream of paginated items,
there is at least one straightforward solution:
just implement the Stream trait directly.
This is actually quite a viable approach, very similar to rolling out a custom Iterator.
Some minor differences stem mostly from the more complicated state management in Stream::poll
compared to Iterator::next.
While an iterator is either exhausted or not,
a stream can also be waiting for the next item to “arrive” (Ok(Async::NotReady)),
or have errored out permanently (Err(e)).
As a consequence, the return value of Stream::poll is slightly more complex
than just plain Option, but nevertheless quite manageable.
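The four possible outcomes can be spelled out in a small sketch. To keep it compilable without any dependencies, it redefines a local `Async` enum that mirrors the one in futures 0.1. The `StreamPoll` alias and the `describe` function are hypothetical names introduced only for this illustration.

```rust
/// Local mirror of the `Async` enum from futures 0.1, redefined here so the
/// sketch compiles without the crate.
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Shape of the value returned by `Stream::poll` in futures 0.1
/// (hypothetical alias name, used only in this sketch).
type StreamPoll<T, E> = Result<Async<Option<T>>, E>;

/// Name each of the four states a polled stream can be in.
fn describe<T, E>(polled: &StreamPoll<T, E>) -> &'static str {
    match polled {
        Ok(Async::Ready(Some(_))) => "item ready",
        Ok(Async::Ready(None)) => "stream exhausted",
        Ok(Async::NotReady) => "waiting for the next item",
        Err(_) => "stream failed",
    }
}
```

An Iterator only ever needs the first two of these cases, which is why plain Option is enough for Iterator::next.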
Irrespective of difficulty,
writing a custom Stream from scratch would inevitably involve a lot of boilerplate.
You may find it necessary in more complicated applications, of course,
but for something that’s basically a glorified while loop,
it doesn’t seem like a big ask to have a more concise solution.
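To give a feel for the amount of boilerplate, here is a sketch of the simpler, synchronous cousin: a hand-written Iterator that hides pagination. Everything in it is an assumption made for illustration. `fetch_page` is a hypothetical stand-in for the HTTP call that reads from an in-memory list of pages, and the token is just the index of the next page. A real Stream would additionally have to track an in-flight request and report Async::NotReady.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for an HTTP call: returns a batch of items plus an
/// optional token (here, simply the index of the next page).
fn fetch_page(pages: &[Vec<u32>], after: Option<usize>) -> (Vec<u32>, Option<usize>) {
    let index = after.unwrap_or(0);
    let items = pages.get(index).cloned().unwrap_or_default();
    let next = if index + 1 < pages.len() { Some(index + 1) } else { None };
    (items, next)
}

/// Iterator that hides pagination behind `next()`.
struct PagedItems<'a> {
    pages: &'a [Vec<u32>],
    buffer: VecDeque<u32>,     // items fetched but not yet handed out
    next_token: Option<usize>, // token to send with the next request
    done: bool,                // true once the last page has been fetched
}

impl<'a> PagedItems<'a> {
    fn new(pages: &'a [Vec<u32>]) -> Self {
        PagedItems { pages, buffer: VecDeque::new(), next_token: None, done: false }
    }
}

impl<'a> Iterator for PagedItems<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        loop {
            // Hand out buffered items first.
            if let Some(item) = self.buffer.pop_front() {
                return Some(item);
            }
            if self.done {
                return None;
            }
            // Buffer is empty: fetch the next page (which may itself be empty).
            let (items, token) = fetch_page(self.pages, self.next_token);
            self.buffer.extend(items);
            self.done = token.is_none();
            self.next_token = token;
        }
    }
}
```

Even in this simplified form, most of the code is state bookkeeping rather than the loop we actually care about.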
The stream unfolds
Fortunately, there is one!
Its crucial element is the standalone stream::unfold function
from the futures crate:
pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
where
    F: FnMut(T) -> Option<Fut>,
    Fut: IntoFuture<Item = (It, T)>,
Reading through the signature of this function can be a little intimidating at first.
Part of it is Rust’s verbose syntax for anything
that involves both trait bounds and closures,
making stream::unfold seem more complicated than it actually is.
Indeed, if you’ve ever used Iterator adapters like .filter_map or .fold,
the unfold function will be pretty easy to understand.
(And if you haven’t, don’t worry! It’s really quite simple :))
If you look closely, you’ll see that stream::unfold takes the following two arguments:
- the first one is essentially an arbitrary initial value, called a seed
- the second one is a closure that receives the seed and returns an optional pair of values
What are those values?…
Well, the entire purpose of the unfold function is to create a Stream,
and a stream should inevitably produce some items.
Consequently, the first value in the returned pair will be the next item in the stream.
And what about the second value? That’s just the next state of the seed!
It will be received by the very same closure
when someone asks the Stream to produce its next item.
By passing around a useful value (say, a continuation token),
you can create something that’s effectively the while loop from the Python example above.
The last important bit about this pair of values is the wrapping.
First, it is actually a Future, allowing your stream to yield objects
that it doesn’t quite have yet,
for example those which ultimately come from an HTTP response.
Secondly, its outermost layer is an Option.
This enables you to terminate the stream when the underlying source is exhausted
by simply returning None.
Until then, however, you should return Some
with the (future of the) aforementioned pair of values.
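If the seed-and-closure mechanics still feel abstract, it may help to see them without the Future layer. The sketch below is a synchronous analogue built on std::iter::from_fn. `unfold_sync` is a hypothetical helper written for this post, not something provided by the futures crate, and its closure returns the (item, next seed) pair directly instead of a future of it.

```rust
/// Synchronous analogue of `stream::unfold` (hypothetical helper): the
/// closure gets the current seed and returns either `None` to stop, or the
/// next item together with the next seed.
fn unfold_sync<T, It, F>(init: T, mut f: F) -> impl Iterator<Item = It>
where
    F: FnMut(T) -> Option<(It, T)>,
{
    // The seed is moved into the closure on every call, so it is kept in an
    // Option and taken out each time.
    let mut state = Some(init);
    std::iter::from_fn(move || {
        let seed = state.take()?;
        let (item, next_seed) = f(seed)?;
        state = Some(next_seed);
        Some(item)
    })
}

/// Example use: the seed is the current power of two.
fn powers_of_two_up_to(limit: u32) -> Vec<u32> {
    unfold_sync(1u32, |n| if n <= limit { Some((n, n * 2)) } else { None }).collect()
}
```

Here the seed doubles on every step, and returning None once it exceeds the limit ends the sequence, just like None ends the stream in the real unfold.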
Paginate! Paginate!
If you have doubts about how all those pieces of stream::unfold fit in,
then looking at the usage example in the docs
may give you some idea of what it enables you to do.
It’s a very artificial example, though:
the resulting Stream isn’t waiting for any asynchronous Futures,
which is the very reason you’d use a Stream over an Iterator in the first place.
We can find a more natural application for unfold
if we go back to our original problem.
To reiterate, we want to repeatedly query an HTTP API for a long list of items,
giving our callers a Stream of such items they can process at their leisure.
At the same time, all the details about pagination and handling of continuation tokens or offsets
should be completely hidden from the caller.
To employ stream::unfold for this task,
we need two things: the initial seed, and an appropriate closure.
I have hinted already at using the continuation token as our seed,
or the state that we pass around from one closure invocation to another.
What remains is mostly making the actual HTTP request and interpreting the JSON response,
for which we’ll use the de facto standard Rust crates:
hyper, Serde, and serde_json:
use std::error::Error;

use futures::{future, Future, stream, Stream};
use hyper::{Client, Method};
use hyper::client::Request;
use serde_json;
use tokio_core::reactor::Handle;

const URL: &str = "http://api.example.com/items";

fn items(
    handle: &Handle, after: Option<String>
) -> Box<Stream<Item=Item, Error=Box<Error>>>
{
    let client = Client::new(handle);
    Box::new(stream::unfold(after, move |cont_token| {
        let url = match cont_token {
            Some(ct) => format!("{}?after={}", URL, ct),
            None => return None,
        };
        let req = Request::new(Method::Get, url.parse().unwrap());
        Some(client.request(req).from_err().and_then(move |resp| {
            let status = resp.status();
            resp.body().concat2().from_err().and_then(move |body| {
                if status.is_success() {
                    serde_json::from_slice::<ItemsResponse>(&body)
                        .map_err(Box::<Error>::from)
                } else {
                    Err(format!("HTTP status: {}", status).into())
                }
            })
            .map(move |items_resp| {
                (stream::iter_ok(items_resp.items), items_resp.continuation_token)
            })
        }))
    })
    .flatten())
}

#[derive(Deserialize)]
struct ItemsResponse {
    items: Vec<Item>,
    #[serde(rename = "continuationToken")]
    continuation_token: Option<String>,
}
While this code may be a little challenging to decipher at first,
it’s not out of line compared to what working with Futures and Streams looks like in general.
In either case, you can expect a lot of .and_then callbacks :)
There is one detail here that I haven’t mentioned previously, though.
It relates to the stream::iter_ok and Stream::flatten calls,
which you may have already noticed.
The issue with stream::unfold
is that it only allows yielding one item per closure invocation.
For us, this is too limiting: a single batch response from the API will contain many such items,
but we have no way of “splitting” them.
What we can do instead is to produce a Stream of entire batches of items,
at least at first, and then flatten it.
What Stream::flatten does here is to turn a nested Stream<Stream<Item>>
into a flat Stream<Item>. The latter is what we eventually want to return,
so all we need now is to create this nested stream of streams.
How? Well, that’s actually pretty easy.
We can already deserialize a Vec<Item> from the JSON response (that’s our item batch!),
which is essentially an iterable of Items.
Another utility function from the stream module,
namely stream::iter_ok,
can readily turn such an iterable into an “immediate” Stream.
Such a Stream won’t be asynchronous at all
(its items will have been ready from the very beginning),
but it will still conform to the Stream interface,
enabling it to be flattened as we request.
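The same "batches first, flatten later" idea exists for plain iterators, which makes for a compact illustration. The sketch below uses Iterator::flatten from the standard library as a stand-in for Stream::flatten. The `flatten_batches` name is hypothetical, and a Vec of Vecs plays the role of the stream of streams.

```rust
/// Turn a sequence of batches into a flat sequence of items: the iterator
/// counterpart of flattening a stream of streams.
fn flatten_batches<T>(batches: Vec<Vec<T>>) -> impl Iterator<Item = T> {
    // Each inner Vec is itself iterable, so `flatten` can splice them together.
    batches.into_iter().flatten()
}
```

Empty batches simply contribute nothing, so a page without items would not disturb the resulting sequence.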
But wait! There is a bug!
So in the end, is this the solution we’re looking for?…
Well, almost. First, here’s the expected usage of the function we just wrote:
let mut core = tokio_core::reactor::Core::new().unwrap();
core.run({
    let continuation_token = None;  // start from the beginning
    items(&core.handle(), continuation_token).for_each(|item| {
        println!("{:?}", item);
        Ok(())
    })
}).unwrap();
While this is more complicated than the plain for loop in Python,
most of it is just Tokio boilerplate.
The notable part is the invocation of items(),
where we pass None as a continuation token to indicate that
we want the entire sequence, right from its beginning.
And since we’re talking about fetching long sequences,
we would indeed expect a lot of items.
So it is probably quite surprising to hear
that the stream we’ve created here will be completely empty.
…What? How?!
If you look again at the source code of items(),
the direct reason should be pretty easy to find.
The culprit lies in the return None branch of the first match.
If we don’t pass Some(continuation_token) as a parameter to items(),
this branch will be hit immediately,
terminating the stream before it has had a chance to produce anything.
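The effect is easy to reproduce without any networking. The following is a synchronous sketch under loud assumptions: `unfold_sync` is a hypothetical iterator-based stand-in for stream::unfold, and `fetch` is a fake three-page server invented for this illustration. Only the seed handling is copied from items().

```rust
/// Synchronous stand-in for `stream::unfold` (hypothetical helper).
fn unfold_sync<T, It, F>(init: T, mut f: F) -> impl Iterator<Item = It>
where
    F: FnMut(T) -> Option<(It, T)>,
{
    let mut state = Some(init);
    std::iter::from_fn(move || {
        let (item, next_seed) = f(state.take()?)?;
        state = Some(next_seed);
        Some(item)
    })
}

/// Fake server with three pages; no token means "first page".
fn fetch(after: Option<&str>) -> (Vec<u32>, Option<String>) {
    match after {
        None => (vec![1, 2], Some("p2".to_string())),
        Some("p2") => (vec![3, 4], Some("p3".to_string())),
        Some(_) => (vec![5], None),
    }
}

/// Same seed handling as the first version of `items()`: a missing token is
/// treated as "the sequence has ended".
fn buggy_items(after: Option<String>) -> Vec<u32> {
    unfold_sync(after, |cont_token: Option<String>| {
        let token = cont_token?; // `None` terminates right away
        Some(fetch(Some(token.as_str())))
    })
    .flatten()
    .collect()
}
```

With this fake server, `buggy_items(None)` returns an empty Vec even though the first page exists, while `buggy_items(Some("p2".to_string()))` happily returns the items from the second page onwards.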
It may not be very clear how to fix this problem.
After all, the purpose of the match was to detect the end of the sequence,
but it apparently prevents us from starting it in the first place!
Looking at the problem from another angle,
we can see we’ve conflated two distinct states of our stream
(“before it has started” and “after it’s ended”) into a single one (“no continuation token”).
Since we obviously don’t want to make the after parameter mandatory
(users should be able to say “Give me everything!”),
we need another way of telling those two states apart.
In terms of Rust types, it seems that Option<String> is no longer sufficient
for encoding all possible states of our Stream.
Although we could try to fix that in some ad-hoc way (e.g. by adding another bool flag),
it feels cleaner to just define a new, dedicated type.
For one, this allows us to designate a name for each of the states in question,
improving the readability and maintainability of our code:
enum State {
    Start(Option<String>),
    Next(String),
    End,
}
Note that we can put this definition directly inside the items() function,
without cluttering the module namespace.
All the relevant details of our Stream are thus nicely contained
within a single function:
fn items(
    handle: &Handle, after: Option<String>
) -> Box<Stream<Item=Item, Error=Box<Error>>>
{
    // (definition of State enum can go here)

    let client = Client::new(handle);
    Box::new(stream::unfold(State::Start(after), move |state| {
        let cont_token = match state {
            State::Start(opt_ct) => opt_ct,
            State::Next(ct) => Some(ct),
            State::End => return None,
        };
        let url = match cont_token {
            Some(ct) => format!("{}?after={}", URL, ct),
            None => URL.into(),
        };
        let req = Request::new(Method::Get, url.parse().unwrap());
        Some(client.request(req).from_err().and_then(move |resp| {
            let status = resp.status();
            resp.body().concat2().from_err().and_then(move |body| {
                if status.is_success() {
                    serde_json::from_slice::<ItemsResponse>(&body)
                        .map_err(Box::<Error>::from)
                } else {
                    Err(format!("HTTP status: {}", status).into())
                }
            })
            .map(move |items_resp| {
                let next_state = match items_resp.continuation_token {
                    Some(ct) => State::Next(ct),
                    None => State::End,
                };
                (stream::iter_ok(items_resp.items), next_state)
            })
        }))
    })
    .flatten())
}
Sure, there is a little more bookkeeping required now,
but at least all the items are being emitted by the Stream as intended.
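If you would like to poke at the three-state logic without Tokio or hyper, here is a synchronous sketch of it. As before, it rests on assumptions that are not part of the real code: `unfold_sync` is a hypothetical iterator-based stand-in for stream::unfold, `fetch` is a fake three-page server, and `items_sync` is a name made up for this illustration. Only the State enum and its transitions mirror items().

```rust
/// Synchronous stand-in for `stream::unfold` (hypothetical helper).
fn unfold_sync<T, It, F>(init: T, mut f: F) -> impl Iterator<Item = It>
where
    F: FnMut(T) -> Option<(It, T)>,
{
    let mut state = Some(init);
    std::iter::from_fn(move || {
        let (item, next_seed) = f(state.take()?)?;
        state = Some(next_seed);
        Some(item)
    })
}

/// Fake server with three pages; no token means "first page".
fn fetch(after: Option<&str>) -> (Vec<u32>, Option<String>) {
    match after {
        None => (vec![1, 2], Some("p2".to_string())),
        Some("p2") => (vec![3, 4], Some("p3".to_string())),
        Some(_) => (vec![5], None),
    }
}

enum State {
    Start(Option<String>), // nothing fetched yet; token is optional
    Next(String),          // more pages available under this token
    End,                   // last page has been fetched
}

fn items_sync(after: Option<String>) -> Vec<u32> {
    unfold_sync(State::Start(after), |state| {
        let cont_token = match state {
            State::Start(opt_ct) => opt_ct,
            State::Next(ct) => Some(ct),
            State::End => return None,
        };
        let (batch, next_token) = fetch(cont_token.as_deref());
        let next_state = match next_token {
            Some(ct) => State::Next(ct),
            None => State::End,
        };
        Some((batch, next_state))
    })
    .flatten()
    .collect()
}
```

In this model, `items_sync(None)` walks through all three fake pages, and passing a token still resumes from the corresponding page.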
You can see the complete source in
the playground here.