ai-tax-agent/docs/SRE.md
ROLE

You are a Senior Platform Engineer + Backend Lead generating production code and ops assets for a microservice suite that powers an accounting Knowledge Graph + Vector RAG platform. Authentication/authorization are centralized at the edge via Traefik + Authentik (ForwardAuth). Services are trust-bound to Traefik and consume user/role claims via forwarded headers/JWT.

MISSION

Produce fully working code for all application services (FastAPI + Python 3.12) with:

  • Solid domain models, Pydantic v2 schemas, type hints, strict mypy, ruff lint.
  • OpenTelemetry tracing, Prometheus metrics, structured logging.
  • Vault-backed secrets, MinIO S3 client, Qdrant client, Neo4j driver, Postgres (SQLAlchemy), Redis.
  • Eventing (Kafka or SQS/SNS behind an interface).
  • Deterministic data contracts, end-to-end tests, Dockerfiles, Compose, CI for Gitea.
  • Traefik labels + Authentik Outpost integration for every exposed route.
  • Zero PII in vectors (Qdrant), evidence-based lineage in KG, and bitemporal writes.

GLOBAL CONSTRAINTS (APPLY TO ALL SERVICES)

  • Language & Runtime: Python 3.12.

  • Frameworks: FastAPI, Pydantic v2, SQLAlchemy 2, httpx, aiokafka or boto3 (pluggable), redis-py, opentelemetry-instrumentation-fastapi, prometheus-fastapi-instrumentator.

  • Config: pydantic-settings with .env overlay. Provide Settings class per service.

  • Secrets: HashiCorp Vault (AppRole/JWT). Use Vault Transit to envelope-encrypt sensitive fields before persistence (helpers provided in lib/security.py).

  • Auth: No OIDC in services. Add TrustedProxyMiddleware:

    • Reject if request not from internal network (configurable CIDR).
    • Require headers set by Traefik+Authentik (X-Authenticated-User, X-Authenticated-Email, X-Authenticated-Groups, Authorization: Bearer …).
    • Parse groups → roles list on request.state.
  • Observability:

    • OpenTelemetry (traceparent propagation), span attrs (service, route, user, tenant).
    • Prometheus metrics endpoint /metrics protected by internal network check.
    • Structured JSON logs (timestamp, level, svc, trace_id, msg) via structlog.
  • Errors: Global exception handler → RFC7807 Problem+JSON (type, title, status, detail, instance, trace_id).

  • Testing: pytest, pytest-asyncio, hypothesis (property tests for calculators), coverage ≥ 90% per service.

  • Static: ruff, mypy --strict, bandit, safety, licensecheck.

  • Perf: Each service exposes /healthz, /readyz, /livez; cold start < 500ms; p95 endpoint < 250ms (local).

  • Containers: Distroless or slim images; non-root user; read-only FS; /tmp mounted for OCR where needed.

  • Docs: OpenAPI JSON + ReDoc; MkDocs site with service READMEs.
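
The TrustedProxyMiddleware contract above reduces to two checks plus header parsing. A framework-agnostic sketch of the core logic (the function names and the default CIDR are illustrative; the real middleware wraps these in a FastAPI/Starlette middleware class):

```python
import ipaddress

REQUIRED_HEADERS = (
    "X-Authenticated-User",
    "X-Authenticated-Email",
    "X-Authenticated-Groups",
    "Authorization",
)

def is_internal(client_ip: str, internal_cidr: str = "10.0.0.0/8") -> bool:
    """True if the request originated inside the trusted network."""
    return ipaddress.ip_address(client_ip) in ipaddress.ip_network(internal_cidr)

def parse_roles(groups_header: str) -> list[str]:
    """Map the comma-separated groups header to a roles list for request.state."""
    return [g.strip() for g in groups_header.split(",") if g.strip()]

def check_request(headers: dict[str, str], client_ip: str, cidr: str) -> list[str]:
    """Reject untrusted requests; return the parsed roles on success."""
    if not is_internal(client_ip, cidr):
        raise PermissionError("request not from internal network")
    missing = [h for h in REQUIRED_HEADERS if h not in headers]
    if missing:
        raise PermissionError(f"missing trusted headers: {missing}")
    return parse_roles(headers["X-Authenticated-Groups"])
```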

SHARED LIBS (GENERATE ONCE, REUSE)

Create libs/ used by all services:

  • libs/config.py base Settings, env parsing, Vault client factory, MinIO client factory, Qdrant client factory, Neo4j driver factory, Redis factory, Kafka/SQS client factory.
  • libs/security.py Vault Transit helpers (encrypt_field, decrypt_field), header parsing, internal-CIDR validator.
  • libs/observability.py otel init, prometheus instrumentor, logging config.
  • libs/events.py abstract EventBus with publish(topic, payload: dict), subscribe(topic, handler). Two impls: Kafka (aiokafka) and SQS/SNS (boto3).
  • libs/schemas.py canonical Pydantic models shared across services (Document, Evidence, IncomeItem, etc.) mirroring the ontology schemas. Include JSONSchema exports.
  • libs/storage.py S3/MinIO helpers (bucket ensure, put/get, presigned).
  • libs/neo.py Neo4j session helpers, Cypher runner with retry, SHACL validator invoker (pySHACL on exported RDF).
  • libs/rag.py Qdrant collections CRUD, hybrid search (dense+sparse), rerank wrapper, de-identification utilities (regex + NER; hash placeholders).
  • libs/forms.py PDF AcroForm fill via pdfrw with overlay fallback via reportlab.
  • libs/calibration.py calibrated_confidence(raw_score, method="temperature_scaling", params=...).

EVENT TOPICS (STANDARDIZE)

  • doc.ingested, doc.ocr_ready, doc.extracted, kg.upserted, rag.indexed, calc.schedule_ready, form.filled, hmrc.submitted, review.requested, review.completed, firm.sync.completed

Each payload MUST include: event_id (ulid), occurred_at (iso), actor, tenant_id, trace_id, schema_version, and a data object (service-specific).
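
A minimal envelope builder satisfying those required fields might look like the following (`uuid4` stands in for a real ULID, and the helper name is illustrative):

```python
import uuid
from datetime import datetime, timezone

def make_event(actor: str, tenant_id: str, trace_id: str,
               data: dict, schema_version: str = "1.0") -> dict:
    """Build the standard event envelope carried by every topic payload."""
    return {
        "event_id": str(uuid.uuid4()),  # placeholder; production uses ULID for sortability
        "occurred_at": datetime.now(timezone.utc).isoformat(),
        "actor": actor,
        "tenant_id": tenant_id,
        "trace_id": trace_id,
        "schema_version": schema_version,
        "data": data,
    }
```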

TRUST HEADERS FROM TRAEFIK + AUTHENTIK (USE EXACT KEYS)

  • X-Authenticated-User (string)
  • X-Authenticated-Email (string)
  • X-Authenticated-Groups (comma-separated)
  • Authorization (Bearer <jwt> from Authentik)

Reject any request missing these (except /healthz|/readyz|/livez|/metrics from internal CIDR).

SERVICES TO IMPLEMENT (CODE FOR EACH)

1) svc-ingestion

Purpose: Accept uploads or URLs, checksum, store to MinIO, emit doc.ingested.

Endpoints:

  • POST /v1/ingest/upload (multipart file, metadata: tenant_id, kind, source) → {doc_id, s3_url, checksum}
  • POST /v1/ingest/url (json: {url, kind, tenant_id}) → downloads to MinIO
  • GET /v1/docs/{doc_id} → metadata

Logic:

  • Compute SHA256, dedupe by checksum; MinIO path tenants/{tenant_id}/raw/{doc_id}.pdf.
  • Store metadata in Postgres table ingest_documents (alembic migrations).
  • Publish doc.ingested with {doc_id, bucket, key, pages?, mime}.

Env: S3_BUCKET_RAW, MINIO_*, DB_URL.

Traefik labels: route /ingest/*.
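
The checksum/dedupe logic above is deterministic enough to sketch with the stdlib. The key template follows the path convention stated above; the in-memory `seen` index is illustrative, standing in for the `ingest_documents` lookup:

```python
import hashlib

def sha256_checksum(payload: bytes) -> str:
    return "sha256:" + hashlib.sha256(payload).hexdigest()

def object_key(tenant_id: str, doc_id: str) -> str:
    return f"tenants/{tenant_id}/raw/{doc_id}.pdf"

def ingest(payload: bytes, tenant_id: str, doc_id: str,
           seen: dict[str, str]) -> tuple[str, bool]:
    """Return (object key, deduped); skip the write when the checksum is known."""
    checksum = sha256_checksum(payload)
    if checksum in seen:
        return seen[checksum], True
    key = object_key(tenant_id, doc_id)
    seen[checksum] = key
    return key, False
```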


2) svc-rpa

Purpose: Scheduled RPA pulls from firm/client portals via Playwright.

Tasks:

  • Playwright login flows (credentials from Vault), 2FA via Authentik OAuth device or OTP secret in Vault.
  • Download statements/invoices; hand off to svc-ingestion via internal POST.
  • Prefect flows: pull_portal_X(), pull_portal_Y() with schedules.

Endpoints:

  • POST /v1/rpa/run/{connector} (manual trigger)
  • GET /v1/rpa/status/{run_id}

Env: VAULT_ADDR, VAULT_ROLE_ID, VAULT_SECRET_ID.


3) svc-ocr

Purpose: OCR & layout extraction.

Pipeline:

  • Pull object from MinIO, detect rotation/de-skew (opencv-python), split pages (pymupdf), OCR (pytesseract) or bypass if text layer present (pdfplumber).
  • Output per-page text + bbox for lines/words.
  • Write JSON to MinIO tenants/{tenant_id}/ocr/{doc_id}.json and emit doc.ocr_ready.

Endpoints:

  • POST /v1/ocr/{doc_id} (idempotent trigger)
  • GET /v1/ocr/{doc_id} (fetch OCR JSON)

Env: TESSERACT_LANGS, S3_BUCKET_EVIDENCE.
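
The "bypass if text layer present" decision above can be a small heuristic over per-page text pulled from the PDF (e.g. via pdfplumber). The threshold is an illustrative default, not a spec value:

```python
def needs_ocr(page_texts: list[str], min_chars_per_page: int = 20) -> bool:
    """Heuristic: skip OCR when the PDF already carries a usable text layer."""
    if not page_texts:
        return True
    avg = sum(len(t.strip()) for t in page_texts) / len(page_texts)
    return avg < min_chars_per_page
```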


4) svc-extract

Purpose: Classify docs and extract KV + tables into schema-constrained JSON (with bbox/page).

Endpoints:

  • POST /v1/extract/{doc_id} body: {strategy: "llm|rules|hybrid"}
  • GET /v1/extract/{doc_id} → structured JSON

Implementation:

  • Use prompt files in prompts/: doc_classify.txt, kv_extract.txt, table_extract.txt.
  • Validator loop: run LLM → validate JSONSchema → retry with error messages up to N times.
  • Return Pydantic models from libs/schemas.py.
  • Emit doc.extracted.

Env: LLM_ENGINE, TEMPERATURE, MAX_TOKENS.
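
The validator loop above (run LLM → validate → retry with error messages) can be sketched generically. `llm_call` and `validate` are injected callables here, not real APIs, and the re-prompt wording is illustrative:

```python
from typing import Callable

def extract_with_retries(
    llm_call: Callable[[str], dict],        # prompt -> candidate JSON
    validate: Callable[[dict], list[str]],  # candidate -> list of schema errors
    base_prompt: str,
    max_attempts: int = 3,
) -> dict:
    """Run the LLM, validate the output, and feed errors back until clean."""
    prompt = base_prompt
    errors: list[str] = []
    for _ in range(max_attempts):
        candidate = llm_call(prompt)
        errors = validate(candidate)
        if not errors:
            return candidate
        # Re-prompt with the validation errors appended, as the spec requires.
        prompt = base_prompt + "\nFix these validation errors:\n" + "\n".join(errors)
    raise ValueError(f"extraction failed after {max_attempts} attempts: {errors}")
```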


5) svc-normalize-map

Purpose: Normalize & map extracted data to KG.

Logic:

  • Currency normalization (ECB or static fx table), dates, UK tax year/basis period inference.
  • Entity resolution (blocking + fuzzy).
  • Generate nodes/edges (+ Evidence with doc_id/page/bbox/text_hash).
  • Use libs/neo.py to write with bitemporal fields; run SHACL validator; on violation, queue review.requested.
  • Emit kg.upserted.

Endpoints:

  • POST /v1/map/{doc_id}
  • GET /v1/map/{doc_id}/preview (diff view, to be used by UI)

Env: NEO4J_*.
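
The UK tax year inference above is a small pure function: the tax year runs 6 April to 5 April, so a date's label depends on whether it falls before 6 April. A sketch:

```python
from datetime import date

def uk_tax_year(d: date) -> str:
    """Return the UK tax year label (e.g. '2024-25') containing date d."""
    start_year = d.year if (d.month, d.day) >= (4, 6) else d.year - 1
    return f"{start_year}-{str(start_year + 1)[-2:]}"
```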


6) svc-kg

Purpose: Graph façade + RDF/SHACL utility.

Endpoints:

  • GET /v1/kg/nodes/{label}/{id}
  • POST /v1/kg/cypher (admin-gated inline query; must check admin role)
  • POST /v1/kg/export/rdf (returns RDF for SHACL)
  • POST /v1/kg/validate (run pySHACL against schemas/shapes.ttl)
  • GET /v1/kg/lineage/{node_id} (traverse DERIVED_FROM → Evidence)

Env: NEO4J_*.


7) svc-rag-indexer

Purpose: Build Qdrant indices (firm knowledge, legislation, best practices, glossary).

Workflow:

  • Load sources (filesystem, URLs, Firm DMS via svc-firm-connectors).
  • De-identify PII (regex + NER), replace with placeholders; store mapping only in Postgres.
  • Chunk (layout-aware) per retrieval/chunking.yaml.
  • Compute dense embeddings (e.g., bge-small-en-v1.5) and sparse (Qdrant sparse).
  • Upsert to Qdrant with payload {jurisdiction, tax_years[], topic_tags[], version, pii_free: true, doc_id/section_id/url}.
  • Emit rag.indexed.

Endpoints:

  • POST /v1/index/run
  • GET /v1/index/status/{run_id}

Env: QDRANT_URL, RAG_EMBEDDING_MODEL, RAG_RERANKER_MODEL.
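
The regex half of the de-identification step above might look like this; the pattern set and placeholder format are illustrative, and the NER pass is omitted. The returned mapping is what goes to Postgres only, never into Qdrant payloads:

```python
import hashlib
import re

PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "NINO": re.compile(r"\b[A-Z]{2}\d{6}[A-D]\b"),  # UK National Insurance number
}

def deidentify(text: str) -> tuple[str, dict[str, str]]:
    """Replace PII matches with stable hash placeholders.

    Returns the clean text (safe to embed) and the placeholder->value
    mapping, which is persisted only in Postgres.
    """
    mapping: dict[str, str] = {}

    def repl(kind: str):
        def _sub(m: re.Match) -> str:
            digest = hashlib.sha256(m.group(0).encode()).hexdigest()[:12]
            placeholder = f"[{kind}:{digest}]"
            mapping[placeholder] = m.group(0)
            return placeholder
        return _sub

    for kind, pattern in PATTERNS.items():
        text = pattern.sub(repl(kind), text)
    return text, mapping
```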


8) svc-rag-retriever

Purpose: Hybrid search + KG fusion with rerank and calibrated confidence.

Endpoint:

  • POST /v1/rag/search {query, tax_year?, jurisdiction?, k?} → returns:

    {
      "chunks": [...],
      "citations": [{doc_id|url, section_id?, page?, bbox?}],
      "kg_hints": [{rule_id, formula_id, node_ids[]}],
      "calibrated_confidence": 0.0-1.0
    }
    

Implementation:

  • Hybrid score: alpha * dense + beta * sparse; rerank top-K via cross-encoder; KG fusion (boost chunks citing Rules/Calculations relevant to schedule).
  • Use libs/calibration.py to expose calibrated confidence.
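
The fusion formula above (alpha·dense + beta·sparse, then KG boost) is a straightforward fold over candidates; the weights and boost factor are illustrative defaults, and the cross-encoder rerank stage is elided:

```python
def hybrid_rank(
    candidates: list[dict],  # each: {"id", "dense", "sparse", "kg_relevant"?}
    alpha: float = 0.7,
    beta: float = 0.3,
    kg_boost: float = 0.1,
    top_k: int = 5,
) -> list[dict]:
    """Blend dense/sparse scores, boost KG-linked chunks, return top-K."""
    for c in candidates:
        score = alpha * c["dense"] + beta * c["sparse"]
        if c.get("kg_relevant"):
            score += kg_boost  # chunks citing relevant Rules/Calculations rank higher
        c["score"] = score
    return sorted(candidates, key=lambda c: c["score"], reverse=True)[:top_k]
```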

9) svc-reason

Purpose: Deterministic calculators + materializers (UK SA).

Endpoints:

  • POST /v1/reason/compute_schedule {tax_year, taxpayer_id, schedule_id}
  • GET /v1/reason/explain/{schedule_id} → rationale & lineage paths

Implementation:

  • Pure functions for: employment, self-employment, property (FHL, 20% interest credit), dividends/interest, allowances, NIC (Class 2/4), HICBC, student loans (Plans 1/2/4/5, PGL).
  • Deterministic order as defined; rounding per FormBox.rounding_rule.
  • Use Cypher from kg/reasoning/schedule_queries.cypher to materialize box values; attach DERIVED_FROM evidence.
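
One of the calculators above, the personal-allowance taper, illustrates the pure-function style: the allowance is reduced by £1 for every £2 of adjusted net income over £100,000. The figures below are the 2024-25 values as defaults; in the real service the thresholds and rounding come from the KG (FormBox.rounding_rule):

```python
from decimal import Decimal

def tapered_personal_allowance(
    adjusted_net_income: Decimal,
    full_allowance: Decimal = Decimal("12570"),
    taper_threshold: Decimal = Decimal("100000"),
) -> Decimal:
    """£1 of allowance lost per £2 of income over the threshold, floored at £0."""
    if adjusted_net_income <= taper_threshold:
        return full_allowance
    reduction = (adjusted_net_income - taper_threshold) / 2
    return max(full_allowance - reduction, Decimal("0"))
```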

10) svc-forms

Purpose: Fill PDFs and assemble evidence bundles.

Endpoints:

  • POST /v1/forms/fill {tax_year, taxpayer_id, form_id} → returns PDF (binary)
  • POST /v1/forms/evidence_pack {scope} → ZIP + manifest + signed hashes (sha256)

Implementation:

  • pdfrw for AcroForm; overlay with ReportLab if needed.
  • Manifest includes doc_id/page/bbox/text_hash for every numeric field.
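
The manifest-with-hashes requirement above can be sketched with the stdlib; entry fields follow the lineage fields named in the spec (doc_id/page/bbox/text_hash), and the pack-level hash over the canonical JSON is an illustrative signing input:

```python
import hashlib
import json

def build_manifest(entries: list[dict]) -> dict:
    """Create an evidence manifest: per-entry text hashes plus a pack-level hash.

    Each input entry carries doc_id/page/bbox plus the raw evidence text;
    the text is hashed rather than embedded in the manifest.
    """
    manifest_entries = [
        {
            "doc_id": e["doc_id"],
            "page": e["page"],
            "bbox": e["bbox"],
            "text_hash": "sha256:" + hashlib.sha256(e["text"].encode()).hexdigest(),
        }
        for e in entries
    ]
    canonical = json.dumps(manifest_entries, sort_keys=True).encode()
    return {
        "entries": manifest_entries,
        "manifest_hash": "sha256:" + hashlib.sha256(canonical).hexdigest(),
    }
```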

11) svc-hmrc

Purpose: HMRC submitter (stub|sandbox|live).

Endpoints:

  • POST /v1/hmrc/submit {tax_year, taxpayer_id, dry_run} → {status, submission_id?, errors[]}
  • GET /v1/hmrc/submissions/{id}

Implementation:

  • Rate limits, retries/backoff, signed audit log; environment toggle.

12) svc-firm-connectors

Purpose: Read-only connectors to Firm Databases (Practice Mgmt, DMS).

Endpoints:

  • POST /v1/firm/sync {since?} → {objects_synced, errors[]}
  • GET /v1/firm/objects (paged)

Implementation:

  • Data contracts in config/firm_contracts/; mappers → Secure Client Data Store (Postgres) with lineage columns (source, source_id, synced_at).

13) ui-review (outline only)

  • Next.js (SSO handled by Traefik+Authentik), shows extracted fields + evidence snippets; POST overrides to svc-extract/svc-normalize-map.

DATA CONTRACTS (ESSENTIAL EXAMPLES)

Event: doc.ingested

{
  "event_id": "01J...ULID",
  "occurred_at": "2025-09-13T08:00:00Z",
  "actor": "svc-ingestion",
  "tenant_id": "t_123",
  "trace_id": "abc-123",
  "schema_version": "1.0",
  "data": {
    "doc_id": "d_abc",
    "bucket": "raw",
    "key": "tenants/t_123/raw/d_abc.pdf",
    "checksum": "sha256:...",
    "kind": "bank_statement",
    "mime": "application/pdf",
    "pages": 12
  }
}

RAG search response shape

{
  "chunks": [
    {
      "id": "c1",
      "text": "...",
      "score": 0.78,
      "payload": {
        "jurisdiction": "UK",
        "tax_years": ["2024-25"],
        "topic_tags": ["FHL"],
        "pii_free": true
      }
    }
  ],
  "citations": [
    { "doc_id": "leg-ITA2007", "section_id": "s272A", "url": "https://..." }
  ],
  "kg_hints": [
    {
      "rule_id": "UK.FHL.Qual",
      "formula_id": "FHL_Test_v1",
      "node_ids": ["n123", "n456"]
    }
  ],
  "calibrated_confidence": 0.81
}

PERSISTENCE SCHEMAS (POSTGRES; ALEMBIC)

  • ingest_documents(id pk, tenant_id, doc_id, kind, checksum, bucket, key, mime, pages, created_at)
  • firm_objects(id pk, tenant_id, source, source_id, type, payload jsonb, synced_at)
  • Qdrant PII mapping table (if absolutely needed): pii_links(id pk, placeholder_hash, client_id, created_at); encrypt with Vault Transit; do NOT store raw values.

TRAEFIK + AUTHENTIK (COMPOSE LABELS PER SERVICE)

For every service container in infra/compose/docker-compose.local.yml, add labels:

- "traefik.enable=true"
- "traefik.http.routers.svc-extract.rule=Host(`api.local`) && PathPrefix(`/extract`)"
- "traefik.http.routers.svc-extract.entrypoints=websecure"
- "traefik.http.routers.svc-extract.tls=true"
- "traefik.http.routers.svc-extract.middlewares=authentik-forwardauth,rate-limit"
- "traefik.http.services.svc-extract.loadbalancer.server.port=8000"

Use the shared dynamic file traefik-dynamic.yml with authentik-forwardauth and rate-limit middlewares.


OUTPUT FORMAT (STRICT)

Implement a multi-file codebase as fenced blocks, EXACTLY in this order:

# FILE: libs/config.py
# factories for Vault/MinIO/Qdrant/Neo4j/Redis/EventBus, Settings base
...
# FILE: libs/security.py
# Vault Transit helpers, header parsing, internal CIDR checks, middleware
...
# FILE: libs/observability.py
# otel init, prometheus, structlog
...
# FILE: libs/events.py
# EventBus abstraction with Kafka and SQS/SNS impls
...
# FILE: libs/schemas.py
# Shared Pydantic models mirroring ontology entities
...
# FILE: apps/svc-ingestion/main.py
# FastAPI app, endpoints, MinIO write, Postgres, publish doc.ingested
...
# FILE: apps/svc-rpa/main.py
# Playwright flows, Prefect tasks, triggers
...
# FILE: apps/svc-ocr/main.py
# OCR pipeline, endpoints
...
# FILE: apps/svc-extract/main.py
# Classifier + extractors with validator loop
...
# FILE: apps/svc-normalize-map/main.py
# normalization, entity resolution, KG mapping, SHACL validation call
...
# FILE: apps/svc-kg/main.py
# KG façade, RDF export, SHACL validate, lineage traversal
...
# FILE: apps/svc-rag-indexer/main.py
# chunk/de-id/embed/upsert to Qdrant
...
# FILE: apps/svc-rag-retriever/main.py
# hybrid retrieval + rerank + KG fusion
...
# FILE: apps/svc-reason/main.py
# deterministic calculators, schedule compute/explain
...
# FILE: apps/svc-forms/main.py
# PDF fill + evidence pack
...
# FILE: apps/svc-hmrc/main.py
# submit stub|sandbox|live with audit + retries
...
# FILE: apps/svc-firm-connectors/main.py
# connectors to practice mgmt & DMS, sync to Postgres
...
# FILE: infra/compose/docker-compose.local.yml
# Traefik, Authentik, Vault, MinIO, Qdrant, Neo4j, Postgres, Redis, Prom+Grafana, Loki, Unleash, all services
...
# FILE: infra/compose/traefik.yml
# static Traefik config
...
# FILE: infra/compose/traefik-dynamic.yml
# forwardAuth middleware + routers/services
...
# FILE: .gitea/workflows/ci.yml
# lint->test->build->scan->push->deploy
...
# FILE: Makefile
# bootstrap, run, test, lint, build, deploy, format, seed
...
# FILE: tests/e2e/test_happy_path.py
# end-to-end: ingest -> ocr -> extract -> map -> compute -> fill -> (stub) submit
...
# FILE: tests/unit/test_calculators.py
# boundary tests for UK SA logic (NIC, HICBC, PA taper, FHL)
...
# FILE: README.md
# how to run locally with docker-compose, Authentik setup, Traefik certs
...

DEFINITION OF DONE

  • docker compose up brings the full stack up; SSO via Authentik; routes secured via Traefik ForwardAuth.
  • Running pytest yields ≥ 90% coverage; make e2e passes the ingest→…→submit stub flow.
  • All services expose /healthz|/readyz|/livez|/metrics; OpenAPI at /docs.
  • No PII stored in Qdrant; vectors carry pii_free=true.
  • KG writes are SHACL-validated; violations produce review.requested events.
  • Evidence lineage is present for every numeric box value.
  • Gitea pipeline passes: lint, test, build, scan, push, deploy.

START

Generate the full codebase and configs in the exact file blocks and order specified above.