1
1
import itertools
2
+ import math
3
+ from numbers import Number
4
+ from typing import List
2
5
3
- from .errors import Mismatch , Uncombinable
6
+ import bw2data as bd
7
+ from loguru import logger
8
+
9
+ from .errors import Mismatch , MissingDatabase , Uncombinable
4
10
5
11
6
12
def split_by_semicolon (row : dict , version : str ) -> list [dict ]:
7
13
"""Possible split a data row into"""
14
+ if isinstance (row [f"Activity Name - { version } " ], Number ) and math .isnan (
15
+ row [f"Activity Name - { version } " ]
16
+ ):
17
+ return []
18
+
8
19
len_product = len (row [f"Reference Product - { version } " ].split (";\n " ))
9
20
len_unit = len (row [f"Reference Product Unit - { version } " ].split (";\n " ))
10
21
if len_product != len_unit :
@@ -113,9 +124,7 @@ def source_target_pair_as_bw_dict(
113
124
```
114
125
115
126
"""
116
- versions = [
117
- x .split (" - " )[- 1 ].strip () for x in row if x .startswith ("Activity Name" )
118
- ]
127
+ versions = [x .split (" - " )[- 1 ].strip () for x in row if x .startswith ("Activity Name" )]
119
128
if f"Activity Name - { source_version } " not in row :
120
129
raise ValueError (
121
130
f"""Can't find source version { source_version } in data row.
@@ -128,6 +137,10 @@ def source_target_pair_as_bw_dict(
128
137
)
129
138
130
139
sources = split_by_semicolon (row , source_version )
140
+ if not sources :
141
+ # New unit process dataset, no source objects
142
+ return []
143
+
131
144
targets = split_by_semicolon (row , target_version )
132
145
if len (sources ) > 1 and len (targets ) > 1 and len (sources ) != len (targets ):
133
146
raise Uncombinable (
@@ -140,4 +153,94 @@ def source_target_pair_as_bw_dict(
140
153
elif len (targets ) == 1 :
141
154
targets = itertools .repeat (targets [0 ])
142
155
143
- return [{"source" : s , "target" : t } for s , t in zip (sources , targets )]
156
+ return [
157
+ {"source" : s , "target" : t }
158
+ for s , t in zip (sources , targets )
159
+ if all (v .lower () != "nan" for v in itertools .chain (s .values (), t .values ()))
160
+ ]
161
+
162
+
163
def resolve_glo_row_rer_roe(
    data: List[dict], source_version: str, target_version: str, system_model: str
) -> List[dict]:
    """Change `location` to `RoW` or `RoE` in `data` where the databases require it.

    Each element of `data` is a dict with `source` and `target` entries, each of
    which is a dict with at least `name`, `location` and `reference product`
    keys. For every entry, the `(name, location, reference product)` triple is
    checked against the installed database `ecoinvent-{version}-{system_model}`
    (source entries against the source version, target entries against the
    target version):

    * If the triple exists, nothing changes.
    * If it does not exist, the location is `GLO` (resp. `RER`), and the same
      name and reference product exist with location `RoW` (resp. `RoE`), the
      entry's `location` is rewritten in place.
    * Otherwise the entry is reported as missing via the logger: a warning when
      only the target is missing, a debug message when only the source is
      missing, and nothing when both are missing.

    Args:
        data: Source/target pairs to check. The nested dicts are modified in place.
        source_version: Version label used to build the source database name.
        target_version: Version label used to build the target database name.
        system_model: System model label used to build both database names.

    Returns:
        The same `data` list that was passed in.

    Raises:
        MissingDatabase: If the source or target database is not in `bd.databases`.
    """
    source_db_name = f"ecoinvent-{source_version}-{system_model}"
    target_db_name = f"ecoinvent-{target_version}-{system_model}"
    if source_db_name not in bd.databases:
        raise MissingDatabase(f"Missing source database: {source_db_name}")
    if target_db_name not in bd.databases:
        raise MissingDatabase(f"Missing target database: {target_db_name}")

    identifying_attrs = ("name", "location", "reference product")
    # Location given in the change report -> location to try when it is not found.
    fallback_locations = {"GLO": "RoW", "RER": "RoE"}

    # Cache the identifying triples as sets so each membership test below is O(1)
    # instead of a database query.
    logger.info("Loading source database {db} to cache data attributes", db=source_db_name)
    source_lookup = {
        tuple(o[attr] for attr in identifying_attrs) for o in bd.Database(source_db_name)
    }
    logger.info("Loading target database {db} to cache data attributes", db=target_db_name)
    target_lookup = {
        tuple(o[attr] for attr in identifying_attrs) for o in bd.Database(target_db_name)
    }

    for obj in data:
        # Set when the source entry can't be found; cleared again if the target is
        # missing as well, because then there is nothing to report for this pair.
        source_missing = None
        for kind, lookup, db_name in (
            ("source", source_lookup, source_db_name),
            ("target", target_lookup, target_db_name),
        ):
            dataset = obj[kind]
            name, location, product = (dataset[attr] for attr in identifying_attrs)
            if (name, location, product) in lookup:
                continue

            fallback = fallback_locations.get(location)
            if fallback is not None and (name, fallback, product) in lookup:
                dataset["location"] = fallback
                logger.debug(
                    "{kind} process {name} location corrected to '{location}'",
                    kind=kind,
                    name=name,
                    location=fallback,
                )
            elif kind == "source":
                source_missing = dataset
            elif source_missing:
                # Missing in both source and target for this system model
                source_missing = None
            else:
                # Only missing in target database - but this is a big problem, we don't
                # have a suitable target for existing edges to relink to.
                # The title-cased value is passed as an argument: `str.format`
                # placeholders cannot call methods, so `{kind.title()}` would raise
                # AttributeError instead of logging.
                logger.warning(
                    "{kind} process given in change report but missing in {db_name} lookup: {ds}",
                    kind=kind.title(),
                    db_name=db_name,
                    ds=dataset,
                )
        if source_missing:
            # Only a debug message because this won't break anything - there is no
            # process in the source database to miss a link from.
            logger.debug(
                "Source process given in change report but missing in {db_name} lookup: {ds}",
                db_name=source_db_name,
                ds=source_missing,
            )

    return data
0 commit comments