diff --git a/cads_adaptors/adaptors/cams_regional_fc/__init__.py b/cads_adaptors/adaptors/cams_regional_fc/__init__.py index 21aaebd0..954ac0f8 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/__init__.py +++ b/cads_adaptors/adaptors/cams_regional_fc/__init__.py @@ -2,6 +2,9 @@ from cads_adaptors.adaptors.cds import AbstractCdsAdaptor, Request +STACK_TEMP_DIR = "/tmp/cams-europe-air-quality-forecasts/temp" +STACK_DOWNLOAD_DIR = "/tmp/cams-europe-air-quality-forecasts/download" + class CAMSEuropeAirQualityForecastsAdaptor(AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: @@ -16,54 +19,18 @@ def retrieve(self, request: Request) -> BinaryIO: result_file = cams_regional_fc(self.context, self.config, self.mapped_requests) - return open(result_file.path, "rb") + return open(result_file, "rb") class CAMSEuropeAirQualityForecastsAdaptorForLatestData(AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: - from .cams_regional_fc import retrieve_latest - - message = ( - f"The parent request is {request['parent_request_uid']}, " - "launched by user {request['parent_request_user_uid']}." - ) - self.context.add_stdout(message) + from .subrequest_main import subrequest_main - result_file = retrieve_latest( - self.context, - request["requests"], - request["dataset_dir"], - request["integration_server"], - ) - if hasattr(result_file, "path"): - return open(result_file.path, "rb") - else: - request_uid = self.config.get("request_uid", None) - message = f"Sub-request {request_uid} failed to produce a result when one was expected." - self.context.add_stderr(message) - raise RuntimeError(message) + return subrequest_main("latest", request, self.config, self.context) class CAMSEuropeAirQualityForecastsAdaptorForArchivedData(AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: - from .cams_regional_fc import retrieve_archived - - message = ( - f"The parent request is {request['parent_request_uid']}, " - "launched by user {request['parent_request_user_uid']}." - ) - self.context.add_stdout(message) + from .subrequest_main import subrequest_main - result_file = retrieve_archived( - self.context, - request["requests"], - request["dataset_dir"], - request["integration_server"], - ) - if hasattr(result_file, "path"): - return open(result_file.path, "rb") - else: - request_uid = self.config.get("request_uid", None) - message = f"Sub-request {request_uid} failed to produce a result when one was expected." - self.context.add_stderr(message) - raise RuntimeError(message) + return subrequest_main("archived", request, self.config, self.context) diff --git a/cads_adaptors/adaptors/cams_regional_fc/assert_valid_grib.py b/cads_adaptors/adaptors/cams_regional_fc/assert_valid_grib.py index a29c3564..0290ad7c 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/assert_valid_grib.py +++ b/cads_adaptors/adaptors/cams_regional_fc/assert_valid_grib.py @@ -1,7 +1,3 @@ -import os -import random -from datetime import datetime - from cds_common.message_iterators import grib_bytes_iterator from cds_common.url2.downloader import ResponseException from eccodes import codes_is_defined @@ -9,7 +5,7 @@ from .grib2request import grib2request -def assert_valid_grib(req, response, context): +def assert_valid_grib(req, response): """Raise a ResponseException if the request response indicates success but the content is not a valid grib message. """ @@ -43,13 +39,13 @@ def assert_valid_grib(req, response, context): except Exception as e: # Write bad grib to file for investigation? 
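`assert_valid_grib` now takes only `(req, response)` and, as before, raises `ResponseException` when a nominally successful response does not decode as GRIB; the ad-hoc bad-GRIB dump is merely commented out rather than removed. A minimal sketch of the kind of check involved, assuming the standard eccodes Python bindings (the iteration over multiple messages via cds_common's `grib_bytes_iterator` is omitted here, and the key name checked is an assumption, not taken from this diff):

```python
import eccodes


def looks_like_valid_grib(content: bytes) -> bool:
    """Rough stand-in for the check assert_valid_grib performs on a response body."""
    try:
        handle = eccodes.codes_new_from_message(content)
    except Exception:
        return False  # payload does not even decode as a GRIB message
    try:
        # The real code also verifies that keys needed to map the message back
        # to an API request are defined; "parameterNumber" is an assumed example.
        return bool(eccodes.codes_is_defined(handle, "parameterNumber"))
    finally:
        eccodes.codes_release(handle)
```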
- if datetime.now() < datetime(2021, 10, 31, 0): - rn = random.randint(0, 2**128) - file = f"/tmp/cams-europe-air-quality-forecasts/debug/badgrib_{context.request_id}.{rn}.grib" - context.info(f'Writing bad grib to {file}: {req["url"]}') - os.makedirs(os.path.dirname(file), exist_ok=True) - with open(file, "wb") as f: - f.write(response.content) + # if datetime.now() < datetime(2021, 10, 31, 0): + # rn = random.randint(0, 2**128) + # file = f"/tmp/cams-europe-air-quality-forecasts/debug/badgrib_{context.request_id}.{rn}.grib" + # context.info(f'Writing bad grib to {file}: {req["url"]}') + # os.makedirs(os.path.dirname(file), exist_ok=True) + # with open(file, "wb") as f: + # f.write(response.content) raise ResponseException( "Request did not return valid grib: " + "{}: {}".format(e, req["req"]) diff --git a/cads_adaptors/adaptors/cams_regional_fc/cacher.py b/cads_adaptors/adaptors/cams_regional_fc/cacher.py index 0a3ed9ba..3adc80b0 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/cacher.py +++ b/cads_adaptors/adaptors/cams_regional_fc/cacher.py @@ -1,5 +1,6 @@ import concurrent.futures import io +import logging import os import re import threading @@ -21,13 +22,18 @@ class AbstractCacher: defines the interface. """ - def __init__(self, context, no_put=False): - self.context = context + def __init__( + self, integration_server, logger=None, no_put=False, permanent_fields=None + ): + self.integration_server = integration_server + self.logger = logging.getLogger(__name__) if logger is None else logger self.no_put = no_put # Fields which should be cached permanently (on the datastore). All # other fields will be cached in temporary locations. - self.permanent_fields = [{"model": ["ENS"], "level": ["0"]}] + if permanent_fields is None: + permanent_fields = [{"model": ["ENS"], "level": ["0"]}] + self.permanent_fields = permanent_fields def done(self): pass @@ -69,9 +75,7 @@ def put(self, req): nmatches = count_fields(intn) if nmatches == 0: raise Exception( - "Got unexpected field " - + repr(req1field) - + " from request " + f"Got unexpected field {req1field!r} from request " + repr(req["req"]) ) assert nmatches == 1 @@ -139,8 +143,6 @@ def _cache_file_path(self, fieldinfo): """Return a field-specific path or the given field. Can be used by a child class to determine server-side cache location. """ - dir = "permanent" if self._cache_permanently(fieldinfo) else "temporary" - # Set the order we'd like the keys to appear in the filename. Area # keys will be last. order1 = ["model", "type", "variable", "level", "time", "step"] @@ -159,16 +161,12 @@ def key_order(k): if keys not in self._templates: # Form a Jinja2 template string for the cache files. "_backend" not # used; organised by date; area keys put at the end. 
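For reference, the cache layout that the reworked `_cache_file_path` produces is `<permanent|temporary>[_esuite]/<date>/key1=val1_key2=val2_...`, with the `_esuite` suffix keeping Meteo France integration-server data away from production data. A simplified sketch (the real code caches one `jinja2.Template` per key set and validates values against a character whitelist; the ordering of the non-priority keys below is an assumption):

```python
def cache_rel_path(fieldinfo: dict, permanent: bool, integration_server: bool) -> str:
    """Sketch of the server-side cache key built by _cache_file_path."""
    priority = ["model", "type", "variable", "level", "time", "step"]

    def key_order(k):
        # Priority keys first, in the order above; everything else (e.g. area
        # keys) afterwards - assumed alphabetical for this illustration.
        return (priority.index(k), "") if k in priority else (len(priority), k)

    keys = [k for k in sorted(fieldinfo, key=key_order) if k not in ("date", "_backend")]
    name = "_".join(f"{k}={fieldinfo[k]}" for k in keys)

    prefix = "permanent" if permanent else "temporary"
    if integration_server:
        prefix += "_esuite"  # never mix test-server data with production data
    return f"{prefix}/{fieldinfo['date']}/{name}"
```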
- path_template = ( - dir - + "/{{ date }}/" - + "_".join( - [ - "{k}={{{{ {k} }}}}".format(k=k) - for k in sorted(keys, key=key_order) - if k not in ["date", "_backend"] - ] - ) + path_template = "{{ date }}/" + "_".join( + [ + "{k}={{{{ {k} }}}}".format(k=k) + for k in sorted(keys, key=key_order) + if k not in ["date", "_backend"] + ] ) self._templates[keys] = jinja2.Template(path_template) @@ -181,7 +179,12 @@ def key_order(k): "Bad characters in value for " + k + ": " + repr(v) ) - return self._templates[keys].render(fieldinfo) + dir = "permanent" if self._cache_permanently(fieldinfo) else "temporary" + # Data from the integration server should not mix with the production data + if self.integration_server: + dir += "_esuite" + + return f"{dir}/" + self._templates[keys].render(fieldinfo) class AbstractAsyncCacher(AbstractCacher): @@ -193,11 +196,11 @@ class is still abstract since it does not do the actual data copy. It def __init__( self, - context, *args, - nthreads=10, - max_mem=100000000, - tmpdir="/cache/tmp", + logger=None, + nthreads=None, + max_mem=None, + tmpdir=None, **kwargs, ): """The number of fields that will be written concurrently to the cache @@ -210,15 +213,17 @@ def __init__( temporarily written to disk (in tmpdir) to avoid excessive memory usage. """ - super().__init__(context, *args, **kwargs) - self.nthreads = nthreads + super().__init__(*args, logger=logger, **kwargs) + self.nthreads = 10 if nthreads is None else nthreads self._lock1 = threading.Lock() self._lock2 = threading.Lock() self._qclosed = False self._templates = {} self._futures = [] self._start_time = None - self._queue = MemSafeQueue(max_mem, tmpdir, logger=context) + self._queue = MemSafeQueue( + 100000000 if max_mem is None else max_mem, tmpdir=tmpdir, logger=logger + ) def _start_copy_threads(self): """Start the threads that will do the remote copies.""" @@ -250,7 +255,7 @@ def done(self): "drain": now - qclose_time, "io": iotime, } - self.context.info(f"MemSafeQueue summary: {summary!r}") + self.logger.info(f"MemSafeQueue summary: {summary!r}") def __enter__(self): return self @@ -312,8 +317,8 @@ def _write_field_sync(self, data, fieldinfo): local_object = io.BytesIO(data) remote_path = self._cache_file_path(fieldinfo) - self.context.debug( - f"CACHER: copying data to " f"{self._host}:{self._bucket}:{remote_path}" + self.logger.info( + f"Caching {fieldinfo} to {self._host}:{self._bucket}:{remote_path}" ) # Uncomment this code if it can't be trusted that the bucket already @@ -337,7 +342,7 @@ def _write_field_sync(self, data, fieldinfo): status = "uploaded" break except Exception as exc: - self.context.error( + self.logger.error( "Failed to upload to S3 bucket (attempt " f"#{attempt}): {exc!r}" ) status = f"process ended in error: {exc!r}" diff --git a/cads_adaptors/adaptors/cams_regional_fc/cams_regional_fc.py b/cads_adaptors/adaptors/cams_regional_fc/cams_regional_fc.py index 00aac235..49ca4b23 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/cams_regional_fc.py +++ b/cads_adaptors/adaptors/cams_regional_fc/cams_regional_fc.py @@ -1,28 +1,24 @@ -import json import logging import os -import random -import tempfile import time import zipfile -from copy import deepcopy -from datetime import datetime, timedelta from cds_common import date_tools, hcube_tools, tree_tools from cds_common.cams.regional_fc_api import regional_fc_api from cds_common.url2.downloader import Downloader -from cdscompute.errors import NoDataException -from .api_retrieve import api_retrieve +from cads_adaptors.exceptions 
import InvalidRequest + from .assert_valid_grib import assert_valid_grib from .cacher import Cacher from .convert_grib import convert_grib -from .create_file import create_file +from .create_file import create_file, temp_file from .formats import Formats from .grib2request import grib2request_init from .nc_request_groups import nc_request_groups from .preprocess_requests import preprocess_requests from .process_grib_files import process_grib_files +from .subrequest_main import subrequest_main from .which_fields_in_file import which_fields_in_file # Used to temporarily disable access to archived data in an emergency, e.g. @@ -30,41 +26,22 @@ ARCHIVED_OFF = False -class MockResultFile: - def __init__(self, path): - self.path = path +class NoDataException(Exception): + pass - def __str__(self): - return self.path +def cams_regional_fc(context, config, requests): + """Top-level function for the regional forecast adaptor.""" + # Using the Meteo France test (aka "integration") server? + integration_server = config.get("regional_fc", {}).get("integration_server", False) -def cams_regional_fc(context, config, requests, forms_dir=None): # Get an object which will give us information/functionality associated # with the Meteo France regional forecast API - regapi = regional_fc_api( - integration_server=config.get("integration_server", False), logger=context - ) - - def create_result_file(self, extension): - request_uid = config["request_uid"] - result_path = ( - f"/tmp/cams-europe-air-quality-forecasts/download/{request_uid}.{extension}" - ) - os.makedirs(os.path.dirname(result_path), exist_ok=True) - return MockResultFile(result_path) - - def create_temp_file(self, extension=".tmp"): - temp_path = "/tmp/cams-europe-air-quality-forecasts/temp/" - os.makedirs(temp_path, exist_ok=True) - fd, path = tempfile.mkstemp(suffix=extension, dir=temp_path) - os.close(fd) - return path - - context.create_result_file = create_result_file.__get__(context) - context.create_temp_file = create_temp_file.__get__(context) + regapi = regional_fc_api(integration_server=integration_server, logger=context) # Pre-process requests requests, info = preprocess_requests(context, requests, regapi) + info["config"] = config # If converting to NetCDF then different groups of grib files may need to be # converted separately. Split and group the requests into groups that can be @@ -82,22 +59,20 @@ def create_temp_file(self, extension=".tmp"): ): info["stages"].append("zip") - dataset_dir = "/src/cads-adaptors/cads_adaptors/adaptors/cams_regional_fc/config" - # Initialisation for function that can understand GRIB file contents - grib2request_init(dataset_dir) + grib2request_init(config["regional_fc"]["definitions"]) # Get locally stored fields - get_local(req_groups, context) + get_local(req_groups, integration_server, config, context) # Divide non-local fields betwen latest and archived - set_backend(req_groups, regapi, dataset_dir, context) + set_backend(req_groups, regapi, context) # Retrieve non-local latest (fast-access) fields - get_latest(req_groups, regapi, dataset_dir, context, config) + get_latest(req_groups, config, context) # Retrieve non-local archived (slow-access) fields - get_archived(req_groups, regapi, dataset_dir, context, config) + get_archived(req_groups, config, context) # Remove groups that had no matching data req_groups = [x for x in req_groups if "retrieved_files" in x] @@ -116,7 +91,7 @@ def create_temp_file(self, extension=".tmp"): # Convert to netCDF? 
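The adaptor now reads its settings from the adaptor config rather than from hard-coded paths and context attributes. The regional_fc-related keys referenced in this module are sketched below; the values are illustrative placeholders, and "definitions" is assumed to hold the parsed contents of what used to be `config/regional_fc_definitions.yaml` (a mapping with "variable" and "model" lists):

```python
# Illustrative only - not real defaults.
config = {
    "request_uid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",  # names temp/result files
    "regional_fc": {
        "integration_server": False,        # route to the Meteo France test server
        "definitions": {"variable": [...], "model": [...]},  # parsed definitions data
        "no_subrequests": "0",              # "1" = call subrequest_main directly (testing)
        "no_cache_downloads": False,        # truthy = skip the local-cache download step
        "backend_maintenance": {            # optional per-backend maintenance message
            "archived": "The archive is under maintenance; please try again later.",
        },
    },
}
```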
if "convert" in info["stages"]: - convert_grib(req_groups, info, dataset_dir, context) + convert_grib(req_groups, info, config["regional_fc"]["definitions"], context) # Zip output files? if "zip" in info["stages"]: @@ -128,11 +103,13 @@ def create_temp_file(self, extension=".tmp"): raise Exception("Bug: result_file not set") from None -def set_backend(req_groups, regapi, dataset_dir, context): - """Divide requests between "latest" and "archived" and set their "_backend" attribute accordingly.""" +def set_backend(req_groups, regapi, context): + """Divide requests between "latest" and "archived" and set their "_backend" + attribute accordingly. + """ for req_group in req_groups: online, offline = split_latest_from_archived( - req_group["uncached_requests"], regapi, dataset_dir, context + req_group["uncached_requests"], regapi, context ) for r in online: r["_backend"] = ["latest"] @@ -142,43 +119,23 @@ def set_backend(req_groups, regapi, dataset_dir, context): req_group["uncached_archived_requests"] = offline -def split_latest_from_archived(requests, regapi, dataset_dir, context): +def split_latest_from_archived(requests, regapi, context): """Split requests into "latest" and "archived" groups.""" if requests: # Get the catalogue that lists all fields that are currently in the # fast "synopsis" part of the backend try: - online_cat = regapi.get_catalogue("latest", retry={"timeout": 10}) - except Exception as e: - # If there is a problem at Meteo France and the catalogue can't be - # accessed, try using the one stored in the dataset directory - # instead. It's probably up-to-date and better than failing the - # request. - context.error( - f"Failed to download latest catalogue: {e!r}. " - f"Reading from {dataset_dir} instead" - ) - with open(f"{dataset_dir}/catalogue_latest.json") as f: - online_cat = json.load(f) + online_cat = regapi.get_catalogue("latest", retry={"timeout": 120}) + except Exception: + # We could make a basic guess at which fields were in the latest + # catalogue based on date, such as assuming all fields from the + # last N days are, but for now we'll just consider this terminal + raise # Split latest from archived fields lcat = tree_tools.to_list(online_cat) latest, archived, _ = hcube_tools.hcubes_intdiff2(requests, lcat) - # Holes in the latest catalogue will have been assigned to the archived - # list. This is not a problem if they result in 404s but the archive - # backend will reject any requests for dates less than N days old with a - # 400 HTTP error, so remove any if they exist - archived, invalid = archive_maxdate_split(archived, regapi) - if invalid: - context.info( - "Not attempting to retrieve " - + str(hcube_tools.count_fields(invalid)) - + " fields " - "which are not in latest catatalogue but also too new " - "to be in archived" - ) - context.debug("Latest fields: " + repr(latest)) context.debug("Archived fields: " + repr(archived)) @@ -189,50 +146,17 @@ def split_latest_from_archived(requests, regapi, dataset_dir, context): return (latest, archived) -def archive_maxdate_split(reqs, regapi): - """Return a copy of requests with fields that are too recent to be in thearchive backend removed. - Requesting these fields would result in a HTTP 400 (invalid request) error. 
- """ - valid = [] - invalid = [] - - if reqs: - # The maximum date that the archived backend will allow in a request - # without raising a HTTP 400 (bad request) error - date_max = (datetime.utcnow() - timedelta(days=regapi.archive_min_delay)).date() - fmt = date_tools.guess_date_format(reqs[0]["date"][0]) - for r in reqs: - rdates = [ - d.date() - for d in date_tools.expand_dates_list(r["date"], as_datetime=True) - ] - rv = deepcopy(r) - ri = deepcopy(r) - rv["date"] = [d.strftime(fmt) for d in rdates if d <= date_max] - ri["date"] = [d.strftime(fmt) for d in rdates if d > date_max] - if rv["date"]: - valid.append(rv) - if ri["date"]: - invalid.append(ri) - - assert hcube_tools.count_fields(reqs) == hcube_tools.count_fields( - valid - ) + hcube_tools.count_fields(invalid) - - return (valid, invalid) - - -def get_local(req_groups, context): +def get_local(req_groups, integration_server, config, context): """Retrieve only the fields which are stored locally (in the cache or on the datastore) and identify the remaining non-local fields. """ # Cacher has knowledge of cache locations - with Cacher(context) as cacher: + with Cacher(integration_server, logger=context) as cacher: for req_group in req_groups: - _get_local(req_group, cacher, context) + _get_local(req_group, cacher, config, context) -def _get_local(req_group, cacher, context): +def _get_local(req_group, cacher, config, context): """Retrieve only the fields which are already stored locally (in the cache or on the datastore) and identify non-local fields. """ @@ -246,9 +170,7 @@ def _get_local(req_group, cacher, context): {"url": cacher.cache_file_url(field), "req": field} for field in hcube_tools.unfactorise(reqs) ) - # CAREFUL! "urls" is a generator and will be consumed by the first iteration downloader = Downloader( - context, max_rate=50, max_simultaneous=15, combine_method="cat", @@ -256,18 +178,20 @@ def _get_local(req_group, cacher, context): response_checker=assert_valid_grib, response_checker_threadsafe=False, combine_in_order=False, - write_to_temp=True, request_timeout=[60, 300], max_attempts={404: 1, "default": 3}, nonfatal_codes=[404, "exception"], retry_wait=5, allow_no_data=True, + logger=context, min_log_level=logging.INFO, ) - grib_file = downloader.execute(urls) + grib_file = temp_file(config, suffix=".grib") + if not config.get("regional_fc", {}).get("no_cache_downloads"): + downloader.execute(urls, target=grib_file) # Identify uncached fields - the ones not present in the file - cached, uncached = which_fields_in_file(reqs, grib_file, context) + cached, uncached = which_fields_in_file(reqs, grib_file, config, context) req_group["uncached_requests"] = uncached context.info("Retrieved " + str(hcube_tools.count_fields(cached)) + " local fields") context.info( @@ -275,13 +199,14 @@ def _get_local(req_group, cacher, context): + str(hcube_tools.count_fields(uncached)) ) - if grib_file is not None: + # File will be empty if no fields found + if os.path.getsize(grib_file) > 0: req_group["retrieved_files"] = req_group.get("retrieved_files", []) + [ grib_file ] -def get_latest(req_groups, regapi, dataset_dir, context, config=None): +def get_latest(req_groups, config, context): """Retrieve uncached latest fields.""" for req_group in req_groups: if not req_group["uncached_latest_requests"]: @@ -293,17 +218,15 @@ def get_latest(req_groups, regapi, dataset_dir, context, config=None): for reqs in hcube_tools.hcubes_chunk( req_group["uncached_latest_requests"], 5000 ): - grib_file = retrieve_subrequest( - reqs, req_group, regapi, 
dataset_dir, context, config - ) + grib_file = get_uncached(reqs, req_group, config, context) # Fields may have expired from the latest backend by the time the # request was made. Reassign any missing fields to the archive # backend. - reassign_missing_to_archive(reqs, grib_file, req_group, regapi, context) + reassign_missing_to_archive(reqs, grib_file, req_group, config, context) -def get_archived(req_groups, regapi, dataset_dir, context, config=None): +def get_archived(req_groups, config, context): """Retrieve uncached slow-access archived fields.""" for req_group in req_groups: if not req_group["uncached_archived_requests"]: @@ -322,182 +245,132 @@ def get_archived(req_groups, regapi, dataset_dir, context, config=None): for reqs in hcube_tools.hcubes_chunk( req_group["uncached_archived_requests"], 900 ): - retrieve_subrequest(reqs, req_group, regapi, dataset_dir, context, config) + get_uncached(reqs, req_group, config, context) -def retrieve_latest(*args): - """Adaptor only intended to be called as a sub-request from the main - adaptor to retrieve uncached latest fields only. The separate entry - point from retrieve_archived allows a different QOS to be applied. - """ - return retrieve_xxx(*args) - - -def retrieve_archived(*args): - """Adaptor only intended to be called as a sub-request from the main - adaptor to retrieve uncached archived fields only. The separate entry - point from retrieve_latest allows a different QOS to be applied. - """ - if ARCHIVED_OFF: - raise Exception( - "Access to archived data is temporarily " - + "suspended. Only the latest few days are available" - ) - - try: - return retrieve_xxx(*args) - - except Exception: - maintenance_end_time = datetime(2021, 3, 3, 18, 0) - if datetime.utcnow() < maintenance_end_time: - # raise Exception( - # 'Apologies: your request requires data from a remote server ' - # 'which is currently undergoing maintenance. Normal service is ' - # 'expected to be resumed by ' + - # maintenance_end_time.strftime('%A %d %B %H:%M UTC')) from None - raise Exception( - "Apologies: your request requires data from a remote server " - "which is undergoing a maintenance session that is taking " - "longer than expected. Please try again later." - ) - else: - raise - - -def retrieve_xxx(context, requests, dataset_dir, integration_server): - def create_result_file(self, extension): - random_value = random.randint(0, 2**128) - result_path = f"/tmp/cams-europe-air-quality-forecasts/download/{random_value}.{extension}" - os.makedirs(os.path.dirname(result_path), exist_ok=True) - return MockResultFile(result_path) +MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES = 3 - context.create_result_file = create_result_file.__get__(context) - # Get an object which will give us information/functionality associated - # with the Meteo France regional forecast API - regapi = regional_fc_api(integration_server=integration_server, logger=context) - - file = api_retrieve(context, requests, regapi, dataset_dir) +def get_uncached(requests, req_group, config, context): + """Retrieve chunk of uncached fields.""" + backend = requests[0]["_backend"][0] + assert backend in ["latest", "archived"] - context.info("Sub-request completed") + # Retrieve the fields in a sub-request or directly? The latter is only used for + # testing. Generally you want a sub-request. + cfg = config.get("regional_fc", {}) + if str(cfg.get("no_subrequests")) != "1": + path = retrieve_subrequest(backend, requests, req_group, config, context) - return file + else: + # No sub-request - call code directly. 
For testing. + f = subrequest_main( + backend, {"requests": requests, "parent_config": config}, config, context + ) + f.close() + path = f.name + # The target can legitimately be empty as the code currently accepts 404s + size = os.path.getsize(path) + if size > 0: + req_group["retrieved_files"] = req_group.get("retrieved_files", []) + [path] + else: + context.add_stdout("Sub-request target file is empty") -MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES = 3 + return path -def retrieve_subrequest(requests, req_group, regapi, dataset_dir, context, config=None): +def retrieve_subrequest(backend, requests, req_group, config, context): from cdsapi import Client """Retrieve chunk of uncached fields in a sub-request""" - backend = requests[0]["_backend"][0] - assert backend in ["latest", "archived"] + # Is this backend expecting issues due to maintenance? + cfg = config.get("regional_fc", {}) + maintenance_msg = cfg.get("backend_maintenance", {}).get(backend) - # Retrieve uncached fields in sub-requests to allow a different QOS to be - # applied to avoid overloading the Meteo France API. + # Construct a target file name + target = temp_file(config, suffix=".grib") + + # Get a client context.info("Executing sub-request to retrieve uncached fields: " + repr(requests)) t0 = time.time() - ADS_API_URL = ( - "https://" + os.environ["ADS_SERVER_NAME"] + os.environ["API_ROOT_PATH"] + client = Client( + url="https://" + os.environ["ADS_SERVER_NAME"] + os.environ["API_ROOT_PATH"], + key=os.environ["HIGH_PRIORITY_CADS_API_KEY"], + wait_until_complete=False, ) - ADS_API_KEY = os.environ["HIGH_PRIORITY_CADS_API_KEY"] - client = Client(url=ADS_API_URL, key=ADS_API_KEY, wait_until_complete=False) - if backend == "latest": - dataset = "cams-europe-air-quality-forecasts-latest" - else: - dataset = "cams-europe-air-quality-forecasts-archived" - random_value = random.randint(0, 2**128) - target = f"/tmp/cams-europe-air-quality-forecasts/download/{random_value}.sub" - os.makedirs(os.path.dirname(target), exist_ok=True) + # Launch the sub-request + response = None sub_request_uid = None - if config: - request_uid = config.get("request_uid", None) - user_uid = config.get("user_uid", None) + dataset = f"cams-europe-air-quality-forecasts-{backend}" try: response = client.retrieve( dataset, - { - "requests": requests, - "dataset_dir": dataset_dir, - "integration_server": regapi.integration_server, - "parent_request_uid": request_uid, - "parent_request_user_uid": user_uid, - }, + {"requests": requests, "parent_config": config}, ) + except Exception as e: + sub_request_uid = "none" if response is None else response.request_uid + context.add_stderr( + "Sub-request " + + ("" if response is None else f"({response.request_uid}) ") + + f"failed: {e!r}" + ) + if maintenance_msg: + raise InvalidRequest(maintenance_msg) from None + else: + raise RuntimeError( + f"Failed to retrieve data from {backend} remote server. " + "Please try again later." + ) from None + else: sub_request_uid = response.request_uid message = f"Sub-request {sub_request_uid} has been launched (via the CDSAPI)." context.add_stdout(message) - for i_retry in range(MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES): - try: - response.download(target) - break - except Exception as e: - context.add_stdout( - f"Attempt {i_retry+1} to download the result " - f"of sub-request {sub_request_uid} failed: {e!r}" - ) - if i_retry + 1 == MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES: - raise - - result = MockResultFile(target) - # TODO: when should we delete this file? 
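`retrieve_subrequest` now drives the sub-request through the CDS API client and retries only the result download, not the retrieve itself. A condensed sketch of that round trip, under the same environment-variable assumptions the code makes (`ADS_SERVER_NAME`, `API_ROOT_PATH`, `HIGH_PRIORITY_CADS_API_KEY`):

```python
import os

from cdsapi import Client


def launch_and_fetch(backend: str, requests: list, config: dict, target: str,
                     max_retries: int = 3) -> str:
    """Condensed version of the sub-request round trip in retrieve_subrequest."""
    client = Client(
        url="https://" + os.environ["ADS_SERVER_NAME"] + os.environ["API_ROOT_PATH"],
        key=os.environ["HIGH_PRIORITY_CADS_API_KEY"],
        wait_until_complete=False,       # launch asynchronously, then poll for the result
    )
    dataset = f"cams-europe-air-quality-forecasts-{backend}"
    response = client.retrieve(dataset, {"requests": requests, "parent_config": config})

    last_exc = None
    for attempt in range(max_retries):   # only the download is retried
        try:
            response.download(target)
            return target
        except Exception as exc:
            last_exc = exc
    raise RuntimeError(f"Failed to download sub-request result: {last_exc!r}")
```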
- except Exception as e: - if not sub_request_uid: - sub_request_uid = "with no ID assigned" - message = f"Sub-request {sub_request_uid} failed: {e!r}" + # Download the result + exc = None + for i_retry in range(MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES): + try: + response.download(target) + break + except Exception as e: + exc = e + context.add_stdout( + f"Attempt {i_retry+1} to download the result of sub-request " + f"{sub_request_uid} failed: {e!r}" + ) + else: + message = f"Failed to download sub-request result: {exc!r}" context.add_stderr(message) - raise RuntimeError(message) - - context.info("... sub-request succeeded after " + str(time.time() - t0) + "s") + raise RuntimeError(message) from None - if result is not None: - grib_file = result - req_group["retrieved_files"] = req_group.get("retrieved_files", []) + [ - grib_file - ] - else: - context.info("... but found no data") - grib_file = None + size = os.path.getsize(target) + context.info( + f"... sub-request downloaded {size} bytes in " + str(time.time() - t0) + "s" + ) - return grib_file + return target -def reassign_missing_to_archive(reqs, grib_file, req_group, regapi, context): +def reassign_missing_to_archive(reqs, grib_file, req_group, config, context): """Re-assign fields which are in reqs but not in grib_file to the archived backend.""" # Which are in the file and which aren't? - if grib_file: - grib_file_path = grib_file.path - else: - grib_file_path = None - present, missing = which_fields_in_file(reqs, grib_file_path, context) - - # The archive backend will reject any requests for dates less than N - # days old with a 400 HTTP error, so remove any if they exist. There - # shouldn't be any though. - missing_valid, missing_invalid = archive_maxdate_split(missing, regapi) - if missing_invalid: - context.error( - "Fields missing from latest backend but cannot " - "be on archived backend: " + repr(missing_invalid) - ) - if missing_valid: + present, missing = which_fields_in_file(reqs, grib_file, config, context) + if missing: context.info( - "Resorting to archived backend for missing fields: " + repr(missing_valid) + "Resorting to archived backend for missing fields: " + repr(missing) ) - - for r in missing_valid: + for r in missing: r["_backend"] = ["archived"] - req_group["uncached_archived_requests"].extend(missing_valid) + req_group["uncached_archived_requests"].extend(missing) def zip_files(req_groups, info, context): assert info["format"] != Formats.grib - path = create_file("zip", ".zip", info, context) + path = create_file("zip", ".zip", info) with zipfile.ZipFile(path, "w") as zf: for req_group in req_groups: zf.write( diff --git a/cads_adaptors/adaptors/cams_regional_fc/config/regional_fc_definitions.yaml b/cads_adaptors/adaptors/cams_regional_fc/config/regional_fc_definitions.yaml deleted file mode 100644 index 679b3fee..00000000 --- a/cads_adaptors/adaptors/cams_regional_fc/config/regional_fc_definitions.yaml +++ /dev/null @@ -1,775 +0,0 @@ -# This file contains metadata on the variables and models of the CAMS regional -# forecast. It is the hand-written authoritative source of information on each. - -# For the variable entries... -# frontend_api_name: -# Name used by CDS API -# backend_api_name: -# Name used by Meteo France API -# hidden: -# Set to True to prevent the variable appearing on the form. 
Does not prevent -# API access -# form_label: -# Label to use on download form -# var_table_units: -# Units to appear in variables.yaml -# grib_representations: -# List of dicts of GRIB key-value pairs that can identify this variable. -# It's a list because some variable encodings were changed, meaning >1 entry -# required -# eccodes_definition: -# Information used to define parameter definition tables that allow ecCodes to -# recognise parameters it otherwise wouldn't. This is used by the new netCDF -# converter. -# netcdf: -# Used in conversion from GRIB to netCDF by the legacy converter - -variable: -- frontend_api_name: alder_pollen - backend_api_name: C_POL_ALDER - standard_name: - hidden: false - form_label: Alder pollen - var_table_units: grains m-3 - grib_representations: - - parameterNumber: 59 - constituentType: 62100 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: AlderPollen - name: Alder pollen - paramId: 999001 - units: 'grains m**-3' - netcdf: - varname: apg_conc - species: Alder Pollen Grain - shortname: ALDERPOLLEN - units: grains/m3 - scale: 1 - -- frontend_api_name: ammonia - backend_api_name: NH3_USI - standard_name: mass_concentration_of_ammonia_in_air - hidden: false - form_label: Ammonia - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 9 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: nh3density - name: Ammonia - paramId: 999002 - units: 'kg m**-3' - netcdf: - varname: nh3_conc - species: Ammonia - shortname: NH3 - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: birch_pollen - backend_api_name: C_POL_BIRCH - standard_name: - hidden: false - form_label: Birch pollen - var_table_units: grains m-3 - grib_representations: - - parameterNumber: 59 - constituentType: 64000 - productDefinitionTemplateNumber: 40 - - parameterNumber: 59 - constituentType: 62101 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: BirchPollen - name: Birch pollen - paramId: 999003 - units: 'grains m**-3' - netcdf: - varname: bpg_conc - species: Birch Pollen Grain - shortname: BIRCH_POLLEN - units: grains/m3 - scale: 1 - -- frontend_api_name: carbon_monoxide - backend_api_name: CO_USI - standard_name: mass_concentration_of_carbon_monoxide_in_air - hidden: false - form_label: Carbon monoxide - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 4 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: codensity - name: Carbon monoxide - paramId: 999004 - units: 'kg m**-3' - netcdf: - varname: co_conc - species: Carbon Monoxide - shortname: CO - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: dust - backend_api_name: DUST_USI - standard_name: mass_concentration_of_dust_in_air - hidden: false - form_label: Dust - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 62001 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: dustdensity - name: Dust - paramId: 999005 - units: 'kg m**-3' - netcdf: - varname: dust - species: Dust - shortname: Dust - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: formaldehyde - backend_api_name: HCHO_USI - standard_name: mass_concentration_of_formaldehyde_in_air - hidden: false - form_label: Formaldehyde - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 7 - 
productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: hchodensity - name: Formaldehyde - paramId: 999024 - units: 'kg m**-3' - netcdf: - varname: hcho_conc - species: Formaldehyde - shortname: HCHO - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: glyoxal - backend_api_name: CHOCHO_USI - standard_name: - hidden: false - form_label: Glyoxal - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 10038 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: chochodensity - name: Glyoxal - paramId: 999025 - units: 'kg m**-3' - netcdf: - varname: chocho_conc - species: Glyoxal - shortname: CHOCHO - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: grass_pollen - backend_api_name: C_POL_GRASS - standard_name: - hidden: false - form_label: Grass pollen - var_table_units: grains m-3 - grib_representations: - - parameterNumber: 59 - constituentType: 64001 - productDefinitionTemplateNumber: 40 - - parameterNumber: 59 - constituentType: 62300 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: GrassPollen - name: Grass pollen - paramId: 999006 - units: 'grains m**-3' - netcdf: - varname: gpg_conc - species: Grass Pollen Grain - shortname: GRASSPOLLEN - units: grains/m3 - scale: 1 - -- frontend_api_name: mugwort_pollen - backend_api_name: C_POL_MUGW - standard_name: - hidden: false - form_label: Mugwort pollen - var_table_units: grains m-3 - grib_representations: - - parameterNumber: 59 - constituentType: 62201 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: MugwortPollen - name: Mugwort pollen - paramId: 999007 - units: 'grains m**-3' - netcdf: - varname: mpg_conc - species: Mugwort Pollen Grain - shortname: MUGWORTPOLLEN - units: grains/m3 - scale: 1 - -- frontend_api_name: nitrogen_dioxide - backend_api_name: NO2_USI - standard_name: mass_concentration_of_nitrogen_dioxide_in_air - hidden: false - form_label: Nitrogen dioxide - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 5 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: no2density - name: Nitrogen dioxide - paramId: 999008 - units: 'kg m**-3' - netcdf: - varname: no2_conc - species: Nitrogen Dioxide - shortname: NO2 - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: nitrogen_monoxide - backend_api_name: NO_USI - standard_name: mass_concentration_of_nitrogen_monoxide_in_air - hidden: false - form_label: Nitrogen monoxide - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 11 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: nodensity - name: Nitrogen monoxide - paramId: 999009 - units: 'kg m**-3' - netcdf: - varname: no_conc - species: Nitrogen Monoxide - shortname: 'NO' - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: non_methane_vocs - backend_api_name: NMVOC_USI - standard_name: mass_concentration_of_nmvoc_expressed_as_carbon_in_air - hidden: false - form_label: Non-methane VOCs - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 60013 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: NonMethaneVOCs - name: Non-methane volatile organic compounds - paramId: 999010 - units: 'kg m**-3' - netcdf: - varname: nmvoc_conc - species: Non-Methane Volatile Organic Compounds - shortname: NMVOC - units: 
"µg/m3" - scale: 1000000000.0 - -- frontend_api_name: olive_pollen - backend_api_name: C_POL_OLIVE - standard_name: - hidden: false - form_label: Olive pollen - var_table_units: grains m-3 - grib_representations: - - parameterNumber: 59 - constituentType: 64002 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: OlivePollen - name: Olive pollen - paramId: 999011 - units: 'grains m**-3' - netcdf: - varname: opg_conc - species: Olive Pollen Grain - shortname: OLIVEPOLLEN - units: grains/m3 - scale: 1 - -- frontend_api_name: ozone - backend_api_name: O3_USI - standard_name: mass_concentration_of_ozone_in_air - hidden: false - form_label: Ozone - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 0 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: o3density - name: Ozone - paramId: 999012 - units: 'kg m**-3' - netcdf: - varname: o3_conc - species: Ozone - shortname: O3 - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: particulate_matter_2.5um - backend_api_name: PM25_USI - standard_name: mass_concentration_of_pm2p5_ambient_aerosol_in_air - hidden: false - form_label: "Particulate matter < 2.5 µm (PM2.5)" - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 40009 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: pm2p5 - name: Particulate matter d <= 2.5 um - paramId: 999013 - units: 'kg m**-3' - netcdf: - varname: pm2p5_conc - species: PM2.5 Aerosol - shortname: PM25 - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: particulate_matter_10um - backend_api_name: PM10_USI - standard_name: mass_concentration_of_pm10_ambient_aerosol_in_air - hidden: false - form_label: "Particulate matter < 10 µm (PM10)" - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 40008 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: pm10 - name: Particulate matter d <= 10 um - paramId: 999014 - units: 'kg m**-3' - netcdf: - varname: pm10_conc - species: PM10 Aerosol - shortname: PM10 - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: peroxyacyl_nitrates - backend_api_name: PANS_USI - standard_name: - hidden: false - form_label: Peroxyacyl nitrates - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 60018 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: pansdensity - name: Peroxyacyl nitrates - paramId: 999016 - units: 'kg m**-3' - netcdf: - varname: pans_conc - species: Acyl Peroxy Nitrates - shortname: PANS - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: pm2.5_anthropogenic_fossil_fuel_carbon - backend_api_name: EC_FF_USI - standard_name: - hidden: true - form_label: PM2.5, anthropogenic fossil fuel carbon only - var_table_units: - grib_representations: - - parameterNumber: 0 - constituentType: 62097 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: PM25FossilFuelCarbon - name: Particulate matter d <= 2.5 um from fossil fuel carbon - paramId: 999017 - units: 'kg m**-3' - netcdf: - varname: ecff_conc - species: PM2.5 Elemental Carbon from Anthropogenic Fossil Fuels - shortname: ECFF - units: "µg/m3" - scale: 1 - -- frontend_api_name: pm2.5_anthropogenic_wood_burning_carbon - backend_api_name: EC_WB_USI - standard_name: - hidden: true - form_label: PM2.5, anthropogenic wood burning carbon 
only - var_table_units: - grib_representations: - - parameterNumber: 0 - constituentType: 62098 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: PM25WoodBurningCarbon - name: Particulate matter d <= 2.5 um from wood-burning carbon - paramId: 999018 - units: 'kg m**-3' - netcdf: - varname: ecwb_conc - species: PM2.5 Elemental Carbon from Anthropogenic Wood Burning - shortname: ECWB - units: "µg/m3" - scale: 1 - -- frontend_api_name: pm2.5_total_organic_matter - backend_api_name: PM25_OM_USI - standard_name: - hidden: false - form_label: PM2.5, total organic matter only - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 62010 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: pm2p5totalorganicmatter - name: Particulate matter d <= 2.5 um from total organic matter - paramId: 999025 - units: 'kg m**-3' - netcdf: - varname: pm2p5_total_om_conc - species: PM2.5 Aerosol from Total Organic Matter - shortname: PM25_TOTAL_OM - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: pm10_sea_salt_dry - backend_api_name: DYNSAL_USI - standard_name: - hidden: false - form_label: PM10, sea salt (dry) only - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 62008 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: pm10seasaltdry - name: Particulate matter d <= 10 um from sea salt (dry) - paramId: 999024 - units: 'kg m**-3' - netcdf: - varname: pm10_ss_conc - species: PM10 Aerosol from Sea salt (dry) - shortname: PM10_SEASALT_DRY - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: pm10_wildfires - backend_api_name: PM_WF_USI - standard_name: - hidden: false - form_label: PM10, wildfires only - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 62096 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: pm10wildfire - name: Particulate matter d <= 10 um from wildfires - paramId: 999015 - units: 'kg m**-3' - netcdf: - varname: pmwf_conc - species: PM10 Aerosol from Wildfires - shortname: PM_Wildfire - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: ragweed_pollen - backend_api_name: C_POL_RAGW - standard_name: - hidden: false - form_label: Ragweed pollen - var_table_units: grains m-3 - grib_representations: - - parameterNumber: 59 - constituentType: 64003 - productDefinitionTemplateNumber: 40 - - parameterNumber: 59 - constituentType: 62200 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: RagweedPollen - name: Ragweed pollen - paramId: 999019 - units: 'grains m**-3' - netcdf: - varname: rwpg_conc - species: Ragweed Pollen Grain - shortname: RAGWEEDPOLLEN - units: grains/m3 - scale: 1 - -- frontend_api_name: residential_elementary_carbon - backend_api_name: EC_RES_USI - standard_name: - hidden: false - form_label: Residential elementary carbon - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 62094 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: ResidentialElementaryCarbon - name: Residential elementary carbon - paramId: 999020 - units: 'kg m**-3' - netcdf: - varname: ecres_conc - species: Residential Elementary Carbon - shortname: ECRES - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: secondary_inorganic_aerosol - backend_api_name: SIA_USI - standard_name: 
mass_concentration_of_secondary_inorganic_aerosol_in_air - hidden: false - form_label: Secondary inorganic aerosol - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 62099 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: SecondaryInorganicAerosol - name: Secondary inorganic aerosol - paramId: 999021 - units: 'kg m**-3' - netcdf: - varname: sia_conc - species: Secondary Inorganic Aerosol - shortname: SIA - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: sulphur_dioxide - backend_api_name: SO2_USI - standard_name: mass_concentration_of_sulfur_dioxide_in_air - hidden: false - form_label: Sulphur dioxide - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 8 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: so2density - name: Sulphur dioxide - paramId: 999022 - units: 'kg m**-3' - netcdf: - varname: so2_conc - species: Sulphur Dioxide - shortname: SO2 - units: "µg/m3" - scale: 1000000000.0 - -- frontend_api_name: total_elementary_carbon - backend_api_name: EC_TOT_USI - standard_name: - hidden: false - form_label: Total elementary carbon - var_table_units: "GRIB: kg m-3; netCDF: µg m-3" - grib_representations: - - parameterNumber: 0 - constituentType: 62095 - productDefinitionTemplateNumber: 40 - eccodes_definition: - shortName: TotalElementaryCarbon - name: Total elementary carbon - paramId: 999023 - units: 'kg m**-3' - netcdf: - varname: ectot_conc - species: Total Elementary Carbon - shortname: ECTOT - units: "µg/m3" - scale: 1000000000.0 - - -model: -- backend_api_name: ENS - frontend_api_name: ensemble - form_label: Ensemble median - grib_representations: - - centre: 85 - subCentre: 2 - netcdf: - institution: Meteo France - name: ENSEMBLE - name2: ENSEMBLE - - # INERIS -- backend_api_name: CHIMERE - frontend_api_name: chimere - form_label: CHIMERE - grib_representations: - - centre: 85 - subCentre: 200 - netcdf: - institution: CNRS - name: CHIMERE - name2: CHIMERE - - # Met Norway -- backend_api_name: EMEP - frontend_api_name: emep - form_label: EMEP - grib_representations: - - centre: 88 - subCentre: 0 - netcdf: - institution: Met No - name: EMEP - name2: EMEP - - # KNMI/TNO -- backend_api_name: LOTOS - frontend_api_name: lotos - form_label: LOTOS-EUROS - grib_representations: - - centre: 99 - subCentre: 0 - netcdf: - institution: Royal Netherlands Meteorological Institute - name: LOTOS-EUROS - name2: LOTOS_EUROS - - # SMHI -- backend_api_name: MATCH - frontend_api_name: match - form_label: MATCH - grib_representations: - - centre: 82 - subCentre: 98 - netcdf: - institution: Swedish Meteorological and Hydrological Institute - name: MATCH - name2: MATCH - - # Meteo France -- backend_api_name: MOCAGE - frontend_api_name: mocage - form_label: MOCAGE - grib_representations: - - centre: 85 - subCentre: 1 - netcdf: - institution: Meteo France - name: MOCAGE - name2: MOCAGE - - # FMI -- backend_api_name: SILAM - frontend_api_name: silam - form_label: SILAM - grib_representations: - - centre: 86 - subCentre: 0 - netcdf: - institution: Finnish Meteorological Institute - name: SILAM - name2: SILAM - - # Forschungszentrum Julich / IEK8 -- backend_api_name: EURADIM - frontend_api_name: euradim - form_label: EURAD-IM - grib_representations: - - centre: 85 - subCentre: 201 - netcdf: - institution: RIUUK - name: EURAD-IM - name2: EURAD - - # Aarhus University (Denmark) -- backend_api_name: DEHM - frontend_api_name: 
dehm - form_label: DEHM - grib_representations: - - centre: 85 - subCentre: 203 - netcdf: - institution: Aarhus University (Denmark) - name: DEHM - name2: DEHM - - # Polish Institute for National Protection (IEP-NRI) -- backend_api_name: GEMAQ - frontend_api_name: gemaq - form_label: GEM-AQ - grib_representations: - - centre: 85 - subCentre: 204 - netcdf: - institution: "Institute of Environmental Protection – National Research Institute (Poland)" - name: GEM-AQ - name2: GEM-AQ - - # ENEA - Italian National Agency for New Technologies, Energy and Sustainable - # Economic Development -- backend_api_name: MINNI - frontend_api_name: minni - form_label: MINNI - grib_representations: - - centre: 85 - subCentre: 205 - netcdf: - institution: "ENEA (Italy)" - name: MINNI - name2: MINNI - - # Barcelona Supercomputing Center (BSC) -- backend_api_name: MONARCH - frontend_api_name: monarch - form_label: MONARCH - grib_representations: - - centre: 85 - subCentre: 206 - netcdf: - institution: "Barcelona Supercomputing Center" - name: MONARCH - name2: MONARCH diff --git a/cads_adaptors/adaptors/cams_regional_fc/convert_grib.py b/cads_adaptors/adaptors/cams_regional_fc/convert_grib.py index 51655262..6b6cd887 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/convert_grib.py +++ b/cads_adaptors/adaptors/cams_regional_fc/convert_grib.py @@ -1,22 +1,16 @@ from os.path import exists -import yaml - from .convert_grib_to_netcdf import convert_grib_to_netcdf from .create_file import create_file from .formats import Formats -def convert_grib(req_groups, info, dataset_dir, context): +def convert_grib(req_groups, info, regfc_defns, context): """Convert files to NetCDF if required.""" - # Information on each parameter & model - with open(dataset_dir + "/regional_fc_definitions.yaml") as f: - regfc_defns = yaml.safe_load(f) - # Convert to NetCDF? if info["format"] in [Formats.netcdf, Formats.netcdf_zip, Formats.netcdf_cdm]: for req_group in req_groups: - req_group["nc_file"] = create_file("convert", ".nc", info, context) + req_group["nc_file"] = create_file("convert", ".nc", info) if info["format"] in (Formats.netcdf, Formats.netcdf_zip): convert_grib_to_netcdf( req_group["requests"], @@ -24,10 +18,10 @@ def convert_grib(req_groups, info, dataset_dir, context): req_group["nc_file"], regfc_defns, ) - elif info["format"] == Formats.netcdf_cdm: - convert_grib_to_netcdf_cdm( - req_group["grib_file"], req_group["nc_file"], dataset_dir, context - ) + # elif info["format"] == Formats.netcdf_cdm: + # convert_grib_to_netcdf_cdm( + # req_group["grib_file"], req_group["nc_file"], dataset_dir, context + # ) else: raise Exception("Unrecognised format: " + info["format"]) diff --git a/cads_adaptors/adaptors/cams_regional_fc/create_file.py b/cads_adaptors/adaptors/cams_regional_fc/create_file.py index 3088e1f0..76a0d3c5 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/create_file.py +++ b/cads_adaptors/adaptors/cams_regional_fc/create_file.py @@ -1,24 +1,40 @@ -def create_file(stage, suffix, info, context, temp_path=None): +import os +from tempfile import mkstemp + +from . import STACK_DOWNLOAD_DIR, STACK_TEMP_DIR + + +def create_file(stage, suffix, info): """Return the path of a new result file or temp file, depending on whether the specified processing stage is the last in the series or not. """ + # If this isn't the last stage in the process then make a temp file. 
+ # Otherwise make a result file assert stage in info["stages"] temp = stage != info["stages"][-1] if temp: - if temp_path is None: - path = context.create_temp_file(suffix) - else: - path = temp_path + path = temp_file(info["config"], suffix=suffix) else: # Check a final result file isn't requested more than once if "result_file" in info: raise Exception("Adaptor trying to create >1 result file??") - info["result_file"] = context.create_result_file(suffix) - # path = info['result_file'].path - path = info["result_file"].path - # path = 'alabala.txt' + os.makedirs(STACK_DOWNLOAD_DIR, exist_ok=True) + info["result_file"] = ( + f"{STACK_DOWNLOAD_DIR}/" + f"{info['config']['request_uid']}{suffix}" + ) + path = info["result_file"] return path + + +def temp_file(config, suffix=""): + """Make and return the path of a temporary file.""" + os.makedirs(STACK_TEMP_DIR, exist_ok=True) + fd, target = mkstemp( + prefix=config["request_uid"] + "_", suffix=suffix, dir=STACK_TEMP_DIR + ) + os.close(fd) + return target diff --git a/cads_adaptors/adaptors/cams_regional_fc/grib2request.py b/cads_adaptors/adaptors/cams_regional_fc/grib2request.py index 59054a63..bbd614f7 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/grib2request.py +++ b/cads_adaptors/adaptors/cams_regional_fc/grib2request.py @@ -1,13 +1,12 @@ """Code that takes a grib message, reverse engineers and returns the associated ADS API request dictionary.""" -import yaml from eccodes import codes_get grib_key_types = {} field_data: dict = {} -def grib2request_init(dataset_dir): +def grib2request_init(regfc_defns): """Initialise global variables: grib_key_types and field_data. This is so that it doesn't need to be done multiple times or in grib2request(), which is called from places where the path to the dataset directory is @@ -17,10 +16,6 @@ def grib2request_init(dataset_dir): if field_data: return - # Read information on the variables and models available - with open(f"{dataset_dir}/regional_fc_definitions.yaml") as f: - regfc_defns = yaml.safe_load(f) - # Link grib representations to API request values field_data.update( { diff --git a/cads_adaptors/adaptors/cams_regional_fc/mem_safe_queue.py b/cads_adaptors/adaptors/cams_regional_fc/mem_safe_queue.py index e97a8a56..8d9a6267 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/mem_safe_queue.py +++ b/cads_adaptors/adaptors/cams_regional_fc/mem_safe_queue.py @@ -12,14 +12,12 @@ class MemSafeQueue(queue.Queue): means the queue memory usage will not grow out of control. 
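Tying the new `create_file.py` helpers above together: result files now go under `STACK_DOWNLOAD_DIR` named after the request UID, while intermediate files are `mkstemp`'d under `STACK_TEMP_DIR`. A small usage sketch with hypothetical stage names and UID:

```python
from cads_adaptors.adaptors.cams_regional_fc.create_file import create_file, temp_file

config = {"request_uid": "1234"}                  # hypothetical UID
info = {"stages": ["convert", "zip"], "config": config}

scratch = temp_file(config, suffix=".grib")       # .../temp/1234_<random>.grib
nc_path = create_file("convert", ".nc", info)     # temp file: "convert" is not the last stage
final = create_file("zip", ".zip", info)          # .../download/1234.zip, recorded in info["result_file"]
```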
""" - def __init__( - self, nbytes_max, tmpdir, *args, logger=logging.getLogger(__name__), **kwargs - ): + def __init__(self, nbytes_max, *args, tmpdir=None, logger=None, **kwargs): super().__init__(*args, **kwargs) self.nbytes_max = nbytes_max self.nbytes = 0 self.tmpdir = tmpdir - self.logger = logger + self.logger = logging.getLogger(__name__) if logger is None else logger self._lock = threading.Lock() self.stats = {} @@ -60,6 +58,8 @@ def put(self, item, **kwargs): self._lock.release() self.logger.debug(f"MemSafeQueue: storing on disk: {fieldinfo!r}") t = time.time() + if self.tmpdir: + os.makedirs(self.tmpdir, exist_ok=True) with NamedTemporaryFile(dir=self.tmpdir, delete=False) as tmp: tmp.write(data) self.stats["iotime"] += time.time() - t diff --git a/cads_adaptors/adaptors/cams_regional_fc/api_retrieve.py b/cads_adaptors/adaptors/cams_regional_fc/meteo_france_retrieve.py similarity index 67% rename from cads_adaptors/adaptors/cams_regional_fc/api_retrieve.py rename to cads_adaptors/adaptors/cams_regional_fc/meteo_france_retrieve.py index 34d80e81..c631b028 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/api_retrieve.py +++ b/cads_adaptors/adaptors/cams_regional_fc/meteo_france_retrieve.py @@ -1,3 +1,4 @@ +import logging import time from copy import deepcopy from itertools import product @@ -12,8 +13,25 @@ from .grib2request import grib2request_init -def api_retrieve(context, requests, regapi, dataset_dir, no_cache_put=False, **kwargs): - """Download the fields from the Meteo France API.""" +def meteo_france_retrieve( + requests, + target, + regapi, + regfc_defns, + integration_server, + tmpdir=None, + max_rate=None, + max_simultaneous=None, + cacher_kwargs=None, + logger=None, + **kwargs, +): + """Download the fields from the Meteo France API. This function is designed to be + callable from outside of the CDS infrastructure. + """ + if logger is None: + logger = logging.getLogger(__name__) + # Keyword argument options to Downloader that depend on the backend # (archived/latest) backend_specific = { @@ -46,18 +64,21 @@ def api_retrieve(context, requests, regapi, dataset_dir, no_cache_put=False, **k "default": 50, 404: 1, 400: 10, - }, # 400's can get raised at 2am - "request_timeout": [60, 300], + }, + # Read timeout reduced from 300s because not too uncommon to hit + # that and it was found to be quicker to fail sooner and retry than + # wait for a long timeout + "request_timeout": [60, 60], }, } backend = requests[0]["_backend"][0] # Process requests according to the abilities of the Meteo France API - requests = make_api_hypercubes(requests, regapi, context) + requests = make_api_hypercubes(requests, regapi) # Initialisation for function that can understand GRIB file contents - grib2request_init(dataset_dir) + grib2request_init(regfc_defns) # By default Downloader would use requests.get to make requests. 
Provide # an alternative function that takes care of the Meteo France API @@ -65,30 +86,32 @@ def api_retrieve(context, requests, regapi, dataset_dir, no_cache_put=False, **k getter = regapi.get_fields_url # Objects to limit the rate and maximum number of simultaneous requests - rate_limiter = CamsRegionalFcApiRateLimiter(regapi) - number_limiter = CamsRegionalFcApiNumberLimiter(regapi) + rate_limiter = CamsRegionalFcApiRateLimiter(max_rate or regapi.max_rate) + number_limiter = CamsRegionalFcApiNumberLimiter( + max_simultaneous or regapi.max_simultaneous + ) # Translate requests into URLs as dicts with a 'url' and a 'req' key urlreqs = list(requests_to_urls(requests, regapi.url_patterns)) # Create an object that will handle the caching - with Cacher(context, no_put=no_cache_put) as cacher: + with Cacher( + integration_server, logger=logger, tmpdir=tmpdir, **(cacher_kwargs or {}) + ) as cacher: # Create an object that will allow URL downloading in parallel - context.create_result_file(".json") downloader = Downloader( - context, getter=getter, max_rate=rate_limiter, max_simultaneous=number_limiter, - combine_method="cat", + combine_method="cat" if target else "null", target_suffix=".grib", response_checker=assert_valid_grib, response_checker_threadsafe=False, combine_in_order=False, - write_to_temp=False, nonfatal_codes=[404], allow_no_data=True, cacher=cacher, + logger=logger, **backend_specific[backend], **kwargs, ) @@ -97,31 +120,29 @@ def api_retrieve(context, requests, regapi, dataset_dir, no_cache_put=False, **k try: # Returns None if no data is found - file = downloader.execute(urlreqs) - except RequestFailed: - # req = {x["url"]: x["req"] for x in urlreqs}[e.url] - # raise Exception( - # 'Failed to retrieve data for ' + str(req) + - # f' (code {e.status_code}). Please try again later') \ - # from None - return None + file = downloader.execute(urlreqs, target=target) + except RequestFailed as e: + req = {x["url"]: x["req"] for x in urlreqs}[e.url] + raise Exception( + f"Failed to retrieve data for {req} (code {e.status_code})." + ) from None # Ensure the next call to this routine does not happen less than # 1/max_rate seconds after the last API request rate_limiter.block({"req": {"_backend": backend}}) nfields = hcube_tools.count_fields(requests) - context.info( + logger.info( f"Attempted download of {nfields} fields took " + f"{time.time() - t0} seconds" ) - context.info("download finished") #! + logger.info("Meteo France download finished") return file -def make_api_hypercubes(requests, regapi, context): +def make_api_hypercubes(requests, regapi): """Process request hypercubes into the dicts required for url2 input. For requests for latest data, for which each field must be fetched with a separate URL request, this is a null op. Archived data URL requests can @@ -180,74 +201,48 @@ def levgroups(levels): return output -class CamsRegionalFcApiLimiter: - """Abstract base class for controlling the URL requests. - It controls rate and max number of simultaneous requests to the regional forecast API. - """ - - def __init__(self, regapi): - # Maximum URL request rates for online and offline data - self._max_rate = regapi.max_rate - - # Maximum number of simultaneous requests for online and offline data - self._max_simultaneous = regapi.max_simultaneous - - # The time duration that data stays online. 
Nominally this is 5 days - # but we subtract a small amount of time for safety - #### self._online_duration = timedelta(days=5) - timedelta(minutes=10) - - self._rate_semaphores = {k: Semaphore() for k in self._max_rate.keys()} - self._number_semaphores = { - k: Semaphore(v) for k, v in self._max_simultaneous.items() - } - - # def _type(self, req): - # """Return whether a request field is expected to be online or offline - # depending on the validity time.""" - # r = req['req'] - # vtime = datetime.strptime(r['date'] + ' ' + r['time'], - # '%Y-%m-%d %H%M') + \ - # timedelta(hours=int(r['step'])) - # if (datetime.now() - vtime) < self._online_duration: - # return 'latest' - # else: - # return 'archived' - - -class CamsRegionalFcApiRateLimiter(CamsRegionalFcApiLimiter): +class CamsRegionalFcApiRateLimiter: """Class to limit the URL request rate to the regional forecast API.""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, max_rate): + self._max_rate = max_rate + self._rate_semaphores = {k: Semaphore() for k in self._max_rate.keys()} def block(self, req): """Block as required to ensure there is at least 1/max_rate seconds - between calls for the same type of data, where max_rate depends on - the type in question. + between calls for the same backend, where max_rate depends on the + backend in question. """ - type = req["req"]["_backend"] - self._rate_semaphores[type].acquire() - Timer(1 / self._max_rate[type], self._rate_semaphores[type].release).start() + backend = req["req"]["_backend"] + self._rate_semaphores[backend].acquire() + Timer( + 1 / self._max_rate[backend], self._rate_semaphores[backend].release + ).start() -class CamsRegionalFcApiNumberLimiter(CamsRegionalFcApiLimiter): - """Class to limit the number of simultaneously executing URL requests to the regional forecast API.""" +class CamsRegionalFcApiNumberLimiter: + """Class to limit the number of simultaneously executing URL requests to the + regional forecast API. + """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, max_simultaneous): + self._max_simultaneous = max_simultaneous + self._number_semaphores = { + k: Semaphore(v) for k, v in self._max_simultaneous.items() + } def block(self, req): """Block as required to ensure there are no more than N ongoing - requests for the same type of data, where N depends on the type in + requests for the same backend, where N depends on the backend in question. Return a function that will unblock when called. """ - type = req["req"]["_backend"] - self._number_semaphores[type].acquire() - return lambda X: self._number_semaphores[type].release() + backend = req["req"]["_backend"] + self._number_semaphores[backend].acquire() + return lambda X: self._number_semaphores[backend].release() @property def max_simultaneous(self): """Return the total number of simultaneous URL requests allowed, of any type. 
""" - return sum(self._max_rate.values()) + return sum(self._max_simultaneous.values()) diff --git a/cads_adaptors/adaptors/cams_regional_fc/preprocess_requests.py b/cads_adaptors/adaptors/cams_regional_fc/preprocess_requests.py index 3dbe020e..255b961d 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/preprocess_requests.py +++ b/cads_adaptors/adaptors/cams_regional_fc/preprocess_requests.py @@ -2,10 +2,10 @@ from math import ceil, floor from cds_common import date_tools, hcube_tools -from cds_common.request_schemas import enforce_schema -from cdscompute.errors import BadRequestException -# from tabulate import tabulate +from cads_adaptors.exceptions import InvalidRequest +from cads_adaptors.validation.enforce import enforce as enforce_schema + from .formats import Formats @@ -44,7 +44,7 @@ def preprocess_requests(context, requests, regapi): # We cannot convert to NetCDF format if the retrieved fields will be on # different grids if format != Formats.grib and len(requested_grids) > 1: - raise BadRequestException( + raise InvalidRequest( "The model grid changed during the period requested. Fields on " + "different grids cannot be combined in one NetCDF file. " + "Please either request grib format, make separate requests or " @@ -154,13 +154,9 @@ def set_area(request, area_list, grid, context): for ia, (key, value) in enumerate(zip(default_area.keys(), area_list)): area[key] = float(value) if area["north"] <= area["south"]: - raise BadRequestException( - "area north limit must be greater than " + "south limit", "" - ) + raise InvalidRequest("area north limit must be greater than south limit") if area["east"] <= area["west"]: - raise BadRequestException( - "area east limit must be greater than " + "west limit", "" - ) + raise InvalidRequest("area east limit must be greater than west limit") # Snap to the grid area["north"] = snap_to_grid(area["north"], grid["south"], grid["dlat"], floor) @@ -168,9 +164,7 @@ def set_area(request, area_list, grid, context): area["south"] = snap_to_grid(area["south"], grid["south"], grid["dlat"], ceil) area["east"] = snap_to_grid(area["east"], grid["west"], grid["dlon"], floor) if area["north"] < area["south"] or area["east"] < area["west"]: - raise BadRequestException( - "requested area does not contain a " + "grid point", "" - ) + raise InvalidRequest("requested area does not contain a grid point") # Only insert area in request if it's not the full area (for caching # reasons) @@ -188,14 +182,9 @@ def set_area(request, area_list, grid, context): # return an error code. direction = 1 if incr < 0 else -1 if area[k] * direction > (grid[k] + incr) * direction: - raise BadRequestException( - "Area " - + k - + " value lies outside model grid limit of " - + str(grid[k]) - + " for date(s)=" - + repr(request["date"]), - "", + raise InvalidRequest( + f"Area {k} value lies outside model grid limit of {grid[k]} " + + f"for date(s)={request['date']!r}" ) # Return the requested grid, whether inserted into the request or not @@ -213,7 +202,9 @@ def dirty_manual_tabulate(rows, headers, separator=", "): def model_grids_table(grids, regapi): - """Return the text of a table summarising the regional model grids in use for the requested fields.""" + """Return the text of a table summarising the regional model grids in use for the + requested fields. 
+ """ # Loop over each grid and the fields that were requested on that grid strings = [] for grid, requested in grids.items(): @@ -255,7 +246,9 @@ def model_grids_table(grids, regapi): def snap_to_grid(coord, minl, incr, rounder): - """Snap a lat or lon to the regional grid where the lat/lon min is minl and the grid length is incr.""" + """Snap a lat or lon to the regional grid where the lat/lon min is minl and the + grid length is incr. + """ raw = rounder((coord - (minl + incr / 2)) / incr) * incr + (minl + incr / 2) # Rounding error can lead to spurious significant figures. This is a # poor-man's attempt at getting the number of decimal places the result diff --git a/cads_adaptors/adaptors/cams_regional_fc/process_grib_files.py b/cads_adaptors/adaptors/cams_regional_fc/process_grib_files.py index b8c60165..e85170ad 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/process_grib_files.py +++ b/cads_adaptors/adaptors/cams_regional_fc/process_grib_files.py @@ -29,17 +29,17 @@ def process_grib_files(req_groups, info, context): # required for req_group in req_groups: # If there is only one retrieved file for this group and it does not - # require any alteration then use as-is - if len(req_group["retrieved_files"]) == 1 and not any(alterations.values()): - pre_existing = req_group["retrieved_files"][0] + # require any alteration then use as-is. Otherwise, copy data to the + # new file. + if ( + len(req_group["retrieved_files"]) == 1 + and not any(alterations.values()) + and info["stages"][-1] != "merge_grib" + ): + req_group["grib_file"] = req_group["retrieved_files"][0] else: - pre_existing = None - req_group["grib_file"] = create_file( - "merge_grib", ".grib", info, context, temp_path=pre_existing - ) - - # Copy data to the grib file? - if req_group["grib_file"] != pre_existing: + # Copy data to the grib file + req_group["grib_file"] = create_file("merge_grib", ".grib", info) with open(req_group["grib_file"], "wb") as fout: for data in data_processor(req_group, alterations, context): fout.write(data) @@ -66,7 +66,7 @@ def data_processor(req_group, alterations, context): req_group["retrieved_files"], req_group["requests"], grib2request, - context, + logger=context, ) else: iterator = grib_file_iterator(req_group["retrieved_files"]) diff --git a/cads_adaptors/adaptors/cams_regional_fc/subrequest_main.py b/cads_adaptors/adaptors/cams_regional_fc/subrequest_main.py new file mode 100644 index 00000000..4db6fb4c --- /dev/null +++ b/cads_adaptors/adaptors/cams_regional_fc/subrequest_main.py @@ -0,0 +1,55 @@ +import os +from tempfile import mkstemp + +from cds_common.cams.regional_fc_api import regional_fc_api + +from . import STACK_DOWNLOAD_DIR, STACK_TEMP_DIR +from .meteo_france_retrieve import meteo_france_retrieve + + +def subrequest_main(backend, request, child_config, context): + """Get data from the specified Meteo France backend.""" + parent_config = request["parent_config"] + message = ( + f"The parent request is {parent_config['request_uid']}, " + f"launched by user {parent_config['user_uid']}." + ) + context.add_stdout(message) + + # Are we using the "integration" (test) or main server? 
+ cfg = parent_config.get("regional_fc", {}) + integration_server = cfg.get("integration_server", False) + if integration_server: + context.info("Using integration server") + + # Get an object which will give us information/functionality associated + # with the Meteo France regional forecast API + regapi = regional_fc_api(integration_server=integration_server, logger=context) + + # Construct a target file name + os.makedirs(STACK_DOWNLOAD_DIR, exist_ok=True) + fd, target = mkstemp( + prefix=child_config["request_uid"] + "_", suffix=".grib", dir=STACK_DOWNLOAD_DIR + ) + os.close(fd) + + # Get the data + try: + meteo_france_retrieve( + request["requests"], + target, + regapi, + cfg["definitions"], + integration_server, + logger=context, + tmpdir=STACK_TEMP_DIR, + max_rate=cfg.get("meteofrance_max_rate"), + max_simultaneous=cfg.get("meteofrance_max_simultaneous"), + cacher_kwargs=cfg.get("cacher_kwargs", {}), + ) + except Exception as e: + message = f"Failed to obtain data from remote server: {e!r}" + context.add_stderr(message) + raise RuntimeError(message) from None + + return open(target, "rb") diff --git a/cads_adaptors/adaptors/cams_regional_fc/which_fields_in_file.py b/cads_adaptors/adaptors/cams_regional_fc/which_fields_in_file.py index 260d04c2..37773c02 100644 --- a/cads_adaptors/adaptors/cams_regional_fc/which_fields_in_file.py +++ b/cads_adaptors/adaptors/cams_regional_fc/which_fields_in_file.py @@ -1,48 +1,32 @@ -import os -import random import shutil import time -from datetime import datetime from cds_common import hcube_tools from cds_common.message_iterators import grib_file_iterator +from .create_file import temp_file from .grib2request import grib2request -def which_fields_in_file(reqs, grib_file, context): +def which_fields_in_file(reqs, grib_file, config, context): """Compare the requests with the contents of the grib file and return two lists of requests, representing those which are in the file and those which are not. """ - if grib_file is None: - msg_iterator = [] - else: - msg_iterator = grib_file_iterator(grib_file) - # Read the grib file to find out which fields were retrieved + msg_iterator = grib_file_iterator(grib_file) try: reqs_infile = [ {k: [v] for k, v in grib2request(msg).items()} for msg in msg_iterator ] except Exception: - # Sometimes we have problems here. Copy the file somewhere so - # we can investigate later. - tmp = ( - "/tmp/cams-europe-air-quality-forecasts/debug/" - + "problem_file." - + datetime.now().strftime("%Y%m%d.%H%M%S") - + "." - + str(random.randint(0, 2**128)) - + ".grib" - ) + # Sometimes we have problems here. Copy the file somewhere so we can + # investigate later. + tmp = temp_file(config, suffix=".grib") context.info( - "Encountered error when reading grib file. Copying " - + "to " - + tmp - + " for offline investigation" + f"Encountered error when reading grib file. 
Copying to {tmp} for offline "
+            "investigation"
         )
-        os.makedirs(os.path.dirname(tmp), exist_ok=True)
         shutil.copyfile(grib_file, tmp)
         raise
 
     hcube_tools.hcubes_merge(reqs_infile)
diff --git a/cads_adaptors/adaptors/cds.py b/cads_adaptors/adaptors/cds.py
index c13b3ab8..8e06a23e 100644
--- a/cads_adaptors/adaptors/cds.py
+++ b/cads_adaptors/adaptors/cds.py
@@ -1,5 +1,7 @@
+import datetime
 import os
 import pathlib
+import re
 from copy import deepcopy
 from random import randint
 from typing import Any, BinaryIO
@@ -111,6 +113,56 @@ def pre_mapping_modifications(self, request: dict[str, Any]) -> dict[str, Any]:
 
         return request
 
+    def ensure_dateranges(self, strings):
+        dateranges = []
+        for string in strings:
+            dates = re.split("[;/]", string)
+            if len(dates) == 1:
+                dates *= 2
+            dateranges.append(dates)
+        return dateranges
+
+    def instantiate_dynamic_daterange(self, string: str, today: datetime.datetime):
+        dates = re.split("[;/]", string)
+        if len(dates) == 1:
+            dates *= 2
+        for i, date in enumerate(dates):
+            if date.startswith("current"):
+                diff = date.replace("current", "")
+                diff = int(diff) if diff else 0
+                date = today + datetime.timedelta(diff)
+                dates[i] = date.strftime("%Y-%m-%d")
+        return f"{dates[0]}/{dates[1]}"
+
+    def preprocess_conditions(self, conditions):
+        today = datetime.datetime.now(datetime.timezone.utc)
+        for condition in conditions:
+            if "date" in condition:
+                condition["date"] = self.instantiate_dynamic_daterange(condition["date"], today)
+
+    def dateranges_in(self, contained, container):
+        container_interval = re.split("[;/]", container)
+        contained_intervals = self.ensure_dateranges(ensure_list(contained))
+        for contained_interval in contained_intervals:
+            if not (container_interval[0] <= contained_interval[0] and contained_interval[1] <= container_interval[1]):
+                return False
+        return True
+
+    def satisfy_condition(self, request: dict[str, Any], condition: dict[str, Any]):
+        for key in condition:
+            if key == "date":
+                if not self.dateranges_in(request[key], condition[key]):
+                    return False
+            elif not set(ensure_list(request[key])) <= set(ensure_list(condition[key])):
+                return False
+        return True
+
+    def satisfy_conditions(self, request: dict[str, Any], conditions: list[dict[str, Any]]):
+        for condition in conditions:
+            if self.satisfy_condition(request, condition):
+                return True
+        return False
+
     def normalise_request(self, request: Request) -> Request:
         """
         Normalise the request prior to submission to the broker, and at the start of the retrieval.
@@ -139,6 +191,19 @@ def normalise_request(self, request: Request) -> Request:
         # Pre-mapping modifications
         working_request = self.pre_mapping_modifications(deepcopy(request))
 
+        # Implement a request-level tagging system
+        try:
+            self.conditional_tagging = self.config.get("conditional_tagging", None)
+            if self.conditional_tagging is not None:
+                for tag in self.conditional_tagging:
+                    conditions = self.conditional_tagging[tag]
+                    self.preprocess_conditions(conditions)
+                    if self.satisfy_conditions(request, conditions):
+                        hidden_tag = f"__{tag}"
+                        request[hidden_tag] = True
+        except Exception as e:
+            self.context.add_stdout(f"An error occurred while attempting conditional tagging: {e!r}")
+
         # If specified by the adaptor, intersect the request with the constraints.
         # The intersected_request is a list of requests
         if self.intersect_constraints_bool:
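
For illustration, the conditional tagging hook added to AbstractCdsAdaptor.normalise_request reads an adaptor configuration that maps each tag name to a list of condition dictionaries; a request is tagged when every key of at least one condition is satisfied. The sketch below uses a hypothetical tag name and condition values; only the structure is implied by the code in this diff:

    # Hypothetical "conditional_tagging" entry in an adaptor configuration.
    # The tag name and condition values are invented for illustration.
    config = {
        "conditional_tagging": {
            "recent_ens_surface": [
                {
                    "model": ["ENS"],
                    "level": ["0"],
                    # "current-30/current" is expanded against today's UTC date
                    # before matching, i.e. it covers the last 30 days.
                    "date": "current-30/current",
                },
            ],
        },
    }

    # A request whose "model", "level" and "date" values all fall within the
    # condition above leaves normalise_request() carrying a hidden flag:
    #
    #     request["__recent_ens_surface"] = True

Date conditions accept a single date or a "start/end" (or "start;end") range, and "current" with an optional day offset is expanded by preprocess_conditions before satisfy_conditions is evaluated; all other keys are matched as set inclusion of the request's values in the condition's values.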