import logging
import re
import shutil
import sys
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Mapping,
    Optional,
    Set,
    TextIO,
Tuple,
Union,
)
import sssom_schema as sssom
from kgcl_schema.datamodel import kgcl
from oaklib.converters.obo_graph_to_obo_format_converter import (
OboGraphToOboFormatConverter,
)
from oaklib.datamodels import obograph
from oaklib.datamodels.obograph import (
Edge,
ExistentialRestrictionExpression,
Graph,
GraphDocument,
LogicalDefinitionAxiom,
SynonymPropertyValue,
)
from oaklib.datamodels.search import SearchConfiguration
from oaklib.datamodels.search_datamodel import SearchProperty, SearchTermSyntax
from oaklib.datamodels.vocabulary import (
CONSIDER_REPLACEMENT,
CONTRIBUTOR,
CREATED,
CREATOR,
DEPRECATED_PREDICATE,
EQUIVALENT_CLASS,
HAS_DBXREF,
HAS_OBO_NAMESPACE,
HAS_OBSOLESCENCE_REASON,
INVERSE_OF,
IS_A,
LABEL_PREDICATE,
OIO_CREATED_BY,
OIO_CREATION_DATE,
OIO_SUBSET_PROPERTY,
OIO_SYNONYM_TYPE_PROPERTY,
OWL_CLASS,
OWL_OBJECT_PROPERTY,
OWL_VERSION_IRI,
RDFS_DOMAIN,
RDFS_RANGE,
SCOPE_TO_SYNONYM_PRED_MAP,
SEMAPV,
SKOS_MATCH_PREDICATES,
SUBPROPERTY_OF,
TERM_REPLACED_BY,
TERMS_MERGED,
)
from oaklib.implementations.simpleobo.simple_obo_parser import (
TAG_ALT_ID,
TAG_COMMENT,
TAG_CONSIDER,
TAG_CREATED_BY,
TAG_CREATION_DATE,
TAG_DATA_VERSION,
TAG_DEFINITION,
TAG_DOMAIN,
TAG_EQUIVALENT_TO,
TAG_HOLDS_OVER_CHAIN,
TAG_ID_SPACE,
TAG_INVERSE_OF,
TAG_IS_A,
TAG_IS_OBSOLETE,
TAG_IS_TRANSITIVE,
TAG_NAME,
TAG_NAMESPACE,
TAG_ONTOLOGY,
TAG_PROPERTY_VALUE,
TAG_RANGE,
TAG_RELATIONSHIP,
TAG_REPLACED_BY,
TAG_SUBSET,
TAG_SUBSETDEF,
TAG_SYNONYM,
TAG_SYNONYMTYPEDEF,
TAG_XREF,
OboDocument,
Stanza,
_synonym_scope_pred,
parse_obo_document,
)
from oaklib.inference.relation_graph_reasoner import RelationGraphReasoner
from oaklib.interfaces import TextAnnotatorInterface
from oaklib.interfaces.basic_ontology_interface import (
ALIAS_MAP,
DEFINITION,
LANGUAGE_TAG,
METADATA_MAP,
RELATIONSHIP,
RELATIONSHIP_MAP,
)
from oaklib.interfaces.differ_interface import DiffConfiguration, DifferInterface
from oaklib.interfaces.dumper_interface import DumperInterface
from oaklib.interfaces.mapping_provider_interface import MappingProviderInterface
from oaklib.interfaces.merge_interface import MergeInterface
from oaklib.interfaces.obograph_interface import OboGraphInterface
from oaklib.interfaces.obolegacy_interface import PRED_CODE, OboLegacyInterface
from oaklib.interfaces.owl_interface import OwlInterface
from oaklib.interfaces.patcher_interface import PatcherInterface
from oaklib.interfaces.rdf_interface import RdfInterface
from oaklib.interfaces.search_interface import SearchInterface
from oaklib.interfaces.semsim_interface import SemanticSimilarityInterface
from oaklib.interfaces.summary_statistics_interface import SummaryStatisticsInterface
from oaklib.interfaces.taxon_constraint_interface import TaxonConstraintInterface
from oaklib.interfaces.validator_interface import ValidatorInterface
from oaklib.resource import OntologyResource
from oaklib.types import CURIE, PRED_CURIE, SUBSET_CURIE
from oaklib.utilities.axioms.logical_definition_utilities import (
logical_definition_matches,
)
from oaklib.utilities.kgcl_utilities import generate_change_id, tidy_change_object
from oaklib.utilities.mapping.sssom_utils import inject_mapping_sources
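# Predicate normalization helper: treats the rdfs:subClassOf CURIE (IS_A), "is_a",
# and "isa" as the same is_a predicate when interpreting KGCL predicate changes.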
def _is_isa(x: str):
return x == IS_A or x.lower() == "is_a" or x.lower() == "isa"
@dataclass
class SimpleOboImplementation(
ValidatorInterface,
DifferInterface,
RdfInterface,
OboGraphInterface,
OboLegacyInterface,
SearchInterface,
MappingProviderInterface,
PatcherInterface,
SummaryStatisticsInterface,
SemanticSimilarityInterface,
TaxonConstraintInterface,
TextAnnotatorInterface,
DumperInterface,
MergeInterface,
OwlInterface,
):
"""
Simple OBO-file backed implementation
This implementation is incomplete and is intended primarily as a Patcher implementation
This can be abandoned when pronto is less strict
"""
obo_document: OboDocument = None
_relationship_index_cache: Dict[CURIE, List[RELATIONSHIP]] = None
_alt_id_to_replacement_map: Dict[CURIE, List[CURIE]] = None
_uses_legacy_properties: bool = None
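    # Typical construction (a minimal sketch; "my-ontology.obo" is a hypothetical path,
    # and it assumes OntologyResource accepts ``slug`` and ``local`` as fields):
    #
    #   resource = OntologyResource(slug="my-ontology.obo", local=True)
    #   adapter = SimpleOboImplementation(resource)
    #   for curie in adapter.entities():
    #       print(curie, adapter.label(curie))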
def __post_init__(self):
if self.obo_document is None:
resource = self.resource
if resource and resource.local_path:
logging.info(f"Creating doc for {resource}")
self.obo_document = parse_obo_document(resource.local_path)
if "edit.obo" in str(resource.local_path) and self.auto_relax_axioms is None:
# TODO: in future ontology modules should explicitly set this in the metadata
logging.info(
f"Auto-setting auto_relax_axioms based on name: {resource.local_path}"
)
self.auto_relax_axioms = True
else:
self.obo_document = OboDocument()
for prefix, expansion in self.obo_document.header.pair_values(TAG_ID_SPACE):
self.prefix_map()[prefix] = expansion
def store(self, resource: OntologyResource = None) -> None:
if resource is None:
resource = self.resource
od = self.obo_document
if resource.local:
if resource.slug:
with open(str(resource.local_path), "w", encoding="UTF-8") as f:
od.dump(f)
else:
                od.dump(sys.stdout)
else:
raise NotImplementedError(f"Cannot dump to {resource}")
    def load_graph(self, graph: Graph, replace: bool = True) -> None:
if not replace:
raise NotImplementedError("Cannot merge obograph")
converter = OboGraphToOboFormatConverter()
self.obo_document = OboDocument()
gd = GraphDocument(graphs=[graph])
converter.convert(gd, self.obo_document)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: BasicOntologyInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
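    # Builds the raw relationship stream by walking every stanza: is_a tags become
    # rdfs:subClassOf (or subPropertyOf for Typedefs), relationship/inverse_of/domain/range
    # tags are mapped from shorthand codes to predicate CURIEs, and equivalent_to is emitted
    # in both directions. When auto_relax_axioms is set, logical definitions are additionally
    # "relaxed" into asserted is_a and existential edges.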
def _all_relationships(self) -> Iterator[RELATIONSHIP]:
logging.info("Commencing indexing")
n = 0
entities = list(self.entities(filter_obsoletes=False))
for s in entities:
t = self._stanza(s, strict=False)
if t is None:
# alt_ids
continue
is_relation = t.type == "Typedef"
for v in t.simple_values(TAG_IS_A):
n += 1
if is_relation:
yield s, SUBPROPERTY_OF, self.map_shorthand_to_curie(v)
else:
yield s, IS_A, v
for tag, prop in [
(TAG_INVERSE_OF, INVERSE_OF),
(TAG_DOMAIN, RDFS_DOMAIN),
(TAG_RANGE, RDFS_RANGE),
]:
for v in t.simple_values(tag):
n += 1
yield s, prop, self.map_shorthand_to_curie(v)
for v in t.simple_values(TAG_EQUIVALENT_TO):
n += 1
yield s, EQUIVALENT_CLASS, v
yield v, EQUIVALENT_CLASS, s
for p, v in t.pair_values(TAG_RELATIONSHIP):
yield s, self.map_shorthand_to_curie(p), v
# for p, v in t.intersection_of_tuples():
# n += 1
# yield s, self._get_relationship_type_curie(p), v
logging.info(f"Indexed {n} relationships")
if self.auto_relax_axioms:
n = 0
logging.info("Auto-relaxing axioms")
for ldef in self.logical_definitions(entities):
for p in ldef.genusIds:
yield ldef.definedClassId, IS_A, p
n += 1
for r in ldef.restrictions:
yield ldef.definedClassId, r.propertyId, r.fillerId
n += 1
logging.info(f"Relaxed {n} relationships")
def _all_entailed_relationships(self):
reasoner = RelationGraphReasoner(self)
yield from reasoner.entailed_edges()
def entities(self, filter_obsoletes=True, owl_type=None) -> Iterable[CURIE]:
od = self.obo_document
for s_id, s in od.stanzas.items():
if filter_obsoletes:
if s.get_boolean_value(TAG_IS_OBSOLETE):
continue
if (
owl_type is None
or (owl_type == OWL_CLASS and s.type == "Term")
or (owl_type == OWL_OBJECT_PROPERTY and s.type == "Typedef")
):
if s.type == "Typedef":
yield self.map_shorthand_to_curie(s_id)
else:
yield s_id
if not owl_type or owl_type == OWL_CLASS:
# note that in the case of alt_ids, metadata such as
# original owl_type is lost. We assume that the original
# owl_type was OWL_CLASS
if not filter_obsoletes:
for s in self._get_alt_id_to_replacement_map().keys():
yield s
if not owl_type or owl_type == OIO_SUBSET_PROPERTY:
for v in od.header.simple_values(TAG_SUBSETDEF):
yield v
if not owl_type or owl_type == OIO_SYNONYM_TYPE_PROPERTY:
for v in od.header.simple_values(TAG_SYNONYMTYPEDEF):
yield v
def owl_types(self, entities: Iterable[CURIE]) -> Iterable[Tuple[CURIE, CURIE]]:
od = self.obo_document
for curie in entities:
s = self._stanza(curie, False)
if s is None:
if curie in self.subsets():
yield curie, OIO_SUBSET_PROPERTY
elif curie in od.header.simple_values(TAG_SYNONYMTYPEDEF):
yield curie, OIO_SYNONYM_TYPE_PROPERTY
else:
yield curie, None
else:
if s.type == "Term":
yield curie, OWL_CLASS
elif s.type == "Typedef":
yield curie, OWL_OBJECT_PROPERTY
else:
raise ValueError(f"Unknown stanza type: {s.type}")
def obsoletes(self, include_merged=True) -> Iterable[CURIE]:
od = self.obo_document
for s in od.stanzas.values():
if s.get_boolean_value(TAG_IS_OBSOLETE):
yield s.id
if include_merged:
for s in self._get_alt_id_to_replacement_map().keys():
yield s
def subsets(self) -> Iterable[CURIE]:
od = self.obo_document
for s in od.header.simple_values(TAG_SUBSETDEF):
yield s
def subset_members(self, subset: SUBSET_CURIE) -> Iterable[CURIE]:
od = self.obo_document
for s in od.stanzas.values():
if subset in s.simple_values(TAG_SUBSET):
yield s.id
def terms_subsets(self, curies: Iterable[CURIE]) -> Iterable[Tuple[CURIE, SUBSET_CURIE]]:
for curie in curies:
s = self._stanza(curie, False)
if s:
for subset in s.simple_values(TAG_SUBSET):
yield curie, subset
def ontologies(self) -> Iterable[CURIE]:
od = self.obo_document
for v in od.header.simple_values(TAG_ONTOLOGY):
yield v
def ontology_metadata_map(self, ontology: CURIE) -> METADATA_MAP:
m = defaultdict(list)
m["id"] = [ontology]
omo_map = {
TAG_DATA_VERSION: OWL_VERSION_IRI,
}
header = self.obo_document.header
for tv in header.tag_values:
tag = tv.tag
if tag in omo_map:
p = omo_map[tag]
val = tv.value
if p == OWL_VERSION_IRI:
val = f"obo:{ontology}/{val}{ontology}.owl"
m[p].append(val)
return dict(m)
def _stanza(self, curie: CURIE, strict=True) -> Optional[Stanza]:
stanza = self.obo_document.stanzas.get(curie, None)
if stanza is None:
alt_curie = self.map_curie_to_shorthand(curie)
if alt_curie and alt_curie != curie:
stanza = self.obo_document.stanzas.get(alt_curie)
if strict and not stanza:
raise ValueError(f"No such stanza {curie}")
return stanza
def label(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]:
if lang:
raise NotImplementedError("Language tags not supported")
s = self._stanza(curie, False)
if s:
return s.singular_value(TAG_NAME)
else:
if curie == IS_A:
return "subClassOf"
else:
return None
def set_label(self, curie: CURIE, label: str, lang: Optional[LANGUAGE_TAG] = None) -> bool:
if lang:
raise NotImplementedError("Language tags not supported")
s = self._stanza(curie, False)
s.set_singular_tag(TAG_NAME, label)
return True
def curies_by_label(self, label: str) -> List[CURIE]:
return [
s.id
for s in self.obo_document.stanzas.values()
if s.singular_value(TAG_NAME, False) == label
]
def _lookup(self, label_or_curie: str) -> CURIE:
if ":" in label_or_curie and " " not in label_or_curie:
return label_or_curie
else:
candidates = self.curies_by_label(label_or_curie)
if len(candidates) != 1:
raise ValueError(f"{label_or_curie} => {candidates}")
return candidates[0]
def create_entity(
self,
curie: CURIE,
label: Optional[str] = None,
relationships: Optional[RELATIONSHIP_MAP] = None,
type: Optional[str] = None,
replace=False,
) -> CURIE:
if type is None or type == OWL_CLASS:
type = "Term"
elif type == OWL_OBJECT_PROPERTY:
type = "Typedef"
else:
raise ValueError(f"Cannot handle type: {type}")
stanza = self._stanza(curie, False)
if stanza:
if replace:
stanza = None
if not stanza:
stanza = Stanza(id=curie, type=type)
stanza.add_tag_value(TAG_NAME, label)
self.obo_document.add_stanza(stanza)
if relationships:
for pred, fillers in relationships.items():
for filler in fillers:
                    self.add_relationship(curie, pred, filler)
        return curie
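    # Relationship edits below write directly to stanza tags: is_a edges use the dedicated
    # is_a tag, all other predicates are stored as "relationship: <shorthand> <filler>" pairs,
    # and the cached relationship index is cleared after every change.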
def add_relationship(self, curie: CURIE, predicate: PRED_CURIE, filler: CURIE, **kwargs):
t = self._stanza(curie)
if predicate == IS_A:
t.add_tag_value(TAG_IS_A, filler, **kwargs)
else:
predicate_code = self.map_curie_to_shorthand(predicate)
t.add_tag_value_pair(TAG_RELATIONSHIP, predicate_code, filler, **kwargs)
self._clear_relationship_index()
def remove_relationship(self, curie: CURIE, predicate: Optional[PRED_CURIE], filler: CURIE):
t = self._stanza(curie)
if not predicate or predicate == IS_A:
t.remove_simple_tag_value(TAG_IS_A, filler)
else:
predicate_code = self.map_curie_to_shorthand(predicate)
t.remove_pairwise_tag_value(TAG_RELATIONSHIP, predicate_code, filler)
self._clear_relationship_index()
def definition(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]:
s = self._stanza(curie, strict=False)
if s:
return s.quoted_value(TAG_DEFINITION)
def definitions(
self,
curies: Iterable[CURIE],
include_metadata=False,
include_missing=False,
lang: Optional[LANGUAGE_TAG] = None,
) -> Iterator[DEFINITION]:
for curie in curies:
s = self._stanza(curie, strict=False)
if s:
d = s.quoted_value(TAG_DEFINITION)
if d:
if include_metadata:
defn_tvs = [tv for tv in s.tag_values if tv.tag == TAG_DEFINITION]
if defn_tvs:
defn_tv = defn_tvs[0]
defn, xrefs = defn_tv.as_definition()
yield curie, defn, {HAS_DBXREF: xrefs}
else:
yield curie, d, None
elif include_missing:
yield curie, None, None
def comments(self, curies: Iterable[CURIE]) -> Iterable[Tuple[CURIE, str]]:
for curie in curies:
s = self._stanza(curie)
if s:
yield curie, s.singular_value(TAG_COMMENT)
def entity_alias_map(self, curie: CURIE) -> ALIAS_MAP:
s = self._stanza(curie, strict=False)
if s is None:
return {}
m = defaultdict(list)
lbl = self.label(curie)
if lbl:
m[LABEL_PREDICATE] = [lbl]
for st in s.synonyms():
syn, pred, _type, _xrefs = st
pred = _synonym_scope_pred(pred)
m[pred].append(syn)
return m
def synonym_property_values(
self, subject: Union[CURIE, Iterable[CURIE]]
) -> Iterator[Tuple[CURIE, SynonymPropertyValue]]:
if isinstance(subject, str):
subject = [subject]
for curie in subject:
s = self._stanza(curie, strict=False)
if not s:
continue
for syn in s.synonyms():
pred = _synonym_scope_pred(syn[1]).replace("oio:", "")
yield curie, SynonymPropertyValue(
pred=pred, val=syn[0], synonymType=syn[2], xrefs=syn[3]
)
def map_shorthand_to_curie(self, rel_code: PRED_CODE) -> PRED_CURIE:
"""
Maps either a true relationship type CURIE or a shorthand packages to a CURIE.
See `section 5.9 <https://owlcollab.github.io/oboformat/doc/obo-syntax.html#5.9>`_
:param rel_code:
:return:
"""
for _, x in self.simple_mappings_by_curie(rel_code):
if x.startswith("BFO:") or x.startswith("RO:"):
return x
if ":" not in rel_code and ":" in x:
return x
return rel_code
def map_curie_to_shorthand(self, rel_type: PRED_CURIE) -> PRED_CODE:
"""
Reciprocal of `_get_relationship_type_curie`
:param rel_type:
:return:
"""
if rel_type:
is_core = rel_type.startswith("BFO:") or rel_type.startswith("RO:")
for s in self.obo_document.stanzas.values():
if s.type == "Typedef":
for x in s.simple_values(TAG_XREF):
if x == rel_type:
if is_core or ":" not in s.id:
return s.id
return rel_type
def relationships(
self,
subjects: List[CURIE] = None,
predicates: List[PRED_CURIE] = None,
objects: List[CURIE] = None,
include_tbox: bool = True,
include_abox: bool = True,
include_entailed: bool = False,
exclude_blank: bool = True,
) -> Iterator[RELATIONSHIP]:
ei = self.edge_index
if include_entailed:
ei = self.entailed_edge_index
yield from ei.edges(
subjects=subjects,
predicates=predicates,
objects=objects,
)
def basic_search(self, search_term: str, config: SearchConfiguration = None) -> Iterable[CURIE]:
        # TODO: move up, avoid repeating code
if config is None:
config = SearchConfiguration()
matches = []
mfunc = None
if config.syntax == SearchTermSyntax(SearchTermSyntax.STARTS_WITH):
mfunc = lambda label: str(label).startswith(search_term)
elif config.syntax == SearchTermSyntax(SearchTermSyntax.REGULAR_EXPRESSION):
prog = re.compile(search_term)
mfunc = lambda label: prog.search(label)
elif config.is_partial:
mfunc = lambda label: search_term in str(label)
else:
mfunc = lambda label: label == search_term
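        # mfunc captures the match mode for this search: prefix match, regular expression,
        # substring (partial), or exact equality against the candidate string.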
search_all = SearchProperty(SearchProperty.ANYTHING) in config.properties
logging.info(f"SEARCH={search_term}")
for t in self.entities(filter_obsoletes=False):
lbl = self.label(t)
logging.debug(f"T={t} // {config}")
            if (
                search_all
                or SearchProperty(SearchProperty.LABEL) in config.properties
            ):
if lbl and mfunc(lbl):
matches.append(t)
logging.info(f"Name match to {t}")
continue
if search_all or SearchProperty(SearchProperty.IDENTIFIER) in config.properties:
if mfunc(t):
matches.append(t)
logging.info(f"identifier match to {t}")
continue
if (
search_all
or SearchProperty(SearchProperty.REPLACEMENT_IDENTIFIER) in config.properties
):
s = self._stanza(t, strict=False)
if s:
for r in s.simple_values(TAG_REPLACED_BY):
if mfunc(t):
matches.append(r)
logging.info(f"replaced_by match to {t}")
continue
for a in s.simple_values(TAG_ALT_ID):
if mfunc(a):
matches.append(t)
logging.info(f"alternate_id match to {t}")
continue
if search_all or SearchProperty(SearchProperty.ALIAS) in config.properties:
for syn in self.entity_aliases(t):
if mfunc(syn):
logging.info(f"Syn match to {t}")
matches.append(t)
continue
if search_all or SearchProperty(SearchProperty.MAPPED_IDENTIFIER) in config.properties:
                for _, x in self.simple_mappings_by_curie(t):
                    if mfunc(x):
                        logging.info(f"Mapped identifier match to {t}")
                        matches.append(t)
                        continue
for m in matches:
yield m
def simple_mappings_by_curie(self, curie: CURIE) -> Iterable[Tuple[PRED_CURIE, CURIE]]:
t = self._stanza(curie, strict=False)
if t:
for v in t.simple_values(TAG_XREF):
yield HAS_DBXREF, v
def entity_metadata_map(self, curie: CURIE) -> METADATA_MAP:
t = self._stanza(curie, strict=False)
_alt_id_map = self._get_alt_id_to_replacement_map()
m = defaultdict(list)
if t:
for tag, mkey in [
(TAG_REPLACED_BY, TERM_REPLACED_BY),
(TAG_CONSIDER, CONSIDER_REPLACEMENT),
(TAG_NAMESPACE, HAS_OBO_NAMESPACE),
(TAG_IS_OBSOLETE, DEPRECATED_PREDICATE),
(TAG_CREATION_DATE, OIO_CREATION_DATE),
(TAG_CREATED_BY, OIO_CREATED_BY),
]:
for v in t.simple_values(tag):
if tag == TAG_IS_OBSOLETE:
v = True if v == "true" else False
m[mkey].append(v)
for pv in t.property_values():
m[self.map_shorthand_to_curie(pv[0])].append(pv[1])
if curie in _alt_id_map:
m[TERM_REPLACED_BY] += _alt_id_map[curie]
m[DEPRECATED_PREDICATE].append(True)
m[HAS_OBSOLESCENCE_REASON].append(TERMS_MERGED)
self.add_missing_property_values(curie, m)
return dict(m)
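    # Lazily builds a map from alt_id (merged) CURIEs to the stanza IDs that now carry them;
    # this is used to report merged terms as obsolete and to populate term_replaced_by metadata.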
def _get_alt_id_to_replacement_map(self) -> Dict[CURIE, List[CURIE]]:
if self._alt_id_to_replacement_map is None:
self._alt_id_to_replacement_map = defaultdict(list)
for e in self.entities():
t = self._stanza(e, False)
if t:
for a in t.simple_values(TAG_ALT_ID):
self._alt_id_to_replacement_map[a].append(e)
return self._alt_id_to_replacement_map
def clone(self, resource: OntologyResource) -> "SimpleOboImplementation":
shutil.copyfile(self.resource.slug, resource.slug)
return type(self)(resource)
    def dump(self, path: Union[str, Path, TextIO] = None, syntax: str = "obo", **kwargs):
if syntax is None or syntax == "obo":
if isinstance(path, str) or isinstance(path, Path):
logging.info(f"Saving to {path}")
with open(path, "w", encoding="UTF-8") as file:
self.obo_document.dump(file)
else:
self.obo_document.dump(path)
else:
super().dump(path, syntax=syntax)
def save(
self,
):
logging.info("Committing and flushing changes")
self.dump(self.resource.slug)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: MappingsInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
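    # SSSOM mappings are synthesized from three sources on the stanza: xref tags (oio:hasDbXref),
    # property_value tags whose predicate is a skos match predicate, and relationship tags with
    # a skos match predicate; a final pass over all stanzas also reports reverse xrefs that
    # point back at the query CURIE.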
def get_sssom_mappings_by_curie(self, curie: Union[str, CURIE]) -> Iterator[sssom.Mapping]:
s = self._stanza(curie, strict=False)
if s:
for x in s.simple_values(TAG_XREF):
m = sssom.Mapping(
subject_id=curie,
predicate_id=HAS_DBXREF,
object_id=x,
mapping_justification=sssom.EntityReference(SEMAPV.UnspecifiedMatching.value),
)
inject_mapping_sources(m)
yield m
for x in s.property_values():
p = self.map_shorthand_to_curie(x[0])
if p in SKOS_MATCH_PREDICATES:
m = sssom.Mapping(
subject_id=curie,
predicate_id=p,
object_id=x[1],
mapping_justification=sssom.EntityReference(
SEMAPV.UnspecifiedMatching.value
),
)
inject_mapping_sources(m)
yield m
for p, v in s.pair_values(TAG_RELATIONSHIP):
p = self.map_shorthand_to_curie(p)
if p in SKOS_MATCH_PREDICATES:
m = sssom.Mapping(
subject_id=curie,
predicate_id=p,
object_id=v,
mapping_justification=sssom.EntityReference(
SEMAPV.UnspecifiedMatching.value
),
)
inject_mapping_sources(m)
yield m
# TODO: use a cache to avoid re-calculating
for _, stanza in self.obo_document.stanzas.items():
if len(stanza.simple_values(TAG_XREF)) > 0:
for x in stanza.simple_values(TAG_XREF):
if x == curie:
m = sssom.Mapping(
subject_id=stanza.id,
predicate_id=HAS_DBXREF,
object_id=curie,
                            mapping_justification=sssom.EntityReference(
                                SEMAPV.UnspecifiedMatching.value
                            ),
)
inject_mapping_sources(m)
yield m
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: OboGraphInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def node(self, curie: CURIE, strict=False, include_metadata=False) -> obograph.Node:
        t = self._stanza(curie, strict=strict)
if t is None:
return obograph.Node(id=curie)
else:
types = self.owl_type(curie)
if OWL_CLASS in types:
typ = "CLASS"
elif OWL_OBJECT_PROPERTY in types:
typ = "PROPERTY"
else:
typ = None
meta = obograph.Meta()
if include_metadata:
for s in t.simple_values(TAG_SUBSET):
meta.subsets.append(s)
defn = self.definition(curie)
if defn:
meta.definition = obograph.DefinitionPropertyValue(val=defn)
for _, syn in self.synonym_property_values([curie]):
meta.synonyms.append(syn)
for _, subset in self.terms_subsets([curie]):
meta.subsets.append(subset)
return obograph.Node(id=curie, lbl=self.label(curie), type=typ, meta=meta)
def as_obograph(self, expand_curies=False) -> Graph:
def expand(curie: CURIE) -> CURIE:
if expand_curies:
uri = self.curie_to_uri(curie, strict=False)
return uri if uri is not None else curie
else:
return curie
entities = list(self.entities())
nodes = [self.node(expand(curie)) for curie in entities]
edges = [
Edge(sub=expand(r[0]), pred=expand(r[1]), obj=expand(r[2]))
for r in self.relationships()
]
ldefs = list(self.logical_definitions(entities))
return Graph(id="TODO", nodes=nodes, edges=edges, logicalDefinitionAxioms=ldefs)
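    # Logical definitions come from intersection_of tags: a single-argument tag contributes a
    # genus class, while a two-argument tag becomes an existential restriction (property + filler).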
def logical_definitions(
self,
subjects: Optional[Iterable[CURIE]] = None,
predicates: Iterable[PRED_CURIE] = None,
objects: Iterable[CURIE] = None,
**kwargs,
) -> Iterable[LogicalDefinitionAxiom]:
if subjects is None:
subjects = self.entities()
for s in subjects:
t = self._stanza(s, strict=False)
if not t:
continue
ldef_tuples = t.intersection_of_tuples()
if ldef_tuples:
ldef = LogicalDefinitionAxiom(definedClassId=s)
for m1, m2 in ldef_tuples:
if m2:
ldef.restrictions.append(
ExistentialRestrictionExpression(
propertyId=self.map_shorthand_to_curie(m1), fillerId=m2
)
)
else:
ldef.genusIds.append(m1)
if logical_definition_matches(ldef, predicates=predicates, objects=objects):
yield ldef
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: SearchInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: DifferInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
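    # Diffing works stanza-by-stanza, keyed on stanza ID: stanzas present in only one document
    # yield creation or deletion changes, and shared stanzas are compared tag-by-tag to emit
    # fine-grained KGCL changes (renames, definition edits, obsoletions, edge and synonym changes).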
def diff(
self,
other_ontology: DifferInterface,
configuration: DiffConfiguration = None,
**kwargs,
) -> Iterator[kgcl.Change]:
if configuration is None:
configuration = DiffConfiguration()
if not isinstance(other_ontology, SimpleOboImplementation):
raise ValueError("Can only diff SimpleOboImplementation")
stanzas1 = self.obo_document.stanzas
stanzas2 = other_ontology.obo_document.stanzas
all_ids = set(stanzas1.keys()).union(stanzas2.keys())
for id in all_ids:
yield from self._diff_stanzas(stanzas1.get(id, None), stanzas2.get(id, None))
def _diff_stanzas(
self, stanza1: Optional[Stanza], stanza2: Optional[Stanza]
) -> Iterator[kgcl.Change]:
def _id():
return generate_change_id()
node_is_deleted = False
node_is_created = False
if stanza1 is None and stanza2 is None:
raise ValueError("Both stanzas are None")
if stanza1 is None:
stanza1 = Stanza(id=stanza2.id, type=stanza2.type)
if stanza2.type == "Term":
yield kgcl.ClassCreation(
id=_id(), about_node=stanza2.id, name=stanza2.singular_value(TAG_NAME)
)
elif stanza2.type == "Typedef":
yield kgcl.NodeCreation(
id=_id(), about_node=stanza2.id, name=stanza2.singular_value(TAG_NAME)
)
else:
raise ValueError(f"Unknown stanza type: {stanza2.type}")
node_is_created = True
        if stanza2 is None:
            stanza2 = Stanza(id=stanza1.id, type=stanza1.type)
            # KGCL has no separate deletion type per stanza kind; NodeDeletion covers both
            yield kgcl.NodeDeletion(id=_id(), about_node=stanza1.id)
            node_is_deleted = True
if stanza1 == stanza2:
return
if stanza1.type != stanza2.type:
raise ValueError(f"Stanza types differ: {stanza1.type} vs {stanza2.type}")
t1id = stanza1.id
t2id = stanza2.id
logging.info(f"Diffing: {t1id} vs {t2id}")
        def _tv_dict(stanza: Stanza) -> Dict[str, Set[str]]:
d = defaultdict(set)
for tv in stanza.tag_values:
d[tv.tag].add(tv.value)
return d
tv_dict1 = _tv_dict(stanza1)
tv_dict2 = _tv_dict(stanza2)
all_tags = set(tv_dict1.keys()).union(tv_dict2.keys())
for tag in all_tags:
vals1 = tv_dict1.get(tag, [])
vals2 = tv_dict2.get(tag, [])
vals1list = list(vals1)
vals2list = list(vals2)
tvs1 = [tv for tv in stanza1.tag_values if tv.tag == tag]
tvs2 = [tv for tv in stanza2.tag_values if tv.tag == tag]
if vals1 == vals2:
continue
logging.info(f"Difference in {tag}: {vals1} vs {vals2}")
if tag == TAG_NAME:
if node_is_deleted or node_is_created:
continue
if vals1 and vals2:
yield kgcl.NodeRename(
id=_id(), about_node=t1id, new_value=vals2list[0], old_value=vals1list[0]
)
elif vals2:
# Existing node goes from having no name to having a name
# In future KGCL may have a NodeNewName. For now we use NodeRename.
yield kgcl.NodeRename(
id=_id(), about_node=t1id, new_value=vals2list[0], old_value=None
)
else:
yield kgcl.NodeDeletion(
id=_id(), about_node=t1id, old_value=vals1list[0], new_value=None
)
elif tag == TAG_DEFINITION:
if node_is_deleted:
continue
# TODO: provenance changes
td1 = stanza1.quoted_value(TAG_DEFINITION)
td2 = stanza2.quoted_value(TAG_DEFINITION)
if vals1 and vals2:
yield kgcl.NodeTextDefinitionChange(
id=_id(), about_node=t1id, new_value=td2, old_value=td1
)
elif vals1:
yield kgcl.RemoveTextDefinition(id=_id(), about_node=t1id, old_value=td1)
else:
yield kgcl.NewTextDefinition(id=_id(), about_node=t2id, new_value=td2)
elif tag == TAG_IS_OBSOLETE:
if node_is_deleted:
continue
if vals1 and not vals2:
yield kgcl.NodeUnobsoletion(id=_id(), about_node=t1id)
elif not vals1 and vals2:
replaced_by = stanza2.simple_values(TAG_REPLACED_BY)
if replaced_by:
yield kgcl.NodeObsoletionWithDirectReplacement(
id=_id(), about_node=t2id, has_direct_replacement=replaced_by[0]
)
else:
yield kgcl.NodeObsoletion(id=_id(), about_node=t2id)
elif tag == TAG_SUBSET:
if node_is_deleted:
continue
xrefs1 = stanza1.simple_values(TAG_SUBSET)
xrefs2 = stanza2.simple_values(TAG_SUBSET)
for xref in xrefs1:
if xref not in xrefs2:
yield kgcl.RemoveNodeFromSubset(id=_id(), about_node=t1id, in_subset=xref)
for xref in xrefs2:
if xref not in xrefs1:
yield kgcl.AddNodeToSubset(id=_id(), about_node=t2id, in_subset=xref)
elif tag == TAG_IS_A:
isas1 = stanza1.simple_values(TAG_IS_A)
isas2 = stanza2.simple_values(TAG_IS_A)
for isa in isas1:
if isa not in isas2:
yield kgcl.EdgeDeletion(id=_id(), subject=t1id, predicate=IS_A, object=isa)
for isa in isas2:
if isa not in isas1:
yield kgcl.EdgeCreation(id=_id(), subject=t2id, predicate=IS_A, object=isa)
elif tag == TAG_RELATIONSHIP:
rels1 = stanza1.pair_values(TAG_RELATIONSHIP)
rels2 = stanza2.pair_values(TAG_RELATIONSHIP)
for p, v in rels1:
p_curie = self.map_shorthand_to_curie(p)
if (p, v) not in rels2:
yield kgcl.EdgeDeletion(id=_id(), subject=t1id, predicate=p_curie, object=v)
for p, v in rels2:
p_curie = self.map_shorthand_to_curie(p)
if (p, v) not in rels1:
yield kgcl.EdgeCreation(id=_id(), subject=t2id, predicate=p_curie, object=v)
elif tag == TAG_SYNONYM:
if node_is_deleted:
continue
# TODO: make this sensitive to annotation changes; for now we truncate the tuple
syns1 = [tv.as_synonym()[0:2] for tv in tvs1]
syns2 = [tv.as_synonym()[0:2] for tv in tvs2]
for syn in syns1:
if syn not in syns2:
yield kgcl.RemoveSynonym(id=_id(), about_node=t1id, old_value=syn[0])
for syn in syns2:
if syn not in syns1:
pred = SCOPE_TO_SYNONYM_PRED_MAP[syn[1]]
yield kgcl.NewSynonym(
id=_id(), about_node=t2id, new_value=syn[0], predicate=pred
)
elif tag == TAG_XREF:
if node_is_deleted:
continue
xrefs1 = stanza1.simple_values(TAG_XREF)
xrefs2 = stanza2.simple_values(TAG_XREF)
for xref in xrefs1:
if xref not in xrefs2:
yield kgcl.RemoveMapping(
id=_id(), about_node=t1id, object=xref, predicate=HAS_DBXREF
)
for xref in xrefs2:
if xref not in xrefs1:
yield kgcl.MappingCreation(
id=_id(), subject=t2id, object=xref, predicate=HAS_DBXREF
)
def different_from(self, entity: CURIE, other_ontology: DifferInterface) -> bool:
t1 = self._stanza(entity, strict=False)
if t1:
t2 = other_ontology._stanza(entity, strict=False)
if t2:
return str(t1) != str(t2)
return True
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: PatcherInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def migrate_curies(self, curie_map: Mapping[CURIE, CURIE]) -> None:
od = self.obo_document
for t in od.stanzas.values():
t.replace_token(curie_map)
od.reindex()
self._rebuild_relationship_index()
@property
def uses_legacy_properties(self) -> bool:
if self._uses_legacy_properties is not None:
return self._uses_legacy_properties
for s in self.obo_document.stanzas.values():
for tv in s.tag_values:
if tv.tag in [TAG_CREATED_BY, TAG_CREATION_DATE]:
self._uses_legacy_properties = True
return True
        self._uses_legacy_properties = False
        return False
def set_uses_legacy_properties(self, value: bool) -> None:
self._uses_legacy_properties = value
def add_contributors(self, curie: CURIE, agents: List[CURIE]) -> None:
t = self._stanza(curie, strict=True)
for agent in agents:
t.add_tag_value_pair(TAG_PROPERTY_VALUE, CONTRIBUTOR, agent)
def set_creator(self, curie: CURIE, agent: CURIE, date: Optional[str] = None) -> None:
t = self._stanza(curie, strict=True)
if self._uses_legacy_properties:
t.set_singular_tag(TAG_CREATED_BY, agent)
else:
t.add_tag_value_pair(TAG_PROPERTY_VALUE, CREATOR, agent)
if date:
self.set_creation_date(curie, date)
def set_creation_date(self, curie: CURIE, date: str) -> None:
t = self._stanza(curie, strict=True)
if self._uses_legacy_properties:
t.set_singular_tag(TAG_CREATION_DATE, date)
else:
t.add_tag_value_pair(TAG_PROPERTY_VALUE, CREATED, date)
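    # apply_patch dispatches on the concrete KGCL change class and edits the corresponding
    # stanza tags in place; unsupported change types raise NotImplementedError, and touched
    # stanzas are re-normalized so tag order stays canonical.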
def apply_patch(
self,
patch: kgcl.Change,
activity: kgcl.Activity = None,
metadata: Mapping[PRED_CURIE, Any] = None,
configuration: kgcl.Configuration = None,
) -> kgcl.Change:
od = self.obo_document
tidy_change_object(patch)
logging.debug(f"Applying {patch}")
modified_entities = []
if isinstance(patch, kgcl.NodeRename):
# self.set_label(patch.about_node, _clean(patch.new_value))
self.set_label(patch.about_node, patch.new_value)
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.NodeObsoletion):
t = self._stanza(patch.about_node, strict=True)
t.set_singular_tag(TAG_IS_OBSOLETE, "true")
if isinstance(patch, kgcl.NodeObsoletionWithDirectReplacement):
t.set_singular_tag(TAG_REPLACED_BY, patch.has_direct_replacement)
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.NodeDeletion):
try:
del od.stanzas[patch.about_node]
except KeyError:
logging.error(f"CURIE {patch.about_node} does not exist in the OBO file provided.")
elif isinstance(patch, kgcl.NodeCreation):
self.create_entity(patch.about_node, patch.name)
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.ClassCreation):
self.create_entity(patch.about_node, patch.name)
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.SynonymReplacement):
t = self._stanza(patch.about_node, strict=True)
n = 0
for tv in t.tag_values:
if tv.tag == TAG_SYNONYM:
syn = tv.as_synonym()
if syn[0] == patch.old_value:
tv.replace_quoted_part(patch.new_value)
n += 1
if not n:
raise ValueError(f"Failed to find synonym {patch.old_value} for {t.id}")
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.AddNodeToSubset):
t = self._stanza(patch.about_node, strict=True)
t.add_tag_value(TAG_SUBSET, patch.in_subset)
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.RemoveNodeFromSubset):
t = self._stanza(patch.about_node, strict=True)
t.remove_simple_tag_value(TAG_SUBSET, patch.in_subset)
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.NewTextDefinition):
t = self._stanza(patch.about_node, strict=True)
t.add_quoted_tag_value(TAG_DEFINITION, patch.new_value.strip("'"), xrefs=[])
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.RemoveTextDefinition):
t = self._stanza(patch.about_node, strict=True)
for tv in t.tag_values:
if tv.tag == TAG_DEFINITION:
# This is a remove_definition request
t.remove_tag_quoted_value(TAG_DEFINITION, t._quoted_value(tv.value))
elif isinstance(patch, kgcl.NodeTextDefinitionChange):
t = self._stanza(patch.about_node, strict=True)
for tv in t.tag_values:
if tv.tag == TAG_DEFINITION:
tv.replace_quoted_part(patch.new_value.strip("'"))
elif isinstance(patch, kgcl.NewSynonym):
t = self._stanza(patch.about_node, strict=True)
# Get scope from patch.qualifier
# rather than forcing all synonyms to be related.
if isinstance(patch.qualifier, str):
scope = patch.qualifier.upper()
else:
scope = str(patch.qualifier.value).upper() if patch.qualifier else "RELATED"
v = patch.new_value.replace('"', '\\"')
t.add_tag_value(TAG_SYNONYM, f'"{v}" {scope} []')
modified_entities.append(patch.about_node)
elif isinstance(patch, kgcl.RemoveSynonym):
t = self._stanza(patch.about_node, strict=True)
# scope = str(patch.qualifier.value).upper() if patch.qualifier else "RELATED"
v = patch.old_value.strip(
'"'
) # Handling a bug where quotes are accidentally introduced.
t.remove_tag_quoted_value(TAG_SYNONYM, v)
elif isinstance(patch, kgcl.EdgeCreation):
description = patch.change_description
self.add_relationship(
patch.subject, patch.predicate, patch.object, description=description
)
modified_entities.append(patch.subject)
elif isinstance(patch, kgcl.EdgeDeletion):
self.remove_relationship(patch.subject, patch.predicate, patch.object)
elif isinstance(patch, kgcl.NodeMove):
logging.warning(f"Cannot handle {patch}")
elif isinstance(patch, kgcl.PredicateChange):
e = patch.about_edge
subject = self._lookup(e.subject)
object = self._lookup(e.object)
t = self._stanza(subject, strict=True)
if _is_isa(patch.old_value):
t.remove_simple_tag_value(TAG_IS_A, object)
else:
pred = self.map_curie_to_shorthand(patch.old_value)
t.remove_pairwise_tag_value(TAG_RELATIONSHIP, pred, object)
if _is_isa(patch.new_value):
t.add_tag_value(TAG_IS_A, object)
else:
t.add_tag_value(TAG_RELATIONSHIP, f"{patch.new_value} {object}")
self._clear_relationship_index()
modified_entities.append(subject)
else:
raise NotImplementedError(f"cannot handle KGCL type {type(patch)}")
if patch.contributor:
self.add_contributors(patch.about_node, [patch.contributor])
modified_entities.append(patch.about_node)
for e in modified_entities:
stanza = self._stanza(e, strict=True)
stanza.normalize_order()
return patch
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: OwlInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def transitive_object_properties(self) -> Iterable[CURIE]:
od = self.obo_document
for s_id, s in od.stanzas.items():
if s.type == "Typedef":
if s.get_boolean_value(TAG_IS_TRANSITIVE, False):
yield self.map_shorthand_to_curie(s_id)
def simple_subproperty_of_chains(self) -> Iterable[Tuple[CURIE, List[CURIE]]]:
od = self.obo_document
for s_id, s in od.stanzas.items():
if s.type == "Typedef":
for p1, p2 in s.pair_values(TAG_HOLDS_OVER_CHAIN):
curie = self.map_shorthand_to_curie(s_id)
yield curie, [self.map_shorthand_to_curie(p1), self.map_shorthand_to_curie(p2)]