diff --git a/cads_adaptors/adaptors/cds.py b/cads_adaptors/adaptors/cds.py index dcbe9147..65d4a6a6 100644 --- a/cads_adaptors/adaptors/cds.py +++ b/cads_adaptors/adaptors/cds.py @@ -1,7 +1,10 @@ -from typing import Any +import os +from copy import deepcopy +from typing import Any, Union -from cads_adaptors import constraints, costing +from cads_adaptors import constraints, costing, mapping from cads_adaptors.adaptors import AbstractAdaptor, Context, Request +from cads_adaptors.tools.general import ensure_list class AbstractCdsAdaptor(AbstractAdaptor): @@ -15,6 +18,11 @@ def __init__(self, form: dict[str, Any], **config: Any): self.licences: list[tuple[str, int]] = config.pop("licences", []) self.config = config self.context = Context() + # The following attributes are updated during the retireve method + self.input_request: Request = Request() + self.mapped_request: Request = Request() + self.download_format: str = "zip" + self.receipt: bool = False def validate(self, request: Request) -> bool: return True @@ -28,3 +36,88 @@ def estimate_costs(self, request: Request) -> dict[str, int]: def get_licences(self, request: Request) -> list[tuple[str, int]]: return self.licences + + # This is essentially a second __init__, but only for when we have a request at hand + # and currently only implemented for retrieve methods + def _pre_retrieve(self, request: Request, default_download_format="zip"): + self.input_request = deepcopy(request) + self.receipt = request.pop("receipt", True) + self.download_format = request.pop("download_format", default_download_format) + self.mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore + + def make_download_object( + self, + paths: Union[str, list], + **kwargs, + ): + from cads_adaptors.tools import download_tools + + # Allow possibility of over-riding the download format from the adaptor + download_format = kwargs.get("download_format", self.download_format) + + paths = ensure_list(paths) + filenames = [os.path.basename(path) for path in paths] + # TODO: use request-id instead of hash + kwargs.setdefault( + "base_target", f"{self.collection_id}-{hash(tuple(self.input_request))}" + ) + + # Allow adaptor possibility of over-riding request value + if kwargs.get("receipt", self.receipt): + receipt_kwargs = kwargs.pop("receipt_kwargs", {}) + kwargs.setdefault( + "receipt", self.make_receipt(filenames=filenames, **receipt_kwargs) + ) + + return download_tools.DOWNLOAD_FORMATS[download_format](paths, **kwargs) + + def make_receipt( + self, + input_request: Union[Request, None] = None, + download_size: Any = None, + filenames: list = [], + **kwargs, + ) -> dict[str, Any]: + """ + Create a receipt to be included in the downloaded archive. + + **kwargs contains any other fields that are calculated during the runtime of the adaptor + """ + from datetime import datetime as dt + + # Allow adaptor to override and provide sanitized "input_request" if necessary + if input_request is None: + input_request = self.input_request + + # Update kwargs with default values + if download_size is None: + download_size = "unknown" + + receipt = { + "collection-id": self.collection_id, + "request": input_request, + "request-timestamp": dt.now().strftime("%Y-%m-%d %H:%M:%S"), + "download-size": download_size, + "filenames": filenames, + # Get static URLs: + "user-support": "https://support.ecmwf.int", + "privacy-policy": "https://cds.climate.copernicus.eu/disclaimer-privacy", + # TODO: Change to URLs for licence instead of slug + "licence": [ + f"{licence[0]} (version {licence[1]})" for licence in self.licences + ], + # TODO: Add request-id information to the context + "request-uid": self.config.get("request_uid", "Unavailable"), + # + # TODO: Add URL/DNS information to the context for populating these fields: + # "web-portal": self.???, # Need update to information available to adaptors + # "api-access": "https://url-to-data-api/{self.collection_id}" + # "metadata-api-access": "https://url-to-metadata-api/{self.collection_id}" + # + # TODO: Add metadata information to config, this could also be done via the metadata api + # "citation": self.???, # Need update to information available to adaptors + **kwargs, + **self.config.get("additional_receipt_info", {}), + } + + return receipt diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index ad90168e..7636f3e7 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -1,11 +1,41 @@ import os from typing import BinaryIO, Union -from cads_adaptors import mapping from cads_adaptors.adaptors import Context, Request, cds from cads_adaptors.tools.general import ensure_list +def convert_format( + result: str, + data_format: str, + context: Context | None = None, + **kwargs, +) -> list: + # NOTE: The NetCDF compressed option will not be visible on the WebPortal, it is here for testing + if data_format in ["netcdf", "nc", "netcdf_compressed"]: + if data_format in ["netcdf_compressed"]: + to_netcdf_kwargs = { + "compression_options": "default", + } + else: + to_netcdf_kwargs = {} + from cads_adaptors.tools.convertors import grib_to_netcdf_files + + # Give the power to overwrite the to_netcdf kwargs from the request + to_netcdf_kwargs = {**to_netcdf_kwargs, **kwargs} + paths = grib_to_netcdf_files(result, **to_netcdf_kwargs) + elif data_format in ["grib", "grib2", "grb", "grb2"]: + paths = [result] + else: + if context is not None: + context.stdout = ( + context.user_visible_log + ) = "WARNING: Unrecoginsed data_format requested, returning as original grib/grib2 format" + paths = [result] + + return paths + + def execute_mars( request: Union[Request, list], target: str = "data.grib", @@ -58,43 +88,27 @@ def retrieve(self, request: Request) -> BinaryIO: return open(result) # type: ignore -class MarsCdsAdaptor(DirectMarsCdsAdaptor): +class MarsCdsAdaptor(cds.AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: - from cads_adaptors.tools import download_tools + # TODO: Remove legacy syntax all together + if "format" in request: + _data_format = request.pop("format") + request.setdefault("data_format", _data_format) - # Format of data files, grib or netcdf - data_format = request.pop("format", "grib") # TODO: remove legacy syntax? - data_format = request.pop("data_format", data_format) + data_format = request.pop("data_format", "grib") - if data_format in ["netcdf", "nc", "netcdf_compressed"]: - default_download_format = "zip" - else: - default_download_format = "as_source" - - # Format of download archive, as_source, zip, tar, list etc. - download_format = request.pop("download_format", default_download_format) + # Allow user to provide format conversion kwargs + convert_kwargs = request.pop("convert_kwargs", {}) - mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore + # To preserve existing ERA5 functionality the default download_format="as_source" + request.setdefault("download_format", "as_source") - result = execute_mars(mapped_request, context=self.context) + self._pre_retrieve(request=request) - # NOTE: The NetCDF compressed option will not be visible on the WebPortal, it is here for testing - if data_format in ["netcdf", "nc", "netcdf_compressed"]: - if data_format in ["netcdf_compressed"]: - to_netcdf_kwargs = { - "compression_options": "default", - } - else: - to_netcdf_kwargs = {} - from cads_adaptors.tools.convertors import grib_to_netcdf_files + result = execute_mars(self.mapped_request, context=self.context) - results = grib_to_netcdf_files(result, **to_netcdf_kwargs) - else: - results = [result] - - download_kwargs = { - "base_target": f"{self.collection_id}-{hash(tuple(request))}" - } - return download_tools.DOWNLOAD_FORMATS[download_format]( - results, **download_kwargs + paths = convert_format( + result, data_format, context=self.context, **convert_kwargs ) + + return self.make_download_object(paths) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index c249303b..9b2ea1fe 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -1,4 +1,5 @@ import typing as T +from copy import deepcopy from cads_adaptors import AbstractCdsAdaptor, mapping from cads_adaptors.adaptors import Request @@ -38,9 +39,11 @@ def split_request( return this_request def retrieve(self, request: Request): - from cads_adaptors.tools import adaptor_tools, download_tools + from cads_adaptors.tools import adaptor_tools - download_format = request.pop("download_format", "zip") + self.input_request = deepcopy(request) + self.receipt = request.pop("receipt", False) + self.download_format = request.pop("download_format", "zip") these_requests = {} exception_logs: T.Dict[str, str] = {} @@ -57,7 +60,8 @@ def retrieve(self, request: Request): # TODO: check this_request is valid for this_adaptor, or rely on try? # i.e. split_request does NOT implement constraints. if len(this_request) > 0: - this_request.setdefault("download_format", "list") + this_request["download_format"] = "list" + this_request["receipt"] = False these_requests[this_adaptor] = this_request results = [] @@ -79,25 +83,23 @@ def retrieve(self, request: Request): # get the paths paths = [res.name for res in results] - download_kwargs = dict( - base_target=f"{self.collection_id}-{hash(tuple(results))}" - ) - - return download_tools.DOWNLOAD_FORMATS[download_format]( - paths, **download_kwargs + return self.make_download_object( + paths, ) class MultiMarsCdsAdaptor(MultiAdaptor): def retrieve(self, request: Request): """For MultiMarsCdsAdaptor we just want to apply mapping from each adaptor.""" - from cads_adaptors.adaptors.mars import execute_mars - from cads_adaptors.tools import adaptor_tools, download_tools + from cads_adaptors.adaptors.mars import convert_format, execute_mars + from cads_adaptors.tools import adaptor_tools - download_format = request.pop("download_format", "as_source") + self.input_request = deepcopy(request) + self.receipt = request.pop("receipt", False) + self.download_format = request.pop("download_format", "zip") # Format of data files, grib or netcdf - request.pop("format", "grib") + data_format = request.pop("format", "grib") mapped_requests = [] logger.debug(f"MultiMarsCdsAdaptor, full_request: {request}") @@ -105,8 +107,6 @@ def retrieve(self, request: Request): this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form) this_values = adaptor_desc.get("values", {}) - # logger.debug(f"MultiMarsCdsAdaptor, {adaptor_tag}, config: {this_adaptor.config}") - this_request = self.split_request( request, this_values, **this_adaptor.config ) @@ -122,12 +122,6 @@ def retrieve(self, request: Request): logger.debug(f"MultiMarsCdsAdaptor, mapped_requests: {mapped_requests}") result = execute_mars(mapped_requests, context=self.context) - # TODO: Handle alternate data_format + paths = convert_format(result, data_format) - download_kwargs = { - "base_target": f"{self.collection_id}-{hash(tuple(request))}" - } - - return download_tools.DOWNLOAD_FORMATS[download_format]( - [result], **download_kwargs - ) + return self.make_download_object(paths) diff --git a/cads_adaptors/adaptors/url.py b/cads_adaptors/adaptors/url.py index b0c47c9a..91b405d4 100644 --- a/cads_adaptors/adaptors/url.py +++ b/cads_adaptors/adaptors/url.py @@ -1,34 +1,26 @@ from typing import BinaryIO -from cads_adaptors import mapping from cads_adaptors.adaptors import Request, cds class UrlCdsAdaptor(cds.AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: - from cads_adaptors.tools import download_tools, url_tools + # TODO: Remove legacy syntax all together + if "format" in request: + _download_format = request.pop("format") + request.setdefault("download_format", _download_format) - download_format = request.pop("format", "zip") # TODO: Remove legacy syntax - # CADS syntax over-rules legacy syntax - download_format = request.pop("download_format", download_format) + self._pre_retrieve(request=request) - # Do not need to check twice - # if download_format not in {"zip", "tgz"}: - # raise ValueError(f"{download_format} is not supported") - - mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore + from cads_adaptors.tools import url_tools # Convert request to list of URLs requests_urls = url_tools.requests_to_urls( - mapped_request, patterns=self.config["patterns"] + self.mapped_request, patterns=self.config["patterns"] ) # try to download URLs urls = [ru["url"] for ru in requests_urls] paths = url_tools.try_download(urls) - download_kwargs = {"base_target": f"{self.collection_id}-{hash(tuple(urls))}"} - - return download_tools.DOWNLOAD_FORMATS[download_format]( - paths, **download_kwargs - ) + return self.make_download_object(paths) diff --git a/cads_adaptors/tools/download_tools.py b/cads_adaptors/tools/download_tools.py index df1f432d..8cc4fac4 100644 --- a/cads_adaptors/tools/download_tools.py +++ b/cads_adaptors/tools/download_tools.py @@ -1,11 +1,15 @@ import os -from typing import BinaryIO, Callable, Dict, List +from typing import Any, BinaryIO, Callable, Dict, List, Union + +import yaml from cads_adaptors.tools.general import ensure_list # TODO zipstream for archive creation -def zip_paths(paths: List[str], base_target: str = "output-data", **kwargs) -> BinaryIO: +def zip_paths( + paths: List[str], base_target: str = "output-data", receipt: Any = None, **kwargs +) -> BinaryIO: import zipfile target = f"{base_target}.zip" @@ -17,6 +21,13 @@ def zip_paths(paths: List[str], base_target: str = "output-data", **kwargs) -> B archive_name = os.path.basename(path) archive.write(path, archive_name) + if receipt is not None: + yaml_output: str = yaml.safe_dump(receipt, indent=2) + archive.writestr( + f"receipt-{base_target}.yaml", + data=yaml_output, + ) + for path in paths: os.remove(path) @@ -27,6 +38,7 @@ def zip_paths(paths: List[str], base_target: str = "output-data", **kwargs) -> B def targz_paths( paths: List[str], base_target: str = "output-data", + receipt: Any = None, **kwargs, ) -> BinaryIO: import tarfile @@ -40,6 +52,12 @@ def targz_paths( archive_name = os.path.basename(path) archive.add(path, arcname=archive_name) + if receipt is not None: + receipt_fname = f"receipt-{base_target}.yaml" + with open(receipt_fname, "w") as receipt_file: + yaml.safe_dump(receipt, stream=receipt_file, indent=2) + archive.add(receipt_fname) + for path in paths: os.remove(path) @@ -49,14 +67,21 @@ def targz_paths( def list_paths( paths: List[str], **kwargs, -) -> List: +) -> List[BinaryIO]: + if kwargs.get("receipt") is not None: + receipt_fname = f"receipt-{kwargs.get('base_target', 'nohash')}.yaml" + with open(receipt_fname, "w") as receipt_file: + yaml.safe_dump(kwargs.get("receipt"), stream=receipt_file, indent=2) + paths.append(receipt_fname) return [open(path, "rb") for path in ensure_list(paths)] -def as_source(paths: List[str], **kwargs) -> BinaryIO: +def as_source(paths: List[str], **kwargs) -> Union[BinaryIO, List[BinaryIO]]: # Only return as_source if a single path, otherwise list MUST be requested - assert len(paths) == 1 - return open(paths[0], "rb") + if len(paths) == 1: + return open(paths[0], "rb") + else: + return list_paths(paths, **kwargs) DOWNLOAD_FORMATS: Dict[str, Callable] = {