import os
import platform
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
from urllib.parse import urljoin

import rdflib
from rdflib import OWL, SH
from rdflib.term import BNode, URIRef, _SKOLEM_DEFAULT_AUTHORITY, rdflib_skolem_genid
# monkeypatch BNode.skolemize with a new function
def _new_bnode_skolemize(
    self, authority: Optional[str] = None, basepath: Optional[str] = None
) -> URIRef:
    """Return a "skolem" URIRef standing in for this BNode.

    Follows http://www.w3.org/TR/rdf11-concepts/#section-skolemization.
    Any spaces in the blank node identifier are replaced with underscores
    before the URI is assembled.

    Args:
        authority: base URI the skolem path is joined onto; falls back to
            ``_SKOLEM_DEFAULT_AUTHORITY`` when ``None``.
        basepath: path prefix placed in front of the node identifier; falls
            back to ``rdflib_skolem_genid`` when ``None``.

    Returns:
        The URIRef produced by joining ``authority`` with
        ``basepath + identifier``.
    """
    # Only ``None`` selects a default; an explicitly passed empty string is kept.
    base_uri = _SKOLEM_DEFAULT_AUTHORITY if authority is None else authority
    prefix = rdflib_skolem_genid if basepath is None else basepath
    node_id = str(self).replace(" ", "_")
    return URIRef(urljoin(base_uri, f"{prefix}{node_id}"))
# Install the replacement on the class so BNode.skolemize resolves to it.
BNode.skolemize = _new_bnode_skolemize

# Upper bound on how many times infer() re-runs the external shaclinfer script.
_MAX_EXTERNAL_LOOPS = 3
def infer(
    data_graph: rdflib.Graph, ontologies: rdflib.Graph, max_iterations: int = 100
) -> rdflib.Graph:
    """Expand ``data_graph`` with triples inferred by the TopQuadrant SHACL tool.

    The data graph is skolemized, serialized together with ``ontologies`` to a
    temporary Turtle file and handed to the bundled ``shaclinfer`` script.
    Inferred triples that do not mention blank nodes are added to the
    skolemized data graph. The script is re-run until the graph stops growing
    or ``_MAX_EXTERNAL_LOOPS`` runs have been made.

    Note:
        ``owl:imports`` triples are removed from ``data_graph`` in place; they
        are re-added only to the returned graph.

    Args:
        data_graph: graph to run inference over (mutated, see note above).
        ontologies: graph serialized alongside the data for every run.
        max_iterations: value forwarded to the script's ``-maxiterations``
            option.

    Returns:
        The de-skolemized data graph plus the inferred triples and the
        original ``owl:imports`` triples.
    """
    # Materialize the imports *before* removing them: Graph.triples() is a lazy
    # generator, so iterating it after the removal would yield nothing and the
    # imports would never be added back.
    imports = list(data_graph.triples((None, OWL.imports, None)))
    data_graph.remove((None, OWL.imports, None))

    # skolemize before inference
    data_graph_skolemized = data_graph.skolemize()

    # SHACL_HOME points the script at the bundled topquadrant_shacl directory.
    # It is layered on top of the inherited environment because passing ``env``
    # to subprocess replaces the child's environment entirely, which would
    # otherwise hide variables such as PATH from the script.
    shacl_home = Path(__file__).parent / "topquadrant_shacl"
    env = {**os.environ, "SHACL_HOME": str(shacl_home)}

    # If using *nix, use .sh; else if on windows use .bat
    if platform.system() == "Windows":
        script = [str(shacl_home / "bin" / "shaclinfer.bat")]
    else:
        script = ["/bin/sh", str(shacl_home / "bin" / "shaclinfer.sh")]

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        target_file_path = temp_dir_path / "data.ttl"
        inferred_file_path = temp_dir_path / "inferred.ttl"

        previous_size = 0
        current_size = len(data_graph_skolemized)
        current_iter = 0

        # Run shaclinfer until the skolemized data graph stops changing in size
        while previous_size != current_size and current_iter < _MAX_EXTERNAL_LOOPS:
            # Re-serialize on every pass so the script sees the triples that
            # were added by the previous pass.
            (data_graph_skolemized + ontologies).serialize(
                target_file_path, format="turtle"
            )
            try:
                print(f"Running {script} -datafile {target_file_path}")
                output = subprocess.check_output(
                    [
                        *script,
                        "-datafile",
                        str(target_file_path),
                        "-maxiterations",
                        str(max_iterations),
                    ],
                    stderr=subprocess.STDOUT,
                    universal_newlines=True,
                    env=env,
                )
            except subprocess.CalledProcessError as e:
                # A non-zero exit status is not treated as fatal: whatever the
                # script printed is still parsed below.
                output = e.output

            # stderr is merged into stdout, so drop lines containing "::"
            # (presumably the tool's log lines) and keep the rest as Turtle.
            # Written as UTF-8 so the file matches what the Turtle parser reads.
            with open(inferred_file_path, "w", encoding="utf-8") as f:
                f.writelines(
                    f"{line}\n" for line in output.splitlines() if "::" not in line
                )

            inferred_triples = rdflib.Graph()
            inferred_triples.parse(inferred_file_path, format="turtle")
            print(f"Got {len(inferred_triples)} inferred triples")

            for s, p, o in inferred_triples:
                # Triples with a blank-node subject or object are not merged.
                if isinstance(s, BNode) or isinstance(o, BNode):
                    continue
                data_graph_skolemized.add((s, p, o))

            # Update the size of the graph
            previous_size = current_size
            current_size = len(data_graph_skolemized)
            current_iter += 1

    expanded_graph = data_graph_skolemized.de_skolemize()
    # add imports back in
    for imp in imports:
        expanded_graph.add(imp)
    return expanded_graph
def validate(data_graph: rdflib.Graph, shape_graphs: rdflib.Graph):
    """Validate ``data_graph`` with the bundled TopQuadrant ``shaclvalidate`` script.

    The data graph is first expanded with ``infer(data_graph, shape_graphs)``,
    written to a temporary Turtle file and passed to the script. The script's
    output (minus log lines) is parsed as the validation report.

    Note:
        ``owl:imports`` triples are removed from ``data_graph`` in place.

    Args:
        data_graph: graph to validate (mutated, see note above).
        shape_graphs: graph passed to ``infer`` as its ``ontologies`` argument.

    Returns:
        A 3-tuple ``(validates, report_graph, report_text)``: ``validates`` is
        a ``bool`` that is ``True`` when the report has no
        ``sh:resultSeverity sh:Violation`` statement or contains
        ``sh:conforms true``; ``report_graph`` is the parsed report and
        ``report_text`` its Turtle serialization.
    """
    # remove imports
    data_graph.remove((None, OWL.imports, None))

    # SHACL_HOME points the script at the bundled topquadrant_shacl directory.
    # It is layered on top of the inherited environment because passing ``env``
    # to subprocess replaces the child's environment entirely, which would
    # otherwise hide variables such as PATH from the script.
    shacl_home = Path(__file__).parent / "topquadrant_shacl"
    env = {**os.environ, "SHACL_HOME": str(shacl_home)}

    # If using *nix, use .sh; else if on windows use .bat
    if platform.system() == "Windows":
        script = [str(shacl_home / "bin" / "shaclvalidate.bat")]
    else:
        script = ["/bin/sh", str(shacl_home / "bin" / "shaclvalidate.sh")]

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        target_file_path = temp_dir_path / "data.ttl"

        inferred_graph = infer(data_graph, shape_graphs)
        # NOTE(review): only the inferred data graph is written here and the
        # script is invoked without a separate shapes file, so the shapes
        # presumably have to be present in ``data_graph`` already for them to
        # be checked -- confirm against callers.
        inferred_graph.serialize(target_file_path, format="ttl")

        try:
            print(f"Running {script} -datafile {target_file_path}")
            output = subprocess.check_output(
                [*script, "-datafile", str(target_file_path)],
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                env=env,
            )
        except subprocess.CalledProcessError as e:
            # A non-zero exit status is not treated as fatal: whatever the
            # script printed is still parsed as the report below.
            output = e.output

        # stderr is merged into stdout, so filter out log output (lines
        # containing "::") and keep the rest as the Turtle report. Written as
        # UTF-8 so the file matches what the Turtle parser reads.
        report_file_path = temp_dir_path / "report.ttl"
        with open(report_file_path, "w", encoding="utf-8") as f:
            f.writelines(
                f"{line}\n" for line in output.splitlines() if "::" not in line
            )

        report_g = rdflib.Graph()
        report_g.parse(report_file_path, format="turtle")

    # Triple-pattern membership (None is a wildcard) yields real booleans, so
    # ``validates`` is always a bool rather than sometimes an int count.
    has_violation = (None, SH.resultSeverity, SH.Violation) in report_g
    conforms = (None, SH.conforms, rdflib.Literal(True)) in report_g
    validates = (not has_violation) or conforms
    return validates, report_g, str(report_g.serialize(format="turtle"))