recovered config
Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled

harkon
2025-10-16 08:57:14 +01:00
parent eea46ac89c
commit 8fe5e62fee
14 changed files with 775 additions and 1000 deletions

View File

@@ -129,39 +129,6 @@ async def shutdown_event() -> None:
    logger.info("Extraction service shutdown complete")


@app.get("/healthz")
async def health_check() -> dict[str, Any]:
    """Health check endpoint"""
    return {
        "status": "healthy",
        "service": settings.service_name,
        "version": settings.service_version,
        "timestamp": datetime.utcnow().isoformat(),
    }


@app.get("/readyz")
async def readiness_check() -> dict[str, Any]:
    """Readiness check endpoint"""
    return {
        "status": "ready",
        "service": settings.service_name,
        "version": settings.service_version,
        "timestamp": datetime.utcnow().isoformat(),
    }


@app.get("/livez")
async def liveness_check() -> dict[str, Any]:
    """Liveness check endpoint"""
    return {
        "status": "alive",
        "service": settings.service_name,
        "version": settings.service_version,
        "timestamp": datetime.utcnow().isoformat(),
    }


@app.post("/extract/{doc_id}", response_model=ExtractionResponse)
async def extract_fields(
    doc_id: str,

View File

@@ -480,7 +480,7 @@ async def _process_with_tesseract(image_data: bytes, page_num: int) -> dict[str,
config = f"{settings.tesseract_config} -l {settings.languages}"
# Extract text with confidence
data = pytesseract.image_to_data( # type: ignore
data = pytesseract.image_to_data(
image, config=config, output_type=pytesseract.Output.DICT
)

View File

@@ -0,0 +1 @@
CREATE DATABASE unleash OWNER postgres;

View File

@@ -1,5 +1,6 @@
"""Qdrant collections CRUD, hybrid search, rerank wrapper, de-identification utilities."""
from .chunker import DocumentChunker
from .collection_manager import QdrantCollectionManager
from .pii_detector import PIIDetector
from .retriever import RAGRetriever
@@ -10,4 +11,5 @@ __all__ = [
"QdrantCollectionManager",
"RAGRetriever",
"rag_search_for_citations",
"DocumentChunker",
]

libs/rag/chunker.py (new file, 134 lines)
View File

@@ -0,0 +1,134 @@
"""Simple document chunker for RAG indexing.
Splits documents into manageable chunks using configuration options.
Supports text files directly and PDFs via pdfplumber when available.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
@dataclass
class ChunkerConfig:
chunk_size: int = 1000
chunk_overlap: int = 100
max_chunks: int = 1000
class DocumentChunker:
def __init__(self, config_path: str) -> None:
try:
with open(config_path, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
except Exception:
cfg = {}
rcfg = cfg.get("chunking", {}) if isinstance(cfg, dict) else {}
self.config = ChunkerConfig(
chunk_size=int(rcfg.get("chunk_size", 1000)),
chunk_overlap=int(rcfg.get("chunk_overlap", 100)),
max_chunks=int(rcfg.get("max_chunks", 1000)),
)
async def chunk_document(self, document_path: str, metadata: dict[str, Any]) -> list[dict[str, Any]]:
path = Path(document_path)
ext = path.suffix.lower()
if ext == ".pdf":
return await self._chunk_pdf(path, metadata)
else:
return await self._chunk_text_like(path, metadata)
async def _chunk_pdf(self, path: Path, metadata: dict[str, Any]) -> list[dict[str, Any]]:
chunks: list[dict[str, Any]] = []
try:
import pdfplumber # type: ignore
with pdfplumber.open(str(path)) as pdf:
total_pages = len(pdf.pages)
doc_id = metadata.get("doc_id") or path.stem
for i, page in enumerate(pdf.pages, start=1):
text = page.extract_text() or ""
if not text.strip():
continue
for j, content in enumerate(self._split_text(text), start=0):
cid = f"{doc_id}-p{i}-c{j}"
chunks.append(
{
"id": cid,
"document_id": doc_id,
"content": content,
"chunk_index": j,
"total_chunks": total_pages,
"page_numbers": [i],
"section_hierarchy": [],
"confidence_score": 1.0,
}
)
if len(chunks) >= self.config.max_chunks:
return chunks
except Exception:
# Fallback: treat as binary and produce a single empty chunk to avoid crashes
chunks.append(
{
"id": f"{path.stem}-p1-c0",
"document_id": path.stem,
"content": "",
"chunk_index": 0,
"total_chunks": 1,
"page_numbers": [1],
"section_hierarchy": [],
"confidence_score": 0.0,
}
)
return chunks
async def _chunk_text_like(self, path: Path, metadata: dict[str, Any]) -> list[dict[str, Any]]:
try:
text = path.read_text(encoding="utf-8", errors="ignore")
except Exception:
# As a last resort, read bytes and decode best-effort
data = path.read_bytes()
text = data.decode("utf-8", errors="ignore")
doc_id = metadata.get("doc_id") or path.stem
pieces = self._split_text(text)
chunks: list[dict[str, Any]] = []
total = min(len(pieces), self.config.max_chunks)
for i, content in enumerate(pieces[: total]):
chunks.append(
{
"id": f"{doc_id}-c{i}",
"document_id": doc_id,
"content": content,
"chunk_index": i,
"total_chunks": total,
"page_numbers": [],
"section_hierarchy": [],
"confidence_score": 1.0,
}
)
return chunks
def _split_text(self, text: str) -> list[str]:
size = max(self.config.chunk_size, 1)
overlap = max(min(self.config.chunk_overlap, size - 1), 0)
if not text:
return [""]
chunks: list[str] = []
start = 0
n = len(text)
step = size - overlap if size > overlap else size
while start < n and len(chunks) < self.config.max_chunks:
end = min(start + size, n)
chunks.append(text[start:end])
start += step
return chunks
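
For orientation, a minimal usage sketch of the new chunker; the config path, document path, and doc_id here are illustrative assumptions, not part of the commit:

```python
# Hypothetical usage sketch of DocumentChunker; paths and IDs are illustrative.
import asyncio

from libs.rag import DocumentChunker  # import path assumed from the repo layout


async def main() -> None:
    chunker = DocumentChunker("retrieval/chunking.yaml")  # assumed config location
    chunks = await chunker.chunk_document(
        "data/sample_invoice.pdf", {"doc_id": "sample-invoice-001"}
    )
    # Each chunk is a plain dict with id, content, chunk_index, page_numbers, ...
    print(len(chunks), chunks[0]["id"] if chunks else None)


asyncio.run(main())
```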

View File

@@ -16,9 +16,10 @@ import yaml
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, SparseVector, VectorParams
from sentence_transformers import SentenceTransformer
from spacy.tokens import Doc
from .chunker import DocumentChunker
from .pii_detector import PIIDetector, PIIRedactor
from .pii_detector import PIIDetector
@dataclass
@@ -39,7 +40,6 @@ class RAGIndexer:
self.qdrant_client = QdrantClient(url=qdrant_url)
self.chunker = DocumentChunker(config_path)
self.pii_detector = PIIDetector()
self.pii_redactor = PIIRedactor()
# Initialize embedding models
self.dense_model = SentenceTransformer(
@@ -54,13 +54,13 @@ class RAGIndexer:
self.logger = logging.getLogger(__name__)
def _init_sparse_model(self):
def _init_sparse_model(self) -> Any | dict[str, Any]:
"""Initialize sparse embedding model (BM25 or SPLADE)"""
sparse_config = self.config.get("sparse_model", {})
model_type = sparse_config.get("type", "bm25")
if model_type == "bm25":
from rank_bm25 import BM25Okapi
from rank_bm25 import BM25Okapi # type: ignore
return BM25Okapi
elif model_type == "splade":
@@ -142,13 +142,11 @@ class RAGIndexer:
# Step 1: De-identify PII
content = chunk["content"]
pii_detected = self.pii_detector.detect(content)
pii_detected = self.pii_detector.detect_pii(content)
if pii_detected:
# Redact PII and create mapping
redacted_content, pii_mapping = self.pii_redactor.redact(
content, pii_detected
)
redacted_content, pii_mapping = self.pii_detector.de_identify_text(content)
# Store PII mapping securely (not in vector DB)
await self._store_pii_mapping(chunk["id"], pii_mapping)
@@ -216,7 +214,7 @@ class RAGIndexer:
]
# Create term frequency vector
term_freq = {}
term_freq: dict[str, int] = {}
for token in tokens:
term_freq[token] = term_freq.get(token, 0) + 1
@@ -378,7 +376,7 @@ class RAGIndexer:
"language": doc.lang_ if hasattr(doc, "lang_") else "en",
}
def _calculate_complexity(self, doc: dict) -> float:
def _calculate_complexity(self, doc: Doc) -> float:
"""Calculate text complexity score"""
if not doc:
return 0.0
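
The hunks above switch the indexer from `PIIDetector.detect` plus a separate `PIIRedactor` to `PIIDetector.detect_pii` and `PIIDetector.de_identify_text`. A minimal sketch of the interface those call sites imply (signatures inferred from this diff, not taken from the actual libs/rag/pii_detector.py):

```python
# Interface inferred from the indexer.py call sites above; the real
# libs/rag/pii_detector.py may differ in details.
from typing import Any


class PIIDetector:
    def detect_pii(self, text: str) -> list[dict[str, Any]]:
        """Return a list of findings, e.g. [{"type": "NI_NUMBER", "span": (10, 19)}]."""
        ...

    def de_identify_text(self, text: str) -> tuple[str, dict[str, Any]]:
        """Return (redacted_text, pii_mapping), where the mapping links placeholders
        back to original values for storage outside the vector DB."""
        ...
```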

View File

@@ -1,3 +1,4 @@
# RDF and semantic web libraries (only for KG service)
pyshacl>=0.30.1
rdflib>=7.2.1
spacy>=3.8.7

View File

@@ -0,0 +1,87 @@
groups:
  - name: infrastructure
    rules:
      - alert: InstanceDown
        expr: up == 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Instance {{ $labels.instance }} down"
          description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage on {{ $labels.instance }}"
          description: "Memory usage is above 90% on {{ $labels.instance }}"
      - alert: HighCPUUsage
        expr: 100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High CPU usage on {{ $labels.instance }}"
          description: "CPU usage is above 80% on {{ $labels.instance }}"
  - name: application
    rules:
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate on {{ $labels.job }}"
          description: "Error rate is above 10% on {{ $labels.job }}"
      - alert: SlowResponseTime
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Slow response time on {{ $labels.job }}"
          description: "95th percentile response time is above 1 second on {{ $labels.job }}"
  - name: database
    rules:
      - alert: PostgreSQLDown
        expr: up{job="postgres"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "PostgreSQL is down"
          description: "PostgreSQL database is not responding"
      - alert: Neo4jDown
        expr: up{job="neo4j"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Neo4j is down"
          description: "Neo4j graph database is not responding"
      - alert: QdrantDown
        expr: up{job="qdrant"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Qdrant is down"
          description: "Qdrant vector database is not responding"
      - alert: RedisDown
        expr: up{job="redis"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Redis is down"
          description: "Redis cache is not responding"

View File

@@ -0,0 +1,9 @@
apiVersion: 1

datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    editable: true

View File

@@ -11,6 +11,10 @@ no_implicit_optional = True
check_untyped_defs = True
show_error_codes = True
pretty = True
disable_error_code = attr-defined, no-untyped-call, import-untyped
[mypy-tests.*]
# tests may use fixtures without full annotations, but keep strict overall

View File

@@ -1,475 +1,203 @@
# ROLE
You are a **Solution Architect + Ontologist + Data Engineer + Platform/SRE** delivering a **production-grade accounting knowledge system** that ingests documents, fuses a **Knowledge Graph (KG)** with a **Vector DB (Qdrant)** for RAG, integrates with **Firm Databases**, and powers **AI agents** to complete workflows like **UK Self Assessment** — with **auditable provenance**.
**Authentication & authorization are centralized at the edge:** **Traefik** gateway + **Authentik** SSO (OIDC/ForwardAuth). **Backend services trust Traefik** on an internal network and consume user/role claims from forwarded headers/JWT.
# OBJECTIVE
Deliver a complete, implementable solution—ontology, extraction pipeline, RAG+KG retrieval, deterministic calculators, APIs, validations, **architecture & stack**, infra-as-code, CI/CD, observability, security/governance, test plan, and a worked example—so agents can:
1. read documents (and scrape portals via RPA),
2. populate/maintain a compliant accounting/tax KG,
3. retrieve firm knowledge via RAG (vector + keyword + graph),
4. compute/validate schedules and fill forms,
5. submit (stub/sandbox/live),
6. justify every output with **traceable provenance** (doc/page/bbox) and citations.
# SCOPE & VARIABLES
- **Jurisdiction:** {{jurisdiction}} (default: UK)
- **Tax regime / forms:** {{forms}} (default: SA100 + SA102, SA103, SA105, SA110; optional SA108)
- **Accounting basis:** {{standards}} (default: UK GAAP; support IFRS/XBRL mapping)
- **Document types:** bank statements, invoices, receipts, P&L, balance sheet, payslips, dividend vouchers, property statements, prior returns, letters, certificates.
- **Primary stores:** KG = Neo4j; RAG = Qdrant; Objects = MinIO; Secrets = Vault; IdP/SSO = Authentik; **API Gateway = Traefik**.
- **PII constraints:** GDPR/UK-GDPR; **no raw PII in vector DB** (de-identify before indexing); role-based access; encryption; retention; right-to-erasure.
---
# ARCHITECTURE & STACK (LOCAL-FIRST; SCALE-OUT READY)
## Edge & Identity (centralized)
- **Traefik** (reverse proxy & ingress) terminates TLS, does **AuthN/AuthZ via Authentik**:
- Use **Authentik Outpost (ForwardAuth)** middleware in Traefik.
- Traefik injects verified headers/JWT to upstream services: `X-Authenticated-User`, `X-Authenticated-Email`, `X-Authenticated-Groups`, `Authorization: Bearer <jwt>`.
- **Per-route RBAC** via Traefik middlewares (group/claim checks); services only enforce **fine-grained, app-level authorization** using forwarded claims (no OIDC in each service); a sketch of such a claims check follows this list.
- All services are **private** (only reachable behind Traefik on an internal Docker/K8s network). Direct access is denied.
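
A minimal sketch of how a backend service might consume the forwarded identity headers listed above. It assumes requests only ever arrive via Traefik on the internal network; the route, group name, and dependency wiring are illustrative, not the real service code:

```python
# Illustrative sketch: read identity forwarded by Traefik/Authentik ForwardAuth.
from fastapi import Depends, FastAPI, Header, HTTPException

app = FastAPI()


def forwarded_identity(
    x_authenticated_user: str = Header(default=""),
    x_authenticated_groups: str = Header(default=""),
) -> dict[str, list[str] | str]:
    if not x_authenticated_user:
        # Should not happen behind Traefik; reject defensively.
        raise HTTPException(status_code=401, detail="missing forwarded identity")
    return {
        "user": x_authenticated_user,
        "groups": [g.strip() for g in x_authenticated_groups.split(",") if g.strip()],
    }


@app.get("/extract/status")
def status(identity: dict = Depends(forwarded_identity)):
    # Fine-grained, app-level check on forwarded claims (group name is hypothetical).
    if "reviewers" not in identity["groups"]:
        raise HTTPException(status_code=403, detail="forbidden")
    return {"user": identity["user"], "ok": True}
```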
## Services (independent deployables; Python 3.12 unless stated)
1. **svc-ingestion** — uploads/URLs; checksum; MinIO write; emits `doc.ingested`.
2. **svc-rpa** — Playwright RPA for firm/client portals; Prefect-scheduled; emits `doc.ingested`.
3. **svc-ocr** — Tesseract (local) or Textract (scale); de-skew/rotation/layout; emits `doc.ocr_ready`.
4. **svc-extract** — LLM + rules + table detectors → **schema-constrained JSON** (kv + tables + bbox/page); emits `doc.extracted`.
5. **svc-normalize-map** — normalize currency/dates; entity resolution; assign tax year; map to KG nodes/edges with **Evidence** anchors; emits `kg.upserted`.
6. **svc-kg** — Neo4j DDL + **SHACL** validation; **bitemporal** writes `{valid_from, valid_to, asserted_at}`; RDF export.
7. **svc-rag-indexer** — chunk/de-identify/embed; upsert **Qdrant** collections (firm knowledge, legislation, best practices, glossary).
8. **svc-rag-retriever** — **hybrid retrieval** (dense + sparse) + rerank + **KG-fusion**; returns chunks + citations + KG join hints.
9. **svc-reason** — deterministic calculators (employment, self-employment, property, dividends/interest, allowances, NIC, HICBC, student loans); Cypher materializers; explanations.
10. **svc-forms** — fill PDFs; ZIP evidence bundle (signed manifest).
11. **svc-hmrc** — submit stub|sandbox|live; rate-limit & retries; submission audit.
12. **svc-firm-connectors** — read-only connectors to Firm Databases; sync to **Secure Client Data Store** with lineage.
13. **ui-review** — Next.js reviewer portal (SSO via Traefik+Authentik); reviewers accept/override extractions.
## Orchestration & Messaging
- **Prefect 2.x** for local orchestration; **Temporal** for production scale (sagas, retries, idempotency).
- Events: Kafka (or SQS/SNS) — `doc.ingested`, `doc.ocr_ready`, `doc.extracted`, `kg.upserted`, `rag.indexed`, `calc.schedule_ready`, `form.filled`, `hmrc.submitted`, `review.requested`, `review.completed`, `firm.sync.completed`.
## Concrete Stack (pin/assume unless replaced)
- **Languages:** Python **3.12**, TypeScript 5/Node 20
- **Frameworks:** FastAPI, Pydantic v2, SQLAlchemy 2 (ledger), Prefect 2.x (local), Temporal (scale)
- **Gateway:** **Traefik** 3.x with **Authentik Outpost** (ForwardAuth)
- **Identity/SSO:** **Authentik** (OIDC/OAuth2)
- **Secrets:** **Vault** (AppRole/JWT; Transit for envelope encryption)
- **Object Storage:** **MinIO** (S3 API)
- **Vector DB:** **Qdrant** 1.x (dense + sparse hybrid)
- **Embeddings/Rerankers (local-first):**
Dense: `bge-m3` or `bge-small-en-v1.5`; Sparse: BM25/SPLADE (Qdrant sparse); Reranker: `cross-encoder/ms-marco-MiniLM-L-6-v2`
- **Datastores:**
- **Secure Client Data Store:** PostgreSQL 15 (encrypted; RLS; pgcrypto)
- **KG:** Neo4j 5.x
- **Cache/locks:** Redis
- **Infra:** **Docker-Compose** for local; **Kubernetes** for scale (Helm, ArgoCD optional later)
- **CI/CD:** **Gitea** + Gitea Actions (or Drone) → container registry → deploy
## Data Layer (three pillars + fusion)
1. **Firm Databases** → **Firm Connectors** (read-only) → **Secure Client Data Store (Postgres)** with lineage.
2. **Vector DB / Knowledge Base (Qdrant)** — internal knowledge, legislation, best practices, glossary; **no PII** (placeholders + hashes).
3. **Knowledge Graph (Neo4j)** — accounting/tax ontology with evidence anchors and rules/calculations.
**Fusion strategy:** Query → RAG retrieve (Qdrant) + KG traverse → **fusion** scoring (α·dense + β·sparse + γ·KG-link-boost) → results with citations (URL/doc_id+page/anchor) and graph paths.
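
A minimal sketch of that fusion scoring, assuming per-source scores are already normalized to 0..1; the weights and field names are illustrative:

```python
# Illustrative fusion scoring: alpha*dense + beta*sparse + gamma*KG-link boost.
from dataclasses import dataclass


@dataclass
class Candidate:
    chunk_id: str
    dense_score: float   # normalized 0..1
    sparse_score: float  # normalized 0..1
    kg_linked: bool      # chunk CITES / DESCRIBES an applicable Rule or Calculation


def fuse(
    candidates: list[Candidate], alpha: float = 0.6, beta: float = 0.3, gamma: float = 0.1
) -> list[Candidate]:
    def score(c: Candidate) -> float:
        return alpha * c.dense_score + beta * c.sparse_score + gamma * (1.0 if c.kg_linked else 0.0)

    # Highest fused score first; citations and graph paths are attached elsewhere.
    return sorted(candidates, key=score, reverse=True)
```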
## Non-functional Targets
- SLOs: ingest→extract p95 ≤ 3m; reconciliation ≥ 98%; lineage coverage ≥ 99%; schedule error ≤ 1/1k
- Throughput: local 2 docs/s; scale 5 docs/s sustained; burst 20 docs/s
- Idempotency: `sha256(doc_checksum + extractor_version)` (key construction sketched after this list)
- Retention: raw images 7y; derived text 2y; vectors (non-PII) 7y; PII-min logs 90d
- Erasure: per `client_id` across MinIO, KG, Qdrant (payload filter), Postgres rows
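
A minimal sketch of that idempotency key, assuming both inputs are already available as strings:

```python
# Idempotency key per the target above: sha256 over checksum + extractor version.
import hashlib


def idempotency_key(doc_checksum: str, extractor_version: str) -> str:
    return hashlib.sha256((doc_checksum + extractor_version).encode("utf-8")).hexdigest()
```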
---
# REPOSITORY LAYOUT (monorepo, local-first)
```
repo/
apps/
svc-ingestion/ svc-rpa/ svc-ocr/ svc-extract/
svc-normalize-map/ svc-kg/ svc-rag-indexer/ svc-rag-retriever/
svc-reason/ svc-forms/ svc-hmrc/ svc-firm-connectors/
ui-review/
kg/
ONTOLOGY.md
schemas/{nodes_and_edges.schema.json, context.jsonld, shapes.ttl}
db/{neo4j_schema.cypher, seed.cypher}
reasoning/schedule_queries.cypher
retrieval/
chunking.yaml qdrant_collections.json indexer.py retriever.py fusion.py
config/{heuristics.yaml, mapping.json}
prompts/{doc_classify.txt, kv_extract.txt, table_extract.txt, entity_link.txt, rag_answer.txt}
pipeline/etl.py
infra/
compose/{docker-compose.local.yml, traefik.yml, traefik-dynamic.yml, env.example}
k8s/ (optional later: Helm charts)
security/{dpia.md, ropa.md, retention_policy.md, threat_model.md}
ops/
runbooks/{ingest.md, calculators.md, hmrc.md, vector-indexing.md, dr-restore.md}
dashboards/grafana.json
alerts/prometheus-rules.yaml
tests/{unit, integration, e2e, data/{synthetic, golden}}
Makefile
.gitea/workflows/ci.yml
mkdocs.yml
```
---
# DELIVERABLES (RETURN ALL AS MARKED CODE BLOCKS)
1. **Ontology** (Concept model; JSON-Schema; JSON-LD; Neo4j DDL)
2. **Heuristics & Rules (YAML)**
3. **Extraction pipeline & prompts**
4. **RAG & Retrieval Layer** (chunking, Qdrant collections, indexer, retriever, fusion)
5. **Reasoning layer** (deterministic calculators + Cypher + tests)
6. **Agent interface (Tooling API)**
7. **Quality & Safety** (datasets, metrics, tests, red-team)
8. **Graph Constraints** (SHACL, IDs, bitemporal)
9. **Security & Compliance** (DPIA, ROPA, encryption, auditability)
10. **Worked Example** (end-to-end UK SA sample)
11. **Observability & SRE** (SLIs/SLOs, tracing, idempotency, DR, cost controls)
12. **Architecture & Local Infra** (**docker-compose** with Traefik + Authentik + Vault + MinIO + Qdrant + Neo4j + Postgres + Redis + Prometheus/Grafana + Loki + Unleash + services)
13. **Repo Scaffolding & Makefile** (dev tasks, lint, test, build, run)
14. **Firm Database Connectors** (data contracts, sync jobs, lineage)
15. **Traefik & Authentik configs** (static+dynamic, ForwardAuth, route labels)
---
# ONTOLOGY REQUIREMENTS (as before + RAG links)
- Nodes: `TaxpayerProfile`, `TaxYear`, `Jurisdiction`, `TaxForm`, `Schedule`, `FormBox`, `Document`, `Evidence`, `Party`, `Account`, `IncomeItem`, `ExpenseItem`, `PropertyAsset`, `BusinessActivity`, `Allowance`, `Relief`, `PensionContribution`, `StudentLoanPlan`, `Payment`, `ExchangeRate`, `Calculation`, `Rule`, `NormalizationEvent`, `Reconciliation`, `Consent`, `LegalBasis`, `ImportJob`, `ETLRun`
- Relationships: `BELONGS_TO`, `OF_TAX_YEAR`, `IN_JURISDICTION`, `HAS_SECTION`, `HAS_BOX`, `REPORTED_IN`, `COMPUTES`, `DERIVED_FROM`, `SUPPORTED_BY`, `PAID_BY`, `PAID_TO`, `OWNS`, `RENTED_BY`, `EMPLOYED_BY`, `APPLIES_TO`, `APPLIES`, `VIOLATES`, `NORMALIZED_FROM`, `HAS_VALID_BASIS`, `PRODUCED_BY`, **`CITES`**, **`DESCRIBES`**
- **Bitemporal** and **provenance** mandatory.
---
# UK-SPECIFIC REQUIREMENTS
- Year boundary 6 Apr to 5 Apr; basis period reform toggle
- Employment aggregation, BIK, PAYE offsets
- Self-employment: allowable/disallowable, capital allowances (AIA/WDA/SBA), loss rules, **NIC Class 2 & 4**
- Property: FHL tests, **mortgage interest 20% credit**, Rent-a-Room, joint splits
- Savings/dividends: allowances & rate bands; ordering
- Personal allowance tapering; Gift Aid & pension gross-up; **HICBC**; **Student Loan** plans 1/2/4/5 & PGL
- Rounding per `FormBox.rounding_rule`
---
# YAML HEURISTICS (KEEP SEPARATE FILE)
- document_kinds, field_normalization, line_item_mapping
- period_inference (UK boundary + reform), dedupe_rules
- **validation_rules:** `utr_checksum`, `ni_number_regex`, `iban_check`, `vat_gb_mod97`, `rounding_policy: "HMRC"`, `numeric_tolerance: 0.01` (see the sketch after this list)
- **entity_resolution:** blocking keys, fuzzy thresholds, canonical source priority
- **privacy_redaction:** `mask_except_last4` for NI/UTR/IBAN/sort_code/phone/email
- **jurisdiction_overrides:** by {{jurisdiction}} and {{tax_year}}
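
A minimal sketch of two of these heuristics, `ni_number_regex` and `mask_except_last4`. The regex follows the commonly published NI-number format; treat it, and the masking policy, as assumptions to be confirmed against `config/heuristics.yaml`:

```python
# Illustrative only: simplified NI-number check and last-4 masking.
# The authoritative rules live in config/heuristics.yaml, not here.
import re

# Commonly published NI format: two prefix letters (some letters/pairs disallowed),
# six digits, suffix A-D.
NI_NUMBER_RE = re.compile(
    r"^(?!BG|GB|NK|KN|TN|NT|ZZ)"
    r"[ABCEGHJ-PRSTW-Z][ABCEGHJ-NPRSTW-Z]\d{6}[A-D]$"
)


def is_valid_ni_number(value: str) -> bool:
    return bool(NI_NUMBER_RE.match(value.replace(" ", "").upper()))


def mask_except_last4(value: str, mask_char: str = "*") -> str:
    """Mask all but the last four characters (separator handling is a policy choice)."""
    if len(value) <= 4:
        return value
    return mask_char * (len(value) - 4) + value[-4:]
```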
---
# EXTRACTION PIPELINE (SPECIFY CODE & PROMPTS)
- ingest → classify → OCR/layout → extract (schema-constrained JSON with bbox/page) → validate → normalize → map_to_graph → post-checks
- Prompts: `doc_classify`, `kv_extract`, `table_extract` (multi-page), `entity_link`
- Contract: **JSON schema enforcement** with retry/validator loop; temperature guidance (a retry-loop sketch follows this list)
- Reliability: de-skew/rotation/language/handwriting policy
- Mapping config: JSON mapping to nodes/edges + provenance (doc_id/page/bbox/text_hash)
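
A minimal sketch of the retry/validator loop, assuming a placeholder `call_llm` function and a Pydantic model standing in for the real extraction contract in svc-extract:

```python
# Sketch of schema-constrained extraction with a validate-and-retry loop.
# `call_llm` and KVExtraction are placeholders, not the real svc-extract code.
from typing import Callable

from pydantic import BaseModel, ValidationError


class KVExtraction(BaseModel):
    doc_id: str
    fields: dict[str, str]
    page: int
    bbox: list[float]  # [x0, y0, x1, y1]


def extract_with_retries(
    call_llm: Callable[[str], str], prompt: str, max_attempts: int = 3
) -> KVExtraction:
    last_error = ""
    for _ in range(max_attempts):
        # Feed the previous validation error back so the model can repair its output.
        raw = call_llm(prompt + (f"\n\nFix this validation error: {last_error}" if last_error else ""))
        try:
            return KVExtraction.model_validate_json(raw)
        except ValidationError as exc:
            last_error = str(exc)
    raise ValueError(f"extraction failed schema validation after {max_attempts} attempts: {last_error}")
```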
---
# RAG & RETRIEVAL LAYER (Qdrant + KG Fusion)
- Collections: `firm_knowledge`, `legislation`, `best_practices`, `glossary` (payloads include jurisdiction, tax_years, topic_tags, version, `pii_free:true`)
- Chunking: layout-aware; tables serialized; ~1.5k-token chunks, 10-15% overlap
- Indexer: de-identify PII; placeholders only; embeddings (dense) + sparse; upsert with payload
- Retriever: hybrid scoring (α·dense + β·sparse), filters (jurisdiction/tax_year), rerank; return **citations** + **KG hints**
- Fusion: boost results linked to applicable `Rule`/`Calculation`/`Evidence` for current schedule
- Right-to-erasure: purge vectors via payload filter (`client_id?` only for client-authored knowledge); see the purge sketch after this list
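
A minimal sketch of such an erasure purge with the Qdrant Python client, assuming a `client_id` payload field on client-authored points; the collection name and ID value are illustrative:

```python
# Illustrative right-to-erasure purge: delete points whose payload matches a client_id.
from qdrant_client import QdrantClient, models

client = QdrantClient(url="http://localhost:6333")

client.delete(
    collection_name="firm_knowledge",
    points_selector=models.FilterSelector(
        filter=models.Filter(
            must=[models.FieldCondition(key="client_id", match=models.MatchValue(value="client-123"))]
        )
    ),
)
```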
---
# REASONING & CALCULATION (DETERMINISTIC)
- Order: incomes → allowances/capital allowances → loss offsets → personal allowance → savings/dividend bands → HICBC & student loans → NIC Class 2/4 → property 20% credit/FHL/Rent-a-Room
- Cypher materializers per schedule/box; explanations via `DERIVED_FROM` and RAG `CITES`
- Unit tests per rule; golden files; property-based tests
---
# AGENT TOOLING API (JSON SCHEMAS)
1. `ComputeSchedule({tax_year, taxpayer_id, schedule_id}) -> {boxes[], totals[], explanations[]}`
2. `PopulateFormBoxes({tax_year, taxpayer_id, form_id}) -> {fields[], pdf_fields[], confidence, calibrated_confidence}`
3. `AskClarifyingQuestion({gap, candidate_values, evidence}) -> {question_text, missing_docs}`
4. `GenerateEvidencePack({scope}) -> {bundle_manifest, signed_hashes}`
5. `ExplainLineage({node_id|field}) -> {chain:[evidence], graph_paths}`
6. `CheckDocumentCoverage({tax_year, taxpayer_id}) -> {required_docs[], missing[], blockers[]}`
7. `SubmitToHMRC({tax_year, taxpayer_id, dry_run}) -> {status, submission_id?, errors[]}`
8. `ReconcileBank({account_id, period}) -> {unmatched_invoices[], unmatched_bank_lines[], deltas}`
9. `RAGSearch({query, tax_year?, jurisdiction?, k?}) -> {chunks[], citations[], kg_hints[], calibrated_confidence}`
10. `SyncFirmDatabases({since}) -> {objects_synced, errors[]}`
**Env flags:** `HMRC_MTD_ITSA_MODE`, `RATE_LIMITS`, `RAG_EMBEDDING_MODEL`, `RAG_RERANKER_MODEL`, `RAG_ALPHA_BETA_GAMMA`
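
For illustration, one of the tool contracts above (`RAGSearch`, item 9) expressed as Pydantic models; field names follow the signature in the list, while types and defaults are assumptions:

```python
# Sketch of the RAGSearch tool contract from item 9 above; types/defaults are assumed.
from pydantic import BaseModel, Field


class RAGSearchRequest(BaseModel):
    query: str
    tax_year: str | None = None
    jurisdiction: str | None = "UK"
    k: int = Field(default=8, ge=1, le=50)


class Citation(BaseModel):
    source: str   # URL or doc_id
    locator: str  # page / anchor


class RAGSearchResponse(BaseModel):
    chunks: list[str]
    citations: list[Citation]
    kg_hints: list[str]
    calibrated_confidence: float = Field(ge=0.0, le=1.0)
```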
---
# SECURITY & COMPLIANCE
- **Traefik + Authentik SSO at edge** (ForwardAuth); per-route RBAC; inject verified claims headers/JWT
- **Vault** for secrets (AppRole/JWT, Transit for envelope encryption)
- **PII minimization:** no PII in Qdrant; placeholders; PII mapping only in Secure Client Data Store
- **Auditability:** tamper-evident logs (hash chain; sketched after this list), signer identity, time sync
- **DPIA, ROPA, retention policy, right-to-erasure** workflows
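
A minimal sketch of a tamper-evident hash chain for audit records; storage, signing, and time sync are assumed to be handled elsewhere:

```python
# Each audit entry commits to the previous entry's hash, so any edit breaks the chain.
import hashlib
import json


def append_entry(chain: list[dict], event: dict) -> list[dict]:
    prev_hash = chain[-1]["entry_hash"] if chain else "0" * 64
    body = {"event": event, "prev_hash": prev_hash}
    entry_hash = hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()
    return chain + [{**body, "entry_hash": entry_hash}]


def verify(chain: list[dict]) -> bool:
    prev_hash = "0" * 64
    for entry in chain:
        if entry["prev_hash"] != prev_hash:
            return False
        body = {"event": entry["event"], "prev_hash": prev_hash}
        if hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest() != entry["entry_hash"]:
            return False
        prev_hash = entry["entry_hash"]
    return True
```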
---
# CI/CD (Gitea)
- Gitea Actions: `lint` (ruff/mypy/eslint), `test` (pytest+coverage, e2e), `build` (Docker), `scan` (Trivy/SAST), `push` (registry), `deploy` (compose up or K8s apply)
- SemVer tags; SBOM (Syft); OpenAPI + MkDocs publish; pre-commit hooks
---
# OBSERVABILITY & SRE
- SLIs/SLOs: ingest_time_p50, extract_precision@field ≥ 0.97, reconciliation_pass_rate ≥ 0.98, lineage_coverage ≥ 0.99, time_to_review_p95
- Dashboards: ingestion throughput, OCR error rates, extraction precision, mapping latency, calculator failures, HMRC submits, **RAG recall/precision & faithfulness**
- Alerts: OCR 5xx spike, extraction precision dip, reconciliation failures, HMRC rate-limit breaches, RAG drift
- Backups/DR: Neo4j dump (daily), Postgres PITR, Qdrant snapshot, MinIO versioning; quarterly restore test
- Cost controls: embedding cache, incremental indexing, compaction/TTL for stale vectors, cold archive for images
---
# OUTPUT FORMAT (STRICT)
Return results in the following order, each in its own fenced code block **with the exact language tag**:
```md
<!-- FILE: ONTOLOGY.md -->
# Concept Model
...
```
```json
// FILE: schemas/nodes_and_edges.schema.json
{ ... }
```
```json
// FILE: schemas/context.jsonld
{ ... }
```
```turtle
# FILE: schemas/shapes.ttl
# SHACL shapes for node/edge integrity
...
```
```cypher
// FILE: db/neo4j_schema.cypher
CREATE CONSTRAINT ...
```
```yaml
# FILE: config/heuristics.yaml
document_kinds: ...
```
```json
# FILE: config/mapping.json
{ "mappings": [ ... ] }
```
```yaml
# FILE: retrieval/chunking.yaml
# Layout-aware chunking, tables, overlap, token targets
```
```json
# FILE: retrieval/qdrant_collections.json
{
"collections": [
{ "name": "firm_knowledge", "dense": {"size": 1024}, "sparse": true, "payload_schema": { ... } },
{ "name": "legislation", "dense": {"size": 1024}, "sparse": true, "payload_schema": { ... } },
{ "name": "best_practices", "dense": {"size": 1024}, "sparse": true, "payload_schema": { ... } },
{ "name": "glossary", "dense": {"size": 768}, "sparse": true, "payload_schema": { ... } }
]
}
```
chunking_strategy:
default:
chunk_size: 1500 # tokens
overlap_percentage: 0.12 # 12% overlap
min_chunk_size: 300
max_chunk_size: 2000
```python
# FILE: retrieval/indexer.py
# De-identify -> embed dense/sparse -> upsert to Qdrant with payload
...
```
by_document_type:
legislation:
chunk_size: 2000 # Longer chunks for legal text
overlap_percentage: 0.15
preserve_sections: true
section_headers: ["Section", "Subsection", "Paragraph", "Article"]
```python
# FILE: retrieval/retriever.py
# Hybrid retrieval (alpha,beta), rerank, filters, return citations + KG hints
...
```
best_practices:
chunk_size: 1200
overlap_percentage: 0.10
preserve_lists: true
```python
# FILE: retrieval/fusion.py
# Join RAG chunks to KG rules/calculations/evidence; boost linked results
...
```
glossary:
chunk_size: 800 # Shorter for definitions
overlap_percentage: 0.05
preserve_definitions: true
```txt
# FILE: prompts/rag_answer.txt
[Instruction: cite every claim; forbid PII; return calibrated_confidence; JSON contract]
```
firm_knowledge:
chunk_size: 1500
overlap_percentage: 0.12
preserve_procedures: true
```python
# FILE: pipeline/etl.py
def ingest(...): ...
```
layout_awareness:
table_handling:
strategy: "serialize_structured"
max_table_size: 50 # rows
column_separator: " | "
row_separator: "\n"
preserve_headers: true
include_table_context: true # Include surrounding text
```txt
# FILE: prompts/kv_extract.txt
[Prompt with JSON contract + examples]
```
list_handling:
preserve_structure: true
bullet_points: ["•", "-", "*", "1.", "a.", "i."]
nested_indentation: true
```cypher
// FILE: reasoning/schedule_queries.cypher
// SA105: compute property income totals
MATCH ...
```
heading_hierarchy:
preserve_levels: true
max_heading_level: 6
include_parent_headings: true # For context
```json
// FILE: tools/agent_tools.json
{ ... }
```
paragraph_boundaries:
respect_boundaries: true
min_paragraph_length: 50 # characters
merge_short_paragraphs: true
```yaml
# FILE: infra/compose/docker-compose.local.yml
# Traefik (with Authentik ForwardAuth), Authentik, Vault, MinIO, Qdrant, Neo4j, Postgres, Redis, Prometheus/Grafana, Loki, Unleash, all services
```
text_preprocessing:
normalization:
unicode_normalization: "NFKC"
remove_extra_whitespace: true
standardize_quotes: true
fix_encoding_issues: true
```yaml
# FILE: infra/compose/traefik.yml
# Static config: entryPoints, providers, certificates, access logs
entryPoints:
  web:
    address: ":80"
  websecure:
    address: ":443"

providers:
  docker: {}
  file:
    filename: /etc/traefik/traefik-dynamic.yml

api:
  dashboard: true

log:
  level: INFO

accessLog: {}
```
pii_handling:
de_identify_before_chunking: true
placeholder_format: "[{type}_{hash}]"
pii_types:
- "UTR"
- "NI_NUMBER"
- "IBAN"
- "SORT_CODE"
- "PHONE"
- "EMAIL"
- "POSTCODE"
- "NAME"
hash_algorithm: "sha256"
hash_truncate: 8 # characters
```yaml
# FILE: infra/compose/traefik-dynamic.yml
# Dynamic config: Authentik ForwardAuth middleware + routers per service
http:
  middlewares:
    authentik-forwardauth:
      forwardAuth:
        address: "http://authentik-outpost:9000/outpost.goauthentik.io/auth/traefik"
        trustForwardHeader: true
        authResponseHeaders:
          - X-Authenticated-User
          - X-Authenticated-Email
          - X-Authenticated-Groups
          - Authorization
    rate-limit:
      rateLimit:
        average: 50
        burst: 100
legal_text_handling:
preserve_citations: true
citation_patterns:
- "Section \\d+[A-Z]?"
- "Regulation \\d+"
- "Schedule \\d+"
- "Paragraph \\d+"
preserve_cross_references: true
  routers:
    svc-extract:
      rule: "Host(`api.local`) && PathPrefix(`/extract`)"
      entryPoints: ["websecure"]
      service: svc-extract
      middlewares: ["authentik-forwardauth", "rate-limit"]
      tls: {}

  services:
    svc-extract:
      loadBalancer:
        servers:
          - url: "http://svc-extract:8000"
```
chunking_rules:
sentence_boundary_detection:
use_spacy: true
model: "en_core_web_sm"
custom_abbreviations:
- "Ltd"
- "PLC"
- "HMRC"
- "UTR"
- "NIC"
- "PAYE"
- "VAT"
```yaml
# FILE: infra/compose/env.example
DOMAIN=local
EMAIL=admin@local
MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=miniopass
POSTGRES_PASSWORD=postgres
NEO4J_PASSWORD=neo4jpass
QDRANT__SERVICE__GRPC_PORT=6334
VAULT_DEV_ROOT_TOKEN_ID=root
AUTHENTIK_SECRET_KEY=changeme
RAG_EMBEDDING_MODEL=bge-small-en-v1.5
RAG_RERANKER_MODEL=cross-encoder/ms-marco-MiniLM-L-6-v2
```
semantic_coherence:
avoid_splitting:
- "calculation_examples"
- "step_by_step_procedures"
- "form_instructions"
- "definition_blocks"
```yaml
# FILE: .gitea/workflows/ci.yml
# Lint → Test → Build → Scan → Push → Deploy (compose up)
```
overlap_strategy:
method: "sliding_window"
overlap_unit: "sentences" # vs "tokens" or "characters"
preserve_context: true
include_metadata_overlap: false
```makefile
# FILE: Makefile
# bootstrap, run, test, lint, build, deploy, format, seed
...
```
metadata_enrichment:
chunk_metadata:
- "source_document_id"
- "source_document_type"
- "chunk_index"
- "total_chunks"
- "page_numbers"
- "section_hierarchy"
- "table_count"
- "list_count"
- "has_calculations"
- "jurisdiction"
- "tax_years"
- "topic_tags"
- "confidence_score"
- "pii_free"
```md
<!-- FILE: TESTPLAN.md -->
content_analysis:
extract_entities:
- "tax_concepts"
- "form_references"
- "calculation_methods"
- "deadlines"
- "thresholds"
- "rates"
## Datasets, Metrics, Acceptance Criteria
topic_classification:
use_keywords: true
keyword_lists:
employment: ["PAYE", "payslip", "P60", "employment", "salary", "wages"]
self_employment:
["self-employed", "business", "turnover", "expenses", "profit"]
property: ["rental", "property", "landlord", "FHL", "mortgage interest"]
dividends: ["dividend", "shares", "distribution", "corporation tax"]
capital_gains: ["capital gains", "disposal", "acquisition", "CGT"]
- Extraction precision/recall per field
- Schedule-level absolute error
- Reconciliation pass-rate
- Explanation coverage
- RAG retrieval: top-k recall, nDCG, faithfulness, groundedness
- Security: Traefik+Authentik route auth tests, header spoofing prevention (internal network, trusted proxy)
- Red-team cases (OCR noise, conflicting docs, PII leak prevention)
...
```
quality_control:
validation_rules:
min_meaningful_content: 0.7 # Ratio of meaningful words
max_repetition_ratio: 0.3 # Avoid highly repetitive chunks
min_sentence_count: 2
max_sentence_count: 20
---
filtering:
exclude_patterns:
- "^\\s*$" # Empty chunks
- "^Page \\d+$" # Page numbers only
- "^\\[.*\\]$" # Placeholder-only chunks
- "^Table of Contents"
- "^Index$"
# STYLE & GUARANTEES
post_processing:
deduplicate_chunks: true
similarity_threshold: 0.95
merge_similar_chunks: false # Keep separate for provenance
- Be **concise but complete**; prefer schemas/code over prose.
- **No chain-of-thought.** Provide final artifacts and brief rationales.
- Every numeric output must include **lineage to Evidence → Document (page/bbox/text_hash)** and **citations** for narrative answers.
- Parameterize by {{jurisdiction}} and {{tax_year}}.
- Include **calibrated_confidence** and name calibration method.
- Enforce **SHACL** on KG writes; reject/queue fixes on violation.
- **No PII** in Qdrant. Use de-ID placeholders; keep mappings only in Secure Client Data Store.
- Deterministic IDs; reproducible builds; version-pinned dependencies.
- **Trust boundary:** only Traefik exposes ports; all services on a private network; services accept only requests with Traefik's network identity; **never trust client-supplied auth headers**.
output_format:
chunk_structure:
id: "uuid4"
content: "string"
metadata: "object"
embeddings: "optional" # Added during indexing
# START
batch_processing:
batch_size: 100
parallel_workers: 4
memory_limit_mb: 1024
Produce the deliverables now, in the exact order and file/block structure above, implementing the **local-first stack (Python 3.12, Prefect, Vault, MinIO, Playwright, Qdrant, Authentik, Traefik, Docker-Compose, Gitea)** with optional **scale-out** notes (Temporal, K8s) where specified.
storage:
intermediate_format: "jsonl"
compression: "gzip"
include_source_mapping: true
performance_tuning:
caching:
cache_preprocessed: true
cache_embeddings: false # Too large
cache_metadata: true
ttl_hours: 24
optimization:
use_multiprocessing: true
chunk_size_adaptation: true # Adjust based on content type
early_stopping: true # For very long documents
monitoring:
track_processing_time: true
track_chunk_quality_scores: true
alert_on_failures: true
log_statistics: true

View File

@@ -1,507 +0,0 @@
# FILE: retrieval/indexer.py
# De-identify -> embed dense/sparse -> upsert to Qdrant with payload
import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
import spacy
import torch
import yaml
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, SparseVector, VectorParams
from sentence_transformers import SentenceTransformer
from .chunker import DocumentChunker
from .pii_detector import PIIDetector, PIIRedactor
@dataclass
class IndexingResult:
collection_name: str
points_indexed: int
points_updated: int
points_failed: int
processing_time: float
errors: list[str]
class RAGIndexer:
def __init__(self, config_path: str, qdrant_url: str = "http://localhost:6333"):
with open(config_path) as f:
self.config = yaml.safe_load(f)
self.qdrant_client = QdrantClient(url=qdrant_url)
self.chunker = DocumentChunker(config_path)
self.pii_detector = PIIDetector()
self.pii_redactor = PIIRedactor()
# Initialize embedding models
self.dense_model = SentenceTransformer(
self.config.get("embedding_model", "bge-small-en-v1.5")
)
# Initialize sparse model (BM25/SPLADE)
self.sparse_model = self._init_sparse_model()
# Initialize NLP pipeline
self.nlp = spacy.load("en_core_web_sm")
self.logger = logging.getLogger(__name__)
def _init_sparse_model(self):
"""Initialize sparse embedding model (BM25 or SPLADE)"""
sparse_config = self.config.get("sparse_model", {})
model_type = sparse_config.get("type", "bm25")
if model_type == "bm25":
from rank_bm25 import BM25Okapi
return BM25Okapi
elif model_type == "splade":
from transformers import AutoModelForMaskedLM, AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(
"naver/splade-cocondenser-ensembledistil"
)
model = AutoModelForMaskedLM.from_pretrained(
"naver/splade-cocondenser-ensembledistil"
)
return {"tokenizer": tokenizer, "model": model}
else:
raise ValueError(f"Unsupported sparse model type: {model_type}")
async def index_document(
self, document_path: str, collection_name: str, metadata: dict[str, Any]
) -> IndexingResult:
"""Index a single document into the specified collection"""
start_time = datetime.now()
errors = []
points_indexed = 0
points_updated = 0
points_failed = 0
try:
# Step 1: Chunk the document
chunks = await self.chunker.chunk_document(document_path, metadata)
# Step 2: Process each chunk
points = []
for chunk in chunks:
try:
point = await self._process_chunk(chunk, collection_name, metadata)
if point:
points.append(point)
except Exception as e:
self.logger.error(
f"Failed to process chunk {chunk.get('id', 'unknown')}: {str(e)}"
)
errors.append(f"Chunk processing error: {str(e)}")
points_failed += 1
# Step 3: Upsert to Qdrant
if points:
try:
operation_info = self.qdrant_client.upsert(
collection_name=collection_name, points=points, wait=True
)
points_indexed = len(points)
self.logger.info(
f"Indexed {points_indexed} points to {collection_name}"
)
except Exception as e:
self.logger.error(f"Failed to upsert to Qdrant: {str(e)}")
errors.append(f"Qdrant upsert error: {str(e)}")
points_failed += len(points)
points_indexed = 0
except Exception as e:
self.logger.error(f"Document indexing failed: {str(e)}")
errors.append(f"Document indexing error: {str(e)}")
processing_time = (datetime.now() - start_time).total_seconds()
return IndexingResult(
collection_name=collection_name,
points_indexed=points_indexed,
points_updated=points_updated,
points_failed=points_failed,
processing_time=processing_time,
errors=errors,
)
async def _process_chunk(
self, chunk: dict[str, Any], collection_name: str, base_metadata: dict[str, Any]
) -> PointStruct | None:
"""Process a single chunk: de-identify, embed, create point"""
# Step 1: De-identify PII
content = chunk["content"]
pii_detected = self.pii_detector.detect(content)
if pii_detected:
# Redact PII and create mapping
redacted_content, pii_mapping = self.pii_redactor.redact(
content, pii_detected
)
# Store PII mapping securely (not in vector DB)
await self._store_pii_mapping(chunk["id"], pii_mapping)
# Log PII detection for audit
self.logger.warning(
f"PII detected in chunk {chunk['id']}: {[p['type'] for p in pii_detected]}"
)
else:
redacted_content = content
# Verify no PII remains
if not self._verify_pii_free(redacted_content):
self.logger.error(f"PII verification failed for chunk {chunk['id']}")
return None
# Step 2: Generate embeddings
try:
dense_vector = await self._generate_dense_embedding(redacted_content)
sparse_vector = await self._generate_sparse_embedding(redacted_content)
except Exception as e:
self.logger.error(
f"Embedding generation failed for chunk {chunk['id']}: {str(e)}"
)
return None
# Step 3: Prepare metadata
payload = self._prepare_payload(chunk, base_metadata, redacted_content)
payload["pii_free"] = True # Verified above
# Step 4: Create point
point = PointStruct(
id=chunk["id"],
vector={"dense": dense_vector, "sparse": sparse_vector},
payload=payload,
)
return point
async def _generate_dense_embedding(self, text: str) -> list[float]:
"""Generate dense vector embedding"""
try:
# Use sentence transformer for dense embeddings
embedding = self.dense_model.encode(text, normalize_embeddings=True)
return embedding.tolist()
except Exception as e:
self.logger.error(f"Dense embedding generation failed: {str(e)}")
raise
async def _generate_sparse_embedding(self, text: str) -> SparseVector:
"""Generate sparse vector embedding (BM25 or SPLADE)"""
vector = SparseVector(indices=[], values=[])
try:
sparse_config = self.config.get("sparse_model", {})
model_type = sparse_config.get("type", "bm25")
if model_type == "bm25":
# Simple BM25-style sparse representation
doc = self.nlp(text)
tokens = [
token.lemma_.lower()
for token in doc
if not token.is_stop and not token.is_punct
]
# Create term frequency vector
term_freq = {}
for token in tokens:
term_freq[token] = term_freq.get(token, 0) + 1
# Convert to sparse vector format
vocab_size = sparse_config.get("vocab_size", 30000)
indices = []
values = []
for term, freq in term_freq.items():
# Simple hash-based vocabulary mapping
term_id = hash(term) % vocab_size
indices.append(term_id)
values.append(float(freq))
vector = SparseVector(indices=indices, values=values)
elif model_type == "splade":
# SPLADE sparse embeddings
tokenizer = self.sparse_model["tokenizer"]
model = self.sparse_model["model"]
inputs = tokenizer(
text, return_tensors="pt", truncation=True, max_length=512
)
outputs = model(**inputs)
# Extract sparse representation
logits = outputs.logits.squeeze()
sparse_rep = torch.relu(logits).detach().numpy()
# Convert to sparse format
indices = np.nonzero(sparse_rep)[0].tolist()
values = sparse_rep[indices].tolist()
vector = SparseVector(indices=indices, values=values)
return vector
except Exception as e:
self.logger.error(f"Sparse embedding generation failed: {str(e)}")
# Return empty sparse vector as fallback
return vector
def _prepare_payload(
self, chunk: dict[str, Any], base_metadata: dict[str, Any], content: str
) -> dict[str, Any]:
"""Prepare payload metadata for the chunk"""
# Start with base metadata
payload = base_metadata.copy()
# Add chunk-specific metadata
payload.update(
{
"document_id": chunk.get("document_id"),
"content": content, # De-identified content
"chunk_index": chunk.get("chunk_index", 0),
"total_chunks": chunk.get("total_chunks", 1),
"page_numbers": chunk.get("page_numbers", []),
"section_hierarchy": chunk.get("section_hierarchy", []),
"has_calculations": self._detect_calculations(content),
"has_forms": self._detect_form_references(content),
"confidence_score": chunk.get("confidence_score", 1.0),
"created_at": datetime.now().isoformat(),
"version": self.config.get("version", "1.0"),
}
)
# Extract and add topic tags
topic_tags = self._extract_topic_tags(content)
if topic_tags:
payload["topic_tags"] = topic_tags
# Add content analysis
payload.update(self._analyze_content(content))
return payload
def _detect_calculations(self, text: str) -> bool:
"""Detect if text contains calculations or formulas"""
calculation_patterns = [
r"\d+\s*[+\-*/]\s*\d+",
r"£\d+(?:,\d{3})*(?:\.\d{2})?",
r"\d+(?:\.\d+)?%",
r"total|sum|calculate|compute",
r"rate|threshold|allowance|relief",
]
for pattern in calculation_patterns:
if re.search(pattern, text, re.IGNORECASE):
return True
return False
def _detect_form_references(self, text: str) -> bool:
"""Detect references to tax forms"""
form_patterns = [
r"SA\d{3}",
r"P\d{2}",
r"CT\d{3}",
r"VAT\d{3}",
r"form\s+\w+",
r"schedule\s+\w+",
]
for pattern in form_patterns:
if re.search(pattern, text, re.IGNORECASE):
return True
return False
def _extract_topic_tags(self, text: str) -> list[str]:
"""Extract topic tags from content"""
topic_keywords = {
"employment": [
"PAYE",
"payslip",
"P60",
"employment",
"salary",
"wages",
"employer",
],
"self_employment": [
"self-employed",
"business",
"turnover",
"expenses",
"profit",
"loss",
],
"property": ["rental", "property", "landlord", "FHL", "mortgage", "rent"],
"dividends": ["dividend", "shares", "distribution", "corporation tax"],
"capital_gains": ["capital gains", "disposal", "acquisition", "CGT"],
"pensions": ["pension", "retirement", "SIPP", "occupational"],
"savings": ["interest", "savings", "ISA", "bonds"],
"inheritance": ["inheritance", "IHT", "estate", "probate"],
"vat": ["VAT", "value added tax", "registration", "return"],
}
tags = []
text_lower = text.lower()
for topic, keywords in topic_keywords.items():
for keyword in keywords:
if keyword.lower() in text_lower:
tags.append(topic)
break
return list(set(tags)) # Remove duplicates
def _analyze_content(self, text: str) -> dict[str, Any]:
"""Analyze content for additional metadata"""
doc = self.nlp(text)
return {
"word_count": len([token for token in doc if not token.is_space]),
"sentence_count": len(list(doc.sents)),
"entity_count": len(doc.ents),
"complexity_score": self._calculate_complexity(doc),
"language": doc.lang_ if hasattr(doc, "lang_") else "en",
}
def _calculate_complexity(self, doc: dict) -> float:
"""Calculate text complexity score"""
if not doc:
return 0.0
# Simple complexity based on sentence length and vocabulary
avg_sentence_length = sum(len(sent) for sent in doc.sents) / len(
list(doc.sents)
)
unique_words = len(set(token.lemma_.lower() for token in doc if token.is_alpha))
total_words = len([token for token in doc if token.is_alpha])
vocabulary_diversity = unique_words / total_words if total_words > 0 else 0
# Normalize to 0-1 scale
complexity = min(1.0, (avg_sentence_length / 20.0 + vocabulary_diversity) / 2.0)
return complexity
def _verify_pii_free(self, text: str) -> bool:
"""Verify that text contains no PII"""
# Quick verification using patterns
pii_patterns = [
r"\b[A-Z]{2}\d{6}[A-D]\b", # NI number
r"\b\d{10}\b", # UTR
r"\b[A-Z]{2}\d{2}[A-Z]{4}\d{14}\b", # IBAN
r"\b\d{2}-\d{2}-\d{2}\b", # Sort code
r"\b[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}\b", # Postcode
r"\b[\w\.-]+@[\w\.-]+\.\w+\b", # Email
r"\b(?:\+44|0)\d{10,11}\b", # Phone
]
for pattern in pii_patterns:
if re.search(pattern, text):
return False
return True
async def _store_pii_mapping(
self, chunk_id: str, pii_mapping: dict[str, Any]
) -> None:
"""Store PII mapping in secure client data store (not in vector DB)"""
# This would integrate with the secure PostgreSQL client data store
# For now, just log the mapping securely
self.logger.info(
f"PII mapping stored for chunk {chunk_id}: {len(pii_mapping)} items"
)
async def create_collections(self) -> None:
"""Create all Qdrant collections based on configuration"""
collections_config_path = Path(__file__).parent / "qdrant_collections.json"
with open(collections_config_path) as f:
collections_config = json.load(f)
for collection_config in collections_config["collections"]:
collection_name = collection_config["name"]
try:
# Check if collection exists
try:
self.qdrant_client.get_collection(collection_name)
self.logger.info(f"Collection {collection_name} already exists")
continue
except:
pass # Collection doesn't exist, create it
# Create collection
vectors_config = {}
# Dense vector configuration
if "dense" in collection_config:
vectors_config["dense"] = VectorParams(
size=collection_config["dense"]["size"],
distance=Distance.COSINE,
)
# Sparse vector configuration
if collection_config.get("sparse", False):
vectors_config["sparse"] = VectorParams(
size=30000, # Vocabulary size for sparse vectors
distance=Distance.DOT,
on_disk=True,
)
self.qdrant_client.create_collection(
collection_name=collection_name,
vectors_config=vectors_config,
**collection_config.get("indexing_config", {}),
)
self.logger.info(f"Created collection: {collection_name}")
except Exception as e:
self.logger.error(
f"Failed to create collection {collection_name}: {str(e)}"
)
raise
async def batch_index(
self, documents: list[dict[str, Any]], collection_name: str
) -> list[IndexingResult]:
"""Index multiple documents in batch"""
results = []
for doc_info in documents:
result = await self.index_document(
doc_info["path"], collection_name, doc_info["metadata"]
)
results.append(result)
return results
def get_collection_stats(self, collection_name: str) -> dict[str, Any]:
"""Get statistics for a collection"""
try:
collection_info = self.qdrant_client.get_collection(collection_name)
return {
"name": collection_name,
"vectors_count": collection_info.vectors_count,
"indexed_vectors_count": collection_info.indexed_vectors_count,
"points_count": collection_info.points_count,
"segments_count": collection_info.segments_count,
"status": collection_info.status,
}
except Exception as e:
self.logger.error(f"Failed to get stats for {collection_name}: {str(e)}")
return {"error": str(e)}

View File

@@ -0,0 +1,351 @@
{
"collections": [
{
"name": "firm_knowledge",
"description": "Internal firm procedures, templates, and client-specific knowledge",
"dense": {
"size": 1024,
"distance": "Cosine"
},
"sparse": true,
"payload_schema": {
"type": "object",
"properties": {
"document_id": { "type": "string" },
"document_type": {
"type": "string",
"enum": ["procedure", "template", "memo", "guidance"]
},
"title": { "type": "string" },
"content": { "type": "string" },
"chunk_index": { "type": "integer" },
"total_chunks": { "type": "integer" },
"jurisdiction": { "type": "string", "enum": ["UK", "US", "EU"] },
"tax_years": { "type": "array", "items": { "type": "string" } },
"topic_tags": { "type": "array", "items": { "type": "string" } },
"client_types": {
"type": "array",
"items": {
"type": "string",
"enum": ["individual", "partnership", "company", "trust"]
}
},
"practice_areas": { "type": "array", "items": { "type": "string" } },
"version": { "type": "string" },
"created_at": { "type": "string", "format": "date-time" },
"updated_at": { "type": "string", "format": "date-time" },
"author": { "type": "string" },
"review_status": {
"type": "string",
"enum": ["draft", "reviewed", "approved", "archived"]
},
"access_level": {
"type": "string",
"enum": ["public", "internal", "restricted", "confidential"]
},
"pii_free": { "type": "boolean", "const": true },
"source_url": { "type": "string" },
"page_numbers": { "type": "array", "items": { "type": "integer" } },
"section_hierarchy": {
"type": "array",
"items": { "type": "string" }
},
"has_calculations": { "type": "boolean" },
"has_forms": { "type": "boolean" },
"confidence_score": { "type": "number", "minimum": 0, "maximum": 1 }
},
"required": [
"document_id",
"document_type",
"content",
"jurisdiction",
"pii_free"
]
},
"indexing_config": {
"replication_factor": 2,
"write_consistency_factor": 1,
"on_disk_payload": true,
"hnsw_config": {
"m": 16,
"ef_construct": 100,
"full_scan_threshold": 10000
},
"quantization_config": {
"scalar": {
"type": "int8",
"quantile": 0.99,
"always_ram": true
}
}
}
},
{
"name": "legislation",
"description": "Tax legislation, regulations, and official guidance",
"dense": {
"size": 1024,
"distance": "Cosine"
},
"sparse": true,
"payload_schema": {
"type": "object",
"properties": {
"document_id": { "type": "string" },
"document_type": {
"type": "string",
"enum": ["act", "regulation", "guidance", "case_law", "circular"]
},
"title": { "type": "string" },
"content": { "type": "string" },
"chunk_index": { "type": "integer" },
"total_chunks": { "type": "integer" },
"jurisdiction": { "type": "string" },
"effective_from": { "type": "string", "format": "date" },
"effective_to": { "type": "string", "format": "date" },
"tax_years": { "type": "array", "items": { "type": "string" } },
"legislation_reference": { "type": "string" },
"section_number": { "type": "string" },
"subsection_number": { "type": "string" },
"topic_tags": { "type": "array", "items": { "type": "string" } },
"form_references": { "type": "array", "items": { "type": "string" } },
"calculation_methods": {
"type": "array",
"items": { "type": "string" }
},
"thresholds": { "type": "array", "items": { "type": "object" } },
"rates": { "type": "array", "items": { "type": "object" } },
"deadlines": {
"type": "array",
"items": { "type": "string", "format": "date" }
},
"version": { "type": "string" },
"source_authority": {
"type": "string",
"enum": ["HMRC", "Parliament", "Courts", "Treasury"]
},
"pii_free": { "type": "boolean", "const": true },
"source_url": { "type": "string" },
"page_numbers": { "type": "array", "items": { "type": "integer" } },
"cross_references": {
"type": "array",
"items": { "type": "string" }
},
"amendments": { "type": "array", "items": { "type": "object" } },
"precedence_level": { "type": "integer", "minimum": 1, "maximum": 10 }
},
"required": [
"document_id",
"document_type",
"content",
"jurisdiction",
"effective_from",
"pii_free"
]
},
"indexing_config": {
"replication_factor": 3,
"write_consistency_factor": 2,
"on_disk_payload": true,
"hnsw_config": {
"m": 32,
"ef_construct": 200,
"full_scan_threshold": 20000
}
}
},
{
"name": "best_practices",
"description": "Industry best practices, professional standards, and methodologies",
"dense": {
"size": 1024,
"distance": "Cosine"
},
"sparse": true,
"payload_schema": {
"type": "object",
"properties": {
"document_id": { "type": "string" },
"document_type": {
"type": "string",
"enum": [
"standard",
"guideline",
"methodology",
"checklist",
"workflow"
]
},
"title": { "type": "string" },
"content": { "type": "string" },
"chunk_index": { "type": "integer" },
"total_chunks": { "type": "integer" },
"jurisdiction": { "type": "string" },
"applicable_years": {
"type": "array",
"items": { "type": "string" }
},
"topic_tags": { "type": "array", "items": { "type": "string" } },
"practice_areas": { "type": "array", "items": { "type": "string" } },
"complexity_level": {
"type": "string",
"enum": ["basic", "intermediate", "advanced", "expert"]
},
"client_types": { "type": "array", "items": { "type": "string" } },
"professional_body": {
"type": "string",
"enum": ["ICAEW", "ACCA", "CIOT", "ATT", "STEP"]
},
"version": { "type": "string" },
"last_reviewed": { "type": "string", "format": "date" },
"review_frequency": {
"type": "string",
"enum": ["annual", "biannual", "as_needed"]
},
"pii_free": { "type": "boolean", "const": true },
"source_url": { "type": "string" },
"related_forms": { "type": "array", "items": { "type": "string" } },
"risk_level": {
"type": "string",
"enum": ["low", "medium", "high", "critical"]
},
"automation_suitable": { "type": "boolean" },
"quality_score": { "type": "number", "minimum": 0, "maximum": 1 }
},
"required": [
"document_id",
"document_type",
"content",
"jurisdiction",
"pii_free"
]
},
"indexing_config": {
"replication_factor": 2,
"write_consistency_factor": 1,
"on_disk_payload": true,
"hnsw_config": {
"m": 16,
"ef_construct": 100,
"full_scan_threshold": 10000
}
}
},
{
"name": "glossary",
"description": "Tax terminology, definitions, and concept explanations",
"dense": {
"size": 768,
"distance": "Cosine"
},
"sparse": true,
"payload_schema": {
"type": "object",
"properties": {
"document_id": { "type": "string" },
"document_type": { "type": "string", "const": "definition" },
"term": { "type": "string" },
"definition": { "type": "string" },
"content": { "type": "string" },
"chunk_index": { "type": "integer" },
"total_chunks": { "type": "integer" },
"jurisdiction": { "type": "string" },
"applicable_years": {
"type": "array",
"items": { "type": "string" }
},
"category": {
"type": "string",
"enum": [
"tax_concept",
"legal_term",
"accounting_term",
"form_field",
"calculation_method"
]
},
"complexity_level": {
"type": "string",
"enum": ["basic", "intermediate", "advanced"]
},
"synonyms": { "type": "array", "items": { "type": "string" } },
"related_terms": { "type": "array", "items": { "type": "string" } },
"form_references": { "type": "array", "items": { "type": "string" } },
"legislation_references": {
"type": "array",
"items": { "type": "string" }
},
"examples": { "type": "array", "items": { "type": "string" } },
"version": { "type": "string" },
"source_authority": { "type": "string" },
"pii_free": { "type": "boolean", "const": true },
"source_url": { "type": "string" },
"usage_frequency": {
"type": "string",
"enum": ["common", "occasional", "rare", "obsolete"]
},
"definition_quality": { "type": "number", "minimum": 0, "maximum": 1 }
},
"required": [
"document_id",
"term",
"definition",
"content",
"jurisdiction",
"category",
"pii_free"
]
},
"indexing_config": {
"replication_factor": 2,
"write_consistency_factor": 1,
"on_disk_payload": true,
"hnsw_config": {
"m": 16,
"ef_construct": 100,
"full_scan_threshold": 5000
}
}
}
],
"global_config": {
"default_segment_number": 4,
"max_segment_size_kb": 1048576,
"memmap_threshold_kb": 1048576,
"indexing_threshold_kb": 20480,
"payload_storage_type": "on_disk",
"enable_payload_index": true,
"wal_config": {
"wal_capacity_mb": 32,
"wal_segments_ahead": 0
},
"optimizer_config": {
"deleted_threshold": 0.2,
"vacuum_min_vector_number": 1000,
"default_segment_number": 0,
"max_segment_size_kb": 1048576,
"memmap_threshold_kb": 1048576,
"indexing_threshold_kb": 20480,
"flush_interval_sec": 5,
"max_optimization_threads": 1
}
},
"backup_config": {
"enabled": true,
"schedule": "0 2 * * *",
"retention_days": 30,
"compression": true,
"verify_integrity": true
},
"monitoring": {
"metrics_enabled": true,
"log_level": "INFO",
"telemetry_disabled": false,
"performance_tracking": {
"track_search_latency": true,
"track_indexing_throughput": true,
"track_memory_usage": true,
"track_disk_usage": true
}
}
}
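
As a closing illustration, a minimal sketch of how one of these collection definitions might be applied with the Qdrant Python client, mirroring the `firm_knowledge` entry above (named dense vector plus a named sparse vector); the client version and sparse-vector API are assumptions:

```python
# Illustrative only: create the firm_knowledge collection per the definition above.
# Assumes a qdrant-client version that supports named sparse vectors.
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, SparseVectorParams, VectorParams

client = QdrantClient(url="http://localhost:6333")

client.create_collection(
    collection_name="firm_knowledge",
    vectors_config={"dense": VectorParams(size=1024, distance=Distance.COSINE)},
    sparse_vectors_config={"sparse": SparseVectorParams()},
    on_disk_payload=True,
)
```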