Source code for oaklib.implementations.pronto.pronto_implementation

import logging
import re
import shutil
import sys
import tempfile

# https://github.com/althonos/pronto/issues/173
import warnings
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Tuple, Union

import pronto
import sssom_schema as sssom
from deprecated import deprecated
from kgcl_schema.datamodel import kgcl
from linkml_runtime.dumpers import json_dumper
from pronto import LiteralPropertyValue, Ontology, ResourcePropertyValue, Term

from oaklib.converters.obo_graph_to_obo_format_converter import (
    OboGraphToOboFormatConverter,
)
from oaklib.datamodels import obograph
from oaklib.datamodels.obograph import Edge, Graph, GraphDocument
from oaklib.datamodels.search import SearchConfiguration
from oaklib.datamodels.search_datamodel import SearchProperty, SearchTermSyntax
from oaklib.datamodels.vocabulary import (
    CONSIDER_REPLACEMENT,
    DEPRECATED_PREDICATE,
    EQUIVALENT_CLASS,
    HAS_DBXREF,
    HAS_OBO_NAMESPACE,
    HAS_OBSOLESCENCE_REASON,
    IS_A,
    LABEL_PREDICATE,
    OIO_SUBSET_PROPERTY,
    OIO_SYNONYM_TYPE_PROPERTY,
    OWL_CLASS,
    OWL_OBJECT_PROPERTY,
    OWL_VERSION_INFO,
    SCOPE_TO_SYNONYM_PRED_MAP,
    SEMAPV,
    SKOS_MATCH_PREDICATES,
    TERM_REPLACED_BY,
    TERMS_MERGED,
)
from oaklib.inference.relation_graph_reasoner import RelationGraphReasoner
from oaklib.interfaces import TextAnnotatorInterface
from oaklib.interfaces.basic_ontology_interface import (
    ALIAS_MAP,
    DEFINITION,
    LANGUAGE_TAG,
    METADATA_MAP,
    PRED_CURIE,
    RELATIONSHIP,
    RELATIONSHIP_MAP,
)
from oaklib.interfaces.class_enrichment_calculation_interface import (
    ClassEnrichmentCalculationInterface,
)
from oaklib.interfaces.differ_interface import DifferInterface
from oaklib.interfaces.dumper_interface import DumperInterface
from oaklib.interfaces.mapping_provider_interface import MappingProviderInterface
from oaklib.interfaces.merge_interface import MergeInterface
from oaklib.interfaces.obograph_interface import OboGraphInterface
from oaklib.interfaces.obolegacy_interface import OboLegacyInterface
from oaklib.interfaces.owl_interface import OwlInterface
from oaklib.interfaces.patcher_interface import PatcherInterface
from oaklib.interfaces.rdf_interface import RdfInterface
from oaklib.interfaces.search_interface import SearchInterface
from oaklib.interfaces.semsim_interface import SemanticSimilarityInterface
from oaklib.interfaces.summary_statistics_interface import SummaryStatisticsInterface
from oaklib.interfaces.taxon_constraint_interface import TaxonConstraintInterface
from oaklib.interfaces.validator_interface import ValidatorInterface
from oaklib.resource import OntologyResource
from oaklib.types import CURIE, SUBSET_CURIE
from oaklib.utilities.axioms.logical_definition_utilities import (
    logical_definition_matches,
)
from oaklib.utilities.kgcl_utilities import tidy_change_object
from oaklib.utilities.mapping.sssom_utils import inject_mapping_sources

warnings.filterwarnings("ignore", category=pronto.warnings.SyntaxWarning, module="pronto")


def _synonym_scope_pred(s: pronto.Synonym) -> str:
    scope = s.scope.upper()
    if scope in SCOPE_TO_SYNONYM_PRED_MAP:
        return SCOPE_TO_SYNONYM_PRED_MAP[scope]
    else:
        raise ValueError(f"Unknown scope: {scope}")


[docs] @dataclass class ProntoImplementation( ValidatorInterface, RdfInterface, OboGraphInterface, OboLegacyInterface, SearchInterface, MappingProviderInterface, PatcherInterface, DifferInterface, # AssociationProviderInterface, ClassEnrichmentCalculationInterface, SemanticSimilarityInterface, TextAnnotatorInterface, SummaryStatisticsInterface, TaxonConstraintInterface, DumperInterface, MergeInterface, OwlInterface, ): """ An adapter that standardizes access to OBO Format files by wrapping the Pronto library. `Pronto <https://github.com/althonos/pronto/>`_ is a high-performance parsing library for parsing obo format 1.4 and other formats. Input Selector -------------- This adapter uses the ``pronto:`` :term:`Input Selector`; e.g. - ``pronto:path/to/my/file`` - ``pronto:https://example.com/my/file`` - ``prontolib:go` Examples -------- >>> from oaklib.implementations import ProntoImplementation >>> resource = OntologyResource(slug='go-nucleus.obo', directory='tests/input', local=True) >>> adapter = ProntoImplementation(resource) Or use a selector: >>> from oaklib import get_adapter >>> adapter = get_adapter("pronto:tests/input/go-nucleus.obo") Then you can use any of the methods implemented by pronto >>> rels = adapter.relationships(['GO:0005773']) >>> for _s, p, o in rels: ... print(f' {p} {o} ! {adapter.label(o)}') rdfs:subClassOf GO:0043231 ! intracellular membrane-bounded organelle BFO:0000050 GO:0005737 ! cytoplasm .. warning:: pronto uses the fastobo library for loading ontologies. This follows a strict interpretation of obo format, and some ontologies may fail to load. In these cases, consider an alternative implementation Alternatives: - use the :ref:`simple_obo_implementation` - convert to an OWL representation using ROBOT - convert to a JSON representation using ROBOT and use :ref:`obograph_implementation` """ wrapped_ontology: Ontology = None _relationship_index_cache: Dict[CURIE, List[RELATIONSHIP]] = None _alt_id_to_replacement_map: Dict[CURIE, List[CURIE]] = None def __post_init__(self): if self.wrapped_ontology is None: resource = self.resource logging.info(f"Pronto using resource: {resource}") kwargs = {} if resource and resource.import_depth is not None: kwargs["import_depth"] = resource.import_depth if resource is None: ontology = Ontology() elif resource.local: if resource.local_path: ontology = Ontology(str(resource.local_path), **kwargs) else: ontology = Ontology() else: ontology = Ontology.from_obo_library(resource.slug, **kwargs) self.wrapped_ontology = ontology for prefix, expansion in ontology.metadata.idspaces.items(): self.prefix_map()[prefix] = expansion[0] @classmethod @deprecated("old style") def create(cls, resource: OntologyResource = None) -> "ProntoImplementation": return ProntoImplementation(resource=resource) def _all_relationships(self) -> Iterator[RELATIONSHIP]: for s in self.entities(filter_obsoletes=False): term = self._entity(s) if isinstance(term, Term): # only "Terms" in pronto have relationships for o in term.superclasses(distance=1): if o.id != s: yield s, IS_A, o.id for rel_type, parents in term.relationships.items(): p = self._get_pronto_relationship_type_curie(rel_type) try: for o in parents: yield s, p, o.id except KeyError: pass if term.equivalent_to: for o in term.equivalent_to.ids: # symmetric yield s, EQUIVALENT_CLASS, o yield o, EQUIVALENT_CLASS, s def _all_entailed_relationships(self): reasoner = RelationGraphReasoner(self) yield from reasoner.entailed_edges() def store(self, resource: OntologyResource = None) -> None: if resource is None: resource = self.resource ontology = self.wrapped_ontology if resource.local: if resource.slug: with open(str(resource.local_path), "wb") as f: ontology.dump(f, format=resource.format) else: ontology.dump(sys.stdout.buffer, format=resource.format) else: raise NotImplementedError(f"Cannot dump to {resource}") def load_graph(self, graph: Graph, replace: True) -> None: if replace: converter = OboGraphToOboFormatConverter() gd = GraphDocument(graphs=[graph]) io = converter.as_bytes_io(gd) ont = Ontology(io) self.wrapped_ontology = ont self._relationship_index_cache = None self._alt_id_to_replacement_map = None return for n in graph.nodes: if n.id == IS_A: pass else: self.create_entity(n.id, n.lbl, type=n.type) for e in graph.edges: self.add_relationship(e.sub, e.pred, e.obj) @deprecated("Use this when we fix https://github.com/fastobo/fastobo/issues/42") def load_graph_using_jsondoc(self, graph: Graph, replace: True) -> None: tf = tempfile.NamedTemporaryFile() tf_name = "/tmp/tf.json" # noqa gd = GraphDocument(graphs=[graph]) json_dumper.dump(gd, to_file=tf_name) tf.flush() ont = Ontology(tf_name) if replace: self.wrapped_ontology = ont else: raise NotImplementedError # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Implements: BasicOntologyInterface # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ def _entity(self, curie: CURIE, strict=False): for r in self.wrapped_ontology.relationships(): # TODO: use OboLegacyInterface # see https://owlcollab.github.io/oboformat/doc/obo-syntax.html#4.4.1 # pronto gives relations shorthand IDs for RO and BFO, as it is providing # oboformat as a level of abstraction. We want to map these back to the CURIEs if r.id == curie: return r if curie.startswith("RO:") or curie.startswith("BFO:"): if any(x for x in r.xrefs if x.id == curie): return r if curie in self.wrapped_ontology: return self.wrapped_ontology[curie] else: if strict: raise ValueError(f"No such CURIE: {curie}") return None def _create(self, curie: CURIE, exist_ok=True): if curie in self.wrapped_ontology: return self.wrapped_ontology[curie] else: return self.wrapped_ontology.create_term(curie) def _create_pred(self, curie: CURIE, exist_ok=True): if curie in self.wrapped_ontology.relationships(): return self.wrapped_ontology.get_relationship(curie) else: return self.wrapped_ontology.create_relationship(curie) def entities(self, filter_obsoletes=True, owl_type=None) -> Iterable[CURIE]: for t in self.wrapped_ontology.terms(): if filter_obsoletes and t.obsolete: continue if owl_type and owl_type != OWL_CLASS: continue yield t.id # note what Pronto calls "relationship" is actually "relationship type" for t in self.wrapped_ontology.relationships(): if filter_obsoletes and t.obsolete: continue if owl_type and owl_type != OWL_OBJECT_PROPERTY: continue curie = self._get_pronto_relationship_type_curie(t) yield curie for t in self.wrapped_ontology.synonym_types(): if owl_type and owl_type != OIO_SYNONYM_TYPE_PROPERTY: continue yield t.id for t in self.wrapped_ontology.metadata.subsetdefs: if owl_type and owl_type != OIO_SUBSET_PROPERTY: continue yield t.name if not owl_type or owl_type == OWL_CLASS: # note that in the case of alt_ids, metadata such as # original owl_type is lost. We assume that the original # owl_type was OWL_CLASS if not filter_obsoletes: for s in self._get_alt_id_to_replacement_map().keys(): yield s def ontologies(self) -> Iterable[CURIE]: yield self.wrapped_ontology.metadata.ontology def ontology_metadata_map(self, ontology: CURIE) -> METADATA_MAP: m = {} meta = self.wrapped_ontology.metadata ont_id = meta.ontology if meta.data_version: m[OWL_VERSION_INFO] = [f"obo:{ont_id}/{meta.data_version}{ont_id}.owl"] return m def owl_types(self, entities: Iterable[CURIE]) -> Iterable[Tuple[CURIE, CURIE]]: subset_names = [s.name for s in self.wrapped_ontology.metadata.subsetdefs] syntype_ids = [s.id for s in self.wrapped_ontology.synonym_types()] for curie in entities: if curie in self.wrapped_ontology.terms(): yield curie, OWL_CLASS elif curie in self.wrapped_ontology.relationships(): yield curie, OWL_OBJECT_PROPERTY elif curie in syntype_ids: yield curie, OIO_SYNONYM_TYPE_PROPERTY elif curie in subset_names: yield curie, OIO_SUBSET_PROPERTY else: yield curie, None def obsoletes(self, include_merged=True) -> Iterable[CURIE]: for t in self.wrapped_ontology.terms(): if t.obsolete: yield t.id # note what Pronto calls "relationship" is actually "relationship type" for t in self.wrapped_ontology.relationships(): if t.obsolete: yield t.id if include_merged: for a in self._get_alt_id_to_replacement_map().keys(): yield a def subsets(self) -> Iterable[CURIE]: reported = set() for s in self.wrapped_ontology.metadata.subsetdefs: reported.add(s.name) yield s.name # also yield implicit subsets subsets = set() for t in self.wrapped_ontology.terms(): subsets.update(t.subsets) for subset in subsets: if subset not in reported: yield subset def subset_members(self, subset: SUBSET_CURIE) -> Iterable[CURIE]: for t in self.wrapped_ontology.terms(): if subset in t.subsets: yield t.id def label(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> str: t = self._entity(curie) if t: return t.name else: if curie == IS_A: return "subClassOf" else: return None def set_label(self, curie: CURIE, label: str, lang: Optional[LANGUAGE_TAG] = None) -> bool: t = self._entity(curie) if t: curr = t.name if curr != label: t.name = label return True else: return False else: raise ValueError(f"No such ID: {curie}") def curies_by_label(self, label: str) -> List[CURIE]: return [t.id for t in self.wrapped_ontology.terms() if t.name == label] def _get_pronto_relationship_type_curie(self, rel_type: pronto.Relationship) -> CURIE: for x in rel_type.xrefs: if x.id.startswith("BFO:") or x.id.startswith("RO:"): return x.id for x in rel_type.xrefs: if x.id.startswith("http"): compacted = self.uri_to_curie(x.id) return compacted return rel_type.id def relationships( self, subjects: List[CURIE] = None, predicates: List[PRED_CURIE] = None, objects: List[CURIE] = None, include_tbox: bool = True, include_abox: bool = True, include_entailed: bool = False, exclude_blank: bool = True, ) -> Iterator[RELATIONSHIP]: ei = self.edge_index if include_entailed: ei = self.entailed_edge_index yield from ei.edges( subjects=subjects, predicates=predicates, objects=objects, ) def outgoing_relationships( self, curie: CURIE, predicates: List[PRED_CURIE] = None, entailed=False ) -> Iterator[Tuple[PRED_CURIE, CURIE]]: for _s, p, o in self.relationships([curie], predicates, include_entailed=entailed): yield p, o def create_entity( self, curie: CURIE, label: Optional[str] = None, relationships: Optional[RELATIONSHIP_MAP] = None, type: Optional[str] = None, replace=False, ) -> CURIE: ont = self.wrapped_ontology t = self._entity(curie, False) if t: if replace: t = None if not t: if not type or type == "CLASS": t = ont.create_term(curie) elif type == "PROPERTY": t = ont.create_relationship(curie) else: raise ValueError(f"Pronto cannot handle type of {type} for {curie}") t.name = label if relationships: for pred, fillers in relationships.items(): for filler in fillers: self.add_relationship(curie, pred, filler) logging.info(f"Created: {curie}") return curie def add_relationship(self, curie: CURIE, predicate: PRED_CURIE, filler: CURIE): t = self._entity(curie) filler_term = self._create(filler) if predicate == IS_A: t.superclasses().add(filler_term) else: predicate_term = self._create_pred(predicate) if predicate_term not in t.relationships.keys(): t.relationships[predicate_term] = [] t.relationships[predicate_term].add(filler_term) self._clear_relationship_index() def remove_relationship(self, curie: CURIE, predicate: Optional[PRED_CURIE], filler: CURIE): t = self._entity(curie) filler_term = self._entity(filler) if not predicate or predicate == IS_A: t.superclasses().remove(filler_term) else: predicate_term = self._create_pred(predicate) t.relationships[predicate_term].remove(filler_term) self._clear_relationship_index() def definition(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]: e = self._entity(curie) return str(e.definition) if e and e.definition else None def definitions( self, curies: Iterable[CURIE], include_metadata=False, include_missing=False, lang: Optional[LANGUAGE_TAG] = None, ) -> Iterator[DEFINITION]: for curie in curies: e = self._entity(curie) if not e: continue defn = e.definition if not defn and not include_missing: continue metadata = {} if include_metadata: metadata[HAS_DBXREF] = [] for x in defn.xrefs: metadata[HAS_DBXREF].append(x.id) yield curie, defn, metadata def comments(self, curies: Iterable[CURIE]) -> Iterable[Tuple[CURIE, str]]: for curie in curies: e = self._entity(curie) if e: yield curie, e.comment def entity_alias_map(self, curie: CURIE) -> ALIAS_MAP: t = self._entity(curie) if t is None: return {} m = defaultdict(list) m[LABEL_PREDICATE] = [t.name] for s in t.synonyms: pred = _synonym_scope_pred(s) m[pred].append(s.description) return m def simple_mappings_by_curie(self, curie: CURIE) -> Iterable[Tuple[PRED_CURIE, CURIE]]: t = self._entity(curie) if t is None: return for s in t.xrefs: yield HAS_DBXREF, s.id for s in t.annotations: rel = self._entity(s.property) if ":" in s.property: pred = s.property else: pred = self._get_pronto_relationship_type_curie(rel) if pred in SKOS_MATCH_PREDICATES: if isinstance(s, LiteralPropertyValue): v = s.literal yield pred, v elif isinstance(s, ResourcePropertyValue): yield pred, s.resource if isinstance(t, Term): for rel_type, parents in t.relationships.items(): pred = self._get_pronto_relationship_type_curie(rel_type) if pred in SKOS_MATCH_PREDICATES: for parent in parents: yield pred, parent.id def entity_metadata_map(self, curie: CURIE) -> METADATA_MAP: t = self._entity(curie) m = defaultdict(list) _alt_id_map = self._get_alt_id_to_replacement_map() if t: for ann in t.annotations: if isinstance(ann, LiteralPropertyValue): m[ann.property].append(ann.literal) elif isinstance(ann, ResourcePropertyValue): m[ann.property].append(ann.resource) if t.replaced_by: for x in t.replaced_by: m[TERM_REPLACED_BY].append(x.id) if t.consider: for x in t.consider: m[CONSIDER_REPLACEMENT].append(x.id) if t.obsolete: m[DEPRECATED_PREDICATE].append(True) if t.namespace: m[HAS_OBO_NAMESPACE].append(t.namespace) if curie in _alt_id_map: m[TERM_REPLACED_BY] += _alt_id_map[curie] m[DEPRECATED_PREDICATE].append(True) m[HAS_OBSOLESCENCE_REASON].append(TERMS_MERGED) self.add_missing_property_values(curie, m) return dict(m) def _get_alt_id_to_replacement_map(self) -> Dict[CURIE, List[CURIE]]: if self._alt_id_to_replacement_map is None: self._alt_id_to_replacement_map = defaultdict(list) for e in self.entities(): t = self._entity(e) if t and t.alternate_ids: for a in t.alternate_ids: self._alt_id_to_replacement_map[a].append(e) return self._alt_id_to_replacement_map def create_subontology(self, curies: List[CURIE]) -> "ProntoImplementation": subontology = Ontology() for curie in curies: t = self._entity(curie) subontology.create_term(curie) t2 = subontology[curie] t2.name = t.name # TODO - complete object return ProntoImplementation(wrapped_ontology=subontology) def clone(self, resource: Any) -> None: shutil.copyfile(self.resource.slug, resource.slug) return type(self)(resource) def dump( self, path: str = None, syntax: str = "obo", enforce_canonical_ordering=False, **kwargs ): if syntax is None: syntax = "obo" if syntax in ["obo", "json"]: if enforce_canonical_ordering: raise NotImplementedError("enforce_canonical_ordering not implemented for pronto") # TODO: simplify the logic here; not clear why pronto wants a binary file if isinstance(path, str): with open(path, "wb") as file: self.wrapped_ontology.dump(file, format=syntax) else: if path == sys.stdout: print(self.wrapped_ontology.dumps(format=syntax)) else: self.wrapped_ontology.dump(path, format=syntax) else: super().dump(path, syntax, **kwargs) def save( self, ): logging.info("Committing and flushing changes") self.dump(self.resource.slug) # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Implements: MappingsInterface # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ def sssom_mappings( self, curies: Optional[Union[CURIE, Iterable[CURIE]]] = None, source: Optional[str] = None ) -> Iterable[sssom.Mapping]: if isinstance(curies, CURIE): curies = [curies] elif curies is None: curies = list(self.entities()) else: curies = list(curies) # mappings where curie is the subject: for curie in curies: for pred, obj in self.simple_mappings_by_curie(curie): m = sssom.Mapping( subject_id=curie, predicate_id=pred, object_id=obj, mapping_justification=sssom.EntityReference(SEMAPV.UnspecifiedMatching.value), ) inject_mapping_sources(m) if source and m.object_source != source and m.subject_source != source: continue yield m # mappings where curie is the object: # TODO: use a cache to avoid re-calculating for e in self.entities(): t = self._entity(e) if t: for x in t.xrefs: if x.id in curies: m = sssom.Mapping( subject_id=e, predicate_id=HAS_DBXREF, object_id=x.id, mapping_justification=sssom.EntityReference( SEMAPV.UnspecifiedMatching.value ), ) inject_mapping_sources(m) if source and m.object_source != source and m.subject_source != source: continue yield m # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Implements: OboGraphInterface # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ def node(self, curie: CURIE, strict=False, include_metadata=False) -> obograph.Node: t = self._entity(curie) if t is None: return obograph.Node(id=curie) else: meta = obograph.Meta() if isinstance(t, pronto.Relationship): t_id = self._get_pronto_relationship_type_curie(t) typ = "PROPERTY" else: t_id = t.id typ = "CLASS" if include_metadata: if t.definition: defn_xrefs = [x.id for x in t.definition.xrefs] meta.definition = obograph.DefinitionPropertyValue( val=t.definition, xrefs=defn_xrefs ) if t.xrefs: meta.xrefs = [obograph.XrefPropertyValue(val=x.id) for x in t.xrefs] if t.subsets: meta.subsets = [x for x in t.subsets] if isinstance(t, pronto.Relationship): for x in t.xrefs: if x.id.startswith("RO:") or x.id.startswith("BFO:"): t_id = x.id for s in t.synonyms: pred = SCOPE_TO_SYNONYM_PRED_MAP[s.scope].replace("oio:", "") synonym_type = s.type.id if s.type else None meta.synonyms.append( obograph.SynonymPropertyValue( val=s.description, pred=pred, synonymType=synonym_type, xrefs=[x.id for x in s.xrefs], ) ) return obograph.Node(id=t_id, lbl=t.name, type=typ, meta=meta) def as_obograph(self, expand_curies=False) -> Graph: om = self.wrapped_ontology.metadata entities = set(self.entities(filter_obsoletes=False, owl_type=OWL_CLASS)) # entities.extend(self.entities(filter_obsoletes=False, owl_type=OWL_OBJECT_PROPERTY)) nodes = [self.node(curie) for curie in entities] nodes = [n for n in nodes if n.lbl or n.meta] edges = [ Edge(sub=r[0], pred="is_a" if r[1] == IS_A else r[1], obj=r[2]) for r in self.relationships() ] ldefs = list(self.logical_definitions(entities)) graph_id = om.ontology if not graph_id: graph_id = self.resource.slug return Graph(id=graph_id, nodes=nodes, edges=edges, logicalDefinitionAxioms=ldefs) def synonym_property_values( self, subject: Union[CURIE, Iterable[CURIE]] ) -> Iterator[Tuple[CURIE, obograph.SynonymPropertyValue]]: if isinstance(subject, CURIE): subject = [subject] for curie in subject: e = self._entity(curie) if e: for s in e.synonyms: pred = _synonym_scope_pred(s).replace("oio:", "") xrefs = [x.id for x in s.xrefs] t = s.type.id if s.type else None spv = obograph.SynonymPropertyValue( pred=pred, val=s.description, xrefs=xrefs, synonymType=t ) yield curie, spv def logical_definitions( self, subjects: Optional[Iterable[CURIE]] = None, predicates: Iterable[PRED_CURIE] = None, objects: Iterable[CURIE] = None, **kwargs, ) -> Iterable[obograph.LogicalDefinitionAxiom]: if not subjects: subjects = self.entities() for s in subjects: term = self._entity(s) if term and term.intersection_of: ldef = obograph.LogicalDefinitionAxiom(definedClassId=s) for ix in term.intersection_of: if isinstance(ix, Term): ldef.genusIds.append(ix.id) else: (rel, filler) = ix rel = self._get_pronto_relationship_type_curie(rel) er = obograph.ExistentialRestrictionExpression( propertyId=rel, fillerId=filler.id ) ldef.restrictions.append(er) if logical_definition_matches(ldef, predicates=predicates, objects=objects): yield ldef # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Implements: SearchInterface # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ def basic_search(self, search_term: str, config: SearchConfiguration = None) -> Iterable[CURIE]: if config is None: config = SearchConfiguration() matches = [] mfunc = None if config.syntax == SearchTermSyntax(SearchTermSyntax.STARTS_WITH): mfunc = lambda label: str(label).startswith(search_term) elif config.syntax == SearchTermSyntax(SearchTermSyntax.REGULAR_EXPRESSION): prog = re.compile(search_term) mfunc = lambda label: prog.search(label) elif config.is_partial: mfunc = lambda label: search_term in str(label) else: mfunc = lambda label: label == search_term search_all = SearchProperty(SearchProperty.ANYTHING) in config.properties logging.info(f"SEARCH={search_term}") for t in self.wrapped_ontology.terms(): logging.debug(f"T={t} // {config}") if ( search_all or SearchProperty(SearchProperty.LABEL) or config.properties not in config.properties ): if t.name and mfunc(t.name): matches.append(t.id) logging.info(f"Name match to {t.id}") continue if search_all or SearchProperty(SearchProperty.IDENTIFIER) in config.properties: if mfunc(t.id): matches.append(t.id) logging.info(f"identifier match to {t.id}") continue if ( search_all or SearchProperty(SearchProperty.REPLACEMENT_IDENTIFIER) in config.properties ): if t.replaced_by: for r in t.replaced_by: if mfunc(t.id): matches.append(r.id) logging.info(f"replaced_by match to {t.id}") continue if t.alternate_ids: for a in t.alternate_ids: if mfunc(a): matches.append(t.id) logging.info(f"alternate_id match to {t.id}") continue if search_all or SearchProperty(SearchProperty.ALIAS) in config.properties: for syn in t.synonyms: if mfunc(syn.description): logging.info(f"Syn match to {t.id}") matches.append(t.id) continue if search_all or SearchProperty(SearchProperty.MAPPED_IDENTIFIER) in config.properties: for x in t.xrefs: if mfunc(x.id): logging.info(f"Mapping match to {t.id}") matches.append(t.id) continue # if search_all or SearchProperty(SearchProperty.REPLACEMENT_IDENTIFIER) in config.properties: # if search_term in self._get_alt_id_to_replacement_map(): # matches.append(self._get_alt_id_to_replacement_map()[search_term]) for m in matches: yield m # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Implements: PatcherInterface # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ def migrate_curies(self, curie_map: Dict[CURIE, CURIE]) -> None: pass def apply_patch( self, patch: kgcl.Change, activity: kgcl.Activity = None, metadata: Mapping[PRED_CURIE, Any] = None, configuration: kgcl.Configuration = None, ) -> Optional[kgcl.Change]: tidy_change_object(patch) if isinstance(patch, kgcl.NodeRename): self.set_label(patch.about_node, patch.new_value) elif isinstance(patch, kgcl.NodeObsoletion): t = self._entity(patch.about_node, strict=True) t.obsolete = True if isinstance(patch, kgcl.NodeObsoletionWithDirectReplacement): t.replaced_by = [self._entity(patch.has_direct_replacement)] elif isinstance(patch, kgcl.NodeDeletion): t = self._entity(patch.about_node, strict=True) raise NotImplementedError elif isinstance(patch, kgcl.NodeCreation): # TODO: decide which field to use in KGCL if patch.about_node: self.create_entity(patch.about_node, patch.name) else: self.create_entity(patch.node_id, patch.name) elif isinstance(patch, kgcl.SynonymReplacement): t = self._entity(patch.about_node, strict=True) for syn in t.synonyms: if syn.description == patch.old_value: syn.description = patch.new_value elif isinstance(patch, kgcl.NewTextDefinition): t = self._entity(patch.about_node, strict=True) xrefs = t.definition.xrefs if t.definition else [] t.definition = pronto.Definition(patch.new_value, xrefs=xrefs) elif isinstance(patch, kgcl.NodeTextDefinitionChange): t = self._entity(patch.about_node, strict=True) xrefs = t.definition.xrefs if t.definition else [] t.definition = pronto.Definition(patch.new_value, xrefs=xrefs) elif isinstance(patch, kgcl.NewSynonym): t = self._entity(patch.about_node, strict=True) # Get scope from patch.qualifier # rather than forcing all synonyms to be related. scope = str(patch.qualifier.value).upper() if patch.qualifier else "RELATED" t.add_synonym(description=patch.new_value, scope=scope) elif isinstance(patch, kgcl.AddNodeToSubset): t = self._entity(patch.about_node, strict=True) t.subsets = t.subsets.union({patch.in_subset}) elif isinstance(patch, kgcl.RemoveNodeFromSubset): t = self._entity(patch.about_node, strict=True) t.subsets = t.subsets.difference({patch.in_subset}) elif isinstance(patch, kgcl.EdgeCreation): self.add_relationship(patch.subject, patch.predicate, patch.object) elif isinstance(patch, kgcl.EdgeDeletion): self.remove_relationship(patch.subject, patch.predicate, patch.object) elif isinstance(patch, kgcl.RemoveSynonym): t = self._entity(patch.about_node, strict=True) synonym_to_remove = [ syn for syn in t._data().synonyms if syn.description == patch.old_value ][0] t._data().synonyms.discard(synonym_to_remove) else: raise NotImplementedError(f"cannot handle KGCL type {type(patch)}") return patch # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Implements: OwlInterface # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ def transitive_object_properties(self) -> Iterable[CURIE]: for t in self.wrapped_ontology.relationships(): if t.transitive: yield self._get_pronto_relationship_type_curie(t) def simple_subproperty_of_chains(self) -> Iterable[Tuple[CURIE, List[CURIE]]]: for t in self.wrapped_ontology.relationships(): try: if t.holds_over_chain: subp = self._get_pronto_relationship_type_curie(t) r1, r2 = t.holds_over_chain p1 = self._get_pronto_relationship_type_curie(r1) p2 = self._get_pronto_relationship_type_curie(r2) yield subp, [p1, p2] except KeyError as e: logging.warning(f"could not find chain relationships for {t}: {e}")