Category Archives: Tech

Faster sorting with decorate-sort-undecorate

Decorate-sort-undecorate is a simple sorting pattern that has been known for decades and was further popularized by Randal Schwartz in the context of Perl, earning it the moniker Schwartzian transform. While I’ve been aware of the pattern for many years, I only recently realized just how relevant it still is for optimizing everyday sorts.

Consider this code that sorts JSON objects by a field:

/// Sorts a slice of JSON objects by the "age" field.
fn sort_by_age(v: &mut [serde_json::Value]) {
    v.sort_by_key(|o| o["age"].as_i64());
}

// usage:
fn main() {
    let mut v = vec![
        json!({"name": "John", "age": 25}),
        json!({"name": "Abel", "age": 18}),
    ];
    sort_by_age(&mut v);
    assert_eq!(v[0]["name"], "Abel");
    assert_eq!(v[1]["name"], "John");
}

The key function here is not doing anything big or controversial: it consists of a single field lookup followed by a cheap extraction of an integer from the Value enum – the kind of thing you’d expect a reasonable key function to do. But running this on a larger slice shows that performance isn’t exactly stellar:

fn large_vec() -> Vec<serde_json::Value> {
    std::iter::repeat_with(|| json!({"name": "John", "age": rand::thread_rng().gen_range(1..=100)}))
        .take(1_000_000)
        .collect()
}

fn main() {
    let mut v = large_vec();
    let t0 = Instant::now();
    sort_by_age(&mut v);
    let n = t0.elapsed().as_secs_f64();
    println!("{}", n);
}

On my laptop, the above code takes 1.01s to sort a slice of 1 million JSON objects. While not exactly a catastrophe, it feels like we should be able to do better than that.

First, we can switch from stable to unstable sort by changing v.sort_by_key(...) to v.sort_unstable_by_key(...). Doing so improves the timing to 0.44s, a more than 2x speed boost. In the use case that inspired this article unstable sort was fine because the comparison function provided a fallback ordering when the primary keys compared equal, and the initial order was meaningless anyway. We’ll continue to use unstable sort in the rest of the article because that gives the best performance, but it’s trivial to revert to stable sort.
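
For completeness, this is what the change looks like in the function above – the sketch differs from the original only in the sorting method being called:

/// Sorts a slice of JSON objects by the "age" field, using the unstable sort.
fn sort_by_age(v: &mut [serde_json::Value]) {
    // sort_unstable_by_key doesn't preserve the order of equal elements,
    // but avoids allocating and is typically faster than the stable sort
    v.sort_unstable_by_key(|o| o["age"].as_i64());
}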

Looking at performance further, we notice that accessing a field of a JSON object must perform a hash table lookup. Since a sort has to do n log n comparisons on average, this means that we’re doing multiple lookups per element. That’s the kind of situation where decorate-sort-undecorate might help, as it caches the extracted keys, reducing the number of lookups to “only” n, at the expense of some memory:

fn sort_by_age(v: &mut [Value]) {
    // extract JSON objects into a new vector, where each object is decorated with age
    let mut decorated: Vec<_> = v
        .iter_mut()
        .map(|o| (o["age"].as_i64(), o.take()))
        .collect();
    // sort with a key function that trivially refers to the pre-fetched age
    decorated.sort_unstable_by_key(|&(age, _)| age);
    // undecorate, i.e. store just the sorted objects back into the slice
    for (dest, (_, o)) in v.iter_mut().zip(decorated) {
        *dest = o;
    }
}

With this modification, my laptop takes only 0.059s to sort the slice of 1 million objects! This is 7.5x faster than the previous version, and 17x faster than the original stable sort. This is a much more radical speedup than I ever expected, and it’s something to definitely consider when your key function does anything more than access a struct/tuple field.

What is the explanation for such a drastic improvement? Let’s modify the key function to keep track of how many times it gets called:

    let mut cnt = 0u32;
    decorated.sort_by_key(|&(age, _)| {cnt += 1; age});
    println!("{}", cnt);

This shows that the key function is invoked 39 million times to sort 1 million elements. This is explained by log2(1_000_000) being approximately 20, so n log n is around 20 million. And for each comparison the key function needs to be called twice – once for the left and once for the right-hand side of the comparison – which adds up to 40 million. So the decoration cuts down the number of hash lookups from 40 million to 1 million, and that certainly makes a difference. Another thing to take into account is that sorting the decorated slice compares integers directly present in the array, which allows the inner loops of the sorting algorithm to be nicely inlined. On the other hand, sorting by a hashmap lookup requires a call into the complex hash table code, which doesn’t inline nearly as well.

One downside is that we had to replace an elegant call to sort_unstable_by_key() with a bit of manual twiddling. Fortunately it turns out that the standard library has a method that does all this for us – slice::sort_by_cached_key(). According to its documentation, it sorts the slice and calls the key function at most once per element, “using temporary storage to remember the results of key evaluation”. That is functionally equivalent to our decorate-sort-undecorate, differing only in some implementation details (it doesn’t extract the original values to decorate them, but uses indices instead). With it sort_by_age() again becomes a one-liner:

fn sort_by_age(v: &mut [Value]) {
    v.sort_by_cached_key(|o| o["age"].as_i64().unwrap());
}

On my laptop this takes 0.089s to execute, making it 1.5x slower than the hand-written decorate-sort-undecorate, but still 11x faster than the original naive implementation. The performance difference compared to the manual implementation seems to come from the fact that sort_by_cached_key() implements stable sorting. If we modify the manual decorate-sort-undecorate to use sort_by_key() instead of sort_unstable_by_key(), sort_by_cached_key() comes out 1.34x faster, which means it’s reasonably optimized.

In conclusion, if you’re sorting by a non-trivial key function, and you have memory to spare, consider replacing sort_by_key() with sort_by_cached_key() and measure the result. If that helps, and you want to squeeze as much performance as possible, you can further optimize by implementing a manual decorate-sort-undecorate as shown above.

Self-referential types for fun and profit

Many Rust questions are asked frequently, and therefore met with short and well-known answers. “Compile in release mode when measuring performance.” “Don’t try to learn Rust with linked lists.” “Use scoped threads.” “That requires specialization.” But there is one response that is delivered in an almost checkmate-like fashion: “You are trying to construct a self-referential type.” This is swiftly explained to be impossible in current Rust, and the asker advised to pursue a different approach. If they absolutely insist on self-references, they’re referred to std::pin, which is unergonomic, hard to understand, and still requires unsafe. Alternatively, they’re given a list of crates that purport to help with creating self-referential structs, but come with disheartening caveats, and some with safety holes which can turn fatal.

Contrary to the usual answers, this article will show how to create self-referential types manually. A very important subset of such types is straightforward to reason about and, most importantly, can be manipulated through a safe interface. We will not rewrite the borrow checker, we will not use std::pin, but we will use a bit of unsafe. You’ve been warned.

A practical introduction

Just so you don’t think self-referential structs are a theoretical issue that only happens to people trying to implement specialized data structures, consider this task:

Use the zip crate to implement a function that accepts the name of a ZIP file and the name of a member stored inside it, and returns something that implements Read to stream the member.

Streaming the contents of a ZIP file sounds like a reasonable thing to want to do. We’d like to create a function like this:

fn zip_streamer(file_name: &str, member_name: &str) -> impl std::io::Read {
    todo!()
}

Looking at the crate docs, this seems trivial to implement. Open the file, call ZipArchive::new() to create a ZipArchive, then call ZipArchive::by_name() to get a ZipFile, which implements Read. Presto:

pub fn zip_streamer(file_name: &str, member_name: &str) -> impl std::io::Read {
    let file = File::open(file_name).unwrap();
    let mut archive = ZipArchive::new(file).unwrap();
    archive.by_name(member_name).unwrap()
}

But sadly, this fails to compile. The compiler complains that “borrowed value does not live long enough”:

error[E0597]: `archive` does not live long enough
 --> src/main.rs:8:5
  |
8 |     archive.by_name(member_name).unwrap()
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |     |
  |     borrowed value does not live long enough
  |     argument requires that `archive` is borrowed for `'static`
9 | }
  | - `archive` dropped here while still borrowed

The error message is a bit cryptic at first – it’s not obvious why exactly the “argument requires that archive is borrowed for 'static” (or what that even means) – but looking at the signature of ZipArchive::by_name() reveals the problem. by_name() returns a ZipFile<'a> where 'a is the lifetime of the archive. The ZipFile we’re attempting to return references archive, a local variable. That would produce a dangling reference and obviously can’t be allowed to compile. The only way for that to work would be if archive were itself 'static, which is where the requirement that “archive is borrowed for 'static” comes from.

If you’re an experienced Rustacean, at this point you’re developing a sinking feeling about where this is leading. If you’re new to the language, or even intermediate, this can still look very much fixable: let’s just wrap both archive and its member into a common struct, return it from the function, and implement Read on the struct. That requires a bit of boilerplate, but should resolve the issue of referencing the local variable:

pub fn zip_streamer(file_name: &str, member_name: &str) -> impl std::io::Read {
    struct ZipStreamer<'a> {
        archive: ZipArchive<File>,
        reader: ZipFile<'a>,
    }

    impl Read for ZipStreamer<'_> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.reader.read(buf)
        }
    }

    let file = File::open(file_name).unwrap();
    let mut archive = ZipArchive::new(file).unwrap();
    let reader = archive.by_name(member_name).unwrap();

    ZipStreamer { archive, reader }
}

You guessed it, this doesn’t compile either, and in fact fails with a very similar error message about argument requiring archive to be borrowed for ‘static, with this addition:

error[E0505]: cannot move out of `archive` because it is borrowed
  --> src/main.rs:22:17
   |
21 |     let reader = archive.by_name(member_name).unwrap();
   |                  ----------------------------
   |                  |
   |                  borrow of `archive` occurs here
   |                  argument requires that `archive` is borrowed for `'static`
22 |     ZipStreamer { archive, reader }
   |                   ^^^^^^^ move out of `archive` occurs here

Now we have a new problem: in order to construct ZipStreamer, we must move our local variable into a field of the ZipStreamer. But this move immediately invalidates reader, because as far as the borrow checker is concerned, use-after-move creates a dangling reference just like use-after-free does.

This looks bad, but there might still be hope. Can’t we box the ZipArchive to heap-allocate it, or wrap it in an Rc or Arc to postpone its destruction? But try as you might, no approach is allowed to compile.

Without ever being aware of it, we’ve been trying to create the dreaded “self-referential struct”. ZipStreamer is self-referential because the reader field refers to the archive field of the same struct. This makes the 'a lifetime nonsensical, because a lifetime refers to some data in the caller, which just doesn’t apply to the reader field. If instead of impl Read we returned the concrete type ZipStreamer<'a>, we simply wouldn’t have a lifetime to use for 'a. While our function does accept two references, neither of those is useful as the lifetime of ZipStreamer is clearly tied to the data inside itself.

This is by no means specific to the zip crate; the same issue pops up in various scenarios: a function that returns a database connection along with an open transaction, or a parser that holds the string it parses along with the parsed pieces that refer to the string data.

Why Rust rejects self-reference

Before showing how to fix this issue, let’s once more look at the reasons why Rust prevents us from creating a self-referential structure. What would go wrong if we could create one? For example:

// imaginary self-referential struct where `current_token` points into `data`
struct ParseState {
    data: String,               // the string we're parsing
    current_token: &'data str,  // current token pointing into the string
}

There are two problems with current_token pointing into data.

First, there is no way to name the lifetime of current_token. In the above example we used a lifetime named the same as the field, inspired by a Polonius talk. In current Rust the only way to provide a lifetime on a struct member is to either make it 'static or take a lifetime declared on the struct, as in the ZipStreamer<'a> attempt above. But that lifetime is not useful, as it always corresponds to some data outside the type.

Second, Rust must assume that any move of ParseState invalidates current_token. Imagine if ParseState were declared like this:

struct ParseState {
    data: [u8; 4096],            // the fixed-size buffer we're parsing
    current_token: &'data [u8],  // current token pointing into the buffer
}

Moving ParseState from one place to another will obviously invalidate the reference. This is not the case when the data is a String, or simply a Box, because those types heap-allocate their contents, and moving the string or box has no effect on the heap data they point to. But Rust doesn’t know that: it treats String the same as an array of u8, and assumes that moves invalidate borrows. This is the issue that prevents self-referential types from working in scenarios that would be considered perfectly safe and reasonable in C++.
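
Here is a minimal illustration of why moving a String is harmless to its heap contents (the variable names and assertion are mine, not from the article): the move only copies the (pointer, length, capacity) triple, while the buffer it points to stays put.

fn main() {
    let s = String::from("some heap-allocated data");
    let before = s.as_ptr();   // address of the heap buffer
    let moved = s;             // move the String into another variable
    let after = moved.as_ptr();
    // the String itself moved, but its heap contents didn't change address
    assert_eq!(before, after);
}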

This really bears repeating: there is nothing in the Rust abstract machine that prevents self-referential types from working, as long as you are willing to box them, because then you don’t run afoul of move semantics. The remaining questions are how practical the implementation is, and whether it can expose a safe interface. Let’s investigate that.

Implementation of a self-referential struct

Armed with the above insight, we can sketch what we must do to make ZipStreamer work:

  • box the ZipArchive in ZipStreamer::archive, so we don’t need to worry about moves breaking the ZipFile reference
  • figure out a lifetime for the ZipFile stored in reader
  • ensure that the references in the ZipFile stored in reader remain valid for the whole lifetime of ZipStreamer.

Here is the result:

pub fn zip_streamer(file_name: &str, member_name: &str) -> impl std::io::Read {
    #[allow(dead_code)]
    struct ZipStreamer {
        // actually has lifetime of `archive`
        // declared first so it's dropped before `archive`
        reader: ZipFile<'static>,
        // safety: we must never move out of this box as long as reader is alive
        archive: AliasableBox<ZipArchive<File>>,
    }

    impl Read for ZipStreamer {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.reader.read(buf)
        }
    }

    let file = File::open(file_name).unwrap();
    let mut archive = AliasableBox::from_unique(Box::new(ZipArchive::new(file).unwrap()));
    let reader = archive.by_name(member_name).unwrap();
    let reader = unsafe { std::mem::transmute(reader) };

    ZipStreamer { archive, reader }
}

Wait, this is it?

Explanation

Let’s examine the above code in some detail.

How does it work?

It creates a self-referential type that is safe to move around because the referenced data is behind a box, so its memory location doesn’t change when the top-level type is moved. Since the fields of ZipStreamer are private, no external code can mutate archive, or move it out of its box. When the struct is dropped, the documented drop order ensures that the reader field is dropped before the archive field, so the reference never dangles, even when unused.

Is it really safe?

The above function is obviously not safe in the sense of “not using any unsafe”. But it exposes a fully safe external API to functionality that uses unsafe internally – much like many parts of the standard library. In other words, I believe that zip_streamer() is sound, but as always with unsafe code, caveat emptor.

What is the meaning of the ‘static lifetime on ZipFile?

The 'static lifetime is the only lifetime we can use in that position. Any other named lifetime would have to be introduced on the type, and would make it unusable. The use of 'static does mean that we are lying to the borrow checker, as the actual lifetime is the scope of the archive member in the box, which cannot be expressed in today’s Rust. This false lifetime is why we must use unsafe below. The important thing is that this lie cannot result in incorrect code being generated – ZipFile<'a> is generic over any lifetime and cannot misuse 'static in any way (even specialization doesn’t allow specializing on lifetimes). Unlike generic types, which are monomorphized into separate code for each concrete type, lifetimes are not monomorphized at all – they are used to reject unsound code, not to change what code gets generated.

Some demonstrations of self-referential types work with a plain reference, and get around the lifetime issue by storing a raw pointer instead. That trick wouldn’t work here, as we’re not dealing with a naked reference but with the ZipFile<'a> type, which contains references internally. Of course, the same approach would work for a plain reference as well.

What is the purpose of transmute()?

The call to transmute() admittedly looks scary, as it is, to quote the actual documentation, incredibly unsafe.

But one of the uses of transmute() is to extend the lifetime of a reference, and that’s exactly what we’re doing here. Transmuting between two different types is fraught with pitfalls because their layouts must match precisely, which in some cases requires forcing a non-standard representation. Transmuting within the same type just to extend a lifetime is still unsafe, but much easier to reason about from the perspective of correct usage of transmute().
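
If the bare transmute() call bothers you, one option – my own sketch, not something the article or the zip crate provides – is to wrap it in a helper whose signature spells out that only the lifetime changes:

// Hypothetical helper: the only thing this transmute changes is the lifetime
// parameter, which makes the unsafe contract easier to see at the call site.
// Safety: the caller must ensure the archive this ZipFile borrows from is
// neither moved out of its box nor dropped while the result is alive.
unsafe fn erase_zipfile_lifetime(reader: ZipFile<'_>) -> ZipFile<'static> {
    std::mem::transmute(reader)
}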

What’s the deal with AliasableBox, wouldn’t Box work as well?

When a Box is moved, Rust assumes (and the borrow checker verifies) that the Box has unique access to its contents. Any code along the lines of:

let b = Box::new(...);
let r = &b.some_field;
let b2 = b;
// use "r"

…will fail to compile because b is moved while there are outstanding references into it. But that’s the moral equivalent of what our code does when it creates archive, creates the reader that refers into it, and then moves archive into the ZipStreamer. It seems okay to do this because moving a box doesn’t change the memory address of its content. But it technically creates a dangling reference because moving the box is defined to invalidate any references into it. A reference surviving the move is something the optimizer is allowed to assume won’t happen and is therefore UB. Running the above example under MIRI reports UB due to an invalidated reference.

To fix the issue, instead of Box we can define a box-like type that internally uses a raw pointer. This has the same semantics as a Box, but prevents the optimizer from assuming that moving it invalidates references to its contents. We use AliasableBox from the aliasable crate for that purpose.

This issue was spotted by /u/Darksonn on reddit.

If this works, why doesn’t async do it?

The approach shown here requires the code to adhere to a very specific unsafe contract, such as “you must not move out of this Box” or “reader must be declared before archive, otherwise you invoke UB”. Async Rust aims to provide a safer experience by limiting the unsafe portion to the creation of the pin, typically abstracted by the standard library or a crate, with the rest of the code being completely safe and sound.

There are ways to abstract this approach, but they come at a cost in ergonomics, and are not yet agreed to be sound in all cases. This is what the self-referential crates provide, as shown below.

Safe version with ouroboros

Ouroboros is a crate that aims to encapsulate an unsafe contract like the one in the above code into abstractions implemented with a mixture of proc macros and custom types. Here is what the above code looks like using it:

use ouroboros::self_referencing;

pub fn zip_streamer(file_name: &str, member_name: &str) -> impl std::io::Read {
    #[self_referencing]
    struct ZipStreamer {
        archive: ZipArchive<File>,
        #[borrows(mut archive)]
        #[not_covariant]
        reader: ZipFile<'this>,
    }

    impl Read for ZipStreamer {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.with_reader_mut(|reader| reader.read(buf))
        }
    }

    let file = File::open(file_name).unwrap();
    let archive = ZipArchive::new(file).unwrap();
    ZipStreamerBuilder {
        archive,
        reader_builder: |archive| archive.by_name(member_name).unwrap(),
    }
    .build()
}

Under the hood ouroboros generates a very similar struct to the one we’ve created manually:

pub(super) struct ZipStreamer {
    reader: ZipFile<'static>,
    archive: ::ouroboros::macro_help::AliasableBox<ZipArchive<File>>,
}

The 'static lifetime is there, as is a reexport of AliasableBox. Ouroboros tries to provide only safe interfaces – if the code compiles, it should be sound, and require no unsafe. For example it puts the above type definition in a module, which makes reader private, and the only way to access it is using with_reader_mut(). This prevents safe code from ever observing the fictitious 'static lifetime.

As mentioned above, the benefit of using a crate like ouroboros is that it provides a safe abstraction over a self-referential type. However, there are a number of downsides as well:

  • It generates a lot of code (500+ lines for the above simple example) which affects compilation times. It also adds convenience constructors on the type, which you can’t turn off, and which sometimes actually get in the way.
  • The generated code contains a lot of unsafe hidden behind abstractions that give a sense of safety, but might turn out to be unsound, and have in the past. (I’ll refrain from linking to individual issues, but they’re easy to find on the tracker, and there’s the example of rental which was abandoned due to soundness issues.)
  • It has a lot of hidden complexity, as it modifies the definition of the type in ways that are sometimes hard to reason about.

In summary, it probably makes sense to use a crate like ouroboros if it works for your use case and if you don’t mind all the extra code it generates. But I’d advise at least being aware of what it does for you, as it feels like a bit of a leaky abstraction.

Limitations

The approach shown above, which includes both the hand-written unsafe and ouroboros, does have limitations. An interesting use case of self-referential structs is wrapping foreign data, such as C++ strings and vectors with small string/vector optimization. The approach in this article would technically work for such types, but would not be useful because it requires unconditional allocation, and the whole point of such types is efficiency and avoiding allocations. (They work in C++ by implementing a “move constructor” that fixes up the self-references.)

Another situation where the above doesn’t help is if safe code needs to be able to mutably access parts of the self-referential type. Since it’s safe code, you can’t just ask it not to move the referenced value out; you need to prevent that through the type system. This is remarkably achieved through std::pin, although many people have a hard time wrapping their heads around the concept in practice.

Today self-referential types require either reasoning about unsafe code, which most people shy away from for very good reason, or using crates that take a serious toll on ergonomics. This is what we have to work with until Polonius comes to save us.

The stable HashMap trap

You read about faster hash functions and switch to one. Most of your code gets the expected speed boost, but some parts mysteriously get slower – much slower, especially when dealing with large hashmaps. If this sounds familiar, you might have encountered the stable HashMap trap.

Background

Rust’s HashMap is an implementation of Google’s SwissTable. While this ensures excellent performance of the hash table itself, the hash function used by default is designed to protect against a class of DoS attacks, at the price of performance. For use cases where you care about performance and maliciously crafted keys are not an issue, switching to a more efficient hash function can boost performance by a significant amount. Rust hash maps make it fairly easy to swap in a different hash function, and there are crates that offer high-performance ones. The Rust Performance Book recommends, among others, the brutally effective FxHasher from the rustc-hash crate and the higher-quality ahash crate. This recommendation, however, comes at a price that is rarely if ever mentioned – some O(n) hash map operations, including deserialization, degrade to O(n**2)!
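
For reference, swapping the hasher typically looks like the sketch below, assuming the rustc-hash crate: only the third type parameter of the map changes, not the table implementation.

use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use rustc_hash::FxHasher;

// A HashMap that uses FxHasher instead of the default SipHash-based hasher.
// rustc-hash ships an equivalent ready-made alias as rustc_hash::FxHashMap.
type FastMap<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher>>;

fn main() {
    let mut m: FastMap<&str, u32> = FastMap::default();
    m.insert("answer", 42);
    assert_eq!(m["answer"], 42);
}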

The issue

Since the issue is easy to stumble upon when deserializing HashMaps, let’s use that as an example. We’ll construct a HashSet for simplicity, but a HashMap would exhibit the same behavior. This code creates a set of 10 million random values, serializes it to a Vec<u8> using bincode, and deserializes it back into a HashSet:

use std::time::Instant;
use rand::Rng;

type TestHashSet<K> = std::collections::HashSet<K>;
//type TestHashSet<K> = rustc_hash::FxHashSet<K>;

fn main() {
    let mut rnd = rand::thread_rng();
    let nums: Vec<_> = (0..10_000_000).map(|_| rnd.gen::<u64>()).collect();

    let t0 = Instant::now();
    let h: TestHashSet<u64> = nums.into_iter().collect();
    let t1 = Instant::now();
    println!("create: {}", (t1 - t0).as_secs_f64());

    let out = bincode::serialize(&h).unwrap();
    let t2 = Instant::now();
    println!("serialize: {}", (t2 - t1).as_secs_f64());

    let h2: TestHashSet<u64> = bincode::deserialize(&out).unwrap();
    let t3 = Instant::now();
    println!("deserialize: {}", (t3 - t2).as_secs_f64());

    println!("{}", h2.len());
}

Output on my machine:

create: 1.440201369
serialize: 0.071765342
deserialize: 1.114031976

Now let’s switch to fxhash to speed it up. We only need to point the TestHashSet alias to rustc_hash::FxHashSet<K> and we’re set. Note that FxHashSet is itself an alias for std::collections::HashSet whose hasher is FxHasher, so it only affects the hash function, not the implementation of the table. With that change, I get this output:

create: 1.391839734
serialize: 0.081116361
deserialize: 5.2052695

Creation is slightly faster, serialization takes about the same time, but deserialization is almost 5x slower than with the stdlib hash! Tested against different set sizes, the deserialization time is close to quadratic – it takes 5s with 10m elements, 20s with 20m elements, 69s with 40m elements, and 219s with 80m elements. We’ve seen the trap.

When does it happen?

First, let’s make it clear that the problem has nothing to do with serde or bincode. Serde just happens to build the new set from values laid out in the original set’s iteration order. The same slowdown can be easily reproduced without serde:

// just as slow as the serde version when TestHashSet is FxHashSet
let mut h2: TestHashSet<u64> = Default::default();
for &n in &h {
    h2.insert(n);
}

Second, this is not a consequence of fxhash being of lower quality or hitting some bad edge case. The same quadratic behavior can be observed using any other stable hash function, including the default SipHash when randomization is omitted. To try it, define TestHashSet<K> as std::collections::HashSet<K, BuildHasherDefault<std::collections::hash_map::DefaultHasher>> (quite a mouthful), which uses the default hash function with a non-random initial state – you’ll get a similar slowdown.
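
Spelled out as code, the alias described above would look like this – a sketch; the slowdown comes from the fixed initial state, not from any weakness of the hash function itself:

use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::BuildHasherDefault;

// SipHash with a fixed (non-random) initial state: a perfectly good hash
// function, but "stable" across tables, which is what triggers the slowdown.
type TestHashSet<K> = HashSet<K, BuildHasherDefault<DefaultHasher>>;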

The third thing to be aware of is that you can’t reproduce the issue with a simple clone() or collect() – things like let h2 = h.clone() and let h2: TestHashSet<u64> = h.iter().copied().collect() will be O(n), and almost 10x faster with fxhash than with the default hasher. It also doesn’t happen when you pre-allocate the set using with_capacity(). To reproduce it, you need to start with a HashSet that must resize at least once to accommodate the data, such as an empty one; both clone() and collect() preallocate the target HashSet, which avoids the degradation.

This is why serde hits it – although serde stores the size of every container as a “size hint”, it also caps this hint during deserialization to a low number, for security reasons.

In summary, to reproduce quadratic complexity of construction, all of the following conditions must be met:

  • the HashMap/HashSet must resize to accommodate the values we’re inserting
  • it uses a stable (deterministic) hash function
  • it is built from keys in the iteration order of another HashMap/HashSet that uses the same hash function

Why does it happen?

This bug is thoroughly described in this 2016 article, which I encourage you to read to understand it. In short, copying a larger table into a smaller one assigns the keys from the first half of the larger table to consecutive locations. Then the keys from the second half of the larger table must fit between the keys which were already densely packed, which forms large collision chains. Although this is a temporary situation, fixed on the next resize, it affects performance visibly. But do refer to the article for details.

The article mentions Robin Hood hashing which is no longer used by Rust hash tables, but don’t let that throw you off. Although the implementation switched to a different probing strategy, it still uses open addressing with linear probing and power-of-two hash table sizes.

The quadratic behavior was fixed for the default hasher by introducing randomization, but the issue is still open because the underlying bug is still present, visible with non-randomized hashers, as in the above code.

Workarounds

All fixes for this bug boil down to shuffling the items in order to avoid the construction in unfavorable order. Since the actual bug that leads to the issue is in the implementation of the hashmap, I refer to these as workarounds rather than proper fixes.

I do hope this bug gets more visibility and gets fixed, as it hurts people who try to use fxhash to get a faster HashMap, and it’s not immediately obvious what’s going on. In general, a nice property of hash tables is that, unlike unbalanced binary trees or quicksort, they’re normally not sensitive to key order. It’s a bit of a shame that that doesn’t apply to Rust’s hash tables with deterministic hashers.

Pre-allocate the HashSet/HashMap

This is by far the simplest fix when you don’t control the exact type of the map, but do control how it is created. In the non-serde case, that means initializing the h2 hash set with the capacity given upfront:

// fast because h2 is of the correct size to begin with
// note: can't just call with_capacity() because it assumes RandomState from stdlib
let mut h2: TestHashSet<u64> = TestHashSet::with_capacity_and_hasher(h.len(), Default::default());
for &n in &h {
    h2.insert(n);
}

This can be made to work with serde, but requires writing a custom deserializer.

Shuffle the items

Shuffling the elements also fixes the issue. For example:

// fast because h2 is built of elements which are not in iteration order of h
let mut elems: Vec<_> = h.iter().copied().collect();
elems.shuffle(&mut rnd);
let mut h2: TestHashSet<u64> = TestHashSet::default();
for n in elems {
    h2.insert(n);
}

This is a good “quick fix” when you don’t control how the table is constructed, but do control the data that flows into it. It is not that easy to apply to the serde case, where it would require a custom serializer.

Randomize the hasher

This is the most general solution, and is how the issue was worked around in the standard library, which uses RandomState to build its default hasher.

If you use another hasher, you just need to make sure that it is randomly seeded. (This is what ahash’s AHashMap does by default, so there you don’t need to do anything special.) If your hasher doesn’t provide a randomized hasher builder, you can always create one using its public API. For example, for FxHasher we can create an FxRandomState that implements BuildHasher:

use std::hash::{BuildHasher, Hasher};
use rustc_hash::FxHasher;

#[derive(Copy, Clone, Debug)]
pub struct FxRandomState(usize);

impl BuildHasher for FxRandomState {
    type Hasher = FxHasher;

    fn build_hasher(&self) -> FxHasher {
        // create a default hasher and mix the per-table seed into it
        let mut hasher = FxHasher::default();
        hasher.write_usize(self.0);
        hasher
    }
}

Ideally build_hasher() would just return FxHasher::with_seed(self.0), but with_seed() isn’t provided, so we get equivalent functionality by creating a hasher and mixing the seed into it. The remaining part is to implement Default for our hasher builder, which will obtain the actual seed. To make it efficient, we won’t get a random seed every time, but only when a new thread is created. Then within a thread we’ll assign each FxRandomState (and therefore each hash table) a new consecutive seed.

impl Default for FxRandomState {
    fn default() -> Self {
        thread_local! {
            static SEED: Cell<usize> = Cell::new(rand::thread_rng().gen())
        }
        let seed = SEED.with(|seed| {
            let n = seed.get();
            seed.set(n.wrapping_add(1));
            n
        });
        FxRandomState(seed)
    }
}

With this in place, we can alias TestHashSet<K> to HashSet<K, FxRandomState>, and the quadratic behavior goes away.
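
In code, that final alias would be (assuming the FxRandomState defined above):

// Fx hashing, but randomly seeded per table, which sidesteps the trap.
type TestHashSet<K> = std::collections::HashSet<K, FxRandomState>;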

Contention on multi-threaded regex matching

Let’s say you need to match the same regex across a large number of strings – perhaps you’re applying a grep-like filter to data generated or received by your program. This toy example demonstrates it by matching a regex against half a billion strings:

use regex::Regex;

lazy_static! {
    static ref IS_INTEGER: Regex = Regex::new("^[0-9]+$").unwrap();
}

fn main() {
    let strings: Vec<&str> = ["foo", "bar", "1234", "1234foo", ""]
        .into_iter()
        .cycle()
        .take(100_000_000)
        .collect();

    let start = Instant::now();
    let n_ints = strings.iter().filter(|s| IS_INTEGER.is_match(s)).count();
    let elapsed = start.elapsed().as_secs_f32();
    println!("{} {}s", n_ints, elapsed);
}

It’s not a scientific benchmark of regex performance, but it does show some interesting and unexpected effects also observed in real-world code. For starters, it takes 2.0s to execute the matches on my laptop.

This is good performance, but it’s a shame to do everything in one thread – let’s try to speed it up by using all cores. This is the kind of thing Rayon makes really easy: just change iter() to par_iter():

use rayon::prelude::*;
let n_ints = strings.par_iter().filter(|s| IS_INTEGER.is_match(s)).count();

Surprisingly, this takes 6-8s to execute on a system with 4 physical cores. In other words, instead of being 3-4x faster due to running on 4 cores, it’s 3-4 times slower. A very similar slowdown occurs on a system with 32 physical cores, where the time grows from 2.05s to 8.2s.

This result can’t be chalked up to an inefficiency in Rayon, as the same slowdown is observed when dividing the work among threads in other ways. Could it be that matching against a compiled regex causes contention when it is accessed from multiple threads?

When this was first suggested in discussions with coworkers, it seemed quite unlikely, as contention would imply that the compiled regex held a lock or other form of synchronization. This runs counter to the idea of a compiled regex, which one would expect to be fully constructed during compilation. Compiled regexes are often seen in lazy_statics, and shared by the whole program. But no matter how unlikely, the possibility of contention is easy to test, simply by switching from lazy_static! to thread_local!:

thread_local! {
    static IS_INTEGER: Regex = Regex::new("^[0-9]+$").unwrap();
}

The match now needs an additional closure to access the thread-local, but is still quite readable:

use rayon::prelude::*;
let n_ints = strings
    .par_iter()
    .filter(|s| IS_INTEGER.with(|is_integer| is_integer.is_match(s)))
    .count();

Continuing the surprise, this takes 0.66s to run, which is 3x faster than the single-threaded version – the kind of speedup one might realistically expect from a 4-core computer. On the 32-core server, it takes 0.086s, a 24x speedup.

So, regex matching does have a contention issue. The compiled regex type, Regex, wraps the internal Exec type, which holds ProgramCache values organized in a Pool that stores them behind a mutex. Accessed from a single thread, or from multiple threads at different times, this mutex is cheap to acquire and is held for a very short time. But under strong contention it becomes a bottleneck, with attempts to acquire it falling back to OS-level waits, causing performance to degrade.

The file PERFORMANCE.md dedicates a whole section to this issue. The text is nuanced, so instead of quoting its parts, I encourage you to go ahead and read it. It warns of the performance impact and shows how to eliminate it (using a slightly different approach than taken above), but it also says that it’s “supported and encouraged” to define regexes using lazy_static! and use them from multiple threads. It explains that, despite expectations, a compiled regex doesn’t contain everything needed to match it – some of the compiled state is built lazily while executing particular kinds of search, and is later reused for searches of the same kind. The mutex protects the “scratch space” used for those updates. Design options and tradeoffs are discussed in this issue in more detail.

In summary, for most use cases it’s perfectly fine to use the same compiled regex from multiple threads. But if you have code that does heavy regex matching from multiple threads, and does most or all of it on one regex, you’ll almost certainly want to give different instances of the compiled regex to different threads. As always, be sure to measure performance before actually making such a change to the code.

Rust global variables demystified

Rust has a reputation as a language unfriendly to global variables. While this reputation is not entirely undeserved, most of it stems from the guarantees Rust insists on, not from a desire to stifle the programmer’s creativity. In this article we’ll show how to use global variables, and how to overcome the limitations.

Is it even ok to use global variables?

Global variables are a controversial topic in programming circles, with many educators taking the moral high ground and condemning them as code smell, a shortcut, a crutch, a hallmark of throw-away code, and <insert favorite insult>. While there is good reason for the hostility, there is also an abundance of situations where global variables are either appropriate or actually the only way to proceed. Pre-compiled regular expressions or the state of a logger are cases where you’d probably want to use a global in preference to passing the state all the way from the top level to the bottom-most part of your program. Many low-level system APIs, such as those of signal handlers or hardware-triggered interrupts, have callbacks that don’t receive a “state” argument, so communication between the callback and the rest of the system must go through globals. There are other examples, and in this article we’ll assume that you have good reasons for using globals that your CS professor would approve of. If that’s not the case, don’t worry, we won’t tell.

Declaring globals

A Rust global variable is declared much like any other variable, except it’s declared at top-level and uses static instead of let:

static LOG_LEVEL: u8 = 0;

So we use the keyword static instead of let, and must spell out the type, as the compiler refuses to infer it – even when the initializer makes the type unambiguous, as in static LOG_LEVEL = 0u8, which won’t compile.

Note that what we’ve declared is actually a static, which is not necessarily global. If the same declaration appeared in a function, the variable would be visible only inside the function, but its value would still be shared among all invocations of the function and last until the end of the program. This article talks about global variables, but most of the content equally applies to static variables defined inside functions, the only difference being that of visibility.

The global works much like you’d expect: you can read it from any function in the module, you can import it (if declared pub) from another module and then read it from there. You can even borrow it and get a reference with a 'static lifetime – because the global variable lasts until the end of the program. Neat.
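
For example, a short sketch (the function names are mine) of reading the global and borrowing it with the 'static lifetime:

static LOG_LEVEL: u8 = 0;

fn is_verbose() -> bool {
    LOG_LEVEL > 2          // read the global from any function in the module
}

fn log_level_ref() -> &'static u8 {
    &LOG_LEVEL             // borrowing a global yields a &'static reference
}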

What you can’t do is assign to the global because it’s not declared as mut. This is where things get interesting.

Mutable globals – atomics and locks

Ideally we’d like to declare our global as mut and have a public API that manipulates it – say, a function that reads it and another that writes it:

static mut LOG_LEVEL: u8 = 0;

pub fn get_log_level() -> u8 {
    LOG_LEVEL
}

pub fn set_log_level(level: u8) {
    LOG_LEVEL = level;
}

The compiler rejects both the getter and the setter with a similar error message:

error[E0133]: use of mutable static is unsafe and requires unsafe function or block
 --> src/lib.rs:8:5
  |
8 |     LOG_LEVEL = level;
  |     ^^^^^^^^^^^^^^^^^ use of mutable static

The underlying problem is that a global variable is potentially visible from multiple threads. The above functions don’t synchronize their access to the global, so there’s nothing to prevent set_log_level() from being called in one thread while another thread calls get_log_level() or set_log_level(), either of which would constitute a data race. Rust requires an unsafe block to signal that such synchronization has been implemented by the code that surrounds access to the mutable static. Alternatively, the whole function can be unsafe to signal that the burden of synchronization is transferred to its callers. Since we don’t in fact have such synchronization (and it’s unclear how a caller of set_log_level() and get_log_level() would even ensure it), we won’t attempt to “fix” it by adding an unsafe to get the code to compile. We want to access globals without advanced reasoning about unsafe and undefined behavior.

Since we’re dealing with a potential data race, let’s address it with the mechanisms used elsewhere in Rust to avoid data races – locks and atomics. In case of a u8, we can simply replace u8 with AtomicU8:

use std::sync::atomic::{AtomicU8, Ordering};

static LOG_LEVEL: AtomicU8 = AtomicU8::new(0);

pub fn get_log_level() -> u8 {
    LOG_LEVEL.load(Ordering::Relaxed)
}

pub fn set_log_level(level: u8) {
    LOG_LEVEL.store(level, Ordering::Relaxed);
}

The global variable is no longer mut, so no unsafe is needed. The code is thread-safe and as performant as an unsafe version would be – on x86 the relaxed atomic load compiles into an ordinary load. If you need stricter ordering guarantees between LOG_LEVEL and other data in the program, you can use Ordering::SeqCst instead.

But what if we need something that won’t neatly fit into an atomic – say, a string? This compiles, but won’t allow us to modify the global:

static LOG_FILE: String = String::new();

pub fn get_log_file() -> &'static str {
    &LOG_FILE
}

Since there is no AtomicString, we need to use a proper lock:

use std::sync::Mutex;

// XXX - doesn't compile
static LOG_FILE: Mutex<String> = Mutex::new(String::new());

pub fn get_log_file() -> String {
    LOG_FILE.lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *LOG_FILE.lock().unwrap() = file;
}

Note that get_log_file() must return a fresh copy of the string. Returning a reference would require a lifetime, and there is no lifetime to associate with the global variable other than 'static, and 'static is incorrect (and wouldn’t compile) because set_log_file() can modify it at any time.

The above doesn’t compile for a different reason:

error[E0015]: calls in statics are limited to constant functions, tuple structs and tuple variants
 --> src/lib.rs:3:34
  |
3 | static LOG_FILE: Mutex<String> = Mutex::new(String::new());
  |                                  ^^^^^^^^^^^^^^^^^^^^^^^^^

For more information about this error, try `rustc --explain E0015`.

What’s going on here? Why did String::new() compile, but Mutex::new(String::new()) didn’t?

The difference is that the globals we declared so far were just pieces of data whose initial values were available at compilation time. The compiler didn’t need to generate any initialization code for static LOG_LEVEL: u8 = 0 – it only reserved a byte in the executable’s data segment and ensured that it contained 0 at compile time. String::new() also works because it is a const fn, a function specifically marked as runnable at compile time. It can be marked like that because an empty string doesn’t allocate, so the string returned by String::new() can be represented in the executable by a triple of (0 [length], 0 [capacity], NonNull::dangling() [constant representing an unallocated pointer]). Nothing to do at run time. On the other hand, static LOG_FILE: String = String::from("foo") wouldn’t compile because String::from() requires a run-time allocation and is therefore not a const fn.
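
To illustrate what “runnable at compile time” means, here is a small example of my own (not from the article) of a user-defined const fn used to initialize a static:

// A const fn can be evaluated by the compiler, so its result can be baked
// directly into the executable's data segment.
const fn double(x: u32) -> u32 {
    x * 2
}

static LIMIT: u32 = double(21);  // computed at compile time, no run-time init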

std::sync::Mutex::new() is not const fn because it requires an allocation in order to keep the system mutex at a fixed address. And even if we used an allocation-free mutex (such as parking_lot::Mutex which supports a const fn constructor on nightly Rust), we’d face the same issue if we wanted to start off with a non-empty string, a data structure coming from a library we don’t control, or information only available at run-time, such as fresh randomness or the current time. In general, we don’t want to be constrained to const fn functions when initializing global variables.

As a side note, C++ supports initializing global variables with arbitrary code by simply allowing the compiler to generate code that runs before main() (or during dlopen() in the case of dynamic libraries). This approach is convenient for simple values, but when used in real programs it led to issues with initialization order, aptly named static initialization order fiasco. To avoid that problem, as well as issues with libraries that require explicit initialization, Rust doesn’t allow pre-main initialization, opting instead for the approach C++ calls the construct on first use idiom.

We will review three ways to initialize a global variable with arbitrary data, two of them based on external (but extremely well reviewed and widely used) crates, and one based on the standard library.

Once cell

The once_cell crate provides a OnceCell type that can be used to define global variables. Here is how one would use OnceCell for LOG_FILE:

use once_cell::sync::OnceCell;
use std::sync::Mutex;

static LOG_FILE: OnceCell<Mutex<String>> = OnceCell::new();

fn ensure_log_file() -> &'static Mutex<String> {
    LOG_FILE.get_or_init(|| Mutex::new(String::new()))
}

pub fn get_log_file() -> String {
    ensure_log_file().lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *ensure_log_file().lock().unwrap() = file;
}

Looking at the implementation of get_log_file() and set_log_file(), it is immediately apparent that they implement the “construct on first use” idiom – both functions call a method that ensures that the inner value is initialized (and that this is only done once), and retrieve a reference to the globally-stored value. This value can then be manipulated in the usual way through interior mutability.

OnceCell<T> is conceptually similar to a RefCell<Option<T>>. Like an Option, it has two states, one empty and another with a useful value. Like RefCell<Option<T>>, it uses interior mutability to allow setting the inner value using just a shared reference. But unlike Option, once set to a non-empty value, the stored value can never be set again. This allows a non-empty OnceCell to give out shared references to the inner data, which RefCell<Option> wouldn’t be allowed to do (it could at best return a Ref<T>) because the contents may change at any time. OnceCell is also thread-safe, so it would actually compare to Mutex<Option<T>>, except it uses an atomic to efficiently check whether the cell has been set.
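
A tiny demonstration of those set-once semantics (the values are arbitrary):

use once_cell::sync::OnceCell;

fn main() {
    let cell: OnceCell<u32> = OnceCell::new();
    assert!(cell.get().is_none());       // starts out empty
    assert_eq!(cell.set(42), Ok(()));    // the first set succeeds
    assert_eq!(cell.set(43), Err(43));   // ...and it can never be set again
    assert_eq!(cell.get(), Some(&42));   // shared access to the value is fine
}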

once_cell also provides a Lazy type that makes the above even simpler, removing the need for a separate ensure_log_file() function:

use std::sync::Mutex;
use once_cell::sync::Lazy;

static LOG_FILE: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new(String::new()));

pub fn get_log_file() -> String {
    LOG_FILE.lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *LOG_FILE.lock().unwrap() = file;
}

Our globals now work pretty much the way we wanted them to. Lazy<T> performs a sleight of hand that even allows us to call methods like lock() directly on LOG_FILE. It achieves this by implementing Deref, the trait normally used to treat lightweight containers (typically smart pointers) like the values they contain. Deref is the mechanism that allows you to call methods of Foo on Box<Foo>, or methods of &str on String, and so on. Lazy<T> wraps a OnceCell<T> and implements a Deref<Target = T> that returns self.once_cell.get_or_init(|| self.init.take().unwrap()), where init is the closure passed to Lazy::new().

The Lazy version still uses the construct on first use idiom, it’s just hidden behind the magic of Deref. In some cases this can yield surprising results, because the actual type of LOG_FILE is not Mutex<String> but Lazy<Mutex<String>>, so if you use it in a context that expects exactly &Mutex<String>, it will fail to compile. It’s not a big deal because you can always obtain the actual &Mutex<String> with &*LOG_FILE (equivalent to LOG_FILE.deref()), but it is something to be aware of.

The OnceCell and Lazy types are in the process of getting stabilized, so we can expect them to become part of the standard library in the near future.

Lazy static

Another popular library for creating global variables is the lazy_static crate, which defines a macro that hides even the lazy initialization, allowing you to write code that looks almost like an ordinary declaration:

use lazy_static::lazy_static;
use std::sync::Mutex;

lazy_static! {
    static ref LOG_FILE: Mutex<String> = Mutex::new(String::new());
}

// get_log_file() and set_log_file() defined as with once_cell::Lazy

pub fn get_log_file() -> String {
    LOG_FILE.lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *LOG_FILE.lock().unwrap() = file;
}

An invocation of lazy_static! is just syntax sugar for defining a Lazy value. Under the hood everything works exactly the same as in the example that used once_cell::Lazy (except lazy_static defines its own lazy_static::Lazy). Like with once_cell::Lazy, the actual type of LOG_FILE is not Mutex<String>, but a different type which uses Deref to give out &'static Mutex<String> on method calls. Some details differ, e.g. lazy_static constructs a dummy type also named LOG_FILE and implements Deref on that, while hiding the actual Lazy<T> value in a static variable defined in a function – but the end result is exactly the same.

If you’re curious, you can run cargo expand on code that invokes lazy_static! {...} to see exactly what it generates.

Standard library – Once+unsafe

Until OnceCell stabilizes, the standard library doesn’t offer a way to implement globals initialized with non-const functions without unsafe code. In most cases rolling your own should be avoided because you can simply use once_cell or lazy_static. But if you must depend only on the standard library, if you want tighter control, or if you just want to learn how it’s done, here is an example that uses std::sync::Once to implement a mutable global:

use std::mem::MaybeUninit;
use std::sync::{Mutex, Once};

fn ensure_log_file() -> &'static Mutex<String> {
    static mut LOG_FILE: MaybeUninit<Mutex<String>> = MaybeUninit::uninit();
    static LOG_FILE_ONCE: Once = Once::new();

    // Safety: initializing the variable is only done once, and reading is
    // possible only after initialization.
    unsafe {
        LOG_FILE_ONCE.call_once(|| {
            LOG_FILE.write(Mutex::new(String::new()));
        });
        // We've initialized it at this point, so it's safe to return the reference.
        LOG_FILE.assume_init_ref()
    }
}

// get_log_file() and set_log_file() defined as with once_cell::OnceCell

pub fn get_log_file() -> String {
    ensure_log_file().lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *ensure_log_file().lock().unwrap() = file;
}

Once ensures that the MaybeUninit is initialized only once, which is what OnceCell guaranteed in the previous versions of the code. Once is also efficient, using atomics to optimize the fast path where the variable has already been initialized. The definition of the variable, now a function-local static rather than a global, is placed inside the function to prevent code outside ensure_log_file() from accessing it directly. All accesses inside ensure_log_file() are synchronized through call_once() – the writer by running inside it, and the readers by waiting for it to complete – which makes the access data-race-free.

Once the initialization of LOG_FILE is complete, ensure_log_file() can proceed to return the reference to the inside of the MaybeUninit, using assume_init_ref().

Which option to choose

In most cases involving real-world types, you’ll want to use once_cell or lazy_static, depending on which syntax you prefer. They have the exact same run-time characteristics, and you won’t go wrong with choosing either. Of course, when once_cell stabilizes in the stdlib, that will become the obvious choice.

There are two exceptions:

  1. When your globals are const-initialized and you don’t need to modify them, you can just declare them as static or const. The difference between the two is that static guarantees that they are stored in only one place and const doesn’t (it inlines them where they’re used).

  2. When you need to modify your globals, but their type is supported by std::sync::atomic: bool, u8 through u64, i8 through i64, usize, or isize. In that case you can declare the variable as static with the appropriate atomic type, and use the atomic API to read and modify it.

A case not covered by this article are thread-local variables, which can also be global. Those are provided by the thread_local macro from the standard library, and allow the use of non-Sync types in globals.
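
As a quick taste, here is a sketch (the names are mine) of a thread-local global holding a non-Sync type:

use std::cell::RefCell;

thread_local! {
    // RefCell is not Sync, so it couldn't go into a regular static, but
    // here every thread gets its own independent instance.
    static SCRATCH: RefCell<String> = RefCell::new(String::new());
}

fn append_to_scratch(s: &str) {
    SCRATCH.with(|scratch| scratch.borrow_mut().push_str(s));
}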

Upgradable parking_lot::RwLock might not be what you expect

Let’s say we’re building a simple table indexed by integers starting with 0. Although the keys are contiguous, the table is loaded from key-value pairs that arrive in arbitrary order. The number of elements is not known in advance and can be anywhere from a handful up to a hundred million, but is expected to fit in working memory, given a reasonable representation. Due to these constraints the table loader naturally represents the data as Vec<Option<Payload>>. It starts out empty and sports a set() method that fills the payload at the given position, resizing the table as needed:

/// Called during loading, place `val` at `pos`
fn set(&mut self, pos: usize, val: Payload) {
    if pos >= self.data.len() {
        self.data.resize_with(pos + 1, Default::default);
    }
    self.data[pos] = Some(val);
}

The table payload is comparatively expensive to compute (it is deserialized from JSON), so that’s done from multiple worker threads, and set() must be thread-safe. The obvious approach is to place the whole table under a lock, using the fantastic parking_lot crate to provide the implementation. Under a mutex data would be a Mutex<Vec<Option<Payload>>>:

fn set(&self, pos: usize, val: Payload) {
    let mut data = self.data.lock();  // no unwrap() because parking_lot is awesome!
    if pos >= data.len() {
        data.resize_with(pos + 1, Default::default);
    }
    data[pos] = Some(val);
}

/// Called once loading is done to extract the data
fn into_data(self) -> Vec<Option<Payload>> {
    self.data.into_inner()
}

This simple solution has a serious problem – now the whole of set() operates with the big lock acquired. On a machine with 32 physical cores, there is a lot of contention around calls to set(), and the worker threads end up spending about 80% of their time idly waiting for the lock. The worst part is that the coarse lock is only really needed to resize the table, which occurs very rarely, because just one large key immediately removes the need for resizes for all smaller keys. The large majority of calls to set() just access non-overlapping slots in data and have no real need for locking.

Fortunately parking_lot’s Mutex is so incredibly well crafted that it only costs one (1) byte of space. Even if it ends up taking up more space due to padding, just compare that to a POSIX mutex that requires 40 bytes on x86-64 Linux, and must be heap-allocated because it’s UB to move it after initialization! With parking_lot fine-grained locking is quite feasible, even on tables with hundreds of millions of elements. Individual locks will be uncontended and therefore acquirable without system calls, and virtually free compared to the big lock. If we knew the table size in advance, we’d represent it as Vec<Mutex<Option<Payload>>> and be done with it.

However, we still need some kind of lock around the whole table as well, in order to be able to resize it when necessary. Since resizing only happens occasionally, we can use a RwLock, acquiring a shared read lock to access the table and its individual row (which has its own mutex for updating), and request an exclusive write lock only to resize the whole table. The type for data now becomes RwLock<Vec<Mutex<Option<Payload>>>>. (Don’t be scared by the piled-up generics, just read them in the order of appearance: read-write lock containing a vector of mutexes which protect optional payloads.)
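For concreteness, here is a sketch of the containing struct assumed by the snippets below; the name Table is a placeholder, and Payload stands in for the real deserialized record type:

use parking_lot::{Mutex, RwLock};

// stand-in for the real deserialized record type
type Payload = serde_json::Value;

struct Table {
    // the outer RwLock guards resizing; each slot has its own Mutex for updates
    data: RwLock<Vec<Mutex<Option<Payload>>>>,
}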

To top it off, the parking_lot README advertises a feature of its RwLock that looks exactly like what we need:

RwLock supports atomically upgrading an “upgradable” read lock into a write lock.

Perfect – we acquire the read lock for the whole table, and only upgrade it to a write lock if a resize is needed. After the resize, we downgrade it back to a read lock and proceed. With resizing taken care of, we access the inner mutexes through the shared read lock, acquire the finer-grained mutex without any contention, and store the data. Here is an implementation of that idea:

// XXX: looks right, but doesn't perform as intended – see below
pub fn set(&self, pos: usize, val: Payload) {
    let data = self.data.upgradable_read();
    if data.len() <= pos {   // need to resize the table
        let mut wdata = RwLockUpgradableReadGuard::upgrade(data);
        wdata.resize_with(pos + 1, Default::default);
        let data = RwLockWriteGuard::downgrade(wdata);
        *data[pos].lock() = Some(val);
    } else {
        *data[pos].lock() = Some(val);
    }
}

fn into_data(self) -> Vec<Option<Payload>> {
    self.data.into_inner().into_iter().map(Mutex::into_inner).collect()
}

However, benchmarking this code in production shows no improvement whatsoever compared to the version with one big mutex. What’s going on?

After spending a lot of time trying to figure out where else in the program there could be a lock causing contention (including investigating the allocator, among other suspects turned innocent), I finally came back to this code and carefully reviewed the documentation of upgradable_read, which says the following:

Locks this rwlock with upgradable read access, blocking the current thread until it can be acquired. The calling thread will be blocked until there are no more writers or other upgradable reads which hold the lock. There may be other readers currently inside the lock when this method returns.

So, an upgradable read is locking out other upgradable reads, even if they never exercise their option to upgrade the lock! Since every access to the table from set() is an upgradable read, this made it behave just like a mutex. I feel that this should be more prominently stressed in the docs, as it is easy to miss and makes upgradable reads useless for this scenario. The correct way to implement set() is by manually releasing the read lock and reacquiring a write lock:

pub fn set(&self, pos: usize, val: Payload) {
    let mut data = self.data.read();
    if data.len() <= pos {
        // "upgrade" the lock
        drop(data);
        let mut wdata = self.data.write();
        // check that someone else hasn't resized the table in the meantime
        if wdata.len() <= pos {
            wdata.resize_with(pos + 1, Default::default);
        }
        // now "downgrade" it back again
        drop(wdata);
        data = self.data.read();
    }
    *data[pos].lock() = Some(val);
}

The upgrade and the downgrade are of course no longer atomic, but we don’t really care, we just perform another check after the upgrade in case another thread beat us to the punch and resized the table. And most importantly, this version of the code utilized all the available cores, bringing load times from 7 minutes to 1m45s to parse and load ~70 GiB of JSON data. (Just going through all the data with cat takes 32s, so this is pretty good.)

The immediate moral of the story is: be careful when adopting non-standard features of fundamental APIs like the synchronization devices. The documentation stated the behavior clearly, and quite a bit of debugging would have been avoided by reading it in advance. Even an otherwise excellent library like parking_lot sometimes violates the principle of least surprise.

The deeper lesson is that fearless concurrency, and the more general “if it compiles, it works” principle, have their limits. Rust has spoiled us by providing a nice experience with multi-threading, so much so that it’s easy to be genuinely surprised when a program that successfully compiles ends up misbehaving in a non-obvious way.

Rust async is colored, and that’s not a big deal

In the last several years async-friendly languages and APIs have received a large amount of attention. One contentious point in the language design space is “colored functions”, the division of functions into async and non-async ones. The term was introduced by the now-famous 2015 article titled What Color is Your Function?, which uses color as a metaphor for the often painful mismatch between sync and async functions in JavaScript and other languages with explicitly async functions. Since 2015 many more languages have jumped on the async bandwagon, so many more programmers are now getting familiar with the metaphor. Given that some languages managed to provide async IO without colored functions, such as Go, Zig, and in the future likely Java, the discussion around function colors is picking up once again and is raised in the context of Rust. Some people have even tried to argue that the bad rap of colored functions doesn’t apply to Rust’s async because it’s not colored in the first place. An article from several days ago is titled “Rust’s async isn’t f#@king colored!”, and similar arguments have appeared on reddit. I’m not picking on any specific post, but I’d like to provide a response to that sort of argument in general.

In this article I will show that Rust async functions are colored, by both the original definition and in practice. This is not meant as a criticism of Rust async, though – I don’t see function colors as an insurmountable issue, but as a reflection of the fundamental difference between the async and sync models of the world. Languages that hide that difference do so by introducing compromises that might not be acceptable in a systems language like Rust or C++ – for example, by entirely forbidding the use of system threads, or by complicating the invocation of foreign or OS-level blocking calls. Colored functions are also present in at least C#, Python, Kotlin, and C++, so they’re not a quirk of JavaScript and Rust. And additional features of Rust async do make it easier to connect async code with traditional blocking code, something that is just not possible in JavaScript.

Colored functions

“What Color is Your Function?” starts off by describing an imaginary language that perversely defines two types of functions: red and blue. The language enforces a set of seemingly arbitrary rules regarding how the two are allowed to interact:

  1. Every function has a color.
  2. The way you call a function depends on its color.
  3. You can only call a red function from within another red function.
  4. Red functions are more painful to call.
  5. Some core library functions are red.

Without knowing the details, a reasonable person would agree that the described language is not particularly well designed. Of course, readers of this article in 2021 will not find it hard to recognize the analogy with async: red functions are async functions, and blue functions are just ordinary functions. For example, #2 and #4 refer to the fact that calling an async function requires either explicit callback chaining or await, whereas a sync function can just be called. #3 refers to the fact that await and callback resolution work only inside async functions, and JavaScript doesn’t provide a way to block the current non-async function until a promise (async value) is resolved. The article portrays async functions as a leaky abstraction that profoundly and negatively affects the language, starting with the above rules.

The rules of async make async code contagious, because using just one async function in one place requires all the callers up the stack to become async. This splits the ecosystem into async and non-async libraries, with little possibility to use them interchangeably. The article describes async functions as functions that operate on classic JavaScript callbacks, but further argues that async/await, which was novel at the time, doesn’t help with the issue. Although await constitutes a massive ergonomic improvement for calling async from async (#4), it does nothing to alleviate the split – you still cannot call async code from non-async code because await requires async.

Function colors in Rust async

How does all this apply to Rust? Many people believe that it applies only in part, or not at all. Several objections have been raised:

  1. Rust async functions are in essence ordinary functions that happen to return values that implement the Future trait. async fn is just syntactic sugar for defining such a function, but you can make one yourself using an ordinary fn as long as your function returns a type that implements Future. Async functions, the argument goes, are simply functions that return Future<Output = T> instead of T, and as such are no more “special” than functions that return a Result<T> instead of T – so rule #1 (“every function has a color”) doesn’t apply.
  2. Unlike JavaScript, Rust async executors provide a block_on() primitive that invokes an async function from a non-async context and blocks until the result is available – so rule #3 (“you can only call a red function from within another red function”) doesn’t apply.
  3. Again unlike JavaScript, Rust async provides spawn_blocking() which invokes a blocking sync function from an async context, temporarily suspending the current async function without blocking the rest of the async environment. This one doesn’t correspond to a rule from the original article because JavaScript doesn’t support blocking sync functions.
  4. Rule #5 (“some core library functions are red”) doesn’t apply because Rust’s stdlib is sync-only.

If these arguments are correct, the only color rule that remains is rule #4, “red functions are more painful to call”, and that part is almost completely alleviated by await. The original JavaScript problems where async functions “don’t compose in expressions because of the callbacks” or “have different error-handling” simply don’t exist with await, in either JavaScript or Rust. Taking these arguments at face value, it would seem that the whole function-color problem is made up or at least wildly exaggerated from some exotic JavaScript problems that Rust async doesn’t inherit. Unfortunately, this is not the case.

First, the split between the sync and async ecosystems is immediately apparent to anyone who looks at the ecosystem. The very existence of async_std, a crate with the explicit purpose to provide an “async version of the Rust standard library”, shows that the regular standard library is not usable in an async context. If function colors weren’t present in Rust, the ordinary stdlib would be used in both sync and async code, as is the case in Go, where a distinction between “sync” and “async” is never made to begin with.

Then what of the above objections? Let’s go through them one by one and see how they hold up under scrutiny.

Aren’t Rust async functions just ordinary functions with a wacky return type?

While this is true in a technical sense, the same is also true in JavaScript and almost all languages with colored async (with the exception of Kotlin) in exactly the same way. JavaScript async functions are syntactic sugar for functions that create and return a Promise. Python’s async functions are regular callables that immediately return a coroutine object. That doesn’t change the fact that in all those languages the caller must handle the returned Promise (coroutine object in Python, Future in Rust) in ways that differ from handling normal values returned from functions. For example, you cannot pass an async function to Iterator::filter() because Iterator::filter() expects a function that returns an actual bool, not an opaque value that just might produce a bool at some point in the future. No matter what you put in the body of your async function, it will never return bool, and extracting the bool requires executor magic that creates other problems, as we’ll see below. Regardless of whether it’s technically possible to call an async function from a sync context, inability to retrieve its result is at the core of function color distinction.
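As an illustration (a sketch, not from the original argument), an async predicate cannot be plugged into Iterator::filter():

async fn is_even(n: u64) -> bool {
    n % 2 == 0
}

fn keep_even(v: Vec<u64>) -> Vec<u64> {
    // does not compile: filter() needs a closure returning bool, but
    // is_even(n) evaluates to an opaque impl Future<Output = bool>
    // v.into_iter().filter(|&n| is_even(n)).collect()

    // a sync predicate of the same shape works fine
    v.into_iter().filter(|&n| n % 2 == 0).collect()
}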

Ok, but doesn’t the same apply to Result? Functions that need a u32 aren’t particularly happy to receive a Result<u32, SomeError>. A generic function that accepts u32, such as Iterator::min(), has no idea what to do with Result<u32, SomeError>. And yet people don’t go around claiming that Result somehow “colors” their functions. I admit that this argument has merit – Result indeed introduces a semantic shift that is not always easy to bridge, including in the example we used above, Iterator::filter(). There is even a proposal to add 21 new iterator methods such as try_filter(), try_min_by_key(), try_is_partitioned(), and so on, in order to support doing IO in your filter function (and key function, etc.). Doing this completely generically might require Haskell-style monads or at least some form of higher-kinded types. All this indicates that supporting both Result and non-Result types in fully generic code is far from a trivial matter. But is that enough to justify the claim that Result and Future are equivalent in how they affect functions that must handle them? I would say it’s not, and here is why.

If the recipient of a Result doesn’t care about the error case, it can locally resolve Result to the actual value by unwrapping it. If it doesn’t want to panic on error, it can choose to convert the error to a fallback value, or skip the processing of the value. While it can use the ? operator to propagate the error to its caller, it is not obliged to do so. The recipient of a Future doesn’t have that option – it can either .await the future, in which case it must become async itself, or it must ask an executor to resolve the future, in which case it must have access to an executor, and license to block. What it cannot do is get to the underlying value without interaction with the async environment.
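To make the contrast concrete, here is a sketch (not from the original article):

use std::future::Future;

// a Result can be resolved on the spot, with no special context
fn describe_result(r: Result<u32, String>) -> String {
    match r {
        Ok(n) => n.to_string(),
        Err(e) => format!("error: {}", e),
    }
}

// a Future cannot: there is no way to extract the u32 here without either
// .await (which would force this function to become async) or an executor's
// block_on() (which requires access to an executor and license to block)
fn describe_future(_f: impl Future<Output = u32>) -> String {
    todo!()
}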

Verdict: Rule #1 mostly applies to Rust – async functions are special because they return values that require async context to retrieve the actual payload.

Doesn’t block_on() offer a convenient way to invoke an async function from a non-async context?

Yes, provided you are actually allowed to use it. Libraries are expected to work with the executor provided by the environment and don’t have an executor lying around which they can just call to resolve async code. The standard library, for example, is certainly not allowed to assume any particular executor, and there are currently no traits that abstract over third-party executors.

But even if you had access to an executor, there is a more fundamental problem with block_on(). Consider a sync function fn foo() that, during its run, needs to obtain the value from an async function async fn bar(). To do so, foo() does something like let bar_result = block_on(bar()). But that means that foo() is no longer just a non-async function, it’s now a blocking non-async function. What does that mean? It means that foo() can block for arbitrarily long while waiting for bar() to complete. Async functions are not allowed to call functions like foo() for the same reason they’re not allowed to call thread::sleep() or TcpStream::connect() – calling a blocking function from async code halts the whole executor thread until the blocking function returns. In case of that happening in multiple threads, or in case of a single-threaded executor, that freezes the whole async system. This is not described in the original function color article because neither block_on() nor blocking functions exist in stock JavaScript. But the implications are clear: a function that uses block_on() is no longer blue, but it’s not red either – it’s of a new color, let’s call it purple.
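A sketch of such a “purple” function, assuming the futures crate for block_on() (any executor’s equivalent would do):

async fn bar() -> u64 {
    42 // stand-in for real async work
}

// sync signature, but blocks the calling thread until bar() completes;
// calling this from async code would stall an executor thread
fn foo() -> u64 {
    futures::executor::block_on(bar())
}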

If this looks like it’s changing the landscape, that’s because it is. And it gets worse. Consider another async function, xyzzy(), that needs to call foo(). If foo() were a blue/non-async function, xyzzy() would just call it and be done with it, the way it’d call HashMap::get() or Option::take() without thinking. But foo() is a purple function which blocks on block_on(bar()), and xyzzy() is not allowed to call it. The irony is that both xyzzy() and bar() are async and if xyzzy() could just await bar() directly, everything would be fine. The fact that xyzzy() calls bar() through the non-async foo() is what creates the problem – foo’s use of block_on() breaks the chain of suspensions required for bar() to communicate to xyzzy() that it needs to suspend until further notice. The ability to propagate suspension from the bottom-most awaitee all the way to the executor is the actual reason why async must be contagious. By eliminating async from the signature of foo() one also eliminates much of the advantage of bar() being async, along with the possibility of calling foo() from async code.

Verdict: rule #3 applies because block_on() changes a blue function into something that is neither red nor callable from red.

Doesn’t spawn_blocking() resolve the issue of awaiting blocking functions in async contexts?

spawn_blocking() is a neat bridge between sync and async code: it takes a sync function that might take a long time to execute, and instead of calling it, submits it to a thread pool for execution. It returns a Future, so you can await spawn_blocking(|| some_blocking_call()) like you’d await a true async function, without the issues associated with block_on(). This is because the Future returned by spawn_blocking() is pending until the thread pool reports that it’s done executing the submitted sync function. In our extended color metaphor, spawn_blocking() is an adapter that converts a purple function into a red function. Its main intended use cases are CPU-bound functions that might take a long time to execute, as well as blocking functions that just don’t have a good async alternative. Examples of the latter are functions that work with the file system, which still lacks good async support, or legacy blocking code behind FFI (think ancient database drivers and the like).
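For example, here is a sketch of wrapping a blocking filesystem call, assuming the tokio executor (the file name is made up):

async fn load_config() -> std::io::Result<String> {
    // the blocking read runs on tokio's dedicated blocking thread pool;
    // the returned future is awaited without blocking the executor
    tokio::task::spawn_blocking(|| std::fs::read_to_string("config.toml"))
        .await
        .expect("blocking task panicked")
}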

Problems arise when code tries to avoid multiple function colors and use block_on() or spawn_blocking() to hide the “color” of the implementation. For example, a library might be implemented using async code internally, but use block_on() to expose only a sync API. Someone might then use that library in an async context and wrap the sync calls in spawn_blocking(). What would be the consequences if that was done across the board? Recall that the important advantage of async is the ability to scale the number of concurrent agents (futures) without increasing the number of OS threads. As long as the agents are mostly IO-bound, you can have literally millions of them executing (most of them being suspended at any given time) on a single thread. But if an async function like the above xyzzy() uses spawn_blocking() to await a purple function like foo(), which itself uses block_on() to await an async function like bar(), then we have a problem: the number of xyzzy() instances that can run concurrently and make progress is now limited by the number of threads in the thread pool employed by spawn_blocking(). If you need to spawn a large number of tasks awaiting xyzzy() concurrently, most of them will need to wait for a slot in the thread pool to open up before their foo() functions even begin executing. And all this because foo() blocks on bar(), which is again ironic because bar(), being an async function, is designed to scale independently of the number of threads available to execute it.

The above is not just a matter of performance degradation; in the worst case spawn_blocking(|| block_on(...)) can deadlock. Consider what happens if one async function behind spawn_blocking(|| block_on(...)) needs data from another async function started the same way in order to proceed. It is possible that the other async function cannot make progress because it is waiting for a slot in the thread pool to even begin executing. And the slot won’t free up because it is taken by the first async function, which also runs inside a spawn_blocking() invocation. The slot is never going to change owner, and a deadlock occurs. This can’t happen with async functions that are directly executed as async tasks because those don’t require a slot in a fixed-size pool. They can all be in a suspended state waiting for something to happen to any of them, and resume execution at any moment. In an async system the number of OS threads deployed by the executor doesn’t limit the number of async functions that can work concurrently. (There are executors that use a single thread to drive all futures.)

Verdict: spawn_blocking() is fine to use with CPU-bound or true blocking code, but it’s not a good idea to use it with block_on() because the advantages of async are then lost and there is a possibility of deadlock.

But Rust’s stdlib is sync-only.

That’s technically true, but Rust’s stdlib is intentionally minimal. Important parts of functionality associated with Rust are delegated to external crates, with great success. Many of these external crates now require async, or even a specific executor like tokio. So while the standard library is async-free, you cannot ignore async while programming in Rust.

Verdict: technically true but not useful in a language with a minimalistic standard library.

Dealing with a two-colored world

Again, the above is not a criticism of Rust async, but merely of the claim that it’s not colored. Once we accept that it is, it becomes clear that, unlike JavaScript, Rust actually does provide the tools we need to deal with the mismatch. We can:

  1. Accept that sync and async are two separate worlds, and don’t try to hide it. In particular, don’t write “sync” interfaces that use block_on() to hide async ones, and the other way around with spawn_blocking(). If you absolutely must hide the async interfaces behind sync ones, then do so immediately at the entry point, document that you’re doing so, and provide a public interface to the underlying native async calls.
  2. Respecting the above, use block_on() and spawn_blocking() in application-level code on the boundaries between the two worlds.
  3. In more complex scenarios, create clear and documented boundaries between the two worlds and use channels to communicate between them. This technique is already used for both multi-threaded and async code, so it should come as no surprise to future maintainers. Ideally you’d use channels that provide both a sync and an async interface, but if those are not available, use async channels with block_on() on the sync side. A sketch of this approach follows the list.
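As an illustration of the third point, here is a sketch that assumes the futures crate for block_on() and uses std::sync::mpsc as the boundary channel (its unbounded send() never blocks, so calling it from async code is safe):

use std::sync::mpsc;

async fn produce(tx: mpsc::Sender<u64>) {
    for i in 0..10 {
        // stand-in for the result of some async work
        let _ = tx.send(i);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // the sync consumer lives in its own thread and is free to block
    let consumer = std::thread::spawn(move || rx.iter().sum::<u64>());
    // the async producer is driven by an executor at the boundary
    futures::executor::block_on(produce(tx));
    assert_eq!(consumer.join().unwrap(), 45);
}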

Patterns of fallible iteration

Since Rust is a language without exceptions, errors are signaled by returning a Result: an enum that denotes either the function’s successful return value, or information about the error. This can seem like it requires a lot of error-handling boilerplate, but for most code this is in fact not the case. Rust’s powerful type system, the near-ubiquitous error-returning convention, and the ? operator combine the convenience of exceptions with the safety of explicit return values. However, there are situations when error handling is buried inside callbacks and propagating the error takes a bit of effort. One such situation is when processing iterators that can return errors, where the ? error-handling operator cannot be used in the usual way.

This article will present a fallible iterator and show several ways to handle its errors without resorting to panic.

Explicit loop vs iterator

Reading a file is a typical example of fallible iteration. While the importance of handling errors when opening a file is widely acknowledged because the file can be missing or unreadable, errors while reading the file happen comparatively rarely and are often converted to panic with unwrap(). But such errors can and do occur, for example when the file is behind a network file system, or when it is backed by a pipe or hardware device, and should be handled cleanly. For demonstration purposes we’ll use a simple function that reads a file line by line, parses each line as an integer, and returns the line with the maximum value. To indicate the possibility of failure, the function will return Result<u64, io::Error>, written more concisely as io::Result<u64>:

fn max_line(file_name: &str) -> io::Result<u64> {
    // ...
}

This signature gives the callers freedom to decide how to handle the errors indicated by max_line – they can unwrap() the return value to panic in case of error, they can match the error and handle the error variant, or they can use ? to propagate the error, if it occurs, to their caller. Here is a straightforward implementation of max_line, itself making copious use of the strange-looking ? operator:

fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    let mut max = 0;
    for line_result in BufReader::new(file).lines() {
        let line = line_result?;
        let n = line
            .parse::<u64>()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
        if n > max {
            max = n;
        }
    }
    Ok(max)
}

? is a postfix operator best described as use-or-return. If x is Result<T, SomeError>, the type of the expression x? will be T. When x is Ok(val), x? will evaluate to val, and when x is Err(e), the operator will just return the error result from the function. Applying the ? operator to Result actually requires the function to be declared as returning Result with a compatible error variant as return value, and fails to compile if this is not the case. BufRead::lines() provides lines as Results, so the expression let line = line_result? desugars to something like:

let line = match line_result {
    Ok(s) => s,               // a string, use it
    Err(e) => return Err(e),  // an error, return it
};

A minor complication arises from FromStr::parse for u64 not returning an io::Error, but a ParseIntError. Since we don’t particularly care about distinguishing between these two kinds of error condition, we use map_err() to unceremoniously convert ParseIntError into an io::Error, retaining the error message.

Readers experienced in idiomatic Rust will notice that the above function calculates the maximum using a for loop and a mutable accumulator, effectively reimplementing Iterator::max. Since BufReader::lines() already returns an iterator, it seems like an obvious improvement to combine it with map() and max(), something like this:

// XXX: doesn't compile
fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    BufReader::new(file)
        .lines()
        .map(|line_result| {
            let line = line_result?;
            let n = line.parse::<u64>()
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
            Ok(n)
        })
        .max()
        // unwrap_or() because max() returns an Option which is None 
        // when the iterator is empty
        .unwrap_or(Ok(0))
}

Unsurprisingly, this fails to compile. The compiler complains that “the trait std::cmp::Ord is not implemented for std::io::Error”, which is Rust’s way of saying that Iterator::max() has no idea how to compare the io::Result<u64> values the closure passed to map() is producing. But the problem lies deeper than a simple inability to compare. In the original for loop the ? operator could return from the max_line function, effectively aborting the iteration in case of error. In the iterator version, the parsing happens inside a closure, which takes away the power of ? to abort max_line(). In case of error, ? used in the closure returns an error result from the closure, but the iteration continues.

Stopping iteration on error will require a different approach, with several options depending on what we need to do with the items produced by the iterator.

Collecting items into a container – collect

If you need to collect items from a fallible iterator, Rust offers an elegant idiom for aborting the iteration on error. Instead of normally collecting into a container, you can collect into Result<Container>. The effect will be exactly what we are after: collect() will return a Result that will be Ok(container) if all the values were Ok, and Err(e) if an error was encountered, in which case the iteration will have stopped after consuming the error. Applied to max_line, it would look like this:

// correct, but uses an intermediate vector
fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    let numbers_result: io::Result<Vec<u64>> = BufReader::new(file)
        .lines()
        .map(parse_line)
        .collect();
    let max = numbers_result?.into_iter().max().unwrap_or(0);
    Ok(max)
}

This performs the processing in two steps: first, the numbers are collected into a result that contains either a vector of numbers, or an error. We use ? to grab the vector or return the error and, provided there was no error, proceed to find the maximum.

Side note: since the closure passed to map will remain unchanged in subsequent examples, we moved it to a separate helper function:

fn parse_line(line_result: io::Result<String>) -> io::Result<u64> {
    let line = line_result?;
    let n = line
        .parse::<u64>()
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
    Ok(n)
}

The above max_line has the same error-handling behavior as the original for loop, but at the cost of allocating a potentially huge vector of numbers only to find the maximum. If you need the items in the vector for other purposes, that’s the approach you want to take. If you need to process the items in a streaming fashion, read on.

Consuming items – try_fold

max_line() is privileged to control the iteration from start to end: it sets up an iterator and consumes it, either with the for loop or letting a folding function like max() do it. Other than collect() discussed above, Iterator provides two additional consuming methods that stop on error: try_fold and try_for_each.

While a try_max() doesn’t exist, it is easy to emulate it because we can replace max() with fold(0, |max, n| std::cmp::max(max, n)). Likewise, try_max() is neatly expressible in terms of try_fold:

fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    BufReader::new(file)
        .lines()
        .map(parse_line)
        .try_fold(0u64, |max, n_result| Ok(std::cmp::max(max, n_result?)))
}

Not quite as pretty as try_max() would have been, but works exactly as the original for loop, and operates on the iterator. We don’t even need to use the ? operator for the result because max_line() returns an io::Result<u64> and that’s exactly what try_fold() gives us. If you can use this in your code, that’s almost certainly what you want.

The limitation of this approach is that it requires that you control how the iterator is consumed. If instead of max() we needed to invoke a function that accepts an iterator provided by a third party, we would have problems. Not every function that consumes an iterator can be easily replaced by a home-grown version based on try_fold or try_for_each. For example, the itertools crate contains a number of useful iterator adapters to solve real-world problems, including itertools::merge_by. We could use it to efficiently merge two sorted streams of records into a single stream, maintaining the sort order:

let iter1 = reader1.lines().map(Record::from_string);
let iter2 = reader2.lines().map(Record::from_string);
let mut out = Writer::new(...);
// XXX where to put try_fold or try_for_each?
for rec in itertools::merge_by(iter1, iter2, comparator) {
    out.write(rec.to_string())?;
}

Handling errors reported by iter1 and iter2 is not a simple matter of using try_fold or try_for_each because the record iterators are consumed by itertools::merge_by. As of this writing there is no try_merge_by in itertools, and it’s not clear there should be one, because adding try_merge_by would imply adding a fallible version of other adapters, eventually doubling the API surface of the module and the trait. There has to be a better way.

Stop-at-error iterator adapter – scan

Stopping at an error is really a special case of stopping the iteration on an arbitrary condition – something Iterator easily handles with take_while. It would seem that adapting an iterator to stop on error is as simple as tacking take_while(Result::is_ok) onto it, possibly adding a map to extract the values. The result looks like this:

// XXX: stops at the first error, but loses the error value
fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    let max = BufReader::new(file)
        .lines()
        .map(parse_line)
        .take_while(|n_result| match n_result {
            Ok(_) => true, // keep going
            Err(e) => {
                // XXX e is &io::Error, so we can't move it out to report it
                false      // stop
            }
        })
        .map(Result::unwrap)
        .max()
        .unwrap_or(0);
    Ok(max)
}

This is closer to what we want, but not quite there yet. Iteration stops on error, but we don’t have the error value, nor any indication whether an error occurred. Since take_while() passes its callback a shared reference to the item, the closure cannot move the error to a captured local variable – the compiler would complain of a “move out of shared reference”. Also, take_while is a filter and not a transformer, so it requires the unsightly .map(Result::unwrap) making it look like the program might panic on error, when it will in fact correctly stop iterating.

We can address all the issues with take_while() by switching to its big brother Iterator::scan(). Like take_while(), scan() supports stopping the iteration, but it also allows transforming the value and storing intermediate state. We’ll use all those features, ending up with this:

fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    let mut err = Ok(());
    let max = BufReader::new(file)
        .lines()
        .map(parse_line)
        .scan(&mut err, until_err)
        .max()
        .unwrap_or(0);
    err?;
    Ok(max)
}

There are several new things going on, so let’s go through them one by one.

First, we initialize err to a Result which we’ll use as the place to store the error if one occurs. Its initial value is a placeholder – we could have used a type like Option<io::Error> and initialized it to None, but it’s nicer to use a Result because it will then work with ? in our function. As err serves only to detect the error, it has no meaningful Ok variant, and we initialize it with a unit value.

Second, after the unchanged map(parse_line), we chain the iterator to the scan() adapter, passing it a utility function which we’ll show shortly. The function passed to scan() provides the values that will be returned from the iterator’s next(), which means it stops the iteration by returning None. And that’s precisely what until_err does: it returns Some(item) if the result is Ok(item), and otherwise stores the error result into the provided reference and returns None.

Third, since max() isn’t privy to our setup, it will return the maximum of the numbers it receives from the iterator. If there was no error, that will be the whole file; in case of error, we’ll get the maximum of the lines before the first error. (Which is equivalent to what the max variable would have contained in case of error in our original loop implementation.) So after the call to max(), we must remember to check whether the iteration was prematurely aborted, and if so, return the error. err? does that, and is simply a shorthand for if let Err(e) = err { return Err(e) }. Finally, once we’ve gotten errors out of the way and have proven that max() has observed all the lines from the file, we can return it from the function.

The remaining piece is the until_err helper, which looks like this:

fn until_err<T, E>(err: &mut &mut Result<(), E>, item: Result<T, E>) -> Option<T> {
    match item {
        Ok(item) => Some(item),
        Err(e) => {
            **err = Err(e);
            None
        }
    }
}

The signature looks daunting because the function will be called with a reference to the value passed to scan(), which is itself a reference to our local variable of type Result<(), _>. (This signature allows passing temporary values to scan(), which we cannot do because we need to inspect err after the iteration to check whether an error was encountered.) The logic of the function is straightforward and was described above: a match mapping Ok(item) to Some(item) and Err(e) to None, the latter stopping the iteration and storing e for later inspection.

To conclude, here is how scan() would apply to the code that uses itertools::merge_by:

let (mut err1, mut err2) = (Ok(()), Ok(()));
let iter1 = reader1.lines().map(Record::from_string).scan(&mut err1, until_err);
let iter2 = reader2.lines().map(Record::from_string).scan(&mut err2, until_err);
let mut out = Writer::new(...);
for rec in itertools::merge_by(iter1, iter2, comparator) {
    out.write(rec.to_string())?;
}
// check whether either input iterator stopped early due to an error; this must
// happen after the loop, once the scan adapters release their borrows of err1/err2
err1?;
err2?;

Once merge_by is set up correctly, the remaining for loop can be easily transformed to use try_for_each() (or a separate scan() and for_each() pair!) to handle the write errors:

itertools::merge_by(iter1, iter2, comparator)
    .try_for_each(|rec| out.write(rec.to_string()))?;
err1?;
err2?;

Summary

Fallible iteration is often omitted from introductory materials, but Rust does provide several idioms to handle errors in items:

  • If you need to collect the items, use collect::<Result<Container, _>>().
  • If you control how the iterator is consumed, use try_fold() or try_for_each() to stop at first error. Those two methods are even provided by Rayon’s parallel iterators.
  • Otherwise, use scan(&mut err, until_err) and use the iterator as usual. You’ll just need to live with the until_err helper function (which you can also write as a closure) and remember to check err for error after the iterator has been exhausted.

Note: the original version of this article used Iterator::sum() as the method that consumes the iterator, but a reddit reader pointed out that sum() over an iterator of Results actually stops at the first error automatically.

Parallel stream processing with Rayon

Most Rust programmers have heard of Rayon, a crate that makes it almost magically easy to introduce parallelism to a program. In this article we’ll examine how to apply Rayon to basic stream processing.

Running the examples

To run the examples in this blog post, create a directory, run cargo init --bin in it and edit the generated Cargo.toml to include the following dependencies:

[dependencies]
rayon = "1.3.1"
serde_json = "1.0.57"

The code from the examples just goes to src/main.rs. You can run it with cargo run --release or build it with cargo build --release and run it as target/release/<directory-name>. Do use the release mode when checking the performance of anything in Rust, as the numbers you’ll get in debug mode are completely meaningless.

Basics of Rayon

Rayon is a library for parallel processing without the hassle. You don’t need to manage channels, events, futures, or even know about a thread pool, you just tell Rayon what you want performed in parallel, and Rayon does it. To borrow the example from the github page, here is a sequential program that sums up squares of the elements of a slice:

fn sum_of_squares(input: &[u64]) -> u64 {
    input.iter()
         .map(|&i| i * i)
         .sum()
}

To make it parallel, all you need to do is change iter to par_iter:

use rayon::prelude::*;

fn sum_of_squares(input: &[u64]) -> u64 {
    input.par_iter()
         .map(|&i| i * i)
         .sum()
}

Rayon delivers on its promise and produces a parallel version that runs ~1.5x faster, as tested on my machine using a vector of a billion integers. This is fairly impressive given that the individual map and reduce operations in this example boil down to just a CPU instruction each. Rayon’s implementation uses a technique called work stealing to ensure efficient distribution of tasks among threads. The idea behind work stealing is that each thread maintains a local queue of tasks submitted by code running in that thread. By default the thread runs local tasks, and only when it runs out of those does it go ahead and “steal” tasks from queues of other threads. If all threads are equally busy, each just services its own tasks, maximizing data locality and reducing the number of context switches.

The Rayon API provides two distinct abstractions: fork-join and parallel iterators. Fork-join is the lower-level primitive that is deceptively simple: it consists of a single function, join(), which accepts two closures and executes them, potentially in parallel. rayon::join(A, B) pushes B to the thread-local queue and starts executing A – in the same thread. By the time A returns, it’s possible that B had already been stolen by another thread, in which case the current thread waits for it to finish, stealing tasks from other threads while waiting. If B had not been stolen, join() just executes it as well, being no worse off than if the code had been sequential.
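A minimal sketch (not from the original post) of join() powering a divide-and-conquer sum of a slice:

fn parallel_sum(data: &[u64]) -> u64 {
    // small inputs aren't worth splitting further
    if data.len() <= 1024 {
        return data.iter().sum();
    }
    let (left, right) = data.split_at(data.len() / 2);
    // the two halves may run on different threads; join() returns both results
    let (l, r) = rayon::join(|| parallel_sum(left), || parallel_sum(right));
    l + r
}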

Fork-join elegantly maps to divide-and-conquer algorithms, but Rayon is best known and loved for its other abstraction built on top of fork-join, the parallel iterators. They enrich the standard library collections with a par_iter() method that returns a ParallelIterator, an object that closely resembles Iterator but which, when exhausted, executes closures passed to methods like map() and fold() in parallel. A simple use rayon::prelude::* enables this functionality, adding the par_iter() method to containers like Vec<T> or &[T].

Invoked on a slice, as in the sum_of_squares example, par_iter() divides it up into smaller chunks which it processes in parallel using rayon::join() behind the scenes. Methods like sum() and the more general fold() and reduce() aggregate the output, also in parallel, eventually returning a single value. A consequence of the semantics of ParallelIterator is that you cannot just use it in a regular for loop, you have to exhaust it using methods it provides such as the already mentioned sum(), fold(), and reduce(), but also for_each() and others. The closures passed to the methods that accept them must be callable from multiple threads, which the compiler will diligently check, as we’ll see below.
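For example, here is a sketch of exhausting a parallel iterator with for_each() instead of a for loop:

use rayon::prelude::*;

fn print_squares(input: &[u64]) {
    // the closure may be invoked from multiple threads, so it must not
    // capture anything that can't be shared between threads
    input.par_iter().for_each(|&i| println!("{}", i * i));
}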

An excellent introduction to Rayon is provided by its author in the classic blog post which you should definitely check out.

Stream processing

An aspect of parallelism often overlooked when presenting Rayon is processing of streaming data. When processing a stream we don’t have a handy container to divide up, all we have are items arriving one at a time with no idea how many of them there will be in total. This makes some parallel processing techniques inaccessible, so we have to do things slightly differently.

As an example, let’s process a stream of JSON-encoded values in the one-record-per-line jsonl format. We will calculate the average of the value field in each record, with the sequential code looking like this:

use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read) -> f64 {
    let input = BufReader::new(input);
    let mut cnt = 0usize;
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt += 1;
            value.as_f64()
        })
        .sum();
    total / cnt as f64
}

Clear enough – iterate over input lines, drop the lines we can’t deserialize into a JSON object, remove objects without a value field or those whose value is not a number, sum them up, and divide by the count of valid records. The function is generic over the type of its input, so we can call it with any input that implements io::Read, such as a File, the standard input handle returned by io::stdin().lock(), or even a &[u8]:

fn main() {
    let json = r#"{"value": 1}
{"value": 15.3, "foo": "bar"}
{}
{"value": 100}"#
        .as_bytes();
    assert_eq!(avg_values_jsonl(json), 116.3 / 3.);
}

To make avg_values_jsonl parallel we can’t just add par_iter() into the mix like we did for sum_of_squares(). par_iter() is only defined for concrete container types because it relies on knowing the number of items in a container and being able to divide them into subparts to operate on, which doesn’t apply to generic iterators.

However Rayon provides a different mechanism specifically for the streaming use case: a par_bridge() method, defined on ordinary iterators by the ParallelBridge extension trait. The method adapts an ordinary iterator into a parallel one, employing clever design to take individual items from the iterator and balance them between Rayon’s threads. Most importantly, the returned adapter implements ParallelIterator, which means you can use it exactly as you would use the value returned by par_iter() on a collection.

Let’s try to insert par_bridge() into the iteration chain:

use rayon::prelude::*;

use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read) -> f64 {
    let input = BufReader::new(input);
    let mut cnt = 0usize;
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .par_bridge()  // this is new
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt += 1;
            value.as_f64()
        })
        .sum();
    total / cnt as f64
}

Unsurprisingly, our first attempt fails to compile:

   Compiling playground v0.1.0 (/home/hniksic/work/playground)
error[E0599]: no method named `par_bridge` found for struct `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>` in the current scope
   --> src/main.rs:12:10
    |
12  |           .par_bridge()
    |            ^^^^^^^^^^ method not found in `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>`
    |
    = note: the method `par_bridge` exists but the following trait bounds were not satisfied:
            `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::iter::Iterator`
            which is required by `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&mut std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `&mut std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`

Charming, isn’t it? (But still not as bad as Tokio.)

The error message is inscrutable because the iterator on which we’re attempting to invoke par_bridge() has a complex generic type, coming from map() wrapping lines() wrapping BufReader wrapping the generic type of input. The actual problem is explained in the “note” which says that “the method par_bridge exists but the following trait bounds were not satisfied: <gibberish>: std::marker::Send”. The iterator returned by input.lines() doesn’t implement Send because it contains the value moved from input, whose type is only known to implement the Read trait. Without Send Rayon doesn’t have permission to send the iterator to another thread, which it might need to do. If the function were allowed to compile as written, calling it with an input which is not Send, perhaps because it contains Rc<_> or another non-Send type, would crash the program. Luckily for us, rustc prevents this and rejects the code due to the missing bound, even if the error message could be a bit smoother.

Once we understand the problem, the fix is simple: add the Send trait bound in addition to Read, declaring input as input: impl Read + Send. With that change we get a different compile error:

error[E0594]: cannot assign to `cnt`, as it is a captured variable in a `Fn` closure
  --> src/main.rs:17:13
   |
17 |             cnt += 1;
   |             ^^^^^^^^ cannot assign

The problem here is that the closure mutates shared state, the cnt counter. That requires the closure to capture cnt by unique (mutable) reference, which makes it an FnMut closure. This was perfectly fine in single-threaded code, but Rayon plans to call the closure from multiple threads, so it requests an Fn closure. The compiler rejects the assignment in an Fn closure and saves us from the embarrassment of a data race. Nice!

Neither of these issues is specific to Rayon, we would encounter the exact same errors if we tried to pass the closure to multiple threads using other means. We can fix the assignment by switching cnt to an AtomicUsize which can be safely modified through shared reference:

use rayon::prelude::*;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read + Send) -> f64 {
    let input = BufReader::new(input);

    let cnt = AtomicUsize::new(0);
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .par_bridge()
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt.fetch_add(1, Ordering::Relaxed);
            value.as_f64()
        })
        .sum();
    total / cnt.into_inner() as f64
}

And this compiles!

After verifying that the parallel code produces correct results, we can proceed to compare its performance with that of the sequential version. One way is to modify the main() function to produce much more data and time the result:

fn main() {
    let json: String = (0..1_000_000).map(|i| format!(r#"{{"foo": "bar", "value": {}}}
"#, i)).collect();
    let t0 = std::time::Instant::now();
    assert_eq!(avg_values_jsonl(json.as_bytes()), 499999.5);
    println!("time: {}", t0.elapsed().as_secs_f64());
}

The above rather unscientific benchmark run on an old four-core desktop completes the sequential version in 0.51s and the Rayon version in 0.25s, showing a 2x speedup from parallel execution. Let’s see what happens when we add more keys to each JSON line to emulate a heavier workload per record:

fn main() {
    let some_keys = r#""foo1": "bar", "foo2": "bar", "foo3": "bar", "foo4": "bar", "foo5": "bar", "foo6": "bar", "foo7": "bar", "foo8": "bar", "foo9": "bar", "foo10": "bar""#;
    let json: String = (0..1_000_000).map(|i| format!(r#"{{{}, "value": {}}}
"#, some_keys, i)).collect();
    let t0 = std::time::Instant::now();
    assert_eq!(avg_values_jsonl(json.as_bytes()), 499999.5);
    println!("time: {}", t0.elapsed().as_secs_f64());
}

In this variant the sequential version takes 2.6s and the Rayon version 0.81s, a 3.2x speedup. As expected, making individual tasks more substantial increases the benefit of multiple cores.

Preserving order

The previous example summarizes the entire stream into a single number. But stream processing often needs to map input records into output records, possibly filtering them, but without upsetting their order. The classic Unix streaming commands like grep, cut or sed work exactly like that.

At first parallel processing and maintaining record order appear at odds with each other because parallel execution inherently removes ordering. But if only the transformations are run in parallel, it should be possible to associate an index with the input records as they are read, and use those indexes to reestablish the original order on output. Though perhaps not as straightforward as processing slices, processing streaming data in parallel seems perfectly feasible, even though none of the classic Unix streaming tools do it. (To be fair, that is because they predate today’s threading by decades, and also because they rely on parallelization on the pipeline level – piping grep to sed does run both commands in parallel.)

To experiment with order-preserving streaming, let’s transform JSONL containing arrays into a simple tab-delimited CSV:

use serde_json::Value;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

fn json_to_csv(input: impl Read, output: impl Write) -> io::Result<()> {
    let input = BufReader::new(input);
    let mut output = BufWriter::new(output);
    for line in input.lines() {
        let line = line?;
        let rec: Vec<Value> = match serde_json::from_str(&line) {
            Ok(rec) => rec,
            Err(_) => continue, // should log the error...
        };
        for (idx, val) in rec.iter().enumerate() {
            output.write_all(val.to_string().as_bytes())?;
            if idx != rec.len() - 1 {
                output.write_all(b"\t")?;
            }
        }
        output.write_all(b"\n")?;
    }
    Ok(())
}

fn main() {
    let json = r#"[1, 2, 3]
["foo", "bar", "baz"]
"#;
    let mut output = vec![];
    json_to_csv(json.as_bytes(), &mut output).unwrap();
    let output = std::str::from_utf8(&output).unwrap();
    assert_eq!(output, "1\t2\t3\n\"foo\"\t\"bar\"\t\"baz\"\n");
}

Like the previous example, this one also reads the input line by line, deserializing each line into a JSON value. The value is now required to be an array of items, which are written to the output stream as a very naive form of CSV that just uses a tab as a separator.

There are two obstacles to running this in parallel using Rayon. The first, minor one is that the code is structured as a for loop that makes copious use of the ? operator to bail out in case of IO error. Rayon’s parallel iterators can’t drive a for loop, so we’ll need to find a way to rewrite it using adapters like for_each(), while still handling errors. The bigger obstacle is that you cannot just write the output from inside the closures invoked by Rayon, because they are called in parallel, so their outputs would intermingle. In addition to that, the par_bridge() adapter, successfully used in the previous example, is explicitly documented not to guarantee preserving the order of the original iterator.

Fortunately there is a way around both obstacles. Somewhat surprisingly, Rayon preserves the original order when the items of an indexed parallel iterator, such as the one returned by par_iter() on a vector, are collected with collect(). This means that we don’t have to manually associate input records with their positions and reorder them on output; Rayon will do it for us, as long as we collect the output into a container. While we can’t slurp the whole input into a vector, we can work in batches: collect a fixed number of input lines into a Vec<String> batch. Then we can call par_iter() on the batch to process the lines in parallel, using map() to transform each input line into the corresponding output line. Finally we can collect the output lines into a new vector whose order matches the order of the input, and write them out.
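
As a quick sanity check of that ordering guarantee, this hypothetical standalone snippet (not part of the converter) maps a vector in parallel and collects the results, which come back in the original input order:

use rayon::prelude::*;

fn main() {
    let v: Vec<u32> = (0..1_000).collect();
    // transform in parallel, then collect; the result preserves input order
    let doubled: Vec<u32> = v.par_iter().map(|&x| x * 2).collect();
    assert_eq!(doubled, (0..1_000).map(|x| x * 2).collect::<Vec<u32>>());
}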

Let’s examine the parallel version piece by piece:

fn json_to_csv(input: impl Read, output: impl Write) -> io::Result<()> {
    let input = BufReader::new(input);
    let mut output = BufWriter::new(output);
    let mut line_iter = input.lines();

In this version we don’t need to add Send bounds to input (or output) because we’ll do the reading and writing from the thread we’re called from. We do need to extract the input iterator into a separate variable because we’ll pull the lines manually:

    loop {
        let mut batch = vec![];
        for _ in 0..16384 {
            if let Some(line) = line_iter.next() {
                batch.push(line?);
            } else {
                break;
            }
        }
        if batch.is_empty() {
            break;
        }

We obtain a batch of input lines which we’ll process in parallel. Increasing the batch size generally improves performance, but past the size of several thousand lines, the returns begin to diminish and the memory use and latency start to become noticeable. Since the batch is collected in a single thread, we can also keep using the ? operator to bail out of the function in case of IO error.

        let output_lines: Vec<_> = batch
            .par_iter()
            .map(|line| {
                let mut outline: Vec<u8> = vec![];
                let rec: Vec<Value> = match serde_json::from_str(&line) {
                    Ok(rec) => rec,
                    Err(_) => return outline,
                };
                for (idx, val) in rec.iter().enumerate() {
                    outline.write_all(val.to_string().as_bytes()).unwrap();
                    if idx != rec.len() - 1 {
                        outline.write_all(b"\t").unwrap();
                    }
                }
                outline.write_all(b"\n").unwrap();
                outline
            })
            .collect();

Here we process the batch in parallel, transforming each input line into an output line. In case of a parse error the output line is left empty, so it doesn’t affect the output. We could have also used filter_map() and returned Option<Vec<u8>> from the closure, but errors are expected to be rare, so we optimize for the common case and return the Vec directly. The closure passed to map() is called from multiple threads in parallel and therefore out of order, but the final collect() magically reassembles the results back into the original order. The unwrap() used on all IO calls in the closure is not the cop-out it appears to be, because we can prove those unwraps will never panic: all the writes go to a Vec<u8>, and writing to a Vec<u8> never results in an IO error – the only failure that can occur is failing to grow the vector, and that aborts the process.

What remains is to print the output lines:

        for line in output_lines {
            output.write_all(&line)?;
        }
    }
    Ok(())
}

To measure performance we can use the same approach as when benchmarking avg_values_jsonl: create a large input and measure the time it takes to process it.

fn main() {
    let json = r#"[1, 2, 3]
["foo", "bar", "baz"]
"#;
    let mut big = vec![];
    for _ in 0..1_000_000 {
        big.extend(json.as_bytes());
    }
    let mut output = vec![];
    let t0 = std::time::Instant::now();
    json_to_csv(big.as_slice(), &mut output).unwrap();
    println!("{}", t0.elapsed().as_secs_f64());
}

The sequential version finishes in 1.75s and the parallel version in 0.79s, a 2.2x speedup.

There are further possible optimizations to this approach. One is to make the batch consist of groups of lines rather than individual lines: the map() closure would then receive a whole group of input lines and transform it into a single Vec<u8> containing multiple output lines, reducing the number of allocations. Another opportunity is that input and output are currently not overlapped with processing: while a batch is being read or written, no transformation is taking place. As suggested here, one could use rayon::join() to perform the reading of the next input batch and the processing/output of the previous batch in parallel.
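
Here is a hypothetical sketch of that second optimization, assuming read_batch() and process_batch() helpers factored out of the code above, plus Send bounds on input and output so the two halves can run on different threads. It is only a sketch of the idea, not drop-in production code:

use rayon::prelude::*;
use serde_json::Value;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

// Pull up to `n` lines from the input into a batch.
fn read_batch(
    lines: &mut impl Iterator<Item = io::Result<String>>,
    n: usize,
) -> io::Result<Vec<String>> {
    let mut batch = Vec::with_capacity(n);
    for _ in 0..n {
        match lines.next() {
            Some(line) => batch.push(line?),
            None => break,
        }
    }
    Ok(batch)
}

// Transform a batch of JSON lines into tab-separated output lines in parallel.
fn process_batch(batch: &[String]) -> Vec<Vec<u8>> {
    batch
        .par_iter()
        .map(|line| {
            let mut outline: Vec<u8> = vec![];
            let rec: Vec<Value> = match serde_json::from_str(line) {
                Ok(rec) => rec,
                Err(_) => return outline,
            };
            for (idx, val) in rec.iter().enumerate() {
                outline.extend_from_slice(val.to_string().as_bytes());
                if idx != rec.len() - 1 {
                    outline.push(b'\t');
                }
            }
            outline.push(b'\n');
            outline
        })
        .collect()
}

fn json_to_csv_overlapped(
    input: impl Read + Send,
    output: impl Write + Send,
) -> io::Result<()> {
    let input = BufReader::new(input);
    let mut output = BufWriter::new(output);
    let mut line_iter = input.lines();

    let mut batch = read_batch(&mut line_iter, 16384)?;
    while !batch.is_empty() {
        // Read the next batch while the current batch is processed and written.
        let (next_batch, write_result) = rayon::join(
            || read_batch(&mut line_iter, 16384),
            || -> io::Result<()> {
                for line in process_batch(&batch) {
                    output.write_all(&line)?;
                }
                Ok(())
            },
        );
        write_result?;
        batch = next_batch?;
    }
    output.flush()
}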

Based on experience and these experiments, Rayon can be put to good use for stream transformation and aggregation. It is especially straightforward when order is not important and one can use par_bridge(); otherwise it requires explicit batching, but even that is much less work than implementing the parallel computation manually. Making the latter use case more convenient is requested in an issue.

Minimalistic blocking bounded queue in C++

While working on speeding up a Python extension written in C++, I needed a queue to distribute work among threads, and possibly to gather their results. What I had in mind was something akin to Python’s queue module – more specifically:

  • a blocking MPMC queue with fixed capacity;
  • that supports push, try_push, pop, and try_pop;
  • that depends only on the C++11 standard library;
  • and has a simple and robust header-only implementation.

Since I could choose the size of the unit of work arbitrarily, the performance of the queue as measured by microbenchmarks wasn’t a priority, as long as it wasn’t abysmal. Likewise, the queue capacity would be fairly small, on the order of the number of cores on the system, so the queue wouldn’t need to be particularly efficient at allocation – while a ring buffer would be optimal for storage, a linked list or deque would work as well. Other non-requirements included:

  • strong exception safety – while certainly an indicator of quality, the work queue did not require it because its payload would consist of smart pointers, either std::unique_ptr<T> or std::shared_ptr<T>, neither of which throws on move/copy.
  • lock-free/wait-free mutation – again, most definitely a plus (so long as the queue can fall back to blocking when necessary), but not a requirement because the workers would spend the majority of time doing actual work.
  • timed versions of blocking operations – useful in general, but I didn’t need them.

Googling C++ bounded blocking MPMC queue gives quite a few results, but surprisingly, most of them don’t really meet the requirements set out at the beginning.

For example, this queue is written by Dmitry Vyukov and endorsed by Martin Thompson, so it’s definitely of the highest quality, and way beyond something I (or most people) would be able to come up with on my own. However, it’s a lock-free queue that by design doesn’t expose any kind of blocking operation. According to the author, waiting for events such as the queue no longer being empty or full is a concern that should be handled outside the queue. This position has a lot of merit, especially in more complex scenarios where you might need to wait on multiple queues at once, but it complicates the use in many simple cases, such as the work distribution I was implementing. In these use cases a thread communicates with only one queue at a time. When the queue is unavailable due to being empty or full, that is a signal of backpressure, and there is nothing else the thread can do but sleep until the situation changes. If done right, this sleep should neither hog the CPU nor introduce unnecessary latency, so it is convenient to have it implemented as part of the queue.

The next implementation that looks really promising is Erik Rigtorp’s MPMCQueue. It has a small implementation, supports both blocking and non-blocking variants of enqueue and dequeue operations, and covers inserting elements by move, copy, and emplace. The author claims that it’s battle-tested in several high-profile EA games, as well as in a low-latency trading platform. However, a closer look at push() and pop() betrays a problem with the blocking operations. For example, pop() contains the following loop:

    while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
      ;

Waiting is implemented with a busy loop: a thread that is unable to pop is not put to sleep, it just keeps spinning until popping becomes possible. In practice that means that if the producers stall for any reason, such as reading new data from a slow network disk, the consumers will angrily pounce on the CPU waiting for new items to appear in the queue. And if the producers are faster than the consumers, it is they who will burn CPU waiting for free space to appear in the queue. In the latter case, the busy-looping producers will actually take CPU cycles away from the workers doing useful work, prolonging the wait. This is not to say that Erik’s queue is not of high quality, just that it doesn’t fit this use case. I suspect that the applications the queue was designed for very rarely invoke the blocking versions of these operations, and when they do, it is in scenarios where they are confident that the blockage is about to clear up.

Then there are a lot of queue implementations on stackoverflow and various personal blogs that seem like they could be used, but don’t stand up to scrutiny. For example, this queue comes pretty high in search results, definitely supports blocking, and appears to achieve the stated requirements. Compiling it does produce some nasty warnings, though:

warning: operation on 'm_popIndex' may be undefined [-Wsequence-point]

Looking at the source, the assignment m_popIndex = ++m_popIndex % m_size is an infamous case of undefined behavior in C and C++. The author probably thought he had found a way to avoid the parentheses in m_popIndex = (m_popIndex + 1) % m_size, but C++ doesn’t work like that. The problem is easily fixed, but it leaves a bad overall impression. Another issue is that it requires a semaphore implementation, using boost’s inter-process semaphore by default. The page does provide a replacement semaphore, but the two in combination become unreasonably heavy-weight: the queue contains no fewer than three mutexes, two condition variables, and two counters. Simple queues implemented with mutexes and condition variables tend to have a single mutex and one or two condition variables, depending on whether the queue is bounded.

It was at this point that it occurred to me that it would actually be more productive to just take a simple and well-tested classic queue and port it to C++ than to review and correct examples found by googling. I started from Python’s queue implementation and ended up with the following:

template<typename T>
class queue {
  std::deque<T> content;
  size_t capacity;

  std::mutex mutex;
  std::condition_variable not_empty;
  std::condition_variable not_full;

  queue(const queue &) = delete;
  queue(queue &&) = delete;
  queue &operator = (const queue &) = delete;
  queue &operator = (queue &&) = delete;

 public:
  queue(size_t capacity): capacity(capacity) {}

  void push(T &&item) {
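    // Blocks while the queue is full, then moves the item in and notifies one waiting consumer.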
    {
      std::unique_lock<std::mutex> lk(mutex);
      not_full.wait(lk, [this]() { return content.size() < capacity; });
      content.push_back(std::move(item));
    }
    not_empty.notify_one();
  }

  bool try_push(T &&item) {
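    // Returns false if the queue is full; otherwise moves the item in and notifies one waiting consumer.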
    {
      std::unique_lock<std::mutex> lk(mutex);
      if (content.size() == capacity)
        return false;
      content.push_back(std::move(item));
    }
    not_empty.notify_one();
    return true;
  }

  void pop(T &item) {
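    // Blocks while the queue is empty, then moves the front element into item and notifies one waiting producer.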
    {
      std::unique_lock<std::mutex> lk(mutex);
      not_empty.wait(lk, [this]() { return !content.empty(); });
      item = std::move(content.front());
      content.pop_front();
    }
    not_full.notify_one();
  }

  bool try_pop(T &item) {
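    // Returns false if the queue is empty; otherwise moves the front element into item and notifies one waiting producer.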
    {
      std::unique_lock<std::mutex> lk(mutex);
      if (content.empty())
        return false;
      item = std::move(content.front());
      content.pop_front();
    }
    not_full.notify_one();
    return true;
  }
};

Notes on the implementation:

  • push() and try_push() only accept rvalue references, so elements get moved into the queue, and if you have them in a variable, you might need to use queue.push(std::move(el)). This property allows the use of std::unique_ptr<T> as the queue element. If you have copyable types that you want to copy into the queue, you can always use queue.push(T(el)).

  • pop() and try_pop() accept a reference to the item rather than returning the item. This provides the strong exception guarantee – if moving the element from the front of the queue to item throws, the queue remains unchanged. Yes, I know I said I didn’t care for exception guarantees in my use case, but this design is shared by most C++ queues, and the exception guarantee follows from it quite naturally, so it’s a good idea to embrace it. I would have preferred pop() to just return an item, but that would require the C++17 std::optional (or equivalent) for the return type of try_pop(), and I couldn’t use C++17 in the project. The downside of accepting a reference is that to pop an item you must first be able to construct an empty one. Of course, that’s not a problem if you use the smart pointers which default-construct to hold nullptr, but it’s worth mentioning. Again, the same design is used by other C++ queues, so it’s apparently an acceptable choice.

  • The use of a mutex and condition variables makes this queue utterly boring to people excited about parallel and lock-free programming. Still, those primitives are implemented very efficiently in modern C++, without the need for system calls in the non-contended case. In benchmarks against fancier queues I was never able to measure a difference on the workload of the application I was using.

Is this queue as battle-tested as the ones by Mr Vyukov and Mr Rigtorp? Certainly not! But it does work fine in production, it has passed code review by in-house experts, and most importantly, the code is easy to follow and understand.

The remaining question is – is this a classic example of NIH? Is there really no other way than to implement your own queue? How do others do it? To shed some light on this, I invite the reader to read Dmitry Vyukov’s classification of queues. In short, he categorizes queues across several dimensions: whether they support multiple producers, multiple consumers, or both, the underlying data structure, maximum size, overflow behavior, support for priorities, ordering guarantees, and many, many others! Differences in these choices vastly affect the implementation, and there is no single implementation that fits all use cases. If you need extremely low latency, definitely look into lock-free queues like the ones from Vyukov or Rigtorp. If your needs are met by the “classic” blocking queues, then a simple implementation like the one shown in this article might be the right choice. I would still prefer to have one good implementation written by experts that tries to cover the middle ground, for example a C++ equivalent of Rust’s crossbeam channels. But that kind of thing doesn’t appear to exist for C++ yet, at least not as a free-standing class.

EDIT
As pointed out on reddit, the last paragraph is not completely correct. There is a queue implementation that satisfies the requirements and has most of the crossbeam channels functionality: the moodycamel queue. I did stumble on that queue when originally researching the topic, but missed it when writing the article later, since it didn’t come up on the first page of results when googling for C++ bounded blocking MPMC queue. That queue was too complex to drop into the project I was working on, since its size would have dwarfed the rest of the code. But if size doesn’t scare you and you’re looking for a full-featured, high-quality queue implementation, do consider it.