From 350f7b6d4bb5fa99170a6e31d89a297d08ea6f2d Mon Sep 17 00:00:00 2001 From: Evan Morris Date: Thu, 25 Jul 2024 14:50:44 -0400 Subject: [PATCH] reverting to sequential node norm calls --- Common/normalization.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/Common/normalization.py b/Common/normalization.py index 2b932052..fdc2c5d3 100644 --- a/Common/normalization.py +++ b/Common/normalization.py @@ -165,18 +165,27 @@ def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list: else: break + # we should be able to do the following, but it's causing RemoteDisconnected errors with node norm + # # hit the node norm api with the chunks of curies in parallel # we could try to optimize the number of max_workers for ThreadPoolExecutor more specifically, # by default python attempts to find a reasonable # based on os.cpu_count() + # with ThreadPoolExecutor() as executor: + # executor_results = executor.map(self.hit_node_norm_service, chunks_of_ids) + # + # normalization_results = list(executor_results) + # for normalization_json, ids in zip(normalization_results, chunks_of_ids): + # if not normalization_json: + # raise NormalizationFailedError(f'!!! Normalization json results missing for ids: {ids}') + # else: + # merge the normalization results into one dictionary + # node_normalization_results.update(**normalization_json) + + # until we can get threading working, hit node norm sequentially node_normalization_results: dict = {} - with ThreadPoolExecutor() as executor: - normalization_results = list(executor.map(self.hit_node_norm_service, chunks_of_ids)) - for normalization_json, ids in zip(normalization_results, chunks_of_ids): - if not normalization_json: - self.logger.error(f'!!! Normalization json results missing for ids: {ids}') - else: - # merge the normalization results into one dictionary - node_normalization_results.update(**normalization_json) + for chunk in chunks_of_ids: + results = self.hit_node_norm_service(chunk) + node_normalization_results.update(**results) # reset the node index node_idx = 0