Skip to content

Commit

Permalink
Include receipts in downloads and refactor of common code (#67)
Browse files Browse the repository at this point in the history
* make_receipt method added
* Refactor
* default receipt=False
* defaults from CDS
* warning for bad format requested, allow format conversion kwargs
  • Loading branch information
EddyCMWF authored Nov 30, 2023
1 parent 7f72d98 commit 2cd5782
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 80 deletions.
97 changes: 95 additions & 2 deletions cads_adaptors/adaptors/cds.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import Any
import os
from copy import deepcopy
from typing import Any, Union

from cads_adaptors import constraints, costing
from cads_adaptors import constraints, costing, mapping
from cads_adaptors.adaptors import AbstractAdaptor, Context, Request
from cads_adaptors.tools.general import ensure_list


class AbstractCdsAdaptor(AbstractAdaptor):
Expand All @@ -15,6 +18,11 @@ def __init__(self, form: dict[str, Any], **config: Any):
self.licences: list[tuple[str, int]] = config.pop("licences", [])
self.config = config
self.context = Context()
# The following attributes are updated during the retrieve method
self.input_request: Request = Request()
self.mapped_request: Request = Request()
self.download_format: str = "zip"
self.receipt: bool = False

def validate(self, request: Request) -> bool:
return True
Expand All @@ -28,3 +36,88 @@ def estimate_costs(self, request: Request) -> dict[str, int]:

def get_licences(self, request: Request) -> list[tuple[str, int]]:
return self.licences

# This is essentially a second __init__, but only for when we have a request at hand
# and currently only implemented for retrieve methods
def _pre_retrieve(self, request: Request, default_download_format="zip"):
    """Record per-request state on the adaptor before a retrieve runs.

    A deep copy of the incoming request is kept as ``self.input_request``.
    The ``receipt`` and ``download_format`` control keys are then removed
    from ``request`` itself (the caller's object is modified in place) and
    what remains is passed through ``mapping.apply_mapping`` to produce
    ``self.mapped_request``.

    Args:
        request: The request as received by the adaptor's retrieve method.
        default_download_format: Value used for ``self.download_format``
            when the request carries no ``download_format`` key.
    """
    # Snapshot first, so the stored input request still contains the
    # control keys that are popped below.
    self.input_request = deepcopy(request)
    # Receipts are opt-in: the fallback is False, the same default that
    # __init__ assigns to self.receipt.
    self.receipt = request.pop("receipt", False)
    self.download_format = request.pop("download_format", default_download_format)
    self.mapped_request = mapping.apply_mapping(request, self.mapping)  # type: ignore

def make_download_object(
    self,
    paths: Union[str, list],
    **kwargs,
):
    """Package the given path(s) using the selected download format.

    Args:
        paths: A single path or a list of paths; normalised with
            ``ensure_list``.
        **kwargs: Forwarded to the download function. The keys read here
            are ``download_format`` (overrides ``self.download_format``),
            ``base_target`` (a default is filled in when absent),
            ``receipt`` (overrides ``self.receipt`` when deciding whether a
            receipt is built) and ``receipt_kwargs`` (extra keyword
            arguments for ``make_receipt``, removed from ``kwargs`` only
            when a receipt is requested).

    Returns:
        Whatever the callable registered in
        ``download_tools.DOWNLOAD_FORMATS`` for the chosen format returns.
    """
    from cads_adaptors.tools import download_tools

    # The adaptor may override the download format chosen in the request.
    chosen_format = kwargs.get("download_format", self.download_format)

    path_list = ensure_list(paths)
    filenames = list(map(os.path.basename, path_list))

    # TODO: use request-id instead of hash
    # NOTE(review): tuple(self.input_request) iterates the request itself;
    # if Request is dict-like that is its keys only - confirm this is intended.
    fallback_target = f"{self.collection_id}-{hash(tuple(self.input_request))}"
    kwargs.setdefault("base_target", fallback_target)

    # The adaptor may override the receipt flag taken from the request.
    receipt_wanted = kwargs.get("receipt", self.receipt)
    if receipt_wanted:
        extra_fields = kwargs.pop("receipt_kwargs", {})
        generated = self.make_receipt(filenames=filenames, **extra_fields)
        # An explicit "receipt" entry already present in kwargs is kept.
        kwargs.setdefault("receipt", generated)

    builder = download_tools.DOWNLOAD_FORMATS[chosen_format]
    return builder(path_list, **kwargs)

def make_receipt(
    self,
    input_request: Union[Request, None] = None,
    download_size: Any = None,
    filenames: Union[list, None] = None,
    **kwargs,
) -> dict[str, Any]:
    """Create a receipt to be included in the downloaded archive.

    Args:
        input_request: Request to record in the receipt. When ``None`` the
            adaptor's stored ``self.input_request`` is used; an adaptor can
            pass a sanitized request here instead.
        download_size: Size to report; ``"unknown"`` is recorded when ``None``.
        filenames: Names of the files in the download. ``None`` (the
            default) yields an empty list.
        **kwargs: Any other fields calculated during the runtime of the
            adaptor; merged into the receipt.

    Returns:
        The receipt as a dictionary. Entries from ``kwargs`` override the
        built-in fields, and entries from the ``additional_receipt_info``
        config value override both.
    """
    from datetime import datetime as dt

    # Allow adaptor to override and provide sanitized "input_request" if necessary
    if input_request is None:
        input_request = self.input_request

    # Update kwargs with default values
    if download_size is None:
        download_size = "unknown"

    # Build a fresh list on every call: a mutable default argument would be
    # shared by every receipt created without explicit filenames.
    filenames = [] if filenames is None else list(filenames)

    receipt = {
        "collection-id": self.collection_id,
        "request": input_request,
        "request-timestamp": dt.now().strftime("%Y-%m-%d %H:%M:%S"),
        "download-size": download_size,
        "filenames": filenames,
        # Get static URLs:
        "user-support": "https://support.ecmwf.int",
        "privacy-policy": "https://cds.climate.copernicus.eu/disclaimer-privacy",
        # TODO: Change to URLs for licence instead of slug
        "licence": [
            f"{licence[0]} (version {licence[1]})" for licence in self.licences
        ],
        # TODO: Add request-id information to the context
        "request-uid": self.config.get("request_uid", "Unavailable"),
        #
        # TODO: Add URL/DNS information to the context for populating these fields:
        # "web-portal": self.???,  # Need update to information available to adaptors
        # "api-access": "https://url-to-data-api/{self.collection_id}"
        # "metadata-api-access": "https://url-to-metadata-api/{self.collection_id}"
        #
        # TODO: Add metadata information to config, this could also be done via the metadata api
        # "citation": self.???, # Need update to information available to adaptors
        **kwargs,
        **self.config.get("additional_receipt_info", {}),
    }

    return receipt
80 changes: 47 additions & 33 deletions cads_adaptors/adaptors/mars.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,41 @@
import os
from typing import BinaryIO, Union

from cads_adaptors import mapping
from cads_adaptors.adaptors import Context, Request, cds
from cads_adaptors.tools.general import ensure_list


def convert_format(
    result: str,
    data_format: str,
    context: Context | None = None,
    **kwargs,
) -> list:
    """Return the path(s) for ``result`` in the requested data format.

    Args:
        result: Path of the retrieved file, treated as grib by this function.
        data_format: Requested format name. ``netcdf``, ``nc`` and
            ``netcdf_compressed`` trigger a conversion; ``grib``, ``grib2``,
            ``grb`` and ``grb2`` return the file unchanged.
        context: Optional context; when given and the format is not
            recognised, a warning string is assigned to its ``stdout`` and
            ``user_visible_log`` attributes.
        **kwargs: Passed to ``grib_to_netcdf_files``, overriding the
            defaults chosen here.

    Returns:
        The value returned by ``grib_to_netcdf_files`` for the netCDF
        formats, otherwise ``[result]``.
    """
    netcdf_names = ("netcdf", "nc", "netcdf_compressed")
    grib_names = ("grib", "grib2", "grb", "grb2")

    if data_format in netcdf_names:
        # NOTE: The NetCDF compressed option will not be visible on the WebPortal, it is here for testing
        if data_format == "netcdf_compressed":
            defaults = {"compression_options": "default"}
        else:
            defaults = {}
        from cads_adaptors.tools.convertors import grib_to_netcdf_files

        # Values supplied by the caller win over the defaults above.
        merged_kwargs = {**defaults, **kwargs}
        return grib_to_netcdf_files(result, **merged_kwargs)

    if data_format in grib_names:
        return [result]

    # Unknown format: warn when a context is available, then fall back to
    # handing back the original file.
    if context is not None:
        message = "WARNING: Unrecoginsed data_format requested, returning as original grib/grib2 format"
        context.stdout = message
        context.user_visible_log = message
    return [result]


def execute_mars(
request: Union[Request, list],
target: str = "data.grib",
Expand Down Expand Up @@ -58,43 +88,27 @@ def retrieve(self, request: Request) -> BinaryIO:
return open(result) # type: ignore


class MarsCdsAdaptor(DirectMarsCdsAdaptor):
class MarsCdsAdaptor(cds.AbstractCdsAdaptor):
def retrieve(self, request: Request) -> BinaryIO:
from cads_adaptors.tools import download_tools
# TODO: Remove legacy syntax altogether
if "format" in request:
_data_format = request.pop("format")
request.setdefault("data_format", _data_format)

# Format of data files, grib or netcdf
data_format = request.pop("format", "grib") # TODO: remove legacy syntax?
data_format = request.pop("data_format", data_format)
data_format = request.pop("data_format", "grib")

if data_format in ["netcdf", "nc", "netcdf_compressed"]:
default_download_format = "zip"
else:
default_download_format = "as_source"

# Format of download archive, as_source, zip, tar, list etc.
download_format = request.pop("download_format", default_download_format)
# Allow user to provide format conversion kwargs
convert_kwargs = request.pop("convert_kwargs", {})

mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore
# To preserve existing ERA5 functionality the default download_format="as_source"
request.setdefault("download_format", "as_source")

result = execute_mars(mapped_request, context=self.context)
self._pre_retrieve(request=request)

# NOTE: The NetCDF compressed option will not be visible on the WebPortal, it is here for testing
if data_format in ["netcdf", "nc", "netcdf_compressed"]:
if data_format in ["netcdf_compressed"]:
to_netcdf_kwargs = {
"compression_options": "default",
}
else:
to_netcdf_kwargs = {}
from cads_adaptors.tools.convertors import grib_to_netcdf_files
result = execute_mars(self.mapped_request, context=self.context)

results = grib_to_netcdf_files(result, **to_netcdf_kwargs)
else:
results = [result]

download_kwargs = {
"base_target": f"{self.collection_id}-{hash(tuple(request))}"
}
return download_tools.DOWNLOAD_FORMATS[download_format](
results, **download_kwargs
paths = convert_format(
result, data_format, context=self.context, **convert_kwargs
)

return self.make_download_object(paths)
40 changes: 17 additions & 23 deletions cads_adaptors/adaptors/multi.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import typing as T
from copy import deepcopy

from cads_adaptors import AbstractCdsAdaptor, mapping
from cads_adaptors.adaptors import Request
Expand Down Expand Up @@ -38,9 +39,11 @@ def split_request(
return this_request

def retrieve(self, request: Request):
from cads_adaptors.tools import adaptor_tools, download_tools
from cads_adaptors.tools import adaptor_tools

download_format = request.pop("download_format", "zip")
self.input_request = deepcopy(request)
self.receipt = request.pop("receipt", False)
self.download_format = request.pop("download_format", "zip")

these_requests = {}
exception_logs: T.Dict[str, str] = {}
Expand All @@ -57,7 +60,8 @@ def retrieve(self, request: Request):
# TODO: check this_request is valid for this_adaptor, or rely on try?
# i.e. split_request does NOT implement constraints.
if len(this_request) > 0:
this_request.setdefault("download_format", "list")
this_request["download_format"] = "list"
this_request["receipt"] = False
these_requests[this_adaptor] = this_request

results = []
Expand All @@ -79,34 +83,30 @@ def retrieve(self, request: Request):
# get the paths
paths = [res.name for res in results]

download_kwargs = dict(
base_target=f"{self.collection_id}-{hash(tuple(results))}"
)

return download_tools.DOWNLOAD_FORMATS[download_format](
paths, **download_kwargs
return self.make_download_object(
paths,
)


class MultiMarsCdsAdaptor(MultiAdaptor):
def retrieve(self, request: Request):
"""For MultiMarsCdsAdaptor we just want to apply mapping from each adaptor."""
from cads_adaptors.adaptors.mars import execute_mars
from cads_adaptors.tools import adaptor_tools, download_tools
from cads_adaptors.adaptors.mars import convert_format, execute_mars
from cads_adaptors.tools import adaptor_tools

download_format = request.pop("download_format", "as_source")
self.input_request = deepcopy(request)
self.receipt = request.pop("receipt", False)
self.download_format = request.pop("download_format", "zip")

# Format of data files, grib or netcdf
request.pop("format", "grib")
data_format = request.pop("format", "grib")

mapped_requests = []
logger.debug(f"MultiMarsCdsAdaptor, full_request: {request}")
for adaptor_tag, adaptor_desc in self.config["adaptors"].items():
this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form)
this_values = adaptor_desc.get("values", {})

# logger.debug(f"MultiMarsCdsAdaptor, {adaptor_tag}, config: {this_adaptor.config}")

this_request = self.split_request(
request, this_values, **this_adaptor.config
)
Expand All @@ -122,12 +122,6 @@ def retrieve(self, request: Request):
logger.debug(f"MultiMarsCdsAdaptor, mapped_requests: {mapped_requests}")
result = execute_mars(mapped_requests, context=self.context)

# TODO: Handle alternate data_format
paths = convert_format(result, data_format)

download_kwargs = {
"base_target": f"{self.collection_id}-{hash(tuple(request))}"
}

return download_tools.DOWNLOAD_FORMATS[download_format](
[result], **download_kwargs
)
return self.make_download_object(paths)
24 changes: 8 additions & 16 deletions cads_adaptors/adaptors/url.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,26 @@
from typing import BinaryIO

from cads_adaptors import mapping
from cads_adaptors.adaptors import Request, cds


class UrlCdsAdaptor(cds.AbstractCdsAdaptor):
def retrieve(self, request: Request) -> BinaryIO:
from cads_adaptors.tools import download_tools, url_tools
# TODO: Remove legacy syntax altogether
if "format" in request:
_download_format = request.pop("format")
request.setdefault("download_format", _download_format)

download_format = request.pop("format", "zip") # TODO: Remove legacy syntax
# CADS syntax over-rules legacy syntax
download_format = request.pop("download_format", download_format)
self._pre_retrieve(request=request)

# Do not need to check twice
# if download_format not in {"zip", "tgz"}:
# raise ValueError(f"{download_format} is not supported")

mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore
from cads_adaptors.tools import url_tools

# Convert request to list of URLs
requests_urls = url_tools.requests_to_urls(
mapped_request, patterns=self.config["patterns"]
self.mapped_request, patterns=self.config["patterns"]
)

# try to download URLs
urls = [ru["url"] for ru in requests_urls]
paths = url_tools.try_download(urls)

download_kwargs = {"base_target": f"{self.collection_id}-{hash(tuple(urls))}"}

return download_tools.DOWNLOAD_FORMATS[download_format](
paths, **download_kwargs
)
return self.make_download_object(paths)
Loading

0 comments on commit 2cd5782

Please sign in to comment.