diff --git a/CFAPyX/__init__.py b/CFAPyX/__init__.py deleted file mode 100644 index 6524c8d..0000000 --- a/CFAPyX/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .backend import CFANetCDFBackendEntrypoint - -from .creator import CFANetCDF \ No newline at end of file diff --git a/CFAPyX/backend.py b/CFAPyX/backend.py deleted file mode 100644 index 9a8b7a2..0000000 --- a/CFAPyX/backend.py +++ /dev/null @@ -1,182 +0,0 @@ -__author__ = "Daniel Westwood" -__contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" - -from xarray.backends import StoreBackendEntrypoint, BackendEntrypoint -from xarray.backends.common import AbstractDataStore -from xarray.core.dataset import Dataset -from xarray import conventions - -from CFAPyX.datastore import CFADataStore - -import logging - -logger = logging.getLogger(__name__) - -def open_cfa_dataset( - filename_or_obj, - drop_variables=None, - mask_and_scale=None, - decode_times=None, - concat_characters=None, - decode_coords=None, - use_cftime=None, - decode_timedelta=None, - cfa_options=None, - group=None, - ): - """ - Top-level function which opens a CFA dataset using Xarray. Creates a CFA Datastore - from the ``filename_or_obj`` provided, then passes this to a CFA StoreBackendEntrypoint - to create an Xarray Dataset. Most parameters are not handled by CFA, so only the - CFA-relevant ones are described here. - - :param filename_or_obj: (str) The path to a CFA-netCDF file to be opened by Xarray - - :param cfa_options: (dict) A set of kwargs provided to CFA which provide additional - configurations. Currently implemented are: substitutions (dict), - decode_cfa (bool) - - :param group: (str) The name or path to a NetCDF group. CFA can handle opening - from specific groups and will inherit both ``group`` and ``global`` - dimensions/attributes. - - :returns: An xarray.Dataset object composed of xarray.DataArray objects representing the different - NetCDF variables and dimensions. CFA aggregated variables are decoded unless the ``decode_cfa`` - parameter in ``cfa_options`` is false. - """ - - cfa_options = cfa_options or {} - - # Load the CFA datastore from the provided file (object not supported). - store = CFADataStore.open(filename_or_obj, group=group) - - # Expands cfa_options into individual kwargs for the store. - store.cfa_options = cfa_options - - use_active = False - if hasattr(store, 'use_active'): - use_active = store.use_active - - # Xarray makes use of StoreBackendEntrypoints to provide the Dataset 'ds' - store_entrypoint = CFAStoreBackendEntrypoint() - ds = store_entrypoint.open_dataset( - store, - mask_and_scale=mask_and_scale, - decode_times=decode_times, - concat_characters=concat_characters, - decode_coords=decode_coords, - drop_variables=drop_variables, - use_cftime=use_cftime, - decode_timedelta=decode_timedelta, - use_active=use_active - ) - - return ds - -class CFANetCDFBackendEntrypoint(BackendEntrypoint): - - description = "Open CFA-netCDF files (.nca) using CFA-PyX in Xarray" - url = "https://cedadev.github.io/CFAPyX/" - - def open_dataset( - self, - filename_or_obj, - *, - drop_variables=None, - mask_and_scale=None, - decode_times=None, - concat_characters=None, - decode_coords=None, - use_cftime=None, - decode_timedelta=None, - cfa_options=None, - group=None, - # backend specific keyword arguments - # do not use 'chunks' or 'cache' here - ): - """ - Returns a complete xarray representation of a CFA-netCDF dataset which includes expanding/decoding - CFA aggregated variables into proper arrays. - """ - - cfa_options = cfa_options or {} - - return open_cfa_dataset( - filename_or_obj, - drop_variables=drop_variables, - mask_and_scale=mask_and_scale, - decode_times=decode_times, - concat_characters=concat_characters, - decode_coords=decode_coords, - use_cftime=use_cftime, - decode_timedelta=decode_timedelta, - cfa_options=cfa_options, - group=group) - -class CFAStoreBackendEntrypoint(StoreBackendEntrypoint): - description = "Open CFA-based Abstract Data Store" - url = "https://cedadev.github.io/CFAPyX/" - - def open_dataset( - self, - cfa_xarray_store, - mask_and_scale=True, - decode_times=True, - concat_characters=True, - decode_coords=True, - drop_variables=None, - use_cftime=None, - decode_timedelta=None, - use_active=False, - ) -> Dataset: - """ - Takes cfa_xarray_store of type AbstractDataStore and creates an xarray.Dataset object. - Most parameters are not handled by CFA, so only the CFA-relevant ones are described here. - - :param cfa_xarray_store: (obj) The CFA Datastore object which loads and decodes CFA - aggregated variables and dimensions. - - :returns: An xarray.Dataset object composed of xarray.DataArray objects representing the different - NetCDF variables and dimensions. CFA aggregated variables are decoded unless the ``decode_cfa`` - parameter in ``cfa_options`` is false. - - """ - assert isinstance(cfa_xarray_store, AbstractDataStore) - - # Same as NetCDF4 operations, just with the CFA Datastore - vars, attrs = cfa_xarray_store.load() - encoding = cfa_xarray_store.get_encoding() - - # Ensures variables/attributes comply with CF conventions. - vars, attrs, coord_names = conventions.decode_cf_variables( - vars, - attrs, - mask_and_scale=mask_and_scale, - decode_times=decode_times, - concat_characters=concat_characters, - decode_coords=decode_coords, - drop_variables=drop_variables, - use_cftime=use_cftime, - decode_timedelta=decode_timedelta, - ) - - # Create the xarray.Dataset object here. - if use_active: - try: - from XarrayActive import ActiveDataset - - ds = ActiveDataset(vars, attrs=attrs) - except ImportError: - raise ImportError( - '"ActiveDataset" from XarrayActive failed to import - please ' - 'ensure you have the XarrayActive package installed.' - ) - else: - ds = Dataset(vars, attrs=attrs) - - ds = ds.set_coords(coord_names.intersection(vars)) - ds.set_close(cfa_xarray_store.close) - ds.encoding = encoding - - return ds \ No newline at end of file diff --git a/CFAPyX/creator.py b/CFAPyX/creator.py deleted file mode 100644 index 6e9177a..0000000 --- a/CFAPyX/creator.py +++ /dev/null @@ -1,948 +0,0 @@ -__author__ = "Daniel Westwood" -__contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" - -import netCDF4 -import numpy as np -import logging -import glob - -from collections import OrderedDict - -logger = logging.getLogger(__name__) - -CONCAT_MSG = 'See individual datasets for more information.' - -class CFACreateMixin: - - def _first_pass(self, agg_dims: list = None) -> tuple: - """ - Perform a first pass across all provided files. Extracts the global - attributes and information on all variables and dimensions into - separate python dictionaries. Also collects the set of files arranged - by aggregated dimension coordinates, to be used later in constructing - the CFA ``fragment_location`` properties. - """ - - logger.info('Performing first pass on the set of files.') - - arranged_files = {} - var_info = None - dim_info = None - global_attrs = None - - ## First Pass - Determine dimensions - for x, file in enumerate(self.files): - logger.info(f'First pass: File {x+1}/{len(self.files)}') - - ds = self._call_file(file) - - if len(file) == 1: - file = file[0] - - all_dims = ds.dimensions.keys() - all_vars = ds.variables.keys() - - coord_variables = [] - pure_dimensions = [] - variables = [] - - ## Sort dimension/variable types - switch to dict with types? - for d in all_dims: - if d in all_vars: - coord_variables.append(d) - else: - pure_dimensions.append(d) - - for v in all_vars: - if v not in all_dims: - variables.append(v) - - if not dim_info: - dim_info = {d: {} for d in all_dims} - if not var_info: - var_info = {v: {} for v in variables} - - logger.info(f'Coordinate variables: {coord_variables}') - logger.info(f'Pure dimensions: {pure_dimensions}') - logger.info(f'Variables: {variables}') - - if var_info: - if len(set(variables) ^ set(var_info.keys())) != 0: - raise ValueError( - 'Differing numbers of variables across the fragment files ' - 'is not currently supported.' - ) - - ## Accumulate global attributes - ncattrs = {} - for attr in ds.ncattrs(): - ncattrs[attr] = ds.getncattr(attr) - global_attrs = self._accumulate_attrs(global_attrs, ncattrs) - - ## Accumulate dimension info - fcoord = [] - first_time = (x == 0) - for d in all_dims: - - if dim_info[d] == {} and not first_time: - raise ValueError( - f'Files contain differing numbers of dimensions. "{d}"' - 'appears to not be present in all files.' - ) - - new_info, arr_components = self._collect_dim_info( - ds, d, pure_dimensions, coord_variables, - agg_dims=agg_dims, first_time=first_time) - - dim_info = self._update_info(ds.dimensions[d], dim_info, new_info) - - if arr_components is not None: - if first_time: - for attr in arr_components.keys(): - dim_info[d][attr] = [arr_components[attr]] - else: - if arr_components['starts'] not in dim_info[d]['starts']: - dim_info[d]['starts'] += [arr_components['starts']] - dim_info[d]['sizes'] += [arr_components['sizes']] - dim_info[d]['arrays'] += [arr_components['arrays']] - - fcoord.append(arr_components['starts'].item()) - - ## Accumulate var_info - for v in variables: - - try: - fill = ds[v].getncattr('_FillValue') - except: - fill = None - - vdims = [] - for d in ds[v].dimensions: # Preserving the dimensions per variable - if d in coord_variables: - vdims.append(d) - - new_info = { - 'dtype': np.dtype(ds[v].dtype), - 'dims' : tuple(ds[v].dimensions), - 'cdims': vdims, - 'address': v, # Or match with replacement, - '_FillValue': fill, - } - - var_info = self._update_info(ds.variables[v], var_info, new_info) - - arranged_files[tuple(fcoord)] = file - - return arranged_files, global_attrs, var_info, dim_info - - def _second_pass( - self, - var_info : dict, - non_aggregated : list - ) -> dict: - - """ - Second pass through a subset of the files (2) to collect non-aggregated variables - which will be stored in the CFA file. - """ - - logger.info('Performing a second pass on a subset of files.') - - second_set = self.files[:2] - for x, file in enumerate(second_set): - logger.info(f'Second pass: File {x+1}/{len(self.files)}') - - ds = self._call_file(file) # Ideally don't want to do this twice. - - for v in non_aggregated: - new_values = np.array(ds.variables[v]) - - if 'data' not in var_info[v]: - var_info[v]['data'] = new_values - else: - if not np.array_equal(new_values, var_info[v]['data']): - raise ValueError( - f'Non-aggregated variable "{v}" differs between sample files.' - ) - return var_info - - def _collect_dim_info( - self, - ds, - d : str, - pure_dimensions : list, - coord_variables : list, - agg_dims : list = None, - first_time : bool = False, - ): - - """ - Collect new info about each dimension. The collected attributes - depend mostly on if the dimension is ``pure`` (i.e not a coordinate - variable) or if it is a coordinate variable. Aggregated dimensions require - collection of all array sequences that have a different ``start`` value. - If the aggregation dimensions are known, we do not have to collect arrays - from each file from non-aggregated dimensions.""" - - arr_components = None - - if first_time: - agg_dims = coord_variables - else: - if agg_dims is None or agg_dims == []: - agg_dims = coord_variables - - if d in pure_dimensions: - - new_info = { - 'size': ds.dimensions[d].size, - 'type':'pure', - 'f_size': 1, - } - else: - new_info = { - 'size': None, - 'type': 'coord', - 'dtype': ds[d].dtype, - 'f_size': None, - } - - if d in agg_dims: - - array = np.array(list(ds[d]), dtype=ds[d].dtype) - start = array[0] - size = len(array) - - arr_components = { - 'sizes': size, - 'starts': start, - 'arrays': array, - } - - return new_info, arr_components - - def _update_info( - self, - ncattr_obj, - info : dict, - new_info: dict, - ) -> dict: - - """ - Update the information for a variable/dimension based on the - current dataset. Certain properties are collected in lists while - others are explicitly defined and should be equal across all files. - Others still may differ across files, in which case the ``concat_msg`` - is applied which usually indicates to inspect individual files for - the correct value. - """ - - id = ncattr_obj.name - logger.debug(f'Concatenating information for {id}') - - attrs = {} - if hasattr(ncattr_obj, 'ncattrs'): - for attr in ncattr_obj.ncattrs(): - attrs[attr] = ncattr_obj.getncattr(attr) - - if info[id] != {}: - info[id]['attrs'] = self._accumulate_attrs(info[id]['attrs'], attrs) - - for attr, value in new_info.items(): - if value != info[id][attr]: - raise ValueError( - f'Info not matching between files for "{id}": "{attr}"' - ) - else: - info[id] = new_info - info[id]['attrs'] = attrs - - return info - - def _arrange_dimensions( - self, - dim_info : dict, - agg_dims : list = None - ) -> dict: - - """ - Arrange the aggregation dimensions by ordering the - start values collected from each file. Dimension arrays are - aggregated to a single array once properly ordered, and the sizes - fragments in each dimension are recorded in the ``dim_info``. - """ - - logger.info('Performing aggregated dimension sorting') - - ## Arrange Aggregation Dimensions - aggregation_dims = [] - for cd, info in dim_info.items(): - - if 'starts' not in info: - continue - - starts = info['starts'] # Needed later - arrays = info.pop('arrays') - sizes = info['sizes'] - - dim_info[cd]['f_size'] = len(starts) - - if len(starts) == 1: - cdimarr = arrays[0] - ndimsizes = (sizes[0],) - - else: - - ## Still here means the dimension requires aggregation. - aggregation_dims.append(cd) - - narr = np.array(starts) - arr = narr.astype(np.float64) - sort = np.argsort(arr) - - cdimarr = None - nds = [] - for s in sort: - - if cdimarr is None: - cdimarr = np.array(arrays[s]) - else: - cdimarr = np.concatenate((cdimarr, np.array(arrays[s]))) - - nds.append(sizes[s]) - - ndimsizes = tuple(nds) # Removed np.array here - - info['size'] = cdimarr.size - info['array'] = cdimarr - info['sizes'] = ndimsizes - - if agg_dims is not None: - if len(agg_dims) != len(aggregation_dims): - raise ValueError( - 'Found fewer aggregation dims than user provided value.' - f'User defined: ({list(agg_dims)})' - f'Derived: ({list(aggregation_dims)})' - ) - - return dim_info, aggregation_dims - - def _assemble_location( - self, - arranged_files : dict, - dim_info : dict - ) -> dict: - - """ - Assemble the base CFA ``fragment_location`` from which all the - locations for different variables are derived. Locations are defined - by the number of dimensions, and follow the same pattern for definition - as the ``fragment_shapes``. The combinations of dimensions that - require their own ``location`` and ``shape`` are recorded in ``cdim_opts``. - """ - - logger.debug('Assembling the location variable') - - # Define the location space - location_space = tuple(i for i in self.fragment_space if i > 1) - if self.max_files > 1: - location_space = location_space + (self.max_files,) - - # Assemble the set of named dims - named_cdims = [k for k, v in dim_info.items() if v['type'] == 'coord'] - - # Initialise empty location container - location = np.empty(location_space, dtype=f' 1: - new_coord.append( - dim_info[named_cdims[x]]['starts'].index(c) - ) - - location[tuple(new_coord)] = arranged_files[coord] - - return location - - def _apply_agg_dims( - self, - var_info, - agg_dims - ): - for var, meta in var_info.items(): - - aggs = [] - - if 'cdims' not in meta: - continue - - for cd in meta['cdims']: - if cd in agg_dims: - aggs.append(cd) - - if aggs: - var_info[var]['adims'] = aggs - - return var_info - - def _determine_non_aggregated( - self, - var_info : dict, - agg_dims : list - ) -> list: - - """ - Determine non-aggregated variables present in the fragment files. - Non-aggregated variables are equivalent to the ``identical variables`` - from kerchunk jargon. If the non-aggregated variables are later found - to vary across the fragment files, an error will be raised. - """ - - non_aggregated = [] - for var, info in var_info.items(): - if not (set(info['cdims']) & set(agg_dims)): - logger.info('Second pass required to extract non-aggregated variable values') - non_aggregated.append(var) - - logger.debug(f'Non-aggregated variables: {tuple(non_aggregated)}') - return non_aggregated - - def _determine_size_opts(self, var_info: dict, agg_dims: list) -> list: - """ - Determine the combinations of dimensions from the information - around each variable. Each combination requires a different - ``location`` and ``shape`` fragment array variable in the final - CFA-netCDF file. - """ - - cdimopts = [] - for v in var_info.values(): - cds = v['dims'] - - if (set(agg_dims) & set(cds)): - if cds and sorted(cds) not in cdimopts: - cdimopts.append(cds) - - logger.debug(f'Determined {len(cdimopts)} size options:') - for c in cdimopts: - logger.debug(f' - {tuple(c)}') - return cdimopts - - def _accumulate_attrs(self, attrs: dict, ncattrs: dict) -> dict: - """ - Accumulate attributes from the new source and the existing set. - Ignore fill value attributes as these are handled elsewhere. - If attributes are not equal across files, the ``concat_msg`` is - used to indicate where data users should seek out the source files - for correct values. - """ - - if attrs is None: - first_time = True - attrs = {} - - for attr in ncattrs.keys(): - if attr == '_FillValue': - continue - - if attr not in attrs: - if first_time: - attrs[attr] = ncattrs[attr] - else: - logger.warning(f'AttributeWarning: Attribute "{attr}" not present in all files') - attrs[attr] = self.concat_msg - else: - if attrs[attr] != ncattrs[attr]: - attrs[attr] = self.concat_msg - else: - attrs[attr] = ncattrs[attr] - return attrs - -class CFAWriteMixin: - - def _write_dimensions(self): - """ - Write the collected dimensions in dim_info as new dimensions - in the CFA-netCDF file. So-called ``pure`` dimensions which - have no variable component (no array of values) are defined - with size alone, whereas coordinate dimensions (coordinate - variables) have an associated variable component. The so-called - ``f-dims`` are also created here as the fragmented size of each - coordinate dimension. - - Note: if a coordinate dimension is not fragmented, it still has - an attributed f-dim, equal to 1. - """ - - f_dims = {} - - for dim, di in self.dim_info.items(): - - f_size = di['f_size'] - dim_size = di['size'] - - real_part = self.ds.createDimension( - dim, - dim_size - ) - - frag_part = self.ds.createDimension( - f'f_{dim}', - f_size, - ) - - f_dims[f'f_{dim}'] = f_size - - if di['type'] == 'coord': - axis_var = self.ds.createVariable( - dim, - di['dtype'], - (dim,), # Link to coord dimension - ) - for k, v in di['attrs'].items(): - axis_var.setncattr(k, v) - - axis_var[:] = di['array'] - - else: - for k, v in di['attrs'].items(): - real_part.setncattr(k, v) - - return f_dims - - def _write_variables(self): - """ - Non-aggregated variables are defined exactly the same as in - the fragment files, while aggregated variables contain - ``aggregated_data`` and ``aggregated_dimensions`` attributes, - which link to the fragment array variables. - """ - - for var, meta in self.var_info.items(): - - if 'adims' not in meta: - variable = self._write_nonagg_variable(var, meta) - else: - - agg_dims = ' '.join(meta['dims']) - - num = None - for n, opt in enumerate(self.cdim_opts): - if opt == meta['dims']: - num = n - - if num is None: - raise ValueError( - 'Dimension mismatch issue.' - ) - - agg_data = ' '.join([ - f'location: fragment_location_{num}', - f'address: fragment_address_{var}', - f'shape: fragment_shape_{num}' - ]) - - variable = self._write_aggregated_variable(var, meta, agg_dims, agg_data) - - def _write_fragment_addresses(self): - """ - Create a ``fragment_address`` variable for each variable - which is not dimension-less. - """ - - addrs = [] - for variable, meta in self.var_info.items(): - if 'adims' not in meta: - continue - addr = self.ds.createVariable( - f'fragment_address_{variable}', - str, - (), - ) - addr[:] = np.array(meta['address'], dtype=str) - addrs.append(addr) - - def _write_shape_dims(self, f_dims: dict): - """ - Construct the shape and location dimensions for each - combination of dimensions stored in ``cdim_opts``. This - utilises the so-called ``f-dims`` previously created - for each coordinate dimension. - """ - - for x, opt in enumerate(self.cdim_opts): - ndims = self.ds.createDimension( - f'shape_{x}', - len(opt), - ) - - vopt = tuple([f'f_{o}' for o in opt]) - - if self.max_files > 1: - vopt = vopt + ('versions',) - - location = self.ds.createVariable( - f'fragment_location_{x}', - str, - vopt, - ) - - vshape = [] - for opt in vopt: - vshape.append(f_dims[opt]) - - loc_data = np.reshape(self.location, vshape) - - location[(slice(0, None) for i in vopt)] = np.array(loc_data, dtype=str) - - def _write_fragment_shapes(self): - """ - Construct the ``fragment_shape`` variable part for each - combination of dimensions stored in ``cdim_opts``. This - utilises the ``shape`` dimensions previously created. - """ - - def fill_empty(array, size): - array = list(array) - init_length = int(len(array)) - for x in range(size - init_length): - array.append(0) - return tuple(array) - - cdimlens = {d: len(meta['sizes']) for d, meta in self.dim_info.items() if meta['type'] == 'coord'} - - for num, cdims in enumerate(self.cdim_opts): - # opt is a tuple of the cdimensions for this set of instructions. - - largest = 0 - i_dim = '' - - for d in cdims: - if d not in cdimlens: - continue - if cdimlens[d] > largest: - largest = cdimlens[d] - i_dim = f'f_{d}' - - # Find the largest of the dimensions - # Set dim_sizes accordingly - shape_name = f'fragment_shape_{num}' - - shapes = [] - - for d in cdims: - if 'sizes' in self.dim_info[d]: - sizes = self.dim_info[d]['sizes'] - else: - sizes = (self.dim_info[d]['size'],) - shapes.append(fill_empty(sizes, largest)) - - shape = self.ds.createVariable( - shape_name, - int, # Type - (f'shape_{num}', i_dim) - ) - - shapes = np.array(shapes) - shapes = np.ma.array(shapes, dtype=int, mask=(shapes==0)) - shape[:,:] = shapes - - def _write_aggregated_variable( - self, - var : str, - meta : dict, - agg_dims : str, - agg_data : str - ): - """ - Create the netCDF parameters required for an aggregated variable. - - Note: The dimensions and variables referenced in ``agg_data`` need to - have already been defined for the dataset by this point. - """ - - var_arr = self.ds.createVariable( - var, - meta['dtype'], - (), - fill_value = meta.pop('_FillValue', None), - ) - - for k, v in meta['attrs'].items(): - if k == '_FillValue': - continue - try: - var_arr.setncattr(k, v) - except Exception as err: - logger.warning( - f'Cannot set attribute - {k}: {v} for {var}' - ) - logger.warning(err) - - var_arr.aggregated_dimensions = agg_dims - var_arr.aggregated_data = agg_data - - def _write_nonagg_variable( - self, - var : str, - meta: dict - ): - - """ - Create a non-aggregated variable for the CFA-netCDF file. - If this variable has some attributed data (which it should), - the data is set for this variable in the new file.""" - - var_arr = self.ds.createVariable( - var, - meta['dtype'], - meta['dims'], - fill_value = meta.pop('_FillValue', None), - ) - - for k, v in meta['attrs'].items(): - if k == '_FillValue': - continue - try: - var_arr.setncattr(k, v) - except Exception as err: - logger.warning( - f'Cannot set attribute - {k}: {v} for {var}' - ) - logger.warning(err) - - if 'data' in meta: - var_arr[:] = meta['data'] - -class CFANetCDF(CFACreateMixin, CFAWriteMixin): - - description = 'The CFAPyX Constructor class, for creating new CFA-netCDF files.' - - def __init__(self, files: list, concat_msg : str = CONCAT_MSG): - - """ - Initialise this CFANetCDF instance with some basic values, and filter - the provided set of files. A custom concat message can also be set - here if needed.""" - - if isinstance(files, str): - files = glob.glob(files) - self.files = self._filter_files(files) - else: - self.files = self._filter_files(files) - - self.longest_filename = '' - - self.global_attrs = {} - self.var_info = {} - self.dim_info = {} - - self.fragment_space = None - self.location = None - self.cdim_opts = None - - self.concat_msg = concat_msg - - self.ds = None - - def create( - self, - updates : dict = None, - removals: list = None, - agg_dims: list = None, - ) -> None: - - """ - Perform the operations and passes needed to accumulate the set of - variable/dimension info and attributes to then construct a CFA-netCDF - file.""" - - updates = updates or {} - removals = removals or [] - - # First pass collect info - arranged_files, global_attrs, var_info, dim_info = self._first_pass(agg_dims=agg_dims) - - global_attrs, var_info, dim_info = self._apply_filters(updates, removals, global_attrs, var_info, dim_info) - - # Arrange aggregation dimensions - dim_info, agg_dims = self._arrange_dimensions(dim_info, agg_dims=agg_dims) - var_info = self._apply_agg_dims(var_info, agg_dims) - - # Determine size options and non-aggregated variables - self.cdim_opts = self._determine_size_opts(var_info, agg_dims) - non_aggregated = self._determine_non_aggregated(var_info, agg_dims) - - # Perform a second pass to collect non-aggregated variables if present. - if len(non_aggregated) > 0: - var_info = self._second_pass(var_info, non_aggregated) - - # Define the fragment space - self.fragment_space = [v['f_size'] for v in dim_info.values() if 'f_size' in v] - - # Assemble the location with correct dimensions - location = self._assemble_location(arranged_files, dim_info) - - self.global_attrs = global_attrs - self.dim_info = dim_info - self.var_info = var_info - self.location = location - - def write( - self, - outfile: str - ) -> None: - - """ - Use the accumulated dimension/variable info and attributes to - construct a CFA-netCDF file.""" - - self.ds = netCDF4.Dataset(outfile, mode='w', format='NETCDF4', maskandcale=True) - self.ds.Conventions = 'CF-1.12' - - # Populate global dimensions - for attr, value in self.global_attrs.items(): - self.ds.setncattr(attr, value) - - f_dims = self._write_dimensions() - - if self.max_files > 1: - nfiles = self.ds.createDimension( - 'versions', - self.max_files, - ) - - f_dims['versions'] = self.max_files - - self._write_shape_dims(f_dims) - self._write_fragment_shapes() - self._write_fragment_addresses() - - self._write_variables() - - self.ds.close() - - def _apply_filters(self, updates, removals, global_attrs, var_info, dim_info): - - global_attrs, var_info, dim_info = self._apply_updates(updates, global_attrs, var_info, dim_info) - global_attrs, var_info, dim_info = self._apply_removals(removals, global_attrs, var_info, dim_info) - - return global_attrs, var_info, dim_info - - def _apply_updates(self, updates, global_attrs, var_info, dim_info): - global_u, vars_u, dims_u = {}, {}, {} - for upd in updates.keys(): - if '.' not in upd: - global_u[upd] = updates[upd] - else: - item = upd.split('.')[0] - if item in var_info.keys(): - vars_u[upd] = updates[upd] - elif item in dim_info.keys(): - dims_u[upd] = updates[upd] - else: - logger.warning( - 'Attempting to set an attribute for a var/dim that' - f'is not present: "{item}"' - ) - - for attr, upd in global_u.items(): - global_attrs[attr] = upd - - for attr, upd in vars_u.items(): - (v, vattr) = attr.split('.') - var_info[v]['attrs'][vattr] = upd - - for attr, upd in dims_u.items(): - (d, dattr) = attr.split('.') - dim_info[d]['attrs'][dattr] = upd - - return global_attrs, var_info, dim_info - - def _apply_removals(self, removals, global_attrs, var_info, dim_info): - global_r, vars_r, dims_r = [],[],[] - for rem in removals: - if '.' not in rem: - global_r.append(rem) - else: - item = rem.split('.')[0] - if item in var_info.keys(): - vars_r.append(rem) - elif item in dim_info.keys(): - dims_r.append(rem) - else: - logger.warning( - 'Attempting to remove an attribute for a var/dim that' - f'is not present: "{item}"' - ) - - for rem in global_r: - global_attrs.pop(rem) - - for rem in vars_r: - (v, vattr) = rem.split('.') - var_info[v]['attrs'].pop(rem) - - for rem in dims_r: - (d, dattr) = rem.split('.') - dim_info[v]['attrs'].pop(rem) - - return global_attrs, var_info, dim_info - - def _filter_files(self, files: list) -> list: - """ - Filter the set of files to identify the trailing dimension - indicative of multiple file locations. Also identifies the - length of the longest filename to be used later when storing - numpy string arrays. - """ - - filtered = [] - trailing_file = False - - max_files = 0 - for f in files: - if isinstance(f, tuple): - trailing_file = True - if max_files < len(f): - max_files = len(f) - - for f in files: - if trailing_file: - fileopts = [''] * max_files - if isinstance(f, tuple): - for x, c in enumerate(f): - fileopts[x] = c - else: - fileopts[0] = f - filtered.append(tuple(fileopts)) - else: - filtered.append((f,)) - - self.max_files = max_files - - return filtered - - def _call_file(self, file: str) -> netCDF4.Dataset: - """ - Open the file as a netcdf dataset. If there are multiple filenames - provided, use the first file. Also determine the longest filename - to be used to define the ``location`` parameter later. - """ - - if isinstance(file, tuple): - ds = netCDF4.Dataset(file[0]) - for f in file: - if len(f) > len(self.longest_filename): - self.longest_filename = f - else: - ds = netCDF4.Dataset(file) - if len(file) > len(self.longest_filename): - self.longest_filename = file - - return ds diff --git a/CFAPyX/datastore.py b/CFAPyX/datastore.py deleted file mode 100644 index 27d05d4..0000000 --- a/CFAPyX/datastore.py +++ /dev/null @@ -1,466 +0,0 @@ -__author__ = "Daniel Westwood" -__contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" - -from xarray.backends import ( - NetCDF4DataStore -) - -from xarray.core.utils import FrozenDict -from xarray.core import indexing -from xarray.coding.variables import pop_to -from xarray.core.variable import Variable - -import netCDF4 -import numpy as np -import re - -from CFAPyX.wrappers import FragmentArrayWrapper -from CFAPyX.decoder import get_fragment_positions, get_fragment_extents -from CFAPyX.group import CFAGroupWrapper - -import logging - -logger = logging.getLogger(__name__) - - -xarray_subs = { - 'file:///':'/' -} - -class CFADataStore(NetCDF4DataStore): - - """ - DataStore container for the CFA-netCDF loaded file. Contains all unpacking routines - directly related to the specific variables and attributes. The ``NetCDF4Datastore`` - Xarray class from which this class inherits, has an ``__init__`` method which - cannot easily be overriden, so properties are used instead for specific variables - that may be un-set at time of use. - """ - - @property - def chunks(self): - if hasattr(self,'_cfa_chunks'): - return self._cfa_chunks - return None - - @chunks.setter - def chunks(self, value): - self._cfa_chunks = value - - @property - def cfa_options(self): - """ - Property of the datastore that relates private option variables to the standard - ``cfa_options`` parameter. - """ - - return { - 'substitutions': self._substitutions, - 'decode_cfa': self._decode_cfa, - 'chunks': self.chunks, - 'chunk_limits': self._chunk_limits, - 'use_active': self.use_active - } - - @cfa_options.setter - def cfa_options(self, value): - self._set_cfa_options(**value) - - def _set_cfa_options( - self, - substitutions=None, - decode_cfa=True, - chunks={}, - chunk_limits=True, - use_active=False, - ): - """ - Method to set cfa options. - - :param substitutions: (dict) Set of provided substitutions to Xarray, - following the CFA conventions on substitutions. - - :param decode_cfa: (bool) Optional setting to disable CFA decoding - in some cases, default is True. - - :param use_active: (bool) Enable for use with XarrayActive. - - :param chunks: (dict) Not implemented in 2024.9.0 - - :param chunk_limits: (dict) Not implemented in 2024.9.0 - """ - - self.chunks = chunks - self._substitutions = substitutions - self._decode_cfa = decode_cfa - self._chunk_limits = chunk_limits - self.use_active = use_active - - def _acquire(self, needs_lock=True): - """ - Fetch the global or group dataset from the Datastore Caching Manager (NetCDF4) - """ - with self._manager.acquire_context(needs_lock) as root: - ds = CFAGroupWrapper.open(root, self._group, self._mode) - - self.conventions = ds.Conventions - - return ds - - def _decode_feature_data(self, feature_data, readd={}): - """ - Decode the value of an object which is expected to be of the form of a - ``feature: variable`` blank-separated element list. - """ - parts = re.split(': | ',feature_data) - - # Anything that uses a ':' needs to be readded after the previous step. - for k, v in readd: - for p in parts: - p.replace(k,v) - - return {k: v for k, v in zip(parts[0::2], parts[1::2])} - - def _check_applied_conventions(self, agg_data): - """ - Check that the aggregated data complies with the conventions specified in the - CFA-netCDF file - """ - - required = ('shape', 'location', 'address') - if 'CFA-0.6.2' in self.conventions.split(' '): - required = ('location', 'file', 'format') - - for feature in required: - if feature not in agg_data: - raise ValueError( - f'CFA-netCDF file is not compliant with {self.conventions} ' - f'Required aggregated data features: "{required}", ' - f'Received "{tuple(agg_data.keys())}"' - ) - - def _perform_decoding( - self, - shape, - address, - location, - array_shape, - value=None, - cformat='', - substitutions=None): - """ - Private method for performing the decoding of the standard ``fragment array - variables``. Any convention version-specific adjustments should be made prior - to decoding with this function, namely in the public method of the same name. - - :param shape: (obj) The integer-valued ``shape`` fragment array variable - defines the shape of each fragment's data in its canonical - form. CF-1.12 section 2.8.1 - - :param address: (obj) The ``address`` fragment array variable, that may - have any data type, defines how to find each fragment - within its fragment dataset. CF-1.12 section 2.8.1 - - :param location: (obj) The string-valued ``location`` fragment array - variable defines the locations of fragment datasets using - Uniform Resource Identifiers (URIs). CF-1.12 section 2.8.1 - - :param value: (obj) *Optional* unique data value to fill a fragment array - where the data values within the fragment are all the same. - - :param cformat: (str) *Optional* ``format`` argument if provided by the - CFA-netCDF or cfa-options parameters. CFA-0.6.2 - - :param substitutions: (dict) Set of substitutions to apply in the form 'base':'sub' - - :returns: (fragment_info) A dictionary of fragment metadata where each - key is the coordinates of a fragment in index space and the - value is a dictionary of the attributes specific to that - fragment. - - """ - - fragment_info = {} - - # Extract non-padded fragment sizes per dimension. - fragment_size_per_dim = [i.compressed().tolist() for i in shape] - - # Derive the total shape of the fragment array in all fragmented dimensions. - fragment_space = [len(fsize) for fsize in fragment_size_per_dim] - - # Obtain the positions of each fragment in index space. - fragment_positions = get_fragment_positions(fragment_size_per_dim) - - global_extent, extent, shapes = get_fragment_extents( - fragment_size_per_dim, - array_shape - ) - - if value is not None: - # -------------------------------------------------------- - # This fragment contains a constant value, not file - # locations. - # -------------------------------------------------------- - fragment_space = value.shape - fragment_info = { - frag_pos: { - "shape": shapes[frag_pos], - "fill_value": value[frag_pos].item(), - "global_extent": global_extent[frag_pos], - "extent": extent[frag_pos], - "format": "full", - } - for frag_pos in fragment_positions - } - - return fragment_info, fragment_space - - if not address.ndim: # Scalar address - addr = address.getValue() - adtype = np.array(addr).dtype - address = np.full(fragment_space, addr, dtype=adtype) - - if cformat != '': - if not cformat.ndim: - cft = cformat.getValue() - npdtype = np.array(cft).dtype - cformat = np.full(fragment_space, cft, dtype=npdtype) - - for frag_pos in fragment_positions: - - fragment_info[frag_pos] = { - "shape" : shapes[frag_pos], - "location" : location[frag_pos], - "address" : address[frag_pos], - "extent" : extent[frag_pos], - "global_extent": global_extent[frag_pos] - } - if hasattr(cformat, 'shape'): - fragment_info[frag_pos]["format"] = cformat[frag_pos] - - # Apply string substitutions to the fragment filenames - if substitutions: - for value in fragment_info.values(): - for base, sub in substitutions.items(): - if isinstance(value['location'], str): - value["location"] = value["location"].replace(base, sub) - else: - for v in value["location"]: - v = v.replace(base, sub) - - return fragment_info, fragment_space - - # Public class methods - - def perform_decoding(self, array_shape, agg_data): - """ - Public method ``perform_decoding`` involves extracting the aggregated - information parameters and assembling the required information for actual - decoding. - """ - - # If not raised an error in checking, we can continue. - self._check_applied_conventions(agg_data) - - cformat = '' - value = None - try: - if 'CFA-0.6.2' in self.conventions: - shape = self.ds.variables[agg_data['location']] - location = self.ds.variables[agg_data['file']] - cformat = self.ds.variables[agg_data['format']] - else: # Default to CF-1.12 - shape = self.ds.variables[agg_data['shape']] - location = self.ds.variables[agg_data['location']] - if 'value' in agg_data: - value = self.ds.variables[agg_data['value']] - - address = self.ds.variables[agg_data['address']] - except Exception as err: - raise ValueError( - 'One or more aggregated data features specified could not be ' - 'found in the data: ' - f'"{tuple(agg_data.keys())}"' - f' - original error: {err}' - ) - - subs = {} - if hasattr(location, 'substitutions'): - subs = location.substitutions.replace('https://', 'https@//') - subs = self._decode_feature_data(subs, readd={'https://':'https@//'}) - - return self._perform_decoding(shape, address, location, array_shape, - cformat=cformat, value=value, - substitutions = xarray_subs | subs) - # Combine substitutions with known defaults for using in xarray. - - def get_variables(self): - """ - Fetch the netCDF4.Dataset variables and perform some CFA decoding if - necessary. - - ``ds`` is now a ``GroupedDatasetWrapper`` object from ``CFAPyX.group`` which - has flattened the group structure and allows fetching of variables and - attributes from the whole group tree from which a specific group may inherit. - - :returns: A ``FrozenDict`` Xarray object of the names of all variables, - and methods to fetch those variables, depending on if those - variables are standard NetCDF4 or CFA Aggregated variables. - """ - - if not self._decode_cfa: - return FrozenDict( - (k, self.open_variable(k, v)) for k, v in self.ds.variables.items() - ) - - # Determine CFA-aggregated variables - all_vars, real_vars = {}, {} - - fragment_array_vars = [] - - ## Ignore variables in the set of standardised terms. - for avar in self.ds.variables.keys(): - cfa = False - ## CF-Compliant method of identifying aggregated variables. - if hasattr(self.ds.variables[avar], 'aggregated_dimensions'): - cfa = True - - agg_data = self.ds.variables[avar].aggregated_data.split(' ') - - for vname in agg_data: - fragment_array_vars += re.split(': | ',vname) - - all_vars[avar] = (self.ds.variables[avar], cfa) - - # Ignore fragment array variables at this stage of decoding. - for var in all_vars.keys(): - if var not in fragment_array_vars: - real_vars[var] = all_vars[var] - - - return FrozenDict( - (k, self.open_variable(k, v)) for k, v in real_vars.items() - ) - - def get_attrs(self): - """ - Produce the FrozenDict of attributes from the ``NetCDF4.Dataset`` or - ``CFAGroupWrapper`` in the case of using a group or nested group tree. - """ - return FrozenDict((k, self.ds.getncattr(k)) for k in self.ds.ncattrs()) - - def open_variable(self, name: str, var): - """ - Open a CFA-netCDF variable as either a standard NetCDF4 Datastore variable - or as a CFA aggregated variable which requires additional decoding. - - :param name: (str) A named NetCDF4 variable. - - :param var: (obj) The NetCDF4.Variable object or a tuple with the contents - ``(NetCDF4.Variable, cfa)`` where ``cfa`` is a bool that - determines if the variable is a CFA or standard variable. - - :returns: The variable object opened as either a standard store variable - or CFA aggregated variable. - """ - if type(var) == tuple: - if var[1] and self._decode_cfa: - variable = self.open_cfa_variable(name, var[0]) - else: - variable = self.open_store_variable(name, var[0]) - else: - variable = self.open_store_variable(name, var) - return variable - - def open_cfa_variable(self, name: str, var): - """ - Open a CFA Aggregated variable with the correct parameters to create an - Xarray ``Variable`` instance. - - :param name: (str) A named NetCDF4 variable. - - :param var: (obj) The NetCDF4.Variable object or a tuple with the - contents ``(NetCDF4.Variable, cfa)`` where ``cfa`` is - a bool that determines if the variable is a CFA or - standard variable. - - :returns: An xarray ``Variable`` instance constructed from the - attributes provided here, and data provided by a - ``FragmentArrayWrapper`` which is indexed by Xarray's - ``LazilyIndexedArray`` class. - """ - - real_dims = { - d: self.ds.dimensions[d].size for d in var.aggregated_dimensions.split(' ') - } - agg_data = self._decode_feature_data(var.aggregated_data) - - ## Array Metadata - dimensions = tuple(real_dims.keys()) - array_shape = tuple(real_dims.values()) - - fragment_info, fragment_space = self.perform_decoding(array_shape, agg_data) - - units = '' - if hasattr(var, 'units'): - units = getattr(var, 'units') - if hasattr(var, 'aggregated_units'): - units = getattr(var, 'aggregated_units') - - ## Get non-aggregated attributes. - attributes = {} - for k in var.ncattrs(): - if 'aggregated' not in k: - attributes[k] = var.getncattr(k) - - ## Array-like object - data = indexing.LazilyIndexedArray( - FragmentArrayWrapper( - fragment_info, - fragment_space, - shape=array_shape, - units=units, - dtype=var.dtype, - cfa_options=self.cfa_options, - named_dims=dimensions, - )) - - encoding = {} - if isinstance(var.datatype, netCDF4.EnumType): - encoding["dtype"] = np.dtype( - data.dtype, - metadata={ - "enum": var.datatype.enum_dict, - "enum_name": var.datatype.name, - }, - ) - else: - encoding["dtype"] = var.dtype - - if data.dtype.kind == "S" and "_FillValue" in attributes: - attributes["_FillValue"] = np.bytes_(attributes["_FillValue"]) - - filters = var.filters() - if filters is not None: - encoding.update(filters) - chunking = var.chunking() - if chunking is not None: - if chunking == "contiguous": - encoding["contiguous"] = True - encoding["chunksizes"] = None - else: - encoding["contiguous"] = False - encoding["chunksizes"] = tuple(chunking) - encoding["preferred_chunks"] = dict(zip(var.dimensions, chunking)) - # TODO: figure out how to round-trip "endian-ness" without raising - # warnings from netCDF4 - # encoding['endian'] = var.endian() - pop_to(attributes, encoding, "least_significant_digit") - # save source so __repr__ can detect if it's local or not - encoding["source"] = self._filename - encoding["original_shape"] = data.shape - - v = Variable(dimensions, data, attributes, encoding) - return v - diff --git a/CFAPyX/decoder.py b/CFAPyX/decoder.py deleted file mode 100644 index fa8e36d..0000000 --- a/CFAPyX/decoder.py +++ /dev/null @@ -1,112 +0,0 @@ -__author__ = "Daniel Westwood" -__contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" - -from itertools import accumulate, product - -import logging - -logger = logging.getLogger(__name__) - -def get_fragment_positions(fragment_size_per_dim): - """ - Get the positions in index space for each fragment. - - :param fragment_size_per_dim: (list) The set of fragment sizes per dimension. first dimension has length - equal to the number of array dimensions, second dimension is a list of the - fragment sizes for the corresponding array dimension. - - :returns: A list of tuples representing the positions of all the fragments in index space given by the - fragment_size_per_dim. - - """ - return product(*(range(len(sizes)) for sizes in fragment_size_per_dim)) - -def get_fragment_extents(fragment_size_per_dim, array_shape): - """ - Return descriptors for every fragment. Copied from cf-python version 3.14.0 onwards. - - :param fragment_size_per_dim: (list) The set of fragment sizes per dimension. first dimension has length - equal to the number of array dimensions, second dimension is a list of the - fragment sizes for the corresponding array dimension. - - :param array_shape: (tuple) The shape of the total array in ``array space``. - - :returns: - - global_extents - The array of slice objects for each fragment which define where the fragment - slots into the total array. - - extents - The extents to be applied to each fragment, usually just the whole fragment array. - - shapes - The shape of each fragment in ``array space``. - - """ - - ndim = len(fragment_size_per_dim) - - initial = [0 for i in range(ndim)] - final = [len(i) for i in fragment_size_per_dim] - - fragmented_dims = [i for i in range(len(fragment_size_per_dim)) if len(fragment_size_per_dim[i]) != 1] - - dim_shapes = [] - dim_indices = [] - f_locations = [] - for dim, fs in enumerate(fragment_size_per_dim): - - # (num_fragments) for each dimension - dim_shapes.append(fs) - - fsa = tuple(accumulate((0,) + tuple(fs))) - - if dim in fragmented_dims: - # Same as s_locations - f_locations.append(tuple(range(len(fs)))) - else: - # No fragmentation along this dimension - # (0, 0, 0, 0 ...) in each non-fragmented dimension. - f_locations.append((0,) * len(fs)) - - # List of slices a to a+1 for every item in the new fs. - dim_indices.append([slice(i, j) for i, j in zip(fsa[:-1], fsa[1:])]) - - # Transform f_locations to get a dict of positions with a slice and shape for each. - positions = [ - coord for coord in product( - *[range(r[0], r[1]) for r in zip(initial, final)] - ) - ] - - f_indices = [] - for dim, u in enumerate(dim_indices): - if dim in fragmented_dims: - f_indices.append( (slice(None),) * len(u)) - else: - f_indices.append( u ) - - f_shapes = [ - dim_shape if dim in fragmented_dims else (size,) * len(dim_shape) - for dim, (dim_shape, size) in enumerate(zip(dim_shapes, array_shape)) - ] - - global_extents = {} - extents = {} - shapes = {} - - for frag_pos in positions: - extents[frag_pos] = [] - global_extents[frag_pos] = [] - shapes[frag_pos] = [] - for a, i in enumerate(frag_pos): - extents[frag_pos].append( - f_indices[a][i] - ) - global_extents[frag_pos].append( - dim_indices[a][i] - ) - shapes[frag_pos].append( - f_shapes[a][i] - ) - - return global_extents, extents, shapes diff --git a/CFAPyX/group.py b/CFAPyX/group.py deleted file mode 100644 index 0f22cc2..0000000 --- a/CFAPyX/group.py +++ /dev/null @@ -1,144 +0,0 @@ -__author__ = "Daniel Westwood" -__contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" - -import logging - -logger = logging.getLogger(__name__) - -class VariableWrapper: - """ - Wrapper object for the ``ds.variables`` and ``ds.attributes`` objects which can handle - either ``global`` or ``group`` based variables . - """ - - def __init__(self, prop_sets): - - # Note: core_ds refers to either the group, or the global ds if there is no group requested. - - self._core_props = prop_sets[0] - - props = {} - for prop in prop_sets: - props = props | prop - - self._properties = props - - def __getitem__(self, item): - """ - Requesting a named or indexed variable within ds.variables, needs - to be handled for the two sets of variables. - """ - - if type(item) == int: - item = list(self.keys())[item] - - if item in self._properties: - return self._properties[item] - else: - raise ValueError( - f'"{item}" not present in Dataset.' - ) - - def keys(self): - """ - Requesting the set of keys should return the set of both keys combined in a ``dict_keys`` object. - """ - return self._properties.keys() - - def items(self): - return self._properties.items() - - def __getattr__(self, attr): - try: - return getattr(self._core_props, attr) - except: - raise AttributeError( - f'No such attribute: "{attr}' - ) - -class CFAGroupWrapper: - """ - Wrapper object for the CFADataStore ``ds`` parameter, required to bypass the issue - with groups in Xarray, where all variables outside the group are ignored. - """ - def __init__(self, var_sets, ds_sets): - - self.variables = VariableWrapper( - var_sets, - ) - - self._ds_sets = ds_sets - - self.Conventions = '' - if hasattr(ds_sets[0],'Conventions'): - self.Conventions = ds_sets[0].Conventions - - @classmethod - def open(cls, root, group, mode): - - if group in {None, "", "/"}: - # use the root group - return root - else: - # make sure it's a string - if not isinstance(group, str): - raise ValueError("group must be a string or None") - - # support path-like syntax - path = group.strip("/").split("/") - var_sets = [root.variables] - ds_sets = [root] - - for part in path: - try: - var_sets.append(root.groups[part].variables) - ds_sets.append(root.groups[part]) - root = root.groups[part] - except KeyError as e: - raise ValueError( - f'Group path "{part}" not found in this dataset.' - ) - - return cls(var_sets, ds_sets) - - @property - def dimensions(self): - dims = {} - for ds in self._ds_sets: - dims = dims | dict(ds.dimensions) - return dims - - def ncattrs(self): - attrs = [] - for ds in self._ds_sets: - attrs += list(ds.ncattrs()) # Determine return type - return attrs - - def getncattr(self, k): - for ds in self._ds_sets: - try: - return ds.getncattr(k) - except: - pass - raise AttributeError( - f'Attribute "{k}" not found.' - ) - - @property - def variables(self): - return self._variables - - @variables.setter - def variables(self, value): - self._variables = value - - def __getattr__(self, name): - for ds in self._ds_sets: - try: - return getattr(ds, name) - except: - pass - raise AttributeError( - f'Attribute "{name}" not found.' - ) \ No newline at end of file diff --git a/CFAPyX/wrappers.py b/CFAPyX/wrappers.py deleted file mode 100644 index 17f5a6e..0000000 --- a/CFAPyX/wrappers.py +++ /dev/null @@ -1,518 +0,0 @@ -__author__ = "Daniel Westwood" -__contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" - -from arraypartition import ( - ArrayPartition, - ArrayLike, - get_chunk_shape, - get_chunk_space, - get_chunk_positions, - get_chunk_extent, - get_dask_chunks, - combine_slices, - normalize_partition_chunks -) - -import dask.array as da -from dask.array.core import getter -from dask.base import tokenize -from dask.utils import SerializableLock, is_arraylike -from dask.array.reductions import numel - -from itertools import product -import math -import numpy as np - -try: - from XarrayActive import ActiveOptionsContainer -except: - class ActiveOptionsContainer: - pass - -import logging - -logger = logging.getLogger(__name__) - -class CFAOptionsMixin: - """ - Simple container for CFA options properties. - """ - - __slots__ = ( - 'chunks', - '_chunk_limits', - '_substitutions', - '_decode_cfa' - ) - - @property - def cfa_options(self): - """ - Relates private option variables to the ``cfa_options`` parameter of the backend. - """ - - return { - 'substitutions': self._substitutions, - 'decode_cfa': self._decode_cfa, - 'chunks': self.chunks, - 'chunk_limits':self._chunk_limits - } - - @cfa_options.setter - def cfa_options(self, value): - self._set_cfa_options(**value) - - def _set_cfa_options( - self, - substitutions=None, - decode_cfa=None, - chunks={}, - chunk_limits=None, - use_active=False, - **kwargs): - """ - Sets the private variables referred by the ``cfa_options`` parameter to the backend. - Ignores additional kwargs. - """ - - self._substitutions = substitutions - self._decode_cfa = decode_cfa - self._chunk_limits = chunk_limits - self.chunks = chunks - self.use_active = use_active - -class FragmentArrayWrapper(ArrayLike, CFAOptionsMixin, ActiveOptionsContainer): - """ - FragmentArrayWrapper behaves like an Array that can be indexed or referenced to - return a Dask-like array object. This class is essentially a constructor for the - partitions that feed into the returned Dask-like array into Xarray. - """ - - description = 'Wrapper-class for the array of fragment objects' - - def __init__( - self, - fragment_info, - fragment_space, - shape, - units, - dtype, - cfa_options={}, - named_dims=None - ): - """ - Initialisation method for the FragmentArrayWrapper class - - :param fragment_info: (dict) The information relating to each fragment with the - fragment coordinates in ``fragment space`` as the key. Each - fragment is described by the following: - - ``shape`` - The shape of the fragment in ``array space``. - - ``location`` - The file from which this fragment originates. - - ``address`` - The variable and group name relating to this variable. - - ``extent`` - The slice object to apply to the fragment on retrieval (usually get - the whole array) - - ``global_extent`` - The slice object that equates to a particular fragment out - of the whole array (in ``array space``). - - :param fragment_space: (tuple) The coordinate system that refers to individual - fragments. Each coordinate eg. i, j, k refers to the - number of fragments in each of the associated dimensions. - - :param shape: (tuple) The total shape of the array in ``array space`` - - :param units: (obj) The units of the values represented in this Array-like class. - - :param dtype: (obj) The datatype of the values represented in this Array-like class. - - :param cfa_options: (dict) The set of options defining some specific decoding behaviour. - - :param named_dims: (list) The set of dimension names that apply to this Array object. - - :returns: None - """ - - self.fragment_info = fragment_info - self.fragment_space = fragment_space - self.named_dims = named_dims - - super().__init__(shape, dtype=dtype, units=units) - - # Set internal private variables - self.cfa_options = cfa_options - - self._apply_substitutions() - - self.__array_function__ = self.__array__ - - def __getitem__(self, selection): - """ - Non-lazy retrieval of the dask array when this object is indexed. - """ - arr = self.__array__() - return arr[tuple(selection)] - - def __array__(self): - """ - Non-lazy array construction, this will occur as soon as the instance is ``indexed`` - or any other ``array`` behaviour is attempted. Construction of a Dask-like array - occurs here based on the decoded fragment info and any other specified settings. - """ - - array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) - - dtype = self.dtype - units = self.units - - calendar = None # Fix later - - # Fragment info dict at this point - fragment_info = self.fragment_info - - # For now expect to deal only with NetCDF Files - - # dict of array-like objects to pass to the dask Array constructor. - fragments = {} - - for pos in self.fragment_info.keys(): - - fragment_shape = self.fragment_info[pos]['shape'] - fragment_position = pos - global_extent = self.fragment_info[pos]['global_extent'] - extent = self.fragment_info[pos]['extent'] - - fragment_format = 'nc' - - if 'fill_value' in self.fragment_info[pos]: - filename = None - address = None - # Extra handling required for this condition. - else: - filename = self.fragment_info[pos]['location'] - address = self.fragment_info[pos]['address'] - - # Wrong extent type for both scenarios but keep as a different label for - # dask chunking. - - fragment = CFAPartition( - filename, - address, - dtype=dtype, - extent=extent, - shape=fragment_shape, - position=fragment_position, - aggregated_units=units, - aggregated_calendar=calendar, - format=fragment_format, - named_dims=self.named_dims, - global_extent=global_extent - ) - - fragments[pos] = fragment - - if not self.chunks: - dsk = self._assemble_dsk_dict(fragments, array_name) - - global_extent = { - k: fragment_info[k]["global_extent"] for k in fragment_info.keys() - } - - dask_chunks = get_dask_chunks( - self.shape, - self.fragment_space, - extent=global_extent, - dtype=self.dtype, - explicit_shapes=None - ) - - else: - dask_chunks, partitions = self._create_partitions(fragments) - - dsk = self._assemble_dsk_dict(partitions, array_name) - - darr = self._assemble_array(dsk, array_name[0], dask_chunks) - return darr - - def _optimise_chunks(self): - """ - Replace the keyword ``optimised`` in the provided chunks with a chunk size for - the specified dimension that will be most optimised. The optimal chunk sizes are such - that the number of chunks is close to a power of 2.""" - - auto_chunks = {} - for c in self.chunks: - if self.chunks[c] == 'optimised': - auto_chunks[c] = 'auto' - else: - auto_chunks[c] = self.chunks[c] - - nchunks = normalize_partition_chunks( - auto_chunks, - self.shape, - self.dtype, - self.named_dims - ) - - # For each 'optimised' dimension, take the log2 of the number of chunks (len) - # and round to the nearest integer. Divide the array length by 2^(this number) and - # round again to give the optimised chunk size for that dimension. - - for x, nd in enumerate(self.named_dims): - if nd not in self.chunks: - continue - - if self.chunks[nd] == 'optimised': - nchunk = len(nchunks[x]) - - power = round(math.log2(nchunk)) - opsize = round(self.shape[x]/2**power) - - self.chunks[nd] = opsize - - def _create_partitions(self, fragments): - """ - Creates a partition structure that falls along the existing fragment boundaries. - This is done by simply chunking each fragment given the user provided chunks, rather - than the whole array, because user provided chunk sizes apply to each fragment equally. - - :param fragments: (dict) The set of fragment objects (CFAPartitions) in ``fragment space`` - before any chunking is applied. - - :returns: The set of dask chunks to provide to dask when building the array and the corresponding - set of copied fragment objects for each partition. - """ - if 'optimised' in self.chunks.items(): - # Running on standard dask chunking mode. - self._optimise_chunks() - - dask_chunks = [[] for i in range(self.ndim)] - fragment_coverage = [[] for i in range(self.ndim)] - for dim in range(self.ndim): - for x in range(self.fragment_space[dim]): - # Position eg. 0, 0, X - position = [0 for i in range(self.ndim)] - position[dim] = x - - fragment = fragments[tuple(position)] - - dchunks = normalize_partition_chunks( # Needs the chunks - self.chunks, - fragment.shape, - dtype=self.dtype, - named_dims=self.named_dims - ) - - dask_chunks[dim] += dchunks[dim] - fragment_coverage[dim].append(len(dchunks[dim])) - - def outer_cumsum(array): - cumsum = np.cumsum(array) - cumsum = np.append(cumsum, 0) - return np.roll(cumsum,1) - - def global_combine(internal, external): - local = [] - for dim in range(len(internal)): - start = internal[dim].start - external[dim].start - stop = internal[dim].stop - external[dim].start - local.append(slice(start,stop)) - return local - - fragment_cumul = [outer_cumsum(d) for d in fragment_coverage] - partition_cumul = [outer_cumsum(p) for p in dask_chunks] - partition_space = [len(d) for d in dask_chunks] - - partitions = {} - partition_coords = get_chunk_positions(partition_space) - for coord in partition_coords: - fragment_coord = [] - internal = [] - for dim, c in enumerate(coord): - cumulative = fragment_cumul[dim] - - if c < cumulative[0]: - cumul = cumulative[0] - else: - cumul = max(filter(lambda l: l <= c, cumulative)) - - fc = np.where(cumulative == cumul)[0] - fragment_coord.append(int(fc)) - - ext = slice( - partition_cumul[dim][c], - partition_cumul[dim][c+1] - ) - internal.append(ext) - - # Currently applying GLOBAl extent not internal extent to each fragment. - - source = fragments[tuple(fragment_coord)] - external = source.global_extent - extent = global_combine(internal, external) - - partitions[coord] = source.copy(extent=extent) - - return dask_chunks, partitions - - def _assemble_dsk_dict(self, partitions, array_name): - """ - Assemble the base ``dsk`` task dependency graph which includes the fragment objects - plus the method to index each object (with locking). - - :param partitions: (dict) The set of partition objects (CFAPartition) with - their positions in the relevant ``space``. - - :returns: A task dependency graph with all the partitions included to use - when constructing the dask array. - """ - - dsk = {} - for part_position in partitions.keys(): - part = partitions[part_position] - - p_identifier = f"{part.__class__.__name__}-{tokenize(part)}" - dsk[p_identifier] = part - dsk[array_name + part_position] = ( - getter, - p_identifier, - part.get_extent(), - False, - getattr(part, "_lock", False) # Check version cf-python - ) - return dsk - - def _apply_substitutions(self): - """ - Perform substitutions for this fragment array. - """ - if not self._substitutions: - return - - if type(self._substitutions) != list: - self._substitutions = [self._substitutions] - - for s in self._substitutions: - base, substitution = s.split(':') - for f in self.fragment_info.keys(): - - if isinstance(self.fragment_info[f]['location'], str): - self.fragment_info[f]['location'] = self.fragment_info[f]['location'].replace(base, substitution) - else: - for finfo in self.fragment_info[f]['location']: - finfo = finfo.replace(base, substitution) - - def _assemble_array(self, dsk, array_name, dask_chunks): - - """ - Assemble the dask/dask-like array for this FragmentArrayWrapper from the - assembled ``dsk`` dict and set of dask chunks. Also provides an array name - for the dask tree to register. - """ - - meta = da.empty(self.shape, dtype=self.dtype) - if not hasattr(self, 'use_active'): - darr = da.Array(dsk, array_name, chunks=dask_chunks, dtype=self.dtype, meta=meta) - return darr - - if not self.use_active: - darr = da.Array(dsk, array_name, chunks=dask_chunks, dtype=self.dtype, meta=meta) - return darr - try: - from XarrayActive import DaskActiveArray - - darr = DaskActiveArray(dsk, array_name, chunks=dask_chunks, dtype=self.dtype, meta=meta) - except ImportError: - raise ImportError( - '"DaskActiveArray" from XarrayActive failed to import - please ensure ' - 'you have the XarrayActive package installed.' - ) - return darr - -class CFAPartition(ArrayPartition): - """ - Wrapper object for a CFA Partition, extends the basic ArrayPartition with CFA-specific - methods. - """ - - description = 'Wrapper object for a CFA Partition (Fragment or Chunk)' - - - def __init__(self, - filename, - address, - aggregated_units=None, - aggregated_calendar=None, - global_extent=None, - **kwargs - ): - - """ - Wrapper object for the 'array' section of a fragment. Contains some metadata - to ensure the correct fragment is selected, but generally just serves the - fragment array to dask when required. - - :param filename: (str) The path to a Fragment file from which this - partition object will access data from. The partition may represent - all or a subset of the data from the Fragment file. - - :param address: (str) The address of the data variable within the - Fragment file, may include a group hierarchy structure. - - :param aggregated_units: (obj) The expected units for the received data. - If the units of the received data are not equal to the ``aggregated_units`` - then the data is 'post-processed' using the cfunits ``conform`` function. - - :param aggregated_calendar: None - """ - - super().__init__(filename, address, units=aggregated_units, **kwargs) - self.aggregated_units = aggregated_units - self.aggregated_calendar = aggregated_calendar - self.global_extent = global_extent - - def copy(self, extent=None): - """ - Create a new instance of this class from its own methods and attributes, and - apply a new extent to the copy if required. - """ - - kwargs = self.get_kwargs() - - if 'units' in kwargs: - if not kwargs['aggregated_units']: - kwargs['aggregated_units'] = kwargs['units'] - kwargs.pop('units') - - if extent: - kwargs['extent'] = combine_slices(self.shape, list(self.get_extent()), extent) - kwargs['global_extent'] = combine_slices(self.shape, list(self.global_extent), extent) - - new = CFAPartition( - self.filename, - self.address, - **kwargs - ) - return new - - def _post_process_data(self, data): - """Correct units/data conversions - if necessary at this stage""" - - if self.units != self.aggregated_units: - try: - from cfunits import Units - except FileNotFoundError: - raise ValueError( - 'Encountered issue when trying to import the "cfunits" library:' - "cfunits requires UNIDATA UDUNITS-2. Can't find the 'udunits2' library." - ' - Consider setting up a conda environment, and installing ' - '`conda install -c conda-forge udunits2`' - ) - - data = Units.conform(data, self.units, self.aggregated_units) - return data - - def get_kwargs(self): - return { - 'aggregated_units': self.aggregated_units, - 'aggregated_calendar': self.aggregated_calendar - } | super().get_kwargs() \ No newline at end of file