diff --git a/apps/svc_kg/main.py b/apps/svc_kg/main.py index 1894c40..20b1039 100644 --- a/apps/svc_kg/main.py +++ b/apps/svc_kg/main.py @@ -64,20 +64,20 @@ async def init_dependencies(app_settings: KGSettings) -> None: shapes_graph = None +async def startup_event() -> None: + """Initialize service dependencies""" + await init_dependencies(cast(KGSettings, _settings)) + + app, _settings = create_app( service_name="svc-kg", title="Tax Agent Knowledge Graph Service", description="Service for managing and validating the Knowledge Graph", settings_class=KGSettings, + startup_hooks=[startup_event], ) -# Initialize dependencies immediately -@app.on_event("startup") -async def startup_event(): - await init_dependencies(cast(KGSettings, _settings)) - - tracer = get_tracer("svc-kg") metrics = get_metrics() @@ -100,7 +100,7 @@ async def _handle_kg_upsert_ready(topic: str, payload: EventPayload) -> None: data = payload.data nodes = data.get("nodes", []) relationships = data.get("relationships", []) - document_id = data.get("document_id") + doc_id = data.get("doc_id") tenant_id = data.get("tenant_id") if not nodes and not relationships: @@ -108,7 +108,7 @@ async def _handle_kg_upsert_ready(topic: str, payload: EventPayload) -> None: return with tracer.start_as_current_span("upsert_kg_data") as span: - span.set_attribute("document_id", document_id) + span.set_attribute("doc_id", doc_id) span.set_attribute("tenant_id", tenant_id) span.set_attribute("node_count", len(nodes)) span.set_attribute("relationship_count", len(relationships)) @@ -121,12 +121,12 @@ async def _handle_kg_upsert_ready(topic: str, payload: EventPayload) -> None: if not conforms: logger.error( "SHACL validation failed", - document_id=document_id, + doc_id=doc_id, validation_report=validation_report, ) - metrics.counter("kg_validation_errors_total").labels( - tenant_id=tenant_id - ).inc() + metrics.counter( + "kg_validation_errors_total", labelnames=["tenant_id"] + ).labels(tenant_id=tenant_id).inc() return # 2. Write data to Neo4j @@ -144,31 +144,30 @@ async def _handle_kg_upsert_ready(topic: str, payload: EventPayload) -> None: # 3. Publish kg.upserted event event_payload = EventPayload( data={ - "document_id": document_id, + "doc_id": doc_id, "tenant_id": tenant_id, "taxpayer_id": data.get("taxpayer_id"), "tax_year": data.get("tax_year"), "node_count": len(nodes), "relationship_count": len(relationships), + "success": True, }, actor=payload.actor, - tenant_id=tenant_id, + tenant_id=str(tenant_id), trace_id=str(span.get_span_context().trace_id), ) await event_bus.publish(EventTopics.KG_UPSERTED, event_payload) # type: ignore - metrics.counter("kg_upserts_total").labels(tenant_id=tenant_id).inc() - logger.info( - "KG upsert completed", document_id=document_id, tenant_id=tenant_id - ) + metrics.counter("kg_upserts_total", labelnames=["tenant_id"]).labels( + tenant_id=tenant_id + ).inc() + logger.info("KG upsert completed", doc_id=doc_id, tenant_id=tenant_id) except Exception as e: - logger.error( - "Failed to upsert KG data", document_id=document_id, error=str(e) - ) - metrics.counter("kg_upsert_errors_total").labels( - tenant_id=tenant_id, error_type=type(e).__name__ - ).inc() + logger.error("Failed to upsert KG data", doc_id=doc_id, error=str(e)) + metrics.counter( + "kg_upsert_errors_total", labelnames=["tenant_id", "error_type"] + ).labels(tenant_id=tenant_id, error_type=type(e).__name__).inc() async def _validate_with_shacl( diff --git a/apps/svc_normalize_map/main.py b/apps/svc_normalize_map/main.py index 3ac4af8..637d8f3 100644 --- a/apps/svc_normalize_map/main.py +++ b/apps/svc_normalize_map/main.py @@ -67,20 +67,20 @@ async def init_dependencies(app_settings: NormalizeMapSettings) -> None: logger.info("NormalizeMap service started successfully") +async def startup_event() -> None: + """Initialize service dependencies""" + await init_dependencies(cast(NormalizeMapSettings, _settings)) + + app, _settings = create_app( service_name="svc-normalize-map", title="Tax Agent Normalize and Map Service", description="Normalize extracted data and map to Knowledge Graph", settings_class=NormalizeMapSettings, + startup_hooks=[startup_event], ) -# Initialize dependencies immediately -@app.on_event("startup") -async def startup_event(): # type: ignore - await init_dependencies(cast(NormalizeMapSettings, _settings)) - - tracer = get_tracer("svc-normalize-map") metrics = get_metrics() @@ -314,7 +314,7 @@ async def _map_to_kg_ontology( return { "nodes": nodes, "relationships": relationships, - "document_id": doc_id, + "doc_id": doc_id, "tenant_id": tenant_id, } diff --git a/apps/svc_normalize_map/requirements.txt b/apps/svc_normalize_map/requirements.txt index 5a6022a..e69de29 100644 --- a/apps/svc_normalize_map/requirements.txt +++ b/apps/svc_normalize_map/requirements.txt @@ -1 +0,0 @@ -python-ulid diff --git a/tests/e2e/test_backend_journey.py b/tests/e2e/test_backend_journey.py index 7d24a93..b384572 100644 --- a/tests/e2e/test_backend_journey.py +++ b/tests/e2e/test_backend_journey.py @@ -4,7 +4,6 @@ import httpx import pytest from libs.events import EventTopics, NATSEventBus -from libs.schemas.events import DocumentExtractedEventData # Configuration INGESTION_URL = "http://localhost:8000" @@ -26,22 +25,31 @@ async def test_backend_journey(): ) await bus.start() - # Future to capture the final event - extraction_future = asyncio.Future() + # Queues to capture events + extraction_queue = asyncio.Queue() + kg_ready_queue = asyncio.Queue() + kg_upserted_queue = asyncio.Queue() async def extraction_handler(topic, payload): if payload.tenant_id == TENANT_ID: - extraction_future.set_result(payload) + await extraction_queue.put(payload) - # Subscribe to the final event in the chain + async def kg_ready_handler(topic, payload): + await kg_ready_queue.put(payload) + + async def kg_upserted_handler(topic, payload): + await kg_upserted_queue.put(payload) + + # Subscribe to events await bus.subscribe(EventTopics.DOC_EXTRACTED, extraction_handler) + await bus.subscribe(EventTopics.KG_UPSERT_READY, kg_ready_handler) + await bus.subscribe(EventTopics.KG_UPSERTED, kg_upserted_handler) try: # 2. Upload a document async with httpx.AsyncClient( verify=False ) as client: # Disable SSL verification for local testing - # Create a dummy PDF file # Create a valid minimal PDF file pdf_content = ( b"%PDF-1.0\n1 0 obj<>endobj 2 0 obj<>endobj " @@ -67,25 +75,50 @@ async def test_backend_journey(): doc_id = upload_data["doc_id"] print(f"Uploaded document: {doc_id}") - # 3. Wait for extraction event (with timeout) + # Helper to wait for matching event + async def wait_for_event(queue, event_name): + start_time = asyncio.get_event_loop().time() + timeout = 30.0 + while True: + remaining = timeout - (asyncio.get_event_loop().time() - start_time) + if remaining <= 0: + raise TimeoutError(f"Timed out waiting for {event_name}") + + try: + payload = await asyncio.wait_for(queue.get(), timeout=remaining) + data = payload.data + if data.get("doc_id") == doc_id: + return payload + print( + f"Ignoring {event_name} for different doc_id: {data.get('doc_id')}" + ) + except TimeoutError: + raise TimeoutError(f"Timed out waiting for {event_name}") + + # 3. Wait for extraction event try: - # Give it enough time for the whole chain to process - payload = await asyncio.wait_for(extraction_future, timeout=30.0) - - # 4. Verify payload - data = payload.data - assert data["doc_id"] == doc_id - assert data["tenant_id"] == TENANT_ID - assert "extraction_results" in data - - # Validate against schema - event_data = DocumentExtractedEventData(**data) - assert event_data.doc_id == doc_id - - print("E2E Journey completed successfully!") - + payload = await wait_for_event(extraction_queue, "extraction event") + print("Extraction completed successfully!") except TimeoutError: pytest.fail("Timed out waiting for extraction event") + # 4. Wait for KG Ready event + try: + payload = await wait_for_event(kg_ready_queue, "KG Ready event") + print("Normalization completed successfully!") + except TimeoutError: + pytest.fail("Timed out waiting for KG Ready event") + + # 5. Wait for KG Upserted event + try: + payload = await wait_for_event(kg_upserted_queue, "KG Upserted event") + data = payload.data + assert data["success"] is True + print("KG Upsert completed successfully!") + print("E2E Journey completed successfully!") + + except TimeoutError: + pytest.fail("Timed out waiting for KG Upserted event") + finally: await bus.stop()