"""Triple store representations."""
import logging
import re
from itertools import chain
from typing import Iterable, Set, Optional, Generator, Union, FrozenSet, Tuple
import requests
from owlapy.class_expression import OWLClassExpression, OWLThing, OWLClass, OWLObjectSomeValuesFrom, OWLObjectOneOf, \
OWLObjectMinCardinality
from owlapy.iri import IRI
from owlapy.owl_axiom import OWLObjectPropertyRangeAxiom, OWLObjectPropertyDomainAxiom, OWLDataPropertyRangeAxiom, \
OWLDataPropertyDomainAxiom, OWLClassAxiom, OWLEquivalentClassesAxiom
from owlapy.owl_datatype import OWLDatatype
from owlapy.owl_individual import OWLNamedIndividual
from owlapy.owl_literal import OWLLiteral
from owlapy.owl_ontology import OWLOntologyID, OWLOntology
from owlapy.owl_property import OWLDataProperty, OWLObjectPropertyExpression, OWLObjectInverseOf, OWLObjectProperty, \
OWLProperty
from requests import Response
from requests.exceptions import RequestException, JSONDecodeError
from owlapy.converter import Owl2SparqlConverter
from ontolearn.base.ext import OWLReasonerEx
from ontolearn.knowledge_base import KnowledgeBase
import rdflib
from ontolearn.concept_generator import ConceptGenerator
from ontolearn.base.owl.utils import OWLClassExpressionLengthMetric
import traceback
from collections import Counter
logger = logging.getLogger(__name__)
rdfs_prefix = "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>\n "
owl_prefix = "PREFIX owl: <http://www.w3.org/2002/07/owl#>\n "
rdf_prefix = "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n "
xsd_prefix = "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\n"
# CD: For the sake of efficient software development.
limit_posix = ""
from owlapy import owl_expression_to_sparql
from owlapy.class_expression import OWLObjectHasValue, OWLDataHasValue, OWLDataSomeValuesFrom, OWLDataOneOf
from typing import List
from owlapy.owl_property import OWLProperty
[docs]
def rdflib_to_str(sparql_result: rdflib.plugins.sparql.processor.SPARQLResult) -> str:
"""
@TODO: CD: Not quite sure whether we need this continuent function
"""
for result_row in sparql_result:
str_iri: str
yield result_row.x.n3()
[docs]
def is_valid_url(url) -> bool:
"""
Check the validity of a URL.
Args:
url (str): The url to validate.
Returns:
True if url is not None, and it passes the regex check.
"""
regex = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
return url is not None and regex.search(url)
[docs]
def get_results_from_ts(triplestore_address: str, query: str, return_type: type):
"""
Execute the SPARQL query in the given triplestore_address and return the result as the given return_type.
Args:
triplestore_address (str): The triplestore address where the query will be executed.
query (str): SPARQL query where the root variable should be '?x'.
return_type (type): OWLAPY class as type. e.g. OWLClass, OWLNamedIndividual, etc.
Returns:
Generator containing the results of the query as the given type.
"""
try:
response = requests.post(triplestore_address, data={'query': query})
except RequestException as e:
raise RequestException(f"Make sure the server is running on the `triplestore_address` = '{triplestore_address}'"
f". Check the error below:"
f"\n -->Error: {e}")
try:
if return_type == OWLLiteral:
yield from unwrap(response)
else:
yield from [return_type(i) for i in unwrap(response) if i is not None]
# return [return_type(IRI.create(i['x']['value'])) for i in
# response.json()['results']['bindings']]
except JSONDecodeError as e:
raise JSONDecodeError(f"Something went wrong with decoding JSON from the response. Check for typos in "
f"the `triplestore_address` = '{triplestore_address}' otherwise the error is likely "
f"caused by an internal issue. \n -->Error: {e}")
[docs]
def unwrap(result: Response):
json = result.json()
vars_ = list(json['head']['vars'])
for b in json['results']['bindings']:
val = []
for v in vars_:
if b[v]['type'] == 'uri':
val.append(IRI.create(b[v]['value']))
elif b[v]['type'] == 'bnode':
continue
else:
print(b[v]['type'])
val.append(OWLLiteral(b[v]['value'], OWLDatatype(IRI.create(b[v]['datatype']))))
if len(val) == 1:
yield val.pop()
else:
yield None
[docs]
def suf(direct: bool):
"""Put the star for rdfs properties depending on direct param"""
return " " if direct else "* "
[docs]
class TripleStoreOntology(OWLOntology):
def __init__(self, triplestore_address: str):
assert (is_valid_url(triplestore_address)), "You should specify a valid URL in the following argument: " \
"'triplestore_address' of class `TripleStore`"
self.url = triplestore_address
[docs]
def classes_in_signature(self) -> Iterable[OWLClass]:
query = owl_prefix + "SELECT DISTINCT ?x WHERE {?x a owl:Class.}"
yield from get_results_from_ts(self.url, query, OWLClass)
[docs]
def data_properties_in_signature(self) -> Iterable[OWLDataProperty]:
query = owl_prefix + "SELECT DISTINCT ?x\n " + "WHERE {?x a owl:DatatypeProperty.}"
yield from get_results_from_ts(self.url, query, OWLDataProperty)
[docs]
def object_properties_in_signature(self) -> Iterable[OWLObjectProperty]:
query = owl_prefix + "SELECT DISTINCT ?x\n " + "WHERE {?x a owl:ObjectProperty.}"
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
[docs]
def individuals_in_signature(self) -> Iterable[OWLNamedIndividual]:
query = owl_prefix + "SELECT DISTINCT ?x\n " + "WHERE {?x a owl:NamedIndividual.}"
yield from get_results_from_ts(self.url, query, OWLNamedIndividual)
[docs]
def equivalent_classes_axioms(self, c: OWLClass) -> Iterable[OWLEquivalentClassesAxiom]:
query = owl_prefix + "SELECT DISTINCT ?x" + \
"WHERE { ?x owl:equivalentClass " + f"<{c.str}>." + \
"FILTER(?x != " + f"<{c.str}>)}}"
for cls in get_results_from_ts(self.url, query, OWLClass):
yield OWLEquivalentClassesAxiom([c, cls])
[docs]
def general_class_axioms(self) -> Iterable[OWLClassAxiom]:
raise NotImplementedError
[docs]
def data_property_domain_axioms(self, pe: OWLDataProperty) -> Iterable[OWLDataPropertyDomainAxiom]:
domains = self._get_property_domains(pe)
if len(domains) == 0:
yield OWLDataPropertyDomainAxiom(pe, OWLThing)
else:
for dom in domains:
yield OWLDataPropertyDomainAxiom(pe, dom)
[docs]
def data_property_range_axioms(self, pe: OWLDataProperty) -> Iterable[OWLDataPropertyRangeAxiom]:
raise NotImplementedError
[docs]
def object_property_domain_axioms(self, pe: OWLObjectProperty) -> Iterable[OWLObjectPropertyDomainAxiom]:
domains = self._get_property_domains(pe)
if len(domains) == 0:
yield OWLObjectPropertyDomainAxiom(pe, OWLThing)
else:
for dom in domains:
yield OWLObjectPropertyDomainAxiom(pe, dom)
[docs]
def object_property_range_axioms(self, pe: OWLObjectProperty) -> Iterable[OWLObjectPropertyRangeAxiom]:
query = rdfs_prefix + "SELECT ?x WHERE { " + f"<{pe.str}>" + " rdfs:range ?x. }"
ranges = set(get_results_from_ts(self.url, query, OWLClass))
if len(ranges) == 0:
yield OWLObjectPropertyRangeAxiom(pe, OWLThing)
else:
for rng in ranges:
yield OWLObjectPropertyRangeAxiom(pe, rng)
def _get_property_domains(self, pe: OWLProperty):
if isinstance(pe, OWLObjectProperty) or isinstance(pe, OWLDataProperty):
query = rdfs_prefix + "SELECT ?x WHERE { " + f"<{pe.str}>" + " rdfs:domain ?x. }"
domains = set(get_results_from_ts(self.url, query, OWLClass))
return domains
else:
raise NotImplementedError
[docs]
def get_owl_ontology_manager(self):
# no manager for this kind of Ontology
pass
[docs]
def get_ontology_id(self) -> OWLOntologyID:
# query = (rdf_prefix + owl_prefix +
# "SELECT ?ontologyIRI WHERE { ?ontology rdf:type owl:Ontology . ?ontology rdf:about ?ontologyIRI .}")
# return list(get_results_from_ts(self.url, query, OWLOntologyID)).pop()
raise NotImplementedError
[docs]
def __eq__(self, other):
if isinstance(other, type(self)):
return self.url == other.url
return NotImplemented
[docs]
def __hash__(self):
return hash(self.url)
[docs]
def __repr__(self):
return f'TripleStoreOntology({self.url})'
[docs]
class TripleStoreReasoner(OWLReasonerEx):
__slots__ = 'ontology'
def __init__(self, ontology: TripleStoreOntology):
self.ontology = ontology
self.url = self.ontology.url
self._owl2sparql_converter = Owl2SparqlConverter()
[docs]
def data_property_domains(self, pe: OWLDataProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
domains = {d.get_domain() for d in self.ontology.data_property_domain_axioms(pe)}
sub_domains = set(chain.from_iterable([self.sub_classes(d) for d in domains]))
yield from domains - sub_domains
if not direct:
yield from sub_domains
[docs]
def object_property_domains(self, pe: OWLObjectProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
domains = {d.get_domain() for d in self.ontology.object_property_domain_axioms(pe)}
sub_domains = set(chain.from_iterable([self.sub_classes(d) for d in domains]))
yield from domains - sub_domains
if not direct:
yield from sub_domains
[docs]
def object_property_ranges(self, pe: OWLObjectProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
ranges = {r.get_range() for r in self.ontology.object_property_range_axioms(pe)}
sub_ranges = set(chain.from_iterable([self.sub_classes(d) for d in ranges]))
yield from ranges - sub_ranges
if not direct:
yield from sub_ranges
[docs]
def equivalent_classes(self, ce: OWLClassExpression, only_named: bool = True) -> Iterable[OWLClassExpression]:
if only_named:
if isinstance(ce, OWLClass):
query = owl_prefix + "SELECT DISTINCT ?x " + \
"WHERE { {?x owl:equivalentClass " + f"<{ce.str}>.}}" + \
"UNION {" + f"<{ce.str}>" + " owl:equivalentClass ?x.}" + \
"FILTER(?x != " + f"<{ce.str}>)}}"
yield from get_results_from_ts(self.url, query, OWLClass)
else:
raise NotImplementedError("Equivalent classes for complex class expressions is not implemented")
else:
raise NotImplementedError("Finding equivalent complex classes is not implemented")
[docs]
def disjoint_classes(self, ce: OWLClassExpression, only_named: bool = True) -> Iterable[OWLClassExpression]:
if only_named:
if isinstance(ce, OWLClass):
query = owl_prefix + " SELECT DISTINCT ?x " + \
"WHERE { " + f"<{ce.str}>" + " owl:disjointWith ?x .}"
yield from get_results_from_ts(self.url, query, OWLClass)
else:
raise NotImplementedError("Disjoint classes for complex class expressions is not implemented")
else:
raise NotImplementedError("Finding disjoint complex classes is not implemented")
[docs]
def different_individuals(self, ind: OWLNamedIndividual) -> Iterable[OWLNamedIndividual]:
query = owl_prefix + rdf_prefix + "SELECT DISTINCT ?x \n" + \
"WHERE{ ?allDifferent owl:distinctMembers/rdf:rest*/rdf:first ?x.\n" + \
"?allDifferent owl:distinctMembers/rdf:rest*/rdf:first" + f"<{ind.str}>" + ".\n" + \
"FILTER(?x != " + f"<{ind.str}>" + ")}"
yield from get_results_from_ts(self.url, query, OWLNamedIndividual)
[docs]
def same_individuals(self, ind: OWLNamedIndividual) -> Iterable[OWLNamedIndividual]:
query = owl_prefix + "SELECT DISTINCT ?x " + \
"WHERE {{ ?x owl:sameAs " + f"<{ind.str}>" + " .}" + \
"UNION { " + f"<{ind.str}>" + " owl:sameAs ?x.}}"
yield from get_results_from_ts(self.url, query, OWLNamedIndividual)
[docs]
def equivalent_object_properties(self, op: OWLObjectPropertyExpression) -> Iterable[OWLObjectPropertyExpression]:
if isinstance(op, OWLObjectProperty):
query = owl_prefix + "SELECT DISTINCT ?x " + \
"WHERE { {?x owl:equivalentProperty " + f"<{op.str}>.}}" + \
"UNION {" + f"<{op.str}>" + " owl:equivalentProperty ?x.}" + \
"FILTER(?x != " + f"<{op.str}>)}}"
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
elif isinstance(op, OWLObjectInverseOf):
query = owl_prefix + "SELECT DISTINCT ?x " + \
"WHERE { ?inverseProperty owl:inverseOf " + f"<{op.get_inverse().str}> ." + \
" {?x owl:equivalentProperty ?inverseProperty .}" + \
"UNION { ?inverseProperty owl:equivalentClass ?x.}" + \
"FILTER(?x != ?inverseProperty }>)}"
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
[docs]
def equivalent_data_properties(self, dp: OWLDataProperty) -> Iterable[OWLDataProperty]:
query = owl_prefix + "SELECT DISTINCT ?x" + \
"WHERE { {?x owl:equivalentProperty " + f"<{dp.str}>.}}" + \
"UNION {" + f"<{dp.str}>" + " owl:equivalentProperty ?x.}" + \
"FILTER(?x != " + f"<{dp.str}>)}}"
yield from get_results_from_ts(self.url, query, OWLDataProperty)
[docs]
def data_property_values(self, ind: OWLNamedIndividual, pe: OWLDataProperty, direct: bool = True) \
-> Iterable[OWLLiteral]:
query = "SELECT ?x WHERE { " + f"<{ind.str}>" + f"<{pe.str}>" + " ?x . }"
yield from get_results_from_ts(self.url, query, OWLLiteral)
if not direct:
for prop in self.sub_data_properties(pe):
yield from self.data_property_values(ind, prop, True)
[docs]
def object_property_values(self, ind: OWLNamedIndividual, pe: OWLObjectPropertyExpression, direct: bool = True) \
-> Iterable[OWLNamedIndividual]:
if isinstance(pe, OWLObjectProperty):
query = "SELECT ?x WHERE { " + f"<{ind.str}> " + f"<{pe.str}>" + " ?x . }"
yield from get_results_from_ts(self.url, query, OWLNamedIndividual)
elif isinstance(pe, OWLObjectInverseOf):
query = (owl_prefix + "SELECT ?x WHERE { ?inverseProperty owl:inverseOf " +
f"<{pe.get_inverse().str}>." +
f"<{ind.str}> ?inverseProperty ?x . }}")
yield from get_results_from_ts(self.url, query, OWLNamedIndividual)
if not direct:
for prop in self.sub_object_properties(pe):
yield from self.object_property_values(ind, prop, True)
[docs]
def flush(self) -> None:
pass
[docs]
def instances(self, ce: OWLClassExpression, direct: bool = False, seen_set: Set = None) \
-> Iterable[OWLNamedIndividual]:
if not seen_set:
seen_set = set()
seen_set.add(ce)
ce_to_sparql = self._owl2sparql_converter.as_query("?x", ce)
if not direct:
ce_to_sparql = ce_to_sparql.replace("?x a ", "?x a ?some_cls. \n ?some_cls "
"<http://www.w3.org/2000/01/rdf-schema#subClassOf>* ")
yield from get_results_from_ts(self.url, ce_to_sparql, OWLNamedIndividual)
if not direct:
for cls in self.equivalent_classes(ce):
if cls not in seen_set:
seen_set.add(cls)
yield from self.instances(cls, direct, seen_set)
[docs]
def sub_classes(self, ce: OWLClassExpression, direct: bool = False, only_named: bool = True) \
-> Iterable[OWLClassExpression]:
if not only_named:
raise NotImplementedError("Finding anonymous subclasses not implemented")
if isinstance(ce, OWLClass):
query = rdfs_prefix + \
"SELECT ?x WHERE { ?x rdfs:subClassOf" + suf(direct) + f"<{ce.str}>" + ". }"
results = list(get_results_from_ts(self.url, query, OWLClass))
if ce in results:
results.remove(ce)
yield from results
else:
raise NotImplementedError("Subclasses of complex classes retrieved via triple store is not implemented")
# query = "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> " \
# "SELECT DISTINCT ?x WHERE { ?x rdfs:subClassOf" + suf(direct) + " ?c. \n" \
# "?s a ?c . \n"
# ce_to_sparql_statements = self._owl2sparql_converter.convert("?s", ce)
# for s in ce_to_sparql_statements:
# query = query + s + "\n"
# query = query + "}"
# yield from get_results_from_ts(self._triplestore_address, query, OWLClass)
[docs]
def super_classes(self, ce: OWLClassExpression, direct: bool = False, only_named: bool = True) \
-> Iterable[OWLClassExpression]:
if not only_named:
raise NotImplementedError("Finding anonymous superclasses not implemented")
if isinstance(ce, OWLClass):
if ce == OWLThing:
return []
query = rdfs_prefix + \
"SELECT ?x WHERE { " + f"<{ce.str}>" + " rdfs:subClassOf" + suf(direct) + "?x. }"
results = list(get_results_from_ts(self.url, query, OWLClass))
if ce in results:
results.remove(ce)
if (not direct and OWLThing not in results) or len(results) == 0:
results.append(OWLThing)
yield from results
else:
raise NotImplementedError("Superclasses of complex classes retrieved via triple store is not "
"implemented")
[docs]
def disjoint_object_properties(self, op: OWLObjectPropertyExpression) -> Iterable[OWLObjectPropertyExpression]:
if isinstance(op, OWLObjectProperty):
query = owl_prefix + rdf_prefix + "SELECT DISTINCT ?x \n" + \
"WHERE{ ?AllDisjointProperties owl:members/rdf:rest*/rdf:first ?x.\n" + \
"?AllDisjointProperties owl:members/rdf:rest*/rdf:first" + f"<{op.str}>" + ".\n" + \
"FILTER(?x != " + f"<{op.str}>" + ")}"
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
elif isinstance(op, OWLObjectInverseOf):
query = owl_prefix + " SELECT DISTINCT ?x " + \
"WHERE { ?inverseProperty owl:inverseOf " + f"<{op.get_inverse().str}> ." + \
" ?AllDisjointProperties owl:members/rdf:rest*/rdf:first ?x.\n" + \
" ?AllDisjointProperties owl:members/rdf:rest*/rdf:first ?inverseProperty.\n" + \
" FILTER(?x != ?inverseProperty)}"
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
[docs]
def disjoint_data_properties(self, dp: OWLDataProperty) -> Iterable[OWLDataProperty]:
query = owl_prefix + rdf_prefix + "SELECT DISTINCT ?x \n" + \
"WHERE{ ?AllDisjointProperties owl:members/rdf:rest*/rdf:first ?x.\n" + \
"?AllDisjointProperties owl:members/rdf:rest*/rdf:first" + f"<{dp.str}>" + ".\n" + \
"FILTER(?x != " + f"<{dp.str}>" + ")}"
yield from get_results_from_ts(self.url, query, OWLDataProperty)
[docs]
def all_data_property_values(self, pe: OWLDataProperty, direct: bool = True) -> Iterable[OWLLiteral]:
query = "SELECT DISTINCT ?x WHERE { ?y" + f"<{pe.str}>" + " ?x . }"
yield from get_results_from_ts(self.url, query, OWLLiteral)
if not direct:
for prop in self.sub_data_properties(pe):
yield from self.all_data_property_values(prop, True)
[docs]
def sub_data_properties(self, dp: OWLDataProperty, direct: bool = False) -> Iterable[OWLDataProperty]:
query = rdfs_prefix + \
"SELECT ?x WHERE { ?x rdfs:subPropertyOf" + suf(direct) + f"<{dp.str}>" + ". }"
yield from get_results_from_ts(self.url, query, OWLDataProperty)
[docs]
def super_data_properties(self, dp: OWLDataProperty, direct: bool = False) -> Iterable[OWLDataProperty]:
query = rdfs_prefix + \
"SELECT ?x WHERE {" + f"<{dp.str}>" + " rdfs:subPropertyOf" + suf(direct) + " ?x. }"
yield from get_results_from_ts(self.url, query, OWLDataProperty)
[docs]
def sub_object_properties(self, op: OWLObjectPropertyExpression, direct: bool = False) \
-> Iterable[OWLObjectPropertyExpression]:
if isinstance(op, OWLObjectProperty):
query = (rdfs_prefix + "SELECT ?x WHERE { ?x rdfs:subPropertyOf"
+ suf(direct) + f"<{op.str}> . FILTER(?x != " + f"<{op.str}>) }}")
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
elif isinstance(op, OWLObjectInverseOf):
query = (rdfs_prefix + "SELECT ?x " +
"WHERE { ?inverseProperty owl:inverseOf " + f"<{op.get_inverse().str}> ." +
" ?x rdfs:subPropertyOf" + suf(direct) + " ?inverseProperty . }")
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
[docs]
def super_object_properties(self, op: OWLObjectPropertyExpression, direct: bool = False) \
-> Iterable[OWLObjectPropertyExpression]:
if isinstance(op, OWLObjectProperty):
query = (rdfs_prefix + "SELECT ?x WHERE {" + f"<{op.str}>" + " rdfs:subPropertyOf"
+ suf(direct) + " ?x. FILTER(?x != " + f"<{op.str}>) }}")
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
elif isinstance(op, OWLObjectInverseOf):
query = (rdfs_prefix + "SELECT ?x " +
"WHERE { ?inverseProperty owl:inverseOf " + f"<{op.get_inverse().str}> ." +
" ?inverseProperty rdfs:subPropertyOf" + suf(direct) + "?x . }")
yield from get_results_from_ts(self.url, query, OWLObjectProperty)
[docs]
def types(self, ind: OWLNamedIndividual, direct: bool = False) -> Iterable[OWLClass]:
if direct:
query = "SELECT ?x WHERE {" + f"<{ind.str}> a" + " ?x. }"
else:
query = rdfs_prefix + "SELECT DISTINCT ?x WHERE {" + f"<{ind.str}> a ?cls. " \
" ?cls rdfs:subClassOf* ?x}"
yield from [i for i in get_results_from_ts(self.url, query, OWLClass)
if i != OWLClass(IRI('http://www.w3.org/2002/07/owl#', 'NamedIndividual'))]
[docs]
def get_root_ontology(self) -> OWLOntology:
return self.ontology
[docs]
def is_isolated(self):
# not needed here
pass
[docs]
def is_using_triplestore(self):
"""No use! Deprecated."""
# TODO: Deprecated! Remove after it is removed from OWLReasoner in owlapy
pass
[docs]
class TripleStoreKnowledgeBase(KnowledgeBase):
url: str
ontology: TripleStoreOntology
reasoner: TripleStoreReasoner
def __init__(self, triplestore_address: str):
self.url = triplestore_address
self.ontology = TripleStoreOntology(triplestore_address)
self.reasoner = TripleStoreReasoner(self.ontology)
super().__init__(ontology=self.ontology, reasoner=self.reasoner)
#######################################################################################################################
[docs]
class TripleStoreReasonerOntology:
def __init__(self, url: str = None):
assert url is not None, "URL cannot be None"
self.url = url
[docs]
def query(self, sparql_query: str):
return requests.Session().post(self.url, data={'query': sparql_query}) #.json()["results"]["bindings"]
[docs]
def are_owl_concept_disjoint(self, c: OWLClass, cc: OWLClass) -> bool:
query = f"""{owl_prefix}ASK WHERE {{<{c.str}> owl:disjointWith <{cc.str}> .}}"""
# Workaround self.query doesn't work for ASK at the moment
return requests.Session().post(self.url, data={'query': query}).json()["boolean"]
[docs]
def abox(self, str_iri: str) -> Generator[Tuple[
Tuple[OWLNamedIndividual, OWLProperty, OWLClass],
Tuple[OWLObjectProperty, OWLObjectProperty, OWLNamedIndividual],
Tuple[OWLObjectProperty, OWLDataProperty, OWLLiteral]], None, None]:
"""@TODO:"""
sparql_query = f"SELECT DISTINCT ?p ?o WHERE {{ <{str_iri}> ?p ?o }}"
subject_ = OWLNamedIndividual(str_iri)
for binding in self.query(sparql_query).json()["results"]["bindings"]:
p, o = binding["p"], binding["o"]
# ORDER MATTERS
if p["value"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
yield subject_, OWLProperty("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), OWLClass(o["value"])
elif o["type"] == "uri":
#################################################################
# IMPORTANT
# Can we assume that if o has URI and is not owl class, then o can be considered as an individual ?
#################################################################
yield subject_, OWLObjectProperty(p["value"]), OWLNamedIndividual(o["value"])
elif o["type"] == "literal":
if o["datatype"] == "http://www.w3.org/2001/XMLSchema#boolean":
yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=bool(o["value"]))
elif o["datatype"] == "http://www.w3.org/2001/XMLSchema#double":
yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=float(o["value"]))
else:
raise NotImplementedError(f"Currently this type of literal is not supported:{o} "
f"but can done easily let us know :)")
else:
raise RuntimeError(f"Unrecognized type {subject_} ({p}) ({o})")
[docs]
def classes_in_signature(self) -> Iterable[OWLClass]:
query = owl_prefix + """SELECT DISTINCT ?x WHERE { ?x a owl:Class }"""
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["x"]["value"])
[docs]
def most_general_classes(self) -> Iterable[OWLClass]:
""" At least it has single subclass and there is no superclass """
query = f"""{rdf_prefix}{rdfs_prefix}{owl_prefix} SELECT ?x WHERE {{
?concept rdf:type owl:Class .
FILTER EXISTS {{ ?x rdfs:subClassOf ?z . }}
FILTER NOT EXISTS {{ ?y rdfs:subClassOf ?x . }}
}}
"""
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["x"]["value"])
[docs]
def least_general_named_concepts(self) -> Generator[OWLClass, None, None]:
""" At least it has single superclass and there is no subclass """
query = f"""{rdf_prefix}{rdfs_prefix}{owl_prefix} SELECT ?concept WHERE {{
?concept rdf:type owl:Class .
FILTER EXISTS {{ ?concept rdfs:subClassOf ?x . }}
FILTER NOT EXISTS {{ ?y rdfs:subClassOf ?concept . }}
}}"""
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["concept"]["value"])
[docs]
def get_direct_parents(self, named_concept: OWLClass):
""" Father rdf:subClassOf Person"""
assert isinstance(named_concept, OWLClass)
str_named_concept = f"<{named_concept.str}>"
query = f"""{rdfs_prefix} SELECT ?x WHERE {{ {str_named_concept} rdfs:subClassOf ?x . }} """
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["x"]["value"])
[docs]
def subconcepts(self, named_concept: OWLClass, direct=True):
assert isinstance(named_concept, OWLClass)
str_named_concept = f"<{named_concept.str}>"
if direct:
query = f"""{rdfs_prefix} SELECT ?x WHERE {{ ?x rdfs:subClassOf* {str_named_concept}. }} """
else:
query = f"""{rdf_prefix} SELECT ?x WHERE {{ ?x rdf:subClassOf {str_named_concept}. }} """
for str_iri in self.query(query):
yield OWLClass(str_iri)
[docs]
def get_type_individuals(self, individual: str):
query = f"""SELECT DISTINCT ?x WHERE {{ <{individual}> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?x }}"""
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["x"]["value"])
[docs]
def instances(self, expression: OWLClassExpression) -> Generator[OWLNamedIndividual, None, None]:
assert isinstance(expression, OWLClassExpression)
try:
sparql_query = owl_expression_to_sparql(expression=expression)
except Exception as exc:
print(f"Error at converting {expression} into sparql")
traceback.print_exception(exc)
print(f"Error at converting {expression} into sparql")
raise RuntimeError("Couldn't convert")
try:
for binding in self.query(sparql_query).json()["results"]["bindings"]:
yield OWLNamedIndividual(binding["x"]["value"])
except:
print(self.query(sparql_query).text)
raise RuntimeError
[docs]
def individuals_in_signature(self) -> Generator[OWLNamedIndividual, None, None]:
# owl:OWLNamedIndividual is often missing: Perhaps we should add union as well
query = owl_prefix + "SELECT DISTINCT ?x\n " + "WHERE {?x a ?y. ?y a owl:Class.}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLNamedIndividual(binding["x"]["value"])
[docs]
def data_properties_in_signature(self) -> Iterable[OWLDataProperty]:
query = owl_prefix + "SELECT DISTINCT ?x " + "WHERE {?x a owl:DatatypeProperty.}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLDataProperty(binding["x"]["value"])
[docs]
def object_properties_in_signature(self) -> Iterable[OWLObjectProperty]:
query = owl_prefix + "SELECT DISTINCT ?x " + "WHERE {?x a owl:ObjectProperty.}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLObjectProperty(binding["x"]["value"])
[docs]
def boolean_data_properties(self):
query = f"{rdf_prefix}\n{rdfs_prefix}\n{xsd_prefix}SELECT DISTINCT ?x WHERE {{?x rdfs:range xsd:boolean}}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLDataProperty(binding["x"]["value"])
[docs]
def double_data_properties(self):
query = f"{rdf_prefix}\n{rdfs_prefix}\n{xsd_prefix}SELECT DISTINCT ?x WHERE {{?x rdfs:range xsd:double}}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLDataProperty(binding["x"]["value"])
[docs]
def range_of_double_data_properties(self, prop: OWLDataProperty):
query = f"{rdf_prefix}\n{rdfs_prefix}\n{xsd_prefix}SELECT DISTINCT ?x WHERE {{?z <{prop.str}> ?x}}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLLiteral(value=float(binding["x"]["value"]))
[docs]
def domain_of_double_data_properties(self, prop: OWLDataProperty):
query = f"{rdf_prefix}\n{rdfs_prefix}\n{xsd_prefix}SELECT DISTINCT ?x WHERE {{?x <{prop.str}> ?z}}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLNamedIndividual(binding["x"]["value"])
[docs]
class TripleStore:
""" Connecting a triple store"""
url: str
def __init__(self, reasoner=None, url: str = None):
if reasoner is None:
assert url is not None, f"Reasoner:{reasoner} and url of a triplestore {url} cannot be both None."
self.g = TripleStoreReasonerOntology(url=url)
else:
self.g = reasoner
# This assigment is done as many CEL models are implemented to use both attributes seperately.
# CEL models will be refactored.
self.ontology = self.g
self.reasoner = self.g
def __abox_expression(self, individual: OWLNamedIndividual) -> Generator[
Union[OWLClass, OWLObjectSomeValuesFrom, OWLObjectMinCardinality, OWLDataSomeValuesFrom], None, None]:
"""
Return OWL Class Expressions obtained from all set of triples where an input OWLNamedIndividual is subject.
Retrieve all triples (i,p,o) where p \in Resources, and o \in [Resources, Literals] and return the followings
1- Owl Named Classes: C(i)=1.
2- ObjectSomeValuesFrom Nominals: \exists r. {a, b, ..., d}, e.g. (i r, a) exists.
3- OWLObjectSomeValuesFrom over named classes: \exists r. C s.t. x \in {a, b, ..., d} C(x)=1.
4- OWLObjectMinCardinality over named classes: ≥ c r. C
5- OWLDataSomeValuesFrom over literals: \exists r. {literal_a, ..., literal_b}
"""
object_property_to_individuals = dict()
data_property_to_individuals = dict()
# To no return duplicate objects.
quantifier_gate = set()
# (1) Iterate over triples where individual is in the subject position.
for s, p, o in self.g.abox(str_iri=individual.str):
if isinstance(p, OWLProperty) and isinstance(o, OWLClass):
##############################################################
# RETURN OWLClass
##############################################################
yield o
elif isinstance(p, OWLObjectProperty) and isinstance(o, OWLNamedIndividual):
##############################################################
# Store for \exist r. {i, ..., j} and OWLObjectMinCardinality over type counts
##############################################################
object_property_to_individuals.setdefault(p, []).append(o)
elif isinstance(p, OWLDataProperty) and isinstance(o, OWLLiteral):
##############################################################
# Store for \exist r. {literal, ..., another literal}
##############################################################
data_property_to_individuals.setdefault(p, []).append(o)
else:
raise RuntimeError(f"Unrecognized triples to expression mappings {p}{o}")
# Iterating over the mappings of object properties to individuals.
for object_property, list_owl_individuals in object_property_to_individuals.items():
# RETURN: \exists r. {x1,x33, .., x8} => Existential restriction over nominals
yield OWLObjectSomeValuesFrom(property=object_property, filler=OWLObjectOneOf(list_owl_individuals))
owl_class: OWLClass
count: int
for owl_class, count in Counter(
[type_i for i in list_owl_individuals for type_i in
self.get_types(ind=i, direct=True)]).items():
existential_quantifier = OWLObjectSomeValuesFrom(property=object_property, filler=owl_class)
if existential_quantifier in quantifier_gate:
"Do nothing"
else:
##############################################################
# RETURN: \exists r. C => Existential quantifiers over Named OWL Class
##############################################################
quantifier_gate.add(existential_quantifier)
yield existential_quantifier
object_min_cardinality = OWLObjectMinCardinality(cardinality=count,
property=object_property,
filler=owl_class)
if object_min_cardinality in quantifier_gate:
"Do nothing"
else:
##############################################################
# RETURN: ≥ c r. C => OWLObjectMinCardinality over Named OWL Class
##############################################################
quantifier_gate.add(object_min_cardinality)
yield object_min_cardinality
# Iterating over the mappings of data properties to individuals.
for data_property, list_owl_literal in data_property_to_individuals.items():
##############################################################
# RETURN: \exists r. {literal, ..., another literal} => Existential quantifiers over Named OWL Class
##############################################################
# if list_owl_literal is {True, False) doesn't really make sense OWLDataSomeValuesFrom
# Perhaps, if
yield OWLDataSomeValuesFrom(property=data_property, filler=OWLDataOneOf(list_owl_literal))
[docs]
def abox(self, individual: OWLNamedIndividual, mode: str = "native"):
"""
Get all axioms of a given individual being a subject entity
Args:
individual (OWLNamedIndividual): An individual
mode (str): The return format.
1) 'native' -> returns triples as tuples of owlapy objects,
2) 'iri' -> returns triples as tuples of IRIs as string,
3) 'axiom' -> triples are represented by owlapy axioms.
4) 'expression' -> unique owl class expressions based on (1).
Returns: Iterable of tuples or owlapy axiom, depending on the mode.
"""
assert mode in ['native', 'iri', 'axiom',
"expression"], "Valid modes are: 'native', 'iri' or 'axiom', 'expression'"
if mode == "native":
yield from self.g.abox(str_iri=individual.str)
elif mode == "expression":
yield from self.__abox_expression(individual)
elif mode == "axiom":
raise NotImplementedError("Axioms should be checked.")
[docs]
def are_owl_concept_disjoint(self, c: OWLClass, cc: OWLClass) -> bool:
assert isinstance(c, OWLClass) and isinstance(cc, OWLClass)
return self.reasoner.are_owl_concept_disjoint(c, cc)
[docs]
def get_object_properties(self):
yield from self.reasoner.object_properties_in_signature()
[docs]
def get_data_properties(self):
yield from self.reasoner.data_properties_in_signature()
[docs]
def get_concepts(self) -> OWLClass:
yield from self.reasoner.classes_in_signature()
[docs]
def get_classes_in_signature(self) -> OWLClass:
yield from self.reasoner.classes_in_signature()
[docs]
def get_most_general_classes(self):
yield from self.reasoner.most_general_classes()
[docs]
def get_boolean_data_properties(self):
yield from self.reasoner.boolean_data_properties()
[docs]
def get_double_data_properties(self):
yield from self.reasoner.double_data_properties()
[docs]
def get_range_of_double_data_properties(self, prop: OWLDataProperty):
yield from self.reasoner.range_of_double_data_properties(prop)
[docs]
def individuals(self, concept: Optional[OWLClassExpression] = None) -> Generator[OWLNamedIndividual, None, None]:
"""Given an OWL class expression, retrieve all individuals belonging to it.
Args:
concept: Class expression of which to list individuals.
Returns:
Generator of individuals belonging to the given class.
"""
if concept is None or concept.is_owl_thing():
yield from self.reasoner.individuals_in_signature()
else:
yield from self.reasoner.instances(concept)
[docs]
def get_types(self, ind: OWLNamedIndividual, direct: True) -> Generator[OWLClass, None, None]:
if not direct:
raise NotImplementedError("Inferring indirect types not available")
return self.reasoner.get_type_individuals(ind.str)
[docs]
def get_all_sub_concepts(self, concept: OWLClass, direct=True):
yield from self.reasoner.subconcepts(concept, direct)
[docs]
def classes_in_signature(self):
yield from self.reasoner.classes_in_signature()
[docs]
def get_direct_parents(self, c: OWLClass):
yield from self.reasoner.get_direct_parents(c)
[docs]
def most_general_named_concepts(self):
yield from self.reasoner.most_general_named_concepts()
[docs]
def least_general_named_concepts(self):
yield from self.reasoner.least_general_named_concepts()
[docs]
def query(self, sparql: str) -> rdflib.plugins.sparql.processor.SPARQLResult:
yield from self.g.query(sparql_query=sparql)