"""Classes representing hierarchy in OWL."""
import operator
from abc import ABCMeta, abstractmethod
from functools import reduce
from typing import Dict, Iterable, Tuple, overload, TypeVar, Generic, Type, cast, Optional, FrozenSet, Set
from owlapy.class_expression import OWLClass, OWLThing, OWLNothing
from owlapy.meta_classes import HasIRI
from owlapy.owl_literal import OWLTopObjectProperty, OWLBottomObjectProperty, OWLTopDataProperty, OWLBottomDataProperty
from owlapy.owl_property import OWLObjectProperty, OWLDataProperty
from owlapy.owl_reasoner import OWLReasoner
_S = TypeVar('_S', bound=HasIRI) #:
_U = TypeVar('_U', bound='AbstractHierarchy') #:
[docs]
class AbstractHierarchy(Generic[_S], metaclass=ABCMeta):
"""Representation of an abstract hierarchy which can be used for classes or properties.
Args:
hierarchy_down: A downwards hierarchy given as a mapping of Entities to sub-entities.
reasoner: Alternatively, a reasoner whose root_ontology is queried for entities.
"""
__slots__ = '_Type', '_ent_set', '_parents_map', '_parents_map_trans', '_children_map', '_children_map_trans', \
'_leaf_set', '_root_set', \
# '_eq_set'
_ent_set: FrozenSet[_S]
_parents_map: Dict[_S, Set[_S]] # Entity => parent entities
_parents_map_trans: Dict[_S, Set[_S]] # Entity => parent entities
_children_map: Dict[_S, Set[_S]] # Entity => child entities
_children_map_trans: Dict[_S, Set[_S]] # Entity => child entities
# _eq_set: Dict[_S, Set[_S]] # Entity => equivalent entities # TODO
_root_set: Set[_S] # root entities
_leaf_set: Set[_S] # leaf entities
@overload
def __init__(self, factory: Type[_S], hierarchy_down: Iterable[Tuple[_S, Iterable[_S]]]):
...
@overload
def __init__(self, factory: Type[_S], reasoner: OWLReasoner):
...
@abstractmethod
def __init__(self, factory: Type[_S], arg):
self._Type = factory
if isinstance(arg, OWLReasoner):
hier_down_gen = self._hierarchy_down_generator(arg)
self._init(hier_down_gen)
else:
self._init(arg)
@abstractmethod
def _hierarchy_down_generator(self, reasoner: OWLReasoner) -> Iterable[Tuple[_S, Iterable[_S]]]:
"""Generate the suitable downwards hierarchy based on the reasoner."""
pass
[docs]
@classmethod
@abstractmethod
def get_top_entity(cls) -> _S:
"""The most general entity in this hierarchy, which contains all the entities."""
pass
[docs]
@classmethod
@abstractmethod
def get_bottom_entity(cls) -> _S:
"""The most specific entity in this hierarchy, which contains none of the entities."""
pass
[docs]
@staticmethod
def restrict(hierarchy: _U, *, remove: Iterable[_S] = None, allow: Iterable[_S] = None) \
-> _U:
"""Restrict a given hierarchy to a set of allowed/removed entities.
Args:
hierarchy: An existing Entity hierarchy to restrict.
remove: Set of entities which should be ignored.
allow: Set of entities which should be used.
Returns:
The restricted hierarchy.
"""
remove_set = frozenset(remove) if remove is not None else None
allow_set = frozenset(allow) if allow is not None else None
def _filter(_: _S):
if remove_set is None or _ not in remove_set:
if allow_set is None or _ in allow_set:
return True
return False
_gen = ((_, filter(_filter, hierarchy.children(_, direct=False)))
for _ in filter(_filter, hierarchy.items()))
return type(hierarchy)(_gen)
[docs]
def restrict_and_copy(self: _U, *, remove: Iterable[_S] = None, allow: Iterable[_S] = None) \
-> _U:
"""Restrict this hierarchy.
See restrict for more info.
"""
return type(self).restrict(self, remove=remove, allow=allow)
def _init(self, hierarchy_down: Iterable[Tuple[_S, Iterable[_S]]]) -> None:
self._parents_map_trans = dict()
self._children_map_trans = dict()
# self._eq_set = dict()
ent_to_sub_entities = dict(hierarchy_down)
self._ent_set = frozenset(ent_to_sub_entities.keys())
for ent, sub_it in ent_to_sub_entities.items():
self._children_map_trans[ent] = set(sub_it)
self._parents_map_trans[ent] = set() # create empty parent entry for all classes
del ent_to_sub_entities # exhausted
# calculate transitive children
for ent in self._children_map_trans:
_children_transitive(self._children_map_trans, ent=ent, seen_set=set())
# TODO handling of eq_sets
# sccs = list(_strongly_connected_components(self._children_map_trans))
# for scc in sccs:
# sub_entities = 0
# for ent_enc in iter_bits(scc):
# self._eq_set[ent_enc] = scc
# sub_entities |= self._children_map_trans[ent_enc]
# del self._children_map_trans[ent_enc]
# del self._parents_map_trans[ent_enc]
# self._children_map_trans[scc] = sub_entities
# self._parents_map_trans[scc] = 0
# fill transitive parents
for ent, sub_entities in self._children_map_trans.items():
for sub in sub_entities:
self._parents_map_trans[sub] |= {ent}
self._children_map, self._leaf_set = _reduce_transitive(self._children_map_trans, self._parents_map_trans)
self._parents_map, self._root_set = _reduce_transitive(self._parents_map_trans, self._children_map_trans)
[docs]
def parents(self, entity: _S, direct: bool = True) -> Iterable[_S]:
"""Parents of an entity.
Args:
entity: Entity for which to query parent entities.
direct: False to return transitive parents.
Returns:
Super-entities.
"""
if entity == type(self).get_bottom_entity():
if not direct:
yield from self.items()
else:
yield from self.leaves()
elif entity == type(self).get_top_entity():
yield from {}
else:
if not direct:
yield from self._parents_map_trans[entity]
else:
yield from self._parents_map[entity]
[docs]
def is_parent_of(self, a: _S, b: _S) -> bool:
"""if A is a parent of B.
Note:
A is always a parent of A."""
if a == b:
return True
if a == type(self).get_top_entity():
return True
if {a} & self._parents_map_trans[b]:
return True
return False
[docs]
def is_child_of(self, a: _S, b: _S) -> bool:
"""If A is a child of B.
Note:
A is always a child of A."""
if a == b:
return True
if a == type(self).get_bottom_entity():
return True
if {a} & self._children_map_trans[b]:
return True
return False
[docs]
def children(self, entity: _S, direct: bool = True) -> Iterable[_S]:
"""Children of an entity.
Args:
entity: Entity for which to query child entities.
direct: False to return transitive children.
Returns:
Sub-entities.
"""
if entity == type(self).get_top_entity():
if not direct:
yield from self.items()
else:
yield from self.roots()
elif entity == type(self).get_bottom_entity():
yield from {}
else:
if not direct:
yield from self._children_map_trans[entity]
else:
yield from self._children_map[entity]
[docs]
def siblings(self, entity: _S) -> Iterable[_S]:
seen_set = {entity}
for parent in self.parents(entity, direct=True):
for sibling in self.children(parent, direct=True):
if sibling not in seen_set:
yield sibling
seen_set.add(sibling)
[docs]
def items(self) -> Iterable[_S]:
yield from self._ent_set
[docs]
def roots(self, of: Optional[_S] = None) -> Iterable[_S]:
if of is not None and of != type(self).get_bottom_entity():
yield from self._root_set & self._parents_map_trans[of]
else:
yield from self._root_set
[docs]
def leaves(self, of: Optional[_S] = None) -> Iterable[_S]:
if of is not None and of != type(self).get_top_entity():
yield from self._leaf_set & self._children_map_trans[of]
else:
yield from self._leaf_set
[docs]
def __contains__(self, item: _S) -> bool:
return item in self._ent_set
[docs]
def __len__(self):
return len(self._ent_set)
[docs]
class ClassHierarchy(AbstractHierarchy[OWLClass]):
"""Representation of a class hierarchy.
Args:
hierarchy_down: A downwards hierarchy given as a mapping of Class to sub-classes.
reasoner: Alternatively, a reasoner whose root_ontology is queried for classes and sub-classes.
"""
[docs]
@classmethod
def get_top_entity(cls) -> OWLClass:
return OWLThing
[docs]
@classmethod
def get_bottom_entity(cls) -> OWLClass:
return OWLNothing
def _hierarchy_down_generator(self, reasoner: OWLReasoner) -> Iterable[Tuple[OWLClass, Iterable[OWLClass]]]:
yield from ((_, reasoner.sub_classes(_, direct=True))
for _ in reasoner.get_root_ontology().classes_in_signature())
[docs]
def sub_classes(self, entity: OWLClass, direct: bool = True) -> Iterable[OWLClass]:
yield from self.children(entity, direct)
[docs]
def super_classes(self, entity: OWLClass, direct: bool = True) -> Iterable[OWLClass]:
yield from self.parents(entity, direct)
[docs]
def is_subclass_of(self, subclass: OWLClass, superclass: OWLClass) -> bool:
return self.is_child_of(subclass, superclass)
@overload
def __init__(self, hierarchy_down: Iterable[Tuple[OWLClass, Iterable[OWLClass]]]): ...
@overload
def __init__(self, reasoner: OWLReasoner): ...
def __init__(self, arg):
super().__init__(OWLClass, arg)
[docs]
class ObjectPropertyHierarchy(AbstractHierarchy[OWLObjectProperty]):
"""Representation of an objet property hierarchy."""
[docs]
@classmethod
def get_top_entity(cls) -> OWLObjectProperty:
return OWLTopObjectProperty
[docs]
@classmethod
def get_bottom_entity(cls) -> OWLObjectProperty:
return OWLBottomObjectProperty
def _hierarchy_down_generator(self, reasoner: OWLReasoner) \
-> Iterable[Tuple[OWLObjectProperty, Iterable[OWLObjectProperty]]]:
return ((_, map(lambda _: cast(OWLObjectProperty, _),
filter(lambda _: isinstance(_, OWLObjectProperty),
reasoner.sub_object_properties(_, direct=True))))
for _ in reasoner.get_root_ontology().object_properties_in_signature())
[docs]
def sub_object_properties(self, entity: OWLObjectProperty, direct: bool = True) -> Iterable[OWLObjectProperty]:
yield from self.children(entity, direct)
[docs]
def super_object_properties(self, entity: OWLObjectProperty, direct: bool = True) -> Iterable[OWLObjectProperty]:
yield from self.parents(entity, direct)
[docs]
def more_general_roles(self, role: OWLObjectProperty, direct: bool = True) -> Iterable[OWLObjectProperty]:
yield from self.parents(role, direct=direct)
[docs]
def more_special_roles(self, role: OWLObjectProperty, direct: bool = True) -> Iterable[OWLObjectProperty]:
yield from self.children(role, direct=direct)
[docs]
def is_sub_property_of(self, sub_property: OWLObjectProperty, super_property: OWLObjectProperty) -> bool:
return self.is_child_of(sub_property, super_property)
[docs]
def most_general_roles(self) -> Iterable[OWLObjectProperty]:
yield from self.roots()
[docs]
def most_special_roles(self) -> Iterable[OWLObjectProperty]:
yield from self.leaves()
@overload
def __init__(self, hierarchy_down: Iterable[Tuple[OWLObjectProperty, Iterable[OWLObjectProperty]]]): ...
@overload
def __init__(self, reasoner: OWLReasoner): ...
def __init__(self, arg):
super().__init__(OWLObjectProperty, arg)
[docs]
class DatatypePropertyHierarchy(AbstractHierarchy[OWLDataProperty]):
"""Representation of a data property hierarchy."""
[docs]
@classmethod
def get_top_entity(cls) -> OWLDataProperty:
return OWLTopDataProperty
[docs]
@classmethod
def get_bottom_entity(cls) -> OWLDataProperty:
return OWLBottomDataProperty
def _hierarchy_down_generator(self, reasoner: OWLReasoner) \
-> Iterable[Tuple[OWLDataProperty, Iterable[OWLDataProperty]]]:
return ((_, reasoner.sub_data_properties(_, direct=True))
for _ in reasoner.get_root_ontology().data_properties_in_signature())
[docs]
def sub_data_properties(self, entity: OWLDataProperty, direct: bool = True):
yield from self.children(entity, direct)
[docs]
def super_data_properties(self, entity: OWLDataProperty, direct: bool = True):
yield from self.parents(entity, direct)
[docs]
def more_general_roles(self, role: OWLDataProperty, direct: bool = True) -> Iterable[OWLDataProperty]:
yield from self.parents(role, direct=direct)
[docs]
def more_special_roles(self, role: OWLDataProperty, direct: bool = True) -> Iterable[OWLDataProperty]:
yield from self.children(role, direct=direct)
[docs]
def is_sub_property_of(self, sub_property: OWLDataProperty, super_property: OWLDataProperty) -> bool:
return self.is_child_of(sub_property, super_property)
[docs]
def most_general_roles(self) -> Iterable[OWLDataProperty]:
yield from self.roots()
[docs]
def most_special_roles(self) -> Iterable[OWLDataProperty]:
yield from self.leaves()
@overload
def __init__(self, hierarchy_down: Iterable[Tuple[OWLDataProperty, Iterable[OWLDataProperty]]]): ...
@overload
def __init__(self, reasoner: OWLReasoner): ...
def __init__(self, arg):
super().__init__(OWLDataProperty, arg)
def _children_transitive(hier_trans: Dict[_S, Set[_S]], ent: _S, seen_set: Set[_S]):
"""Add transitive links to map_trans.
Note:
Changes map_trans.
Args:
hier_trans: Map to which transitive links are added.
ent: Class in map_trans for which to add transitive sub-classes.
"""
sub_classes_ent = frozenset(hier_trans[ent])
for sub_ent in sub_classes_ent:
if not {sub_ent} & seen_set:
_children_transitive(hier_trans, sub_ent, seen_set | {ent})
seen_set = seen_set | {sub_ent} | hier_trans[sub_ent]
hier_trans[ent] |= hier_trans[sub_ent]
def _reduce_transitive(hier: Dict[_S, Set[_S]], hier_inverse: Dict[_S, Set[_S]]) \
-> Tuple[Dict[_S, Set[_S]], FrozenSet[_S]]:
"""Remove all transitive links.
Takes a downward hierarchy and an upward hierarchy with transitive links, and removes all links that can be
implicitly detected since they are transitive.
Args:
hier: downward hierarchy with all transitive links, from Class => sub-classes.
hier_inverse: upward hierarchy with all transitive links, from Class => super-classes.
Returns:
Thin map with only direct sub-classes.
Set of classes without sub-classes.
"""
result_hier: Dict[_S, Set[_S]] = dict()
leaf_set = set()
for ent, set_ent in hier.items():
direct_set = set()
for item_ent in set_ent:
if not hier_inverse[item_ent] & (set_ent ^ {item_ent}):
direct_set |= {item_ent}
result_hier[ent] = direct_set
if not direct_set:
leaf_set |= {ent}
return result_hier, frozenset(leaf_set)
def _strongly_connected_components(graph: Dict[_S, Set[_S]]) -> Iterable[FrozenSet[_S]]:
"""Strongly connected component algorithm.
Args:
graph: Directed Graph dictionary, vertex => set of vertices (there is an edge from v to each V).
Returns:
The strongly connected components.
Author: Mario Alviano
Source: https://github.com/alviano/python/blob/master/rewrite_aggregates/scc.py
Licence: GPL-3.0
"""
identified = set()
stack = []
index = {}
boundaries = []
def dfs(v):
nonlocal identified
index[v] = len(stack)
stack.append({v})
boundaries.append(index[v])
for w in graph[v]:
if w not in index:
yield from dfs(w)
elif not {w} & identified:
while index[w] < boundaries[-1]:
boundaries.pop()
if boundaries[-1] == index[v]:
boundaries.pop()
scc = reduce(operator.or_, stack[index[v]:])
del stack[index[v]:]
identified |= scc
yield frozenset(scc)
for v in graph:
if v not in index:
yield from dfs(v)