"""Simple document chunker for RAG indexing.
|
|
|
|
Splits documents into manageable chunks using configuration options.
|
|
Supports text files directly and PDFs via pdfplumber when available.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
|
|
@dataclass
|
|
class ChunkerConfig:
|
|
chunk_size: int = 1000
|
|
chunk_overlap: int = 100
|
|
max_chunks: int = 1000
|
|
|
|
|
|
class DocumentChunker:
|
|
def __init__(self, config_path: str) -> None:
|
|
try:
|
|
with open(config_path, "r", encoding="utf-8") as f:
|
|
cfg = yaml.safe_load(f) or {}
|
|
except Exception:
|
|
cfg = {}
|
|
|
|
rcfg = cfg.get("chunking", {}) if isinstance(cfg, dict) else {}
|
|
self.config = ChunkerConfig(
|
|
chunk_size=int(rcfg.get("chunk_size", 1000)),
|
|
chunk_overlap=int(rcfg.get("chunk_overlap", 100)),
|
|
max_chunks=int(rcfg.get("max_chunks", 1000)),
|
|
)
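
    # Expected YAML config shape (illustrative sketch; only the "chunking"
    # mapping is read, and any missing key falls back to the defaults above):
    #
    #   chunking:
    #     chunk_size: 1000
    #     chunk_overlap: 100
    #     max_chunks: 1000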

    async def chunk_document(self, document_path: str, metadata: dict[str, Any]) -> list[dict[str, Any]]:
        """Route a document to the PDF or plain-text chunker based on its extension."""
        path = Path(document_path)
        ext = path.suffix.lower()

        if ext == ".pdf":
            return await self._chunk_pdf(path, metadata)
        return await self._chunk_text_like(path, metadata)
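
    # Every chunk record shares the shape below (values illustrative):
    #
    #   {
    #       "id": "doc-p1-c0",
    #       "document_id": "doc",
    #       "content": "...",
    #       "chunk_index": 0,
    #       "total_chunks": 3,
    #       "page_numbers": [1],      # [] for non-PDF inputs
    #       "section_hierarchy": [],
    #       "confidence_score": 1.0,  # 0.0 for the unreadable-PDF fallback
    #   }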

    async def _chunk_pdf(self, path: Path, metadata: dict[str, Any]) -> list[dict[str, Any]]:
        """Chunk a PDF page by page, tagging each chunk with its source page."""
        chunks: list[dict[str, Any]] = []
        try:
            import pdfplumber  # type: ignore

            with pdfplumber.open(str(path)) as pdf:
                doc_id = metadata.get("doc_id") or path.stem
                for i, page in enumerate(pdf.pages, start=1):
                    if len(chunks) >= self.config.max_chunks:
                        break
                    text = page.extract_text() or ""
                    if not text.strip():
                        continue
                    for j, content in enumerate(self._split_text(text)):
                        chunks.append(
                            {
                                "id": f"{doc_id}-p{i}-c{j}",
                                "document_id": doc_id,
                                "content": content,
                                "chunk_index": j,
                                "total_chunks": 0,  # backfilled below once the count is known
                                "page_numbers": [i],
                                "section_hierarchy": [],
                                "confidence_score": 1.0,
                            }
                        )
                        if len(chunks) >= self.config.max_chunks:
                            break
                # The chunk count is only known after every page is processed,
                # so backfill it here.
                for chunk in chunks:
                    chunk["total_chunks"] = len(chunks)
        except Exception:
            # Fallback: pdfplumber is missing or the file is unreadable; emit a
            # single empty chunk so downstream indexing does not crash.
            chunks.append(
                {
                    "id": f"{path.stem}-p1-c0",
                    "document_id": path.stem,
                    "content": "",
                    "chunk_index": 0,
                    "total_chunks": 1,
                    "page_numbers": [1],
                    "section_hierarchy": [],
                    "confidence_score": 0.0,
                }
            )
        return chunks

    async def _chunk_text_like(self, path: Path, metadata: dict[str, Any]) -> list[dict[str, Any]]:
        """Chunk any non-PDF file by reading it as UTF-8 text."""
        try:
            text = path.read_text(encoding="utf-8", errors="ignore")
        except Exception:
            # As a last resort, read raw bytes and decode best-effort.
            data = path.read_bytes()
            text = data.decode("utf-8", errors="ignore")

        doc_id = metadata.get("doc_id") or path.stem
        pieces = self._split_text(text)
        chunks: list[dict[str, Any]] = []
        total = min(len(pieces), self.config.max_chunks)
        for i, content in enumerate(pieces[:total]):
            chunks.append(
                {
                    "id": f"{doc_id}-c{i}",
                    "document_id": doc_id,
                    "content": content,
                    "chunk_index": i,
                    "total_chunks": total,
                    "page_numbers": [],
                    "section_hierarchy": [],
                    "confidence_score": 1.0,
                }
            )
        return chunks

    def _split_text(self, text: str) -> list[str]:
        """Split text into fixed-size windows that overlap by chunk_overlap characters."""
        size = max(self.config.chunk_size, 1)
        # Clamp overlap into [0, size - 1] so each step always moves forward.
        overlap = max(min(self.config.chunk_overlap, size - 1), 0)
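        # Worked example (illustrative): size=1000, overlap=100 gives step=900,
        # so chunk k covers text[900*k : 900*k + 1000] and consecutive chunks
        # share 100 characters at the seam.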

        if not text:
            return [""]

        chunks: list[str] = []
        start = 0
        n = len(text)
        step = size - overlap if size > overlap else size
        while start < n and len(chunks) < self.config.max_chunks:
            end = min(start + size, n)
            chunks.append(text[start:end])
            start += step
        return chunks
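

if __name__ == "__main__":
    # Minimal usage sketch: chunk this source file itself so the demo runs
    # without external fixtures. The config path is an illustrative
    # assumption; a missing config simply falls back to the defaults.
    import asyncio

    chunker = DocumentChunker("config/chunking.yaml")
    demo = asyncio.run(chunker.chunk_document(__file__, {"doc_id": "self-test"}))
    print(f"Produced {len(demo)} chunk(s) of ~{chunker.config.chunk_size} chars")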