Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
247 lines
9.5 KiB
Python
247 lines
9.5 KiB
Python
"""PDF form filling using pdfrw with reportlab fallback."""
|
|
|
|
import io
|
|
from typing import Any
|
|
|
|
import structlog
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class PDFFormFiller:
|
|
"""PDF form filling using pdfrw with reportlab fallback"""
|
|
|
|
def __init__(self) -> None:
|
|
self.form_templates: dict[str, Any] = {}
|
|
|
|
def load_template(self, form_id: str, template_path: str) -> bool:
|
|
"""Load PDF form template"""
|
|
try:
|
|
# pylint: disable=import-outside-toplevel
|
|
from pdfrw import PdfReader # type: ignore
|
|
|
|
template = PdfReader(template_path)
|
|
if template is None:
|
|
logger.error(
|
|
"Failed to load PDF template", form_id=form_id, path=template_path
|
|
)
|
|
return False
|
|
|
|
self.form_templates[form_id] = {"template": template, "path": template_path}
|
|
|
|
logger.info("Loaded PDF template", form_id=form_id, path=template_path)
|
|
return True
|
|
|
|
except ImportError:
|
|
logger.error("pdfrw not available for PDF form filling")
|
|
return False
|
|
except Exception as e: # pylint: disable=broad-exception-caught
|
|
logger.error("Failed to load PDF template", form_id=form_id, error=str(e))
|
|
return False
|
|
|
|
def fill_form(
|
|
self,
|
|
form_id: str,
|
|
field_values: dict[str, str | int | float | bool],
|
|
output_path: str | None = None,
|
|
) -> bytes | None:
|
|
"""Fill PDF form with values"""
|
|
|
|
if form_id not in self.form_templates:
|
|
logger.error("Form template not loaded", form_id=form_id)
|
|
return None
|
|
|
|
try:
|
|
return self._fill_with_pdfrw(form_id, field_values, output_path)
|
|
except Exception as e: # pylint: disable=broad-exception-caught
|
|
logger.warning(
|
|
"pdfrw filling failed, trying reportlab overlay", error=str(e)
|
|
)
|
|
return self._fill_with_overlay(form_id, field_values, output_path)
|
|
|
|
def _fill_with_pdfrw(
|
|
self,
|
|
form_id: str,
|
|
field_values: dict[str, Any],
|
|
output_path: str | None = None,
|
|
) -> bytes | None:
|
|
"""Fill form using pdfrw"""
|
|
# pylint: disable=import-outside-toplevel
|
|
from pdfrw import PdfDict, PdfReader, PdfWriter
|
|
|
|
template_info = self.form_templates[form_id]
|
|
template = PdfReader(template_info["path"])
|
|
|
|
# Get form fields
|
|
if template.Root.AcroForm is None: # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue] # fmt: skip
|
|
logger.warning("PDF has no AcroForm fields", form_id=form_id)
|
|
return self._fill_with_overlay(form_id, field_values, output_path)
|
|
|
|
# Fill form fields
|
|
for field in template.Root.AcroForm.Fields: # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue] # fmt: skip
|
|
field_name = field.T
|
|
if field_name and field_name[1:-1] in field_values: # Remove parentheses
|
|
field_value = field_values[field_name[1:-1]]
|
|
|
|
# Set field value
|
|
if isinstance(field_value, bool):
|
|
# Checkbox field
|
|
if field_value:
|
|
field.V = PdfDict.Yes # fmt: skip # pyright: ignore[reportAttributeAccessIssue]
|
|
field.AS = PdfDict.Yes # fmt: skip # pyright: ignore[reportAttributeAccessIssue]
|
|
else:
|
|
field.V = PdfDict.Off # fmt: skip # pyright: ignore[reportAttributeAccessIssue]
|
|
field.AS = PdfDict.Off # fmt: skip # pyright: ignore[reportAttributeAccessIssue]
|
|
else:
|
|
# Text field
|
|
field.V = str(field_value)
|
|
|
|
# Make field read-only
|
|
field.Ff = 1 # Read-only flag
|
|
|
|
# Flatten form (make fields non-editable)
|
|
if template.Root.AcroForm: # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue] # fmt: skip
|
|
template.Root.AcroForm.NeedAppearances = True # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue] # fmt: skip
|
|
|
|
# Write to output
|
|
if output_path:
|
|
writer = PdfWriter(output_path)
|
|
writer.write(template)
|
|
with open(output_path, "rb") as f:
|
|
return f.read()
|
|
else:
|
|
# Write to bytes
|
|
output_buffer = io.BytesIO()
|
|
writer = PdfWriter(output_buffer)
|
|
writer.write(template)
|
|
return output_buffer.getvalue()
|
|
|
|
def _fill_with_overlay( # pylint: disable=too-many-locals
|
|
self,
|
|
form_id: str,
|
|
field_values: dict[str, Any],
|
|
output_path: str | None = None,
|
|
) -> bytes | None:
|
|
"""Fill form using reportlab overlay method"""
|
|
try:
|
|
# pylint: disable=import-outside-toplevel
|
|
from PyPDF2 import PdfReader, PdfWriter
|
|
from reportlab.lib.pagesizes import A4
|
|
from reportlab.pdfgen import canvas
|
|
|
|
template_info = self.form_templates[form_id]
|
|
|
|
# Read original PDF
|
|
original_pdf = PdfReader(template_info["path"])
|
|
|
|
# Create overlay with form data
|
|
overlay_buffer = io.BytesIO()
|
|
overlay_canvas = canvas.Canvas(overlay_buffer, pagesize=A4)
|
|
|
|
# Get field positions (this would be configured per form)
|
|
field_positions = self._get_field_positions(form_id)
|
|
|
|
# Add text to overlay
|
|
for field_name, value in field_values.items():
|
|
if field_name in field_positions:
|
|
pos = field_positions[field_name]
|
|
overlay_canvas.drawString(pos["x"], pos["y"], str(value))
|
|
|
|
overlay_canvas.save()
|
|
overlay_buffer.seek(0)
|
|
|
|
# Read overlay PDF
|
|
overlay_pdf = PdfReader(overlay_buffer)
|
|
|
|
# Merge original and overlay
|
|
writer = PdfWriter()
|
|
for page_num, _ in enumerate(original_pdf.pages):
|
|
original_page = original_pdf.pages[page_num]
|
|
|
|
if page_num < len(overlay_pdf.pages):
|
|
overlay_page = overlay_pdf.pages[page_num]
|
|
original_page.merge_page(overlay_page)
|
|
|
|
writer.add_page(original_page)
|
|
|
|
# Write result
|
|
if output_path:
|
|
with open(output_path, "wb") as output_file:
|
|
writer.write(output_file)
|
|
with open(output_path, "rb") as f:
|
|
return f.read()
|
|
else:
|
|
output_buffer = io.BytesIO()
|
|
writer.write(output_buffer)
|
|
return output_buffer.getvalue()
|
|
|
|
except ImportError as e:
|
|
logger.error(
|
|
"Required libraries not available for overlay method", error=str(e)
|
|
)
|
|
return None
|
|
except Exception as e: # pylint: disable=broad-exception-caught
|
|
logger.error("Overlay filling failed", form_id=form_id, error=str(e))
|
|
return None
|
|
|
|
def _get_field_positions(self, form_id: str) -> dict[str, dict[str, float]]:
|
|
"""Get field positions for overlay method"""
|
|
# This would be configured per form type
|
|
# For now, return sample positions for SA103
|
|
if form_id == "SA103":
|
|
return {
|
|
"box_1": {"x": 100, "y": 750}, # Business name
|
|
"box_2": {"x": 100, "y": 720}, # Business description
|
|
"box_20": {"x": 400, "y": 600}, # Total turnover
|
|
"box_31": {"x": 400, "y": 570}, # Total expenses
|
|
"box_32": {"x": 400, "y": 540}, # Net profit
|
|
}
|
|
return {}
|
|
|
|
def get_form_fields(self, form_id: str) -> list[dict[str, Any]]:
|
|
"""Get list of available form fields"""
|
|
if form_id not in self.form_templates:
|
|
return []
|
|
|
|
try:
|
|
# pylint: disable=import-outside-toplevel
|
|
from pdfrw import PdfReader
|
|
|
|
template_info = self.form_templates[form_id]
|
|
template = PdfReader(template_info["path"])
|
|
|
|
if template.Root.AcroForm is None: # fmt: skip # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue]
|
|
return []
|
|
|
|
fields = []
|
|
for field in template.Root.AcroForm.Fields: # fmt: skip # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue]
|
|
field_info = {
|
|
"name": field.T[1:-1] if field.T else None, # Remove parentheses
|
|
"type": self._get_field_type(field),
|
|
"required": bool(field.Ff and int(field.Ff) & 2), # Required flag
|
|
"readonly": bool(field.Ff and int(field.Ff) & 1), # Read-only flag
|
|
}
|
|
|
|
if field.V:
|
|
field_info["default_value"] = str(field.V)
|
|
|
|
fields.append(field_info)
|
|
|
|
return fields
|
|
|
|
except Exception as e: # pylint: disable=broad-exception-caught
|
|
logger.error("Failed to get form fields", form_id=form_id, error=str(e))
|
|
return []
|
|
|
|
def _get_field_type(self, field: Any) -> str:
|
|
"""Determine field type from PDF field"""
|
|
if hasattr(field, "FT"):
|
|
field_type = str(field.FT)
|
|
if "Tx" in field_type:
|
|
return "text"
|
|
if "Btn" in field_type:
|
|
return "checkbox" if field.Ff and int(field.Ff) & 32768 else "button"
|
|
if "Ch" in field_type:
|
|
return "choice"
|
|
return "unknown"
|