Source code for oaklib.implementations.simpleobo.simple_obo_implementation

import logging
import re
import shutil
import sys
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import (
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    TextIO,
    Tuple,
    Union,
)

import sssom_schema as sssom
from kgcl_schema.datamodel import kgcl

from oaklib.converters.obo_graph_to_obo_format_converter import (
    OboGraphToOboFormatConverter,
)
from oaklib.datamodels import obograph
from oaklib.datamodels.obograph import (
    Edge,
    ExistentialRestrictionExpression,
    Graph,
    GraphDocument,
    LogicalDefinitionAxiom,
    SynonymPropertyValue,
)
from oaklib.datamodels.search import SearchConfiguration
from oaklib.datamodels.search_datamodel import SearchProperty, SearchTermSyntax
from oaklib.datamodels.vocabulary import (
    CONSIDER_REPLACEMENT,
    CONTRIBUTOR,
    CREATED,
    CREATOR,
    DEPRECATED_PREDICATE,
    EQUIVALENT_CLASS,
    HAS_DBXREF,
    HAS_OBO_NAMESPACE,
    HAS_OBSOLESCENCE_REASON,
    INVERSE_OF,
    IS_A,
    LABEL_PREDICATE,
    OIO_CREATED_BY,
    OIO_CREATION_DATE,
    OIO_SUBSET_PROPERTY,
    OIO_SYNONYM_TYPE_PROPERTY,
    OWL_CLASS,
    OWL_OBJECT_PROPERTY,
    OWL_VERSION_IRI,
    RDFS_DOMAIN,
    RDFS_RANGE,
    SCOPE_TO_SYNONYM_PRED_MAP,
    SEMAPV,
    SKOS_MATCH_PREDICATES,
    SUBPROPERTY_OF,
    TERM_REPLACED_BY,
    TERMS_MERGED,
)
from oaklib.implementations.simpleobo.simple_obo_parser import (
    TAG_ALT_ID,
    TAG_COMMENT,
    TAG_CONSIDER,
    TAG_CREATED_BY,
    TAG_CREATION_DATE,
    TAG_DATA_VERSION,
    TAG_DEFINITION,
    TAG_DOMAIN,
    TAG_EQUIVALENT_TO,
    TAG_HOLDS_OVER_CHAIN,
    TAG_ID_SPACE,
    TAG_INVERSE_OF,
    TAG_IS_A,
    TAG_IS_OBSOLETE,
    TAG_IS_TRANSITIVE,
    TAG_NAME,
    TAG_NAMESPACE,
    TAG_ONTOLOGY,
    TAG_PROPERTY_VALUE,
    TAG_RANGE,
    TAG_RELATIONSHIP,
    TAG_REPLACED_BY,
    TAG_SUBSET,
    TAG_SUBSETDEF,
    TAG_SYNONYM,
    TAG_SYNONYMTYPEDEF,
    TAG_XREF,
    OboDocument,
    Stanza,
    _synonym_scope_pred,
    parse_obo_document,
)
from oaklib.inference.relation_graph_reasoner import RelationGraphReasoner
from oaklib.interfaces import TextAnnotatorInterface
from oaklib.interfaces.basic_ontology_interface import (
    ALIAS_MAP,
    DEFINITION,
    LANGUAGE_TAG,
    METADATA_MAP,
    RELATIONSHIP,
    RELATIONSHIP_MAP,
)
from oaklib.interfaces.differ_interface import DiffConfiguration, DifferInterface
from oaklib.interfaces.dumper_interface import DumperInterface
from oaklib.interfaces.mapping_provider_interface import MappingProviderInterface
from oaklib.interfaces.merge_interface import MergeInterface
from oaklib.interfaces.obograph_interface import OboGraphInterface
from oaklib.interfaces.obolegacy_interface import PRED_CODE, OboLegacyInterface
from oaklib.interfaces.owl_interface import OwlInterface
from oaklib.interfaces.patcher_interface import PatcherInterface
from oaklib.interfaces.rdf_interface import RdfInterface
from oaklib.interfaces.search_interface import SearchInterface
from oaklib.interfaces.semsim_interface import SemanticSimilarityInterface
from oaklib.interfaces.summary_statistics_interface import SummaryStatisticsInterface
from oaklib.interfaces.taxon_constraint_interface import TaxonConstraintInterface
from oaklib.interfaces.validator_interface import ValidatorInterface
from oaklib.resource import OntologyResource
from oaklib.types import CURIE, PRED_CURIE, SUBSET_CURIE
from oaklib.utilities.axioms.logical_definition_utilities import (
    logical_definition_matches,
)
from oaklib.utilities.kgcl_utilities import generate_change_id, tidy_change_object
from oaklib.utilities.mapping.sssom_utils import inject_mapping_sources


def _is_isa(x: str):
    return x == IS_A or x.lower() == "is_a" or x.lower() == "isa"


@dataclass
class SimpleOboImplementation(
    ValidatorInterface,
    DifferInterface,
    RdfInterface,
    OboGraphInterface,
    OboLegacyInterface,
    SearchInterface,
    MappingProviderInterface,
    PatcherInterface,
    SummaryStatisticsInterface,
    SemanticSimilarityInterface,
    TaxonConstraintInterface,
    TextAnnotatorInterface,
    DumperInterface,
    MergeInterface,
    OwlInterface,
):
    """
    Simple OBO-file backed implementation.

    This implementation is incomplete and is intended primarily as a Patcher implementation.
    This can be abandoned when pronto is less strict.
    """

    obo_document: OboDocument = None
    _relationship_index_cache: Dict[CURIE, List[RELATIONSHIP]] = None
    _alt_id_to_replacement_map: Dict[CURIE, List[CURIE]] = None
    _uses_legacy_properties: bool = None

    def __post_init__(self):
        if self.obo_document is None:
            resource = self.resource
            if resource and resource.local_path:
                logging.info(f"Creating doc for {resource}")
                self.obo_document = parse_obo_document(resource.local_path)
                if "edit.obo" in str(resource.local_path) and self.auto_relax_axioms is None:
                    # TODO: in future ontology modules should explicitly set this in the metadata
                    logging.info(
                        f"Auto-setting auto_relax_axioms based on name: {resource.local_path}"
                    )
                    self.auto_relax_axioms = True
            else:
                self.obo_document = OboDocument()
        for prefix, expansion in self.obo_document.header.pair_values(TAG_ID_SPACE):
            self.prefix_map()[prefix] = expansion

    def store(self, resource: OntologyResource = None) -> None:
        if resource is None:
            resource = self.resource
        od = self.obo_document
        if resource.local:
            if resource.slug:
                with open(str(resource.local_path), "w", encoding="UTF-8") as f:
                    od.dump(f)
            else:
                od.dump(sys.stdout.buffer)
        else:
            raise NotImplementedError(f"Cannot dump to {resource}")

    def load_graph(self, graph: Graph, replace: bool = True) -> None:
        if not replace:
            raise NotImplementedError("Cannot merge obograph")
        converter = OboGraphToOboFormatConverter()
        self.obo_document = OboDocument()
        gd = GraphDocument(graphs=[graph])
        converter.convert(gd, self.obo_document)

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: BasicOntologyInterface
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def _all_relationships(self) -> Iterator[RELATIONSHIP]:
        logging.info("Commencing indexing")
        n = 0
        entities = list(self.entities(filter_obsoletes=False))
        for s in entities:
            t = self._stanza(s, strict=False)
            if t is None:
                # alt_ids
                continue
            is_relation = t.type == "Typedef"
            for v in t.simple_values(TAG_IS_A):
                n += 1
                if is_relation:
                    yield s, SUBPROPERTY_OF, self.map_shorthand_to_curie(v)
                else:
                    yield s, IS_A, v
            for tag, prop in [
                (TAG_INVERSE_OF, INVERSE_OF),
                (TAG_DOMAIN, RDFS_DOMAIN),
                (TAG_RANGE, RDFS_RANGE),
            ]:
                for v in t.simple_values(tag):
                    n += 1
                    yield s, prop, self.map_shorthand_to_curie(v)
            for v in t.simple_values(TAG_EQUIVALENT_TO):
                n += 1
                yield s, EQUIVALENT_CLASS, v
                yield v, EQUIVALENT_CLASS, s
            for p, v in t.pair_values(TAG_RELATIONSHIP):
                yield s, self.map_shorthand_to_curie(p), v
            # for p, v in t.intersection_of_tuples():
            #     n += 1
            #     yield s, self._get_relationship_type_curie(p), v
        logging.info(f"Indexed {n} relationships")
        if self.auto_relax_axioms:
            n = 0
            logging.info("Auto-relaxing axioms")
            for ldef in self.logical_definitions(entities):
                for p in ldef.genusIds:
                    yield ldef.definedClassId, IS_A, p
                    n += 1
                for r in ldef.restrictions:
                    yield ldef.definedClassId, r.propertyId, r.fillerId
                    n += 1
            logging.info(f"Relaxed {n} relationships")

    def _all_entailed_relationships(self):
        reasoner = RelationGraphReasoner(self)
        yield from reasoner.entailed_edges()
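    # For reference, _all_relationships above yields (subject, predicate, object)
    # triples, e.g. ("GO:0005634", IS_A, "GO:0043231") for an is_a tag, or
    # ("GO:0005634", "BFO:0000050", "GO:0005622") for a relationship: tag whose
    # shorthand predicate has been mapped to a CURIE. The specific CURIEs here
    # are illustrative only and are not taken from this module.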
    def entities(self, filter_obsoletes=True, owl_type=None) -> Iterable[CURIE]:
        od = self.obo_document
        for s_id, s in od.stanzas.items():
            if filter_obsoletes:
                if s.get_boolean_value(TAG_IS_OBSOLETE):
                    continue
            if (
                owl_type is None
                or (owl_type == OWL_CLASS and s.type == "Term")
                or (owl_type == OWL_OBJECT_PROPERTY and s.type == "Typedef")
            ):
                if s.type == "Typedef":
                    yield self.map_shorthand_to_curie(s_id)
                else:
                    yield s_id
        if not owl_type or owl_type == OWL_CLASS:
            # note that in the case of alt_ids, metadata such as
            # original owl_type is lost. We assume that the original
            # owl_type was OWL_CLASS
            if not filter_obsoletes:
                for s in self._get_alt_id_to_replacement_map().keys():
                    yield s
        if not owl_type or owl_type == OIO_SUBSET_PROPERTY:
            for v in od.header.simple_values(TAG_SUBSETDEF):
                yield v
        if not owl_type or owl_type == OIO_SYNONYM_TYPE_PROPERTY:
            for v in od.header.simple_values(TAG_SYNONYMTYPEDEF):
                yield v

    def owl_types(self, entities: Iterable[CURIE]) -> Iterable[Tuple[CURIE, CURIE]]:
        od = self.obo_document
        for curie in entities:
            s = self._stanza(curie, False)
            if s is None:
                if curie in self.subsets():
                    yield curie, OIO_SUBSET_PROPERTY
                elif curie in od.header.simple_values(TAG_SYNONYMTYPEDEF):
                    yield curie, OIO_SYNONYM_TYPE_PROPERTY
                else:
                    yield curie, None
            else:
                if s.type == "Term":
                    yield curie, OWL_CLASS
                elif s.type == "Typedef":
                    yield curie, OWL_OBJECT_PROPERTY
                else:
                    raise ValueError(f"Unknown stanza type: {s.type}")

    def obsoletes(self, include_merged=True) -> Iterable[CURIE]:
        od = self.obo_document
        for s in od.stanzas.values():
            if s.get_boolean_value(TAG_IS_OBSOLETE):
                yield s.id
        if include_merged:
            for s in self._get_alt_id_to_replacement_map().keys():
                yield s

    def subsets(self) -> Iterable[CURIE]:
        od = self.obo_document
        for s in od.header.simple_values(TAG_SUBSETDEF):
            yield s

    def subset_members(self, subset: SUBSET_CURIE) -> Iterable[CURIE]:
        od = self.obo_document
        for s in od.stanzas.values():
            if subset in s.simple_values(TAG_SUBSET):
                yield s.id

    def terms_subsets(self, curies: Iterable[CURIE]) -> Iterable[Tuple[CURIE, SUBSET_CURIE]]:
        for curie in curies:
            s = self._stanza(curie, False)
            if s:
                for subset in s.simple_values(TAG_SUBSET):
                    yield curie, subset

    def ontologies(self) -> Iterable[CURIE]:
        od = self.obo_document
        for v in od.header.simple_values(TAG_ONTOLOGY):
            yield v

    def ontology_metadata_map(self, ontology: CURIE) -> METADATA_MAP:
        m = defaultdict(list)
        m["id"] = [ontology]
        omo_map = {
            TAG_DATA_VERSION: OWL_VERSION_IRI,
        }
        header = self.obo_document.header
        for tv in header.tag_values:
            tag = tv.tag
            if tag in omo_map:
                p = omo_map[tag]
                val = tv.value
                if p == OWL_VERSION_IRI:
                    val = f"obo:{ontology}/{val}{ontology}.owl"
                m[p].append(val)
        return dict(m)

    def _stanza(self, curie: CURIE, strict=True) -> Optional[Stanza]:
        stanza = self.obo_document.stanzas.get(curie, None)
        if stanza is None:
            alt_curie = self.map_curie_to_shorthand(curie)
            if alt_curie and alt_curie != curie:
                stanza = self.obo_document.stanzas.get(alt_curie)
        if strict and not stanza:
            raise ValueError(f"No such stanza {curie}")
        return stanza

    def label(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]:
        if lang:
            raise NotImplementedError("Language tags not supported")
        s = self._stanza(curie, False)
        if s:
            return s.singular_value(TAG_NAME)
        else:
            if curie == IS_A:
                return "subClassOf"
            else:
                return None

    def set_label(self, curie: CURIE, label: str, lang: Optional[LANGUAGE_TAG] = None) -> bool:
        if lang:
            raise NotImplementedError("Language tags not supported")
        s = self._stanza(curie, False)
        s.set_singular_tag(TAG_NAME, label)
        return True
    def curies_by_label(self, label: str) -> List[CURIE]:
        return [
            s.id
            for s in self.obo_document.stanzas.values()
            if s.singular_value(TAG_NAME, False) == label
        ]

    def _lookup(self, label_or_curie: str) -> CURIE:
        if ":" in label_or_curie and " " not in label_or_curie:
            return label_or_curie
        else:
            candidates = self.curies_by_label(label_or_curie)
            if len(candidates) != 1:
                raise ValueError(f"{label_or_curie} => {candidates}")
            return candidates[0]

    def create_entity(
        self,
        curie: CURIE,
        label: Optional[str] = None,
        relationships: Optional[RELATIONSHIP_MAP] = None,
        type: Optional[str] = None,
        replace=False,
    ) -> CURIE:
        if type is None or type == OWL_CLASS:
            type = "Term"
        elif type == OWL_OBJECT_PROPERTY:
            type = "Typedef"
        else:
            raise ValueError(f"Cannot handle type: {type}")
        stanza = self._stanza(curie, False)
        if stanza:
            if replace:
                stanza = None
        if not stanza:
            stanza = Stanza(id=curie, type=type)
            stanza.add_tag_value(TAG_NAME, label)
            self.obo_document.add_stanza(stanza)
        if relationships:
            for pred, fillers in relationships.items():
                for filler in fillers:
                    self.add_relationship(curie, pred, filler)

    def add_relationship(self, curie: CURIE, predicate: PRED_CURIE, filler: CURIE, **kwargs):
        t = self._stanza(curie)
        if predicate == IS_A:
            t.add_tag_value(TAG_IS_A, filler, **kwargs)
        else:
            predicate_code = self.map_curie_to_shorthand(predicate)
            t.add_tag_value_pair(TAG_RELATIONSHIP, predicate_code, filler, **kwargs)
        self._clear_relationship_index()

    def remove_relationship(self, curie: CURIE, predicate: Optional[PRED_CURIE], filler: CURIE):
        t = self._stanza(curie)
        if not predicate or predicate == IS_A:
            t.remove_simple_tag_value(TAG_IS_A, filler)
        else:
            predicate_code = self.map_curie_to_shorthand(predicate)
            t.remove_pairwise_tag_value(TAG_RELATIONSHIP, predicate_code, filler)
        self._clear_relationship_index()

    def definition(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]:
        s = self._stanza(curie, strict=False)
        if s:
            return s.quoted_value(TAG_DEFINITION)

    def definitions(
        self,
        curies: Iterable[CURIE],
        include_metadata=False,
        include_missing=False,
        lang: Optional[LANGUAGE_TAG] = None,
    ) -> Iterator[DEFINITION]:
        for curie in curies:
            s = self._stanza(curie, strict=False)
            if s:
                d = s.quoted_value(TAG_DEFINITION)
                if d:
                    if include_metadata:
                        defn_tvs = [tv for tv in s.tag_values if tv.tag == TAG_DEFINITION]
                        if defn_tvs:
                            defn_tv = defn_tvs[0]
                            defn, xrefs = defn_tv.as_definition()
                            yield curie, defn, {HAS_DBXREF: xrefs}
                    else:
                        yield curie, d, None
                elif include_missing:
                    yield curie, None, None

    def comments(self, curies: Iterable[CURIE]) -> Iterable[Tuple[CURIE, str]]:
        for curie in curies:
            s = self._stanza(curie)
            if s:
                yield curie, s.singular_value(TAG_COMMENT)

    def entity_alias_map(self, curie: CURIE) -> ALIAS_MAP:
        s = self._stanza(curie, strict=False)
        if s is None:
            return {}
        m = defaultdict(list)
        lbl = self.label(curie)
        if lbl:
            m[LABEL_PREDICATE] = [lbl]
        for st in s.synonyms():
            syn, pred, _type, _xrefs = st
            pred = _synonym_scope_pred(pred)
            m[pred].append(syn)
        return m

    def synonym_property_values(
        self, subject: Union[CURIE, Iterable[CURIE]]
    ) -> Iterator[Tuple[CURIE, SynonymPropertyValue]]:
        if isinstance(subject, str):
            subject = [subject]
        for curie in subject:
            s = self._stanza(curie, strict=False)
            if not s:
                continue
            for syn in s.synonyms():
                pred = _synonym_scope_pred(syn[1]).replace("oio:", "")
                yield curie, SynonymPropertyValue(
                    pred=pred, val=syn[0], synonymType=syn[2], xrefs=syn[3]
                )
    def map_shorthand_to_curie(self, rel_code: PRED_CODE) -> PRED_CURIE:
        """
        Maps either a true relationship type CURIE or a shorthand code to a CURIE.

        See `section 5.9 <https://owlcollab.github.io/oboformat/doc/obo-syntax.html#5.9>`_

        :param rel_code:
        :return:
        """
        for _, x in self.simple_mappings_by_curie(rel_code):
            if x.startswith("BFO:") or x.startswith("RO:"):
                return x
            if ":" not in rel_code and ":" in x:
                return x
        return rel_code

    def map_curie_to_shorthand(self, rel_type: PRED_CURIE) -> PRED_CODE:
        """
        Reciprocal of :meth:`map_shorthand_to_curie`

        :param rel_type:
        :return:
        """
        if rel_type:
            is_core = rel_type.startswith("BFO:") or rel_type.startswith("RO:")
            for s in self.obo_document.stanzas.values():
                if s.type == "Typedef":
                    for x in s.simple_values(TAG_XREF):
                        if x == rel_type:
                            if is_core or ":" not in s.id:
                                return s.id
        return rel_type

    def relationships(
        self,
        subjects: List[CURIE] = None,
        predicates: List[PRED_CURIE] = None,
        objects: List[CURIE] = None,
        include_tbox: bool = True,
        include_abox: bool = True,
        include_entailed: bool = False,
        exclude_blank: bool = True,
    ) -> Iterator[RELATIONSHIP]:
        ei = self.edge_index
        if include_entailed:
            ei = self.entailed_edge_index
        yield from ei.edges(
            subjects=subjects,
            predicates=predicates,
            objects=objects,
        )

    def basic_search(self, search_term: str, config: SearchConfiguration = None) -> Iterable[CURIE]:
        # TODO: move up, avoid repeating code
        if config is None:
            config = SearchConfiguration()
        matches = []
        mfunc = None
        if config.syntax == SearchTermSyntax(SearchTermSyntax.STARTS_WITH):
            mfunc = lambda label: str(label).startswith(search_term)
        elif config.syntax == SearchTermSyntax(SearchTermSyntax.REGULAR_EXPRESSION):
            prog = re.compile(search_term)
            mfunc = lambda label: prog.search(label)
        elif config.is_partial:
            mfunc = lambda label: search_term in str(label)
        else:
            mfunc = lambda label: label == search_term
        search_all = SearchProperty(SearchProperty.ANYTHING) in config.properties
        logging.info(f"SEARCH={search_term}")
        for t in self.entities(filter_obsoletes=False):
            lbl = self.label(t)
            logging.debug(f"T={t} // {config}")
            if search_all or SearchProperty(SearchProperty.LABEL) in config.properties:
                if lbl and mfunc(lbl):
                    matches.append(t)
                    logging.info(f"Name match to {t}")
                    continue
            if search_all or SearchProperty(SearchProperty.IDENTIFIER) in config.properties:
                if mfunc(t):
                    matches.append(t)
                    logging.info(f"identifier match to {t}")
                    continue
            if (
                search_all
                or SearchProperty(SearchProperty.REPLACEMENT_IDENTIFIER) in config.properties
            ):
                s = self._stanza(t, strict=False)
                if s:
                    for r in s.simple_values(TAG_REPLACED_BY):
                        if mfunc(t):
                            matches.append(r)
                            logging.info(f"replaced_by match to {t}")
                            continue
                    for a in s.simple_values(TAG_ALT_ID):
                        if mfunc(a):
                            matches.append(t)
                            logging.info(f"alternate_id match to {t}")
                            continue
            if search_all or SearchProperty(SearchProperty.ALIAS) in config.properties:
                for syn in self.entity_aliases(t):
                    if mfunc(syn):
                        logging.info(f"Syn match to {t}")
                        matches.append(t)
                        continue
            if search_all or SearchProperty(SearchProperty.MAPPED_IDENTIFIER) in config.properties:
                for x in self.simple_mappings_by_curie(t):
                    if mfunc(x):
                        logging.info(f"Syn match to {t}")
                        matches.append(t)
                        continue
        for m in matches:
            yield m

    def simple_mappings_by_curie(self, curie: CURIE) -> Iterable[Tuple[PRED_CURIE, CURIE]]:
        t = self._stanza(curie, strict=False)
        if t:
            for v in t.simple_values(TAG_XREF):
                yield HAS_DBXREF, v
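    # Illustrative sketch of the shorthand/CURIE round-trip above, assuming a
    # Typedef stanza with id "part_of" that carries "xref: BFO:0000050"
    # (the instance name `impl` and the stanza contents are hypothetical):
    #
    #   impl.map_shorthand_to_curie("part_of")      # -> "BFO:0000050"
    #   impl.map_curie_to_shorthand("BFO:0000050")  # -> "part_of"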
    def entity_metadata_map(self, curie: CURIE) -> METADATA_MAP:
        t = self._stanza(curie, strict=False)
        _alt_id_map = self._get_alt_id_to_replacement_map()
        m = defaultdict(list)
        if t:
            for tag, mkey in [
                (TAG_REPLACED_BY, TERM_REPLACED_BY),
                (TAG_CONSIDER, CONSIDER_REPLACEMENT),
                (TAG_NAMESPACE, HAS_OBO_NAMESPACE),
                (TAG_IS_OBSOLETE, DEPRECATED_PREDICATE),
                (TAG_CREATION_DATE, OIO_CREATION_DATE),
                (TAG_CREATED_BY, OIO_CREATED_BY),
            ]:
                for v in t.simple_values(tag):
                    if tag == TAG_IS_OBSOLETE:
                        v = True if v == "true" else False
                    m[mkey].append(v)
            for pv in t.property_values():
                m[self.map_shorthand_to_curie(pv[0])].append(pv[1])
        if curie in _alt_id_map:
            m[TERM_REPLACED_BY] += _alt_id_map[curie]
            m[DEPRECATED_PREDICATE].append(True)
            m[HAS_OBSOLESCENCE_REASON].append(TERMS_MERGED)
        self.add_missing_property_values(curie, m)
        return dict(m)

    def _get_alt_id_to_replacement_map(self) -> Dict[CURIE, List[CURIE]]:
        if self._alt_id_to_replacement_map is None:
            self._alt_id_to_replacement_map = defaultdict(list)
            for e in self.entities():
                t = self._stanza(e, False)
                if t:
                    for a in t.simple_values(TAG_ALT_ID):
                        self._alt_id_to_replacement_map[a].append(e)
        return self._alt_id_to_replacement_map

    def clone(self, resource: OntologyResource) -> "SimpleOboImplementation":
        shutil.copyfile(self.resource.slug, resource.slug)
        return type(self)(resource)

    def dump(self, path: Union[str, TextIO] = None, syntax: str = "obo", **kwargs):
        if syntax is None or syntax == "obo":
            if isinstance(path, str) or isinstance(path, Path):
                logging.info(f"Saving to {path}")
                with open(path, "w", encoding="UTF-8") as file:
                    self.obo_document.dump(file)
            else:
                self.obo_document.dump(path)
        else:
            super().dump(path, syntax=syntax)

    def save(
        self,
    ):
        logging.info("Committing and flushing changes")
        self.dump(self.resource.slug)

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: MappingsInterface
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def get_sssom_mappings_by_curie(self, curie: Union[str, CURIE]) -> Iterator[sssom.Mapping]:
        s = self._stanza(curie, strict=False)
        if s:
            for x in s.simple_values(TAG_XREF):
                m = sssom.Mapping(
                    subject_id=curie,
                    predicate_id=HAS_DBXREF,
                    object_id=x,
                    mapping_justification=sssom.EntityReference(SEMAPV.UnspecifiedMatching.value),
                )
                inject_mapping_sources(m)
                yield m
            for x in s.property_values():
                p = self.map_shorthand_to_curie(x[0])
                if p in SKOS_MATCH_PREDICATES:
                    m = sssom.Mapping(
                        subject_id=curie,
                        predicate_id=p,
                        object_id=x[1],
                        mapping_justification=sssom.EntityReference(
                            SEMAPV.UnspecifiedMatching.value
                        ),
                    )
                    inject_mapping_sources(m)
                    yield m
            for p, v in s.pair_values(TAG_RELATIONSHIP):
                p = self.map_shorthand_to_curie(p)
                if p in SKOS_MATCH_PREDICATES:
                    m = sssom.Mapping(
                        subject_id=curie,
                        predicate_id=p,
                        object_id=v,
                        mapping_justification=sssom.EntityReference(
                            SEMAPV.UnspecifiedMatching.value
                        ),
                    )
                    inject_mapping_sources(m)
                    yield m
        # TODO: use a cache to avoid re-calculating
        for _, stanza in self.obo_document.stanzas.items():
            if len(stanza.simple_values(TAG_XREF)) > 0:
                for x in stanza.simple_values(TAG_XREF):
                    if x == curie:
                        m = sssom.Mapping(
                            subject_id=stanza.id,
                            predicate_id=HAS_DBXREF,
                            object_id=curie,
                            mapping_justification=SEMAPV.UnspecifiedMatching.value,
                        )
                        inject_mapping_sources(m)
                        yield m

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: OboGraphInterface
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def node(self, curie: CURIE, strict=False, include_metadata=False) -> obograph.Node:
        t = self._stanza(curie, strict=False)
        if t is None:
            return obograph.Node(id=curie)
        else:
            types = self.owl_type(curie)
            if OWL_CLASS in types:
                typ = "CLASS"
            elif OWL_OBJECT_PROPERTY in types:
                typ = "PROPERTY"
            else:
                typ = None
            meta = obograph.Meta()
            if include_metadata:
                for s in t.simple_values(TAG_SUBSET):
                    meta.subsets.append(s)
                defn = self.definition(curie)
                if defn:
                    meta.definition = obograph.DefinitionPropertyValue(val=defn)
                for _, syn in self.synonym_property_values([curie]):
                    meta.synonyms.append(syn)
                for _, subset in self.terms_subsets([curie]):
                    meta.subsets.append(subset)
            return obograph.Node(id=curie, lbl=self.label(curie), type=typ, meta=meta)

    def as_obograph(self, expand_curies=False) -> Graph:
        def expand(curie: CURIE) -> CURIE:
            if expand_curies:
                uri = self.curie_to_uri(curie, strict=False)
                return uri if uri is not None else curie
            else:
                return curie

        entities = list(self.entities())
        nodes = [self.node(expand(curie)) for curie in entities]
        edges = [
            Edge(sub=expand(r[0]), pred=expand(r[1]), obj=expand(r[2]))
            for r in self.relationships()
        ]
        ldefs = list(self.logical_definitions(entities))
        return Graph(id="TODO", nodes=nodes, edges=edges, logicalDefinitionAxioms=ldefs)

    def logical_definitions(
        self,
        subjects: Optional[Iterable[CURIE]] = None,
        predicates: Iterable[PRED_CURIE] = None,
        objects: Iterable[CURIE] = None,
        **kwargs,
    ) -> Iterable[LogicalDefinitionAxiom]:
        if subjects is None:
            subjects = self.entities()
        for s in subjects:
            t = self._stanza(s, strict=False)
            if not t:
                continue
            ldef_tuples = t.intersection_of_tuples()
            if ldef_tuples:
                ldef = LogicalDefinitionAxiom(definedClassId=s)
                for m1, m2 in ldef_tuples:
                    if m2:
                        ldef.restrictions.append(
                            ExistentialRestrictionExpression(
                                propertyId=self.map_shorthand_to_curie(m1), fillerId=m2
                            )
                        )
                    else:
                        ldef.genusIds.append(m1)
                if logical_definition_matches(ldef, predicates=predicates, objects=objects):
                    yield ldef

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: SearchInterface
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: DifferInterface
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def diff(
        self,
        other_ontology: DifferInterface,
        configuration: DiffConfiguration = None,
        **kwargs,
    ) -> Iterator[kgcl.Change]:
        if configuration is None:
            configuration = DiffConfiguration()
        if not isinstance(other_ontology, SimpleOboImplementation):
            raise ValueError("Can only diff SimpleOboImplementation")
        stanzas1 = self.obo_document.stanzas
        stanzas2 = other_ontology.obo_document.stanzas
        all_ids = set(stanzas1.keys()).union(stanzas2.keys())
        for id in all_ids:
            yield from self._diff_stanzas(stanzas1.get(id, None), stanzas2.get(id, None))

    def _diff_stanzas(
        self, stanza1: Optional[Stanza], stanza2: Optional[Stanza]
    ) -> Iterator[kgcl.Change]:
        def _id():
            return generate_change_id()

        node_is_deleted = False
        node_is_created = False
        if stanza1 is None and stanza2 is None:
            raise ValueError("Both stanzas are None")
        if stanza1 is None:
            stanza1 = Stanza(id=stanza2.id, type=stanza2.type)
            if stanza2.type == "Term":
                yield kgcl.ClassCreation(
                    id=_id(), about_node=stanza2.id, name=stanza2.singular_value(TAG_NAME)
                )
            elif stanza2.type == "Typedef":
                yield kgcl.NodeCreation(
                    id=_id(), about_node=stanza2.id, name=stanza2.singular_value(TAG_NAME)
                )
            else:
                raise ValueError(f"Unknown stanza type: {stanza2.type}")
            node_is_created = True
        if stanza2 is None:
            stanza2 = Stanza(id=stanza1.id, type=stanza1.type)
            if stanza1.type == "Term":
                yield kgcl.NodeDeletion(id=_id(), about_node=stanza1.id)
            else:
                yield kgcl.NodeDeletion(id=_id(), about_node=stanza1.id)
            node_is_deleted = True
        if stanza1 == stanza2:
            return
        if stanza1.type != stanza2.type:
            raise ValueError(f"Stanza types differ: {stanza1.type} vs {stanza2.type}")
        t1id = stanza1.id
        t2id = stanza2.id
        logging.info(f"Diffing: {t1id} vs {t2id}")

        def _tv_dict(stanza: Stanza) -> Dict[str, List[str]]:
            d = defaultdict(set)
            for tv in stanza.tag_values:
                d[tv.tag].add(tv.value)
            return d

        tv_dict1 = _tv_dict(stanza1)
        tv_dict2 = _tv_dict(stanza2)
        all_tags = set(tv_dict1.keys()).union(tv_dict2.keys())
        for tag in all_tags:
            vals1 = tv_dict1.get(tag, [])
            vals2 = tv_dict2.get(tag, [])
            vals1list = list(vals1)
            vals2list = list(vals2)
            tvs1 = [tv for tv in stanza1.tag_values if tv.tag == tag]
            tvs2 = [tv for tv in stanza2.tag_values if tv.tag == tag]
            if vals1 == vals2:
                continue
            logging.info(f"Difference in {tag}: {vals1} vs {vals2}")
            if tag == TAG_NAME:
                if node_is_deleted or node_is_created:
                    continue
                if vals1 and vals2:
                    yield kgcl.NodeRename(
                        id=_id(), about_node=t1id, new_value=vals2list[0], old_value=vals1list[0]
                    )
                elif vals2:
                    # Existing node goes from having no name to having a name
                    # In future KGCL may have a NodeNewName. For now we use NodeRename.
                    yield kgcl.NodeRename(
                        id=_id(), about_node=t1id, new_value=vals2list[0], old_value=None
                    )
                else:
                    yield kgcl.NodeDeletion(
                        id=_id(), about_node=t1id, old_value=vals1list[0], new_value=None
                    )
            elif tag == TAG_DEFINITION:
                if node_is_deleted:
                    continue
                # TODO: provenance changes
                td1 = stanza1.quoted_value(TAG_DEFINITION)
                td2 = stanza2.quoted_value(TAG_DEFINITION)
                if vals1 and vals2:
                    yield kgcl.NodeTextDefinitionChange(
                        id=_id(), about_node=t1id, new_value=td2, old_value=td1
                    )
                elif vals1:
                    yield kgcl.RemoveTextDefinition(id=_id(), about_node=t1id, old_value=td1)
                else:
                    yield kgcl.NewTextDefinition(id=_id(), about_node=t2id, new_value=td2)
            elif tag == TAG_IS_OBSOLETE:
                if node_is_deleted:
                    continue
                if vals1 and not vals2:
                    yield kgcl.NodeUnobsoletion(id=_id(), about_node=t1id)
                elif not vals1 and vals2:
                    replaced_by = stanza2.simple_values(TAG_REPLACED_BY)
                    if replaced_by:
                        yield kgcl.NodeObsoletionWithDirectReplacement(
                            id=_id(), about_node=t2id, has_direct_replacement=replaced_by[0]
                        )
                    else:
                        yield kgcl.NodeObsoletion(id=_id(), about_node=t2id)
            elif tag == TAG_SUBSET:
                if node_is_deleted:
                    continue
                xrefs1 = stanza1.simple_values(TAG_SUBSET)
                xrefs2 = stanza2.simple_values(TAG_SUBSET)
                for xref in xrefs1:
                    if xref not in xrefs2:
                        yield kgcl.RemoveNodeFromSubset(id=_id(), about_node=t1id, in_subset=xref)
                for xref in xrefs2:
                    if xref not in xrefs1:
                        yield kgcl.AddNodeToSubset(id=_id(), about_node=t2id, in_subset=xref)
            elif tag == TAG_IS_A:
                isas1 = stanza1.simple_values(TAG_IS_A)
                isas2 = stanza2.simple_values(TAG_IS_A)
                for isa in isas1:
                    if isa not in isas2:
                        yield kgcl.EdgeDeletion(id=_id(), subject=t1id, predicate=IS_A, object=isa)
                for isa in isas2:
                    if isa not in isas1:
                        yield kgcl.EdgeCreation(id=_id(), subject=t2id, predicate=IS_A, object=isa)
            elif tag == TAG_RELATIONSHIP:
                rels1 = stanza1.pair_values(TAG_RELATIONSHIP)
                rels2 = stanza2.pair_values(TAG_RELATIONSHIP)
                for p, v in rels1:
                    p_curie = self.map_shorthand_to_curie(p)
                    if (p, v) not in rels2:
                        yield kgcl.EdgeDeletion(id=_id(), subject=t1id, predicate=p_curie, object=v)
                for p, v in rels2:
                    p_curie = self.map_shorthand_to_curie(p)
                    if (p, v) not in rels1:
                        yield kgcl.EdgeCreation(id=_id(), subject=t2id, predicate=p_curie, object=v)
            elif tag == TAG_SYNONYM:
                if node_is_deleted:
                    continue
                # TODO: make this sensitive to annotation changes; for now we truncate the tuple
                syns1 = [tv.as_synonym()[0:2] for tv in tvs1]
                syns2 = [tv.as_synonym()[0:2] for tv in tvs2]
                for syn in syns1:
                    if syn not in syns2:
                        yield kgcl.RemoveSynonym(id=_id(), about_node=t1id, old_value=syn[0])
                for syn in syns2:
                    if syn not in syns1:
                        pred = SCOPE_TO_SYNONYM_PRED_MAP[syn[1]]
                        yield kgcl.NewSynonym(
                            id=_id(), about_node=t2id, new_value=syn[0], predicate=pred
                        )
            elif tag == TAG_XREF:
                if node_is_deleted:
                    continue
                xrefs1 = stanza1.simple_values(TAG_XREF)
                xrefs2 = stanza2.simple_values(TAG_XREF)
                for xref in xrefs1:
                    if xref not in xrefs2:
                        yield kgcl.RemoveMapping(
                            id=_id(), about_node=t1id, object=xref, predicate=HAS_DBXREF
                        )
                for xref in xrefs2:
                    if xref not in xrefs1:
                        yield kgcl.MappingCreation(
                            id=_id(), subject=t2id, object=xref, predicate=HAS_DBXREF
                        )

    def different_from(self, entity: CURIE, other_ontology: DifferInterface) -> bool:
        t1 = self._stanza(entity, strict=False)
        if t1:
            t2 = other_ontology._stanza(entity, strict=False)
            if t2:
                return str(t1) != str(t2)
        return True

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: PatcherInterface
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def migrate_curies(self, curie_map: Mapping[CURIE, CURIE]) -> None:
        od = self.obo_document
        for t in od.stanzas.values():
            t.replace_token(curie_map)
        od.reindex()
        self._rebuild_relationship_index()

    @property
    def uses_legacy_properties(self) -> bool:
        if self._uses_legacy_properties is not None:
            return self._uses_legacy_properties
        for s in self.obo_document.stanzas.values():
            for tv in s.tag_values:
                if tv.tag in [TAG_CREATED_BY, TAG_CREATION_DATE]:
                    self._uses_legacy_properties = True
                    return True
        return False

    def set_uses_legacy_properties(self, value: bool) -> None:
        self._uses_legacy_properties = value

    def add_contributors(self, curie: CURIE, agents: List[CURIE]) -> None:
        t = self._stanza(curie, strict=True)
        for agent in agents:
            t.add_tag_value_pair(TAG_PROPERTY_VALUE, CONTRIBUTOR, agent)

    def set_creator(self, curie: CURIE, agent: CURIE, date: Optional[str] = None) -> None:
        t = self._stanza(curie, strict=True)
        if self._uses_legacy_properties:
            t.set_singular_tag(TAG_CREATED_BY, agent)
        else:
            t.add_tag_value_pair(TAG_PROPERTY_VALUE, CREATOR, agent)
        if date:
            self.set_creation_date(curie, date)

    def set_creation_date(self, curie: CURIE, date: str) -> None:
        t = self._stanza(curie, strict=True)
        if self._uses_legacy_properties:
            t.set_singular_tag(TAG_CREATION_DATE, date)
        else:
            t.add_tag_value_pair(TAG_PROPERTY_VALUE, CREATED, date)

    def apply_patch(
        self,
        patch: kgcl.Change,
        activity: kgcl.Activity = None,
        metadata: Mapping[PRED_CURIE, Any] = None,
        configuration: kgcl.Configuration = None,
    ) -> kgcl.Change:
        od = self.obo_document
        tidy_change_object(patch)
        logging.debug(f"Applying {patch}")
        modified_entities = []
        if isinstance(patch, kgcl.NodeRename):
            # self.set_label(patch.about_node, _clean(patch.new_value))
            self.set_label(patch.about_node, patch.new_value)
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.NodeObsoletion):
            t = self._stanza(patch.about_node, strict=True)
            t.set_singular_tag(TAG_IS_OBSOLETE, "true")
            if isinstance(patch, kgcl.NodeObsoletionWithDirectReplacement):
                t.set_singular_tag(TAG_REPLACED_BY, patch.has_direct_replacement)
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.NodeDeletion):
            try:
                del od.stanzas[patch.about_node]
            except KeyError:
                logging.error(f"CURIE {patch.about_node} does not exist in the OBO file provided.")
        elif isinstance(patch, kgcl.NodeCreation):
            self.create_entity(patch.about_node, patch.name)
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.ClassCreation):
            self.create_entity(patch.about_node, patch.name)
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.SynonymReplacement):
            t = self._stanza(patch.about_node, strict=True)
            n = 0
            for tv in t.tag_values:
                if tv.tag == TAG_SYNONYM:
                    syn = tv.as_synonym()
                    if syn[0] == patch.old_value:
                        tv.replace_quoted_part(patch.new_value)
                        n += 1
            if not n:
                raise ValueError(f"Failed to find synonym {patch.old_value} for {t.id}")
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.AddNodeToSubset):
            t = self._stanza(patch.about_node, strict=True)
            t.add_tag_value(TAG_SUBSET, patch.in_subset)
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.RemoveNodeFromSubset):
            t = self._stanza(patch.about_node, strict=True)
            t.remove_simple_tag_value(TAG_SUBSET, patch.in_subset)
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.NewTextDefinition):
            t = self._stanza(patch.about_node, strict=True)
            t.add_quoted_tag_value(TAG_DEFINITION, patch.new_value.strip("'"), xrefs=[])
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.RemoveTextDefinition):
            t = self._stanza(patch.about_node, strict=True)
            for tv in t.tag_values:
                if tv.tag == TAG_DEFINITION:
                    # This is a remove_definition request
                    t.remove_tag_quoted_value(TAG_DEFINITION, t._quoted_value(tv.value))
        elif isinstance(patch, kgcl.NodeTextDefinitionChange):
            t = self._stanza(patch.about_node, strict=True)
            for tv in t.tag_values:
                if tv.tag == TAG_DEFINITION:
                    tv.replace_quoted_part(patch.new_value.strip("'"))
        elif isinstance(patch, kgcl.NewSynonym):
            t = self._stanza(patch.about_node, strict=True)
            # Get scope from patch.qualifier
            # rather than forcing all synonyms to be related.
            if isinstance(patch.qualifier, str):
                scope = patch.qualifier.upper()
            else:
                scope = str(patch.qualifier.value).upper() if patch.qualifier else "RELATED"
            v = patch.new_value.replace('"', '\\"')
            t.add_tag_value(TAG_SYNONYM, f'"{v}" {scope} []')
            modified_entities.append(patch.about_node)
        elif isinstance(patch, kgcl.RemoveSynonym):
            t = self._stanza(patch.about_node, strict=True)
            # scope = str(patch.qualifier.value).upper() if patch.qualifier else "RELATED"
            v = patch.old_value.strip(
                '"'
            )  # Handling a bug where quotes are accidentally introduced.
            t.remove_tag_quoted_value(TAG_SYNONYM, v)
        elif isinstance(patch, kgcl.EdgeCreation):
            description = patch.change_description
            self.add_relationship(
                patch.subject, patch.predicate, patch.object, description=description
            )
            modified_entities.append(patch.subject)
        elif isinstance(patch, kgcl.EdgeDeletion):
            self.remove_relationship(patch.subject, patch.predicate, patch.object)
        elif isinstance(patch, kgcl.NodeMove):
            logging.warning(f"Cannot handle {patch}")
        elif isinstance(patch, kgcl.PredicateChange):
            e = patch.about_edge
            subject = self._lookup(e.subject)
            object = self._lookup(e.object)
            t = self._stanza(subject, strict=True)
            if _is_isa(patch.old_value):
                t.remove_simple_tag_value(TAG_IS_A, object)
            else:
                pred = self.map_curie_to_shorthand(patch.old_value)
                t.remove_pairwise_tag_value(TAG_RELATIONSHIP, pred, object)
            if _is_isa(patch.new_value):
                t.add_tag_value(TAG_IS_A, object)
            else:
                t.add_tag_value(TAG_RELATIONSHIP, f"{patch.new_value} {object}")
            self._clear_relationship_index()
            modified_entities.append(subject)
        else:
            raise NotImplementedError(f"cannot handle KGCL type {type(patch)}")
        if patch.contributor:
            self.add_contributors(patch.about_node, [patch.contributor])
            modified_entities.append(patch.about_node)
        for e in modified_entities:
            stanza = self._stanza(e, strict=True)
            stanza.normalize_order()
        return patch

    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Implements: OwlInterface
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    def transitive_object_properties(self) -> Iterable[CURIE]:
        od = self.obo_document
        for s_id, s in od.stanzas.items():
            if s.type == "Typedef":
                if s.get_boolean_value(TAG_IS_TRANSITIVE, False):
                    yield self.map_shorthand_to_curie(s_id)

    def simple_subproperty_of_chains(self) -> Iterable[Tuple[CURIE, List[CURIE]]]:
        od = self.obo_document
        for s_id, s in od.stanzas.items():
s.type == "Typedef": for p1, p2 in s.pair_values(TAG_HOLDS_OVER_CHAIN): curie = self.map_shorthand_to_curie(s_id) yield curie, [self.map_shorthand_to_curie(p1), self.map_shorthand_to_curie(p2)]