[docs]classSemanticSimilarityInterface(BasicOntologyInterface,ABC):""" An interface for calculating similarity measures between pairs of terms or collections of terms """cached_information_content_map:Dict[CURIE,float]=None"""Mapping from term to information content"""
[docs]defmost_recent_common_ancestors(self,subject:CURIE,object:CURIE,predicates:List[PRED_CURIE]=None,include_owl_thing:bool=True,)->Iterable[CURIE]:""" Most recent common ancestors (MRCAs) for a pair of entities The MRCAs are the set of Common Ancestors (CAs) that are not themselves proper ancestors of another CA :param subject: :param object: :param predicates: :param include_owl_thing: :return: """ifisinstance(self,OboGraphInterface):s_ancs=set(self.ancestors([subject],predicates))o_ancs=set(self.ancestors([object],predicates))common=s_ancs.intersection(o_ancs)ancs_of_common=[]forcaincommon:forcaainself.ancestors(ca,predicates):ifcaa!=ca:ancs_of_common.append(caa)n=0foraincommon:ifanotinancs_of_common:yieldan+=1ifn==0:yieldOWL_THINGelse:raiseNotImplementedError
[docs]defsetwise_most_recent_common_ancestors(self,subjects:List[CURIE],predicates:List[PRED_CURIE]=None,include_owl_thing:bool=True,)->Iterable[CURIE]:""" Most recent common ancestors (MRCAs) for a set of entities The MRCAs are the set of Common Ancestors (CAs) that are not themselves proper ancestors of another CA :param subjects: :param predicates: :param include_owl_thing: :return: """ifnotisinstance(self,OboGraphInterface):raiseNotImplementedErrorancs=[]forsinsubjects:ancs.append(set(self.ancestors([s],predicates)))common=set.intersection(*ancs)ancs_of_common=[]forcaincommon:forcaainself.ancestors(ca,predicates):ifcaa!=ca:ancs_of_common.append(caa)n=0foraincommon:ifanotinancs_of_common:yieldan+=1ifn==0andinclude_owl_thing:yieldOWL_THING
[docs]defmultiset_most_recent_common_ancestors(self,subjects:List[CURIE],predicates:List[PRED_CURIE]=None,asymmetric=True)->Iterable[Tuple[CURIE,CURIE,CURIE]]:""" All pairwise common ancestors for all pairs in a set of terms :param subjects: :param predicates: :param asymmetric: :return: """graph_adapter=selfifnotisinstance(graph_adapter,OboGraphInterface):raiseNotImplementedErrorog=graph_adapter.ancestor_graph(subjects,predicates)dg=as_digraph(og)pairs=[]subjects=[sforsinsubjectsifsindg]forsinsubjects:foroinsubjects:ifasymmetricands>=o:continuepairs.append((s,o))fors,oinpairs:formrcainself.most_recent_common_ancestors(s,o,predicates=predicates):yields,o,mrca
[docs]defcommon_ancestors(self,subject:CURIE,object:CURIE,predicates:List[PRED_CURIE]=None,subject_ancestors:List[CURIE]=None,object_ancestors:List[CURIE]=None,include_owl_thing:bool=True,)->Iterable[CURIE]:""" Common ancestors of a subject-object pair :param subject: :param object: :param predicates: :param subject_ancestors: optional pre-generated ancestor list :param object_ancestors: optional pre-generated ancestor list :param include_owl_thing: :return: """ifsubject_ancestorsisnotNoneandobject_ancestorsisnotNone:subject_ancestors=set(subject_ancestors)object_ancestors=set(object_ancestors)elifisinstance(self,OboGraphInterface):subject_ancestors=set(self.ancestors(subject,predicates))object_ancestors=set(self.ancestors(object,predicates))else:raiseNotImplementedErrorifinclude_owl_thing:subject_ancestors.add(OWL_THING)object_ancestors.add(OWL_THING)forainsubject_ancestors.intersection(object_ancestors):yielda
[docs]defload_information_content_scores(self,source:str)->None:""" Load term information content values from file :param source: :return: """self.cached_information_content_map=load_information_content_map(source)
[docs]defset_information_content_scores(self,scores:Iterable[Tuple[CURIE,float]])->None:""" Load term information content values from file :param source: :return: """self.cached_information_content_map=dict(scores)
[docs]defget_information_content(self,curie:CURIE,predicates:List[PRED_CURIE]=None)->Optional[float]:""" Returns the information content of a term. IC(t) = -log2(Pr(t)) :param curie: :param predicates: :return: """pairs=list(self.information_content_scores([curie],object_closure_predicates=predicates))ifpairs:iflen(pairs)>1:raiseValueError(f"Multiple values for IC for {curie} = {pairs}")returnpairs[0][1]
[docs]definformation_content_scores(self,curies:Optional[Iterable[CURIE]]=None,predicates:List[PRED_CURIE]=None,object_closure_predicates:List[PRED_CURIE]=None,use_associations:bool=None,term_to_entities_map:Dict[CURIE,List[CURIE]]=None,**kwargs,)->Iterator[Tuple[CURIE,float]]:""" Yields entity-score pairs for a given collection of entities. The Information Content (IC) score for a term t is determined by: IC(t) = -log2(Pr(t)) Where the probability Pr(t) is determined by the frequency of that term against the whole corpus: Pr(t) = freq(t)/|items| :param curies: :param predicates: :param object_closure_predicates: :param use_associations: :param term_to_entities_map: :param kwargs: :return: """ifcuriesisNone:forcurie_itinchunk(self.entities()):yield fromself.information_content_scores(curie_it,predicates=predicates,object_closure_predicates=object_closure_predicates,use_associations=use_associations,term_to_entities_map=term_to_entities_map,**kwargs,)returncuries=list(curies)ifself.cached_information_content_mapisNoneanduse_associations:logging.info("Calculating and caching IC map from associations")fromoaklib.interfaces.association_provider_interfaceimport(AssociationProviderInterface,)ifnotisinstance(self,AssociationProviderInterface):raiseValueError(f"unable to retrieve associations from this interface, type {type(self)}")self.cached_information_content_map={}all_entities=set()forainself.associations():all_entities.add(a.subject)num_entities=len(all_entities)logging.info(f"num_entities={num_entities}")forterm,countinself.association_subject_counts(predicates=predicates,object_closure_predicates=object_closure_predicates):ifcount>num_entities:raiseAssertionError(f"Count {count} > num_entities {num_entities}")self.cached_information_content_map[term]=-math.log(count/num_entities)/math.log(2)ifcuries:forcurieincuries:ifcurienotinself.cached_information_content_map:self.cached_information_content_map[curie]=0.0ifself.cached_information_content_mapisnotNone:logging.debug("Using cached IC map")forcurieincuries:ifcurieinself.cached_information_content_map:yieldcurie,self.cached_information_content_map[curie]returnlogging.info("Calculating and caching IC map from ontology")all_entities=list(self.entities())num_entities=len(all_entities)ifnotisinstance(self,OboGraphInterface):raiseNotImplementedErroryielded_owl_thing=Falseforcurieincuries:descendants=list(self.descendants([curie],object_closure_predicates))yieldcurie,-math.log(len(descendants)/num_entities)ifcurie==OWL_THING:yielded_owl_thing=True# inject owl:Thing, which always has zero informationif(OWL_THINGincuriesornotcuries)andnotyielded_owl_thing:yieldOWL_THING,0.0
[docs]defpairwise_similarity(self,subject:CURIE,object:CURIE,predicates:List[PRED_CURIE]=None,subject_ancestors:List[CURIE]=None,object_ancestors:List[CURIE]=None,min_jaccard_similarity:Optional[float]=None,min_ancestor_information_content:Optional[float]=None,)->Optional[TermPairwiseSimilarity]:""" Pairwise similarity between a pair of ontology terms :param subject: :param object: :param predicates: :param subject_ancestors: optional pre-generated ancestor list :param object_ancestors: optional pre-generated ancestor list :param min_jaccard_similarity: minimum Jaccard similarity for a pair to be considered :param min_ancestor_information_content: minimum IC for a common ancestor to be considered :return: """logging.debug(f"Calculating pairwise similarity for {subject} x {object} over {predicates}")ifsubject_ancestorsisNoneandisinstance(self,OboGraphInterface):subject_ancestors=list(self.ancestors(subject,predicates=predicates))ifobject_ancestorsisNoneandisinstance(self,OboGraphInterface):object_ancestors=list(self.ancestors(object,predicates=predicates))ifsubject_ancestorsisnotNoneandobject_ancestorsisnotNone:jaccard_similarity=setwise_jaccard_similarity(subject_ancestors,object_ancestors)ifmin_jaccard_similarityisnotNoneandjaccard_similarity<min_jaccard_similarity:returnNonecas=list(self.common_ancestors(subject,object,predicates,subject_ancestors=subject_ancestors,object_ancestors=object_ancestors,))ifOWL_THINGincas:cas.remove(OWL_THING)logging.debug(f"Retrieving IC for {len(cas)} common ancestors")ics={a:icfora,icinself.information_content_scores(cas,object_closure_predicates=predicates)}iflen(ics)>0:max_ic=max(ics.values())best_mrcas=[aforainics.keys()ifmath.isclose(ics[a],max_ic,rel_tol=0.001)]anc=best_mrcas[0]else:max_ic=0.0anc=Noneifmin_ancestor_information_contentisnotNone:ifmax_ic<min_ancestor_information_content:returnNonelogging.debug(f"MRCA = {anc} with {max_ic}")sim=TermPairwiseSimilarity(subject_id=subject,object_id=object,ancestor_id=anc,subject_information_content=ics.get(subject,self.get_information_content(subject)),object_information_content=ics.get(object,self.get_information_content(object)),ancestor_information_content=max_ic,jaccard_similarity=jaccard_similarity,)sim.ancestor_information_content=max_icifsim.ancestor_information_contentandsim.jaccard_similarity:sim.phenodigm_score=math.sqrt(sim.jaccard_similarity*sim.ancestor_information_content)returnsim
[docs]defall_by_all_pairwise_similarity(self,subjects:Iterable[CURIE],objects:Iterable[CURIE],predicates:List[PRED_CURIE]=None,min_jaccard_similarity:Optional[float]=None,min_ancestor_information_content:Optional[float]=None,)->Iterator[TermPairwiseSimilarity]:""" Compute similarity for all combinations of terms in subsets vs all terms in objects :param subjects: :param objects: :param predicates: :return: """objects=list(objects)forsinsubjects:logging.info(f"Computing pairwise similarity for {s} x {len(objects)} objects")foroinobjects:val=self.pairwise_similarity(s,o,predicates=predicates,min_jaccard_similarity=min_jaccard_similarity,min_ancestor_information_content=min_ancestor_information_content,)ifval:yieldval