Run climate aggregation #3

Merged
merged 1 commit on Mar 11, 2025
127 changes: 77 additions & 50 deletions src/rra_climate_aggregates/aggregate/runner.py
@@ -3,6 +3,7 @@
import pandas as pd
import tqdm
from rra_tools import jobmon
import itertools

from rra_climate_aggregates import cli_options as clio
from rra_climate_aggregates import constants as cac
@@ -20,6 +21,7 @@ def aggregate_main(
scenario: str,
measure: str,
draw: str,
hierarchy: str,
population_model_root: str,
climate_data_root: str,
output_dir: str,
@@ -34,51 +36,65 @@
print("Loading climate data")
ds = cd_data.load_annual_results(scenario, measure, draw)

for hierarchy in cac.SHAPE_HIERARCHIES:
print("Processing", hierarchy)
print("Building location masks")
bounds_map, mask = utils.build_location_masks(hierarchy, pm_data)
results = []
print(f"Aggregating data with {len(bounds_map)} locations")
for year in tqdm.trange(1950, 2101, disable=not progress_bar):
pop_raster = pm_data.load_real_results(f"{year}q1")
pop_arr = pop_raster._ndarray # noqa: SLF001
clim_var = ds.sel(year=year)["value"]
clim_raster = (
to_raster(clim_var)
.resample_to(pop_raster, "nearest")
.astype(np.float32)
)
pop_weighted = pop_raster * clim_raster
pop_weighted_arr = pop_weighted._ndarray # noqa: SLF001
subset_hierarchies = cac.HIERARCHY_MAP[hierarchy]

print("Processing", hierarchy)
print("Building location masks")
bounds_map, mask = utils.build_location_masks(hierarchy, pm_data)
results = []
print(f"Aggregating data with {len(bounds_map)} locations")
for year in tqdm.trange(1950, 2101, disable=not progress_bar):
pop_raster = pm_data.load_results(f"{year}q1")
pop_arr = pop_raster._ndarray # noqa: SLF001
clim_var = ds.sel(year=year)["value"]
clim_raster = (
to_raster(clim_var)
.resample_to(pop_raster, "nearest")
.astype(np.float32)
)
pop_weighted = pop_raster * clim_raster
pop_weighted_arr = pop_weighted._ndarray # noqa: SLF001

for location_id, (rows, cols) in tqdm.tqdm(
list(bounds_map.items()), disable=not progress_bar
):
loc_mask = mask[rows, cols] == location_id
loc_clim = pop_weighted_arr[rows, cols]
loc_pop = pop_arr[rows, cols]
wc = np.nansum(loc_clim[loc_mask])
pop = np.nansum(loc_pop[loc_mask])
r = wc / pop if pop else np.nan
results.append((location_id, year, scenario, draw, wc, pop, r))
for location_id, (rows, cols) in tqdm.tqdm(
list(bounds_map.items()), disable=not progress_bar
):
loc_mask = mask[rows, cols] == location_id
loc_clim = pop_weighted_arr[rows, cols]
loc_pop = pop_arr[rows, cols]
wc = np.nansum(loc_clim[loc_mask])
pop = np.nansum(loc_pop[loc_mask])
r = wc / pop if pop else np.nan
results.append((location_id, year, scenario, wc, pop, r))

h_results = pd.DataFrame(
results,
columns=[
"location_id",
"year",
"scenario",
"draw",
"weighted_climate",
"population",
"value",
],
)
h_results = pd.DataFrame(
results,
columns=[
"location_id",
"year",
"scenario",
"weighted_climate",
"population",
"value",
],
).sort_values(by=["location_id", "year"])

agg_h = pm_data.load_hierarchy(hierarchy)
if scenario == "ssp245" and measure == "mean_temperature" and draw == "000":
pop = utils.aggregate_pop_to_hierarchy(h_results, agg_h)
for subset_hierarchy in subset_hierarchies:
subset_h = pm_data.load_hierarchy(subset_hierarchy)
subset_pop = pop[pop.location_id.isin(subset_h.location_id)]
ca_data.save_population(subset_pop, version, subset_hierarchy)

climate = utils.aggregate_climate_to_hierarchy(h_results, agg_h)
for subset_hierarchy in subset_hierarchies:
subset_h = pm_data.load_hierarchy(subset_hierarchy)
subset_climate = climate[climate.location_id.isin(subset_h.location_id)]
ca_data.save_raw_results(
h_results,
subset_climate,
version,
hierarchy,
subset_hierarchy,
scenario,
measure,
draw,
)
@@ -89,6 +105,7 @@ def aggregate_main(
@clio.with_scenario()
@clio.with_measure()
@clio.with_draw()
@clio.with_hierarchy()
@clio.with_input_directory("population-model", cac.POPULATION_MODEL_ROOT)
@clio.with_input_directory("climate-data", cac.CLIMATE_DATA_ROOT)
@clio.with_output_directory(cac.MODEL_ROOT)
@@ -98,6 +115,7 @@ def aggregate_task(
scenario: str,
measure: str,
draw: str,
hierarchy: str,
population_model_dir: str,
climate_data_dir: str,
output_dir: str,
@@ -109,6 +127,7 @@
scenario,
measure,
draw,
hierarchy,
population_model_dir,
climate_data_dir,
output_dir,
@@ -121,6 +140,7 @@
@clio.with_scenario(allow_all=True)
@clio.with_measure(allow_all=True)
@clio.with_draw(allow_all=True)
@clio.with_hierarchy(allow_all=True)
@clio.with_input_directory("population-model", cac.POPULATION_MODEL_ROOT)
@clio.with_input_directory("climate-data", cac.CLIMATE_DATA_ROOT)
@clio.with_output_directory(cac.MODEL_ROOT)
@@ -129,22 +149,30 @@ def aggregate(
version: str,
scenario: list[str],
measure: list[str],
draw: list[int],
draw: list[str],
hierarchy: list[str],
population_model_dir: str,
climate_data_dir: str,
output_dir: str,
queue: str,
) -> None:
ca_data = ClimateAggregateData(output_dir)

jobs = []
for s, m, j, h in itertools.product(scenario, measure, draw, hierarchy):
if not ca_data.raw_results_path(version, h, s, m, j).exists():
jobs.append((s, m, j, h))
jobs = list(set(jobs))

print(f"Running {len(jobs)} jobs")

jobmon.run_parallel(
runner="catask",
task_name="aggregate",
node_args={
"scenario": scenario,
"measure": measure,
"draw": draw,
},
flat_node_args=(
("scenario", "measure", "draw", "hierarchy"),
jobs,
),
task_args={
"version": version,
"population-model-dir": population_model_dir,
@@ -155,9 +183,8 @@
"queue": queue,
"cores": 1,
"memory": "30G",
"runtime": "200m",
"project": "proj_rapidresponse",
"constraints": "archive",
"runtime": "400m",
"project": "proj_rapidresponse",
},
log_root=ca_data.log_dir("aggregate"),
max_attempts=3,
56 changes: 55 additions & 1 deletion src/rra_climate_aggregates/aggregate/utils.py
@@ -1,5 +1,6 @@
import numpy as np
import numpy.typing as npt
import pandas as pd
from rasterio.features import MergeAlg, rasterize

from rra_climate_aggregates.data import (
@@ -11,7 +12,7 @@ def build_location_masks(
hierarchy: str,
pm_data: PopulationModelData,
) -> tuple[dict[int, tuple[slice, slice]], npt.NDArray[np.uint32]]:
template = pm_data.load_real_results("2020q1")
template = pm_data.load_results("2020q1")

raking_shapes = pm_data.load_raking_shapes(hierarchy)

@@ -42,3 +43,56 @@ def build_location_masks(
merge_alg=MergeAlg.replace,
)
return bounds_map, location_mask


def aggregate_pop_to_hierarchy(data: pd.DataFrame, hierarchy: pd.DataFrame) -> pd.DataFrame:
results = (
data.drop(columns=["weighted_climate", "value"])
.rename(columns={"population": "value"})
.set_index("location_id")
.copy()
)
for level in reversed(list(range(1, hierarchy.level.max() + 1))):
level_mask = hierarchy.level == level
parent_map = hierarchy.loc[level_mask].set_index("location_id").parent_id
subset = results.loc[parent_map.index]
subset["parent_id"] = parent_map

parent_values = (
subset.groupby(["year", "scenario", "parent_id"])[["value"]]
.sum()
.reset_index()
.rename(columns={"parent_id": "location_id"})
.set_index("location_id")
)
results = pd.concat([results, parent_values])

results = results.reset_index().sort_values(["location_id", "year"]).reset_index(drop=True)
return results

def aggregate_climate_to_hierarchy(data: pd.DataFrame, hierarchy: pd.DataFrame) -> pd.DataFrame:
results = data.set_index("location_id").copy()

for level in reversed(list(range(1, hierarchy.level.max() + 1))):
level_mask = hierarchy.level == level
parent_map = hierarchy.loc[level_mask].set_index("location_id").parent_id

subset = results.loc[parent_map.index]
subset["parent_id"] = parent_map

parent_values = (
subset.groupby(["year", "scenario", "parent_id"])[["weighted_climate", "population"]]
.sum()
.reset_index()
.rename(columns={"parent_id": "location_id"})
.set_index("location_id")
)
parent_values["value"] = parent_values.weighted_climate / parent_values.population
results = pd.concat([results, parent_values])
results = (
results.drop(columns=["weighted_climate", "population"])
.reset_index()
.sort_values(["location_id", "year"])
.reset_index(drop=True)
)
return results
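
A minimal, self-contained sketch of the bottom-up roll-up that aggregate_climate_to_hierarchy implements, run on a toy two-level hierarchy. The location IDs, populations, and climate values below are made up for illustration only and are not part of this PR; the real function operates on the hierarchies loaded from PopulationModelData and drops the intermediate weighted_climate and population columns before returning.

import pandas as pd

# Toy hierarchy: locations 2 and 3 are level-1 children of the level-0 root (location 1).
hierarchy = pd.DataFrame(
    {"location_id": [1, 2, 3], "parent_id": [1, 1, 1], "level": [0, 1, 1]}
)

# Most-detailed results for a single year and scenario.
data = pd.DataFrame(
    {
        "location_id": [2, 3],
        "year": [2020, 2020],
        "scenario": ["ssp245", "ssp245"],
        "weighted_climate": [20.0, 60.0],  # sum over pixels of population * climate
        "population": [10.0, 20.0],
        "value": [2.0, 3.0],  # weighted_climate / population
    }
)

results = data.set_index("location_id").copy()
for level in reversed(range(1, hierarchy.level.max() + 1)):
    # Map each location at this level to its parent, then sum the children's
    # weighted climate and population onto the parent.
    parent_map = hierarchy.loc[hierarchy.level == level].set_index("location_id").parent_id
    subset = results.loc[parent_map.index].copy()
    subset["parent_id"] = parent_map
    parent_values = (
        subset.groupby(["year", "scenario", "parent_id"])[["weighted_climate", "population"]]
        .sum()
        .reset_index()
        .rename(columns={"parent_id": "location_id"})
        .set_index("location_id")
    )
    # Recompute the population-weighted mean for the parent.
    parent_values["value"] = parent_values.weighted_climate / parent_values.population
    results = pd.concat([results, parent_values])

# Location 1 ends up with population 30 and value (20 + 60) / 30 ≈ 2.67,
# the population-weighted mean of its children rather than the simple mean 2.5.
print(results.reset_index().sort_values(["location_id", "year"]))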
13 changes: 13 additions & 0 deletions src/rra_climate_aggregates/cli_options.py
@@ -77,6 +77,19 @@ def with_draw[**P, T](
)


def with_hierarchy[**P, T](
*,
allow_all: bool = False,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
return with_choice(
"hierarchy",
allow_all=allow_all,
choices=cac.HIERARCHY_MAP,
help="Hierarchy to process.",
convert=allow_all,
)


__all__ = [
"RUN_ALL",
"convert_choice",
21 changes: 11 additions & 10 deletions src/rra_climate_aggregates/constants.py
@@ -30,13 +30,14 @@

DRAWS = [f"{d:>03}" for d in range(100)]

SHAPE_HIERARCHIES = [
# "gbd_2021",
"lsae_a2",
]

AGG_HIERARCHIES = [
"gbd_2021",
"fhs_2021",
"lsae",
]
# Mapping from pixel aggregation hierarchies to location aggregation hierarchies.
# The pixel aggregation hierarchies are the most detailed shapes used to
# aggregate the pixel data to the location level.
# The location aggregation hierarchies correspond to particular modeling datasets.
# Their most-detailed shapes may be a subset of the pixel aggregation hierarchy they
# map to. They are used to produce results from the most detailed level up to the global
# level.
HIERARCHY_MAP = {
"gbd_2021": ["gbd_2021", "fhs_2021"],
"lsae_1209": ["lsae_1209"],
}