Skip to content

Commit

Permalink
Merge pull request #257 from ecmwf-projects/feature/regional_fc_cache2
Browse files Browse the repository at this point in the history
Add new regional fc cacher class for use by update code
  • Loading branch information
ecmwf-cobarzan authored Jan 2, 2025
2 parents d2078c7 + 3e4936f commit df8aa76
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
21 changes: 20 additions & 1 deletion cads_adaptors/adaptors/cams_regional_fc/cacher.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,23 @@ def delete(self, fieldinfo):
self.client.delete_object(Bucket=self._bucket, Key=remote_path)


Cacher = CacherS3
class CacherS3AndFile(CacherS3):
    """Sub-class of CacherS3 that caches each field to a local file as well
    as to the S3 bucket.

    The local copy is optional: it is only written when a ``field2path``
    callable was supplied at construction time.
    """

    def __init__(self, *args, field2path=None, **kwargs):
        """Initialise the S3 cacher and remember the local path mapping.

        Args:
            *args: Positional arguments forwarded unchanged to CacherS3.
            field2path: Optional callable that takes a fieldinfo and returns
                the local file path the field should be written to. If
                None (the default) or otherwise falsy, no local files are
                written.
            **kwargs: Keyword arguments forwarded unchanged to CacherS3.
        """
        super().__init__(*args, **kwargs)
        self.field2path = field2path

    def _write_field_sync(self, data, fieldinfo):
        """Write a field to the S3 bucket and, if configured, to a local file.

        Args:
            data: The field content. It is written to a file opened in
                binary mode, so it must be a bytes-like object.
            fieldinfo: Description of the field; passed to the parent class
                and to ``field2path`` to determine the local path.
        """
        # Write to the S3 bucket
        super()._write_field_sync(data, fieldinfo)

        # Nothing more to do unless a local path mapping was configured
        if not self.field2path:
            return

        path = self.field2path(fieldinfo)
        self.logger.info(f"Caching {fieldinfo} to {path}")

        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError, so only create the directory when there is one.
        dirname = os.path.dirname(path)
        if dirname:
            os.makedirs(dirname, exist_ok=True)

        with open(path, "wb") as f:
            f.write(data)
6 changes: 3 additions & 3 deletions cads_adaptors/adaptors/cams_regional_fc/cams_regional_fc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from cads_adaptors.exceptions import InvalidRequest

from .assert_valid_grib import assert_valid_grib
from .cacher import Cacher
from .cacher import CacherS3
from .convert_grib import convert_grib
from .create_file import create_file, temp_file
from .formats import Formats
Expand Down Expand Up @@ -150,10 +150,10 @@ def get_local(req_groups, integration_server, config, context):
"""Retrieve only the fields which are stored locally (in the cache or on
the datastore) and identify the remaining non-local fields.
"""
# Cacher has knowledge of cache locations
# CacherS3 has knowledge of cache locations
cfg = config.get("regional_fc", {})
no_cache_key = cfg.get("no_cache_key")
with Cacher(
with CacherS3(
integration_server,
logger=context,
no_cache_key=no_cache_key,
Expand Down
11 changes: 7 additions & 4 deletions cads_adaptors/adaptors/cams_regional_fc/meteo_france_retrieve.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from cds_common.url2.requests_to_urls import requests_to_urls

from .assert_valid_grib import assert_valid_grib
from .cacher import Cacher
from .cacher import CacherS3
from .grib2request import grib2request_init


Expand All @@ -22,6 +22,7 @@ def meteo_france_retrieve(
tmpdir=None,
max_rate=None,
max_simultaneous=None,
cacher=None,
cacher_kwargs=None,
combine_method=None,
logger=None,
Expand Down Expand Up @@ -101,9 +102,11 @@ def meteo_france_retrieve(
urlreqs = list(requests_to_urls(requests, regapi.url_patterns))

# Create an object that will handle the caching
with Cacher(
integration_server, logger=logger, tmpdir=tmpdir, **(cacher_kwargs or {})
) as cacher:
if cacher is None:
cacher = CacherS3(
integration_server, logger=logger, tmpdir=tmpdir, **(cacher_kwargs or {})
)
with cacher:
# Create an object that will allow URL downloading in parallel
downloader = Downloader(
getter=getter,
Expand Down

0 comments on commit df8aa76

Please sign in to comment.