Source code for ontolearn.semantic_caching

# -----------------------------------------------------------------------------
# MIT License
#
# Copyright (c) 2024 Ontolearn Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------

"""python examples/retrieval_eval.py"""
from owlapy.owl_literal import OWLBottomObjectProperty, OWLTopObjectProperty

from ontolearn.owl_neural_reasoner import TripleStoreNeuralReasoner
from ontolearn.knowledge_base import KnowledgeBase
from ontolearn.utils import jaccard_similarity, concept_reducer, concept_reducer_properties
from owlapy.class_expression import (
    OWLObjectUnionOf,
    OWLObjectIntersectionOf,
    OWLObjectSomeValuesFrom,
    OWLObjectAllValuesFrom,
    OWLObjectMinCardinality,
    OWLObjectMaxCardinality,
    OWLObjectOneOf,
    OWLObjectComplementOf,
    OWLClass,
)
from owlapy.owl_property import OWLObjectInverseOf
import time
from typing import Set
from owlapy import owl_expression_to_dl
from itertools import chain
import os
import random
import re  # used by transform_forall_to_exists below
import itertools
from owlready2 import *  # provides get_ontology()
from collections import OrderedDict
from owlapy.owl_reasoner import SyncReasoner
import pickle
from tqdm import tqdm


def concept_generator(path_kg):
    """Generate ALCQ concepts from a knowledge base to evaluate retrieval performance."""
    # (1) Initialize knowledge base.
    assert os.path.isfile(path_kg)
    symbolic_kb = KnowledgeBase(path=path_kg)

    # GENERATE ALCQ CONCEPTS TO EVALUATE RETRIEVAL PERFORMANCE
    # (3) R: Extract object properties.
    object_properties = sorted({i for i in symbolic_kb.get_object_properties()})
    object_properties = set(object_properties)
    # (4) R⁻: Inverse of object properties.
    object_properties_inverse = {i.get_inverse_property() for i in object_properties}
    # (5) R*: R UNION R⁻.
    object_properties_and_inverse = object_properties.union(object_properties_inverse)
    # (6) NC: Named owl concepts.
    nc = sorted({i for i in symbolic_kb.get_concepts()})
    nc = set(nc)  # return to a set
    # (7) NC⁻: Complement of NC.
    nnc = {i.get_object_complement_of() for i in nc}
    # (8) UNNC: NC UNION NC⁻.
    unnc = nc.union(nnc)
    # (9) Retrieve 3 random Nominals.
    inds = list(symbolic_kb.individuals())
    nominals = set(random.sample(inds, 3))
    # (10) All combinations of 3 for Nominals.
    nominal_combinations = set(
        OWLObjectOneOf(combination)
        for combination in itertools.combinations(nominals, 3)
    )
    # (11) NC UNION NC.
    unions = concept_reducer(nc, opt=OWLObjectUnionOf)
    # (12) NC INTERSECTION NC.
    intersections = concept_reducer(nc, opt=OWLObjectIntersectionOf)
    # (13) UNNC UNION UNNC.
    unions_unnc = concept_reducer(unnc, opt=OWLObjectUnionOf)
    # (14) UNNC INTERSECTION UNNC.
    intersections_unnc = concept_reducer(unnc, opt=OWLObjectIntersectionOf)
    # (15) ∃ r.C s.t. C ∈ UNNC and r ∈ R*.
    exist_unnc = concept_reducer_properties(
        concepts=unnc,
        properties=object_properties,  # object_properties_and_inverse,
        cls=OWLObjectSomeValuesFrom,
    )
    # (16) ∀ r.C s.t. C ∈ UNNC and r ∈ R*.
    for_all_unnc = concept_reducer_properties(
        concepts=unnc,
        properties=object_properties,  # object_properties_and_inverse,
        cls=OWLObjectAllValuesFrom,
    )
    # (17) ≥ n r.C and ≤ n r.C, s.t. C ∈ UNNC and r ∈ R*.
    min_cardinality_unnc_1, min_cardinality_unnc_2, min_cardinality_unnc_3 = (
        concept_reducer_properties(
            concepts=unnc,
            properties=object_properties_and_inverse,
            cls=OWLObjectMinCardinality,
            cardinality=i,
        )
        for i in [1, 2, 3]
    )
    max_cardinality_unnc_1, max_cardinality_unnc_2, max_cardinality_unnc_3 = (
        concept_reducer_properties(
            concepts=unnc,
            properties=object_properties_and_inverse,
            cls=OWLObjectMaxCardinality,
            cardinality=i,
        )
        for i in [1, 2, 3]
    )
    # (18) ∃ r.Nominal s.t. Nominal ∈ Nominals and r ∈ R*.
    exist_nominals = concept_reducer_properties(
        concepts=nominal_combinations,
        properties=object_properties_and_inverse,
        cls=OWLObjectSomeValuesFrom,
    )

    ###################################################################
    # Downsample the largest families for tractability; converted to a list
    # so that the progress bar works.
    random.seed(0)
    if len(intersections_unnc) > 500:
        intersections_unnc = random.sample(list(intersections_unnc), k=500)  # list() required by random.sample on Python >= 3.11
    if len(unions_unnc) > 500:
        unions_unnc = random.sample(list(unions_unnc), k=500)
    if len(exist_unnc) > 200:
        exist_unnc = set(list(exist_unnc)[:200])
    if len(for_all_unnc) > 200:
        for_all_unnc = set(list(for_all_unnc)[:200])

    concepts = list(
        chain(nc, nnc, unions_unnc, intersections_unnc, exist_unnc, for_all_unnc)
    )
    return concepts
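# A minimal usage sketch (editor's example, not part of the module): generate
# the evaluation concepts from a local KG. The path below is hypothetical; any
# OWL file accepted by KnowledgeBase works.
#
#   concepts = concept_generator("KGs/Family/family.owl")
#   print(len(concepts))                      # number of generated ALC concepts
#   print(owl_expression_to_dl(concepts[0]))  # DL rendering of the first one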
def get_saved_concepts(path_kg, data_name, shuffle):
    """Generate the concepts, optionally shuffle them, and save them to a folder for reproducibility."""
    # Create the directory if it does not exist
    cache_dir = f"caching_results_{data_name}"
    os.makedirs(cache_dir, exist_ok=True)

    # Determine the filename based on the shuffle flag
    filename = "shuffled_concepts.pkl" if shuffle else "unshuffled_concepts.pkl"
    save_file = os.path.join(cache_dir, filename)

    if os.path.exists(save_file):
        with open(save_file, "rb") as f:
            alc_concepts = pickle.load(f)
        print(f"Loaded concepts from {filename}.")
    else:
        # Generate concepts and optionally shuffle
        alc_concepts = concept_generator(path_kg)
        if shuffle:
            random.seed(0)
            random.shuffle(alc_concepts)
        # Save the concepts
        with open(save_file, "wb") as f:
            pickle.dump(alc_concepts, f)
        print(f"Generated and saved {'shuffled' if shuffle else 'unshuffled'} concepts.")
    return alc_concepts
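# Sketch (hypothetical path and dataset name): the first call generates and
# pickles the concepts under caching_results_family/; later calls load the
# pickle instead of regenerating.
#
#   concepts = get_saved_concepts("KGs/Family/family.owl", data_name="family", shuffle=True)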
def concept_retrieval(retriever_func, c) -> Set[str]:
    """Return the set of IRI strings of the individuals retrieved for concept c."""
    return {i.str for i in retriever_func.individuals(c)}
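# Sketch: retrieval yields plain IRI strings, so results from different
# reasoners can be compared with ordinary set operations.
#
#   kb = KnowledgeBase(path="KGs/Family/family.owl")  # hypothetical path
#   individuals = concept_retrieval(kb, concepts[0])  # -> Set[str] of IRIs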
class CacheWithEviction:
    """In-memory cache with a configurable eviction strategy (FIFO, LIFO, LRU, MRU, or RP)."""

    def __init__(self, cache_size, strategy='LIFO', random_seed=10):
        self.cache = OrderedDict()  # Store the actual cache
        self.access_times = {}  # Track last access times for LRU and MRU
        self.cache_size = cache_size
        self.strategy = strategy
        self.random_seed = random_seed
        self.initialized = False  # Track if cache is already initialized

    def _evict(self):
        """Evict a single entry when the cache is full, according to the chosen strategy."""
        # Evict before insertion so the cache never exceeds cache_size
        if len(self.cache) >= self.cache_size:
            if self.strategy == 'FIFO':
                self.cache.popitem(last=False)  # Evict the oldest item (first in)
            elif self.strategy == 'LIFO':
                self.cache.popitem(last=True)  # Evict the most recently added item
            elif self.strategy == 'LRU':
                # Evict the least recently used item based on `access_times`
                lru_key = min(self.access_times, key=self.access_times.get)
                del self.cache[lru_key]
                del self.access_times[lru_key]
            elif self.strategy == 'MRU':
                # Evict the most recently used item based on `access_times`
                mru_key = max(self.access_times, key=self.access_times.get)
                del self.cache[mru_key]
                del self.access_times[mru_key]
            elif self.strategy == 'RP':
                # Random eviction (seeded for reproducibility)
                random.seed(self.random_seed)
                random_key = random.choice(list(self.cache.keys()))
                del self.cache[random_key]
                self.access_times.pop(random_key, None)
    def get(self, key):
        """
        Retrieve an item from the cache. Updates the access time for LRU/MRU.
        """
        if key in self.cache:
            if self.strategy in ['LRU', 'MRU']:
                self.access_times[key] = time.time()  # Update access timestamp
            return self.cache[key]
        return None
    def put(self, key, value):
        """
        Add an item to the cache. Evicts an entry first if the cache is full.
        """
        if key in self.cache:
            del self.cache[key]  # Remove the existing entry to re-insert it and maintain order
        self._evict()  # Evict if necessary
        self.cache[key] = value
        if self.strategy in ['LRU', 'MRU']:
            self.access_times[key] = time.time()  # Record access timestamp
    def initialize_cache(self, func, path_onto, third, All_individuals, handle_restriction_func, concepts):
        """
        Initialize the cache with precomputed results for OWLClass, complement, and existential concepts.

        :param func: Function to retrieve individuals for a given expression.
        :param path_onto: Path to the ontology file.
        :param third: Extra positional argument forwarded to func (e.g. the embedding-model path or reasoner name).
        :param All_individuals: Set of all individual IRIs in the ontology.
        :param handle_restriction_func: Function that retrieves instances of OWLObjectSomeValuesFrom expressions.
        :param concepts: List of OWL concepts to precompute and store instances for.
        """
        if self.initialized:
            return

        # Filter OWLClass, complement, and OWLObjectSomeValuesFrom concepts
        class_concepts = [concept for concept in concepts if isinstance(concept, OWLClass)]
        negated_class_concepts = [concept for concept in concepts if isinstance(concept, OWLObjectComplementOf)]
        existential_concepts = [concept for concept in concepts if isinstance(concept, OWLObjectSomeValuesFrom)]

        # Process OWLClass concepts
        for cls in tqdm(class_concepts, desc="Adding OWLClass concepts"):
            concept_str = owl_expression_to_dl(cls)
            self.put(concept_str, func(cls, path_onto, third))

        # Compute and store complements as All_individuals minus the positive class
        for negated_cls in tqdm(negated_class_concepts, desc="Adding Complement concepts"):
            negated_cls_str = owl_expression_to_dl(negated_cls)
            cached = self.cache.get(negated_cls_str.split("¬")[-1])
            if cached is None:
                # Retrieve the positive operand C so that All_individuals - C yields ¬C
                cached = func(negated_cls.get_operand(), path_onto, third)
            neg = All_individuals - cached
            self.put(negated_cls_str, neg)

        # Process existential concepts
        for existential in tqdm(existential_concepts, desc="Adding Existential concepts"):
            existential_str = owl_expression_to_dl(existential)
            self.put(existential_str, handle_restriction_func(existential))
        self.initialized = True
    def get_all_items(self):
        """Return the keys currently stored in the cache."""
        return list(self.cache.keys())
    def is_full(self):
        """Check if the cache is full."""
        return len(self.cache) >= self.cache_size
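# Sketch of the eviction behaviour: with LRU and cache_size=2, inserting a
# third key evicts the least recently accessed one.
#
#   c = CacheWithEviction(cache_size=2, strategy='LRU')
#   c.put('A', {1}); c.put('B', {2})
#   c.get('A')       # touch 'A', so 'B' becomes least recently used
#   c.put('C', {3})  # evicts 'B'
#   assert c.get('B') is None and c.get('A') == {1}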
def semantic_caching_size(func, cache_size, eviction_strategy, random_seed, cache_type, concepts):
    """Implement the semantic caching algorithm for ALC concepts as presented in the paper."""
    cache = CacheWithEviction(cache_size, strategy=eviction_strategy, random_seed=random_seed)  # Cache for instances
    loaded_ontologies = {}   # Cache for ontologies
    loaded_individuals = {}  # Cache for individuals
    stats = {'hits': 0, 'misses': 0, 'time': 0}
    time_initialization = 0

    def wrapper(*args):
        nonlocal stats
        nonlocal time_initialization

        # Load the ontology and its individuals if not already cached
        path_onto = args[1]
        if path_onto not in loaded_ontologies:
            loaded_ontologies[path_onto] = get_ontology(path_onto).load()
            loaded_individuals[path_onto] = {a.iri for a in loaded_ontologies[path_onto].individuals()}
        onto = loaded_ontologies[path_onto]
        All_individuals = loaded_individuals[path_onto]

        str_expression = owl_expression_to_dl(args[0])
        owl_expression = args[0]

        # Retrieve a cached expression and count hits/misses
        def retrieve_from_cache(expression):
            cached_result = cache.get(expression)
            if cached_result is not None:
                stats['hits'] += 1
                return cached_result
            stats['misses'] += 1
            return None

        def handle_owl_some_values_from(owl_expression):
            """
            Process an OWLObjectSomeValuesFrom expression locally, returning its
            retrieval based on the algorithm described in the paper.
            """
            if len(All_individuals) < 1000:  # The loop becomes unscalable when there are too many individuals
                object_property = owl_expression.get_property()
                if object_property == OWLBottomObjectProperty or object_property == OWLTopObjectProperty:
                    return set()
                filler_expression = owl_expression.get_filler()
                instances = retrieve_from_cache(owl_expression_to_dl(filler_expression))
                if instances is not None:
                    result = set()
                    if isinstance(object_property, OWLObjectInverseOf):
                        r = onto.search_one(iri=object_property.get_inverse_property().str)
                    else:
                        r = onto.search_one(iri=object_property.str)
                    individual_map = {ind: onto.search_one(iri=ind) for ind in All_individuals | instances}
                    for ind_a in All_individuals:
                        a = individual_map[ind_a]
                        for ind_b in instances:
                            b = individual_map[ind_b]
                            if isinstance(object_property, OWLObjectInverseOf):
                                if a in getattr(b, r.name):
                                    result.add(ind_a)  # store the IRI string, consistent with the rest of the cache
                            else:
                                if b in getattr(a, r.name):
                                    result.add(ind_a)
                else:
                    result = func(owl_expression, *args[1:])  # fall back to direct retrieval of this expression
            else:
                result = func(owl_expression, *args[1:])
            return result

        start_time = time.time()  # start the timing before the cache initialization

        # Cold-cache initialization
        start_time_initialization = time.time()
        if cache_type == 'cold' and not cache.initialized:
            cache.initialize_cache(func, path_onto, args[-1], All_individuals, handle_owl_some_values_from, concepts)
        time_initialization = time.time() - start_time_initialization
        # start_time = time.time()  # start the timing after the cache initialization

        # Handle the different OWL expression types, using the cache where possible
        if isinstance(owl_expression, OWLClass):
            cached_result = retrieve_from_cache(str_expression)
            result = cached_result if cached_result is not None else func(*args)

        elif isinstance(owl_expression, OWLObjectComplementOf):
            if cache_type == 'cold':  # With a cold cache, all complements are already cached at initialization time
                cached_result_cold = retrieve_from_cache(str_expression)
                result = cached_result_cold if cached_result_cold is not None else func(*args)
            else:
                not_str_expression = str_expression.split("¬")[-1]
                cached_result = retrieve_from_cache(not_str_expression)
                result = (All_individuals - cached_result) if cached_result is not None else func(*args)

        elif isinstance(owl_expression, OWLObjectIntersectionOf):
            C_and_D = [owl_expression_to_dl(i) for i in owl_expression.operands()]
            cached_C = retrieve_from_cache(C_and_D[0])
            cached_D = retrieve_from_cache(C_and_D[1])
            if cached_C is not None and cached_D is not None:
                result = cached_C.intersection(cached_D)
            else:
                result = func(*args)

        elif isinstance(owl_expression, OWLObjectUnionOf):
            C_or_D = [owl_expression_to_dl(i) for i in owl_expression.operands()]
            cached_C = retrieve_from_cache(C_or_D[0])
            cached_D = retrieve_from_cache(C_or_D[1])
            if cached_C is not None and cached_D is not None:
                result = cached_C.union(cached_D)
            else:
                result = func(*args)

        elif isinstance(owl_expression, OWLObjectSomeValuesFrom):
            if cache_type == 'cold':
                cached_result_cold = retrieve_from_cache(str_expression)
                if cached_result_cold is not None:
                    result = cached_result_cold
                else:
                    result = handle_owl_some_values_from(owl_expression)
            else:
                result = handle_owl_some_values_from(owl_expression)

        elif isinstance(owl_expression, OWLObjectAllValuesFrom):
            # Answer ∀r.C via the equivalent ¬∃r.¬C
            all_values_expr = owl_expression_to_dl(owl_expression)
            some_values_expr = transform_forall_to_exists(all_values_expr)
            cached_result = retrieve_from_cache(some_values_expr)
            result = (All_individuals - cached_result) if cached_result is not None else func(*args)

        else:
            result = func(*args)

        stats['time'] += (time.time() - start_time)
        cache.put(str_expression, result)
        return result

    def transform_forall_to_exists(expression):
        """Rewrite the DL string of a ∀-expression into that of the corresponding ∃-expression."""
        pattern_negated = r'∀ (\w+)\.\(¬(\w+)\)'
        replacement_negated = r'∃ \1.\2'
        pattern_non_negated = r'∀ (\w+)\.(\w+)'
        replacement_non_negated = r'∃ \1.(¬\2)'

        transformed_expression = re.sub(pattern_negated, replacement_negated, expression)
        transformed_expression = re.sub(pattern_non_negated, replacement_non_negated, transformed_expression)
        return transformed_expression

    def get_stats():
        total_requests = stats['hits'] + stats['misses']
        hit_ratio = stats['hits'] / total_requests if total_requests > 0 else 0
        miss_ratio = stats['misses'] / total_requests if total_requests > 0 else 0
        avg_time = stats['time'] / total_requests if total_requests > 0 else 0
        return {
            'hit_ratio': hit_ratio,
            'miss_ratio': miss_ratio,
            'average_time_per_request': avg_time,
            'total_time': stats['time'],
            'time_initialization': time_initialization,
        }

    wrapper.get_stats = get_stats
    return wrapper
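# Sketch (hypothetical paths): wrapping the neural retrieval function with the
# semantic cache; statistics are exposed on the returned wrapper.
#
#   cached = semantic_caching_size(retrieve, cache_size=100,
#                                  eviction_strategy='LRU', random_seed=10,
#                                  cache_type='cold', concepts=concepts)
#   instances = cached(concepts[0], "KGs/Family/family.owl", None)
#   print(cached.get_stats()['hit_ratio'])
#
# The string rewrite behind the ∀-branch uses the identity ∀r.C ≡ ¬∃r.¬C:
# '∀ hasChild.(¬Male)' becomes '∃ hasChild.Male' and '∀ hasChild.Male' becomes
# '∃ hasChild.(¬Male)', so ∀r.C is answered as All_individuals ∖ retrieval(∃r.¬C).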
def non_semantic_caching_size(func, cache_size):
    """Implement a caching algorithm for ALC concepts without semantics."""
    cache = OrderedDict()  # Cache for instances
    stats = {'hits': 0, 'misses': 0, 'time': 0}

    def wrapper(*args):
        nonlocal stats
        str_expression = owl_expression_to_dl(args[0])

        def retrieve_from_cache(expression):
            if expression in cache:
                # Move the accessed item to the end to mark it as recently used
                cache.move_to_end(expression)
                stats['hits'] += 1
                return cache[expression]
            stats['misses'] += 1
            return None

        # Start timing before cache access and function execution
        start_time = time.time()

        # Try to retrieve the result from the cache; if it is there, return it directly
        cached_result = retrieve_from_cache(str_expression)
        if cached_result is not None:
            stats['time'] += (time.time() - start_time)
            return cached_result

        # Compute the result and store it in the cache
        result = func(*args)
        cache[str_expression] = result

        # Apply the LRU strategy: remove the least recently used item if the cache exceeds its size
        if len(cache) > cache_size:
            cache.popitem(last=False)

        stats['time'] += (time.time() - start_time)
        return result

    # Function to get cache statistics
    def get_stats():
        total_requests = stats['hits'] + stats['misses']
        hit_ratio = stats['hits'] / total_requests if total_requests > 0 else 0
        miss_ratio = stats['misses'] / total_requests if total_requests > 0 else 0
        avg_time = stats['time'] / total_requests if total_requests > 0 else 0
        return {
            'hit_ratio': hit_ratio,
            'miss_ratio': miss_ratio,
            'average_time_per_request': avg_time,
            'total_time': stats['time'],
        }

    wrapper.get_stats = get_stats
    return wrapper
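# Sketch: the non-semantic variant only caches exact expression strings under
# LRU, so a request hits only if the identical DL string was seen before;
# nothing is reused across subexpressions.
#
#   plain_cached = non_semantic_caching_size(retrieve, cache_size=100)
#   plain_cached(concepts[0], "KGs/Family/family.owl", None)  # hypothetical args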
def retrieve(expression, path_kg: str, path_kge_model: str) -> Set[str]:
    """Retrieve instances with the neural reasoner: take a concept C and return its set of retrieved individuals."""
    if path_kge_model:
        neural_owl_reasoner = TripleStoreNeuralReasoner(
            path_neural_embedding=path_kge_model, gamma=0.9
        )
    else:
        neural_owl_reasoner = TripleStoreNeuralReasoner(
            path_of_kb=path_kg, gamma=0.9
        )
    return concept_retrieval(neural_owl_reasoner, expression)  # Retrieving with our reasoner
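# Sketch (hypothetical paths): with path_kge_model set to None, the neural
# reasoner is built directly from the KB; otherwise a pretrained embedding
# model is loaded.
#
#   individuals = retrieve(concepts[0], "KGs/Family/family.owl", None)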
def retrieve_other_reasoner(expression, path_kg, name_reasoner='HermiT'):
    """Retrieve instances with a symbolic reasoner."""
    reasoner = SyncReasoner(path_kg, reasoner=name_reasoner)
    if reasoner.has_consistent_ontology():
        return {i.str for i in reasoner.instances(expression, direct=False)}
    print("The knowledge base is not consistent")
    return None
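# Sketch: the same retrieval through a symbolic reasoner. On an inconsistent
# ontology the function prints a warning and returns None.
#
#   individuals = retrieve_other_reasoner(concepts[0], "KGs/Family/family.owl",
#                                         name_reasoner='HermiT')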
def run_semantic_cache(path_kg: str, path_kge: str, cache_size: int, name_reasoner: str,
                       eviction: str, random_seed: int, cache_type: str, shuffle_concepts: bool):
    """Return cache performance with semantics."""
    symbolic_kb = KnowledgeBase(path=path_kg)
    D = []
    Avg_jaccard = []
    Avg_jaccard_reas = []
    data_name = path_kg.split("/")[-1].split(".")[0]

    alc_concepts = get_saved_concepts(path_kg, data_name=data_name, shuffle=shuffle_concepts)

    if name_reasoner == 'EBR':
        cached_retriever = semantic_caching_size(retrieve, cache_size=cache_size,
                                                 eviction_strategy=eviction,
                                                 random_seed=random_seed,
                                                 cache_type=cache_type,
                                                 concepts=alc_concepts)
    else:
        cached_retriever = semantic_caching_size(retrieve_other_reasoner, cache_size=cache_size,
                                                 eviction_strategy=eviction,
                                                 random_seed=random_seed,
                                                 cache_type=cache_type,
                                                 concepts=alc_concepts)

    total_time_ebr = 0
    for expr in alc_concepts:
        if name_reasoner == 'EBR':
            time_start_cache = time.time()
            A = cached_retriever(expr, path_kg, path_kge)  # Retrieval with cache
            time_cache = time.time() - time_start_cache

            time_start = time.time()
            retrieve_ebr = retrieve(expr, path_kg, path_kge)  # Retrieval without cache
            time_ebr = time.time() - time_start
            total_time_ebr += time_ebr
        else:
            time_start_cache = time.time()
            A = cached_retriever(expr, path_kg, name_reasoner)  # Retrieval with cache
            time_cache = time.time() - time_start_cache

            time_start = time.time()
            retrieve_ebr = retrieve_other_reasoner(expr, path_kg, name_reasoner=name_reasoner)  # Retrieval without cache
            time_ebr = time.time() - time_start
            total_time_ebr += time_ebr

        ground_truth = concept_retrieval(symbolic_kb, expr)
        jacc = jaccard_similarity(A, ground_truth)
        jacc_reas = jaccard_similarity(retrieve_ebr, ground_truth)
        Avg_jaccard.append(jacc)
        Avg_jaccard_reas.append(jacc_reas)
        D.append({'dataset': data_name, 'Expression': owl_expression_to_dl(expr),
                  'Type': type(expr).__name__, 'cache_size': cache_size,
                  'time_ebr': time_ebr, 'time_cache': time_cache, 'Jaccard': jacc})
        print(f'Expression: {owl_expression_to_dl(expr)}')
        print(f'Jaccard similarity: {jacc}')
        # assert jacc == 1.0

    stats = cached_retriever.get_stats()
    print('-' * 50)
    print("Cache Statistics:")
    print(f"Hit Ratio: {stats['hit_ratio']:.2f}")
    print(f"Miss Ratio: {stats['miss_ratio']:.2f}")
    print(f"Average Time per Request: {stats['average_time_per_request']:.4f} seconds")
    print(f"Total Time with Caching: {stats['total_time']:.4f} seconds")
    print(f"Total Time Without Caching: {total_time_ebr:.4f} seconds")
    print(f"Total number of concepts: {len(alc_concepts)}")
    print(f"Average Jaccard for the {data_name} dataset", sum(Avg_jaccard) / len(Avg_jaccard))

    return {
        'dataset': data_name,
        'cache_size': cache_size,
        'hit_ratio': f"{stats['hit_ratio']:.2f}",
        'miss_ratio': f"{stats['miss_ratio']:.2f}",
        'RT_cache': f"{stats['total_time']:.3f}",
        'RT': f"{total_time_ebr:.3f}",
        '#concepts': len(alc_concepts),
        'avg_jaccard': f"{sum(Avg_jaccard) / len(Avg_jaccard):.3f}",
        'avg_jaccard_reas': f"{sum(Avg_jaccard_reas) / len(Avg_jaccard_reas):.3f}",
        'strategy': eviction,
    }, D
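# Sketch (hypothetical arguments): a full benchmark run with a cold LRU cache.
# The first return value is the summary row, the second the per-expression records.
#
#   summary, records = run_semantic_cache("KGs/Family/family.owl", path_kge=None,
#                                         cache_size=100, name_reasoner='EBR',
#                                         eviction='LRU', random_seed=10,
#                                         cache_type='cold', shuffle_concepts=False)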
def run_non_semantic_cache(path_kg: str, path_kge: str, cache_size: int, name_reasoner: str,
                           shuffle_concepts: bool):
    """Return cache performance without any semantics."""
    symbolic_kb = KnowledgeBase(path=path_kg)
    D = []
    Avg_jaccard = []
    Avg_jaccard_reas = []
    data_name = path_kg.split("/")[-1].split(".")[0]

    alc_concepts = get_saved_concepts(path_kg, data_name=data_name, shuffle=shuffle_concepts)

    if name_reasoner == 'EBR':
        cached_retriever = non_semantic_caching_size(retrieve, cache_size=cache_size)
    else:
        cached_retriever = non_semantic_caching_size(retrieve_other_reasoner, cache_size=cache_size)

    total_time_ebr = 0
    for expr in alc_concepts:
        if name_reasoner == 'EBR':
            time_start_cache = time.time()
            A = cached_retriever(expr, path_kg, path_kge)  # Retrieval with cache
            time_cache = time.time() - time_start_cache

            time_start = time.time()
            retrieve_ebr = retrieve(expr, path_kg, path_kge)  # Retrieval without cache
            time_ebr = time.time() - time_start
            total_time_ebr += time_ebr
        else:
            time_start_cache = time.time()
            A = cached_retriever(expr, path_kg, name_reasoner)  # Retrieval with cache
            time_cache = time.time() - time_start_cache

            time_start = time.time()
            retrieve_ebr = retrieve_other_reasoner(expr, path_kg, name_reasoner=name_reasoner)  # Retrieval without cache
            time_ebr = time.time() - time_start
            total_time_ebr += time_ebr

        ground_truth = concept_retrieval(symbolic_kb, expr)
        jacc = jaccard_similarity(A, ground_truth)
        jacc_reas = jaccard_similarity(retrieve_ebr, ground_truth)
        Avg_jaccard.append(jacc)
        Avg_jaccard_reas.append(jacc_reas)
        D.append({'dataset': data_name, 'Expression': owl_expression_to_dl(expr),
                  'Type': type(expr).__name__, 'cache_size': cache_size,
                  'time_ebr': time_ebr, 'time_cache': time_cache, 'Jaccard': jacc})
        print(f'Expression: {owl_expression_to_dl(expr)}')
        print(f'Jaccard similarity: {jacc}')
        # assert jacc == 1.0

    stats = cached_retriever.get_stats()
    print('-' * 50)
    print("Cache Statistics:")
    print(f"Hit Ratio: {stats['hit_ratio']:.2f}")
    print(f"Miss Ratio: {stats['miss_ratio']:.2f}")
    print(f"Average Time per Request: {stats['average_time_per_request']:.4f} seconds")
    print(f"Total Time with Caching: {stats['total_time']:.4f} seconds")
    print(f"Total Time Without Caching: {total_time_ebr:.4f} seconds")
    print(f"Total number of concepts: {len(alc_concepts)}")
    print(f"Average Jaccard for the {data_name} dataset", sum(Avg_jaccard) / len(Avg_jaccard))

    return {
        'dataset': data_name,
        'cache_size': cache_size,
        'hit_ratio': f"{stats['hit_ratio']:.2f}",
        'miss_ratio': f"{stats['miss_ratio']:.2f}",
        'RT_cache': f"{stats['total_time']:.3f}",
        'RT': f"{total_time_ebr:.3f}",
        '#concepts': len(alc_concepts),
        'avg_jaccard': f"{sum(Avg_jaccard) / len(Avg_jaccard):.3f}",
        'avg_jaccard_reas': f"{sum(Avg_jaccard_reas) / len(Avg_jaccard_reas):.3f}",
    }, D
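# Sketch: the matching baseline run without semantics, for a direct comparison
# of hit ratios and runtimes against run_semantic_cache.
#
#   summary_plain, records_plain = run_non_semantic_cache(
#       "KGs/Family/family.owl", path_kge=None, cache_size=100,
#       name_reasoner='EBR', shuffle_concepts=False)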