Terminating a Stream in Rust
Posted on Sat 16 December 2017 in Code
Here’s a little trick that may be useful when dealing with asynchronous Streams in Rust.
When you consume a Stream using the for_each method, its default behavior is to finish early should an error be produced by the stream:
use futures::prelude::*;
use futures::stream;
use tokio_core::reactor::Core;
let s = stream::iter_result(vec![Ok(1), Ok(2), Err(false), Ok(3)]);
let fut = s.for_each(|n| {
    println!("{}", n);
    Ok(())
});
In more precise terms, it means that the Future returned by for_each will resolve with the first error from the underlying stream:
// Prints 1, 2, and then panics with "false".
Core::new().unwrap().run(fut).unwrap();
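For comparison, the same short-circuiting can be observed without any async machinery. The sketch below is only a rough synchronous analogue built on Iterator::try_for_each from the standard library, not the futures API itself; print_until_error is a hypothetical helper name made up for this illustration.

```rust
// Synchronous stand-in for the stream above: a plain iterator of Results.
// `print_until_error` is a hypothetical helper, not part of any library.
fn print_until_error(values: Vec<Result<i32, bool>>) -> (Vec<i32>, Result<(), bool>) {
    let mut seen = Vec::new();
    // `try_for_each` stops at the first `Err` and returns it,
    // much like the future returned by `Stream::for_each` does.
    let outcome = values.into_iter().try_for_each(|r| -> Result<(), bool> {
        let n = r?; // bail out on the first error
        println!("{}", n);
        seen.push(n);
        Ok(())
    });
    (seen, outcome)
}
```

Calling print_until_error(vec![Ok(1), Ok(2), Err(false), Ok(3)]) prints 1 and 2, then returns the collected items together with Err(false); the trailing Ok(3) is never visited.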
For most purposes, this is perfectly alright; errors are generally meant to propagate, after all.
Certain kinds of errors, however, are better off silenced.
Perhaps they are expected to pop up during normal program operation,
or maybe their occurrence should merely affect program execution in a particular way,
and not halt it outright.
In a simple case like the one above, you can of course check what for_each itself has returned, but that doesn’t scale to building larger Stream pipelines.
I encountered a situation like this myself when using
the hubcaps library.
The code I was writing was meant to
search for GitHub issues
within a specific repository.
In the GitHub API, this is accomplished by sending a search query like repo:$OWNER/$NAME,
which may result in a rather obscure HTTP error (422 Unprocessable Entity)
if the given repository doesn’t actually exist.
But I didn’t care about this error; should it occur, I’d simply return an empty stream,
because doing so was more convenient for the larger bit of logic that was consuming it.
Unfortunately, the Stream trait offers no interface that’d target this use case.
There are only a few methods that even allow looking at errors mid-stream,
and even fewer that can end it prematurely.
On the flip side, at least we don’t have to consider too many combinations
when looking for the solution ;)
Indeed, it seems there are only two Stream methods that are worthy of our attention:
- Stream::then, because it allows for a closure to receive all stream values (items and errors)
- Stream::take_while, because it accepts a closure that can end the stream early (but only based on items, not errors)
Combining them both, we arrive at the following recipe:
- Inside a .then closure, look for Errors that you consider non-fatal and replace them with a special item value. The natural choice for such a value is None. As a side effect, this forces us to convert the regular (“successful”) items into Some(item), effectively transforming a Stream<Item=T> into Stream<Item=Option<T>>.
- Look for the special value (i.e. None) in the .take_while closure and terminate the stream when it’s been found.
- Finally, convert the wrapped items back into their original form using .map, thus giving us back a Stream of T’s.
Applying this technique to our initial example, we get something that looks like this:
use futures::future; // needed for future::ok below

let s = stream::iter_result(vec![Ok(1), Ok(2), Err(false), Ok(3)])
    .then(|r| match r {
        Ok(r) => Ok(Some(r)),   // no-op passthrough of items
        Err(false) => Ok(None), // non-fatal error, terminate the stream
        Err(e) => Err(e),       // no-op passthrough of other errors
    })
    .take_while(|x| future::ok(x.is_some()))
    .map(Option::unwrap);
If we now try to consume this stream like before:
Core::new().unwrap().run(
    s.for_each(|n| { println!("{}", n); Ok(()) })
).unwrap();
it will still end after the first two items, but without producing any errors afterwards.
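The three steps can also be tried out without tokio or futures by applying them to a plain iterator of Results. This is only a synchronous analogue of the recipe, and until_false is a made-up helper name. One difference worth noting: Iterator::take_while sees whole Results, whereas Stream::take_while only sees items, so the predicate here matches on Ok(None) explicitly.

```rust
// Hypothetical helper: the same recipe applied to a plain iterator of Results.
// `false` plays the role of the non-fatal error, as in the example above.
fn until_false(values: Vec<Result<i32, bool>>) -> Vec<Result<i32, bool>> {
    values
        .into_iter()
        // Step 1: wrap items in Some and turn the non-fatal error into Ok(None).
        .map(|r| match r {
            Ok(n) => Ok(Some(n)),
            Err(false) => Ok(None),
            Err(e) => Err(e),
        })
        // Step 2: stop at the marker value; other errors still pass through.
        .take_while(|r| !matches!(r, Ok(None)))
        // Step 3: unwrap the remaining items back into their original form.
        .map(|r| r.map(Option::unwrap))
        .collect()
}
```

With the input from the running example, until_false(vec![Ok(1), Ok(2), Err(false), Ok(3)]) yields vec![Ok(1), Ok(2)], while an Err(true) in the input would be left in place as an ordinary error.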
For a more reusable version of the trick, you can check this gist; it adds a Stream::take_while_err method through an extension trait.
This isn’t a perfect solution, however, because it requires Boxing even on nightly Rust[1].
We can fix that by introducing a dedicated TakeWhileErr stream type, similarly to what native Stream methods do.
I leave that as an exercise for the reader ;-)
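As a starting point, here is one way such a dedicated type might look. This is a sketch under several assumptions and not the code from the gist: Async, Poll and Stream are simplified stand-ins that only mirror the shape of the futures 0.1 definitions so the example compiles with std alone, while TakeWhileErrExt and IterStream are hypothetical names. The predicate semantics (keep going while it returns true for an error, end the stream quietly once it returns false) are my guess at what take_while_err should mean.

```rust
// Simplified stand-ins for the futures 0.1 types (assumption: no task wake-ups).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

// Hypothetical adapter: ends the stream once `pred` returns false for an error.
struct TakeWhileErr<S, P> {
    stream: S,
    pred: P,
    done: bool,
}

impl<S, P> Stream for TakeWhileErr<S, P>
where
    S: Stream,
    P: FnMut(&S::Error) -> bool,
{
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        if self.done {
            return Ok(Async::Ready(None)); // already terminated
        }
        match self.stream.poll() {
            Err(e) => {
                if (self.pred)(&e) {
                    Err(e) // predicate holds: pass the error through
                } else {
                    self.done = true;
                    Ok(Async::Ready(None)) // swallow the error, end the stream
                }
            }
            other => other, // items, end of stream and NotReady pass through
        }
    }
}

// Extension trait adding the method to every Stream; no Box is involved
// because the return type is the concrete TakeWhileErr struct.
trait TakeWhileErrExt: Stream + Sized {
    fn take_while_err<P>(self, pred: P) -> TakeWhileErr<Self, P>
    where
        P: FnMut(&Self::Error) -> bool,
    {
        TakeWhileErr { stream: self, pred, done: false }
    }
}

impl<S: Stream> TakeWhileErrExt for S {}

// Demo helper: an always-ready stream backed by an iterator of Results.
struct IterStream<I>(I);

impl<T, E, I: Iterator<Item = Result<T, E>>> Stream for IterStream<I> {
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Poll<Option<T>, E> {
        match self.0.next() {
            Some(Ok(v)) => Ok(Async::Ready(Some(v))),
            Some(Err(e)) => Err(e),
            None => Ok(Async::Ready(None)),
        }
    }
}
```

Wrapping the running example as IterStream(vec![Ok(1), Ok(2), Err(false), Ok(3)].into_iter()).take_while_err(|e: &bool| *e) and polling it yields 1, then 2, and then reports the end of the stream instead of the false error. Porting this to the real Stream trait would mostly be a matter of swapping the stand-in types for the ones from the futures crate.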
[1] This is due to a limitation in the impl Trait feature which prevents it from being used as a return type of trait methods.