"""NormalizeMap service: normalize extracted document fields and map them to the KG.

Subscribes to ``EventTopics.DOC_EXTRACTED``, normalizes the extracted fields,
builds a Knowledge Graph upsert payload (nodes + relationships) and publishes it
on ``EventTopics.KG_UPSERT_READY``.
"""

import os
import sys
from datetime import UTC, datetime
from typing import Any, cast

import structlog
import ulid
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse

# Make the repository root importable so the shared ``libs`` package resolves.
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))

from libs.app_factory import create_app
from libs.config import (
    BaseAppSettings,
    create_event_bus,
    create_minio_client,
    create_neo4j_client,
)
from libs.events import EventBus, EventPayload, EventTopics
from libs.neo import Neo4jClient
from libs.observability import get_metrics, get_tracer, setup_observability
from libs.schemas import ErrorResponse
from libs.storage import DocumentStorage, StorageClient

logger = structlog.get_logger()

# Producer metadata stamped on the KG nodes this service emits.
_SERVICE_SOURCE = "svc-normalize-map"
_EXTRACTOR_VERSION = "1.0.0"

# Normalized field names that are turned into IncomeItem/ExpenseItem nodes.
_AMOUNT_FIELDS = ("total_amount", "net_amount", "vat_amount", "amount")


class NormalizeMapSettings(BaseAppSettings):
    """Settings for NormalizeMap service"""

    service_name: str = "svc-normalize-map"


# Global clients, populated by ``init_dependencies`` at startup.
storage_client: StorageClient | None = None
document_storage: DocumentStorage | None = None
event_bus: EventBus | None = None
neo4j_client: Neo4jClient | None = None
settings: NormalizeMapSettings


async def init_dependencies(app_settings: NormalizeMapSettings) -> None:
    """Create storage, Neo4j and event-bus clients and subscribe to DOC_EXTRACTED.

    Args:
        app_settings: Service settings used to build every client.

    Raises:
        HTTPException: (500) if ``create_event_bus`` returns a falsy value.
    """
    global storage_client, document_storage, event_bus, neo4j_client, settings

    settings = app_settings
    logger.info("Starting NormalizeMap service")

    setup_observability(settings)

    minio_client = create_minio_client(settings)
    storage_client = StorageClient(minio_client)
    document_storage = DocumentStorage(storage_client)

    neo4j_driver = create_neo4j_client(settings)
    neo4j_client = Neo4jClient(neo4j_driver)

    event_bus = create_event_bus(settings)
    if not event_bus:
        # NOTE(review): HTTPException is raised outside any request here; kept
        # for compatibility with whatever runs the startup hooks.
        raise HTTPException(status_code=500, detail="Event bus not initialized")
    await event_bus.start()

    await event_bus.subscribe(EventTopics.DOC_EXTRACTED, _handle_document_extracted)

    logger.info("NormalizeMap service started successfully")


async def startup_event() -> None:
    """Startup hook: initialize dependencies with the settings from ``create_app``."""
    await init_dependencies(cast(NormalizeMapSettings, _settings))


app, _settings = create_app(
    service_name="svc-normalize-map",
    title="Tax Agent Normalize and Map Service",
    description="Normalize extracted data and map to Knowledge Graph",
    settings_class=NormalizeMapSettings,
    startup_hooks=[startup_event],
)

tracer = get_tracer("svc-normalize-map")
metrics = get_metrics()


@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Stop the event bus and close the Neo4j client, if they were created."""
    logger.info("Shutting down NormalizeMap service")
    try:
        if event_bus:
            await event_bus.stop()
    finally:
        # Close Neo4j even if stopping the event bus raised.
        if neo4j_client:
            await neo4j_client.close()
    logger.info("NormalizeMap service shutdown complete")


async def _handle_document_extracted(topic: str, payload: EventPayload) -> None:
    """Handle a DOC_EXTRACTED event: normalize, map to the KG, publish KG_UPSERT_READY.

    Events without ``doc_id``, ``tenant_id`` or extracted fields are logged and
    dropped. Failures during processing are logged and counted in
    ``normalization_errors_total``; they are not re-raised.

    Args:
        topic: Topic the event arrived on (unused).
        payload: Event whose ``data`` may carry ``doc_id``, ``tenant_id`` and
            ``extraction_results`` with ``extracted_fields`` / ``provenance``.
    """
    data = payload.data
    doc_id = data.get("doc_id")
    tenant_id = data.get("tenant_id")
    # ``or`` also covers keys that are present with an explicit ``None`` value,
    # which ``.get(key, {})`` would return unchanged and then crash on ``.get``.
    extraction_results = data.get("extraction_results") or {}
    extracted_fields = extraction_results.get("extracted_fields") or {}
    provenance = extraction_results.get("provenance") or []

    if not doc_id or not tenant_id or not extracted_fields:
        logger.warning("Invalid document extracted event", data=data)
        return

    with tracer.start_as_current_span("normalize_and_map") as span:
        span.set_attribute("doc_id", doc_id)
        span.set_attribute("tenant_id", tenant_id)

        try:
            # 1. Normalize data
            normalized_data = await _normalize_data(extracted_fields)

            # 2. Map to KG ontology
            kg_upsert_payload = await _map_to_kg_ontology(
                doc_id, tenant_id, normalized_data, provenance
            )

            # 3. Publish kg.upsert.ready event
            event_payload = EventPayload(
                data=kg_upsert_payload,
                actor=payload.actor,
                tenant_id=tenant_id,
                # NOTE(review): this is the decimal form of the integer trace id;
                # tracing UIs usually show 32-char hex - confirm what consumers expect.
                trace_id=str(span.get_span_context().trace_id),
            )
            if event_bus is None:
                raise RuntimeError("Event bus not initialized")
            await event_bus.publish(EventTopics.KG_UPSERT_READY, event_payload)

            metrics.counter("normalized_documents_total").labels(
                tenant_id=tenant_id
            ).inc()

            logger.info(
                "Document normalized and mapped", doc_id=doc_id, tenant_id=tenant_id
            )

        except Exception as e:
            # Boundary handler: one bad document must not break the subscriber.
            # ``exception`` keeps the traceback that ``error`` discarded.
            logger.exception(
                "Failed to normalize and map document",
                doc_id=doc_id,
                tenant_id=tenant_id,
                error=str(e),
            )
            metrics.counter("normalization_errors_total").labels(
                tenant_id=tenant_id, error_type=type(e).__name__
            ).inc()


async def _normalize_data(extracted_fields: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``extracted_fields`` with dates and amounts normalized.

    * String values whose key contains "date" (case-insensitive) are parsed with
      ``datetime.fromisoformat`` and reduced to an ISO ``YYYY-MM-DD`` string.
    * Otherwise, string values whose key contains "amount" have "£" and ","
      stripped and are converted to ``float``.
    * Any value that fails to parse, and every other value, is kept unchanged.
    """
    normalized_data: dict[str, Any] = {}
    for key, value in extracted_fields.items():
        lowered_key = key.lower()
        if "date" in lowered_key and isinstance(value, str):
            try:
                # Only ISO-style inputs are understood; extend as needed.
                normalized_data[key] = datetime.fromisoformat(value).date().isoformat()
            except ValueError:
                normalized_data[key] = value  # Keep original if parsing fails
        elif "amount" in lowered_key and isinstance(value, str):
            # Normalize currency strings such as "£1,234.50" to a float.
            try:
                normalized_data[key] = float(value.replace("£", "").replace(",", ""))
            except ValueError:
                normalized_data[key] = value
        else:
            normalized_data[key] = value
    return normalized_data


async def _map_to_kg_ontology(
    doc_id: str,
    tenant_id: str,
    normalized_data: dict[str, Any],
    provenance: list[dict[str, Any]],
) -> dict[str, Any]:
    """Map normalized data to KG nodes and relationships (per kg_schema.json).

    Emits a Document node, a TaxpayerProfile node linked by BELONGS_TO, and for
    each amount field an IncomeItem/ExpenseItem node (HAS_INCOME/HAS_EXPENSE
    from the taxpayer). When a provenance record exists for that field, an
    Evidence node linked by SUPPORTED_BY is added as well.

    Args:
        doc_id: Source document id.
        tenant_id: Tenant owning the document.
        normalized_data: Output of ``_normalize_data``.
        provenance: Extraction provenance records; the first record whose
            ``"field"`` equals an amount field is used for its Evidence node.

    Returns:
        Dict with ``nodes``, ``relationships``, ``doc_id`` and ``tenant_id``.
    """
    nodes: list[dict[str, Any]] = []
    relationships: list[dict[str, Any]] = []
    now = datetime.now(UTC).isoformat()

    # Create a Document node
    doc_node_id = f"document_{doc_id}"
    nodes.append(
        {
            "id": doc_node_id,
            "type": "Document",
            "properties": {
                "node_type": "Document",
                "doc_id": doc_id,
                "kind": normalized_data.get("kind", "OtherSupportingDoc"),
                "source": normalized_data.get("source", "manual_upload"),
                "checksum": normalized_data.get("checksum", ""),
                "valid_from": now,
                "asserted_at": now,
                "extractor_version": _EXTRACTOR_VERSION,
            },
        }
    )

    # Create a TaxpayerProfile node
    taxpayer_id = normalized_data.get("taxpayer_id", "unknown_taxpayer")
    taxpayer_node_id = f"taxpayer_{taxpayer_id}"
    nodes.append(
        {
            "id": taxpayer_node_id,
            "type": "TaxpayerProfile",
            "properties": {
                "node_type": "TaxpayerProfile",
                "taxpayer_id": taxpayer_id,
                "type": "Individual",
                "valid_from": now,
                "asserted_at": now,
                "source": _SERVICE_SOURCE,
                "extractor_version": _EXTRACTOR_VERSION,
            },
        }
    )

    relationships.append(
        {
            "id": f"rel_document_to_taxpayer_{doc_id}",
            "type": "BELONGS_TO",
            "sourceId": doc_node_id,
            "targetId": taxpayer_node_id,
            "properties": {},
        }
    )

    # Loop invariants for the financial item nodes.
    kind = normalized_data.get("kind", "")
    # NOTE(review): item_type uses an exact match on "invoice" while the item
    # category uses a substring match, so e.g. "purchase_invoice" becomes an
    # ExpenseItem of type "self_employment" - confirm this is intended.
    item_type = "IncomeItem" if kind == "invoice" else "ExpenseItem"
    # isinstance guard: ``in`` on a None/numeric kind would raise TypeError.
    item_category = (
        "self_employment" if isinstance(kind, str) and "invoice" in kind else "other"
    )
    item_prefix = item_type.lower()
    has_item_rel = "HAS_INCOME" if item_type == "IncomeItem" else "HAS_EXPENSE"

    # Create IncomeItem/ExpenseItem nodes and Evidence nodes
    for field, value in normalized_data.items():
        if field not in _AMOUNT_FIELDS:
            continue

        item_id = f"item_{ulid.new()}"
        item_node_id = f"{item_prefix}_{item_id}"

        # Create the financial item node (IncomeItem or ExpenseItem)
        nodes.append(
            {
                "id": item_node_id,
                "type": item_type,
                "properties": {
                    "node_type": item_type,
                    "type": item_category,
                    "gross": value,
                    "currency": "GBP",
                    "description": normalized_data.get("description", field),
                    "valid_from": now,
                    "asserted_at": now,
                    "source": _SERVICE_SOURCE,
                    "extractor_version": _EXTRACTOR_VERSION,
                },
            }
        )
        relationships.append(
            {
                "id": f"rel_taxpayer_has_{item_prefix}_{item_id}",
                "type": has_item_rel,
                "sourceId": taxpayer_node_id,
                "targetId": item_node_id,
                "properties": {},
            }
        )

        # Create an Evidence node linking the item to the document. ``.get`` so a
        # provenance record without a "field" key is skipped, not a KeyError.
        prov = next(
            (
                p
                for p in provenance
                if isinstance(p, dict) and p.get("field") == field
            ),
            None,
        )
        if not prov:
            continue

        evidence_id = f"evidence_{item_id}"
        nodes.append(
            {
                "id": evidence_id,
                "type": "Evidence",
                "properties": {
                    "node_type": "Evidence",
                    "snippet_id": evidence_id,
                    "doc_ref": doc_id,
                    "page": prov.get("page"),
                    "bbox": prov.get("bbox"),
                    "text_hash": "dummy_hash",  # TODO: placeholder, not a real hash
                    "ocr_confidence": prov.get("confidence"),
                    "extracted_text": str(value),
                    "valid_from": now,
                    "asserted_at": now,
                    "source": _SERVICE_SOURCE,
                    "extractor_version": _EXTRACTOR_VERSION,
                },
            }
        )
        relationships.append(
            {
                "id": f"rel_item_supported_by_evidence_{item_id}",
                "type": "SUPPORTED_BY",
                "sourceId": item_node_id,
                "targetId": evidence_id,
                "properties": {},
            }
        )

    return {
        "nodes": nodes,
        "relationships": relationships,
        "doc_id": doc_id,
        "tenant_id": tenant_id,
    }


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
    """Render an HTTPException as an RFC 7807 problem-details JSON response."""
    return JSONResponse(
        status_code=exc.status_code,
        content=ErrorResponse(
            type=f"https://httpstatuses.com/{exc.status_code}",
            title=exc.detail,
            status=exc.status_code,
            detail=exc.detail,
            instance=str(request.url),
            trace_id=getattr(request.state, "trace_id", None),
        ).model_dump(),
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="0.0.0.0", port=8004, reload=True, log_config=None)