Source code for ontolearn.ea_algorithms

"""Evolutionary algorithms (for Evolearner)."""

from abc import ABCMeta, abstractmethod
from typing import ClassVar, Final, List, Optional, Tuple
from deap.algorithms import varAnd
from deap.base import Toolbox
from heapq import nlargest
import time
import logging
import itertools

from ontolearn.ea_utils import Tree

logger = logging.getLogger(__name__)


class AbstractEvolutionaryAlgorithm(metaclass=ABCMeta):
    """An abstract class for evolutionary algorithms."""
    __slots__ = ()

    name: ClassVar[str]

    @abstractmethod
    def __init__(self):
        """Create a new evolutionary algorithm."""
        pass

    @abstractmethod
    def evolve(self, toolbox: Toolbox, population: List[Tree], num_generations: int,
               start_time: float, verbose: int = 0) -> Tuple[bool, List[Tree]]:
        pass

    def _log_generation_info(self, toolbox: Toolbox, gen: int, population: List[Tree]):
        logger.info(f'Generation: {gen}')
        for node in toolbox.get_top_hypotheses(population):
            logger.info(node)
        print('#'*100)


class BaseEvolutionaryAlgorithm(AbstractEvolutionaryAlgorithm):
    """Base class for algorithms that evolve a single population generation by generation."""
    __slots__ = ()

    @abstractmethod
    def __init__(self):
        pass

    def evolve(self, toolbox: Toolbox, population: List[Tree], num_generations: int,
               start_time: float, verbose: int = 0) -> Tuple[bool, List[Tree]]:
        # Evaluate the initial population.
        for ind in population:
            toolbox.apply_fitness(ind)

        gen = 1
        goal_found = False
        # Evolve until the generation budget is exhausted or a goal concept is found
        # (if the toolbox is configured to terminate on goal).
        while gen <= num_generations and not (goal_found and toolbox.terminate_on_goal()):
            goal_found, population = self.generation(toolbox, population)
            if verbose > 0:
                self._log_generation_info(toolbox, gen, population)
            # Abort early once the allowed runtime is exceeded.
            if (time.time() - start_time) > toolbox.max_runtime():
                return goal_found, population
            gen += 1

        return goal_found, population

    @abstractmethod
    def generation(self, toolbox: Toolbox, population: List[Tree],
                   num_selections: int = 0) -> Tuple[bool, List[Tree]]:
        pass


class EASimple(BaseEvolutionaryAlgorithm):
    """Classic generational algorithm: selection followed by crossover and mutation, with optional elitism."""
    __slots__ = 'crossover_pr', 'mutation_pr', 'elitism', 'elite_size'

    name: Final = 'EASimple'

    crossover_pr: float
    mutation_pr: float
    elitism: bool
    elite_size: float

    def __init__(self,
                 crossover_pr: float = 0.9,
                 mutation_pr: float = 0.1,
                 elitism: bool = False,
                 elite_size: float = 0.1):
        self.crossover_pr = crossover_pr
        self.mutation_pr = mutation_pr
        self.elitism = elitism
        self.elite_size = elite_size

    def generation(self, toolbox: Toolbox, population: List[Tree],
                   num_selections: int = 0) -> Tuple[bool, List[Tree]]:
        elite = []
        goal_found = False
        num_selections = num_selections if num_selections > 0 else len(population)

        if self.elitism:
            # Reserve the fittest individuals; they are carried over unchanged.
            num_elite = int(self.elite_size*num_selections)
            num_selections = num_selections - num_elite
            elite = nlargest(num_elite, population, key=lambda ind: ind.fitness.values[0])

        offspring = toolbox.select(population, k=num_selections)
        # varAnd applies crossover and mutation with the configured probabilities.
        offspring = varAnd(offspring, toolbox, self.crossover_pr, self.mutation_pr)

        for off in offspring:
            if not off.fitness.valid:
                toolbox.apply_fitness(off)
            if off.quality.values[0] == 1.0:
                goal_found = True

        population[:] = offspring + elite
        return goal_found, population
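
A hedged usage sketch (run from a separate script, not part of this module): configuring EvoLearner with a customised EASimple instance. The algorithm keyword, the method names and the import paths may differ between Ontolearn/owlapy versions; the ontology path and the individual sets are placeholders for your own learning problem.

from ontolearn.knowledge_base import KnowledgeBase
from ontolearn.concept_learner import EvoLearner
from ontolearn.learning_problem import PosNegLPStandard
from ontolearn.ea_algorithms import EASimple

kb = KnowledgeBase(path="path/to/ontology.owl")  # placeholder path

ea = EASimple(crossover_pr=0.9, mutation_pr=0.1,
              elitism=True, elite_size=0.05)  # carry the best 5% over unchanged
model = EvoLearner(knowledge_base=kb, algorithm=ea)

positive_individuals = set()  # fill with OWLNamedIndividual objects for your problem
negative_individuals = set()
model.fit(PosNegLPStandard(pos=positive_individuals, neg=negative_individuals))
for hypothesis in model.best_hypotheses(n=3):
    print(hypothesis)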


class RegularizedEvolution(BaseEvolutionaryAlgorithm):
    """Steady-state algorithm: each generation mutates one selected parent and discards the oldest individual."""
    __slots__ = ()

    name: Final = 'RegularizedEvolution'

    def __init__(self):
        pass

    def generation(self, toolbox: Toolbox, population: List[Tree],
                   num_selections: int = 0) -> Tuple[bool, List[Tree]]:
        # TODO: use a queue, since pop(0) on a normal list is O(n)
        parent = toolbox.select(population, 1)[0]
        parent_copy = toolbox.clone(parent)
        offspring, = toolbox.mutate(parent_copy)
        toolbox.apply_fitness(offspring)

        # Append the new offspring and discard the oldest individual.
        population.append(offspring)
        population.pop(0)

        return offspring.quality.values[0] == 1.0, population
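
A hedged swap-in example, reusing the EvoLearner wiring from the EASimple sketch above: each call to generation here evaluates a single mutated offspring, so one step does far less work than EASimple's full generational step.

from ontolearn.ea_algorithms import RegularizedEvolution

model = EvoLearner(knowledge_base=kb, algorithm=RegularizedEvolution())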


class MultiPopulation(AbstractEvolutionaryAlgorithm):
    """Evolves several sub-populations with a base algorithm; after an isolation phase, clones of the
    fittest individuals (optionally fitness-boosted) are added before each generation step."""
    __slots__ = 'base_algorithm', 'migration_size', 'num_populations', 'iso_generations', 'boost'

    name: Final = 'MultiPopulation'

    base_algorithm: BaseEvolutionaryAlgorithm
    migration_size: float
    num_populations: int
    iso_generations: float
    boost: float

    def __init__(self, base_algorithm: Optional[BaseEvolutionaryAlgorithm] = None,
                 migration_size: float = 0.1,
                 num_populations: int = 4,
                 iso_generations: float = 0.1,
                 boost: float = 0.0):
        self.migration_size = migration_size
        self.num_populations = num_populations
        self.iso_generations = iso_generations
        self.base_algorithm = base_algorithm
        self.boost = boost

        if self.base_algorithm is None:
            self.base_algorithm = EASimple()

    def evolve(self, toolbox: Toolbox, population: List[Tree], num_generations: int,
               start_time: float, verbose: int = 0) -> Tuple[bool, List[Tree]]:
        assert len(population) % self.num_populations == 0
        population_size = len(population) // self.num_populations
        # Split the population round-robin into equally sized sub-populations.
        populations = [population[i::self.num_populations] for i in range(self.num_populations)]

        iso_ngen = int(num_generations*self.iso_generations)
        num_migration = int(population_size*self.migration_size)

        for p in populations:
            for ind in p:
                toolbox.apply_fitness(ind)

        gen = 1
        goal_found = [False] * self.num_populations
        # Isolation phase: each sub-population evolves independently.
        while gen <= iso_ngen and not (any(goal_found) and toolbox.terminate_on_goal()):
            for idx, p in enumerate(populations):
                goal_found[idx], population = self.base_algorithm.generation(toolbox, p, population_size)
                if verbose > 0:
                    self._log_generation_info(toolbox, gen, population, idx)
                populations[idx] = population

            if (time.time() - start_time) > toolbox.max_runtime():
                return self._finalize(goal_found, populations, toolbox)
            gen += 1

        # Migration phase: clones of the fittest individuals (fitness-boosted if configured)
        # are added to each sub-population before its next generation step.
        while gen <= num_generations and not (any(goal_found) and toolbox.terminate_on_goal()):
            migrate_inds = []
            for idx, p in enumerate(populations):
                mig = nlargest(num_migration, p, key=lambda ind: ind.fitness.values[0])
                migrate_inds.append([toolbox.clone(ind) for ind in mig])

                if self.boost > 0.0:
                    for ind in migrate_inds[idx]:
                        ind.fitness.values = (ind.fitness.values[0] + self.boost, )

            for idx, p in enumerate(populations):
                goal_found[idx], population = self.base_algorithm.generation(toolbox, p + migrate_inds[idx],
                                                                             population_size)
                if verbose > 0:
                    self._log_generation_info(toolbox, gen, population, idx)
                populations[idx] = population

            if (time.time() - start_time) > toolbox.max_runtime():
                return self._finalize(goal_found, populations, toolbox)
            gen += 1

        return self._finalize(goal_found, populations, toolbox)

    def _log_generation_info(self, toolbox: Toolbox, gen: int, population: List[Tree], idx: int = 0):
        logger.info(f'Population {idx}:')
        logger.info(f'Generation: {gen}')
        for node in toolbox.get_top_hypotheses(population):
            logger.info(node)
        print('#'*100)

    def _finalize(self, goal_found, populations, toolbox):
        population = list(itertools.chain.from_iterable(populations))
        # If boost was used, recompute the real fitness values so that no individual
        # keeps a boosted value.
        if self.boost > 0:
            for ind in population:
                toolbox.apply_fitness(ind)
        return any(goal_found), population
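
The abstract interface above also admits custom strategies. The sketch below is illustrative and not part of Ontolearn; it relies only on toolbox operators already used in this module (select, clone, mutate, apply_fitness) and replaces the two weakest individuals with mutated copies of two selected parents per generation.

from heapq import nsmallest
from typing import Final, List, Tuple

from deap.base import Toolbox

from ontolearn.ea_algorithms import BaseEvolutionaryAlgorithm
from ontolearn.ea_utils import Tree


class ReplaceWorstPair(BaseEvolutionaryAlgorithm):
    """Illustrative steady-state variant: two mutated offspring replace the two weakest individuals."""
    __slots__ = ()

    name: Final = 'ReplaceWorstPair'

    def __init__(self):
        pass

    def generation(self, toolbox: Toolbox, population: List[Tree],
                   num_selections: int = 0) -> Tuple[bool, List[Tree]]:
        goal_found = False
        offspring = []
        for parent in toolbox.select(population, k=2):
            child, = toolbox.mutate(toolbox.clone(parent))
            toolbox.apply_fitness(child)
            if child.quality.values[0] == 1.0:
                goal_found = True
            offspring.append(child)

        # Drop the two least fit individuals and insert the new offspring.
        for ind in nsmallest(2, population, key=lambda x: x.fitness.values[0]):
            population.remove(ind)
        population.extend(offspring)

        return goal_found, population

Such a class can be supplied to MultiPopulation via base_algorithm=ReplaceWorstPair(), or, assuming your Ontolearn version exposes the keyword, to EvoLearner via algorithm=.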