Integration of Reductionist and PyActiveStorage

Reductionist has been integrated with the PyActiveStorage library, which acts as a client of the Reductionist server. PyActiveStorage currently works with data in netCDF4 format and can perform reductions on a variable within such a dataset. Numerical operations are performed on individual storage chunks, and the per-chunk results are then aggregated. The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implemented in the reduce_chunk Python function in activestorage/storage.py, and this interface was used as the basis for the integration of Reductionist. The following code snippet shows the reduce_chunk function signature.

def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, method=None):
    """ We do our own read of chunks and decoding etc 

    rfile - the actual file with the data 
    offset, size - where and what we want ...
    compression - optional `numcodecs.abc.Codec` compression codec
    filters - optional list of `numcodecs.abc.Codec` filter codecs
    missing - optional 4-tuple describing missing data
    dtype - likely float32 in most cases. 
    shape - will be a tuple, something like (3,3,1), this is the dimensionality of the 
            chunk itself
    order - typically 'C' for c-type ordering
    chunk_selection - python slice tuples for each dimension, e.g.
                        (slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))
                        this defines the part of the chunk which is to be obtained
                        or operated upon.
    method - computation desired 
            (in this Python version it's an actual method, in 
            storage implementations we'll change to controlled vocabulary)

    """
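
To make the roles of offset, size, shape and chunk_selection concrete, the following is a minimal sketch of a chunk reduction for an uncompressed, unfiltered chunk with no missing data. It is not the PyActiveStorage implementation: the name reduce_chunk_sketch, the reduced argument list and the choice to return the element count alongside the result are assumptions made for illustration.

```python
import os
import tempfile

import numpy as np


def reduce_chunk_sketch(rfile, offset, size, dtype, shape, order,
                        chunk_selection, method=None):
    """Read one uncompressed chunk from a file and optionally reduce it.

    Hypothetical helper: compression, filters and missing data are ignored.
    """
    with open(rfile, "rb") as f:
        f.seek(offset)      # jump to the first byte of the chunk
        raw = f.read(size)  # read exactly the bytes of the chunk
    # Interpret the bytes as a typed array with the shape of the chunk.
    chunk = np.frombuffer(raw, dtype=dtype).reshape(shape, order=order)
    # Keep only the part of the chunk that was asked for.
    selected = chunk[chunk_selection]
    if method is None:
        return selected, None
    # Return the count too, so partial results can be combined later.
    return method(selected), selected.size


# Write a 3x3x1 float32 chunk to a temporary file, then sum part of it.
data = np.arange(9, dtype="float32").reshape(3, 3, 1)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(data.tobytes(order="C"))
path = tmp.name

selection = (slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))
result, count = reduce_chunk_sketch(path, 0, data.nbytes, "float32",
                                    (3, 3, 1), "C", selection, method=np.sum)
os.remove(path)
```

Here the selection keeps rows 0 to 1 and columns 1 to 2 of the chunk, so only four of the nine stored values contribute to the sum.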

For Reductionist, the reduce_chunk function signature in activestorage/reductionist.py is similar, but replaces the local file path with a requests.Session object, the Reductionist server URL, the S3-compatible object store URL, and the bucket and object containing the data. The method argument is also replaced by operation, which is the name of the operation to perform rather than a Python callable.

def reduce_chunk(session, server, source, bucket, object,
                 offset, size, compression, filters, missing, dtype, shape,
                 order, chunk_selection, operation):
    """Perform a reduction on a chunk using Reductionist.

    :param session: requests.Session object
    :param server: Reductionist server URL
    :param source: S3 URL
    :param bucket: S3 bucket
    :param object: S3 object
    :param offset: offset of data in object
    :param size: size of data in object
    :param compression: optional `numcodecs.abc.Codec` compression codec
    :param filters: optional list of `numcodecs.abc.Codec` filter codecs
    :param missing: optional 4-tuple describing missing data
    :param dtype: numpy data type
    :param shape: will be a tuple, something like (3,3,1), this is the
                  dimensionality of the chunk itself
    :param order: typically 'C' for c-type ordering
    :param chunk_selection: N-tuple where N is the length of `shape`, and each
                            item is an integer or slice.  e.g.  (slice(0, 2,
                            1), slice(1, 3, 1), slice(0, 1, 1))
                            this defines the part of the chunk which is to be
                            obtained or operated upon.
    :param operation: name of operation to perform
    :returns: the reduced data as a numpy array or scalar
    :raises ReductionistError: if the request to Reductionist fails
    """

Within the reduce_chunk implementation for Reductionist, the following steps are taken:

  • build Reductionist API request data
  • build Reductionist API URL
  • perform an HTTP(S) POST request to Reductionist
  • on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers
  • on failure, raise a ReductionistError with the response status code and JSON-encoded error response
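
The steps above can be sketched roughly as follows. This is an illustration rather than the code in activestorage/reductionist.py: the URL layout, the JSON field names, the response header names and the helper function names are all assumptions, and compression, filters and missing data are left out for brevity. The session argument only needs a post method that behaves like requests.Session.post.

```python
import json

import numpy as np


class ReductionistError(Exception):
    """Stand-in for the error type named in the docstring above."""

    def __init__(self, status_code, error):
        super().__init__(f"Reductionist error: HTTP {status_code}: {error}")
        self.status_code = status_code
        self.error = error


def build_request_data(source, bucket, object, offset, size, dtype, shape,
                       order, chunk_selection):
    """Build a JSON-serialisable request body (field names are assumed)."""
    data = {
        "source": source,
        "bucket": bucket,
        "object": object,
        "dtype": np.dtype(dtype).name,
        "offset": int(offset),
        "size": int(size),
        "order": order,
    }
    if shape:
        data["shape"] = list(shape)
    if chunk_selection:
        # Slices are not JSON-serialisable, so encode each one as
        # [start, stop, step]. Integer indices are not handled here.
        data["selection"] = [[s.start, s.stop, s.step]
                             for s in chunk_selection]
    return data


def build_url(server, operation):
    """Build the request URL (the /v1/<operation>/ layout is assumed)."""
    return f"{server.rstrip('/')}/v1/{operation}/"


def decode_result(response):
    """Decode the payload using (assumed) response header names."""
    dtype = response.headers["x-activestorage-dtype"]
    shape = tuple(json.loads(response.headers["x-activestorage-shape"]))
    count = json.loads(response.headers["x-activestorage-count"])
    result = np.frombuffer(response.content, dtype=dtype).reshape(shape)
    return result, count


def reduce_chunk_sketch(session, server, source, bucket, object, offset,
                        size, dtype, shape, order, chunk_selection,
                        operation):
    """POST one chunk reduction request and decode the response."""
    data = build_request_data(source, bucket, object, offset, size, dtype,
                              shape, order, chunk_selection)
    url = build_url(server, operation)
    response = session.post(url, json=data)
    if response.status_code == 200:
        return decode_result(response)
    raise ReductionistError(response.status_code, response.json())
```

Keeping request building and response decoding in separate functions means both can be exercised without a running server. In real use, the caller would create one requests.Session and pass it to every call.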

The use of a requests.Session object allows for TCP connection pooling, reducing connection overhead when multiple requests are made within a short timeframe.

It should be possible to provide a unified interface to storage systems by abstracting away the details of the storage system and data source, but this has not yet been done.

Other changes to the main activestorage.Active class and its supporting code were necessary to integrate Reductionist. These include:

  • Support for reading netCDF metadata from files stored in S3 using the s3fs and h5netcdf libraries
  • Configuration options in activestorage/config.py to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket
  • Constructor storage_type argument for activestorage.Active to specify the storage backend
  • Use of a thread pool to execute storage chunk reductions in parallel
  • Unit tests to cover new and modified code
  • Integration test changes to allow running against a POSIX or S3 storage backend
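
As an illustration of running storage chunk reductions in a thread pool and then aggregating the results, the sketch below computes a mean from per-chunk partial sums and counts. The functions reduce_chunk_stub and parallel_mean are hypothetical stand-ins; in PyActiveStorage the per-chunk work would be a call to reduce_chunk, and the actual aggregation code may differ.

```python
from concurrent.futures import ThreadPoolExecutor


def reduce_chunk_stub(chunk):
    """Hypothetical per-chunk reduction: return (partial sum, count)."""
    return sum(chunk), len(chunk)


def parallel_mean(chunks, max_workers=4):
    """Reduce chunks in a thread pool and aggregate the partial results."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in input order; list() waits for all of them.
        partials = list(executor.map(reduce_chunk_stub, chunks))
    total = sum(partial_sum for partial_sum, _ in partials)
    count = sum(n for _, n in partials)
    # Dividing the combined sum by the combined count gives the true mean,
    # even when the chunks hold different numbers of elements.
    return total / count


chunks = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0]]
mean = parallel_mean(chunks)
```

Averaging the three per-chunk means would give a different, incorrect answer here because the chunks differ in size, which is why each partial result carries its count. Threads suit this workload when the per-chunk work is dominated by waiting on HTTP requests.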