Terminating a Stream in Rust

Posted on Sat 16 December 2017 in Code

Here’s a little trick that may be useful in dealing with asynchronous Streams in Rust.

When you consume a Stream using the for_each method, its default behavior is to finish early should an error be produced by the stream:

use futures::prelude::*;
use futures::stream;
use tokio_core::reactor::Core;

let s = stream::iter_result(vec![Ok(1), Ok(2), Err(false), Ok(3)]);
let fut = s.for_each(|n| {
    println!("{}", n);

In more precise terms, it means that the Future returned by for_each will resolve with the first error from the underlying stream:

// Prints 1, 2, and then panics with "false".

For most purposes, this is perfectly alright; errors are generally meant to propagate, after all.

Certain kinds of errors, however, are better off silenced. Perhaps they are expected to pop up during normal program operation, or maybe their occurrence should merely affect program execution in a particular way, and not halt it outright. In a simple case like above, you can of course check what for_each itself has returned, but that doesn’t scale to building larger Stream pipelines.

I encountered a situation like this myself when using the hubcaps library. The code I was writing was meant to search for GitHub issues within a specific repository. In GitHub API, this is accomplished by sending a search query like repo:$OWNER/$NAME, which may result in a rather obscure HTTP error (422 Unprocessable Entity) if the given repository doesn’t actually exist. But I didn’t care about this error; should it occur, I’d simply return an empty stream, because doing so was more convenient for the larger bit of logic that was consuming it.

Unfortunately, the Stream trait offers no interface that’d target this use case. There are only a few methods that even allow to look at errors mid-stream, and even fewer that can end it prematurely. On the flip side, at least we don’t have to consider too many combinations when looking for the solution ;)

Indeed, it seems there are only two Stream methods that are worthy of our attention:

  • Stream::then, because it allows for a closure to receive all stream values (items and errors)
  • Stream::take_while, because it accepts a closure that can end the stream early (but only based on items, not errors)

Combining them both, we arrive at the following recipe:

  • Inside a .then closure, look for Errors that you consider non-fatal and replace them with a special item value. The natural choice for such a value is None. As a side effect, this forces us to convert the regular (“successful”) items into Some(item), effectively transforming a Stream<Item=T> into Stream<Item=Option<T>>.

  • Looks for the special value (i.e. None) in the .take_while closure and terminate the stream when it’s been found.

  • Finally, convert the wrapped items back into their original form using .map, thus giving us back a Stream of Ts.

Applying this technique to our initial example, we get something that looks like this:

let s = stream::iter_result(vec![Ok(1), Ok(2), Err(false), Ok(3)])
    .then(|r| match r {
        Ok(r) => Ok(Some(r)),  // no-op passthrough of items
        Err(false) => Ok(None) // non-fatal error, terminate the stream
        Err(e) => Err(e),      // no-op passthrough of other errors
    .take_while(|x| future::ok(x.is_some()))

If we now try to consume this stream like before:

    s.for_each(|n| { println!("{}", n); Ok(()) })

it will still end after the first two items, but without producing any errors afterwards.

For a more reusable version of the trick, you can check this gist; it adds a Stream::take_while_err method through an extension trait.

This isn’t a perfect solution, however, because it requires Boxing even on nightly Rust1. We can fix that by introducing a dedicated TakeWhileErr stream type, similarly to what native Stream methods do. I leave that as an exercise for the reader ;-)

  1. This is due to a limitation in the impl Trait feature which prevents it from being used as a return type of trait methods.