Source code for oaklib.converters.obo_graph_to_obo_format_converter
import logging
import sys
from dataclasses import dataclass
from io import BytesIO, StringIO
from typing import Any, Dict, List, Optional, Tuple, Union
import rdflib
from oaklib.converters.data_model_converter import DataModelConverter
from oaklib.datamodels.obograph import Edge, Graph, GraphDocument, Node
from oaklib.datamodels.vocabulary import IS_A, SYNONYM_PRED_TO_SCOPE_MAP
from oaklib.implementations.simpleobo.simple_obo_parser import (
TAG_DEFINITION,
TAG_INTERSECTION_OF,
TAG_INVERSE_OF,
TAG_IS_A,
TAG_NAME,
TAG_RELATIONSHIP,
TAG_SUBSET,
TAG_SYNONYM,
TAG_XREF,
OboDocument,
Stanza,
)
from oaklib.types import CURIE
from oaklib.utilities.oboformat_utils import subset_to_shorthand
from oaklib.utilities.obograph_utils import index_graph_edges_by_subject
TRIPLE = Tuple[rdflib.URIRef, rdflib.URIRef, Any]
DIRECT_PREDICATE_MAP = {
"is_a": TAG_IS_A,
IS_A: TAG_IS_A, # sometime obographs use the predicate rather than shorthand
"subPropertyOf": TAG_IS_A,
"inverseOf": TAG_INVERSE_OF,
}
typedef_type_map = {
"CLASS": "Term",
"PROPERTY": "Typedef",
}
def _escape(s: str) -> str:
return s.replace('"', '\\"').replace("\n", "\\n")
[docs]
@dataclass
class OboGraphToOboFormatConverter(DataModelConverter):
"""Converts from OboGraph to OBO Format."""
use_shorthand: bool = True
[docs]
def dump(self, source: GraphDocument, target: str = None, **kwargs) -> None:
"""
Dump an OBO Graph Document to a FHIR CodeSystem
:param source:
:param target:
:return:
"""
obodoc = self.convert(source)
if target is None:
obodoc.dump(sys.stdout)
else:
with open(target, "w", encoding="UTF-8") as f:
obodoc.dump(f)
[docs]
def dumps(
self,
source: Union[GraphDocument, Graph],
aux_graphs: Optional[List[Graph]] = None,
**kwargs,
) -> str:
"""
Dump an OBO Graph Document to a string
:param source:
:return:
"""
obodoc = self.convert(source, aux_graphs=aux_graphs)
io = StringIO()
obodoc.dump(io)
return io.getvalue()
[docs]
def as_bytes_io(self, source: GraphDocument, **kwargs) -> BytesIO:
"""
Dump an OBO Graph Document to a string
:param source:
:return:
"""
s = self.dumps(source)
return BytesIO(s.encode("UTF-8"))
[docs]
def convert(
self,
source: Union[GraphDocument, Graph],
target: OboDocument = None,
aux_graphs: Optional[List[Graph]] = None,
**kwargs,
) -> OboDocument:
"""
Convert an OBO Format Document.
:param source:
:param target: if None, one will be created
:param aux_graphs: additional graphs to use for label lookup
:return:
"""
if target is None:
target = OboDocument()
if isinstance(source, Graph):
source = GraphDocument(graphs=[source])
for g in source.graphs:
logging.info(f"Converting graph {g.id}, nodes: {len(g.nodes)}, edges: {len(g.edges)}")
self._convert_graph(g, target=target, aux_graphs=aux_graphs)
logging.info(f"Converted {len(target.stanzas)} stanzas")
return target
def _commentify(
self, curie: CURIE, graph: Graph, aux_graphs: Optional[List[Graph]] = None
) -> str:
graphs = [graph] + (aux_graphs or [])
for g in graphs:
for n in g.nodes:
if n.id == curie and n.lbl:
return f"{curie} ! {n.lbl}"
return curie
def _id(self, uri_or_curie: CURIE) -> CURIE:
if not self.curie_converter:
return uri_or_curie
return self.curie_converter.compress(uri_or_curie, passthrough=True)
def _predicate_id(self, uri_or_curie: CURIE, target: OboDocument) -> CURIE:
curie = self._id(uri_or_curie)
return target.curie_to_shorthand_map.get(curie, curie)
def _convert_graph(
self, source: Graph, target: OboDocument, aux_graphs: Optional[List[Graph]] = None
) -> OboDocument:
edges_by_subject = index_graph_edges_by_subject(source)
for n in source.nodes:
if n.type == "PROPERTY" and n.lbl:
shorthand = n.lbl.replace(" ", "_")
target.curie_to_shorthand_map[self._id(n.id)] = shorthand
for n in source.nodes:
logging.debug(f"Converting node {n.id}")
self._convert_node(
n, index=edges_by_subject, target=target, graph=source, aux_graphs=aux_graphs
)
for lda in source.logicalDefinitionAxioms:
defined_class_id = self._id(lda.definedClassId)
if defined_class_id not in target.stanzas:
target.add_stanza(Stanza(id=defined_class_id, type="Term"))
stanza = target.stanzas[defined_class_id]
for g in lda.genusIds:
obj = self._id(g)
obj = self._commentify(obj, source, aux_graphs)
stanza.add_tag_value(TAG_INTERSECTION_OF, obj)
for r in lda.restrictions:
filler = self._id(r.fillerId)
filler = self._commentify(filler, source, aux_graphs)
pred = self._id(r.propertyId)
stanza.add_tag_value_pair(TAG_INTERSECTION_OF, pred, filler)
return target
def _convert_node(
self,
source: Node,
index: Dict[CURIE, List[Edge]],
target: OboDocument,
graph: Graph = None,
aux_graphs: Optional[List[Graph]] = None,
) -> None:
id = self._id(source.id)
shorthand_xref = None
if id in target.curie_to_shorthand_map:
shorthand_xref = id
id = target.curie_to_shorthand_map[id]
logging.debug(f"Converting node {id} from {source}")
t = source.type
# if not t:
# logging.warning(f"No type for {id}")
# return
if id.startswith("oio:"):
return
typedef_type = typedef_type_map.get(t, None)
if not typedef_type:
return
stanza = Stanza(id=id, type=typedef_type)
target.add_stanza(stanza)
if source.lbl:
stanza.add_tag_value(TAG_NAME, source.lbl)
if source.meta:
self._convert_meta(source, target=stanza)
if shorthand_xref:
stanza.add_tag_value(TAG_XREF, shorthand_xref)
for e in index.get(source.id, []):
obj = self._id(e.obj)
obj_labeled = self._commentify(obj, graph, aux_graphs)
pred = self._predicate_id(e.pred, target)
if e.pred in DIRECT_PREDICATE_MAP:
stanza.add_tag_value(DIRECT_PREDICATE_MAP[e.pred], f"{obj_labeled}")
else:
stanza.add_tag_value(TAG_RELATIONSHIP, f"{pred} {obj_labeled}")
return
def _convert_meta(self, source: Node, target: Stanza):
meta = source.meta
logging.debug(f"ADDING DEF {target}")
if meta.definition:
xrefs = ", ".join(meta.definition.xrefs)
target.add_tag_value(TAG_DEFINITION, f'"{_escape(meta.definition.val)}" [{xrefs}]')
if meta.xrefs:
for x in meta.xrefs:
target.add_tag_value(TAG_XREF, x.val)
for x in meta.subsets:
target.add_tag_value(TAG_SUBSET, subset_to_shorthand(x))
for s in meta.synonyms:
xrefs = ", ".join(s.xrefs)
scope = SYNONYM_PRED_TO_SCOPE_MAP[f"oio:{s.pred}"]
target.add_tag_value(TAG_SYNONYM, f'"{_escape(s.val)}" {scope} [{xrefs}]')