Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidNew-NOAA committed Oct 4, 2024
1 parent 4f0446a commit 7c30e48
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 274 deletions.
3 changes: 1 addition & 2 deletions scripts/exglobal_atm_analysis_fv3_increment.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,4 @@
AtmAnl = AtmAnalysis(config, 'atmanlfv3inc')

# Initialize and execute FV3 increment converter
AtmAnl.initialize_jedi()
AtmAnl.execute(config.APRUN_ATMANLFV3INC)
AtmAnl.jedi.execute(config.APRUN_ATMANLFV3INC)
4 changes: 2 additions & 2 deletions scripts/exglobal_atm_analysis_initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
AtmAnl = AtmAnalysis(config, 'atmanlvar')

# Initialize JEDI variational analysis
AtmAnl.initialize_jedi()
AtmAnl.initialize_analysis()
AtmAnl.jedi.initialize(AtmAnl.task_config)
AtmAnl.initialize()
2 changes: 1 addition & 1 deletion scripts/exglobal_atm_analysis_variational.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
AtmAnl = AtmAnalysis(config, 'atmanlvar')

# Execute JEDI variational analysis
AtmAnl.execute(config.APRUN_ATMANLVAR, ['fv3jedi', 'variational'])
AtmAnl.jedi.execute(config.APRUN_ATMANLVAR, ['fv3jedi', 'variational'])
4 changes: 2 additions & 2 deletions scripts/exglobal_atmens_analysis_initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@
AtmEnsAnl = AtmEnsAnalysis(config, 'atmensanlobs')

# Initialize JEDI ensemble DA analysis
AtmEnsAnl.initialize_jedi()
AtmEnsAnl.initialize_analysis()
AtmEnsAnl.jedi.initialize(AtmEnsAnl.task_config)
AtmEnsAnl.initialize()
2 changes: 1 addition & 1 deletion scripts/exglobal_atmens_analysis_obs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
AtmEnsAnl = AtmEnsAnalysis(config, 'atmensanlobs')

# Initialize and execute JEDI ensemble DA analysis in observer mode
AtmEnsAnl.execute(config.APRUN_ATMENSANLOBS, ['fv3jedi', 'localensembleda'])
AtmEnsAnl.jedi.execute(config.APRUN_ATMENSANLOBS, ['fv3jedi', 'localensembleda'])
2 changes: 1 addition & 1 deletion scripts/exglobal_atmens_analysis_sol.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
AtmEnsAnl = AtmEnsAnalysis(config, 'atmensanlsol')

# Initialize and execute JEDI ensemble DA analysis in solver mode
AtmEnsAnl.initialize_jedi()
AtmEnsAnl.jedi.initialize(AtmEnsAnl.task_config)
AtmEnsAnl.execute(config.APRUN_ATMENSANLSOL, ['fv3jedi', 'localensembleda'])
203 changes: 81 additions & 122 deletions ush/python/pygfs/jedi/jedi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
import tarfile
from logging import getLogger
from typing import List, Dict, Any, Optional
from pprint import pformat
from jcb import render
from wxflow import (AttrDict,
FileHandler,
from wxflow import (AttrDict, FileHandler, Task, Executable,
chdir, rm_p,
parse_j2yaml,
parse_j2yaml, save_as_yaml,
logit,
Task,
Executable,
WorkflowException)

logger = getLogger(__name__.split('.')[-1])
Expand All @@ -22,7 +20,7 @@ class Jedi:
Class for initializing and executing JEDI applications
"""
@logit(logger, name="Jedi")
def __init__(self, task_config: AttrDict, yaml_name: Optional[str] = None) -> None:
def __init__(self, DATA: str, JEDIEXE: str, yaml_name: Optional[str]) -> None:
"""Constructor for JEDI objects
This method will construct a Jedi object.
Expand All @@ -44,69 +42,51 @@ def __init__(self, task_config: AttrDict, yaml_name: Optional[str] = None) -> No
None
"""

# For provenance, save incoming task_config as a private attribute of JEDI object
self._task_config = task_config
_exe_name = os.path.basename(JEDIEXE)

_exe_name = os.path.basename(task_config.JEDIEXE)

self.exe = os.path.join(task_config.DATA, _exe_name)
self.exe_src = JEDIEXE
self.rundir = DATA
self.exe = os.path.join(DATA, _exe_name)
if yaml_name:
self.yaml = os.path.join(task_config.DATA, yaml_name + '.yaml')
self.yaml = os.path.join(DATA, yaml_name + '.yaml')
else:
self.yaml = os.path.join(task_config.DATA, os.path.splitext(_exe_name)[0] + '.yaml')
self.yaml = os.path.join(DATA, os.path.splitext(_exe_name)[0] + '.yaml')

# Initialize empty JEDI input config attribute-dictionary
self.config = AttrDict()
self.j2tmpl_dir = os.path.join(task_config.PARMgfs, 'gdas')

# self.j2tmpl_dir = os.path.join(task_config.PARMgfs, 'gdas')

@logit(logger)
def set_config(self, task_config: AttrDict, algorithm: Optional[str] = None) -> AttrDict:
"""Compile a JEDI configuration dictionary from a template file and save to a YAML file
def initialize(self, task_config: AttrDict) -> None:
"""Initialize JEDI application
Parameters
----------
task_config : AttrDict
Dictionary of all configuration variables associated with a GDAS task.
algorithm (optional) : str
Name of the algorithm used to generate the JEDI configuration dictionary.
It will override the algorithm set in the task_config.JCB_<>_YAML file.
Returns
----------
None
This method will initialize a JEDI application.
This includes:
- generating JEDI YAML config
- saving JEDI YAML config to run directory
- linking the JEDI executable to run directory
"""

if 'JCB_BASE_YAML' in task_config.keys():
# Step 1: Fill templates of the JCB base YAML file
jcb_config = parse_j2yaml(task_config.JCB_BASE_YAML, task_config)

# Step 2: If algorithm is present then override the algorithm in the JEDI
# config. Otherwise, if the algorithm J2-YAML is present, fill
# its templates and merge.
if algorithm:
jcb_config['algorithm'] = algorithm
elif 'JCB_ALGO' in task_config.keys():
jcb_config['algorithm'] = task_config.JCB_ALGO
elif 'JCB_ALGO_YAML' in task_config.keys():
jcb_algo_config = parse_j2yaml(task_config.JCB_ALGO_YAML, task_config)
jcb_config.update(jcb_algo_config)

# Step 3: Generate the JEDI YAML using JCB
self.config = render(jcb_config)
elif 'JEDIYAML' in task_config.keys():
# Generate JEDI YAML without using JCB
self.config = parse_j2yaml(task_config.JEDIYAML, task_config,
searchpath=self.j2tmpl_dir)
else:
logger.exception(f"FATAL ERROR: Unable to compile JEDI configuration dictionary, ABORT!")
raise KeyError(f"FATAL ERROR: Task config must contain JCB_BASE_YAML or JEDIYAML")
# Render JEDI config dictionary
logger.info(f"Generating JEDI YAML config: {self.yaml}")
self.config = self.get_config(task_config)
logger.debug(f"JEDI config:\n{pformat(self.config)}")

# Save JEDI config dictionary to YAML in run directory
logger.debug(f"Writing JEDI YAML config to: {self.yaml}")
save_as_yaml(self.config, self.yaml)

# Link JEDI executable to run directory
logger.info(f"Linking JEDI executable {self.exe_src} to {self.exe}")
self.link_exe()

@logit(logger)
def execute(self, task_config: AttrDict, aprun_cmd: str, jedi_args: Optional[List] = None) -> None:
def execute(self, aprun_cmd: str, jedi_args: Optional[List] = None) -> None:
"""Execute JEDI application
Parameters
----------
task_config: AttrDict
Attribute-dictionary of all configuration variables associated with a GDAS task.
aprun_cmd: str
String comprising the run command for the JEDI executable.
jedi_args (optional): List
Expand All @@ -118,7 +98,7 @@ def execute(self, task_config: AttrDict, aprun_cmd: str, jedi_args: Optional[Lis
Attribute-dictionary of JEDI configuration rendered from a template.
"""

chdir(task_config.DATA)
chdir(self.rundir)

exec_cmd = Executable(aprun_cmd)
exec_cmd.add_default_arg(self.exe)
Expand All @@ -127,110 +107,90 @@ def execute(self, task_config: AttrDict, aprun_cmd: str, jedi_args: Optional[Lis
exec_cmd.add_default_arg(arg)
exec_cmd.add_default_arg(self.yaml)

logger.info(f"Executing {exec_cmd}")
try:
exec_cmd()
except OSError:
raise OSError(f"FATAL ERROR: Failed to execute {exec_cmd}")
except Exception:
raise WorkflowException(f"FATAL ERROR: An error occurred during execution of {exec_cmd}")

@staticmethod
@logit(logger)
def link_exe(task_config: AttrDict) -> None:
"""Link JEDI executable to run directory
def get_config(self, task_config: AttrDict, algorithm: Optional[str] = None) -> AttrDict:
"""Compile and return a JEDI configuration dictionary rendered from a template file
Parameters
----------
task_config: AttrDict
Attribute-dictionary of all configuration variables associated with a GDAS task.
task_config : AttrDict
Dictionary of all configuration variables associated with a GDAS task.
algorithm (optional) : str
Name of the algorithm used to generate the JEDI configuration dictionary.
It will override the algorithm set in the task_config.JCB_ALGO_YAML file.
Returns
----------
None
"""

# TODO: linking is not permitted per EE2.
# Needs work in JEDI to be able to copy the exec. [NOAA-EMC/GDASApp#1254]
logger.warn("Linking is not permitted per EE2.")
exe_dest = os.path.join(task_config.DATA, os.path.basename(task_config.JEDIEXE))
if os.path.exists(exe_dest):
rm_p(exe_dest)
os.symlink(task_config.JEDIEXE, exe_dest)
# Fill JCB base YAML template and build JCB config dictionary
jcb_config = parse_j2yaml(task_config.JCB_BASE_YAML, task_config)

# Add JCB algorithm YAML, if it exists, to JCB config dictionary
if 'JCB_ALGO_YAML' in task_config.keys():
jcb_config.update(parse_j2yaml(task_config.JCB_ALGO_YAML, task_config))

@logit(logger)
def get_obs_dict(self, task_config: AttrDict) -> Dict[str, Any]:
"""Compile a dictionary of observation files to copy
# Set algorithm in JCB config dictionary or override the one set by JCB_ALGO_YAML
if algorithm:
jcb_config['algorithm'] = algorithm

This method extracts 'observers' from the JEDI yaml and from that list, extracts a list of
observation files that are to be copied to the run directory
from the observation input directory
# Generate JEDI YAML config by rendering JCB config dictionary
jedi_config = render(jcb_config)

return jedi_config

@logit(logger)
def link_exe(self) -> None:
"""Link JEDI executable to run directory
Parameters
----------
task_config: AttrDict
Attribute-dictionary of all configuration variables associated with a GDAS task.
None
Returns
----------
obs_dict: Dict
a dictionary containing the list of observation files to copy for FileHandler
None
"""

observations = find_value_in_nested_dict(self.config, 'observations')

copylist = []
for ob in observations['observers']:
obfile = ob['obs space']['obsdatain']['engine']['obsfile']
basename = os.path.basename(obfile)
copylist.append([os.path.join(task_config.COM_OBS, basename), obfile])
obs_dict = {
'mkdir': [os.path.join(task_config.DATA, 'obs')],
'copy': copylist
}
return obs_dict
# TODO: linking is not permitted per EE2.
# Needs work in JEDI to be able to copy the exec. [NOAA-EMC/GDASApp#1254]
logger.warn("Linking is not permitted per EE2.")
if os.path.exists(self.exe):
rm_p(self.exe)
os.symlink(self.exe_src, self.exe)

@staticmethod
@logit(logger)
def get_bias_dict(self, task_config: AttrDict, bias_file) -> Dict[str, Any]:
"""Compile a dictionary of observation files to copy
This method extracts 'observers' from the JEDI yaml and determines from that list
if bias correction tar files are to be copied to the run directory
from the component directory.
def remove_redundant(input_list: List) -> List:
"""Remove redundancies from list with possible redundant, non-mutable elements
Parameters
----------
task_config: AttrDict
Attribute-dictionary of all configuration variables associated with a GDAS task.
bias_file
name of bias correction tar file
input_list : List
List with possible redundant, non-mutable elements
Returns
----------
bias_dict: Dict
a dictionary containing the list of observation bias files to copy for FileHandler
output_list : List
Input list but with redundancies removed
"""

observations = find_value_in_nested_dict(self.config, 'observations')

copylist = []
for ob in observations['observers']:
if 'obs bias' in ob.keys():
obfile = ob['obs bias']['input file']
obdir = os.path.dirname(obfile)
basename = os.path.basename(obfile)
prefix = '.'.join(basename.split('.')[:-3])
bfile = f"{prefix}.{bias_file}"
tar_file = os.path.join(obdir, bfile)
copylist.append([os.path.join(task_config.VarBcDir, bfile), tar_file])
break

bias_dict = {
'mkdir': [os.path.join(task_config.DATA, 'bc')],
'copy': copylist
}

return bias_dict
output_list = []
for item in input_list:
if item not in output_list:
output_list.append(item);

return output_list

@staticmethod
@logit(logger)
def extract_tar(tar_file: str) -> None:
Expand Down Expand Up @@ -264,7 +224,6 @@ def extract_tar(tar_file: str) -> None:
logger.exception(f"FATAL ERROR: unable to extract from {tar_file}")
raise tarfile.ExtractError("FATAL ERROR: unable to extract from {tar_file}")


@logit(logger)
def find_value_in_nested_dict(nested_dict: Dict, target_key: str) -> Any:
"""
Expand Down
Loading

0 comments on commit 7c30e48

Please sign in to comment.