diff --git a/parsers/PHAROS/src/loadPHAROS.py b/parsers/PHAROS/src/loadPHAROS.py index 8e327184..9f8dcf31 100644 --- a/parsers/PHAROS/src/loadPHAROS.py +++ b/parsers/PHAROS/src/loadPHAROS.py @@ -1,13 +1,12 @@ import os -import hashlib import argparse import pandas as pd import re -import random import requests from Common.loader_interface import SourceDataLoader, SourceDataBrokenError, SourceDataFailedError from Common.kgxmodel import kgxnode, kgxedge +from Common.node_types import GENE, DISEASE_OR_PHENOTYPIC_FEATURE, PUBLICATIONS from Common.utils import GetData, snakify from Common.db_connectors import MySQLConnector from Common.predicates import DGIDB_PREDICATE_MAPPING @@ -43,7 +42,7 @@ class PHAROSLoader(SourceDataLoader): source_id = 'PHAROS' provenance_id = 'infores:pharos' - parsing_version: str = '1.1' + parsing_version: str = '1.2' def __init__(self, test_mode: bool = False, source_data_dir: str = None): """ @@ -56,6 +55,8 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): self.data_url = 'http://juniper.health.unm.edu/tcrd/download/' self.source_db = 'Target Central Resource Database' self.pharos_db = None + self.genetic_association_predicate = 'WIKIDATA_PROPERTY:P2293' + def get_latest_source_version(self) -> str: """ @@ -90,41 +91,27 @@ def parse_data(self) -> dict: "Manually stand up PHAROS DB and configure environment variables before trying again." raise SourceDataFailedError(error_message=error_message) - # storage for the node list - node_list: list = [] - final_record_count: int = 0 final_skipped_count: int = 0 # get the nodes and edges for each dataset self.logger.info('Querying for gene to disease..') - node_list, records, skipped = self.parse_gene_to_disease(node_list) + records, skipped = self.parse_gene_to_disease() final_record_count += records final_skipped_count += skipped + self.logger.info(f'Found {records} gene to disease records..') self.logger.info('Querying for gene to drug activity..') - node_list, records, skipped = self.parse_gene_to_drug_activity(node_list) + records, skipped = self.parse_gene_to_drug_activity() final_record_count += records final_skipped_count += skipped + self.logger.info(f'Found {records} gene to drug records..') self.logger.info('Querying for gene to compound activity..') - node_list, records, skipped = self.parse_gene_to_cmpd_activity(node_list) + records, skipped = self.parse_gene_to_cmpd_activity() final_record_count += records final_skipped_count += skipped - - # is there anything to do - if len(node_list) > 0: - self.logger.debug('Creating nodes and edges.') - - # create a data frame with the node list - df: pd.DataFrame = pd.DataFrame(node_list, columns=['grp', 'node_num', 'id', 'name', 'category', 'equivalent_identifiers', 'predicate', 'edge_label', 'pmids', 'affinity', 'affinity_parameter', 'provenance']) - - # get the list of unique nodes and edges - self.get_nodes_and_edges(df) - - self.logger.debug(f'{len(self.final_node_list)} nodes found, {len(self.final_edge_list)} edges found.') - else: - self.logger.warning(f'No records found.') + self.logger.info(f'Found {records} gene to compound records..') # load up the metadata load_metadata = { @@ -135,7 +122,7 @@ def parse_data(self) -> dict: # return the metadata to the caller return load_metadata - def parse_gene_to_disease(self, node_list: list) -> (list, int, int): + def parse_gene_to_disease(self) -> (int, int): """ gets gene to disease records from the pharos DB and creates nodes :param node_list: list, the node list to append this data to @@ -148,8 +135,8 @@ def parse_gene_to_disease(self, node_list: list) -> (list, int, int): # get the data gene_to_disease: dict = self.query_pharos_db(self.GENE_TO_DISEASE_QUERY) - # create a regex pattern to find UML nodes - pattern = re.compile('^C\d+$') # pattern for umls local id + # create a regex pattern to find UMLS nodes + umls_pattern = re.compile('^C\d+$') # pattern for umls local id # for each item in the list for item in gene_to_disease: @@ -157,45 +144,51 @@ def parse_gene_to_disease(self, node_list: list) -> (list, int, int): record_counter += 1 # get the pertinent info from the record - gene = item['value'] - did = item['did'] - name = item['name'] - gene_sym = item['sym'] - provenance = item['dtype'] + gene_id = item['value'] + gene_name = self.sanitize_name(item['sym']) + disease_id = item['did'] + disease_name = self.sanitize_name(item['name']) + edge_provenance = item['dtype'] # move along, no disease id - if did is None: + if disease_id is None: # increment the counter skipped_record_counter += 1 continue - # if this is a UML node, create the curie - elif pattern.match(did): - did = f"UMLS:{did}" - # if this is a orphanet node, create the curie - elif did.startswith('Orphanet:'): - dparts = did.split(':') - did = 'ORPHANET:' + dparts[1] - elif did.startswith('MIM'): - did = 'O' + did - - # create the group id - grp: str = gene + 'WIKIDATA_PROPERTY:P2293' + did + f'{random.random()}' - grp = hashlib.md5(grp.encode("utf-8")).hexdigest() + # if this is a UMLS node, create the curie + elif umls_pattern.match(disease_id): + disease_id = f"UMLS:{disease_id}" + elif disease_id.startswith('Orphanet:'): + disease_id = 'ORPHANET:' + disease_id.split(':')[1] + elif disease_id.startswith('MIM'): + disease_id = 'O' + disease_id # if the drug id is a gene ignore it - if did == gene: - self.logger.error(f'similar parse_gene_to_disease()! {did} == {gene}, {item}') + if disease_id == gene_id: + self.logger.error(f'similar parse_gene_to_disease()! {disease_id} == {gene_id}, {item}') else: - # create the gene node and add it to the node list - node_list.append({'grp': grp, 'node_num': 1, 'id': gene, 'name': gene_sym, 'category': '', 'equivalent_identifiers': ''}) - - # create the disease node and add it to the list - node_list.append({'grp': grp, 'node_num': 2, 'id': did, 'name': name, 'category': '', 'equivalent_identifiers': '', 'predicate': 'WIKIDATA_PROPERTY:P2293', 'edge_label': 'gene_associated_with_condition', 'pmids': [], 'affinity': 0, 'affinity_parameter': '', 'provenance': provenance}) - - # return the node list to the caller - return node_list, record_counter, skipped_record_counter - - def parse_gene_to_drug_activity(self, node_list: list) -> (list, int, int): + disease_node = kgxnode(disease_id, name=disease_name, categories=[DISEASE_OR_PHENOTYPIC_FEATURE]) + self.output_file_writer.write_kgx_node(disease_node) + + gene_node = kgxnode(gene_id, name=gene_name, categories=[GENE]) + self.output_file_writer.write_kgx_node(gene_node) + + if edge_provenance: + gene_to_disease_edge = kgxedge(subject_id=gene_id, + object_id=disease_id, + predicate=self.genetic_association_predicate, + primary_knowledge_source=edge_provenance, + aggregator_knowledge_sources=self.provenance_id) + else: + gene_to_disease_edge = kgxedge(subject_id=gene_id, + object_id=disease_id, + predicate=self.genetic_association_predicate, + primary_knowledge_source=self.provenance_id) + self.output_file_writer.write_kgx_edge(gene_to_disease_edge) + + return record_counter, skipped_record_counter + + def parse_gene_to_drug_activity(self) -> (int, int): """ gets gene to drug activity records from the pharos DB and creates nodes :param node_list: list, the node list to append this data to @@ -215,11 +208,11 @@ def parse_gene_to_drug_activity(self, node_list: list) -> (list, int, int): # increment the counter record_counter += 1 - name = item['drug'] - gene = item['value'] - gene_sym = item['sym'] drug_id = f"{prefixmap[item['id_src']]}{item['cid'].replace('CHEMBL', '')}" - predicate, pmids, props, provenance = self.get_edge_props(item) + drug_name = self.sanitize_name(item['drug']) + gene_id = item['value'] + gene_name = self.sanitize_name(item['sym']) + predicate, pmids, props, edge_provenance = self.get_edge_props(item) # if there were affinity properties use them if len(props) == 2: @@ -229,19 +222,42 @@ def parse_gene_to_drug_activity(self, node_list: list) -> (list, int, int): affinity = 0 affinity_parameter = None - # create the group id - grp: str = drug_id + predicate + gene + f'{random.random()}' - grp = hashlib.md5(grp.encode("utf-8")).hexdigest() - - # create the disease node and add it to the node list - node_list.append({'grp': grp, 'node_num': 1, 'id': drug_id, 'name': name, 'category': '', 'equivalent_identifiers': ''}) - - # create the gene node and add it to the list - node_list.append({'grp': grp, 'node_num': 2, 'id': gene, 'name': gene_sym, 'category': '', 'equivalent_identifiers': '', 'predicate': predicate, 'edge_label': '', 'pmids': pmids, 'affinity': affinity, 'affinity_parameter': affinity_parameter, 'provenance': provenance}) - - return node_list, record_counter, skipped_record_counter - - def parse_gene_to_cmpd_activity(self, node_list: list) -> (list, int, int): + drug_node = kgxnode(drug_id, + name=drug_name) + self.output_file_writer.write_kgx_node(drug_node) + + gene_node = kgxnode(gene_id, + name=gene_name, + categories=[GENE]) + self.output_file_writer.write_kgx_node(gene_node) + + edge_properties = { + PUBLICATIONS: pmids, + 'affinity': affinity, + 'affinity_parameter': affinity_parameter + } + if edge_provenance: + drug_to_gene_edge = kgxedge( + subject_id=drug_id, + object_id=gene_id, + predicate=predicate, + edgeprops=edge_properties, + primary_knowledge_source=edge_provenance, + aggregator_knowledge_sources=self.provenance_id + ) + else: + drug_to_gene_edge = kgxedge( + subject_id=drug_id, + object_id=gene_id, + predicate=predicate, + edgeprops=edge_properties, + primary_knowledge_source=self.provenance_id + ) + self.output_file_writer.write_kgx_edge(drug_to_gene_edge) + + return record_counter, skipped_record_counter + + def parse_gene_to_cmpd_activity(self) -> (int, int): """ gets gene to compound activity records from the pharos DB and creates nodes :param node_list: list, the node list to append this data to @@ -261,31 +277,50 @@ def parse_gene_to_cmpd_activity(self, node_list: list) -> (list, int, int): # increment the counter record_counter += 1 - name = item['drug'] - gene = item['value'] - gene_sym = item['sym'] cmpd_id = f"{prefixmap[item['id_src']]}{item['cid'].replace('CHEMBL', '')}" - predicate, pmids, props, provenance = self.get_edge_props(item) + cmpd_name = self.sanitize_name(item['drug']) + gene_id = item['value'] + gene_name = self.sanitize_name(item['sym']) + predicate, pmids, props, edge_provenance = self.get_edge_props(item) # if there were affinity properties use them if len(props) == 2: affinity = props['affinity'] affinity_parameter = props['affinity_parameter'] else: - affinity = 0 + affinity = None affinity_parameter = None - # create the group id - grp: str = cmpd_id + predicate + gene + f'{random.random()}' - grp = hashlib.md5(grp.encode("utf-8")).hexdigest() - - # create the gene node and add it to the list - node_list.append({'grp': grp, 'node_num': 1, 'id': gene, 'name': gene_sym, 'category': '', 'equivalent_identifiers': ''}) - - # create the compound node and add it to the node list - node_list.append({'grp': grp, 'node_num': 2, 'id': cmpd_id, 'name': name, 'category': '', 'equivalent_identifiers': '', 'predicate': predicate, 'edge_label': '', 'pmids': pmids, 'affinity': affinity, 'affinity_parameter': affinity_parameter, 'provenance': provenance}) + cmpd_node = kgxnode(cmpd_id, + name=cmpd_name) + self.output_file_writer.write_kgx_node(cmpd_node) + + gene_node = kgxnode(gene_id, + name=gene_name) + self.output_file_writer.write_kgx_node(gene_node) + + edge_properties = { + PUBLICATIONS: pmids + } + if affinity and affinity_parameter: + edge_properties['affinity'] = affinity + edge_properties['affinity_parameter'] = affinity_parameter + if edge_provenance: + cmpd_to_gene_edge = kgxedge(subject_id=cmpd_id, + object_id=gene_id, + predicate=predicate, + edgeprops=edge_properties, + primary_knowledge_source=edge_provenance, + aggregator_knowledge_sources=self.provenance_id) + else: + cmpd_to_gene_edge = kgxedge(subject_id=cmpd_id, + object_id=gene_id, + predicate=predicate, + edgeprops=edge_properties, + primary_knowledge_source=self.provenance_id) + self.output_file_writer.write_kgx_edge(cmpd_to_gene_edge) - return node_list, record_counter, skipped_record_counter + return record_counter, skipped_record_counter def get_edge_props(self, result) -> (str, list, dict, str): """ @@ -364,65 +399,8 @@ def query_pharos_db(self, sql_query: str): self.init_pharos_db() return self.pharos_db.query(sql_query) - def get_nodes_and_edges(self, df: pd.DataFrame): - """ - gets a list of nodes and edges for the data frame passed. - - :param df: node storage data frame - :return: - """ - - # separate the data into triplet groups - df_grp: pd.groupby_generic.DataFrameGroupBy = df.set_index('grp').groupby('grp') - - # iterate through the groups and create the edge records. - for row_index, rows in df_grp: - # did we get the correct number of records in the group - if len(rows) == 2: - # init variables for each group - node_1_id: str = '' - - # find the node - for row in rows.iterrows(): - # save the node and node id for the edge - if row[1].node_num == 1: - if row[1]["name"] is not None: - # save the id for the edge - node_1_id = row[1]['id'] - - # make sure the name doesnt have any odd characters - name = ''.join([x if ord(x) < 128 else '?' for x in row[1]["name"]]) - - # save the node - new_node = kgxnode(node_1_id, name=name) - self.final_node_list.append(new_node) - break - - # did we find the root node - if node_1_id != '': - # now for each node - for row in rows.iterrows(): - # save the nodes and the node id for the edge - if row[1].node_num != 1: - # make sure the name doesnt have any odd characters - name = ''.join([x if ord(x) < 128 else '?' for x in row[1]["name"]]) - - # save the node - new_node = kgxnode(row[1]['id'], name=name) - self.final_node_list.append(new_node) - - # save the edge - edge_props = {"publications": row[1]['pmids'], "affinity": row[1]['affinity'], "affinity_parameter": row[1]['affinity_parameter']} - new_edge = kgxedge(subject_id=node_1_id, - predicate=row[1]['predicate'], - object_id=row[1]['id'], - edgeprops=edge_props, - primary_knowledge_source=row[1]['provenance'], - aggregator_knowledge_sources=[self.provenance_id]) - self.final_edge_list.append(new_edge) - else: - self.logger.debug(f'node group mismatch. len: {len(rows)}, data: {rows}') - + def sanitize_name(self, name): + return ''.join([x if ord(x) < 128 else '?' for x in name]) if __name__ == '__main__': # create a command line parser