Integration of Reductionist and PyActiveStorage
Reductionist has been integrated with the PyActiveStorage library, and PyActiveStorage acts as a client of the Reductionist server.
PyActiveStorage currently works with data in netCDF4 format, and is able to perform reductions on a variable within such a dataset.
Numerical operations are performed on individual storage chunks, with the results later aggregated.
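As a minimal illustration of this chunk-then-aggregate pattern, the sketch below reduces each chunk on its own and then combines the partial results with NumPy. The function name reduce_chunked and the set of supported methods are hypothetical and chosen for illustration; they are not taken from PyActiveStorage. A mean cannot be aggregated from per-chunk means when chunks differ in size, so the sketch carries per-chunk sums and element counts for that case.

```python
import numpy as np

# Hypothetical set of reductions whose per-chunk partial results can be
# combined by applying the same operation again.
_REDUCERS = {"sum": np.sum, "min": np.min, "max": np.max}


def reduce_chunked(chunks, method):
    """Reduce each chunk independently, then aggregate the partial results.

    chunks: list of NumPy arrays, one per storage chunk.
    method: "sum", "min", "max" or "mean" (illustrative subset).
    """
    if method == "mean":
        # Averaging per-chunk means is wrong for unequal chunk sizes,
        # so keep each chunk's sum and element count instead.
        sums = [np.sum(chunk) for chunk in chunks]
        counts = [chunk.size for chunk in chunks]
        return np.sum(sums) / np.sum(counts)
    reducer = _REDUCERS[method]
    partials = [reducer(chunk) for chunk in chunks]  # one result per chunk
    return reducer(partials)  # aggregate the per-chunk results


# Usage: three unequal chunks of the same 1-D variable.
data = np.arange(10.0)
chunks = [data[:4], data[4:7], data[7:]]
print(reduce_chunked(chunks, "mean"))  # same value as data.mean()
```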
The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implemented in a reduce_chunk Python function in activestorage/storage.py, and this interface was used as the basis for the integration of Reductionist.
The following code snippet shows the reduce_chunk function signature.
```python
def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, method=None):
    """ We do our own read of chunks and decoding etc
    rfile - the actual file with the data
    offset, size - where and what we want ...
    compression - optional `numcodecs.abc.Codec` compression codec
    filters - optional list of `numcodecs.abc.Codec` filter codecs
    dtype - likely float32 in most cases.
    shape - will be a tuple, something like (3,3,1), this is the dimensionality of the
            chunk itself
    order - typically 'C' for c-type ordering
    chunk_selection - python slice tuples for each dimension, e.g.
                      (slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))
                      this defines the part of the chunk which is to be obtained
                      or operated upon.
    method - computation desired
             (in this Python version it's an actual method, in
             storage implementations we'll change to controlled vocabulary)
    """
```
For Reductionist, the reduce_chunk function signature in activestorage/reductionist.py is similar, but replaces the local file path with a requests.Session object, the Reductionist server URL, the S3-compatible object store URL, and the bucket and object containing the data. The method callable is also replaced by operation, the name of the operation to perform.
```python
def reduce_chunk(session, server, source, bucket, object,
                 offset, size, compression, filters, missing, dtype, shape,
                 order, chunk_selection, operation):
    """Perform a reduction on a chunk using Reductionist.

    :param session: requests.Session object
    :param server: Reductionist server URL
    :param source: S3 URL
    :param bucket: S3 bucket
    :param object: S3 object
    :param offset: offset of data in object
    :param size: size of data in object
    :param compression: optional `numcodecs.abc.Codec` compression codec
    :param filters: optional list of `numcodecs.abc.Codec` filter codecs
    :param missing: optional 4-tuple describing missing data
    :param dtype: numpy data type
    :param shape: will be a tuple, something like (3,3,1), this is the
                  dimensionality of the chunk itself
    :param order: typically 'C' for c-type ordering
    :param chunk_selection: N-tuple where N is the length of `shape`, and each
                            item is an integer or slice. e.g. (slice(0, 2,
                            1), slice(1, 3, 1), slice(0, 1, 1))
                            this defines the part of the chunk which is to be
                            obtained or operated upon.
    :param operation: name of operation to perform
    :returns: the reduced data as a numpy array or scalar
    :raises ReductionistError: if the request to Reductionist fails
    """
```
Within the reduce_chunk implementation for Reductionist, the following steps are taken:
- build Reductionist API request data
- build Reductionist API URL
- perform an HTTP(S) POST request to Reductionist
- on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers
- on failure, raise a ReductionistError with the response status code and JSON-encoded error response
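These steps can be sketched as follows. This is a hedged illustration, not the PyActiveStorage code: the helper names are hypothetical, and the JSON field names, the /v1/<operation>/ URL layout and the x-activestorage-* response header names are assumptions that should be checked against the Reductionist API documentation. The session argument only needs a post(url, json=...) method, so a requests.Session fits; credentials and the compression, filters and missing fields are left out for brevity.

```python
import json

import numpy as np


class ReductionistError(Exception):
    """Raised when Reductionist returns a non-success HTTP status."""

    def __init__(self, status_code, error):
        super().__init__(f"Reductionist error: HTTP {status_code}: {json.dumps(error)}")
        self.status_code = status_code
        self.error = error


def build_request_data(source, bucket, obj, offset, size, dtype, shape,
                       order, chunk_selection):
    # Assumed JSON field names, modelled on the reduce_chunk parameters.
    return {
        "source": source,
        "bucket": bucket,
        "object": obj,
        "dtype": np.dtype(dtype).name,
        "offset": int(offset),
        "size": int(size),
        "shape": list(shape),
        "order": order,
        # Each slice is sent as [start, stop, step] (slices only here).
        "selection": [[s.start, s.stop, s.step] for s in chunk_selection],
    }


def build_url(server, operation):
    # Assumed URL layout: <server>/v1/<operation>/
    return f"{server.rstrip('/')}/v1/{operation}/"


def decode_response(content, headers):
    # Assumed header names carrying the result's dtype, shape and count.
    dtype = headers["x-activestorage-dtype"]
    shape = tuple(json.loads(headers["x-activestorage-shape"]))
    count = json.loads(headers["x-activestorage-count"])
    return np.frombuffer(content, dtype=dtype).reshape(shape), count


def reduce_chunk_remote(session, server, operation, request_data):
    """Hypothetical sketch: POST one chunk reduction and decode the reply."""
    response = session.post(build_url(server, operation), json=request_data)
    if response.ok:
        return decode_response(response.content, response.headers)
    raise ReductionistError(response.status_code, response.json())
```

Keeping request building and response decoding as separate pure functions makes them easy to unit test without a running Reductionist server. The sketch returns the count alongside the data, since a count is what an aggregation such as a mean would need.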
The use of a requests.Session object allows for TCP connection pooling, reducing connection overhead when multiple requests are made within a short timeframe.
It should be possible to provide a unified interface to storage systems by abstracting away the details of the storage system and data source, but this has not yet been done.
Integrating Reductionist also required other changes, centred on the main activestorage.Active class.
These include:
- Support for reading netCDF metadata from files stored in S3 using the s3fs and h5netcdf libraries
- Configuration options in activestorage/config.py to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket
- A storage_type constructor argument for activestorage.Active to specify the storage backend
- Use of a thread pool to execute storage chunk reductions in parallel
- Unit tests to cover new and modified code
- Integration test changes to allow running against a POSIX or S3 storage backend
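A thread pool suits this workload because each chunk reduction spends much of its time waiting on I/O (an HTTP request to Reductionist or a file read), so threads can overlap that waiting. The sketch below uses the standard library's concurrent.futures.ThreadPoolExecutor; the reduce_in_parallel name and the chunk_reducer callable are hypothetical stand-ins for however activestorage.Active actually dispatches its per-chunk calls.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def reduce_in_parallel(chunk_reducer, chunk_keys, combine, max_workers=4):
    """Hypothetical sketch: run chunk_reducer(key) for every chunk on a
    thread pool, then combine the partial results.

    chunk_reducer could, for example, wrap one reduce_chunk call.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in input order, so they line up with chunk_keys.
        partials = list(executor.map(chunk_reducer, chunk_keys))
    return combine(partials)


# Usage: treat each block of two rows as a "chunk" and sum them in parallel.
data = np.arange(100.0).reshape(10, 10)
row_blocks = [slice(i, i + 2) for i in range(0, 10, 2)]
total = reduce_in_parallel(lambda s: np.sum(data[s]), row_blocks, np.sum)
print(total)  # equals data.sum()
```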