Support saving multiple energy results per simulation
If there are multiple sets of energies per simulation, such as in the case of multiple signal excitations, the energies are saved as a list under each layer key in `project_results.json`.
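
For illustration, with two signal excitations the energy entries in `project_results.json` might look like this (layer names and values hypothetical):

    {
      "E_substrate": [1.2e-18, 3.4e-18],
      "E_vacuum": [5.6e-18, 7.8e-18]
    }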

The `produce_epr_table` script is refactored to add a `result_index` column and to save each result on a separate line in the CSV.
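
A hedged sketch of the resulting CSV rows for a two-excitation simulation (the column set and values are illustrative; the exact columns depend on the swept parameters and EPR groups):

    result_index,substrate_height,p_substrate,p_vacuum,E_total
    1,500.0,0.91,0.09,1.2e-17
    2,500.0,0.90,0.10,1.3e-17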

MER corrections are read from file if they already exist
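
A minimal sketch of that caching behaviour, assuming a simulation `sim_a` and a partition region `mer` (both names hypothetical):

    # First call searches the correction files, computes the group coefficients,
    # and writes coefficients_sim_a_mer.json next to the results.
    coefficients = get_mer_coefficients("sim_a", "mer")
    # Later calls find that file on disk and reload it as-is instead of
    # re-deriving the coefficients.
    assert coefficients == get_mer_coefficients("sim_a", "mer")
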
Tuomas Myllari committed Oct 16, 2024
1 parent 350852a commit 83c8d09
Showing 2 changed files with 154 additions and 111 deletions.
@@ -1773,7 +1773,7 @@ def get_energy_integrals(path: Path | str) -> dict:
"""
try:
energy_data, energy_layer_data = Path(path) / "energy.dat", Path(path) / "energy.dat.names"
energies = pd.read_csv(energy_data, sep=r"\s+", header=None).values.flatten()
energies = pd.read_csv(energy_data, sep=r"\s+", header=None).transpose().values.squeeze().tolist()

energy_layers = []
with open(energy_layer_data, encoding="utf-8") as fp:
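
For intuition, a minimal sketch of the parsing change above, assuming `energy.dat` holds one row per signal excitation and one column per energy integral (the layout is inferred from the commit message; the data is made up):

    import pandas as pd
    from io import StringIO

    # Hypothetical energy.dat: two excitations (rows), three integrals (columns)
    dat = StringIO("1.0 2.0 3.0\n4.0 5.0 6.0\n")
    df = pd.read_csv(dat, sep=r"\s+", header=None)

    df.values.flatten()                       # old: array([1., 2., 3., 4., 5., 6.])
    df.transpose().values.squeeze().tolist()  # new: [[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]]

With a single excitation, `squeeze()` collapses the transposed array back to one dimension, so `tolist()` still returns a flat list and the single-result behaviour is unchanged.
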
@@ -46,6 +46,11 @@
region_corrections = pp_data.get("region_corrections", {})


def _get_ith(d, i):
"""gets the ith element of a list that also works for scalars"""
return d[i] if isinstance(d, (list, tuple)) else d
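# Usage sketch (hypothetical values): _get_ith indexes lists but passes scalars
# through, so single-result projects keep working unchanged:
#     _get_ith([0.1, 0.9], 1)  ->  0.9
#     _get_ith(0.5, 1)         ->  0.5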


def get_mer_coefficients(simulation, region):
"""
Returns the MER coefficients for a certain path and saves the loaded coefficients in a separate JSON file.
@@ -62,27 +67,56 @@ def get_mer_coefficients(simulation, region):
if correction_key is None:
return None

corr_key = "_" + correction_key
corr_file = {f for f in correction_files if corr_key in f and simulation.startswith(f[: f.find(corr_key)])}
if len(corr_file) > 1:
corr_file = {f for f in corr_file if simulation == f[: f.find(corr_key)]}
if not corr_file:
print(f"Expected correction file not found with keys {simulation} and {correction_key}.")
return None
if len(corr_file) > 1:
print(f"Multiple matching correction files found with keys {simulation} and {correction_key}.")
return None
correction_file = f"coefficients_{simulation}_{region}.json"
# Load the correction file if it already exists
if os.path.isfile(correction_file):
with open(correction_file, "r", encoding="utf-8") as f:
coefficient = json.load(f)
else:
corr_key = "_" + correction_key
corr_file = {f for f in correction_files if corr_key in f and simulation.startswith(f[: f.find(corr_key)])}
if len(corr_file) > 1:
corr_file = {f for f in corr_file if simulation == f[: f.find(corr_key)]}
if not corr_file:
print(f"Expected correction file not found with keys {simulation} and {correction_key}.")
return None
if len(corr_file) > 1:
print(f"Multiple matching correction files found with keys {simulation} and {correction_key}.")
return None

with open(corr_file.pop(), "r", encoding="utf-8") as f:
res = json.load(f)
mer_keys = [k for k, _ in res.items() if "mer" in k and k.startswith("E_")]

# always use the first signal excitation for corrections
mer_total = sum(_get_ith(res[k], 0) for k in mer_keys)
coefficient = {
group: sum(_get_ith(res[k], 0) for k in mer_keys if group.lower() in k.lower()) / mer_total
for group in groups
}

with open(correction_file, "w", encoding="utf-8") as f:
json.dump(coefficient, f)

with open(corr_file.pop(), "r", encoding="utf-8") as f:
res = json.load(f)
mer_keys = [k for k, v in res.items() if "mer" in k and k.startswith("E_")]
mer_total = sum(res[k] for k in mer_keys)
coefficient = {group: sum(res[k] for k in mer_keys if group.lower() in k.lower()) / mer_total for group in groups}
return coefficient

with open(f"coefficients_{simulation}_{region}.json", "w", encoding="utf-8") as f:
json.dump(coefficient, f)

return coefficient
def get_results_list(results):
"""Transforms a single dictionary with list values to a list of dictionaries with scalar values"""
energy_results = {
k: v for k, v in results.items() if k.startswith("E_") or k.startswith("Exy_") or k.startswith("Ez_")
}

num_results_list = [len(v) if isinstance(v, (list, tuple)) else 1 for _, v in energy_results.items()]
num_results = min(num_results_list)
if any((n != num_results for n in num_results_list)):
print(f"Varying number of energy results in project_results.json. Will only use the first {num_results}")

results_list = []
for result_i in range(num_results):
results_list.append({k: _get_ith(v, result_i) for k, v in energy_results.items()})

return results_list
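
A usage sketch for `get_results_list` with hypothetical layer energies; two excitations become two scalar-valued dictionaries:

    results = {
        "E_substrate": [1.0e-18, 2.0e-18],  # two signal excitations
        "E_vacuum": [3.0e-18, 4.0e-18],
        "frequency": 5.2,                   # non-energy keys are ignored
    }
    get_results_list(results)
    # -> [{'E_substrate': 1e-18, 'E_vacuum': 3e-18},
    #     {'E_substrate': 2e-18, 'E_vacuum': 4e-18}]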


# Find data files
@@ -96,102 +130,111 @@ def get_mer_coefficients(simulation, region):
# Find parameters that are swept
definition_files = [f.replace("_project_results.json", ".json") for f in result_files]
parameters, parameter_values = find_varied_parameters(definition_files)
parameters = ["result_index"] + parameters

# Load result data
epr_dict = {}
for key, result_file in zip(parameter_values.keys(), result_files):
for original_key, result_file in zip(list(parameter_values.keys()), result_files):
with open(result_file, "r", encoding="utf-8") as f:
result = json.load(f)

energy = {k[2:]: v for k, v in result.items() if k.startswith("E_")}

def _sum_value(_dict, _key, _addition):
_dict[_key] = _dict.get(_key, 0.0) + _addition

# add sheet energies if 'sheet_approximations' are available
if "sheet_approximations" in pp_data:
xy_energy = {k[4:]: v for k, v in result.items() if k.startswith("Exy_")}
z_energy = {k[3:]: v for k, v in result.items() if k.startswith("Ez_")}

# read layers and material_dict data to determine sheet background materials
with open(f"{key}.json", "r", encoding="utf-8") as f:
sim_data = json.load(f)
sheet_layers = [(k, d) for k, d in sim_data["layers"].items() if k in xy_energy or k in z_energy]
eps_r_dict = {k: d["permittivity"] for k, d in sim_data["material_dict"].items() if "permittivity" in d}
bg_key = {k: d.get("background", "unknown_sheet_background") for k, d in sheet_layers}
bg_eps_r = {k: eps_r_dict.get(d.get("material"), 1.0) for k, d in sheet_layers}

for k, d in pp_data["sheet_approximations"].items():
if "thickness" not in d:
print(f'"thickness" missing from sheet_approximations["{k}"]')
continue
eps_r = d["eps_r"]

for xy_k, xy_v in xy_energy.items():
if k in xy_k:
_sum_value(energy, xy_k, xy_v * d["thickness"] * eps_r)
_sum_value(energy, bg_key[xy_k], -xy_v * d["thickness"] * bg_eps_r[xy_k])

for z_k, z_v in z_energy.items():
if k in z_k:
_sum_value(energy, z_k, z_v * d["thickness"] * (bg_eps_r[z_k] ** 2) / eps_r)
_sum_value(energy, bg_key[z_k], -z_v * d["thickness"] * bg_eps_r[z_k])

elif any(k.startswith("Exy_") or k.startswith("Ez_") for k in result.keys()):
print(
'Results contain boundary energies, but no "sheet_approximation" is defined. ',
"Boundary energies will be ignored in EPR",
)

total_energy = sum(energy.values())
if total_energy == 0.0:
print(f'Total energy 0 for simulation "{key}". No EPRs will be written.')
continue

if not groups:
# calculate EPR corresponding to each energy integral
epr_dict[key] = {f"p_{k}": v / total_energy for k, v in energy.items()}
elif not region_corrections:
# use EPR groups to combine layers
epr_dict[key] = {
f"p_{group}": sum(v for k, v in energy.items() if group in k) / total_energy for group in groups
}
else:
# calculate corrected EPRs and distinguish by partition regions
epr_dict[key] = {}
for reg, corr in region_corrections.items():
reg_energy = {k: v for k, v in energy.items() if reg in k}

coefficients = get_mer_coefficients(key, reg)
if coefficients is None:
epr_dict[key].update(
{
f"p_{group}_{reg}": sum(v for k, v in reg_energy.items() if group in k) / total_energy
for group in groups
}
)
else:
epr_dict[key].update(
{
f"p_{group}_{reg}": coefficients[group] * sum(reg_energy.values()) / total_energy
for group in groups
}
)

# distinguish regions not included in region_corrections with 'default' key
def_energy = {k: v for k, v in energy.items() if all(reg not in k for reg in region_corrections.keys())}
epr_dict[key].update(
{
f"p_{group}_default": sum(v for k, v in def_energy.items() if group in k) / total_energy
for group in groups
result_json = json.load(f)

results_list = get_results_list(result_json)
original_params = parameter_values.pop(original_key)
for result_id, result in enumerate(results_list, 1):
energy = {k[2:]: v for k, v in result.items() if k.startswith("E_")}

# Add result index if we have multiple results
key = original_key + ("_" + str(result_id) if len(results_list) > 1 else "")
# duplicate params for each result in the json and add the result index
parameter_values[key] = [result_id] + original_params

def _sum_value(_dict, _key, _addition):
_dict[_key] = _dict.get(_key, 0.0) + _addition

# add sheet energies if 'sheet_approximations' are available
if "sheet_approximations" in pp_data:
xy_energy = {k[4:]: v for k, v in result.items() if k.startswith("Exy_")}
z_energy = {k[3:]: v for k, v in result.items() if k.startswith("Ez_")}

# read layers and material_dict data to determine sheet background materials
with open(f"{original_key}.json", "r", encoding="utf-8") as f:
sim_data = json.load(f)
sheet_layers = [(k, d) for k, d in sim_data["layers"].items() if k in xy_energy or k in z_energy]
eps_r_dict = {k: d["permittivity"] for k, d in sim_data["material_dict"].items() if "permittivity" in d}
bg_key = {k: d.get("background", "unknown_sheet_background") for k, d in sheet_layers}
bg_eps_r = {k: eps_r_dict.get(d.get("material"), 1.0) for k, d in sheet_layers}

for k, d in pp_data["sheet_approximations"].items():
if "thickness" not in d:
print(f'"thickness" missing from sheet_approximations["{k}"]')
continue
eps_r = d["eps_r"]

for xy_k, xy_v in xy_energy.items():
if k in xy_k:
_sum_value(energy, xy_k, xy_v * d["thickness"] * eps_r)
_sum_value(energy, bg_key[xy_k], -xy_v * d["thickness"] * bg_eps_r[xy_k])

for z_k, z_v in z_energy.items():
if k in z_k:
_sum_value(energy, z_k, z_v * d["thickness"] * (bg_eps_r[z_k] ** 2) / eps_r)
_sum_value(energy, bg_key[z_k], -z_v * d["thickness"] * bg_eps_r[z_k])

elif any(k.startswith("Exy_") or k.startswith("Ez_") for k in result.keys()):
print(
'Results contain boundary energies, but no "sheet_approximation" is defined. ',
"Boundary energies will be ignored in EPR",
)

total_energy = sum(energy.values())
if total_energy == 0.0:
print(f'Total energy 0 for simulation "{key}". No EPRs will be written.')
continue

if not groups:
# calculate EPR corresponding to each energy integral
epr_dict[key] = {f"p_{k}": v / total_energy for k, v in energy.items()}
elif not region_corrections:
# use EPR groups to combine layers
epr_dict[key] = {
f"p_{group}": sum(v for k, v in energy.items() if group in k) / total_energy for group in groups
}
)

# total EPR by groups
epr_dict[key].update(
{f"p_{group}": sum(v for k, v in epr_dict[key].items() if group in k) for group in groups}
)

epr_dict[key]["E_total"] = total_energy
else:
# calculate corrected EPRs and distinguish by partition regions
epr_dict[key] = {}
for reg, corr in region_corrections.items():
reg_energy = {k: v for k, v in energy.items() if reg in k}

coefficients = get_mer_coefficients(original_key, reg)
if coefficients is None:
epr_dict[key].update(
{
f"p_{group}_{reg}": sum(v for k, v in reg_energy.items() if group in k) / total_energy
for group in groups
}
)
else:
epr_dict[key].update(
{
f"p_{group}_{reg}": coefficients[group] * sum(reg_energy.values()) / total_energy
for group in groups
}
)

# distinguish regions not included in region_corrections with 'default' key
def_energy = {k: v for k, v in energy.items() if all(reg not in k for reg in region_corrections.keys())}
epr_dict[key].update(
{
f"p_{group}_default": sum(v for k, v in def_energy.items() if group in k) / total_energy
for group in groups
}
)

# total EPR by groups
epr_dict[key].update(
{f"p_{group}": sum(v for k, v in epr_dict[key].items() if group in k) for group in groups}
)

epr_dict[key]["E_total"] = total_energy

tabulate_into_csv(f"{os.path.basename(os.path.abspath(path))}_epr.csv", epr_dict, parameters, parameter_values)
