Files
ai-tax-agent/libs/neo/validator.py
harkon b324ff09ef
Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
Initial commit
2025-10-11 08:41:36 +01:00

71 lines
2.4 KiB
Python

"""SHACL validation using pySHACL"""
import asyncio
from typing import Any
import structlog
# Module-level structlog logger; SHACLValidator uses it to report skipped or failed validation.
logger = structlog.get_logger()
# pyright: ignore[reportAttributeAccessIssue]
class SHACLValidator:  # pylint: disable=too-few-public-methods
    """Validate RDF data against a SHACL shapes file using pySHACL.

    pySHACL and rdflib are imported lazily inside ``validate_graph``, so the
    class can be constructed when they are not installed; validation is then
    skipped and reported as conforming.
    """

    def __init__(self, shapes_file: str) -> None:
        """Remember which shapes file to validate against.

        Args:
            shapes_file: Source of the Turtle shapes graph; it is parsed anew
                on every ``validate_graph`` call.
        """
        self.shapes_file = shapes_file

    async def validate_graph(self, rdf_data: str) -> dict[str, Any]:
        """Validate RDF data against SHACL shapes.

        The blocking parse/validate work runs in the event loop's default
        executor so the loop itself is not blocked.

        Args:
            rdf_data: RDF document serialized as Turtle.

        Returns:
            A dict with three keys:

            * ``conforms`` - pySHACL's verdict; ``True`` when validation is
              skipped because pySHACL/rdflib cannot be imported, ``False``
              when validation raised.
            * ``results_text`` - pySHACL's textual report, or a message
              describing why validation was skipped or failed.
            * ``violations_count`` - number of top-level validation results
              whose severity is neither ``sh:Info`` nor ``sh:Warning``;
              ``0`` when skipped, ``-1`` when validation could not complete.
        """

        def _validate() -> dict[str, Any]:
            # Only the imports are guarded by ImportError: an ImportError raised
            # later (e.g. while loading a parser) must not be misreported as
            # "validation skipped, data conforms".
            try:
                # pylint: disable=import-outside-toplevel
                from pyshacl import validate
                from rdflib import Graph, URIRef
            except ImportError:
                logger.warning("pySHACL not available, skipping validation")
                return {
                    "conforms": True,
                    "results_text": "SHACL validation skipped (pySHACL not installed)",
                    "violations_count": 0,
                }

            try:
                # Load data graph
                data_graph = Graph()
                data_graph.parse(data=rdf_data, format="turtle")

                # Load shapes graph
                shapes_graph = Graph()
                shapes_graph.parse(self.shapes_file, format="turtle")

                # Run validation
                conforms, results_graph, results_text = validate(
                    data_graph=data_graph,
                    shacl_graph=shapes_graph,
                    inference="rdfs",
                    abort_on_first=False,
                    allow_infos=True,
                    allow_warnings=True,
                )

                # NOTE(review): pySHACL appears to return a non-Graph object in
                # place of the report graph when validation itself fails -
                # confirm against the pySHACL version in use. There is nothing
                # to count in that case, so use the error sentinel.
                if not isinstance(results_graph, Graph):
                    return {
                        "conforms": conforms,
                        "results_text": str(results_text),
                        "violations_count": -1,
                    }

                sh_ns = "http://www.w3.org/ns/shacl#"
                sh_result = URIRef(sh_ns + "result")
                sh_severity = URIRef(sh_ns + "resultSeverity")
                # Severities tolerated by allow_infos / allow_warnings above.
                tolerated = {URIRef(sh_ns + "Info"), URIRef(sh_ns + "Warning")}

                # Count the report's top-level sh:result nodes rather than every
                # triple subject: a conforming report still contains triples,
                # and nested sh:detail results must not be counted twice.
                violations = sum(
                    1
                    for result in set(results_graph.objects(predicate=sh_result))
                    if results_graph.value(result, sh_severity) not in tolerated
                )

                return {
                    "conforms": conforms,
                    "results_text": results_text,
                    "violations_count": violations,
                }
            except Exception as exc:  # pylint: disable=broad-exception-caught
                # Deliberate boundary: any parse/validation failure is reported
                # to the caller as a non-conforming result instead of raising.
                logger.error("SHACL validation failed", error=str(exc))
                return {
                    "conforms": False,
                    "results_text": f"Validation error: {exc}",
                    "violations_count": -1,
                }

        # get_running_loop() is the recommended call inside a coroutine.
        return await asyncio.get_running_loop().run_in_executor(None, _validate)