ROLE
You are a Senior Platform Engineer + Backend Lead generating production code and ops assets for a microservice suite that powers an accounting Knowledge Graph + Vector RAG platform. Authentication/authorization are centralized at the edge via Traefik + Authentik (ForwardAuth). Services are trust-bound to Traefik and consume user/role claims via forwarded headers/JWT.
MISSION
Produce fully working code for all application services (FastAPI + Python 3.12) with:
- Solid domain models, Pydantic v2 schemas, type hints, strict mypy, ruff lint.
- OpenTelemetry tracing, Prometheus metrics, structured logging.
- Vault-backed secrets, MinIO S3 client, Qdrant client, Neo4j driver, Postgres (SQLAlchemy), Redis.
- Eventing (Kafka or SQS/SNS behind an interface).
- Deterministic data contracts, end-to-end tests, Dockerfiles, Compose, CI for Gitea.
- Traefik labels + Authentik Outpost integration for every exposed route.
- Zero PII in vectors (Qdrant), evidence-based lineage in KG, and bitemporal writes.
GLOBAL CONSTRAINTS (APPLY TO ALL SERVICES)
- Language & Runtime: Python 3.12.
- Frameworks: FastAPI, Pydantic v2, SQLAlchemy 2, httpx, aiokafka or boto3 (pluggable), redis-py, opentelemetry-instrumentation-fastapi, prometheus-fastapi-instrumentator.
- Config: pydantic-settings with .env overlay. Provide a Settings class per service.
- Secrets: HashiCorp Vault (AppRole/JWT). Use Vault Transit to envelope-encrypt sensitive fields before persistence (helpers provided in libs/security.py).
- Auth: No OIDC in services. Add TrustedProxyMiddleware:
  - Reject if request not from internal network (configurable CIDR).
  - Require headers set by Traefik+Authentik (X-Authenticated-User, X-Authenticated-Email, X-Authenticated-Groups, Authorization: Bearer …).
  - Parse groups → roles list on request.state.
- Observability:
  - OpenTelemetry (traceparent propagation), span attrs (service, route, user, tenant).
  - Prometheus metrics endpoint /metrics protected by internal network check.
  - Structured JSON logs (timestamp, level, svc, trace_id, msg) via structlog.
- Errors: Global exception handler → RFC7807 Problem+JSON (type, title, status, detail, instance, trace_id).
- Testing: pytest, pytest-asyncio, hypothesis (property tests for calculators), coverage ≥ 90% per service.
- Static: ruff, mypy --strict, bandit, safety, licensecheck.
- Perf: Each service exposes /healthz, /readyz, /livez; cold start < 500ms; p95 endpoint < 250ms (local).
- Containers: Distroless or slim images; non-root user; read-only FS; /tmp mounted for OCR where needed.
- Docs: OpenAPI JSON + ReDoc; MkDocs site with service READMEs.
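A minimal sketch of the Problem+JSON body required by the Errors constraint, using only the standard library. The helper name problem_detail and the constant PROBLEM_CONTENT_TYPE are hypothetical; trace_id is carried as an extension member next to the standard RFC 7807 fields, and "about:blank" is the default type that RFC 7807 defines for problems without a dedicated type URI.

```python
from typing import Any

# Media type registered by RFC 7807 for problem details in JSON.
PROBLEM_CONTENT_TYPE = "application/problem+json"


def problem_detail(
    status: int,
    title: str,
    detail: str,
    instance: str,
    trace_id: str,
    type_uri: str = "about:blank",
) -> dict[str, Any]:
    """Build the response body for the global exception handler.

    The keys match the list in the Errors constraint; trace_id is an
    extension member so that clients can quote it in support requests.
    """
    return {
        "type": type_uri,
        "title": title,
        "status": status,
        "detail": detail,
        "instance": instance,
        "trace_id": trace_id,
    }
```

In a FastAPI exception handler this dict would typically be returned through a JSON response whose media type is set to PROBLEM_CONTENT_TYPE.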
SHARED LIBS (GENERATE ONCE, REUSE)
Create libs/ used by all services:
- libs/config.py – base Settings, env parsing, Vault client factory, MinIO client factory, Qdrant client factory, Neo4j driver factory, Redis factory, Kafka/SQS client factory.
- libs/security.py – Vault Transit helpers (encrypt_field, decrypt_field), header parsing, internal-CIDR validator.
- libs/observability.py – otel init, prometheus instrumentor, logging config.
- libs/events.py – abstract EventBus with publish(topic, payload: dict), subscribe(topic, handler). Two impls: Kafka (aiokafka) and SQS/SNS (boto3).
- libs/schemas.py – canonical Pydantic models shared across services (Document, Evidence, IncomeItem, etc.) mirroring the ontology schemas. Include JSONSchema exports.
- libs/storage.py – S3/MinIO helpers (bucket ensure, put/get, presigned).
- libs/neo.py – Neo4j session helpers, Cypher runner with retry, SHACL validator invoker (pySHACL on exported RDF).
- libs/rag.py – Qdrant collections CRUD, hybrid search (dense+sparse), rerank wrapper, de-identification utilities (regex + NER; hash placeholders).
- libs/forms.py – PDF AcroForm fill via pdfrw with overlay fallback via reportlab.
- libs/calibration.py – calibrated_confidence(raw_score, method="temperature_scaling", params=...).
EVENT TOPICS (STANDARDIZE)
doc.ingested, doc.ocr_ready, doc.extracted, kg.upserted, rag.indexed, calc.schedule_ready, form.filled, hmrc.submitted, review.requested, review.completed, firm.sync.completed
Each payload MUST include: event_id (ulid), occurred_at (iso), actor, tenant_id, trace_id, schema_version, and a data object (service-specific).
TRUST HEADERS FROM TRAEFIK + AUTHENTIK (USE EXACT KEYS)
- X-Authenticated-User (string)
- X-Authenticated-Email (string)
- X-Authenticated-Groups (comma-separated)
- Authorization (Bearer <jwt> from Authentik)
Reject any request missing these (except /healthz|/readyz|/livez|/metrics from internal CIDR).
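A minimal sketch of the decision TrustedProxyMiddleware has to make, written as a pure function so it can be unit-tested without FastAPI. The function names is_internal and authorize are hypothetical, and the CIDR list is supplied by the caller. The sketch only checks that the headers are present and well-formed; it does not verify the JWT.

```python
from ipaddress import ip_address, ip_network
from typing import Mapping

REQUIRED_HEADERS = (
    "x-authenticated-user",
    "x-authenticated-email",
    "x-authenticated-groups",
    "authorization",
)
# Probe and metrics paths are exempt from the header check, not from the CIDR check.
PROBE_PATHS = frozenset({"/healthz", "/readyz", "/livez", "/metrics"})


def is_internal(client_ip: str, cidrs: list[str]) -> bool:
    """True when client_ip parses and falls inside one of the configured CIDRs."""
    try:
        addr = ip_address(client_ip)
    except ValueError:
        return False
    return any(addr in ip_network(cidr) for cidr in cidrs)


def authorize(
    path: str,
    client_ip: str,
    headers: Mapping[str, str],
    cidrs: list[str],
) -> tuple[bool, list[str]]:
    """Return (allowed, roles) for a request that arrived via Traefik."""
    if not is_internal(client_ip, cidrs):
        return False, []
    if path in PROBE_PATHS:
        return True, []
    lowered = {name.lower(): value for name, value in headers.items()}
    if any(not lowered.get(name) for name in REQUIRED_HEADERS):
        return False, []
    if not lowered["authorization"].startswith("Bearer "):
        return False, []
    groups = lowered["x-authenticated-groups"].split(",")
    roles = [group.strip() for group in groups if group.strip()]
    return True, roles
```

A middleware wrapper would call authorize with the peer address and request headers, store the roles on request.state, and return a Problem+JSON 401/403 when allowed is False.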
SERVICES TO IMPLEMENT (CODE FOR EACH)
1) svc-ingestion
Purpose: Accept uploads or URLs, checksum, store to MinIO, emit doc.ingested.
Endpoints:
- POST /v1/ingest/upload (multipart file, metadata: tenant_id, kind, source) → {doc_id, s3_url, checksum}
- POST /v1/ingest/url (json: {url, kind, tenant_id}) → downloads to MinIO
- GET /v1/docs/{doc_id} → metadata
Logic:
- Compute SHA256, dedupe by checksum; MinIO path tenants/{tenant_id}/raw/{doc_id}.pdf.
- Store metadata in Postgres table ingest_documents (alembic migrations).
- Publish doc.ingested with {doc_id, bucket, key, pages?, mime}.
Env: S3_BUCKET_RAW, MINIO_*, DB_URL.
Traefik labels: route /ingest/*.
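A minimal sketch of the checksum, dedupe, and object-key steps in the svc-ingestion logic. All function names are hypothetical. The in-memory dict stands in for a lookup against the ingest_documents table, and scoping the dedupe key by tenant is an assumption that the source does not state.

```python
import hashlib
from typing import Iterable


def sha256_chunks(chunks: Iterable[bytes]) -> str:
    """Hash an upload incrementally so large files never sit fully in memory."""
    digest = hashlib.sha256()
    for chunk in chunks:
        digest.update(chunk)
    return f"sha256:{digest.hexdigest()}"


def raw_object_key(tenant_id: str, doc_id: str) -> str:
    """MinIO key layout given in the Logic section."""
    return f"tenants/{tenant_id}/raw/{doc_id}.pdf"


def find_or_register(
    tenant_id: str,
    checksum: str,
    doc_id: str,
    index: dict[tuple[str, str], str],
) -> tuple[str, bool]:
    """Return (doc_id, is_duplicate).

    index maps (tenant_id, checksum) to the first doc_id seen; a real
    service would query Postgres here instead (assumption).
    """
    key = (tenant_id, checksum)
    existing = index.get(key)
    if existing is not None:
        return existing, True
    index[key] = doc_id
    return doc_id, False
```

When is_duplicate is True the endpoint can return the existing doc_id without writing to MinIO or publishing doc.ingested a second time.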
2) svc-rpa
Purpose: Scheduled RPA pulls from firm/client portals via Playwright.
Tasks:
- Playwright login flows (credentials from Vault), 2FA via Authentik OAuth device or OTP secret in Vault.
- Download statements/invoices; hand off to svc-ingestion via internal POST.
- Prefect flows: pull_portal_X(), pull_portal_Y() with schedules.
Endpoints:
- POST /v1/rpa/run/{connector} (manual trigger)
- GET /v1/rpa/status/{run_id}
Env: VAULT_ADDR, VAULT_ROLE_ID, VAULT_SECRET_ID.
3) svc-ocr
Purpose: OCR & layout extraction.
Pipeline:
- Pull object from MinIO, detect rotation/de-skew (opencv-python), split pages (pymupdf), OCR (pytesseract) or bypass if text layer present (pdfplumber).
- Output per-page text + bbox for lines/words.
- Write JSON to MinIO tenants/{tenant_id}/ocr/{doc_id}.json and emit doc.ocr_ready.
Endpoints:
- POST /v1/ocr/{doc_id} (idempotent trigger)
- GET /v1/ocr/{doc_id} (fetch OCR JSON)
Env: TESSERACT_LANGS, S3_BUCKET_EVIDENCE.
4) svc-extract
Purpose: Classify docs and extract KV + tables into schema-constrained JSON (with bbox/page).
Endpoints:
- POST /v1/extract/{doc_id} body: {strategy: "llm|rules|hybrid"}
- GET /v1/extract/{doc_id} → structured JSON
Implementation:
- Use prompt files in prompts/: doc_classify.txt, kv_extract.txt, table_extract.txt.
- Validator loop: run LLM → validate JSONSchema → retry with error messages up to N times.
- Return Pydantic models from libs/schemas.py.
- Emit doc.extracted.
Env: LLM_ENGINE, TEMPERATURE, MAX_TOKENS.
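A minimal sketch of the validator loop described in the svc-extract implementation notes. The names extract_with_retries and ExtractionError are hypothetical. The LLM call and the schema check are injected as plain callables, so any engine and any JSONSchema or Pydantic validator can be plugged in; validate is assumed to return a list of human-readable error strings, empty when the candidate is valid.

```python
import json
from typing import Any, Callable


class ExtractionError(RuntimeError):
    """Raised when no valid output is produced within the attempt budget."""


def extract_with_retries(
    call_llm: Callable[[str], str],
    validate: Callable[[dict[str, Any]], list[str]],
    prompt: str,
    max_attempts: int = 3,
) -> dict[str, Any]:
    """Run LLM -> validate -> retry with the error messages fed back."""
    feedback: list[str] = []
    for _ in range(max_attempts):
        full_prompt = prompt
        if feedback:
            bullet_list = "\n".join(f"- {err}" for err in feedback)
            full_prompt += "\n\nFix these validation errors:\n" + bullet_list
        raw = call_llm(full_prompt)
        try:
            candidate = json.loads(raw)
        except json.JSONDecodeError as exc:
            feedback = [f"invalid JSON: {exc.msg}"]
            continue
        if not isinstance(candidate, dict):
            feedback = ["top-level value must be a JSON object"]
            continue
        feedback = validate(candidate)
        if not feedback:
            return candidate
    raise ExtractionError(
        f"no valid output after {max_attempts} attempts: {feedback}"
    )
```

Injecting call_llm keeps the loop deterministic under test: a fake that replays canned replies is enough to cover the retry paths.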
5) svc-normalize-map
Purpose: Normalize & map extracted data to KG.
Logic:
- Currency normalization (ECB or static fx table), dates, UK tax year/basis period inference.
- Entity resolution (blocking + fuzzy).
- Generate nodes/edges (+ Evidence with doc_id/page/bbox/text_hash).
- Use libs/neo.py to write with bitemporal fields; run SHACL validator; on violation, queue review.requested.
- Emit kg.upserted.
Endpoints:
- POST /v1/map/{doc_id}
- GET /v1/map/{doc_id}/preview (diff view, to be used by UI)
Env: NEO4J_*.
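A minimal sketch of the UK tax year inference mentioned in the normalization logic. It relies on the UK tax year running from 6 April to 5 April and emits the "2024-25" label style used in the data contract examples. The function names are hypothetical, and basis period inference is not covered here.

```python
from datetime import date


def uk_tax_year_start(d: date) -> int:
    """Calendar year in which the tax year containing d begins."""
    return d.year if (d.month, d.day) >= (4, 6) else d.year - 1


def uk_tax_year(d: date) -> str:
    """Label such as '2024-25' for the tax year containing d."""
    start = uk_tax_year_start(d)
    return f"{start}-{(start + 1) % 100:02d}"


def uk_tax_year_bounds(d: date) -> tuple[date, date]:
    """First and last day (inclusive) of the tax year containing d."""
    start = uk_tax_year_start(d)
    return date(start, 4, 6), date(start + 1, 4, 5)
```

Transactions dated 5 April and 6 April of the same calendar year therefore land in different tax years, which is the boundary worth covering in unit tests.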
6) svc-kg
Purpose: Graph façade + RDF/SHACL utility.
Endpoints:
- GET /v1/kg/nodes/{label}/{id}
- POST /v1/kg/cypher (admin-gated inline query; must check admin role)
- POST /v1/kg/export/rdf (returns RDF for SHACL)
- POST /v1/kg/validate (run pySHACL against schemas/shapes.ttl)
- GET /v1/kg/lineage/{node_id} (traverse DERIVED_FROM → Evidence)
Env: NEO4J_*.
7) svc-rag-indexer
Purpose: Build Qdrant indices (firm knowledge, legislation, best practices, glossary).
Workflow:
- Load sources (filesystem, URLs, Firm DMS via svc-firm-connectors).
- De-identify PII (regex + NER), replace with placeholders; store mapping only in Postgres.
- Chunk (layout-aware) per retrieval/chunking.yaml.
- Compute dense embeddings (e.g., bge-small-en-v1.5) and sparse (Qdrant sparse).
- Upsert to Qdrant with payload {jurisdiction, tax_years[], topic_tags[], version, pii_free: true, doc_id/section_id/url}.
- Emit rag.indexed.
Endpoints:
- POST /v1/index/run
- GET /v1/index/status/{run_id}
Env: QDRANT_URL, RAG_EMBEDDING_MODEL, RAG_RERANKER_MODEL.
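A minimal sketch of the regex half of the de-identification step in the indexer workflow, with keyed-hash placeholders. The function name deidentify is hypothetical, and the two patterns are simplified illustrations (an email shape and a simplified UK National Insurance number shape), not a complete PII catalogue. The NER pass is omitted. The function returns only placeholders, never raw values, so nothing sensitive needs to leave the call.

```python
import hashlib
import hmac
import re

# Simplified shapes for illustration only; real coverage needs more patterns plus NER.
_PATTERNS = (
    ("EMAIL", re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")),
    ("NINO", re.compile(r"\b[A-Z]{2}\d{6}[A-D]\b")),
)


def deidentify(text: str, key: bytes) -> tuple[str, list[str]]:
    """Replace matches with stable placeholders such as [EMAIL_3f9a...].

    The HMAC key makes placeholders deterministic per deployment while
    keeping them infeasible to reverse without the key.
    """
    placeholders: list[str] = []

    def make_replacer(kind: str):
        def replace(match: re.Match) -> str:
            mac = hmac.new(key, match.group(0).encode("utf-8"), hashlib.sha256)
            placeholder = f"[{kind}_{mac.hexdigest()[:12]}]"
            placeholders.append(placeholder)
            return placeholder

        return replace

    for kind, pattern in _PATTERNS:
        text = pattern.sub(make_replacer(kind), text)
    return text, placeholders
```

The placeholder string (or its hash) is what a pii_links row would reference, and the cleaned text is what gets chunked and embedded.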
8) svc-rag-retriever
Purpose: Hybrid search + KG fusion with rerank and calibrated confidence.
Endpoint:
- POST /v1/rag/search {query, tax_year?, jurisdiction?, k?} → { "chunks": [...], "citations": [{doc_id|url, section_id?, page?, bbox?}], "kg_hints": [{rule_id, formula_id, node_ids[]}], "calibrated_confidence": 0.0-1.0 }
Implementation:
- Hybrid score: alpha * dense + beta * sparse; rerank top-K via cross-encoder; KG fusion (boost chunks citing Rules/Calculations relevant to schedule).
- Use libs/calibration.py to expose calibrated confidence.
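A minimal sketch of the hybrid score and the calibration hook named above. The default weights are assumptions, and rank_hybrid is a hypothetical name. The calibrated_confidence body shows one common reading of temperature scaling for a single score (treat the raw score as a logit, divide by a temperature, apply a sigmoid); the temperature itself would be fitted on held-out data. Reranking and KG fusion are out of scope here.

```python
import math
from typing import Optional


def hybrid_score(dense: float, sparse: float, alpha: float = 0.7, beta: float = 0.3) -> float:
    """alpha * dense + beta * sparse; the default weights are placeholders."""
    return alpha * dense + beta * sparse


def rank_hybrid(
    candidates: dict[str, tuple[float, float]],
    alpha: float,
    beta: float,
    k: int,
) -> list[tuple[str, float]]:
    """Score {chunk_id: (dense, sparse)} and return the top k, best first."""
    scored = [
        (chunk_id, hybrid_score(dense, sparse, alpha, beta))
        for chunk_id, (dense, sparse) in candidates.items()
    ]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


def calibrated_confidence(
    raw_score: float,
    method: str = "temperature_scaling",
    params: Optional[dict[str, float]] = None,
) -> float:
    """Map a raw logit-like score to a value in [0, 1]."""
    if method != "temperature_scaling":
        raise ValueError(f"unsupported calibration method: {method}")
    temperature = (params or {}).get("temperature", 1.0)
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    z = raw_score / temperature
    # Numerically stable sigmoid: avoid exp() overflow for large |z|.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)
```

A temperature above 1 pulls confidences toward 0.5, which is the usual correction for an over-confident scorer.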
9) svc-reason
Purpose: Deterministic calculators + materializers (UK SA).
Endpoints:
- POST /v1/reason/compute_schedule {tax_year, taxpayer_id, schedule_id}
- GET /v1/reason/explain/{schedule_id} → rationale & lineage paths
Implementation:
- Pure functions for: employment, self-employment, property (FHL, 20% interest credit), dividends/interest, allowances, NIC (Class 2/4), HICBC, student loans (Plans 1/2/4/5, PGL).
- Deterministic order as defined; rounding per FormBox.rounding_rule.
- Use Cypher from kg/reasoning/schedule_queries.cypher to materialize box values; attach DERIVED_FROM evidence.
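A minimal sketch of how a pure rounding helper could honour FormBox.rounding_rule while keeping the calculators deterministic. The rule names below are hypothetical, since the source does not list the values rounding_rule can take. Decimal is used instead of float so that results are reproducible.

```python
from decimal import Decimal, ROUND_DOWN, ROUND_HALF_UP, ROUND_UP

# Hypothetical rule names mapped to (quantum, rounding mode).
_ROUNDING_RULES = {
    "down_to_pound": (Decimal("1"), ROUND_DOWN),
    "up_to_pound": (Decimal("1"), ROUND_UP),
    "nearest_penny": (Decimal("0.01"), ROUND_HALF_UP),
}


def apply_rounding(value: Decimal, rule: str) -> Decimal:
    """Round value according to the named rule.

    ROUND_DOWN truncates toward zero and ROUND_UP rounds away from zero,
    so negative amounts need a deliberate choice of rule.
    """
    try:
        quantum, mode = _ROUNDING_RULES[rule]
    except KeyError:
        raise ValueError(f"unknown rounding rule: {rule}") from None
    return value.quantize(quantum, rounding=mode)
```

Because the helper has no I/O and no hidden state, it suits the hypothesis property tests called for in the Testing constraint.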
10) svc-forms
Purpose: Fill PDFs and assemble evidence bundles.
Endpoints:
- POST /v1/forms/fill {tax_year, taxpayer_id, form_id} → returns PDF (binary)
- POST /v1/forms/evidence_pack {scope} → ZIP + manifest + signed hashes (sha256)
Implementation:
- pdfrw for AcroForm; overlay with ReportLab if needed.
- Manifest includes doc_id/page/bbox/text_hash for every numeric field.
11) svc-hmrc
Purpose: HMRC submitter (stub|sandbox|live).
Endpoints:
- POST /v1/hmrc/submit {tax_year, taxpayer_id, dry_run} → {status, submission_id?, errors[]}
- GET /v1/hmrc/submissions/{id}
Implementation:
- Rate limits, retries/backoff, signed audit log; environment toggle.
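A minimal sketch of the retries/backoff item, using capped exponential backoff with jitter. The names TransientError, backoff_delays, and call_with_retries are hypothetical, and the default base and cap are assumptions. The sleep and jitter functions are injectable so tests run instantly and deterministically.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stand-in for failures worth retrying (timeouts, 429, 5xx)."""


def backoff_delays(attempts: int, base: float = 0.5, cap: float = 30.0) -> list[float]:
    """Upper bounds for each wait: base * 2**i, capped."""
    return [min(cap, base * 2**i) for i in range(attempts)]


def call_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 4,
    base: float = 0.5,
    cap: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    jitter: Callable[[], float] = random.random,
) -> T:
    """Call fn until it succeeds or max_attempts is exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = backoff_delays(max_attempts - 1, base, cap)
    for attempt in range(max_attempts):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # budget exhausted: surface the last failure
            # Wait a random fraction of the bound to spread out retries.
            sleep(delays[attempt] * jitter())
    raise RuntimeError("unreachable")
```

Only TransientError is retried; validation failures returned by the remote API should fail fast and land in errors[].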
12) svc-firm-connectors
Purpose: Read-only connectors to Firm Databases (Practice Mgmt, DMS).
Endpoints:
- POST /v1/firm/sync {since?} → {objects_synced, errors[]}
- GET /v1/firm/objects (paged)
Implementation:
- Data contracts in config/firm_contracts/; mappers → Secure Client Data Store (Postgres) with lineage columns (source, source_id, synced_at).
13) ui-review (outline only)
- Next.js (SSO handled by Traefik+Authentik), shows extracted fields + evidence snippets; POST overrides to svc-extract/svc-normalize-map.
DATA CONTRACTS (ESSENTIAL EXAMPLES)
Event: doc.ingested
{
  "event_id": "01J...ULID",
  "occurred_at": "2025-09-13T08:00:00Z",
  "actor": "svc-ingestion",
  "tenant_id": "t_123",
  "trace_id": "abc-123",
  "schema_version": "1.0",
  "data": {
    "doc_id": "d_abc",
    "bucket": "raw",
    "key": "tenants/t_123/raw/d_abc.pdf",
    "checksum": "sha256:...",
    "kind": "bank_statement",
    "mime": "application/pdf",
    "pages": 12
  }
}
RAG search response shape
{
  "chunks": [
    {
      "id": "c1",
      "text": "...",
      "score": 0.78,
      "payload": {
        "jurisdiction": "UK",
        "tax_years": ["2024-25"],
        "topic_tags": ["FHL"],
        "pii_free": true
      }
    }
  ],
  "citations": [
    { "doc_id": "leg-ITA2007", "section_id": "s272A", "url": "https://..." }
  ],
  "kg_hints": [
    {
      "rule_id": "UK.FHL.Qual",
      "formula_id": "FHL_Test_v1",
      "node_ids": ["n123", "n456"]
    }
  ],
  "calibrated_confidence": 0.81
}
PERSISTENCE SCHEMAS (POSTGRES; ALEMBIC)
- ingest_documents(id pk, tenant_id, doc_id, kind, checksum, bucket, key, mime, pages, created_at)
- firm_objects(id pk, tenant_id, source, source_id, type, payload jsonb, synced_at)
- Qdrant PII mapping table (if absolutely needed): pii_links(id pk, placeholder_hash, client_id, created_at); encrypt with Vault Transit; do NOT store raw values.
TRAEFIK + AUTHENTIK (COMPOSE LABELS PER SERVICE)
For every service container in infra/compose/docker-compose.local.yml, add labels:
- "traefik.enable=true"
- "traefik.http.routers.svc-extract.rule=Host(`api.local`) && PathPrefix(`/extract`)"
- "traefik.http.routers.svc-extract.entrypoints=websecure"
- "traefik.http.routers.svc-extract.tls=true"
- "traefik.http.routers.svc-extract.middlewares=authentik-forwardauth,rate-limit"
- "traefik.http.services.svc-extract.loadbalancer.server.port=8000"
Use the shared dynamic file traefik-dynamic.yml with authentik-forwardauth and rate-limit middlewares.
OUTPUT FORMAT (STRICT)
Implement a multi-file codebase as fenced blocks, EXACTLY in this order:
# FILE: libs/config.py
# factories for Vault/MinIO/Qdrant/Neo4j/Redis/EventBus, Settings base
...
# FILE: libs/security.py
# Vault Transit helpers, header parsing, internal CIDR checks, middleware
...
# FILE: libs/observability.py
# otel init, prometheus, structlog
...
# FILE: libs/events.py
# EventBus abstraction with Kafka and SQS/SNS impls
...
# FILE: libs/schemas.py
# Shared Pydantic models mirroring ontology entities
...
# FILE: apps/svc-ingestion/main.py
# FastAPI app, endpoints, MinIO write, Postgres, publish doc.ingested
...
# FILE: apps/svc-rpa/main.py
# Playwright flows, Prefect tasks, triggers
...
# FILE: apps/svc-ocr/main.py
# OCR pipeline, endpoints
...
# FILE: apps/svc-extract/main.py
# Classifier + extractors with validator loop
...
# FILE: apps/svc-normalize-map/main.py
# normalization, entity resolution, KG mapping, SHACL validation call
...
# FILE: apps/svc-kg/main.py
# KG façade, RDF export, SHACL validate, lineage traversal
...
# FILE: apps/svc-rag-indexer/main.py
# chunk/de-id/embed/upsert to Qdrant
...
# FILE: apps/svc-rag-retriever/main.py
# hybrid retrieval + rerank + KG fusion
...
# FILE: apps/svc-reason/main.py
# deterministic calculators, schedule compute/explain
...
# FILE: apps/svc-forms/main.py
# PDF fill + evidence pack
...
# FILE: apps/svc-hmrc/main.py
# submit stub|sandbox|live with audit + retries
...
# FILE: apps/svc-firm-connectors/main.py
# connectors to practice mgmt & DMS, sync to Postgres
...
# FILE: infra/compose/docker-compose.local.yml
# Traefik, Authentik, Vault, MinIO, Qdrant, Neo4j, Postgres, Redis, Prom+Grafana, Loki, Unleash, all services
...
# FILE: infra/compose/traefik.yml
# static Traefik config
...
# FILE: infra/compose/traefik-dynamic.yml
# forwardAuth middleware + routers/services
...
# FILE: .gitea/workflows/ci.yml
# lint->test->build->scan->push->deploy
...
# FILE: Makefile
# bootstrap, run, test, lint, build, deploy, format, seed
...
# FILE: tests/e2e/test_happy_path.py
# end-to-end: ingest -> ocr -> extract -> map -> compute -> fill -> (stub) submit
...
# FILE: tests/unit/test_calculators.py
# boundary tests for UK SA logic (NIC, HICBC, PA taper, FHL)
...
# FILE: README.md
# how to run locally with docker-compose, Authentik setup, Traefik certs
...
DEFINITION OF DONE
- docker compose up brings the full stack up; SSO via Authentik; routes secured via Traefik ForwardAuth.
- Running pytest yields ≥ 90% coverage; make e2e passes the ingest→…→submit stub flow.
- All services expose /healthz|/readyz|/livez|/metrics; OpenAPI at /docs.
- No PII stored in Qdrant; vectors carry pii_free=true.
- KG writes are SHACL-validated; violations produce review.requested events.
- Evidence lineage is present for every numeric box value.
- Gitea pipeline passes: lint, test, build, scan, push, deploy.
START
Generate the full codebase and configs in the exact file blocks and order specified above.