import logging
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Set, cast
import pyhornedowl
import rdflib
from kgcl_schema.datamodel import kgcl
from pyhornedowl.model import (
IRI,
AnnotatedComponent,
Annotation,
AnnotationAssertion,
ClassAssertion,
Component,
DatatypeLiteral,
DeclareAnnotationProperty,
DeclareClass,
DeclareDataProperty,
DeclareDatatype,
DeclareNamedIndividual,
DeclareObjectProperty,
EquivalentClasses,
InverseObjectProperties,
LanguageLiteral,
ObjectHasValue,
ObjectIntersectionOf,
ObjectPropertyAssertion,
ObjectPropertyDomain,
ObjectPropertyRange,
ObjectSomeValuesFrom,
SimpleLiteral,
SubClassOf,
SubObjectPropertyOf,
TransitiveObjectProperty,
)
from oaklib.datamodels import obograph
from oaklib.datamodels.vocabulary import (
DEPRECATED_PREDICATE,
EQUIVALENT_CLASS,
HAS_DBXREF,
HAS_DEFINITION_CURIE,
HAS_EXACT_SYNONYM,
HAS_NARROW_SYNONYM,
HAS_RELATED_SYNONYM,
IN_SUBSET,
INVERSE_OF,
IS_A,
LABEL_PREDICATE,
OWL_ANNOTATION_PROPERTY,
OWL_CLASS,
OWL_DATATYPE_PROPERTY,
OWL_NAMED_INDIVIDUAL,
OWL_OBJECT_PROPERTY,
OWL_TRANSITIVE_PROPERTY,
RDF_TYPE,
RDFS_DOMAIN,
RDFS_RANGE,
SUBPROPERTY_OF,
)
from oaklib.interfaces import SearchInterface
from oaklib.interfaces.basic_ontology_interface import LANGUAGE_TAG, RELATIONSHIP
from oaklib.interfaces.obograph_interface import OboGraphInterface
from oaklib.interfaces.owl_interface import OwlInterface, ReasonerConfiguration
from oaklib.interfaces.patcher_interface import PatcherInterface
from oaklib.types import CURIE, PRED_CURIE
from oaklib.utilities.axioms.logical_definition_utilities import logical_definition_matches
logger = logging.getLogger(__name__)
SERIALIZATION_ALIASES = {
"functional": "ofn",
"functional-owl": "ofn",
"functional-syntax": "ofn",
"funowl": "ofn",
"manchester": "ofn",
"omn": "ofn",
"ofn": "ofn",
"owl": "owl",
"owl-xml": "owx",
"owl/xml": "owx",
"owlxml": "owx",
"owx": "owx",
"rdf": "rdf",
"rdf-xml": "rdf",
"rdf/xml": "rdf",
"rdfxml": "rdf",
"xml": "rdf",
}
DECLARATION_TYPES = (
DeclareClass,
DeclareObjectProperty,
DeclareAnnotationProperty,
DeclareDataProperty,
DeclareNamedIndividual,
DeclareDatatype,
)
DECLARATION_TO_OWL_TYPE = {
DeclareClass: OWL_CLASS,
DeclareObjectProperty: OWL_OBJECT_PROPERTY,
DeclareAnnotationProperty: OWL_ANNOTATION_PROPERTY,
DeclareDataProperty: OWL_DATATYPE_PROPERTY,
DeclareNamedIndividual: OWL_NAMED_INDIVIDUAL,
}
LITERAL_TYPES = (SimpleLiteral, DatatypeLiteral, LanguageLiteral)
RELATIONSHIP_EXCLUDED_FROM_INHERITANCE = {
EQUIVALENT_CLASS,
INVERSE_OF,
IS_A,
RDF_TYPE,
RDFS_DOMAIN,
RDFS_RANGE,
SUBPROPERTY_OF,
}
SYNONYM_PREDICATES = [
HAS_EXACT_SYNONYM,
HAS_NARROW_SYNONYM,
HAS_RELATED_SYNONYM,
]
XSD_BOOLEAN = "http://www.w3.org/2001/XMLSchema#boolean"
@dataclass(frozen=True)
class ProjectedRelationship:
subject: CURIE
predicate: PRED_CURIE
object: CURIE
scope: str
def as_tuple(self) -> RELATIONSHIP:
return self.subject, self.predicate, self.object
[docs]
@dataclass
class FunOwlImplementation(
OwlInterface,
OboGraphInterface,
PatcherInterface,
SearchInterface,
):
"""
An experimental partial implementation of :ref:`OwlInterface`
This adapter keeps the historical ``funowl`` selector and class name for
backward compatibility, but it now uses py-horned-owl as the OWL parser and
object model.
"""
ontology_document: Optional[pyhornedowl.PyIndexedOntology] = None
_direct_relationship_cache: Optional[List[ProjectedRelationship]] = field(
default=None, init=False, repr=False
)
_entailed_relationship_cache: Optional[List[ProjectedRelationship]] = field(
default=None, init=False, repr=False
)
_metadata_map_cache: Dict[CURIE, Dict[PRED_CURIE, List[str]]] = field(
default_factory=dict, init=False, repr=False
)
_owl_type_map_cache: Optional[Dict[CURIE, Set[CURIE]]] = field(
default=None, init=False, repr=False
)
def __post_init__(self):
resource = self.resource
local_path = None if resource is None else resource.local_path
if self.ontology_document is None:
if local_path is None:
doc = pyhornedowl.PyIndexedOntology()
else:
local_path = Path(local_path)
logger.info("Loading %s into py-horned-owl", local_path)
serialization = self._serialization_for_path(local_path)
if resource is not None and serialization is not None:
resource.format = serialization
doc = pyhornedowl.open_ontology_from_file(
str(local_path), serialization=serialization
)
if serialization == "ofn":
self.prefix_map().update(self._extract_prefix_declarations(local_path))
else:
self.prefix_map().update(self._extract_prefixes_from_rdf(local_path))
self.ontology_document = doc
self.functional_writer = self.ontology_document
def _serialization_for_path(self, path: Path) -> Optional[str]:
explicit_format = None if self.resource is None else self.resource.format
serialization = self._normalize_serialization(explicit_format)
if serialization is not None:
return serialization
return self._sniff_serialization(path)
@staticmethod
def _normalize_serialization(format_name: Optional[str]) -> Optional[str]:
if format_name is None:
return None
key = format_name.strip().lower().replace("_", "-").replace(" ", "-")
return SERIALIZATION_ALIASES.get(key, key)
@staticmethod
def _sniff_serialization(path: Path) -> Optional[str]:
suffix = path.suffix.lower()
if suffix in {".ofn", ".omn"}:
return "ofn"
if suffix == ".owx":
return "owx"
if suffix not in {".owl", ".rdf", ".xml"}:
return None
try:
head = path.read_text(encoding="utf-8", errors="ignore")[:4096]
except OSError:
logger.debug("Could not sniff OWL serialization for %s", path, exc_info=True)
return None
head = head.lstrip("\ufeff \t\r\n")
if head.startswith("Prefix(") or head.startswith("Ontology("):
return "ofn"
if re.match(r"<Ontology(?:\s|>)", head):
return "owx"
if head.startswith("<?xml") or head.startswith("<rdf:RDF") or "xmlns:rdf=" in head:
return "rdf"
return None
@staticmethod
def _extract_prefix_declarations(path: Path) -> Mapping[str, str]:
prefix_map: Dict[str, str] = {}
text = path.read_text(encoding="utf-8")
for match in re.finditer(r"Prefix\(\s*([^=]+?)\s*=\s*<([^>]+)>\s*\)", text):
prefix = match.group(1).strip()
if prefix.endswith(":"):
prefix = prefix[:-1]
prefix_map[prefix] = match.group(2)
return prefix_map
@staticmethod
def _extract_prefixes_from_rdf(path: Path) -> Mapping[str, str]:
prefix_map: Dict[str, str] = {}
graph = rdflib.Graph()
try:
graph.parse(str(path))
except Exception:
try:
graph.parse(str(path), format="xml")
except Exception:
logger.debug("Could not extract RDF prefixes from %s", path, exc_info=True)
return prefix_map
for prefix, namespace in graph.namespaces():
if prefix in {None, ""}:
continue
prefix_map[prefix] = str(namespace)
if path.suffix in {".owl", ".rdf", ".xml"}:
text = path.read_text(encoding="utf-8")
for match in re.finditer(r'xmlns:([A-Za-z_][\w.-]*)="([^"]+)"', text):
prefix_map[match.group(1)] = match.group(2)
return prefix_map
@property
def _ontology(self) -> pyhornedowl.PyIndexedOntology:
return self.ontology_document
def owl_ontology(self) -> pyhornedowl.PyIndexedOntology:
return self._ontology
def _invalidate_caches(self) -> None:
self._clear_relationship_index()
self._entailed_edge_index = None
self._direct_relationship_cache = None
self._entailed_relationship_cache = None
self._metadata_map_cache.clear()
self._owl_type_map_cache = None
def _sync_prefix_mapping(self, curie: CURIE) -> None:
if ":" not in curie:
return
prefix, _, _ = curie.partition(":")
if prefix in self.prefix_map():
try:
self._ontology.add_prefix_mapping(prefix, self.prefix_map()[prefix])
except Exception:
logger.debug("Could not sync prefix mapping for %s", prefix, exc_info=True)
def entity_iri_to_curie(self, entity: IRI) -> CURIE:
return cast(CURIE, self.uri_to_curie(str(entity), use_uri_fallback=True))
def curie_to_entity_iri(self, curie: CURIE) -> IRI:
self._sync_prefix_mapping(curie)
return IRI.parse(self.curie_to_uri(curie))
def curie_to_class(self, curie: CURIE):
self._sync_prefix_mapping(curie)
return self._ontology.clazz(self.curie_to_uri(curie))
def curie_to_object_property(self, curie: CURIE):
self._sync_prefix_mapping(curie)
return self._ontology.object_property(self.curie_to_uri(curie))
def curie_to_annotation_property(self, curie: CURIE):
self._sync_prefix_mapping(curie)
return self._ontology.annotation_property(self.curie_to_uri(curie))
def _named_curie(self, entity: Any) -> Optional[CURIE]:
iri = self._entity_iri(entity)
if iri is None:
return None
return self.entity_iri_to_curie(iri)
def _annotation_value(self, value: Any) -> Optional[str]:
iri = self._entity_iri(value)
if iri is not None:
return self.entity_iri_to_curie(iri)
literal = self._literal_value(value)
if literal is not None:
return literal
return None
@staticmethod
def _is_truthy(values: Iterable[str]) -> bool:
return any(v.lower() in {"1", "true", "yes"} for v in values)
@staticmethod
def _merge_scopes(left: str, right: str) -> str:
return "abox" if "abox" in {left, right} else "tbox"
def _coerce_annotation_value(self, value: Any):
if isinstance(value, LITERAL_TYPES) or isinstance(value, IRI):
return value
if isinstance(value, bool):
return DatatypeLiteral(str(value).lower(), IRI.parse(XSD_BOOLEAN))
return SimpleLiteral(str(value))
def _single_valued_assignment(self, curie: CURIE, property: CURIE) -> Optional[str]:
values = self._ontology.get_annotations(self.curie_to_uri(curie), self.curie_to_uri(property))
if values:
if len(values) > 1:
logger.warning("Multiple values for %s %s = %s", curie, property, values)
return values[0]
return None
def definition(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]:
return self._single_valued_assignment(curie, HAS_DEFINITION_CURIE)
def label(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]:
return self._single_valued_assignment(curie, LABEL_PREDICATE)
def _owl_type_map(self) -> Dict[CURIE, Set[CURIE]]:
if self._owl_type_map_cache is None:
type_map: Dict[CURIE, Set[CURIE]] = defaultdict(set)
for axiom in self._ontology.get_axioms():
component = axiom.component
for declaration_type, owl_type in DECLARATION_TO_OWL_TYPE.items():
if isinstance(component, declaration_type):
curie = self._named_curie(component.first)
if curie is not None:
type_map[curie].add(owl_type)
break
if isinstance(component, TransitiveObjectProperty):
curie = self._named_curie(component.first)
if curie is not None:
type_map[curie].add(OWL_OBJECT_PROPERTY)
type_map[curie].add(OWL_TRANSITIVE_PROPERTY)
self._owl_type_map_cache = {
curie: set(types) for curie, types in type_map.items()
}
return self._owl_type_map_cache
def entities(self, filter_obsoletes=True, owl_type=None) -> Iterable[CURIE]:
seen = set()
obsolete_set = set(self.obsoletes()) if filter_obsoletes else set()
type_map = self._owl_type_map()
for axiom in self._ontology.get_axioms():
component = axiom.component
if not isinstance(component, DECLARATION_TYPES):
continue
curie = self._named_curie(component.first)
if curie is None or curie in seen:
continue
seen.add(curie)
if curie in obsolete_set:
continue
if owl_type is None or owl_type in type_map.get(curie, set()):
yield curie
def owl_types(self, entities: Iterable[CURIE]) -> Iterable[tuple[CURIE, CURIE]]:
type_map = self._owl_type_map()
for curie in entities:
for owl_type in sorted(type_map.get(curie, set())):
yield curie, owl_type
def obsoletes(self, include_merged=True) -> Iterable[CURIE]:
for curie in self.entities(filter_obsoletes=False):
if self._is_truthy(self.entity_metadata_map(curie).get(DEPRECATED_PREDICATE, [])):
yield curie
def entity_metadata_map(self, curie: CURIE) -> Dict[PRED_CURIE, List[str]]:
if curie not in self._metadata_map_cache:
metadata_map: Dict[PRED_CURIE, List[str]] = defaultdict(list)
for axiom in self.annotation_assertion_axioms(subject=curie):
predicate = self._named_curie(axiom.ann.ap)
value = self._annotation_value(axiom.ann.av)
if predicate is None or value is None:
continue
metadata_map[predicate].append(value)
self._metadata_map_cache[curie] = dict(metadata_map)
return self._metadata_map_cache[curie]
def entity_alias_map(self, curie: CURIE) -> Dict[PRED_CURIE, List[str]]:
alias_map: Dict[PRED_CURIE, List[str]] = defaultdict(list)
label = self.label(curie)
if label is not None:
alias_map[LABEL_PREDICATE].append(label)
metadata = self.entity_metadata_map(curie)
for predicate in SYNONYM_PREDICATES:
alias_map[predicate].extend(metadata.get(predicate, []))
return dict(alias_map)
def terms_subsets(self, curies: Iterable[CURIE]) -> Iterable[tuple[CURIE, CURIE]]:
for curie in curies:
for subset in self.entity_metadata_map(curie).get(IN_SUBSET, []):
yield curie, cast(CURIE, subset)
def synonym_property_values(
self, subject: CURIE | Iterable[CURIE]
) -> Iterator[tuple[CURIE, obograph.SynonymPropertyValue]]:
subjects = [subject] if isinstance(subject, str) else list(subject)
for curie in subjects:
alias_map = self.entity_alias_map(curie)
for predicate in SYNONYM_PREDICATES:
pred_text = predicate.split(":")[-1]
for value in alias_map.get(predicate, []):
yield curie, obograph.SynonymPropertyValue(pred=pred_text, val=value)
def node(
self, curie: CURIE, strict=False, include_metadata=False, expand_curies=False
) -> Optional[obograph.Node]:
entity_types = set(self.owl_type(curie))
label = self.label(curie)
if not entity_types and label is None:
if strict:
raise ValueError(f"Unknown entity: {curie}")
return None
if any(
owl_type in entity_types
for owl_type in [OWL_OBJECT_PROPERTY, OWL_ANNOTATION_PROPERTY, OWL_DATATYPE_PROPERTY]
):
node_type = "PROPERTY"
elif OWL_NAMED_INDIVIDUAL in entity_types:
node_type = "INDIVIDUAL"
else:
node_type = "CLASS"
node_id = cast(CURIE, self.curie_to_uri(curie)) if expand_curies else curie
meta = None
if include_metadata:
meta = obograph.Meta()
metadata_map = self.entity_metadata_map(curie)
definition = self.definition(curie)
if definition:
meta.definition = obograph.DefinitionPropertyValue(val=definition)
for xref in metadata_map.get(HAS_DBXREF, []):
cast(List[obograph.XrefPropertyValue], meta.xrefs).append(
obograph.XrefPropertyValue(val=xref)
)
for comment in metadata_map.get("rdfs:comment", []):
cast(List[str], meta.comments).append(comment)
for subset in metadata_map.get(IN_SUBSET, []):
cast(List[str], meta.subsets).append(subset)
if self._is_truthy(metadata_map.get(DEPRECATED_PREDICATE, [])):
meta.deprecated = True
for _, synonym in self.synonym_property_values([curie]):
cast(List[obograph.SynonymPropertyValue], meta.synonyms).append(synonym)
return obograph.Node(id=node_id, lbl=label, type=node_type, meta=meta)
def logical_definitions(
self,
subjects: Optional[Iterable[CURIE]] = None,
predicates: Optional[Iterable[PRED_CURIE]] = None,
objects: Optional[Iterable[CURIE]] = None,
**kwargs,
) -> Iterable[obograph.LogicalDefinitionAxiom]:
subject_set = set(subjects) if subjects is not None else None
predicate_list = list(predicates) if predicates is not None else None
object_list = list(objects) if objects is not None else None
for axiom in self.axioms():
if not isinstance(axiom, EquivalentClasses):
continue
expressions = list(axiom.first)
if len(expressions) != 2:
continue
defined_class = None
intersection = None
for expression in expressions:
curie = self._named_curie(expression)
if curie is not None:
defined_class = curie
elif isinstance(expression, ObjectIntersectionOf):
intersection = expression
if defined_class is None or intersection is None:
continue
if subject_set is not None and defined_class not in subject_set:
continue
ldef = obograph.LogicalDefinitionAxiom(definedClassId=defined_class)
valid = True
for expression in intersection.first:
genus = self._named_curie(expression)
if genus is not None:
cast(List[CURIE], ldef.genusIds).append(genus)
continue
if isinstance(expression, ObjectSomeValuesFrom):
predicate = self._named_curie(expression.ope)
filler = self._named_curie(expression.bce)
if predicate is None or filler is None:
valid = False
break
cast(
List[obograph.ExistentialRestrictionExpression],
ldef.restrictions,
).append(
obograph.ExistentialRestrictionExpression(
propertyId=predicate,
fillerId=filler,
)
)
continue
valid = False
break
if valid and logical_definition_matches(
ldef, predicates=predicate_list, objects=object_list
):
yield ldef
def axioms(self, reasoner: Optional[ReasonerConfiguration] = None) -> Iterable[Component]:
for axiom in self._ontology.get_axioms():
yield axiom.component
def _project_axiom_relationships(self, axiom: Component) -> Iterator[ProjectedRelationship]:
if isinstance(axiom, SubClassOf):
subject = self._named_curie(axiom.sub)
if subject is None:
return
object_curie = self._named_curie(axiom.sup)
if object_curie is not None:
yield ProjectedRelationship(subject, IS_A, object_curie, "tbox")
return
if isinstance(axiom.sup, ObjectSomeValuesFrom):
predicate = self._named_curie(axiom.sup.ope)
object_curie = self._named_curie(axiom.sup.bce)
if predicate is not None and object_curie is not None:
yield ProjectedRelationship(subject, predicate, object_curie, "tbox")
return
if isinstance(axiom.sup, ObjectHasValue):
predicate = self._named_curie(axiom.sup.ope)
object_curie = self._named_curie(axiom.sup.i)
if predicate is not None and object_curie is not None:
yield ProjectedRelationship(subject, predicate, object_curie, "abox")
return
if isinstance(axiom, EquivalentClasses):
expressions = [self._named_curie(expression) for expression in axiom.first]
if len(expressions) == 2 and all(expression is not None for expression in expressions):
left = cast(CURIE, expressions[0])
right = cast(CURIE, expressions[1])
yield ProjectedRelationship(left, EQUIVALENT_CLASS, right, "tbox")
yield ProjectedRelationship(right, EQUIVALENT_CLASS, left, "tbox")
return
if isinstance(axiom, SubObjectPropertyOf):
subject = self._named_curie(axiom.sub)
object_curie = self._named_curie(axiom.sup)
if subject is not None and object_curie is not None:
yield ProjectedRelationship(subject, SUBPROPERTY_OF, object_curie, "tbox")
return
if isinstance(axiom, ObjectPropertyDomain):
subject = self._named_curie(axiom.ope)
object_curie = self._named_curie(axiom.ce)
if subject is not None and object_curie is not None:
yield ProjectedRelationship(subject, RDFS_DOMAIN, object_curie, "tbox")
return
if isinstance(axiom, ObjectPropertyRange):
subject = self._named_curie(axiom.ope)
object_curie = self._named_curie(axiom.ce)
if subject is not None and object_curie is not None:
yield ProjectedRelationship(subject, RDFS_RANGE, object_curie, "tbox")
return
if isinstance(axiom, InverseObjectProperties):
first = self._named_curie(axiom.first)
second = self._named_curie(axiom.second)
if first is not None and second is not None:
yield ProjectedRelationship(first, INVERSE_OF, second, "tbox")
yield ProjectedRelationship(second, INVERSE_OF, first, "tbox")
return
if isinstance(axiom, ClassAssertion):
subject = self._named_curie(axiom.i)
if subject is None:
return
object_curie = self._named_curie(axiom.ce)
if object_curie is not None:
yield ProjectedRelationship(subject, RDF_TYPE, object_curie, "abox")
return
if isinstance(axiom.ce, ObjectHasValue):
predicate = self._named_curie(axiom.ce.ope)
object_curie = self._named_curie(axiom.ce.i)
if predicate is not None and object_curie is not None:
yield ProjectedRelationship(subject, predicate, object_curie, "abox")
return
if isinstance(axiom, ObjectPropertyAssertion):
subject = self._named_curie(axiom.source)
predicate = self._named_curie(axiom.ope)
object_curie = self._named_curie(axiom.target)
if subject is not None and predicate is not None and object_curie is not None:
yield ProjectedRelationship(subject, predicate, object_curie, "abox")
def _direct_relationships(self) -> List[ProjectedRelationship]:
if self._direct_relationship_cache is None:
seen = set()
relationships = []
for axiom in self.axioms():
for relationship in self._project_axiom_relationships(axiom):
if relationship in seen:
continue
seen.add(relationship)
relationships.append(relationship)
self._direct_relationship_cache = relationships
return self._direct_relationship_cache
@staticmethod
def _transitive_targets(
source: CURIE, adjacency_map: Mapping[CURIE, Set[CURIE]]
) -> Set[CURIE]:
stack = list(adjacency_map.get(source, set()))
targets = set()
while stack:
target = stack.pop()
if target in targets:
continue
targets.add(target)
stack.extend(adjacency_map.get(target, set()).difference(targets))
return targets
def _entailed_relationships(self) -> List[ProjectedRelationship]:
if self._entailed_relationship_cache is None:
direct_relationships = self._direct_relationships()
entailed = set(direct_relationships)
class_parents: Dict[CURIE, Set[CURIE]] = defaultdict(set)
property_parents: Dict[CURIE, Set[CURIE]] = defaultdict(set)
inverse_properties: Dict[CURIE, Set[CURIE]] = defaultdict(set)
for relationship in direct_relationships:
if relationship.predicate == IS_A:
class_parents[relationship.subject].add(relationship.object)
elif relationship.predicate == SUBPROPERTY_OF:
property_parents[relationship.subject].add(relationship.object)
elif relationship.predicate == INVERSE_OF:
inverse_properties[relationship.subject].add(relationship.object)
inverse_properties[relationship.object].add(relationship.subject)
class_ancestors = {
subject: self._transitive_targets(subject, class_parents)
for subject in class_parents
}
property_ancestors = {
subject: self._transitive_targets(subject, property_parents)
for subject in property_parents
}
descendants: Dict[CURIE, Set[CURIE]] = defaultdict(set)
for subject, ancestors in class_ancestors.items():
for ancestor in ancestors:
descendants[ancestor].add(subject)
transitive_properties = set(self.transitive_object_properties())
def add_relationship(relationship: ProjectedRelationship) -> bool:
if relationship.subject == relationship.object and relationship.predicate in {
IS_A,
SUBPROPERTY_OF,
}:
return False
if relationship in entailed:
return False
entailed.add(relationship)
return True
changed = True
while changed:
changed = False
current_relationships = list(entailed)
relationships_by_predicate: Dict[PRED_CURIE, List[ProjectedRelationship]] = (
defaultdict(list)
)
for relationship in current_relationships:
relationships_by_predicate[relationship.predicate].append(relationship)
for relationship in current_relationships:
if relationship.predicate == IS_A:
for ancestor in class_ancestors.get(relationship.object, set()):
changed |= add_relationship(
ProjectedRelationship(
relationship.subject,
IS_A,
ancestor,
"tbox",
)
)
elif relationship.predicate == SUBPROPERTY_OF:
for ancestor in property_ancestors.get(relationship.object, set()):
changed |= add_relationship(
ProjectedRelationship(
relationship.subject,
SUBPROPERTY_OF,
ancestor,
"tbox",
)
)
elif relationship.predicate == RDF_TYPE:
for ancestor in class_ancestors.get(relationship.object, set()):
changed |= add_relationship(
ProjectedRelationship(
relationship.subject,
RDF_TYPE,
ancestor,
relationship.scope,
)
)
if relationship.predicate not in RELATIONSHIP_EXCLUDED_FROM_INHERITANCE:
for descendant in descendants.get(relationship.subject, set()):
changed |= add_relationship(
ProjectedRelationship(
descendant,
relationship.predicate,
relationship.object,
relationship.scope,
)
)
for ancestor in property_ancestors.get(relationship.predicate, set()):
changed |= add_relationship(
ProjectedRelationship(
relationship.subject,
ancestor,
relationship.object,
relationship.scope,
)
)
for inverse_predicate in inverse_properties.get(relationship.predicate, set()):
changed |= add_relationship(
ProjectedRelationship(
relationship.object,
inverse_predicate,
relationship.subject,
relationship.scope,
)
)
for predicate in transitive_properties:
adjacency_map: Dict[CURIE, List[tuple[CURIE, str]]] = defaultdict(list)
for relationship in relationships_by_predicate.get(predicate, []):
adjacency_map[relationship.subject].append(
(relationship.object, relationship.scope)
)
for subject, targets in adjacency_map.items():
stack = list(targets)
seen = {target for target, _ in targets}
while stack:
intermediate, scope = stack.pop()
for target, target_scope in adjacency_map.get(intermediate, []):
if target == subject:
continue
merged_scope = self._merge_scopes(scope, target_scope)
changed |= add_relationship(
ProjectedRelationship(
subject,
predicate,
target,
merged_scope,
)
)
if target not in seen:
seen.add(target)
stack.append((target, merged_scope))
self._entailed_relationship_cache = sorted(
entailed,
key=lambda relationship: (
relationship.subject,
relationship.predicate,
relationship.object,
relationship.scope,
),
)
return self._entailed_relationship_cache
def _filter_relationships(
self,
relationships: Iterable[ProjectedRelationship],
subjects: Optional[Iterable[CURIE]] = None,
predicates: Optional[Iterable[PRED_CURIE]] = None,
objects: Optional[Iterable[CURIE]] = None,
include_tbox: bool = True,
include_abox: bool = True,
) -> Iterator[RELATIONSHIP]:
subject_set = set(subjects) if subjects is not None else None
predicate_set = set(predicates) if predicates is not None else None
object_set = set(objects) if objects is not None else None
for relationship in relationships:
if relationship.scope == "tbox" and not include_tbox:
continue
if relationship.scope == "abox" and not include_abox:
continue
if subject_set is not None and relationship.subject not in subject_set:
continue
if predicate_set is not None and relationship.predicate not in predicate_set:
continue
if object_set is not None and relationship.object not in object_set:
continue
yield relationship.as_tuple()
def relationships(
self,
subjects: Optional[Iterable[CURIE]] = None,
predicates: Optional[Iterable[PRED_CURIE]] = None,
objects: Optional[Iterable[CURIE]] = None,
include_tbox: bool = True,
include_abox: bool = True,
include_entailed: bool = False,
exclude_blank: bool = True,
invert: bool = False,
) -> Iterator[RELATIONSHIP]:
del exclude_blank
if invert:
for subject, predicate, object_curie in self.relationships(
subjects=objects,
predicates=predicates,
objects=subjects,
include_tbox=include_tbox,
include_abox=include_abox,
include_entailed=include_entailed,
):
yield object_curie, predicate, subject
return
projected = (
self._entailed_relationships() if include_entailed else self._direct_relationships()
)
yield from self._filter_relationships(
projected,
subjects=subjects,
predicates=predicates,
objects=objects,
include_tbox=include_tbox,
include_abox=include_abox,
)
def _all_relationships(self) -> Iterator[RELATIONSHIP]:
for relationship in self._direct_relationships():
yield relationship.as_tuple()
def _all_entailed_relationships(self):
for relationship in self._entailed_relationships():
yield relationship.as_tuple()
def _add_axiom(self, axiom: Component) -> None:
if isinstance(axiom, AnnotatedComponent):
self._ontology.add_axiom(axiom.component, set(axiom.ann))
else:
self._ontology.add_axiom(axiom)
def set_axioms(self, axioms: List[Component]) -> None:
for axiom in list(self._ontology.get_axioms()):
self._ontology.remove_axiom(axiom.component)
for axiom in axioms:
self._add_axiom(axiom)
self._invalidate_caches()
def dump(self, path: Optional[str] = None, syntax: Optional[str] = None, **kwargs):
syntax = syntax or "ofn"
if syntax == "ofn":
out = self._ontology.save_to_string("ofn")
elif syntax in {"ttl", "turtle"}:
rdfxml = self._ontology.save_to_string("owl")
g = rdflib.Graph()
g.parse(data=rdfxml, format="xml")
out = g.serialize(format="ttl")
elif syntax in {"owl", "owx"}:
out = self._ontology.save_to_string(syntax)
else:
out = self._ontology.save_to_string(syntax)
if path is None:
print(out)
elif isinstance(path, (str, Path)):
Path(path).write_text(str(out), encoding="utf-8")
else:
path.write(str(out))
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Implements: PatcherInterface
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
def _set_annotation_predicate_value(self, subject: CURIE, property: CURIE, value: Any):
for axiom in list(self.annotation_assertion_axioms(subject, property)):
self._ontology.remove_axiom(axiom)
self._ontology.add_axiom(
AnnotationAssertion(
self.curie_to_entity_iri(subject),
Annotation(
self.curie_to_annotation_property(property),
self._coerce_annotation_value(value),
),
)
)
self._invalidate_caches()
def apply_patch(
self,
patch: kgcl.Change,
activity: kgcl.Activity = None,
metadata: Optional[Mapping[PRED_CURIE, Any]] = None,
configuration: kgcl.Configuration = None,
strict=False,
) -> Optional[kgcl.Change]:
if isinstance(patch, kgcl.NodeChange):
about = patch.about_node
if isinstance(patch, kgcl.NodeRename):
self._set_annotation_predicate_value(about, LABEL_PREDICATE, patch.new_value)
elif isinstance(patch, kgcl.NodeTextDefinitionChange):
self._set_annotation_predicate_value(about, HAS_DEFINITION_CURIE, patch.new_value)
elif isinstance(patch, kgcl.NewSynonym):
self._ontology.add_axiom(
AnnotationAssertion(
self.curie_to_entity_iri(about),
Annotation(
self.curie_to_annotation_property(HAS_EXACT_SYNONYM),
self._coerce_annotation_value(patch.new_value),
),
)
)
self._invalidate_caches()
elif isinstance(patch, kgcl.NodeObsoletion):
self._set_annotation_predicate_value(about, DEPRECATED_PREDICATE, value=True)
elif isinstance(patch, kgcl.NodeDeletion):
raise NotImplementedError("Deletions not supported yet")
elif isinstance(patch, kgcl.NodeCreation):
self._set_annotation_predicate_value(about, LABEL_PREDICATE, patch.name)
elif isinstance(patch, kgcl.NameBecomesSynonym):
label = self.label(about)
self.apply_patch(
kgcl.NodeRename(id=f"{patch.id}-1", about_node=about, new_value=patch.new_value)
)
self.apply_patch(
kgcl.NewSynonym(id=f"{patch.id}-2", about_node=about, new_value=label)
)
else:
raise NotImplementedError(f"Cannot handle patches of type {type(patch)}")
elif isinstance(patch, kgcl.EdgeChange):
subject = self.curie_to_class(patch.subject)
object = self.curie_to_class(patch.object)
if isinstance(patch, kgcl.EdgeCreation):
if patch.predicate == IS_A or patch.predicate == "is_a":
self._ontology.add_axiom(SubClassOf(subject, object))
else:
predicate = self.curie_to_object_property(patch.predicate)
self._ontology.add_axiom(
SubClassOf(subject, ObjectSomeValuesFrom(predicate, object))
)
self._invalidate_caches()
else:
raise NotImplementedError(f"Cannot handle patches of type {type(patch)}")
else:
raise NotImplementedError(f"Cannot handle patches of type {type(patch)}")
return patch