Files
ai-tax-agent/libs/neo/__init__.py
harkon b324ff09ef
Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
Initial commit
2025-10-11 08:41:36 +01:00

141 lines
4.0 KiB
Python

from typing import TYPE_CHECKING, Any
import structlog
from .client import Neo4jClient
from .queries import TemporalQueries
from .validator import SHACLValidator
if TYPE_CHECKING:
from libs.schemas.coverage.evaluation import Citation, FoundEvidence
logger = structlog.get_logger()
async def kg_boxes_exist(client: Neo4jClient, box_ids: list[str]) -> dict[str, bool]:
    """Report, for each requested box id, whether a FormBox node carries it.

    Args:
        client: Graph client; its ``run_query(query, params)`` coroutine is awaited.
        box_ids: Box identifiers to look up.

    Returns:
        A mapping built from the ``bid`` / ``exists`` columns of every
        returned row. If running the query or reading its rows raises, the
        failure is logged and every requested id is mapped to ``False``.
    """
    # Same query text as a single literal: adjacent string pieces are joined
    # at compile time.
    cypher = (
        "\n"
        "UNWIND $box_ids AS bid\n"
        "OPTIONAL MATCH (fb:FormBox {box_id: bid})\n"
        "RETURN bid, fb IS NOT NULL AS exists\n"
    )
    existence: dict[str, bool] = {}
    try:
        rows = await client.run_query(cypher, {"box_ids": box_ids})
        for row in rows:
            existence[row["bid"]] = row["exists"]
    except Exception as exc:
        # Best-effort lookup: degrade to "nothing exists" instead of raising.
        logger.error("Failed to check box existence", box_ids=box_ids, error=str(exc))
        return dict.fromkeys(box_ids, False)
    return existence
async def kg_find_evidence(
    client: Neo4jClient,
    taxpayer_id: str,
    tax_year: str,
    kinds: list[str],
    min_ocr: float = 0.6,
    date_window: int = 30,
) -> list["FoundEvidence"]:
    """Find evidence documents for a taxpayer within a tax year.

    The query selects ``Evidence`` nodes derived from a ``Document`` where
    either the evidence supports the taxpayer profile or the document belongs
    to it, and where ALL of the following hold: the document kind is in
    ``kinds``, the document date lies between the tax year's ``start_date``
    and ``end_date`` (inclusive), and the evidence OCR confidence (missing
    treated as 0.0) is at least ``min_ocr``. At most 100 rows are returned,
    highest OCR confidence first.

    Args:
        client: Graph client; its ``run_query(query, params)`` coroutine is awaited.
        taxpayer_id: Value matched against ``TaxpayerProfile.taxpayer_id``.
        tax_year: Value matched against ``TaxYear.label``.
        kinds: Accepted ``Document.kind`` values.
        min_ocr: Minimum OCR confidence an evidence node must have.
        date_window: Accepted for interface compatibility but currently NOT
            applied by the query.
            NOTE(review): presumably meant to widen the date range by this
            many days -- confirm the intended semantics before wiring it in.

    Returns:
        One ``FoundEvidence`` per returned row. If the query, the schema
        import, or the row conversion raises, the failure is logged and an
        empty list is returned.
    """
    # The ownership test is parenthesised on purpose: in Cypher AND binds
    # tighter than OR, so without the parentheses the kind/date/OCR filters
    # would only constrain the BELONGS_TO branch and every SUPPORTS match
    # would bypass them.
    # Ordering uses the coalesced value because DESC ordering puts nulls first.
    query = """
    MATCH (p:TaxpayerProfile {taxpayer_id: $tid})-[:OF_TAX_YEAR]->(y:TaxYear {label: $tax_year})
    MATCH (ev:Evidence)-[:DERIVED_FROM]->(d:Document)
    WHERE ((ev)-[:SUPPORTS]->(p) OR (d)-[:BELONGS_TO]->(p))
      AND d.kind IN $kinds
      AND date(d.date) >= date(y.start_date) AND date(d.date) <= date(y.end_date)
      AND coalesce(ev.ocr_confidence, 0.0) >= $min_ocr
    RETURN d.doc_id AS doc_id,
           d.kind AS kind,
           ev.page AS page,
           ev.bbox AS bbox,
           ev.ocr_confidence AS ocr_confidence,
           ev.extract_confidence AS extract_confidence,
           d.date AS date
    ORDER BY coalesce(ev.ocr_confidence, 0.0) DESC
    LIMIT 100
    """

    try:
        results = await client.run_query(
            query,
            {
                "tid": taxpayer_id,
                "tax_year": tax_year,
                "kinds": kinds,
                "min_ocr": min_ocr,
            },
        )

        # Imported at call time; the module only imports this name under
        # TYPE_CHECKING at the top level.
        from libs.schemas.coverage.evaluation import FoundEvidence

        return [
            FoundEvidence(
                doc_id=row["doc_id"],
                kind=row["kind"],
                # Compare against None so a page numbered 0 is not dropped.
                pages=[row["page"]] if row["page"] is not None else [],
                bbox=row["bbox"],
                ocr_confidence=row["ocr_confidence"] or 0.0,
                extract_confidence=row["extract_confidence"] or 0.0,
                date=row["date"],
            )
            for row in results
        ]

    except Exception as e:
        # Best-effort lookup: log and report "no evidence" instead of raising.
        logger.error(
            "Failed to find evidence",
            taxpayer_id=taxpayer_id,
            tax_year=tax_year,
            kinds=kinds,
            error=str(e),
        )
        return []
async def kg_rule_citations(
    client: Neo4jClient, schedule_id: str, box_ids: list[str]
) -> list["Citation"]:
    """Get citations for the rules that govern the given form boxes.

    Follows ``FormBox -[:GOVERNED_BY]-> Rule -[:CITES]-> Document`` for every
    box whose ``box_id`` is in ``box_ids`` and returns up to 10 distinct
    ``(rule_id, doc_id, locator)`` triples in a stable order.

    Args:
        client: Graph client; its ``run_query(query, params)`` coroutine is awaited.
        schedule_id: Only included in the failure log; the query does not
            filter on it.
            NOTE(review): confirm whether citations should also be
            restricted to this schedule.
        box_ids: Form box identifiers whose governing rules are looked up.

    Returns:
        One ``Citation`` per returned row. An empty list when ``box_ids`` is
        empty, or when the query, the schema import, or the row conversion
        raises (the failure is logged).
    """
    if not box_ids:
        # ``IN`` over an empty list can never match; skip the round trip.
        return []

    # DISTINCT stops several boxes sharing one rule from filling the LIMIT
    # with duplicate citations; ORDER BY makes the truncation deterministic.
    query = """
    MATCH (fb:FormBox)-[:GOVERNED_BY]->(r:Rule)-[:CITES]->(doc:Document)
    WHERE fb.box_id IN $box_ids
    RETURN DISTINCT r.rule_id AS rule_id,
           doc.doc_id AS doc_id,
           doc.locator AS locator
    ORDER BY rule_id, doc_id, locator
    LIMIT 10
    """

    try:
        results = await client.run_query(query, {"box_ids": box_ids})

        # Imported at call time; the module only imports this name under
        # TYPE_CHECKING at the top level.
        from libs.schemas.coverage.evaluation import Citation

        return [
            Citation(
                rule_id=row["rule_id"],
                doc_id=row["doc_id"],
                locator=row["locator"],
            )
            for row in results
        ]

    except Exception as e:
        # Best-effort lookup: log and report "no citations" instead of raising.
        logger.error(
            "Failed to get rule citations",
            schedule_id=schedule_id,
            box_ids=box_ids,
            error=str(e),
        )
        return []
# Names exported by ``from <package> import *``.
# NOTE(review): the kg_boxes_exist / kg_find_evidence / kg_rule_citations
# helpers defined in this module are not listed, so star-imports skip them --
# confirm whether that is intended.
__all__ = ["Neo4jClient", "TemporalQueries", "SHACLValidator"]