import logging
import re
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Tuple, Union
import sssom_schema as sssom
from kgcl_schema.datamodel import kgcl
from linkml_runtime.dumpers import json_dumper
from linkml_runtime.loaders import json_loader
from oaklib.converters.obo_graph_to_rdf_owl_converter import SCOPE_MAP
from oaklib.datamodels import obograph
from oaklib.datamodels.obograph import (
Edge,
Graph,
GraphDocument,
LogicalDefinitionAxiom,
Meta,
Node,
)
from oaklib.datamodels.search import SearchConfiguration
from oaklib.datamodels.search_datamodel import SearchProperty, SearchTermSyntax
from oaklib.datamodels.vocabulary import (
EQUIVALENT_CLASS,
HAS_DBXREF,
INVERSE_OF,
IS_A,
LABEL_PREDICATE,
OWL_CLASS,
OWL_OBJECT_PROPERTY,
SUBPROPERTY_OF,
)
from oaklib.interfaces.basic_ontology_interface import (
ALIAS_MAP,
LANGUAGE_TAG,
RELATIONSHIP,
RELATIONSHIP_MAP,
)
from oaklib.interfaces.differ_interface import DifferInterface
from oaklib.interfaces.dumper_interface import DumperInterface
from oaklib.interfaces.merge_interface import MergeInterface
from oaklib.interfaces.obograph_interface import OboGraphInterface
from oaklib.interfaces.patcher_interface import PatcherInterface
from oaklib.interfaces.rdf_interface import RdfInterface
from oaklib.interfaces.search_interface import SearchInterface
from oaklib.interfaces.validator_interface import ValidatorInterface
from oaklib.resource import OntologyResource
from oaklib.types import CURIE, PRED_CURIE, SUBSET_CURIE, URI
from oaklib.utilities.axioms.logical_definition_utilities import (
logical_definition_matches,
)
from oaklib.utilities.basic_utils import pairs_as_dict
# Lookup from a syntax name to a format string; judging by the constant's name the
# values are presumably rdflib format identifiers (not used in the visible code).
RDFLIB_FORMAT_MAP = dict(
    ttl="ttl",
    n3="n3",
    rdfxml="xml",
    xml="xml",
    owl="xml",
)
# [docs]
@dataclass
class OboGraphImplementation(
ValidatorInterface,
DifferInterface,
RdfInterface,
OboGraphInterface,
SearchInterface,
PatcherInterface,
DumperInterface,
MergeInterface,
):
"""
OBO Graphs JSON backed implementation.
This implementation works off of an in-memory GraphDocument object.
To use:
.. code:: python
>>> from oaklib import get_adapter
>>> oi = get_adapter('obograph:tests/input/go-nucleus.json')
>>> for node_id in oi.entities():
...     print(node_id, oi.label(node_id))
<BLANKLINE>
...
GO:0043226 organelle
...
"""
# In-memory OBO Graphs document; filled in by __post_init__ when left as None.
obograph_document: GraphDocument = None
# Slot for a per-CURIE relationship cache; none of the methods visible here read or write it.
_relationship_index_cache: Dict[CURIE, List[RELATIONSHIP]] = None
def __post_init__(self):
if self.obograph_document is None:
resource = self.resource
if resource and resource.local_path:
gd = json_loader.load(str(resource.local_path), target_class=GraphDocument)
else:
gd = GraphDocument()
self.obograph_document = gd
def uri_to_curie(
    self, uri: URI, strict: bool = False, use_uri_fallback=True
) -> Optional[CURIE]:
    """
    Contract a URI to a CURIE, special-casing a few shorthand predicate names.

    :param uri: URI (or shorthand such as ``is_a``) to contract
    :param strict: forwarded to the parent implementation
    :param use_uri_fallback: forwarded to the parent implementation
    :return: the mapped constant for a recognised shorthand, otherwise whatever the
        parent ``uri_to_curie`` returns
    """
    # TODO: use a map
    shorthand_pairs = (
        ("is_a", IS_A),
        ("subPropertyOf", SUBPROPERTY_OF),
        ("inverseOf", INVERSE_OF),
        (EQUIVALENT_CLASS, EQUIVALENT_CLASS),
    )
    for shorthand, mapped in shorthand_pairs:
        if uri == shorthand:
            return mapped
    # not a shorthand: defer to the inherited contraction logic
    return super().uri_to_curie(uri, strict=strict, use_uri_fallback=use_uri_fallback)
def store(self, resource: OntologyResource = None) -> None:
    """
    Serialize the in-memory document as JSON.

    :param resource: destination; defaults to the adapter's own resource
    :raises NotImplementedError: if the destination resource is not local

    A local resource with a slug is written to that slug; a local resource without
    one has the JSON printed to standard output.
    """
    target = self.resource if resource is None else resource
    document = self.obograph_document
    if not target.local:
        raise NotImplementedError(f"Cannot dump to {target}")
    if target.slug:
        json_dumper.dump(document, target.slug)
        return
    print(json_dumper.dumps(document))
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: BasicOntologyInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def _tuple_to_curies(self, t: Tuple) -> Tuple:
return tuple([self.uri_to_curie(x) for x in t])
def _all_relationships(self) -> Iterator[RELATIONSHIP]:
logging.info("Commencing indexing")
od = self.obograph_document
for g in od.graphs:
for e in g.edges:
yield self._tuple_to_curies((e.sub, e.pred, e.obj))
for ens in g.equivalentNodesSets:
for n1 in ens.nodeIds:
for n2 in ens.nodeIds:
if n1 != n2:
# directionality is lost in OboGraph representation
yield self._tuple_to_curies((n1, EQUIVALENT_CLASS, n2))
def entities(self, filter_obsoletes=True, owl_type=None) -> Iterable[CURIE]:
    """
    Yield the CURIE of each node in the document.

    :param filter_obsoletes: skip nodes whose meta is marked deprecated
    :param owl_type: when given, skip "CLASS" nodes unless it equals OWL_CLASS and
        "PROPERTY" nodes unless it equals OWL_OBJECT_PROPERTY; nodes with any other
        (or no) type are always kept
    """
    for graph in self.obograph_document.graphs:
        for node in graph.nodes:
            if filter_obsoletes and node.meta and node.meta.deprecated:
                continue
            # the node type is only consulted when a type filter was requested
            node_type = node.type if owl_type else None
            if node_type == "CLASS" and owl_type != OWL_CLASS:
                continue
            if node_type == "PROPERTY" and owl_type != OWL_OBJECT_PROPERTY:
                continue
            yield self.uri_to_curie(node.id)
def obsoletes(self) -> Iterable[CURIE]:
    """
    Yield the id of every node whose metadata marks it as deprecated.

    Nodes without a ``meta`` object are treated as non-obsolete rather than raising
    AttributeError, matching the guard used by ``entities``.

    :return: node ids exactly as stored in the document (not passed through
        ``uri_to_curie``)
    """
    for graph in self.obograph_document.graphs:
        for node in graph.nodes:
            if node.meta and node.meta.deprecated:
                yield node.id
# TODO: abstract into separate standalone package
def _get_subset_curie(self, curie: str) -> str:
if "#" in curie:
return curie.split("#")[-1]
else:
return curie
def _node_subsets(self, node: Node) -> List[SUBSET_CURIE]:
if node.meta:
return [self._get_subset_curie(s) for s in node.meta.subsets]
else:
return []
def _entire_graph(self) -> Graph:
if len(self.obograph_document.graphs) > 1:
raise ValueError("Multiple graphs")
return self.obograph_document.graphs[0]
def _nodes(self) -> Iterator[Node]:
for g in self.obograph_document.graphs:
for n in g.nodes:
yield n
def _node(self, curie: CURIE, strict=False) -> Optional[Node]:
node: Optional[Node] = None
for g in self.obograph_document.graphs:
for n in g.nodes:
# TODO: make this more efficient
if self.uri_to_curie(n.id) == curie:
# handle duplicates
if node:
if node.lbl:
if strict:
raise ValueError(f"Multiple nodes with id {curie}")
else:
# previously encountered node was dangling/stub;
# replace
node = n
else:
node = n
if node:
node = deepcopy(node)
node.id = self.uri_to_curie(node.id)
return node
else:
if strict:
raise ValueError(f"No such node {curie}")
def _meta(self, curie: CURIE, strict=False) -> Optional[Meta]:
n = self._node(curie, strict=strict)
if n:
return n.meta
def ontologies(self) -> Iterable[CURIE]:
    """Return the ids of all graphs in the document, as a list."""
    graphs = self.obograph_document.graphs
    return [graph.id for graph in graphs]
def subsets(self) -> Iterable[CURIE]:
    """
    Enumerating subsets is not implemented for this adapter.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def subset_members(self, subset: SUBSET_CURIE) -> Iterable[CURIE]:
    """
    Yield the CURIE of every node that belongs to ``subset``.

    :param subset: subset name, compared against the normalised names returned by
        ``_node_subsets``
    :return: contracted node ids; the declared return type is CURIEs, so identifiers
        are yielded rather than the node objects themselves
    """
    for graph in self.obograph_document.graphs:
        for node in graph.nodes:
            if subset in self._node_subsets(node):
                yield self.uri_to_curie(node.id)
def label(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]:
    """
    Return the label of the entity identified by ``curie``.

    :param lang: language tag; not supported
    :return: ``"subClassOf"`` for the IS_A predicate, the node's ``lbl`` when the node
        exists, otherwise None
    :raises NotImplementedError: if ``lang`` is given
    """
    if lang:
        raise NotImplementedError("Language tags not supported")
    if curie == IS_A:
        return "subClassOf"
    node = self._node(curie)
    return node.lbl if node else None
def set_label(self, curie: CURIE, label: str, lang: Optional[LANGUAGE_TAG] = None) -> bool:
    """
    Set the label of an existing node in the in-memory document.

    :param curie: identifier of the node to relabel
    :param label: new label value
    :param lang: language tag; not supported
    :return: True once the label has been assigned
    :raises NotImplementedError: if ``lang`` is given
    :raises ValueError: if no node has this id, or if a node that already has a label
        is followed by another node with the same id
    """
    if lang:
        raise NotImplementedError("Language tags not supported")
    # ``self._node`` returns a deep copy, so assigning ``lbl`` on its result never
    # reaches the stored document. Find the stored node itself instead, applying the
    # same strict duplicate rules ``_node`` uses.
    target = None
    for candidate in self._nodes():
        if self.uri_to_curie(candidate.id) != curie:
            continue
        if target is not None and target.lbl:
            raise ValueError(f"Multiple nodes with id {curie}")
        target = candidate
    if target is None:
        raise ValueError(f"No such node {curie}")
    target.lbl = label
    return True
def curies_by_label(self, label: str) -> List[CURIE]:
    """Return the CURIEs of all nodes whose ``lbl`` equals ``label`` exactly."""
    matching = (node for node in self._nodes() if node.lbl == label)
    return [self.uri_to_curie(node.id) for node in matching]
def create_entity(
    self,
    curie: CURIE,
    label: Optional[str] = None,
    relationships: Optional[RELATIONSHIP_MAP] = None,
    type: Optional[str] = None,
    **kwargs,
) -> CURIE:
    """
    Add a new node, and optionally outgoing edges, to the document's single graph.

    :param curie: identifier for the new node
    :param label: optional label for the node
    :param relationships: optional mapping of predicate to a list of objects; an
        iterable of ``(predicate, objects)`` pairs is also accepted
    :param type: optional node type passed through to ``Node``
    :param kwargs: accepted for interface compatibility; not used here
    :return: ``curie``
    :raises ValueError: propagated from ``_entire_graph`` when the document has more
        than one graph
    """
    graph = self._entire_graph()
    graph.nodes.append(Node(curie, lbl=label, type=type))
    if relationships:
        # Iterating a mapping directly yields only its keys, so go through items();
        # anything without items() is assumed to already be (predicate, objects) pairs.
        if hasattr(relationships, "items"):
            pairs = relationships.items()
        else:
            pairs = relationships
        for predicate, objs in pairs:
            for obj in objs:
                graph.edges.append(Edge(curie, predicate, obj))
    return curie
def definition(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]:
    """
    Return the text definition of the entity identified by ``curie``.

    :param lang: language tag; not supported
    :return: the definition value, or None when the node is missing, has no metadata,
        or its metadata carries no definition
    :raises NotImplementedError: if ``lang`` is given
    """
    if lang:
        raise NotImplementedError("Language tags not supported")
    meta = self._meta(curie)
    # meta may exist without a definition; guard both before reading ``val``
    if meta and meta.definition:
        return meta.definition.val
    return None
def comments(self, curies: Iterable[CURIE]) -> Iterable[Tuple[CURIE, str]]:
    """
    Yield ``(curie, comment)`` pairs for each comment on each requested entity.

    Entities for which ``_meta`` returns nothing are skipped.
    """
    for curie in curies:
        meta = self._meta(curie)
        if not meta:
            continue
        yield from ((curie, text) for text in meta.comments)
def entity_alias_map(self, curie: CURIE) -> ALIAS_MAP:
    """
    Build a predicate-to-values map of the entity's label and synonyms.

    The label, when present, is stored under LABEL_PREDICATE. Each synonym value is
    appended under the SCOPE_MAP entry for its ``pred`` (or under None when the pred
    has no entry in SCOPE_MAP).

    :return: a ``defaultdict(list)``
    """
    meta = self._meta(curie)
    alias_map = defaultdict(list)
    label = self.label(curie)
    if label:
        alias_map[LABEL_PREDICATE] = [label]
    if meta is None:
        return alias_map
    for synonym in meta.synonyms:
        scope = SCOPE_MAP.get(synonym.pred, None)
        alias_map[scope].append(synonym.val)
    return alias_map
# TODO: DRY
def relationships(
    self,
    subjects: List[CURIE] = None,
    predicates: List[PRED_CURIE] = None,
    objects: List[CURIE] = None,
    include_tbox: bool = True,
    include_abox: bool = True,
    include_entailed: bool = False,
    exclude_blank: bool = True,
) -> Iterator[RELATIONSHIP]:
    """
    Yield asserted relationships from the edge index, optionally filtered.

    :param subjects: restrict to these subjects (None for no restriction)
    :param predicates: restrict to these predicates (None for no restriction)
    :param objects: restrict to these objects (None for no restriction)
    :param include_tbox: accepted for interface compatibility; not consulted here
    :param include_abox: accepted for interface compatibility; not consulted here
    :param include_entailed: entailed relationships are not supported
    :param exclude_blank: accepted for interface compatibility; not consulted here
    :raises NotImplementedError: if ``include_entailed`` is true (raised when the
        returned generator is first iterated)
    """
    # Reject unsupported requests before touching the edge index.
    if include_entailed:
        raise NotImplementedError("Entailment not supported for obograph")
    yield from self.edge_index.edges(
        subjects=subjects,
        predicates=predicates,
        objects=objects,
    )
# TODO: DRY
def outgoing_relationships(
    self, curie: CURIE, predicates: List[PRED_CURIE] = None, entailed=False
) -> Iterator[Tuple[PRED_CURIE, CURIE]]:
    """
    Yield ``(predicate, object)`` for every relationship whose subject is ``curie``.

    :param predicates: optional predicate filter, forwarded to ``relationships``
    :param entailed: forwarded to ``relationships`` as ``include_entailed``
    """
    triples = self.relationships([curie], predicates, include_entailed=entailed)
    yield from ((pred, obj) for subj, pred, obj in triples if subj == curie)
# TODO: DRY
def outgoing_relationship_map(self, *args, **kwargs) -> RELATIONSHIP_MAP:
    """
    Collect ``outgoing_relationships`` into a map via ``pairs_as_dict``.

    All arguments are forwarded unchanged to ``outgoing_relationships``.
    """
    pairs = self.outgoing_relationships(*args, **kwargs)
    return pairs_as_dict(pairs)
# TODO: DRY
def incoming_relationships(
    self, curie: CURIE, predicates: List[PRED_CURIE] = None, entailed=False
) -> Iterator[Tuple[PRED_CURIE, CURIE]]:
    """
    Yield ``(predicate, subject)`` for every relationship whose object is ``curie``.

    :param predicates: optional predicate filter, forwarded to ``relationships``
    :param entailed: forwarded to ``relationships`` as ``include_entailed``
    """
    triples = self.relationships(None, predicates, [curie], include_entailed=entailed)
    yield from ((pred, subj) for subj, pred, obj in triples if obj == curie)
# TODO: DRY
def incoming_relationship_map(self, *args, **kwargs) -> RELATIONSHIP_MAP:
    """
    Collect ``incoming_relationships`` into a map via ``pairs_as_dict``.

    All arguments are forwarded unchanged to ``incoming_relationships``.
    """
    pairs = self.incoming_relationships(*args, **kwargs)
    return pairs_as_dict(pairs)
# TODO: DRY
def basic_search(self, search_term: str, config: SearchConfiguration = None) -> Iterable[CURIE]:
    """
    Search entities by label, identifier and/or alias.

    :param search_term: text (or regular-expression source) to look for
    :param config: search settings; a default SearchConfiguration is used when None
    :return: CURIEs of matching entities; each entity visited contributes at most one
        result, even when several of its aliases match

    The matching mode comes from ``config.syntax`` (starts-with or regular
    expression), falling back to substring matching when ``config.is_partial`` is set
    and to exact equality otherwise. Labels are always searched; identifiers and
    aliases are searched when requested in ``config.properties`` (or when ANYTHING is
    requested).
    """
    # TODO: move up, avoid repeating code
    if config is None:
        config = SearchConfiguration()

    if config.syntax == SearchTermSyntax(SearchTermSyntax.STARTS_WITH):

        def mfunc(text):
            return str(text).startswith(search_term)

    elif config.syntax == SearchTermSyntax(SearchTermSyntax.REGULAR_EXPRESSION):
        prog = re.compile(search_term)

        def mfunc(text):
            return prog.search(text)

    elif config.is_partial:

        def mfunc(text):
            return search_term in str(text)

    else:

        def mfunc(text):
            return text == search_term

    # The requested properties do not change per entity, so resolve them once.
    properties = config.properties
    search_all = SearchProperty(SearchProperty.ANYTHING) in properties
    search_identifiers = search_all or SearchProperty(SearchProperty.IDENTIFIER) in properties
    search_aliases = search_all or SearchProperty(SearchProperty.ALIAS) in properties

    matches = []
    logging.info(f"SEARCH={search_term}")
    for t in self.entities():
        lbl = self.label(t)
        logging.debug(f"T={t} // {config}")
        # NOTE(review): the previous label condition contained a bare
        # ``SearchProperty(SearchProperty.LABEL)`` operand, which is always truthy, so
        # labels were searched regardless of config.properties. That behaviour is kept
        # here explicitly; confirm whether LABEL should be opt-in.
        if lbl and mfunc(lbl):
            matches.append(t)
            logging.info(f"Name match to {t}")
            continue
        if search_identifiers and mfunc(t):
            matches.append(t)
            logging.info(f"identifier match to {t}")
            continue
        # any() stops at the first matching alias so the entity is recorded only once
        if search_aliases and any(mfunc(syn) for syn in self.entity_aliases(t)):
            logging.info(f"Syn match to {t}")
            matches.append(t)
    yield from matches
def simple_mappings_by_curie(self, curie: CURIE) -> Iterable[Tuple[PRED_CURIE, CURIE]]:
    """
    Yield ``(HAS_DBXREF, xref value)`` for each xref recorded on the entity.

    Nothing is yielded when the entity has no metadata.
    """
    meta = self._meta(curie, strict=False)
    if not meta:
        return
    # TODO: SKOS
    yield from ((HAS_DBXREF, xref.val) for xref in meta.xrefs)
def dump(self, path: str = None, syntax: str = "json", **kwargs):
    """
    Write the document out in the requested syntax.

    :param path: destination file; for JSON output, None means print to stdout
    :param syntax: ``"json"`` / ``"obojson"`` are handled here; anything else is
        delegated to the parent ``dump``
    :param kwargs: forwarded to the parent ``dump`` for non-JSON syntaxes
    """
    logging.info(f"Dumping graph to {path} syntax: {syntax}")
    if syntax not in ("json", "obojson"):
        super().dump(path, syntax, **kwargs)
        return
    document = self.obograph_document
    if path is None:
        print(json_dumper.dumps(document))
    else:
        json_dumper.dump(document, to_file=str(path))
def save(self):
    """Dump the document to the slug of the adapter's resource."""
    logging.info("Committing and flushing changes")
    destination = self.resource.slug
    self.dump(destination)
def load_graph(self, graph: Graph, replace: bool = True) -> None:
    """
    Replace the current document with one containing only ``graph``.

    :param graph: graph to install
    :param replace: must be true; merging into the existing document is not
        implemented. Defaults to True (the earlier signature used ``True`` as the
        annotation and so had no default at all).
    :raises NotImplementedError: if ``replace`` is false
    """
    if not replace:
        raise NotImplementedError
    self.obograph_document = GraphDocument(graphs=[graph])
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: MappingsInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def sssom_mappings(
    self, curies: Optional[Union[CURIE, Iterable[CURIE]]] = None, source: Optional[str] = None
) -> Iterable[sssom.Mapping]:
    """
    SSSOM mappings are not implemented for this adapter.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: OboGraphInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def node(
    self, curie: CURIE, strict=False, include_metadata=False, expand_curies=False
) -> obograph.Node:
    """
    Return a copy of the node identified by ``curie``.

    :param curie: identifier of the node
    :param strict: forwarded to ``_node`` so that missing or duplicate nodes raise
        instead of being tolerated
    :param include_metadata: accepted for interface compatibility; not consulted here
    :param expand_curies: accepted for interface compatibility; not consulted here
    :return: whatever ``_node`` returns (None when nothing matches and not strict)
    """
    return self._node(curie, strict=strict)
def as_obograph(self) -> Graph:
    """Return the document's single graph, as provided by ``_entire_graph``."""
    graph = self._entire_graph()
    return graph
def logical_definitions(
    self,
    subjects: Iterable[CURIE] = None,
    predicates: Iterable[PRED_CURIE] = None,
    objects: Iterable[CURIE] = None,
    **kwargs,
) -> Iterable[LogicalDefinitionAxiom]:
    """
    Yield the logical definition axioms that pass ``logical_definition_matches``.

    :param subjects: optional subject filter
    :param predicates: optional predicate filter
    :param objects: optional object filter
    :param kwargs: accepted for interface compatibility; not consulted here
    """
    if subjects:
        subjects = list(subjects)
    # The filters are reused for every axiom. A one-shot iterator would be consumed by
    # the first check, so materialise those; other values are passed through untouched.
    if isinstance(predicates, Iterator):
        predicates = list(predicates)
    if isinstance(objects, Iterator):
        objects = list(objects)
    for graph in self.obograph_document.graphs:
        for ldef in graph.logicalDefinitionAxioms:
            if logical_definition_matches(
                ldef, subjects=subjects, predicates=predicates, objects=objects
            ):
                yield ldef
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: SearchInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: PatcherInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def different_from(self, entity: CURIE, other_ontology: DifferInterface) -> bool:
    """
    Comparing an entity against another ontology is not implemented for this adapter.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def migrate_curies(self, curie_map: Mapping[CURIE, CURIE]) -> None:
    """
    Rewriting CURIEs in place is not implemented for this adapter.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def apply_patch(
    self,
    patch: kgcl.Change,
    activity: kgcl.Activity = None,
    metadata: Mapping[PRED_CURIE, Any] = None,
    configuration: kgcl.Configuration = None,
) -> kgcl.Change:
    """
    Applying KGCL changes is not implemented for this adapter.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: OwlInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def transitive_object_properties(self) -> Iterable[CURIE]:
    """
    Return the transitive object properties known to this adapter.

    :return: always an empty list; an iterable is returned (rather than None) so that
        callers can loop over the result safely
    """
    # TODO: obographs datamodel needs to be expanded to support this
    return []
def simple_subproperty_of_chains(self) -> Iterable[Tuple[CURIE, List[CURIE]]]:
    """
    Yield ``(predicate, chain)`` for each property chain axiom in the document.

    Both the predicate id and every id in the chain are passed through
    ``uri_to_curie``.
    """
    to_curie = self.uri_to_curie
    for graph in self.obograph_document.graphs:
        for axiom in graph.propertyChainAxioms:
            head = to_curie(axiom.predicateId)
            chain = [to_curie(link) for link in axiom.chainPredicateIds]
            yield head, chain