Skip to content

Commit

Permalink
fixed bug where compound to gene edges were backwards,
Browse files Browse the repository at this point in the history
big clean up refactor to use kgx file writer instead of appending to a list and using a dataframe
  • Loading branch information
EvanDietzMorris committed Jan 18, 2023
1 parent c8b3a8a commit 6cdc570
Showing 1 changed file with 126 additions and 148 deletions.
274 changes: 126 additions & 148 deletions parsers/PHAROS/src/loadPHAROS.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import os
import hashlib
import argparse
import pandas as pd
import re
import random
import requests

from Common.loader_interface import SourceDataLoader, SourceDataBrokenError, SourceDataFailedError
from Common.kgxmodel import kgxnode, kgxedge
from Common.node_types import GENE, DISEASE_OR_PHENOTYPIC_FEATURE, PUBLICATIONS
from Common.utils import GetData, snakify
from Common.db_connectors import MySQLConnector
from Common.predicates import DGIDB_PREDICATE_MAPPING
Expand Down Expand Up @@ -43,7 +42,7 @@ class PHAROSLoader(SourceDataLoader):

source_id = 'PHAROS'
provenance_id = 'infores:pharos'
parsing_version: str = '1.1'
parsing_version: str = '1.2'

def __init__(self, test_mode: bool = False, source_data_dir: str = None):
"""
Expand All @@ -56,6 +55,8 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
self.data_url = 'http://juniper.health.unm.edu/tcrd/download/'
self.source_db = 'Target Central Resource Database'
self.pharos_db = None
self.genetic_association_predicate = 'WIKIDATA_PROPERTY:P2293'


def get_latest_source_version(self) -> str:
"""
Expand Down Expand Up @@ -90,41 +91,27 @@ def parse_data(self) -> dict:
"Manually stand up PHAROS DB and configure environment variables before trying again."
raise SourceDataFailedError(error_message=error_message)

# storage for the node list
node_list: list = []

final_record_count: int = 0
final_skipped_count: int = 0

# get the nodes and edges for each dataset
self.logger.info('Querying for gene to disease..')
node_list, records, skipped = self.parse_gene_to_disease(node_list)
records, skipped = self.parse_gene_to_disease()
final_record_count += records
final_skipped_count += skipped
self.logger.info(f'Found {records} gene to disease records..')

self.logger.info('Querying for gene to drug activity..')
node_list, records, skipped = self.parse_gene_to_drug_activity(node_list)
records, skipped = self.parse_gene_to_drug_activity()
final_record_count += records
final_skipped_count += skipped
self.logger.info(f'Found {records} gene to drug records..')

self.logger.info('Querying for gene to compound activity..')
node_list, records, skipped = self.parse_gene_to_cmpd_activity(node_list)
records, skipped = self.parse_gene_to_cmpd_activity()
final_record_count += records
final_skipped_count += skipped

# is there anything to do
if len(node_list) > 0:
self.logger.debug('Creating nodes and edges.')

# create a data frame with the node list
df: pd.DataFrame = pd.DataFrame(node_list, columns=['grp', 'node_num', 'id', 'name', 'category', 'equivalent_identifiers', 'predicate', 'edge_label', 'pmids', 'affinity', 'affinity_parameter', 'provenance'])

# get the list of unique nodes and edges
self.get_nodes_and_edges(df)

self.logger.debug(f'{len(self.final_node_list)} nodes found, {len(self.final_edge_list)} edges found.')
else:
self.logger.warning(f'No records found.')
self.logger.info(f'Found {records} gene to compound records..')

# load up the metadata
load_metadata = {
Expand All @@ -135,7 +122,7 @@ def parse_data(self) -> dict:
# return the metadata to the caller
return load_metadata

def parse_gene_to_disease(self, node_list: list) -> (list, int, int):
def parse_gene_to_disease(self) -> (int, int):
"""
gets gene to disease records from the pharos DB and creates nodes
:param node_list: list, the node list to append this data to
Expand All @@ -148,54 +135,60 @@ def parse_gene_to_disease(self, node_list: list) -> (list, int, int):
# get the data
gene_to_disease: dict = self.query_pharos_db(self.GENE_TO_DISEASE_QUERY)

# create a regex pattern to find UML nodes
pattern = re.compile('^C\d+$') # pattern for umls local id
# create a regex pattern to find UMLS nodes
umls_pattern = re.compile('^C\d+$') # pattern for umls local id

# for each item in the list
for item in gene_to_disease:
# increment the counter
record_counter += 1

# get the pertinent info from the record
gene = item['value']
did = item['did']
name = item['name']
gene_sym = item['sym']
provenance = item['dtype']
gene_id = item['value']
gene_name = self.sanitize_name(item['sym'])
disease_id = item['did']
disease_name = self.sanitize_name(item['name'])
edge_provenance = item['dtype']

# move along, no disease id
if did is None:
if disease_id is None:
# increment the counter
skipped_record_counter += 1
continue
# if this is a UML node, create the curie
elif pattern.match(did):
did = f"UMLS:{did}"
# if this is a orphanet node, create the curie
elif did.startswith('Orphanet:'):
dparts = did.split(':')
did = 'ORPHANET:' + dparts[1]
elif did.startswith('MIM'):
did = 'O' + did

# create the group id
grp: str = gene + 'WIKIDATA_PROPERTY:P2293' + did + f'{random.random()}'
grp = hashlib.md5(grp.encode("utf-8")).hexdigest()
# if this is a UMLS node, create the curie
elif umls_pattern.match(disease_id):
disease_id = f"UMLS:{disease_id}"
elif disease_id.startswith('Orphanet:'):
disease_id = 'ORPHANET:' + disease_id.split(':')[1]
elif disease_id.startswith('MIM'):
disease_id = 'O' + disease_id

# if the drug id is a gene ignore it
if did == gene:
self.logger.error(f'similar parse_gene_to_disease()! {did} == {gene}, {item}')
if disease_id == gene_id:
self.logger.error(f'similar parse_gene_to_disease()! {disease_id} == {gene_id}, {item}')
else:
# create the gene node and add it to the node list
node_list.append({'grp': grp, 'node_num': 1, 'id': gene, 'name': gene_sym, 'category': '', 'equivalent_identifiers': ''})

# create the disease node and add it to the list
node_list.append({'grp': grp, 'node_num': 2, 'id': did, 'name': name, 'category': '', 'equivalent_identifiers': '', 'predicate': 'WIKIDATA_PROPERTY:P2293', 'edge_label': 'gene_associated_with_condition', 'pmids': [], 'affinity': 0, 'affinity_parameter': '', 'provenance': provenance})

# return the node list to the caller
return node_list, record_counter, skipped_record_counter

def parse_gene_to_drug_activity(self, node_list: list) -> (list, int, int):
disease_node = kgxnode(disease_id, name=disease_name, categories=[DISEASE_OR_PHENOTYPIC_FEATURE])
self.output_file_writer.write_kgx_node(disease_node)

gene_node = kgxnode(gene_id, name=gene_name, categories=[GENE])
self.output_file_writer.write_kgx_node(gene_node)

if edge_provenance:
gene_to_disease_edge = kgxedge(subject_id=gene_id,
object_id=disease_id,
predicate=self.genetic_association_predicate,
primary_knowledge_source=edge_provenance,
aggregator_knowledge_sources=self.provenance_id)
else:
gene_to_disease_edge = kgxedge(subject_id=gene_id,
object_id=disease_id,
predicate=self.genetic_association_predicate,
primary_knowledge_source=self.provenance_id)
self.output_file_writer.write_kgx_edge(gene_to_disease_edge)

return record_counter, skipped_record_counter

def parse_gene_to_drug_activity(self) -> (int, int):
"""
gets gene to drug activity records from the pharos DB and creates nodes
:param node_list: list, the node list to append this data to
Expand All @@ -215,11 +208,11 @@ def parse_gene_to_drug_activity(self, node_list: list) -> (list, int, int):
# increment the counter
record_counter += 1

name = item['drug']
gene = item['value']
gene_sym = item['sym']
drug_id = f"{prefixmap[item['id_src']]}{item['cid'].replace('CHEMBL', '')}"
predicate, pmids, props, provenance = self.get_edge_props(item)
drug_name = self.sanitize_name(item['drug'])
gene_id = item['value']
gene_name = self.sanitize_name(item['sym'])
predicate, pmids, props, edge_provenance = self.get_edge_props(item)

# if there were affinity properties use them
if len(props) == 2:
Expand All @@ -229,19 +222,42 @@ def parse_gene_to_drug_activity(self, node_list: list) -> (list, int, int):
affinity = 0
affinity_parameter = None

# create the group id
grp: str = drug_id + predicate + gene + f'{random.random()}'
grp = hashlib.md5(grp.encode("utf-8")).hexdigest()

# create the disease node and add it to the node list
node_list.append({'grp': grp, 'node_num': 1, 'id': drug_id, 'name': name, 'category': '', 'equivalent_identifiers': ''})

# create the gene node and add it to the list
node_list.append({'grp': grp, 'node_num': 2, 'id': gene, 'name': gene_sym, 'category': '', 'equivalent_identifiers': '', 'predicate': predicate, 'edge_label': '', 'pmids': pmids, 'affinity': affinity, 'affinity_parameter': affinity_parameter, 'provenance': provenance})

return node_list, record_counter, skipped_record_counter

def parse_gene_to_cmpd_activity(self, node_list: list) -> (list, int, int):
drug_node = kgxnode(drug_id,
name=drug_name)
self.output_file_writer.write_kgx_node(drug_node)

gene_node = kgxnode(gene_id,
name=gene_name,
categories=[GENE])
self.output_file_writer.write_kgx_node(gene_node)

edge_properties = {
PUBLICATIONS: pmids,
'affinity': affinity,
'affinity_parameter': affinity_parameter
}
if edge_provenance:
drug_to_gene_edge = kgxedge(
subject_id=drug_id,
object_id=gene_id,
predicate=predicate,
edgeprops=edge_properties,
primary_knowledge_source=edge_provenance,
aggregator_knowledge_sources=self.provenance_id
)
else:
drug_to_gene_edge = kgxedge(
subject_id=drug_id,
object_id=gene_id,
predicate=predicate,
edgeprops=edge_properties,
primary_knowledge_source=self.provenance_id
)
self.output_file_writer.write_kgx_edge(drug_to_gene_edge)

return record_counter, skipped_record_counter

def parse_gene_to_cmpd_activity(self) -> (int, int):
"""
gets gene to compound activity records from the pharos DB and creates nodes
:param node_list: list, the node list to append this data to
Expand All @@ -261,31 +277,50 @@ def parse_gene_to_cmpd_activity(self, node_list: list) -> (list, int, int):
# increment the counter
record_counter += 1

name = item['drug']
gene = item['value']
gene_sym = item['sym']
cmpd_id = f"{prefixmap[item['id_src']]}{item['cid'].replace('CHEMBL', '')}"
predicate, pmids, props, provenance = self.get_edge_props(item)
cmpd_name = self.sanitize_name(item['drug'])
gene_id = item['value']
gene_name = self.sanitize_name(item['sym'])
predicate, pmids, props, edge_provenance = self.get_edge_props(item)

# if there were affinity properties use them
if len(props) == 2:
affinity = props['affinity']
affinity_parameter = props['affinity_parameter']
else:
affinity = 0
affinity = None
affinity_parameter = None

# create the group id
grp: str = cmpd_id + predicate + gene + f'{random.random()}'
grp = hashlib.md5(grp.encode("utf-8")).hexdigest()

# create the gene node and add it to the list
node_list.append({'grp': grp, 'node_num': 1, 'id': gene, 'name': gene_sym, 'category': '', 'equivalent_identifiers': ''})

# create the compound node and add it to the node list
node_list.append({'grp': grp, 'node_num': 2, 'id': cmpd_id, 'name': name, 'category': '', 'equivalent_identifiers': '', 'predicate': predicate, 'edge_label': '', 'pmids': pmids, 'affinity': affinity, 'affinity_parameter': affinity_parameter, 'provenance': provenance})
cmpd_node = kgxnode(cmpd_id,
name=cmpd_name)
self.output_file_writer.write_kgx_node(cmpd_node)

gene_node = kgxnode(gene_id,
name=gene_name)
self.output_file_writer.write_kgx_node(gene_node)

edge_properties = {
PUBLICATIONS: pmids
}
if affinity and affinity_parameter:
edge_properties['affinity'] = affinity
edge_properties['affinity_parameter'] = affinity_parameter
if edge_provenance:
cmpd_to_gene_edge = kgxedge(subject_id=cmpd_id,
object_id=gene_id,
predicate=predicate,
edgeprops=edge_properties,
primary_knowledge_source=edge_provenance,
aggregator_knowledge_sources=self.provenance_id)
else:
cmpd_to_gene_edge = kgxedge(subject_id=cmpd_id,
object_id=gene_id,
predicate=predicate,
edgeprops=edge_properties,
primary_knowledge_source=self.provenance_id)
self.output_file_writer.write_kgx_edge(cmpd_to_gene_edge)

return node_list, record_counter, skipped_record_counter
return record_counter, skipped_record_counter

def get_edge_props(self, result) -> (str, list, dict, str):
"""
Expand Down Expand Up @@ -364,65 +399,8 @@ def query_pharos_db(self, sql_query: str):
self.init_pharos_db()
return self.pharos_db.query(sql_query)

def get_nodes_and_edges(self, df: pd.DataFrame):
"""
gets a list of nodes and edges for the data frame passed.
:param df: node storage data frame
:return:
"""

# separate the data into triplet groups
df_grp: pd.groupby_generic.DataFrameGroupBy = df.set_index('grp').groupby('grp')

# iterate through the groups and create the edge records.
for row_index, rows in df_grp:
# did we get the correct number of records in the group
if len(rows) == 2:
# init variables for each group
node_1_id: str = ''

# find the node
for row in rows.iterrows():
# save the node and node id for the edge
if row[1].node_num == 1:
if row[1]["name"] is not None:
# save the id for the edge
node_1_id = row[1]['id']

# make sure the name doesnt have any odd characters
name = ''.join([x if ord(x) < 128 else '?' for x in row[1]["name"]])

# save the node
new_node = kgxnode(node_1_id, name=name)
self.final_node_list.append(new_node)
break

# did we find the root node
if node_1_id != '':
# now for each node
for row in rows.iterrows():
# save the nodes and the node id for the edge
if row[1].node_num != 1:
# make sure the name doesnt have any odd characters
name = ''.join([x if ord(x) < 128 else '?' for x in row[1]["name"]])

# save the node
new_node = kgxnode(row[1]['id'], name=name)
self.final_node_list.append(new_node)

# save the edge
edge_props = {"publications": row[1]['pmids'], "affinity": row[1]['affinity'], "affinity_parameter": row[1]['affinity_parameter']}
new_edge = kgxedge(subject_id=node_1_id,
predicate=row[1]['predicate'],
object_id=row[1]['id'],
edgeprops=edge_props,
primary_knowledge_source=row[1]['provenance'],
aggregator_knowledge_sources=[self.provenance_id])
self.final_edge_list.append(new_edge)
else:
self.logger.debug(f'node group mismatch. len: {len(rows)}, data: {rows}')

def sanitize_name(self, name):
return ''.join([x if ord(x) < 128 else '?' for x in name])

if __name__ == '__main__':
# create a command line parser
Expand Down

0 comments on commit 6cdc570

Please sign in to comment.