# Source code for ontolearn.ea_algorithms

# -----------------------------------------------------------------------------
# MIT License
#
# Copyright (c) 2024 Ontolearn Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------

"""Evolutionary algorithms (for Evolearner)."""

from abc import ABCMeta, abstractmethod
from typing import ClassVar, Final, List, Optional, Tuple
from deap.algorithms import varAnd
from deap.base import Toolbox
from heapq import nlargest
import time
import logging
import itertools

from ontolearn.ea_utils import Tree

logger = logging.getLogger(__name__)


class AbstractEvolutionaryAlgorithm(metaclass=ABCMeta):
    """Abstract base class for the evolutionary algorithms driving EvoLearner.

    Concrete subclasses implement :meth:`evolve`, which runs the full
    evolutionary loop over a population of expression trees.
    """
    __slots__ = ()

    # Human-readable identifier of the algorithm, set by concrete subclasses.
    name: ClassVar[str]

    @abstractmethod
    def __init__(self):
        """Create a new evolutionary algorithm."""
        pass

    @abstractmethod
    def evolve(self,
               toolbox: Toolbox,
               population: List[Tree],
               num_generations: int,
               start_time: float,
               verbose: int = 0) -> Tuple[bool, List[Tree]]:
        """Run the evolutionary loop.

        Args:
            toolbox: DEAP toolbox providing fitness evaluation, selection and
                variation operators as well as runtime configuration
                (``max_runtime``, ``terminate_on_goal``).
            population: Initial population of individuals (expression trees).
            num_generations: Maximum number of generations to run.
            start_time: Timestamp (``time.time()``) of the search start; used
                to enforce the ``toolbox.max_runtime()`` wall-clock budget.
            verbose: Verbosity level; values > 0 log the top hypotheses each
                generation. (Annotation/default changed from ``bool``/``False``
                to ``int``/``0`` for consistency with every concrete
                implementation in this module, which compares ``verbose > 0``;
                behaviorally equivalent since ``False == 0``.)

        Returns:
            Tuple of (goal_found, final population).
        """
        pass

    def _log_generation_info(self, toolbox: Toolbox, gen: int, population: List[Tree]):  # pragma: no cover
        # Log the generation number followed by the current top hypotheses,
        # then print a separator line to the console.
        logger.info(f'Generation: {gen}')
        for node in toolbox.get_top_hypotheses(population):
            logger.info(node)
        print('#'*100)
class BaseEvolutionaryAlgorithm(AbstractEvolutionaryAlgorithm):
    """Skeleton for generation-based evolutionary algorithms.

    Implements the outer :meth:`evolve` loop; subclasses supply
    :meth:`generation`, which produces the next population from the
    current one.
    """
    __slots__ = ()

    @abstractmethod
    def __init__(self):
        pass

    def evolve(self,
               toolbox: Toolbox,
               population: List[Tree],
               num_generations: int,
               start_time: float,
               verbose: int = 0) -> Tuple[bool, List[Tree]]:
        """Evaluate the initial population, then step through generations until
        the generation budget, the wall-clock budget, or (if configured) the
        goal is reached.

        Returns:
            Tuple of (goal_found, final population).
        """
        # Initial fitness evaluation for every individual.
        for individual in population:
            toolbox.apply_fitness(individual)

        goal_found = False
        gen_no = 1
        while gen_no <= num_generations:
            # Stop early once a goal concept was found, if configured to do so.
            if goal_found and toolbox.terminate_on_goal():
                break
            goal_found, population = self.generation(toolbox, population)
            if verbose > 0:
                self._log_generation_info(toolbox, gen_no, population)
            # Enforce the wall-clock budget after every generation.
            if (time.time() - start_time) > toolbox.max_runtime():
                return goal_found, population
            gen_no += 1

        return goal_found, population

    @abstractmethod
    def generation(self,
                   toolbox: Toolbox,
                   population: List[Tree],
                   num_selections: int = 0) -> Tuple[bool, List[Tree]]:
        """Advance the population by one generation.

        Returns:
            Tuple of (goal_found, next population).
        """
        pass
class EASimple(BaseEvolutionaryAlgorithm):
    """Classic simple EA: select, cross over, mutate, and optionally carry an
    elite fraction of the population over unchanged."""
    __slots__ = 'crossover_pr', 'mutation_pr', 'elitism', 'elite_size'

    name: Final = 'EASimple'

    crossover_pr: float
    mutation_pr: float
    elitism: bool
    elite_size: float

    def __init__(self,
                 crossover_pr: float = 0.9,
                 mutation_pr: float = 0.1,
                 elitism: bool = False,
                 elite_size: float = 0.1):
        """
        Args:
            crossover_pr: Probability of mating two individuals.
            mutation_pr: Probability of mutating an individual.
            elitism: Whether the best individuals survive unchanged.
            elite_size: Fraction of the selections reserved for the elite.
        """
        self.crossover_pr = crossover_pr
        self.mutation_pr = mutation_pr
        self.elitism = elitism
        self.elite_size = elite_size

    def generation(self,
                   toolbox: Toolbox,
                   population: List[Tree],
                   num_selections: int = 0) -> Tuple[bool, List[Tree]]:
        """Advance the population by one generation.

        Returns:
            Tuple of (goal_found, next population).
        """
        goal_found = False
        elite = []
        if num_selections <= 0:
            num_selections = len(population)

        if self.elitism:  # pragma: no cover
            # Reserve part of the next generation for the current best
            # individuals, which are copied over unchanged.
            num_elite = int(self.elite_size * num_selections)
            num_selections = num_selections - num_elite
            elite = nlargest(num_elite, population, key=lambda individual: individual.fitness.values[0])

        # Selection followed by crossover and mutation (DEAP's varAnd).
        offspring = toolbox.select(population, k=num_selections)
        offspring = varAnd(offspring, toolbox, self.crossover_pr, self.mutation_pr)

        # Re-evaluate only those individuals whose fitness was invalidated by
        # crossover/mutation; a quality of exactly 1.0 signals the goal.
        for child in offspring:
            if not child.fitness.valid:
                toolbox.apply_fitness(child)
                if child.quality.values[0] == 1.0:
                    goal_found = True

        # Replace the population in place so callers holding a reference see
        # the new generation.
        population[:] = offspring + elite
        return goal_found, population
class RegularizedEvolution(BaseEvolutionaryAlgorithm):
    """Aging-style evolution: each step mutates one selected parent, appends
    the offspring, and retires the oldest individual, keeping the population
    size constant."""
    __slots__ = ()

    name: Final = 'RegularizedEvolution'

    def __init__(self):
        pass

    def generation(self,
                   toolbox: Toolbox,
                   population: List[Tree],
                   num_selections: int = 0) -> Tuple[bool, List[Tree]]:  # pragma: no cover
        # TODO: use queue, since normal list has O(n) for pop
        chosen = toolbox.select(population, 1)[0]
        chosen_copy = toolbox.clone(chosen)
        # toolbox.mutate returns a one-element sequence; unpack it.
        [offspring] = toolbox.mutate(chosen_copy)
        toolbox.apply_fitness(offspring)
        # Newest individual in, oldest individual out.
        population.append(offspring)
        del population[0]
        return offspring.quality.values[0] == 1.0, population
class MultiPopulation(AbstractEvolutionaryAlgorithm):  # pragma: no cover
    """Island-model evolution: the population is split into several
    sub-populations that first evolve in isolation and then periodically
    exchange (migrate) copies of their best individuals.

    The per-population stepping is delegated to ``base_algorithm``.
    """
    __slots__ = 'base_algorithm', 'migration_size', 'num_populations', 'iso_generations', 'boost'

    name: Final = 'MultiPopulation'

    # Algorithm used to advance each sub-population by one generation.
    base_algorithm: BaseEvolutionaryAlgorithm
    # Fraction of a sub-population's size copied in as migrants per generation.
    migration_size: float
    # Number of islands the initial population is split into.
    num_populations: int
    # Fraction of the generation budget spent evolving in isolation.
    iso_generations: float
    # Fitness bonus temporarily added to migrants (0.0 disables boosting).
    boost: float

    def __init__(self, base_algorithm: Optional[BaseEvolutionaryAlgorithm] = None, migration_size: float = 0.1,
                 num_populations: int = 4, iso_generations: float = 0.1, boost: float = 0.0):
        """
        Args:
            base_algorithm: Algorithm stepping each sub-population; defaults
                to :class:`EASimple` when ``None``.
            migration_size: Fraction of each sub-population migrated.
            num_populations: Number of sub-populations (islands).
            iso_generations: Fraction of ``num_generations`` run in isolation
                before migration starts.
            boost: Fitness bonus applied to migrants so they are more likely
                to be selected in their new population.
        """
        self.migration_size = migration_size
        self.num_populations = num_populations
        self.iso_generations = iso_generations
        self.base_algorithm = base_algorithm
        self.boost = boost
        if self.base_algorithm is None:
            self.base_algorithm = EASimple()

    def evolve(self,
               toolbox: Toolbox,
               population: List[Tree],
               num_generations: int,
               start_time: float,
               verbose: int = 0) -> Tuple[bool, List[Tree]]:
        """Run the island-model loop: an isolation phase followed by a
        migration phase, merging all islands at the end.

        Returns:
            Tuple of (goal_found in any island, merged final population).
        """
        # The population must split evenly across the islands.
        assert len(population) % self.num_populations == 0
        population_size = len(population) // self.num_populations
        # Round-robin split of the initial population into the islands.
        populations = [population[i::self.num_populations] for i in range(self.num_populations)]
        # Number of generations spent in isolation before migration begins.
        iso_ngen = int(num_generations*self.iso_generations)
        num_migration = int(population_size*self.migration_size)

        # Initial fitness evaluation for every individual of every island.
        for p in populations:
            for ind in p:
                toolbox.apply_fitness(ind)

        gen = 1
        # One goal flag per island; the run stops once any island found the
        # goal (if termination on goal is configured).
        goal_found = [False] * self.num_populations

        # Phase 1: evolve each island independently, no migration.
        while gen <= iso_ngen and not (any(goal_found) and toolbox.terminate_on_goal()):
            for idx, p in enumerate(populations):
                goal_found[idx], population = self.base_algorithm.generation(toolbox, p, population_size)
                if verbose > 0:
                    self._log_generation_info(toolbox, gen, population, idx)
                populations[idx] = population
                # Wall-clock budget is checked after every island step.
                if (time.time() - start_time) > toolbox.max_runtime():
                    return self._finalize(goal_found, populations, toolbox)
            gen += 1

        # Phase 2: each generation, clone every island's best individuals and
        # offer them as extra candidates to the same-index island's next step.
        while gen <= num_generations and not (any(goal_found) and toolbox.terminate_on_goal()):
            migrate_inds = []
            for idx, p in enumerate(populations):
                mig = nlargest(num_migration, p, key=lambda ind: ind.fitness.values[0])
                # Clone so the boost below does not alter the originals.
                migrate_inds.append([toolbox.clone(ind) for ind in mig])
                if self.boost > 0.0:
                    # Temporarily inflate the migrants' fitness; undone by a
                    # re-evaluation in _finalize.
                    for ind in migrate_inds[idx]:
                        ind.fitness.values = (ind.fitness.values[0] + self.boost, )

            for idx, p in enumerate(populations):
                goal_found[idx], population = self.base_algorithm.generation(toolbox, p + migrate_inds[idx],
                                                                             population_size)
                if verbose > 0:
                    self._log_generation_info(toolbox, gen, population, idx)
                populations[idx] = population
                if (time.time() - start_time) > toolbox.max_runtime():
                    return self._finalize(goal_found, populations, toolbox)
            gen += 1

        return self._finalize(goal_found, populations, toolbox)

    def _log_generation_info(self, toolbox: Toolbox, gen: int, population: List[Tree], idx: int = 0):
        # Log which island and generation this is, then its top hypotheses,
        # followed by a console separator line.
        logger.info(f'Population {idx}:')
        logger.info(f'Generation: {gen}')
        for node in toolbox.get_top_hypotheses(population):
            logger.info(node)
        print('#'*100)

    def _finalize(self, goal_found, populations, toolbox):
        """Merge all islands into one population and report the goal flag."""
        population = list(itertools.chain.from_iterable(populations))
        # If boost was used we need to compute the actual fitness values once more, so there is
        # no individual left which has the boost applied
        if self.boost > 0:
            for ind in population:
                toolbox.apply_fitness(ind)
        return any(goal_found), population