Parallel stream processing with Rayon

Most Rust programmers have heard of Rayon, a crate that makes it almost magically easy to introduce parallelism to a program. In this article we’ll examine how to apply Rayon to basic stream processing.

Running the examples

To run the examples in this blog post, create a directory, run cargo init --bin in it and edit the generated Cargo.toml to include the following dependencies:

[dependencies]
rayon = "1.3.1"
serde_json = "1.0.57"

The code from the examples just goes to src/main.rs. You can run it with cargo run --release or build it with cargo build --release and run it as target/release/<directory-name>. Do use the release mode when checking the performance of anything in Rust, as the numbers you’ll get in debug mode are completely meaningless.

Basics of Rayon

Rayon is a library for parallel processing without the hassle. You don’t need to manage channels, events, futures, or even know about a thread pool; you just tell Rayon what you want performed in parallel, and Rayon does it. To borrow the example from the GitHub page, here is a sequential program that sums up squares of the elements of a slice:

fn sum_of_squares(input: &[u64]) -> u64 {
    input.iter()
         .map(|&i| i * i)
         .sum()
}

To make it parallel, all you need to do is change iter to par_iter:

use rayon::prelude::*;

fn sum_of_squares(input: &[u64]) -> u64 {
    input.par_iter()
         .map(|&i| i * i)
         .sum()
}

Rayon delivers on its promise and produces a parallel version that runs ~1.5x faster, as tested on my machine using a vector of a billion integers. This is fairly impressive given that the individual map and reduce operations in this example boil down to just a CPU instruction each. Rayon’s implementation uses a technique called work stealing to ensure efficient distribution of tasks among threads. The idea behind work stealing is that each thread maintains a local queue of tasks submitted by code running in that thread. By default the thread runs local tasks, and only when it runs out of those does it go ahead and “steal” tasks from queues of other threads. If all threads are equally busy, each just services its own tasks, maximizing data locality and reducing the number of context switches.

The Rayon API provides two distinct abstractions: fork-join and parallel iterators. Fork-join is the lower-level primitive that is deceptively simple: it consists of a single function, join(), which accepts two closures and executes them, potentially in parallel. rayon::join(A, B) pushes B to the thread-local queue and starts executing A – in the same thread. By the time A returns, it’s possible that B had already been stolen by another thread, in which case the current thread waits for it to finish, stealing tasks from other threads while waiting. If B had not been stolen, join() just executes it as well, being no worse off than if the code had been sequential.
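To make the fork-join shape concrete, here is a minimal sketch that uses only the standard library. The naive_join helper is hypothetical and is not how Rayon works internally: it spawns a scoped thread for the second closure instead of queueing it for work stealing, so it only imitates the calling convention of rayon::join(). The depth parameter is likewise an assumption of this sketch, added to keep the number of spawned threads bounded.

```rust
use std::thread;

/// Hypothetical stand-in for `rayon::join`: runs `a` on the current thread
/// while `b` runs on a newly spawned scoped thread. Rayon would instead push
/// `b` to a thread-local queue and never spawn a thread per call.
fn naive_join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA,
    B: FnOnce() -> RB + Send,
    RB: Send,
{
    thread::scope(|s| {
        let handle = s.spawn(b);
        let ra = a();
        let rb = handle.join().expect("closure `b` panicked");
        (ra, rb)
    })
}

/// Divide-and-conquer sum of squares. `depth` limits how many times the slice
/// is halved, so the work ends up in at most 2^depth sequential pieces.
fn sum_of_squares_fj(input: &[u64], depth: u32) -> u64 {
    if depth == 0 || input.len() < 2 {
        // Small enough (or deep enough): fall back to the sequential version.
        return input.iter().map(|&i| i * i).sum();
    }
    let (left, right) = input.split_at(input.len() / 2);
    let (a, b) = naive_join(
        || sum_of_squares_fj(left, depth - 1),
        || sum_of_squares_fj(right, depth - 1),
    );
    a + b
}
```

Calling sum_of_squares_fj(&data, 3) splits the slice into at most eight pieces. With the real rayon::join() no depth limit of this kind is needed for correctness, because tasks that nobody steals simply run on the thread that created them.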

Fork-join elegantly maps to divide-and-conquer algorithms, but Rayon is best known and loved for its other abstraction built on top of fork-join, the parallel iterators. They enrich the standard library collections with a par_iter() method that returns a ParallelIterator, an object that closely resembles Iterator but which, when exhausted, executes closures passed to methods like map() and fold() in parallel. A simple use rayon::prelude::* enables this functionality, adding the par_iter() method to containers like Vec<T> or &[T].

Invoked on a slice, as in the sum_of_squares example, par_iter() divides it up into smaller chunks which it processes in parallel using rayon::join() behind the scenes. Methods like sum() and the more general fold() and reduce() aggregate the output, also in parallel, eventually returning a single value. A consequence of the semantics of ParallelIterator is that you cannot just use it in a regular for loop; you have to exhaust it using the methods it provides, such as the already mentioned sum(), fold(), and reduce(), but also for_each() and others. The closures passed to the methods that accept them must be callable from multiple threads, which the compiler will diligently check, as we’ll see below.

An excellent introduction to Rayon is provided by its author in the classic blog post which you should definitely check out.

Stream processing

An aspect of parallelism often overlooked when presenting Rayon is processing of streaming data. When processing a stream we don’t have a handy container to divide up; all we have are items arriving one at a time, with no idea how many of them there will be in total. This makes some parallel processing techniques inaccessible, so we have to do things slightly differently.

As an example, let’s process a stream of JSON-encoded values in the one-record-per-line jsonl format. We will calculate the average of the value field in each record, with the sequential code looking like this:

use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read) -> f64 {
    let input = BufReader::new(input);
    let mut cnt = 0usize;
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt += 1;
            value.as_f64()
        })
        .sum();
    total / cnt as f64
}

Clear enough: iterate over input lines, drop the lines we can’t deserialize into a JSON object, remove objects without a value field or those whose value is not a number, sum them up, and divide by the count of valid records. The function is generic over the type of its input, so we can call it with any input that implements io::Read, such as a File, the standard input handle returned by io::stdin().lock(), or even a &[u8]:

fn main() {
    let json = r#"{"value": 1}
{"value": 15.3, "foo": "bar"}
{}
{"value": 100}"#
        .as_bytes();
    assert_eq!(avg_values_jsonl(json), 116.3 / 3.);
}

To make avg_values_jsonl parallel we can’t just add par_iter() into the mix like we did for sum_of_squares(). par_iter() is only defined for concrete container types because it relies on knowing the number of items in a container and being able to divide them into subparts to operate on, which doesn’t apply to generic iterators.

However, Rayon provides a different mechanism specifically for the streaming use case: a par_bridge() method defined on iterators through the ParallelBridge trait. The method adapts an ordinary iterator into a parallel one, employing clever design to take individual items from the iterator and balance them between Rayon’s threads. Most importantly, it implements ParallelIterator, which means you can use the adapter exactly as you would use the value returned by par_iter() on a collection.
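As a rough mental model of what such a bridge has to do, the following standard-library sketch shares one sequential iterator among several worker threads, each of which pulls the next item under a lock. This is a conceptual stand-in under my own assumptions, not Rayon’s implementation; the function name bridged_sum and the explicit workers parameter are made up for illustration.

```rust
use std::sync::Mutex;
use std::thread;

/// Sums `f(item)` over all items of `iter`, pulling items from the shared
/// iterator on `workers` threads. Hypothetical helper for illustration only.
fn bridged_sum<I, F>(iter: I, workers: usize, f: F) -> u64
where
    I: Iterator + Send,
    F: Fn(I::Item) -> u64 + Sync,
{
    // `fuse()` makes it safe to keep calling `next()` after the first `None`,
    // which happens when several workers hit the end of the input.
    let shared = Mutex::new(iter.fuse());
    thread::scope(|s| {
        let handles: Vec<_> = (0..workers)
            .map(|_| {
                s.spawn(|| {
                    let mut local = 0u64;
                    loop {
                        // The lock guard is a temporary dropped at the end of
                        // this statement, so `f` runs without holding the lock.
                        let item = shared.lock().unwrap().next();
                        match item {
                            Some(item) => local += f(item),
                            None => break,
                        }
                    }
                    local
                })
            })
            .collect();
        // Combine the per-thread partial sums.
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
    })
}
```

Two things carry over to the real adapter. The iterator must be Send because other threads end up calling next() on it, and the order in which items are processed depends on which worker happens to grab the lock first.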

Let’s try to insert par_bridge() into the iteration chain:

use rayon::prelude::*;

use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read) -> f64 {
    let input = BufReader::new(input);
    let mut cnt = 0usize;
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .par_bridge()  // this is new
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt += 1;
            value.as_f64()
        })
        .sum();
    total / cnt as f64
}

Unsurprisingly, our first attempt fails to compile:

   Compiling playground v0.1.0 (/home/hniksic/work/playground)
error[E0599]: no method named `par_bridge` found for struct `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>` in the current scope
   --> src/main.rs:12:10
    |
12  |           .par_bridge()
    |            ^^^^^^^^^^ method not found in `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>`
    |
    = note: the method `par_bridge` exists but the following trait bounds were not satisfied:
            `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::iter::Iterator`
            which is required by `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&mut std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `&mut std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`

Charming, isn’t it? (But still not as bad as Tokio.)

The error message is inscrutable because the iterator on which we’re attempting to invoke par_bridge() has a complex generic type, coming from map() wrapping lines() wrapping BufReader wrapping the generic type of input. The actual problem is explained in the “note” which says that “the method par_bridge exists but the following trait bounds were not satisfied: <gibberish>: std::marker::Send”. The iterator returned by input.lines() doesn’t implement Send because it contains the value moved from input, whose type is only known to implement the Read trait. Without Send, Rayon doesn’t have permission to send the iterator to another thread, which it might need to do. If the function were allowed to compile as written, calling it with an input which is not Send, perhaps because it contains Rc<_> or another non-Send type, could cause a data race and crash the program. Luckily for us, rustc prevents this and rejects the code due to the missing bound, even if the error message could be a bit smoother.

Once we understand the problem, the fix is simple: add the Send trait bound in addition to Read, declaring input as input: impl Read + Send. With that change we get a different compile error:

error[E0594]: cannot assign to `cnt`, as it is a captured variable in a `Fn` closure
  --> src/main.rs:17:13
   |
17 |             cnt += 1;
   |             ^^^^^^^^ cannot assign

The problem here is that the closure mutates shared state, the cnt counter. That requires the closure to capture cnt by unique (mutable) reference, which makes it an FnMut closure. This was perfectly fine in single-threaded code, but Rayon plans to call the closure from multiple threads, so it requests an Fn closure. The compiler rejects the assignment in an Fn closure and saves us from the embarrassment of a data race. Nice!

Neither of these issues is specific to Rayon; we would encounter the exact same errors if we tried to pass the closure to multiple threads using other means. We can fix the assignment by switching cnt to an AtomicUsize, which can be safely modified through a shared reference:

use rayon::prelude::*;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read + Send) -> f64 {
    let input = BufReader::new(input);

    let cnt = AtomicUsize::new(0);
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .par_bridge()
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt.fetch_add(1, Ordering::Relaxed);
            value.as_f64()
        })
        .sum();
    total / cnt.into_inner() as f64
}

And this compiles!
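To see the atomic counter in isolation, here is a small sketch that needs nothing but the standard library. Several scoped threads increment the same AtomicUsize through a shared reference, which is exactly what an Fn closure is allowed to do. The function count_in_parallel and its parameters are hypothetical names chosen for this illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Increments one shared counter from `workers` threads and returns the total.
fn count_in_parallel(workers: usize, increments_per_worker: usize) -> usize {
    let cnt = AtomicUsize::new(0);
    thread::scope(|s| {
        for _ in 0..workers {
            // The closure only captures `&cnt`; no `&mut` access is needed
            // because `fetch_add` takes `&self`.
            s.spawn(|| {
                for _ in 0..increments_per_worker {
                    cnt.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });
    // All threads are joined when the scope ends, so the final value is
    // visible here and the counter can be consumed.
    cnt.into_inner()
}
```

Relaxed ordering is enough in this sketch because the counter is only read after every thread has been joined. A call such as count_in_parallel(4, 1000) returns 4000 regardless of how the threads interleave.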

After verifying that the parallel code produces correct results, we can proceed to compare its performance with that of the sequential version. One way is to modify the main() function to produce much more data and time the result:

fn main() {
    let json: String = (0..1_000_000).map(|i| format!(r#"{{"foo": "bar", "value": {}}}
"#, i)).collect();
    let t0 = std::time::Instant::now();
    assert_eq!(avg_values_jsonl(json.as_bytes()), 499999.5);
    println!("time: {}", t0.elapsed().as_secs_f64());
}

The above rather unscientific benchmark run on an old four-core desktop completes the sequential version in 0.51s and the Rayon version in 0.25s, showing a 2x speedup from parallel execution. Let’s see what happens when we add more keys to each JSON line to emulate a heavier workload per record:

fn main() {
    let some_keys = r#""foo1": "bar", "foo2": "bar", "foo3": "bar", "foo4": "bar", "foo5": "bar", "foo6": "bar", "foo7": "bar", "foo8": "bar", "foo9": "bar", "foo10": "bar""#;
    let json: String = (0..1_000_000).map(|i| format!(r#"{{{}, "value": {}}}
"#, some_keys, i)).collect();
    let t0 = std::time::Instant::now();
    assert_eq!(avg_values_jsonl(json.as_bytes()), 499999.5);
    println!("time: {}", t0.elapsed().as_secs_f64());
}

In this variant the sequential version takes 2.6s and the Rayon version 0.81s, a 3.2x speedup. As expected, making individual tasks more substantial increases the benefit of multiple cores.

Preserving order

The previous example summarizes the entire stream into a single number. But stream processing often needs to map input records into output records, possibly filtering them, but without upsetting their order. The classic Unix streaming commands like grep, cut or sed work exactly like that.

At first parallel processing and maintaining record order appear at odds with each other because parallel execution inherently removes ordering. But if only the transformations are run in parallel, it should be possible to associate an index with the input records as they are read, and use those indexes to reestablish the original order on output. Though perhaps not as straightforward as processing slices, processing streaming data in parallel seems perfectly feasible, even though none of the classic Unix streaming tools do it. (To be fair, that is because they predate today’s threading by decades, and also because they rely on parallelization on the pipeline level – piping grep to sed does run both commands in parallel.)
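The index-based idea can be sketched as a small reorder buffer: results arrive tagged with the index of their input record, in any order, and are released only once all earlier indexes have been seen. The Reorderer type below is a hypothetical illustration of that bookkeeping and is not part of Rayon or the standard library.

```rust
use std::collections::BTreeMap;

/// Buffers out-of-order results and releases them in index order.
struct Reorderer<T> {
    /// Index of the next item that may be emitted.
    next: usize,
    /// Items that arrived early and are waiting for their predecessors.
    pending: BTreeMap<usize, T>,
}

impl<T> Reorderer<T> {
    fn new() -> Self {
        Reorderer { next: 0, pending: BTreeMap::new() }
    }

    /// Accepts the result for input record `index` and returns every item
    /// that has become ready to be written, in the original order.
    fn push(&mut self, index: usize, item: T) -> Vec<T> {
        self.pending.insert(index, item);
        let mut ready = Vec::new();
        while let Some(item) = self.pending.remove(&self.next) {
            ready.push(item);
            self.next += 1;
        }
        ready
    }
}
```

For example, pushing index 2 first yields nothing, pushing index 0 yields that one item, and pushing index 1 then releases items 1 and 2 together. The cost of this design is that one slow record can hold back an unbounded number of later results in memory.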

To experiment with order-preserving streaming, let’s transform JSONL containing arrays into a simple tab-delimited CSV:

use serde_json::Value;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

fn json_to_csv(input: impl Read, output: impl Write) -> io::Result<()> {
    let input = BufReader::new(input);
    let mut output = BufWriter::new(output);
    for line in input.lines() {
        let line = line?;
        let rec: Vec<Value> = match serde_json::from_str(&line) {
            Ok(rec) => rec,
            Err(_) => continue, // should log the error...
        };
        for (idx, val) in rec.iter().enumerate() {
            output.write_all(val.to_string().as_bytes())?;
            if idx != rec.len() - 1 {
                output.write_all(b"\t")?;
            }
        }
        output.write_all(b"\n")?;
    }
    Ok(())
}

fn main() {
    let json = r#"[1, 2, 3]
["foo", "bar", "baz"]
"#;
    let mut output = vec![];
    json_to_csv(json.as_bytes(), &mut output).unwrap();
    let output = std::str::from_utf8(&output).unwrap();
    assert_eq!(output, "1\t2\t3\n\"foo\"\t\"bar\"\t\"baz\"\n");
}

Like the previous example, this one also reads the input line by line, deserializing each line into a JSON value. The value is now required to be an array of items, which are written to the output stream as a very naive form of CSV that just uses a tab as a separator.

There are two obstacles to running this in parallel using Rayon. The first, minor one, is that the code is structured as a for loop which makes copious use of the ? operator to bail out in case of IO error. Parallel execution doesn’t like for loops, so we’ll need to find a way to rewrite it using for_each, while still handling errors. The bigger obstacle is that you cannot just write the output from inside closures invoked by Rayon because they’ll be called in parallel, so their outputs will intermingle. In addition to that, the par_bridge() adapter, successfully used in the previous example, is explicitly documented not to guarantee preserving the order of the original iterator.

Fortunately, there is a way around both obstacles. Somewhat surprisingly, Rayon preserves original order when items are collected using the collect() method on the parallel iterator. This means that we don’t have to manually associate input records with order and reorder them on output; Rayon will do it for us, as long as we collect the output into a container. While we can’t slurp the whole input into a vector, we can work in batches: collect a fixed number of input lines into a Vec<String> batch. Then we can call par_iter() on the batch to process the lines in parallel, using map() to transform each input line into the corresponding output line. Finally we can collect the batch of output lines into a new Vec<Vec<u8>> whose order matches the order of input, and write them out.

Let’s examine the parallel version piece by piece:

fn json_to_csv(input: impl Read, output: impl Write) -> io::Result<()> {
    let input = BufReader::new(input);
    let mut output = BufWriter::new(output);
    let mut line_iter = input.lines();

In this version we don’t need to add Send bounds to input (or output) because we’ll do the reading and writing from the thread we’re called from. We do need to extract the input iterator into a separate variable because we’ll pull the lines manually:

    loop {
        let mut batch = vec![];
        for _ in 0..16384 {
            if let Some(line) = line_iter.next() {
                batch.push(line?);
            } else {
                break;
            }
        }
        if batch.is_empty() {
            break;
        }

We obtain a batch of input lines which we’ll process in parallel. Increasing the batch size generally improves performance, but past the size of several thousand lines, the returns begin to diminish and the memory use and latency start to become noticeable. Since the batch is collected in a single thread, we can also keep using the ? operator to bail out of the function in case of IO error.

        let output_lines: Vec<_> = batch
            .par_iter()
            .map(|line| {
                let mut outline: Vec<u8> = vec![];
                let rec: Vec<Value> = match serde_json::from_str(&line) {
                    Ok(rec) => rec,
                    Err(_) => return outline,
                };
                for (idx, val) in rec.iter().enumerate() {
                    outline.write_all(val.to_string().as_bytes()).unwrap();
                    if idx != rec.len() - 1 {
                        outline.write_all(b"\t").unwrap();
                    }
                }
                outline.write_all(b"\n").unwrap();
                outline
            })
            .collect();

Here we process the batch in parallel, transforming each input line into an output line. In case of error the output line will be left empty and not affect the output. We could have also used filter_map and returned Option<Vec<u8>> from the closure, but errors are expected to be rare, so we optimize for the common case and return the Vec directly. The closure passed to map() is called from multiple threads in parallel and therefore out of order, but the final collect() magically reassembles the results back into the original order. The unwrap() used on all IO calls in the closure is not the cop-out it looks like because we can prove those unwraps will never panic. The target of all the writes is a Vec<u8>, and writing to a Vec<u8> never results in an IO error. The only thing that can go wrong is failing to grow the vector, and that will just abort the process.

What remains is to print the output lines:

        for line in output_lines {
            output.write_all(&line)?;
        }
    }
    Ok(())
}
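The hand-written batching loop at the top of the function can also be expressed with iterator adapters. The helper below is a sketch of that alternative; the name next_batch is my own, and it assumes the same item type that BufRead::lines() produces.

```rust
use std::io;

/// Pulls up to `size` lines from `lines`, stopping early at the end of input.
/// Returns the first IO error encountered, like the `?` in the manual loop.
fn next_batch<I>(lines: &mut I, size: usize) -> io::Result<Vec<String>>
where
    I: Iterator<Item = io::Result<String>>,
{
    // `by_ref()` borrows the iterator so that it can be reused for the next
    // batch; collecting into `io::Result<Vec<_>>` short-circuits on `Err`.
    lines.by_ref().take(size).collect()
}
```

With this helper the outer loop reduces to fetching a batch with next_batch(&mut line_iter, 16384)?, breaking when the batch is empty, and processing it otherwise.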

To measure performance we can use the same approach as when benchmarking avg_values_jsonl: create a large input and measure the time it takes to process it.

fn main() {
    let json = r#"[1, 2, 3]
["foo", "bar", "baz"]
"#;
    let mut big = vec![];
    for _ in 0..1_000_000 {
        big.extend(json.as_bytes());
    }
    let mut output = vec![];
    let t0 = std::time::Instant::now();
    json_to_csv(big.as_slice(), &mut output).unwrap();
    println!("{}", t0.elapsed().as_secs_f64());
}

The sequential version finishes in 1.75s and the parallel version in 0.79s, a 2.2x speedup.

There are possible further optimizations to this approach. One is that the batch could be smaller, but consist of groups of lines. The map() closure would receive a group of lines instead of a single line, so the whole group of input lines would transform to a single Vec<u8> with multiple output lines, reducing the number of allocated strings. Another opportunity for optimization is that both input and output are currently performed without processing running during either. As suggested here, one could use rayon::join() to perform the reading of next input batch and the processing/output of the previous batch in parallel.
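The second idea, overlapping input with processing, can be sketched without Rayon at all. The version below swaps rayon::join() for a dedicated reader thread and a bounded channel from the standard library, so it shows the overlap but is not the approach referred to above. The function name pipelined and the process parameter are hypothetical; process stands in for the parallel map-and-collect step. Unlike the function in the article, this sketch does need a Send bound on the input, because reading moves to another thread.

```rust
use std::io::{self, BufRead, Write};
use std::sync::mpsc::sync_channel;
use std::thread;

/// Reads batches on a helper thread while the calling thread processes and
/// writes the previous batch.
fn pipelined<R, W, F>(input: R, mut output: W, batch_size: usize, process: F) -> io::Result<()>
where
    R: BufRead + Send,
    W: Write,
    F: Fn(Vec<String>) -> Vec<Vec<u8>>,
{
    // Capacity 1: the reader may prepare one batch ahead of the consumer.
    let (tx, rx) = sync_channel::<io::Result<Vec<String>>>(1);
    thread::scope(|s| -> io::Result<()> {
        s.spawn(move || {
            let mut lines = input.lines();
            loop {
                let batch: io::Result<Vec<String>> =
                    lines.by_ref().take(batch_size).collect();
                // Stop after an error or an empty batch (end of input).
                let done = !matches!(&batch, Ok(b) if !b.is_empty());
                // A failed send means the consumer has gone away.
                if tx.send(batch).is_err() || done {
                    break;
                }
            }
        });
        for batch in rx {
            for line in process(batch?) {
                output.write_all(&line)?;
            }
        }
        Ok(())
    })
}
```

If the consumer bails out on an error, the receiver is dropped, the reader’s next send fails, and the reader thread exits before the scope returns.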

Based on experience and these experiments, Rayon can be put to good use for stream transformation and aggregation. It is especially straightforward when the order is not important and one can use par_bridge(); otherwise it requires explicit buffering, which is still much less work than implementing parallel computation manually. Making the latter use case more convenient is requested in an issue.
