Files
ai-tax-agent/libs/events/base.py
harkon fdba81809f
Some checks failed
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
completed local setup with compose
2025-11-26 13:17:17 +00:00

69 lines
2.0 KiB
Python

"""Base event classes and interfaces."""
import json
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from typing import Any
import ulid
# Each payload MUST include: `event_id (ulid)`, `occurred_at (iso)`, `actor`, `tenant_id`, `trace_id`, `schema_version`, and a `data` object (service-specific).
class EventPayload:
"""Standard event payload structure"""
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
self,
data: dict[str, Any],
actor: str,
tenant_id: str,
trace_id: str | None = None,
schema_version: str = "1.0",
):
self.event_id = str(ulid.new())
self.occurred_at = datetime.now(UTC).isoformat()
self.actor = actor
self.tenant_id = tenant_id
self.trace_id = trace_id
self.schema_version = schema_version
self.data = data
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization"""
return {
"event_id": self.event_id,
"occurred_at": self.occurred_at,
"actor": self.actor,
"tenant_id": self.tenant_id,
"trace_id": self.trace_id,
"schema_version": self.schema_version,
"data": self.data,
}
def to_json(self) -> str:
"""Convert to JSON string"""
return json.dumps(self.to_dict())
class EventBus(ABC):
"""Abstract event bus interface"""
@abstractmethod
async def publish(self, topic: str, payload: EventPayload) -> bool:
"""Publish event to topic"""
@abstractmethod
async def subscribe(
self, topic: str, handler: Callable[[str, EventPayload], Awaitable[None]]
) -> None:
"""Subscribe to topic with handler"""
@abstractmethod
async def start(self) -> None:
"""Start the event bus"""
@abstractmethod
async def stop(self) -> None:
"""Stop the event bus"""