Files
ai-tax-agent/libs/neo/queries.py
harkon b324ff09ef
Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
Initial commit
2025-10-11 08:41:36 +01:00

79 lines
2.4 KiB
Python

"""Neo4j Cypher queries for coverage policy system"""
from datetime import datetime
from typing import Any
import structlog
logger = structlog.get_logger()
class TemporalQueries:
"""Helper class for temporal queries"""
@staticmethod
def get_current_state_query(
label: str, filters: dict[str, Any] | None = None
) -> str:
"""Get query for current state of nodes"""
where_clause = "n.retracted_at IS NULL"
if filters:
filter_conditions = []
for key, value in filters.items():
if isinstance(value, str):
filter_conditions.append(f"n.{key} = '{value}'")
else:
filter_conditions.append(f"n.{key} = {value}")
if filter_conditions:
where_clause += " AND " + " AND ".join(filter_conditions)
return f"""
MATCH (n:{label})
WHERE {where_clause}
RETURN n
ORDER BY n.asserted_at DESC
"""
@staticmethod
def get_historical_state_query(
label: str, as_of_time: datetime, filters: dict[str, Any] | None = None
) -> str:
"""Get query for historical state at specific time"""
where_clause = f"""
n.asserted_at <= datetime('{as_of_time.isoformat()}')
AND (n.retracted_at IS NULL OR n.retracted_at > datetime('{as_of_time.isoformat()}'))
"""
if filters:
filter_conditions = []
for key, value in filters.items():
if isinstance(value, str):
filter_conditions.append(f"n.{key} = '{value}'")
else:
filter_conditions.append(f"n.{key} = {value}")
if filter_conditions:
where_clause += " AND " + " AND ".join(filter_conditions)
return f"""
MATCH (n:{label})
WHERE {where_clause}
RETURN n
ORDER BY n.asserted_at DESC
"""
@staticmethod
def get_audit_trail_query(node_id: str) -> str:
"""Get complete audit trail for a node"""
return f"""
MATCH (n {{id: '{node_id}'}})
RETURN n.asserted_at as asserted_at,
n.retracted_at as retracted_at,
n.source as source,
n.extractor_version as extractor_version,
properties(n) as properties
ORDER BY n.asserted_at ASC
"""