# Source code for oaklib.implementations.ubergraph.ubergraph_implementation

import logging
import math
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Iterable, Iterator, List, Optional, Tuple, Union

from rdflib import OWL, RDF, RDFS, URIRef

from oaklib.datamodels import obograph
from oaklib.datamodels.similarity import TermPairwiseSimilarity
from oaklib.implementations.sparql.abstract_sparql_implementation import (
    AbstractSparqlImplementation,
    _as_rdf_obj,
    _sparql_values,
)
from oaklib.implementations.sparql.sparql_query import SparqlQuery
from oaklib.interfaces import SubsetterInterface
from oaklib.interfaces.basic_ontology_interface import RELATIONSHIP, RELATIONSHIP_MAP
from oaklib.interfaces.mapping_provider_interface import MappingProviderInterface
from oaklib.interfaces.obograph_interface import GraphTraversalMethod, OboGraphInterface
from oaklib.interfaces.rdf_interface import TRIPLE
from oaklib.interfaces.relation_graph_interface import RelationGraphInterface
from oaklib.interfaces.search_interface import SearchInterface
from oaklib.interfaces.semsim_interface import SemanticSimilarityInterface
from oaklib.interfaces.usages_interface import UsagesInterface
from oaklib.types import CURIE, PRED_CURIE
from oaklib.utilities.graph.networkx_bridge import transitive_reduction_by_predicate
from oaklib.utilities.semsim.similarity_utils import setwise_jaccard_similarity

__all__ = [
    "RelationGraphEnum",
    "UbergraphImplementation",
]


class RelationGraphEnum(Enum):
    """
    IRIs that Ubergraph (UG) uses to organize its triples.

    ``ontology``, ``redundant`` and ``nonredundant`` are used in this module as
    named-graph IRIs (in ``GRAPH <...>`` clauses or as the query graph).

    ``normalizedInformationContent`` is used in this module as a predicate IRI
    (annotation lookup and triple patterns), not as a graph name.
    """

    ontology = "http://reasoner.renci.org/ontology"
    redundant = "http://reasoner.renci.org/redundant"
    nonredundant = "http://reasoner.renci.org/nonredundant"
    normalizedInformationContent = "http://reasoner.renci.org/vocab/normalizedInformationContent"
@dataclass
class UbergraphImplementation(
    AbstractSparqlImplementation,
    RelationGraphInterface,
    SearchInterface,
    OboGraphInterface,
    MappingProviderInterface,
    SemanticSimilarityInterface,
    SubsetterInterface,
    UsagesInterface,
):
    """
    Wraps the Ubergraph sparql endpoint

    See: `<https://github.com/INCATools/ubergraph>`_

    This is a specialization of the more generic :class:`.SparqlImplementation`, which
    has knowledge of some of the specialized patterns found in Ubergraph

    An UbergraphImplementation can be initialized by:

    >>> from oaklib.implementations.ubergraph.ubergraph_implementation import UbergraphImplementation
    >>> adapter = UbergraphImplementation()

    or

    >>> from oaklib import get_adapter
    >>> adapter = get_adapter("ubergraph:")

    to use a specific ontology or named graph within ubergraph:

    >>> adapter = get_adapter("ubergraph:cl")
    """

    def _default_url(self) -> str:
        """Return the URL of the public Ubergraph SPARQL endpoint."""
        return "https://ubergraph.apps.renci.org/sparql"

    def _is_blazegraph(self) -> bool:
        """
        Currently Ubergraph uses blazegraph
        """
        return True

    @property
    def named_graph(self) -> Optional[str]:
        """
        Resolve the resource slug to the IRI of a named graph on the endpoint.

        A graph matches when its IRI contains ``/<slug>.`` or ``/<slug>-base``.

        :return: the first matching graph IRI, or None when there is no
            resource/slug or when no graph matches (a warning is logged in
            the latter case)
        """
        if not self.resource or self.resource.slug is None:
            return None
        ont = self.resource.slug
        if ont:
            for g in self.list_of_named_graphs():
                if f"/{ont}." in g or f"/{ont}-base" in g:
                    return g
        # NOTE(review): the warning is emitted whenever no graph was returned
        # above, including for an empty slug - confirm this is intended.
        logging.warning(f"No graph named: {ont}")
        return None

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: RelationGraph
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def _get_outgoing_edges_by_curie(
        self,
        curie: CURIE,
        graph: RelationGraphEnum,
        predicates: Optional[List[PRED_CURIE]] = None,
    ) -> Iterator[Tuple[PRED_CURIE, CURIE]]:
        """
        Yield (predicate, object) pairs for edges leaving ``curie``.

        Only objects typed as ``owl:Class`` are returned.

        :param curie: subject of the edges
        :param graph: named graph in which the edge triples are looked up
        :param predicates: if given (and non-empty), restrict to these predicates
        """
        query_uri = self.curie_to_sparql(curie)
        query = SparqlQuery(
            select=["?p", "?o"],
            where=[f"GRAPH <{graph.value}> {{ {query_uri} ?p ?o }}", "?o a owl:Class"],
        )
        if predicates:
            pred_uris = [self.curie_to_sparql(pred) for pred in predicates]
            query.where.append(self._values("p", pred_uris))
        for row in self._sparql_query(query.query_str()):
            yield self.uri_to_curie(row["p"]["value"]), self.uri_to_curie(row["o"]["value"])

    def _get_incoming_edges_by_curie(
        self,
        curie: CURIE,
        graph: RelationGraphEnum,
        predicates: Optional[List[PRED_CURIE]] = None,
    ) -> Iterator[Tuple[PRED_CURIE, CURIE]]:
        """
        Yield (predicate, subject) pairs for edges pointing at ``curie``.

        Only subjects typed as ``owl:Class`` are returned.

        :param curie: object of the edges
        :param graph: named graph in which the edge triples are looked up
        :param predicates: if given (and non-empty), restrict to these predicates
        """
        query_uri = self.curie_to_sparql(curie)
        query = SparqlQuery(
            select=["?s", "?p"],
            where=[f"GRAPH <{graph.value}> {{ ?s ?p {query_uri} }}", "?s a owl:Class"],
        )
        if predicates:
            pred_uris = [self.curie_to_sparql(pred) for pred in predicates]
            query.where.append(self._values("p", pred_uris))
        for row in self._sparql_query(query.query_str()):
            yield self.uri_to_curie(row["p"]["value"]), self.uri_to_curie(row["s"]["value"])

    def outgoing_relationship_map(self, curie: CURIE, isa_only: bool = False) -> RELATIONSHIP_MAP:
        """
        Group the non-redundant outgoing edges of ``curie`` by predicate.

        :param curie: subject term
        :param isa_only: accepted for interface compatibility; not used here
        :return: mapping of predicate to list of objects
        """
        rmap = defaultdict(list)
        for pred, obj in self._get_outgoing_edges_by_curie(
            curie, graph=RelationGraphEnum.nonredundant
        ):
            rmap[pred].append(obj)
        return rmap

    def incoming_relationship_map(self, curie: CURIE, isa_only: bool = False) -> RELATIONSHIP_MAP:
        """
        Group the non-redundant incoming edges of ``curie`` by predicate.

        :param curie: object term
        :param isa_only: accepted for interface compatibility; not used here
        :return: mapping of predicate to list of subjects
        """
        rmap = defaultdict(list)
        for pred, subj in self._get_incoming_edges_by_curie(
            curie, graph=RelationGraphEnum.nonredundant
        ):
            rmap[pred].append(subj)
        return rmap

    def relationships(
        self,
        subjects: Optional[List[CURIE]] = None,
        predicates: Optional[List[PRED_CURIE]] = None,
        objects: Optional[List[CURIE]] = None,
        include_tbox: bool = True,
        include_abox: bool = True,
        include_entailed: bool = False,
    ) -> Iterator[RELATIONSHIP]:
        """
        Yield (subject, predicate, object) triples matching the given filters.

        :param subjects: if given, restrict ``?s`` to these terms
        :param predicates: if given, restrict ``?p`` to these predicates
        :param objects: if given, restrict ``?o`` to these terms
        :param include_tbox: accepted for interface compatibility; not used here
        :param include_abox: accepted for interface compatibility; not used here
        :param include_entailed: when False the query is limited to the
            non-redundant graph; when True no graph is set on the query
        """
        query = SparqlQuery(select=["?s", "?p", "?o"], where=["?s ?p ?o"])
        if not include_entailed:
            query.graph = RelationGraphEnum.nonredundant.value
        if subjects:
            query.where.append(_sparql_values("s", [self.curie_to_sparql(x) for x in subjects]))
        if predicates:
            query.where.append(_sparql_values("p", [self.curie_to_sparql(x) for x in predicates]))
        if objects:
            query.where.append(_sparql_values("o", [self.curie_to_sparql(x) for x in objects]))
        for row in self._sparql_query(query.query_str()):
            yield (
                self.uri_to_curie(row["s"]["value"]),
                self.uri_to_curie(row["p"]["value"]),
                self.uri_to_curie(row["o"]["value"]),
            )

    def entailed_outgoing_relationships(
        self, curie: CURIE, predicates: Optional[List[PRED_CURIE]] = None
    ) -> Iterable[Tuple[PRED_CURIE, CURIE]]:
        """Yield (predicate, object) pairs for ``curie`` from the redundant graph."""
        return self._get_outgoing_edges_by_curie(
            curie, graph=RelationGraphEnum.redundant, predicates=predicates
        )

    def entailed_incoming_relationships(
        self, curie: CURIE, predicates: Optional[List[PRED_CURIE]] = None
    ) -> Iterable[Tuple[PRED_CURIE, CURIE]]:
        """Yield (predicate, subject) pairs for ``curie`` from the redundant graph."""
        return self._get_incoming_edges_by_curie(
            curie, graph=RelationGraphEnum.redundant, predicates=predicates
        )

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: OboGraph
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def _values(self, var: str, in_list: Optional[List[str]]) -> str:
        """
        Build a SPARQL ``VALUES`` clause binding ``?var`` to ``in_list``.

        :param var: variable name, without the leading ``?``
        :param in_list: terms already in SPARQL syntax; None yields ""
        """
        if in_list is None:
            return ""
        return f'VALUES ?{var} {{ {" ".join(in_list)} }}'

    def _from_subjects_chunked(
        self, subjects: List[CURIE], predicates: Optional[List[PRED_CURIE]] = None, **kwargs
    ):
        """
        Run :meth:`_from_subjects` over ``subjects`` in batches of 10.

        Batching keeps the ``VALUES`` clause of each query small.

        :param subjects: subject terms to look up
        :param predicates: passed through to :meth:`_from_subjects`
        :param kwargs: passed through to :meth:`_from_subjects`
        """
        size = 10
        for start in range(0, len(subjects), size):
            yield from self._from_subjects(subjects[start : start + size], predicates, **kwargs)

    def _from_subjects(
        self,
        subjects: List[CURIE],
        predicates: Optional[List[PRED_CURIE]] = None,
        graph: Optional[str] = None,
        object_is_literal=False,
        where=None,
    ) -> Iterable[Tuple[CURIE, PRED_CURIE, CURIE]]:
        """
        Yield distinct (subject, predicate, object) triples for the given subjects.

        :param subjects: subject terms bound to ``?s``
        :param predicates: if given (and non-empty), predicates bound to ``?p``
        :param graph: graph passed to the query; None leaves it unset
        :param object_is_literal: when True the object value is yielded as-is
            instead of being contracted to a CURIE
        :param where: extra where-clauses appended to the query
        """
        if where is None:
            where = []
        subject_uris = [self.curie_to_sparql(curie) for curie in subjects]
        if predicates:
            predicate_uris = [self.curie_to_sparql(curie) for curie in predicates]
        else:
            predicate_uris = None
        query = SparqlQuery(
            select=["?s ?p ?o"],
            distinct=True,
            graph=graph,
            where=[
                "?s ?p ?o",
                _sparql_values("s", subject_uris),
                _sparql_values("p", predicate_uris),
            ]
            + where,
        )
        for row in self._sparql_query(query.query_str()):
            value = row["o"]["value"]
            if not object_is_literal:
                value = self.uri_to_curie(value)
            yield (
                self.uri_to_curie(row["s"]["value"]),
                self.uri_to_curie(row["p"]["value"]),
                value,
            )

    def _object_properties(self) -> List[PRED_CURIE]:
        """Return the distinct subjects of ``rdf:type owl:ObjectProperty`` triples."""
        return list({t[0] for t in self._triples(None, RDF.type, OWL.ObjectProperty)})

    def ancestor_graph(
        self,
        start_curies: Union[CURIE, List[CURIE]],
        predicates: Optional[List[PRED_CURIE]] = None,
    ) -> obograph.Graph:
        """
        Build an obograph of the ancestors of ``start_curies``.

        Edges are the non-redundant triples whose subject is an ancestor;
        nodes are the ancestors that have an ``rdfs:label``.

        :param start_curies: one term or a list of terms to start from
        :param predicates: predicates used for the ancestor lookup and edges
        """
        ancs = list(self.ancestors(start_curies, predicates))
        logging.info(f"NUM ANCS: {len(ancs)}")
        edges = [
            obograph.Edge(sub=rel[0], pred=rel[1], obj=rel[2])
            for rel in self._from_subjects_chunked(
                ancs, predicates, graph=RelationGraphEnum.nonredundant.value, where=[]
            )
        ]
        logging.info(f"NUM EDGES: {len(edges)}")
        nodes = {}
        for rel in self._from_subjects_chunked(ancs, [RDFS.label], object_is_literal=True):
            node_id = rel[0]
            nodes[node_id] = obograph.Node(id=node_id, lbl=rel[2])
        logging.info(f"NUM NODES: {len(nodes)}")
        return obograph.Graph(id="query", nodes=list(nodes.values()), edges=edges)

    def relationships_to_graph(self, relationships: Iterable[RELATIONSHIP]) -> obograph.Graph:
        """
        Convert relationship triples into an obograph.

        Every element of every triple (including the predicate) is looked up
        for an ``rdfs:label``; a node is created for each one that has a label.

        :param relationships: (subject, predicate, object) triples
        """
        relationships = list(relationships)
        edges = [obograph.Edge(sub=s, pred=p, obj=o) for s, p, o in relationships]
        node_ids = set()
        for rel in relationships:
            node_ids.update(rel)
        nodes = {}
        for s, _, o in self._from_subjects_chunked(
            list(node_ids), [RDFS.label], object_is_literal=True
        ):
            nodes[s] = obograph.Node(id=s, lbl=o)
        logging.info(f"NUM EDGES: {len(edges)}")
        return obograph.Graph(id="query", nodes=list(nodes.values()), edges=edges)

    def ancestors(
        self,
        start_curies: Union[CURIE, List[CURIE]],
        predicates: Optional[List[PRED_CURIE]] = None,
        reflexive=True,
        method: Optional[GraphTraversalMethod] = None,
    ) -> Iterable[CURIE]:
        """
        Yield the distinct ``owl:Class`` objects of triples whose subject is a start term.

        :param start_curies: one term or a list of terms
        :param predicates: if given (and non-empty), restrict to these predicates
        :param reflexive: accepted for interface compatibility; not used here
        :param method: traversal method; HOP is not supported
        :raises NotImplementedError: if ``method`` is HOP
        """
        if method and method == GraphTraversalMethod.HOP:
            raise NotImplementedError("HOP not implemented for ubergraph")
        # TODO: DRY
        if not isinstance(start_curies, list):
            start_curies = [start_curies]
        query_uris = [self.curie_to_sparql(curie) for curie in start_curies]
        where = [
            "?s ?p ?o",
            "?o a owl:Class",
            _sparql_values("s", query_uris),
        ]
        if predicates:
            pred_uris = [self.curie_to_sparql(pred) for pred in predicates]
            where.append(_sparql_values("p", pred_uris))
        query = SparqlQuery(select=["?o"], distinct=True, where=where)
        for row in self._sparql_query(query.query_str()):
            yield self.uri_to_curie(row["o"]["value"])

    def descendants(
        self,
        start_curies: Union[CURIE, List[CURIE]],
        predicates: Optional[List[PRED_CURIE]] = None,
        reflexive=True,
        method: Optional[GraphTraversalMethod] = None,
    ) -> Iterable[CURIE]:
        """
        Yield the distinct ``owl:Class`` subjects of triples whose object is a start term.

        :param start_curies: one term or a list of terms
        :param predicates: if given (and non-empty), restrict to these predicates
        :param reflexive: accepted for interface compatibility; not used here
        :param method: traversal method; HOP is not supported
        :raises NotImplementedError: if ``method`` is HOP
        """
        if method and method == GraphTraversalMethod.HOP:
            raise NotImplementedError("HOP not implemented for ubergraph")
        # TODO: DRY
        # A single CURIE must be wrapped, as in ancestors(); otherwise the
        # string would be iterated character by character below.
        if not isinstance(start_curies, list):
            start_curies = [start_curies]
        query_uris = [self.curie_to_sparql(curie) for curie in start_curies]
        where = ["?s ?p ?o", "?s a owl:Class", self._values("o", query_uris)]
        if predicates:
            pred_uris = [self.curie_to_sparql(pred) for pred in predicates]
            where.append(self._values("p", pred_uris))
        query = SparqlQuery(select=["?s"], distinct=True, where=where)
        for row in self._sparql_query(query.query_str()):
            yield self.uri_to_curie(row["s"]["value"])

    def dump(self, path: Optional[str] = None, syntax: Optional[str] = None):
        """
        Not supported for Ubergraph.

        :raises NotImplementedError: always
        """
        raise NotImplementedError("Dump not allowed on ubergraph")

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: Subsetter
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def gap_fill_relationships(
        self, seed_curies: List[CURIE], predicates: Optional[List[PRED_CURIE]] = None
    ) -> Iterator[RELATIONSHIP]:
        """
        Yield relationships whose subject and object are both seed terms.

        The matching triples are passed through
        ``transitive_reduction_by_predicate`` before being yielded.

        :param seed_curies: terms allowed as subject and as object
        :param predicates: if given (and non-empty), restrict to these predicates
        """
        # TODO: compare with https://api.triplydb.com/s/_mZ9q_-rg
        query_uris = [self.curie_to_sparql(curie) for curie in seed_curies]
        where = ["?s ?p ?o", _sparql_values("s", query_uris), _sparql_values("o", query_uris)]
        if predicates:
            pred_uris = [self.curie_to_sparql(pred) for pred in predicates]
            where.append(_sparql_values("p", pred_uris))
        query = SparqlQuery(select=["?s ?p ?o"], where=where)
        bindings = self._sparql_query(query.query_str())
        # TODO: remove redundancy
        rels = [
            (
                self.uri_to_curie(row["s"]["value"]),
                self.uri_to_curie(row["p"]["value"]),
                self.uri_to_curie(row["o"]["value"]),
            )
            for row in bindings
        ]
        yield from transitive_reduction_by_predicate(rels)

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: SemSim
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def common_ancestors(
        self, subject: CURIE, object: CURIE, predicates: Optional[List[PRED_CURIE]] = None
    ) -> Iterable[CURIE]:
        """
        Yield the distinct ``owl:Class`` terms that both inputs have an edge to.

        :param subject: first term
        :param object: second term
        :param predicates: if given (and non-empty), both edges must use one of these
        """
        s_uri = self.curie_to_sparql(subject)
        o_uri = self.curie_to_sparql(object)
        where = [f"{s_uri} ?sp ?a", f"{o_uri} ?op ?a", "?a a owl:Class"]
        if predicates:
            pred_uris = [self.curie_to_sparql(pred) for pred in predicates]
            where.append(_sparql_values("sp", pred_uris))
            where.append(_sparql_values("op", pred_uris))
        query = SparqlQuery(select=["?a"], distinct=True, where=where)
        for row in self._sparql_query(query.query_str()):
            yield self.uri_to_curie(row["a"]["value"])

    def most_recent_common_ancestors(
        self, subject: CURIE, object: CURIE, predicates: Optional[List[PRED_CURIE]] = None
    ) -> Iterable[CURIE]:
        """
        Yield common ancestors that are not excluded by the ``?a2`` subquery.

        The main query selects common ancestors ``?a``; a subquery selecting
        ``?a2`` (a different common ancestor with an edge to ``?a``) is attached
        with ``add_not_in``.

        :param subject: first term
        :param object: second term
        :param predicates: if given (and non-empty), every edge variable in both
            queries is restricted to these predicates
        """
        s_uri = self.curie_to_sparql(subject)
        o_uri = self.curie_to_sparql(object)
        where = [f"{s_uri} ?sp ?a", f"{o_uri} ?op ?a", "?a a owl:Class"]
        where2 = [f"{s_uri} ?sp2 ?a2", f"{o_uri} ?op2 ?a2", "?a2 ?ap2 ?a", "FILTER( ?a != ?a2)"]
        if predicates:
            pred_uris = [self.curie_to_sparql(pred) for pred in predicates]
            where.append(_sparql_values("sp", pred_uris))
            where.append(_sparql_values("op", pred_uris))
            where2.append(_sparql_values("sp2", pred_uris))
            where2.append(_sparql_values("op2", pred_uris))
            where2.append(_sparql_values("ap2", pred_uris))
        query = SparqlQuery(select=["?a"], distinct=True, where=where)
        subq = SparqlQuery(select=["?a2"], where=where2)
        query.add_not_in(subq)
        for row in self._sparql_query(query.query_str()):
            yield self.uri_to_curie(row["a"]["value"])

    def get_information_content(
        self,
        curie: CURIE,
        background: Optional[CURIE] = None,
        predicates: Optional[List[PRED_CURIE]] = None,
    ) -> Optional[float]:
        """
        Look up the normalized information content annotation of ``curie``.

        :param curie: term to look up
        :param background: accepted for interface compatibility; not used here
        :param predicates: must be None
        :return: the IC as a float, or None when the term has no IC annotation
        :raises NotImplementedError: if ``predicates`` is not None
        :raises ValueError: if the term has more than one IC annotation
        """
        if predicates is not None:
            raise NotImplementedError("Only predetermined predicates allowed")
        ics = self._get_anns(curie, URIRef(RelationGraphEnum.normalizedInformationContent.value))
        if len(ics) > 1:
            raise ValueError(f"Multiple ICs for {curie} = {ics}")
        if not ics:
            # No annotation: report "unknown" rather than failing with IndexError.
            return None
        return float(ics[0])

    def pairwise_similarity(
        self,
        subject: CURIE,
        object: CURIE = None,
        predicates: Optional[List[PRED_CURIE]] = None,
    ) -> TermPairwiseSimilarity:
        """
        Compute similarity between two terms via their highest-IC common ancestor.

        The result carries the chosen ancestor and its IC, any labels found for
        the three terms, the Jaccard similarity of the two ancestor sets, and a
        phenodigm score of ``sqrt(jaccard * ancestor IC)``.

        :param subject: first term
        :param object: second term
        :param predicates: if given (and non-empty), restrict ancestor edges to these
        :raises ValueError: if no common ancestor with an information content is found
        """
        s_uri = self.curie_to_sparql(subject)
        o_uri = self.curie_to_sparql(object)
        where = [
            f"{s_uri} ?sp ?a",
            f"{o_uri} ?op ?a",
            "?a a owl:Class",
            f"?a <{RelationGraphEnum.normalizedInformationContent.value}> ?ic",
        ]
        if predicates:
            pred_uris = [self.curie_to_sparql(pred) for pred in predicates]
            where.append(_sparql_values("sp", pred_uris))
            where.append(_sparql_values("op", pred_uris))
        query = SparqlQuery(select=["?a", "?ic"], distinct=True, where=where)
        bindings = self._sparql_query(query.query_str())
        # ?ic is passed to float(), so it is parsed directly rather than being
        # contracted to a CURIE first.
        ics = {self.uri_to_curie(row["a"]["value"]): float(row["ic"]["value"]) for row in bindings}
        if not ics:
            raise ValueError(
                f"No common ancestors with information content for {subject} and {object}"
            )
        max_ic = max(ics.values())
        # First ancestor (in result order) that reaches the maximum IC.
        mrca = next(anc for anc, ic in ics.items() if ic == max_ic)
        sim = TermPairwiseSimilarity(subject_id=subject, object_id=object, ancestor_id=mrca)
        for curie, label in self.labels([subject, object, mrca]):
            if label is None:
                continue
            # Independent checks: the same CURIE may play more than one role.
            if curie == subject:
                sim.subject_label = label
            if curie == object:
                sim.object_label = label
            if curie == mrca:
                sim.ancestor_label = label
        sim.ancestor_information_content = max_ic
        sim.jaccard_similarity = setwise_jaccard_similarity(
            list(self.ancestors(subject, predicates=predicates)),
            list(self.ancestors(object, predicates=predicates)),
        )
        sim.phenodigm_score = math.sqrt(sim.jaccard_similarity * sim.ancestor_information_content)
        return sim

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: RdfInterface
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def extract_triples(
        self,
        seed_curies: List[CURIE],
        predicates: Optional[List[PRED_CURIE]] = None,
        strategy=None,
        map_to_curies=True,
    ) -> Iterator[TRIPLE]:
        """
        Yield triples from the ontology graph reachable from the seed terms.

        Subjects are those reachable from a seed by zero or more steps over the
        traversal predicates.

        :param seed_curies: terms to start the traversal from
        :param predicates: extra predicates added to the traversal path
        :param strategy: accepted for interface compatibility; not used here
        :param map_to_curies: when True each term is contracted to a CURIE,
            otherwise each binding is converted with ``_as_rdf_obj``
        """
        seed_uris = [self.curie_to_sparql(c) for c in seed_curies]
        # Note that some triplestores will have performance issues with this query
        traverse_preds = [
            "rdfs:subClassOf",
            "owl:onProperty",
            "owl:someValuesFrom",
            "owl:annotatedSource",
            "owl:equivalentClass",
        ]
        if predicates:
            # note that predicates are only used in the ABox - for a RelationGraph-implementing
            # triplestore this will also include TBox existentials
            # De-duplicate while keeping order so the generated query is deterministic.
            traverse_preds = list(dict.fromkeys(traverse_preds + list(predicates)))
        query = SparqlQuery(
            select=["?s", "?p", "?o"],
            graph=[RelationGraphEnum.ontology.value],
            where=[
                # Both patterns deliberately live in one where-entry.
                f'?s ?p ?o . ?seed ({"|".join(traverse_preds)})* ?s',
                _sparql_values("seed", seed_uris),
            ],
        )
        # NOTE(review): unlike the other methods this passes the SparqlQuery
        # object, not query.query_str() - confirm _sparql_query accepts both.
        bindings = self._sparql_query(query)
        n = 0
        for row in bindings:
            n += 1
            triple = (row["s"], row["p"], row["o"])
            if map_to_curies:
                yield tuple(self.uri_to_curie(v["value"]) for v in triple)
            else:
                yield tuple(_as_rdf_obj(v) for v in triple)
        logging.info(f"Total triples: {n}")