|
| 1 | +import logging |
| 2 | +from copy import copy |
| 3 | +from dataclasses import dataclass |
| 4 | +from enum import Enum |
| 5 | +from numbers import Number |
| 6 | +from typing import List, Union |
| 7 | + |
| 8 | +from blinker import signal |
| 9 | +from bw2data import Database, databases |
| 10 | +from bw2data.backends import Exchange |
| 11 | +from bw2data.project import ProjectDataset, projects |
| 12 | + |
| 13 | +from . import allocation_strategies |
| 14 | +from .allocation import property_allocation |
| 15 | + |
| 16 | +DEFAULT_ALLOCATIONS = set(allocation_strategies) |
| 17 | + |
| 18 | + |
| 19 | +class MessageType(Enum): |
| 20 | + NONNUMERIC_PRODUCT_PROPERTY = "Non-numeric product property" |
| 21 | + NONNUMERIC_EDGE_PROPERTY = "Non-numeric edge property" |
| 22 | + NONNUMERIC_PROPERTY = "Non-numeric property" |
| 23 | + MISSING_PRODUCT_PROPERTY = "Missing product property" |
| 24 | + MISSING_EDGE_PROPERTY = "Missing edge property" |
| 25 | + MISSING_PROPERTY = "Missing property" |
| 26 | + ALL_VALID = "All properties found and have correct type" |
| 27 | + |
| 28 | + |
| 29 | +@dataclass |
| 30 | +class PropertyMessage: |
| 31 | + level: int # logging levels WARNING and CRITICAL |
| 32 | + process_id: int # Can get object with bw2data.get_node(id=process_id) |
| 33 | + product_id: int # Can get object with bw2data.get_node(id=product_id) |
| 34 | + message_type: MessageType # Computer-readable error message type |
| 35 | + message: str # Human-readable error message |
| 36 | + |
| 37 | + |
| 38 | +def _get_unified_properties(edge: Exchange): |
| 39 | + try: |
| 40 | + properties = copy(edge.input["properties"]) |
| 41 | + except KeyError: |
| 42 | + properties = {} |
| 43 | + if "properties" in edge: |
| 44 | + properties.update(edge["properties"]) |
| 45 | + return properties |
| 46 | + |
| 47 | + |
| 48 | +def list_available_properties(database_label: str): |
| 49 | + """ |
| 50 | + Get a list of all properties in a database, and check their suitability for use. |
| 51 | +
|
| 52 | + `database_label`: String label of an existing database. |
| 53 | +
|
| 54 | + Returns a list of tuples like `(label: str, message: MessageType)`. Note that |
| 55 | + `NONNUMERIC_PROPERTY` is worse than `MISSING_PROPERTY` as missing properties can be assumed to |
| 56 | + be zero, but non-numeric ones break everything. |
| 57 | + """ |
| 58 | + if database_label not in databases: |
| 59 | + raise ValueError(f"Database `{database_label}` not defined in this project") |
| 60 | + |
| 61 | + results = [] |
| 62 | + all_properties = set() |
| 63 | + |
| 64 | + for ds in filter( |
| 65 | + lambda x: x["type"] == "multifunctional", Database(database_label) |
| 66 | + ): |
| 67 | + for edge in filter(lambda x: x.get("functional"), ds.exchanges()): |
| 68 | + for key in _get_unified_properties(edge): |
| 69 | + all_properties.add(key) |
| 70 | + |
| 71 | + for label in all_properties: |
| 72 | + check_results = check_property_for_allocation(database_label, label) |
| 73 | + if check_results is True: |
| 74 | + results.append((label, MessageType.ALL_VALID)) |
| 75 | + elif any( |
| 76 | + msg.message_type |
| 77 | + in ( |
| 78 | + MessageType.NONNUMERIC_PRODUCT_PROPERTY, |
| 79 | + MessageType.NONNUMERIC_EDGE_PROPERTY, |
| 80 | + ) |
| 81 | + for msg in check_results |
| 82 | + ): |
| 83 | + results.append((label, MessageType.NONNUMERIC_PROPERTY)) |
| 84 | + else: |
| 85 | + results.append((label, MessageType.MISSING_PROPERTY)) |
| 86 | + |
| 87 | + return results |
| 88 | + |
| 89 | + |
| 90 | +def check_property_for_allocation( |
| 91 | + database_label: str, property_label: str |
| 92 | +) -> Union[bool, List[PropertyMessage]]: |
| 93 | + """ |
| 94 | + Check that the given property is present for all functional edges in `multifunctional` |
| 95 | + processes. |
| 96 | +
|
| 97 | + `database_label`: String label of an existing database. |
| 98 | + `property_label`: String label of the property to be used for allocation. |
| 99 | +
|
| 100 | + If all the needed data is present, returns `True`. |
| 101 | +
|
| 102 | + If there is missing data, returns a list of `PropertyMessage` objects. |
| 103 | + """ |
| 104 | + if database_label not in databases: |
| 105 | + raise ValueError(f"Database `{database_label}` not defined in this project") |
| 106 | + |
| 107 | + db = Database(database_label) |
| 108 | + messages = [] |
| 109 | + |
| 110 | + for ds in filter(lambda x: x["type"] == "multifunctional", db): |
| 111 | + for edge in filter(lambda x: x.get("functional"), ds.exchanges()): |
| 112 | + properties = _get_unified_properties(edge) |
| 113 | + if ( |
| 114 | + property_label not in properties |
| 115 | + and edge.input["type"] != "readonly_process" |
| 116 | + ): |
| 117 | + messages.append( |
| 118 | + PropertyMessage( |
| 119 | + level=logging.WARNING, |
| 120 | + process_id=ds.id, |
| 121 | + product_id=edge.input.id, |
| 122 | + message_type=MessageType.MISSING_PRODUCT_PROPERTY, |
| 123 | + message=f"""Product is missing a property value for `{property_label}`. |
| 124 | + Missing values are treated as zeros. |
| 125 | + Please define this property for the product: |
| 126 | + {edge.input} |
| 127 | + Referenced by multifunctional process: |
| 128 | + {ds} |
| 129 | +
|
| 130 | +""", |
| 131 | + ) |
| 132 | + ) |
| 133 | + elif property_label not in properties: |
| 134 | + messages.append( |
| 135 | + PropertyMessage( |
| 136 | + level=logging.WARNING, |
| 137 | + process_id=ds.id, |
| 138 | + product_id=edge.input.id, |
| 139 | + message_type=MessageType.MISSING_EDGE_PROPERTY, |
| 140 | + message=f"""Functional edge is missing a property value for `{property_label}`. |
| 141 | + Missing values are treated as zeros. |
| 142 | + Please define this property for the edge: |
| 143 | + {edge} |
| 144 | + Found in multifunctional process: |
| 145 | + {ds} |
| 146 | +
|
| 147 | +""", |
| 148 | + ) |
| 149 | + ) |
| 150 | + elif ( |
| 151 | + not isinstance(properties[property_label], Number) |
| 152 | + or isinstance(properties[property_label], bool) |
| 153 | + and edge.input["type"] != "readonly_process" |
| 154 | + ): |
| 155 | + messages.append( |
| 156 | + PropertyMessage( |
| 157 | + level=logging.CRITICAL, |
| 158 | + process_id=ds.id, |
| 159 | + product_id=edge.input.id, |
| 160 | + message_type=MessageType.NONNUMERIC_PRODUCT_PROPERTY, |
| 161 | + message=f"""Found non-numeric value `{properties[property_label]}` in property `{property_label}`. |
| 162 | + Please redefine this property for the product: |
| 163 | + {edge.input} |
| 164 | + Referenced by multifunctional process: |
| 165 | + {ds} |
| 166 | +
|
| 167 | +""", |
| 168 | + ) |
| 169 | + ) |
| 170 | + elif not isinstance(properties[property_label], Number) or isinstance( |
| 171 | + properties[property_label], bool |
| 172 | + ): |
| 173 | + messages.append( |
| 174 | + PropertyMessage( |
| 175 | + level=logging.CRITICAL, |
| 176 | + process_id=ds.id, |
| 177 | + product_id=edge.input.id, |
| 178 | + message_type=MessageType.NONNUMERIC_EDGE_PROPERTY, |
| 179 | + message=f"""Found non-numeric value `{properties[property_label]}` in property `{property_label}`. |
| 180 | + Please redefine this property for the edge: |
| 181 | + {edge} |
| 182 | + Found in multifunctional process: |
| 183 | + {ds} |
| 184 | +
|
| 185 | +""", |
| 186 | + ) |
| 187 | + ) |
| 188 | + else: |
| 189 | + print( |
| 190 | + "No problem with:", |
| 191 | + properties, |
| 192 | + property_label, |
| 193 | + isinstance(properties[property_label], Number), |
| 194 | + ) |
| 195 | + return messages or True |
| 196 | + |
| 197 | + |
| 198 | +def add_custom_property_allocation_to_project( |
| 199 | + property_label: str, normalize_by_production_amount: bool = True |
| 200 | +) -> None: |
| 201 | + """ |
| 202 | + Add a new property-based allocation method to `allocation_strategies`, and persist to a project |
| 203 | + dataset. |
| 204 | +
|
| 205 | + Note that this function **does not** mark the created function as the default allocation |
| 206 | + anywhere. |
| 207 | +
|
| 208 | + `property_label` is a string giving a label in the functional products `properties` dictionary. |
| 209 | + You should probably use `check_property_for_allocation` to make sure this will work for the |
| 210 | + given database. |
| 211 | +
|
| 212 | + `normalize_by_production_amount` is a bool flag for converting calculated allocation factors so |
| 213 | + that they sum to one. |
| 214 | + """ |
| 215 | + if property_label in allocation_strategies: |
| 216 | + raise KeyError(f"`{property_label}` already defined in `allocation_strategies`") |
| 217 | + |
| 218 | + allocation_strategies[property_label] = property_allocation( |
| 219 | + property_label=property_label, |
| 220 | + normalize_by_production_amount=normalize_by_production_amount, |
| 221 | + ) |
| 222 | + |
| 223 | + if "multifunctional.custom_allocations" not in projects.dataset.data: |
| 224 | + projects.dataset.data["multifunctional.custom_allocations"] = {} |
| 225 | + |
| 226 | + projects.dataset.data["multifunctional.custom_allocations"][property_label] = { |
| 227 | + "property_label": property_label, |
| 228 | + "normalize_by_production_amount": normalize_by_production_amount, |
| 229 | + } |
| 230 | + projects.dataset.save() |
| 231 | + |
| 232 | + |
| 233 | +def update_allocation_strategies_on_project_change( |
| 234 | + project_dataset: ProjectDataset, |
| 235 | +) -> None: |
| 236 | + """Fix the single dict `allocation_strategies` to reflect custom data for this project.""" |
| 237 | + for obsolete in set(allocation_strategies).difference(DEFAULT_ALLOCATIONS): |
| 238 | + del allocation_strategies[obsolete] |
| 239 | + |
| 240 | + for key, value in project_dataset.data.get( |
| 241 | + "multifunctional.custom_allocations", {} |
| 242 | + ).items(): |
| 243 | + allocation_strategies[key] = property_allocation(**value) |
| 244 | + |
| 245 | + |
| 246 | +signal("bw2data.project_changed").connect( |
| 247 | + update_allocation_strategies_on_project_change |
| 248 | +) |
0 commit comments