Source code for ontolearn.value_splitter

"""Value splitters."""
from abc import ABCMeta, abstractmethod
from copy import deepcopy
from dataclasses import dataclass
from datetime import date, datetime
from functools import total_ordering
from itertools import chain

from owlapy.owl_individual import OWLNamedIndividual
from owlapy.owl_literal import OWLLiteral
from owlapy.owl_property import OWLDataProperty
from owlapy.owl_reasoner import OWLReasoner
from pandas import Timedelta
from scipy.stats import entropy
from sortedcontainers import SortedDict
from typing import Dict, List, Optional, Set, Tuple, Union

import math


#: Type alias used in this module's annotations for a single data-property value:
#: either an ``OWLLiteral`` or one of the plain Python/pandas types listed in the union.
Values = Union[OWLLiteral, int, float, bool, Timedelta, datetime, date]


class AbstractValueSplitter(metaclass=ABCMeta):
    """Abstract base class for split calculation of data properties.

    Attributes:
        max_nr_splits: Split limit passed to the constructor and stored unchanged.
    """
    __slots__ = 'max_nr_splits'

    max_nr_splits: int

    @abstractmethod
    def __init__(self, max_nr_splits: int):
        """Store the split limit.

        Args:
            max_nr_splits: Maximum number of splits; stored as-is, not validated here.
        """
        self.max_nr_splits = max_nr_splits

    @abstractmethod
    def compute_splits_properties(self, reasoner: OWLReasoner, properties: List[OWLDataProperty]) \
            -> Dict[OWLDataProperty, List[OWLLiteral]]:
        """Compute the split literals for every given data property.

        Args:
            reasoner: Reasoner used to look up data property values.
            properties: Data properties for which splits are requested.

        Returns:
            A mapping from each data property to its list of split literals.
        """
        pass

    def _combine_values(self, a: Values, b: Values) -> Values:
        """Merge two neighbouring values into a single split value.

        Args:
            a: First value.
            b: Second value.

        Returns:
            The floor-divided midpoint for two ints, the midpoint rounded to three
            decimals for two floats, and ``a`` unchanged for every other combination
            (including two booleans).
        """
        # bool is a subclass of int, so without this guard two booleans would take the
        # integer branch and come back as the int 0/1 instead of a bool.
        if isinstance(a, bool) and isinstance(b, bool):
            return a
        if isinstance(a, int) and isinstance(b, int):
            return (a + b) // 2
        if isinstance(a, float) and isinstance(b, float):
            return round((a + b) / 2, 3)
        # Mixed or non-numeric types have no midpoint defined here.
        return a

    def reset(self):
        """Hook for clearing cached state; the base implementation does nothing."""
        pass
class BinningValueSplitter(AbstractValueSplitter):
    """Calculate a number of bins of equal size as splits."""
    __slots__ = ()

    def __init__(self, max_nr_splits: int = 12):
        """Create a binning splitter.

        Args:
            max_nr_splits: Upper bound on the number of bins (defaults to 12).
        """
        super().__init__(max_nr_splits)

    def compute_splits_properties(self, reasoner: OWLReasoner, properties: List[OWLDataProperty]) \
            -> Dict[OWLDataProperty, List[OWLLiteral]]:
        """Compute bin boundaries for each data property.

        Args:
            reasoner: Reasoner queried via ``all_data_property_values`` for each property.
            properties: Data properties to compute splits for.

        Returns:
            A mapping from each property to its sorted list of split literals.
        """
        splits_by_property = {}
        for prop in properties:
            distinct_literals = set(reasoner.all_data_property_values(prop))
            splits_by_property[prop] = self._compute_splits(distinct_literals)
        return splits_by_property

    def _compute_splits(self, dp_values: Set[OWLLiteral]) -> List[OWLLiteral]:
        """Pick split values from the sorted Python values of the given literals.

        Args:
            dp_values: Distinct literals of one data property.

        Returns:
            Sorted, de-duplicated split literals; empty when ``dp_values`` is empty.
        """
        ordered = [literal.to_python() for literal in dp_values]
        ordered.sort()
        count = len(ordered)
        last = count - 1
        bins = min(self.max_nr_splits, count + 1)

        chosen = set()
        if ordered:
            # The smallest value is always a split.
            chosen.add(ordered[0])
        for step in range(1, bins):
            scaled = step * count
            idx = max(math.floor(scaled / bins), math.floor(scaled / (bins - 1) - 1))
            # Clamp the right-hand neighbour to the last valid index.
            neighbour = idx + 1 if idx + 1 < last else last
            chosen.add(self._combine_values(ordered[idx], ordered[neighbour]))

        literals = [OWLLiteral(value) for value in chosen]
        literals.sort()
        return literals
@total_ordering
@dataclass
class Split:
    """A subset of positive and negative individuals together with its entropy.

    Equality and ordering look at ``entropy`` only; the other fields are ignored when
    comparing. Because ``__eq__`` is defined, instances are not hashable.

    Attributes:
        pos: Identifiers of the positive individuals in this subset.
        neg: Identifiers of the negative individuals in this subset.
        entropy: Entropy of the positive/negative distribution of this subset.
        used_properties: Identifiers of the properties already used to reach this subset.
    """
    pos: List[str]
    neg: List[str]
    entropy: float
    used_properties: Set[str]

    def __eq__(self, other):
        """Return True when both splits have (nearly) the same entropy."""
        if type(self) is type(other):
            return math.isclose(self.entropy, other.entropy)
        return NotImplemented

    def __lt__(self, other):
        """Return True when this split's entropy is strictly and noticeably smaller."""
        if type(self) is type(other):
            # Entropies that __eq__ treats as equal must not also compare as smaller,
            # otherwise the comparisons derived by total_ordering contradict each other.
            return (self.entropy < other.entropy
                    and not math.isclose(self.entropy, other.entropy))
        return NotImplemented
@dataclass
class IndividualValues:
    """Values of one data property, keyed by individual, for positives and negatives.

    Attributes:
        pos_map: Maps the identifier of each positive individual to its value.
        neg_map: Maps the identifier of each negative individual to its value.
    """
    pos_map: Dict[str, Values]
    neg_map: Dict[str, Values]

    def get_pos_values(self) -> List[Values]:
        """Return the values of the positive individuals as a new list."""
        return list(self.pos_map.values())

    def get_neg_values(self) -> List[Values]:
        """Return the values of the negative individuals as a new list."""
        return list(self.neg_map.values())

    def get_overlapping_with_split(self, split: Split) -> 'IndividualValues':
        """Restrict both maps to the individuals contained in ``split``.

        Args:
            split: Object whose ``pos`` and ``neg`` collections name the individuals to keep.

        Returns:
            A new ``IndividualValues`` holding only the entries whose keys appear in
            ``split.pos`` (positives) or ``split.neg`` (negatives); this instance is
            not modified.
        """
        # Build the lookup sets once: testing membership against the lists inside the
        # comprehensions would rescan them for every map entry.
        pos_ids = set(split.pos)
        neg_ids = set(split.neg)
        return IndividualValues({ind: v for ind, v in self.pos_map.items() if ind in pos_ids},
                                {ind: v for ind, v in self.neg_map.items() if ind in neg_ids})
class EntropyValueSplitter(AbstractValueSplitter):
    """Calculate the splits depending on the entropy of the resulting sets.

    For each data property a split value is chosen that maximises the information gain
    (entropy before the split minus the weighted entropy of the two parts) over the
    positive and negative individuals.
    """
    __slots__ = '_prop_to_values'

    # Values of the positive/negative individuals per property; cleared by reset() and
    # refilled at the start of every compute_splits_properties() call.
    _prop_to_values: Dict[OWLDataProperty, IndividualValues]

    def __init__(self, max_nr_splits: int = 2):
        """Create an entropy-based splitter.

        Args:
            max_nr_splits: Maximum number of split values collected per property.
        """
        super().__init__(max_nr_splits)
        self._prop_to_values = {}

    def compute_splits_properties(self, reasoner: OWLReasoner, properties: List[OWLDataProperty],
                                  pos: Optional[Set[OWLNamedIndividual]] = None,
                                  neg: Optional[Set[OWLNamedIndividual]] = None) \
            -> Dict[OWLDataProperty, List[OWLLiteral]]:
        """Compute entropy-based split literals for each data property.

        Args:
            reasoner: Reasoner used to fetch the data property values of the individuals.
            properties: Data properties to compute splits for; the list is not modified.
            pos: Positive example individuals (required despite the ``None`` default).
            neg: Negative example individuals (required despite the ``None`` default).

        Returns:
            A mapping from each property to the split literals found for it, at most
            ``max_nr_splits`` per property.

        Raises:
            AssertionError: If ``pos`` or ``neg`` is ``None``.
        """
        assert pos is not None, "pos must be provided"
        assert neg is not None, "neg must be provided"
        self.reset()

        # Work on a copy: properties that reached their split limit are removed below.
        properties = properties.copy()
        dp_splits: Dict[OWLDataProperty, List[OWLLiteral]] = {}
        for property_ in properties:
            dp_splits[property_] = []
            self._prop_to_values[property_] = IndividualValues(self._get_values_for_inds(reasoner, property_, pos),
                                                               self._get_values_for_inds(reasoner, property_, neg))

        pos_str = [p.iri.get_remainder() for p in pos]
        neg_str = [n.iri.get_remainder() for n in neg]
        current_splits = [Split(pos_str, neg_str, 0, set())]
        # Level-wise refinement: every round tries to split the current subsets once
        # more on each property they have not been split on yet.
        while len(properties) > 0 and len(current_splits) > 0:
            next_level_splits = []
            # Iterate over a snapshot because properties may be removed inside the loop.
            for property_ in properties[:]:
                for split in current_splits:
                    if property_.iri.get_remainder() not in split.used_properties:
                        value, new_splits = self._compute_split_value(property_, split)

                        if value is not None:
                            value = OWLLiteral(value)
                            if value not in dp_splits[property_]:
                                dp_splits[property_].append(value)
                                next_level_splits.extend(new_splits)
                                if len(dp_splits[property_]) >= self.max_nr_splits:
                                    properties.remove(property_)
                                    break

            # Subsets with the highest entropy are processed first in the next round.
            current_splits = sorted(next_level_splits, reverse=True)
        return dp_splits

    def _compute_split_value(self, property_: OWLDataProperty, split: Split) -> Tuple[Optional[Values], List[Split]]:
        """Find the split value with the highest information gain inside ``split``.

        Args:
            property_: Data property whose values are split.
            split: Subset of individuals to split further.

        Returns:
            ``(value, sub_splits)`` where ``value`` is the chosen threshold, or ``None``
            if no candidate exists, and ``sub_splits`` holds the resulting parts that
            still have non-zero entropy (possibly an empty list).
        """
        current_values = self._prop_to_values[property_].get_overlapping_with_split(split)
        number_of_values = len(current_values.pos_map) + len(current_values.neg_map)
        if number_of_values == 0:
            return None, []

        current_entropy = entropy((len(current_values.pos_map) / number_of_values,
                                   len(current_values.neg_map) / number_of_values))
        best_gain = 0
        best_value = None
        # Start with an empty list so the declared return type also holds when there
        # are no candidate thresholds at all.
        best_splits: List[Split] = []

        # Inverted indexes: value -> individuals having that value, sorted by value.
        pos_inv: 'SortedDict[Values, List[str]]' = SortedDict()
        for k, v in current_values.pos_map.items():
            pos_inv.setdefault(v, []).append(k)

        neg_inv: 'SortedDict[Values, List[str]]' = SortedDict()
        for k, v in current_values.neg_map.items():
            neg_inv.setdefault(v, []).append(k)

        # Candidate thresholds are the combined values of neighbouring sorted values.
        values = sorted(list(pos_inv.keys()) + list(neg_inv.keys()))
        values = [self._combine_values(x, y) for x, y in zip(values, values[1:])]

        for value in values:
            pos_below, pos_above = self._get_inds_below_above(value, pos_inv)
            neg_below, neg_above = self._get_inds_below_above(value, neg_inv)
            num_below = len(pos_below) + len(neg_below)
            num_above = len(pos_above) + len(neg_above)
            entropy_below = 0
            if num_below > 0:
                entropy_below = entropy((len(pos_below) / num_below, len(neg_below) / num_below))

            entropy_above = 0
            if num_above > 0:
                entropy_above = entropy((len(pos_above) / num_above, len(neg_above) / num_above))

            cond_entropy = ((num_below / number_of_values) * entropy_below +
                            (num_above / number_of_values) * entropy_above)
            gain = current_entropy - cond_entropy

            # ">=" means a later candidate wins ties; negative gains are never selected.
            if gain >= best_gain:
                best_gain = gain
                best_value = value
                best_splits = []
                # Pure parts (entropy 0) need no further splitting and are dropped.
                if entropy_below > 0:
                    best_splits.append(self._make_split(pos_below, neg_below, entropy_below, split, property_))
                if entropy_above > 0:
                    best_splits.append(self._make_split(pos_above, neg_above, entropy_above, split, property_))

        return best_value, best_splits

    def _make_split(self, pos: List[str], neg: List[str], entropy: float, split: Split,
                    property_: OWLDataProperty) -> Split:
        """Create a child split that additionally records ``property_`` as used.

        The parent's ``used_properties`` set is left untouched.
        """
        # The set only holds strings, so a shallow union yields an independent copy.
        used_properties = split.used_properties | {property_.iri.get_remainder()}
        return Split(pos, neg, entropy, used_properties)

    def _get_inds_below_above(self, value: Values, ind_value_map: 'SortedDict[Values, List[str]]') \
            -> Tuple[List[str], List[str]]:
        """Partition the individuals of ``ind_value_map`` around ``value``.

        Returns:
            ``(below, above)``: individuals whose key sorts before the bisect position of
            ``value`` and the remaining individuals, each in key order.
        """
        idx = ind_value_map.bisect(value)
        inds_below = list(chain.from_iterable(ind_value_map.values()[:idx]))
        inds_above = list(chain.from_iterable(ind_value_map.values()[idx:]))
        return inds_below, inds_above

    def _get_values_for_inds(self, reasoner: OWLReasoner, property_: OWLDataProperty,
                             inds: Set[OWLNamedIndividual]) -> Dict[str, Values]:
        """Map each individual's IRI remainder to its first value of ``property_``.

        Individuals for which the reasoner yields no value are left out; if several
        values exist only the first one yielded is used.
        """
        inds_to_value = dict()
        for ind in inds:
            try:
                val = next(iter(reasoner.data_property_values(ind, property_)))
            except StopIteration:
                # No value for this individual: skip it.
                continue
            inds_to_value[ind.iri.get_remainder()] = val.to_python()
        return inds_to_value

    def reset(self):
        """Drop all cached per-property values."""
        self._prop_to_values = {}