Skip to content

Architecture and implementation

Reductionist is written in Rust, a language that is rapidly gaining popularity for a variety of use cases. It provides high level abstractions with low runtime overhead, a modern toolchain, and has a unique approach that provides safe automatic memory management without garbage collection. While the Rust standard library is not as comprehensive as some other "batteries included" languages, the crates.io ecosystem is relatively mature and provides a number of de-facto standard libraries. Reductionist is built on top of a number of popular open source components.

A few properties make it relatively easy to build a conceptual mental model of how Reductionist works.

  • All operations share the same request processing pipeline.
  • The request processing pipeline for each request is a fairly linear sequence of steps.
  • There is no persistent state.
  • The only external service that is interacted with is an S3-compatible object store.

The more challenging aspects of the system are the lower level details of asynchronous programming, memory management, the Rust type system and working with multi-dimensional arrays.

A diagram of the request processing pipeline is shown in Figure 1.

Request processing flow diagram

Figure 1: Request processing pipeline flow diagram

The "Perform numerical operation" step depends on the type of numerical operation being performed. A diagram of this step for the sum operation is shown in Figure 2.

Sum operation flow diagram

Figure 2: Sum operation flow diagram

Axum web server

Axum is an asynchronous web framework that performs well in various benchmarks and is built on top of various popular components, including the hyper HTTP library. It integrates well with Tokio, the most popular asynchronous Rust runtime, and allows us to easily define an API route for each operation. Extractors make it easy to consume data from the request in a type-safe way. The operation request handler is the operation_handler function in src/app.rs.

API request data

The JSON request data is deserialised into the RequestData struct defined in src/models.rs using the serde library. Serde handles conversion errors at the type level, while further validation of request data invariants is performed using the validator crate.

S3 object download

Object data is downloaded from the object store using the AWS SDK. The S3Client struct in src/s3_client.rs provides a simplified wrapper around the AWS SDK. Typically we will be operating on a "storage chunk", a hyperslab within the larger dataset that the object contains. In this case a byte range is specified in the S3 GetObject request to avoid downloading the whole object. The AWS SDK is asynchronous and does provide a streaming response, however we read the whole storage chunk into memory to simplify later stages of the pipeline. Storage chunks are expected to be small enough (O(MiB)) that this should not be a problem.

Construction of aws_sdk_s3::Client structs is a relatively slow task. A key performance improvement involves the use of a shared client object for each combination of object store URL and credentials. This is implemented using the S3ClientMap in src/s3_client.rs and benchmarked in benches/s3_client.rs.

Downloaded storage chunk data is returned to the request handler as a Bytes object, which is a wrapper around a u8 (byte) array.

Filters and compression

When a variable in a netCDF, HDF5 or Zarr dataset is created, it may be compressed to reduce storage requirements. Additionally, prior to compression one or more filters may be applied to the data with the aim of increasing the compression ratio. When consuming such data, Reductionist needs to reverse any compression and filters applied. The filter pipeline is implemented in src/filter_pipeline.rs.

First, if a compression algorithm is specified in the request data, the storage chunk is decompressed using the same algorithm. Currently the Gzip and Zlib algorithms are supported using the flate2 and zune-inflate libraries respectively. This mix of libraries was chosen based on performance benchmarks in benches/compression.rs. Compression is implemented in src/compression.rs.

Next, if any filters are specified in the request data, they are decoded in reverse order. Currently the byte shuffle filter is supported. This filter reorders the data to place the Nth bytes of each data value together, with the aim of grouping leading zeroes. The shuffle filter is implemented in src/filters/shuffle.rs, and has several optimisations including loop unrolling that were benchmarked using benches/shuffle.rs.

The Operation trait

Here the implementation becomes specific to the requested operation (min, max, etc.). This is achieved using the Operation trait defined in src/operation.rs.

/// Trait for active storage operations.
///
/// This forms the contract between the API layer and operations.
pub trait Operation {
    /// Execute the operation.
    ///
    /// Returns a [models::Response](crate::models::Response) object with response data.
    ///
    /// # Arguments
    ///
    /// * `request_data`: RequestData object for the request
    /// * `data`: [`Vec<u8>`] containing data to operate on.
    fn execute(
        request_data: &models::RequestData,
        data: Vec<u8>,
    ) -> Result<models::Response, ActiveStorageError>;
}

This interface accepts the request data and a byte array containing the storage chunk data in its original byte order. On success, it returns a Response struct which contains a byte array of the response data as well as the data type, shape and a count of non-missing elements in the array.

A second NumOperation trait with an execute_t method handles the dynamic dispatch between the runtime data type in the request data and the generic implementation for that type.

Operations

Each operation is implemented by a struct that implements the NumOperation trait. For example, the sum operation is implemented by the Sum struct in src/operations.rs. The Sum struct's execute_t method does the following:

  • Zero copy conversion of the byte array to a multi-dimensional ndarray::ArrayView object of the data type, shape and byte order specified in the request data
  • If a selection was specified in the request data, create a sliced ndarray::ArrayView onto the original array view
  • If missing data was specified in the request data:
  • Create an iterator over the array view that filters out missing data, performs the sum operation and counts non-missing elements
  • Otherwise:
  • Use the array view's native sum and len methods to take the sum and element count
  • Convert the sum to a byte array and return with the element count

The procedure for other operations varies slightly but generally follows the same pattern.

Error handling

The ActiveStorageError enum in src/error.rs describes the various errors that may be returned by the Reductionist API, as well as how to format them for the JSON error response body. Low-level errors are converted to higher-level errors and ultimately wrapped by ActiveStorageError. This is a common pattern in Rust and allows us to describe all of the errors that a function or application may return.

Configuration

Reductionist configuration is implemented in src/cli.rs using the clap library, and accepts command line arguments and environment variables.

Resource management

Reductionist supports optional restriction of resource usage. This is implemented in src/resource_manager.rs using Tokio Semaphores. This allows Reductionist to limit the quantity of various resources used at any time:

  • S3 connections
  • memory used for numeric data (this is more of a rough guide than a perfect limit)
  • threads used for CPU-bound work

CPU-bound work

There is particular friction between the asynchronous and synchronous types of work in the system. Axum and Tokio very efficiently handle the asynchronous aspects such as the HTTP server and S3 object download. The other work such as decompression, filtering and numerical operations are more CPU-bound, and can easily block the Tokio runtime from efficiently handling asynchronous tasks. Two alternative methods were developed to alleviate this issue.

  1. The resource manager can limit the number of threads used for CPU-bound work, by default leaving one CPU core free for handling asynchronous tasks.
  2. Integration with Rayon, a library that provides a thread pool.

Limited benchmarking was done to compare the two approaches, however the first appeared to have lower overhead. The second approach may leave the server more responsive if more CPU-heavy operations are used in future.

Monitoring

Prometheus metrics are implemented in src/metrics.rs and are exposed by the Reductionist API under the /metrics path. These include:

  • incoming requests (counter)
  • outgoing response (counter)
  • response time (histogram)

Tracing and profiling

Reductionist integrates with Jaeger, a distributed tracing platform. Various sections of the request processing pipeline are instrumented with spans, making it easy to visualise the relative durations in the Jaeger UI. Testing with a sum over some CMIP6 temperature data, this showed that in terms of wall clock time, the S3 storage chunk download takes the majority of the time, followed by decompression, byte shuffle, and finally the actual numerical operation.

Flame graphs created using flamegraph-rs were useful to visualise which parts of the code consume the most CPU cycles. This was useful to determine where to focus performance improvements, and showed that decompression is the most CPU-heavy task.