Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
"""Neo4j Cypher queries for coverage policy system"""
|
|
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import structlog
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class TemporalQueries:
|
|
"""Helper class for temporal queries"""
|
|
|
|
@staticmethod
|
|
def get_current_state_query(
|
|
label: str, filters: dict[str, Any] | None = None
|
|
) -> str:
|
|
"""Get query for current state of nodes"""
|
|
where_clause = "n.retracted_at IS NULL"
|
|
|
|
if filters:
|
|
filter_conditions = []
|
|
for key, value in filters.items():
|
|
if isinstance(value, str):
|
|
filter_conditions.append(f"n.{key} = '{value}'")
|
|
else:
|
|
filter_conditions.append(f"n.{key} = {value}")
|
|
|
|
if filter_conditions:
|
|
where_clause += " AND " + " AND ".join(filter_conditions)
|
|
|
|
return f"""
|
|
MATCH (n:{label})
|
|
WHERE {where_clause}
|
|
RETURN n
|
|
ORDER BY n.asserted_at DESC
|
|
"""
|
|
|
|
@staticmethod
|
|
def get_historical_state_query(
|
|
label: str, as_of_time: datetime, filters: dict[str, Any] | None = None
|
|
) -> str:
|
|
"""Get query for historical state at specific time"""
|
|
where_clause = f"""
|
|
n.asserted_at <= datetime('{as_of_time.isoformat()}')
|
|
AND (n.retracted_at IS NULL OR n.retracted_at > datetime('{as_of_time.isoformat()}'))
|
|
"""
|
|
|
|
if filters:
|
|
filter_conditions = []
|
|
for key, value in filters.items():
|
|
if isinstance(value, str):
|
|
filter_conditions.append(f"n.{key} = '{value}'")
|
|
else:
|
|
filter_conditions.append(f"n.{key} = {value}")
|
|
|
|
if filter_conditions:
|
|
where_clause += " AND " + " AND ".join(filter_conditions)
|
|
|
|
return f"""
|
|
MATCH (n:{label})
|
|
WHERE {where_clause}
|
|
RETURN n
|
|
ORDER BY n.asserted_at DESC
|
|
"""
|
|
|
|
@staticmethod
|
|
def get_audit_trail_query(node_id: str) -> str:
|
|
"""Get complete audit trail for a node"""
|
|
return f"""
|
|
MATCH (n {{id: '{node_id}'}})
|
|
RETURN n.asserted_at as asserted_at,
|
|
n.retracted_at as retracted_at,
|
|
n.source as source,
|
|
n.extractor_version as extractor_version,
|
|
properties(n) as properties
|
|
ORDER BY n.asserted_at ASC
|
|
"""
|