Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
508 lines
24 KiB
Python
508 lines
24 KiB
Python
import base64
import concurrent.futures
import io
import json
import logging
import os
from pathlib import Path
from typing import Any

import numpy as np
import requests
from PIL import Image, ImageFilter
from PyPDF2 import PdfReader
|
|
|
|
|
|
class OCRProcessor:
|
|
def __init__(
|
|
self,
|
|
model_name: str = "llama3.2-vision:11b",
|
|
base_url: str = "http://localhost:11434/api/generate",
|
|
max_workers: int = 1,
|
|
provider: str = "ollama",
|
|
openai_api_key: str | None = None,
|
|
openai_base_url: str = "https://api.openai.com/v1/chat/completions",
|
|
):
|
|
self.model_name = model_name
|
|
self.base_url = base_url
|
|
self.max_workers = max_workers
|
|
self.provider = provider.lower()
|
|
self.openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
|
|
self.openai_base_url = openai_base_url
|
|
|
|
def _encode_image(self, image_path: str) -> str:
|
|
"""Convert image to base64 string"""
|
|
with open(image_path, "rb") as image_file:
|
|
return base64.b64encode(image_file.read()).decode("utf-8")
|
|
|
|
def _pdf_to_images(self, pdf_path: str) -> list[str]:
|
|
"""
|
|
Convert each page of a PDF to an image without PyMuPDF.
|
|
Strategy: extract largest embedded image per page via PyPDF2.
|
|
Saves each selected image as a temporary PNG and returns paths.
|
|
|
|
Note: Text-only pages with no embedded images will be skipped here.
|
|
Use _pdf_extract_text as a fallback for such pages.
|
|
"""
|
|
image_paths: list[str] = []
|
|
try:
|
|
reader = PdfReader(pdf_path)
|
|
for page_index, page in enumerate(reader.pages):
|
|
try:
|
|
resources = page.get("/Resources")
|
|
if resources is None:
|
|
continue
|
|
xobject = resources.get("/XObject")
|
|
if xobject is None:
|
|
continue
|
|
xobject = xobject.get_object()
|
|
largest = None
|
|
largest_area = -1
|
|
for _, obj_ref in xobject.items():
|
|
try:
|
|
obj = obj_ref.get_object()
|
|
if obj.get("/Subtype") != "/Image":
|
|
continue
|
|
width = int(obj.get("/Width", 0))
|
|
height = int(obj.get("/Height", 0))
|
|
area = width * height
|
|
if area > largest_area:
|
|
largest = obj
|
|
largest_area = area
|
|
except Exception:
|
|
continue
|
|
|
|
if largest is None:
|
|
continue
|
|
|
|
data = largest.get_data()
|
|
filt = largest.get("/Filter")
|
|
out_path = f"{pdf_path}_page{page_index}.png"
|
|
# If JPEG/JPX, write bytes directly; else convert via PIL
|
|
if filt in ("/DCTDecode",):
|
|
# JPEG
|
|
out_path = f"{pdf_path}_page{page_index}.jpg"
|
|
with open(out_path, "wb") as f:
|
|
f.write(data)
|
|
elif filt in ("/JPXDecode",):
|
|
out_path = f"{pdf_path}_page{page_index}.jp2"
|
|
with open(out_path, "wb") as f:
|
|
f.write(data)
|
|
else:
|
|
mode = "RGB"
|
|
colorspace = largest.get("/ColorSpace")
|
|
if colorspace in ("/DeviceGray",):
|
|
mode = "L"
|
|
width = int(largest.get("/Width", 0))
|
|
height = int(largest.get("/Height", 0))
|
|
try:
|
|
img = Image.frombytes(mode, (width, height), data)
|
|
except Exception:
|
|
# Best-effort decode via Pillow
|
|
img = Image.open(io.BytesIO(data))
|
|
img.save(out_path, format="PNG")
|
|
|
|
image_paths.append(out_path)
|
|
except Exception:
|
|
# Continue gracefully for problematic pages/objects
|
|
continue
|
|
return image_paths
|
|
except Exception as e:
|
|
raise ValueError(f"Could not extract images from PDF: {e}")
|
|
|
|
def _pdf_extract_text(self, pdf_path: str) -> list[str]:
|
|
"""Extract text per page using pdfplumber if available, else PyPDF2."""
|
|
texts: list[str] = []
|
|
try:
|
|
try:
|
|
import pdfplumber
|
|
|
|
with pdfplumber.open(pdf_path) as pdf:
|
|
for page in pdf.pages:
|
|
texts.append(page.extract_text() or "")
|
|
return texts
|
|
except Exception:
|
|
# Fallback to PyPDF2
|
|
reader = PdfReader(pdf_path)
|
|
for page in reader.pages: # type: ignore
|
|
texts.append(page.extract_text() or "")
|
|
return texts
|
|
except Exception as e:
|
|
raise ValueError(f"Could not extract text from PDF: {e}")
|
|
|
|
def _call_ollama_vision(self, prompt: str, image_base64: str) -> str:
|
|
payload = {
|
|
"model": self.model_name,
|
|
"prompt": prompt,
|
|
"stream": False,
|
|
"images": [image_base64],
|
|
}
|
|
response = requests.post(self.base_url, json=payload)
|
|
response.raise_for_status()
|
|
return response.json().get("response", "") # type: ignore
|
|
|
|
def _call_openai_vision(self, prompt: str, image_base64: str) -> str:
|
|
if not self.openai_api_key:
|
|
raise ValueError("OPENAI_API_KEY not set")
|
|
# Compose chat.completions payload for GPT-4o/mini vision
|
|
payload = {
|
|
"model": self.model_name or "gpt-4o-mini",
|
|
"messages": [
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": prompt},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{image_base64}",
|
|
},
|
|
},
|
|
],
|
|
}
|
|
],
|
|
"temperature": 0,
|
|
}
|
|
headers = {
|
|
"Authorization": f"Bearer {self.openai_api_key}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
response = requests.post(self.openai_base_url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
try:
|
|
return data["choices"][0]["message"]["content"] # type: ignore
|
|
except Exception:
|
|
return json.dumps(data)
|
|
|
|
def _preprocess_image(self, image_path: str, language: str = "en") -> str:
|
|
"""
|
|
Preprocess image before OCR using Pillow + NumPy:
|
|
- Convert to grayscale
|
|
- Histogram equalization (contrast)
|
|
- Median denoise
|
|
- Otsu threshold and invert
|
|
"""
|
|
try:
|
|
with Image.open(image_path) as img:
|
|
if img.mode in ("RGBA", "LA"):
|
|
img = img.convert("RGB")
|
|
gray = img.convert("L")
|
|
|
|
# Histogram equalization via cumulative distribution
|
|
arr = np.asarray(gray)
|
|
hist, _ = np.histogram(arr.flatten(), 256, [0, 256]) # type: ignore
|
|
cdf = hist.cumsum()
|
|
cdf_masked = np.ma.masked_equal(cdf, 0) # type: ignore
|
|
cdf_min = cdf_masked.min() if cdf_masked.size else 0
|
|
cdf_max = cdf_masked.max() if cdf_masked.size else 0
|
|
if cdf_max == cdf_min:
|
|
eq = arr
|
|
else:
|
|
cdf_scaled = (cdf_masked - cdf_min) * 255 / (cdf_max - cdf_min)
|
|
lut = np.ma.filled(cdf_scaled, 0).astype("uint8")
|
|
eq = lut[arr]
|
|
|
|
eq_img = Image.fromarray(eq, mode="L")
|
|
# Median filter (3x3) to reduce noise
|
|
eq_img = eq_img.filter(ImageFilter.MedianFilter(size=3))
|
|
arr_eq = np.asarray(eq_img)
|
|
|
|
# Otsu threshold
|
|
hist2, _ = np.histogram(arr_eq, 256, [0, 256]) # type: ignore
|
|
total = arr_eq.size
|
|
sum_total = (np.arange(256) * hist2).sum()
|
|
sum_b = 0.0
|
|
w_b = 0.0
|
|
max_var = 0.0
|
|
thr = 0
|
|
for t in range(256):
|
|
w_b += hist2[t]
|
|
if w_b == 0:
|
|
continue
|
|
w_f = total - w_b
|
|
if w_f == 0:
|
|
break
|
|
sum_b += t * hist2[t]
|
|
m_b = sum_b / w_b
|
|
m_f = (sum_total - sum_b) / w_f
|
|
var_between = w_b * w_f * (m_b - m_f) ** 2
|
|
if var_between > max_var:
|
|
max_var = var_between
|
|
thr = t
|
|
|
|
binary = (arr_eq > thr).astype(np.uint8) * 255
|
|
# Invert: black text on white background
|
|
binary = 255 - binary
|
|
|
|
out_img = Image.fromarray(binary, mode="L")
|
|
preprocessed_path = f"{image_path}_preprocessed.jpg"
|
|
out_img.save(preprocessed_path, format="JPEG", quality=95)
|
|
return preprocessed_path
|
|
except Exception as e:
|
|
raise ValueError(f"Failed to preprocess image {image_path}: {e}")
|
|
|
|
def process_image(
|
|
self,
|
|
image_path: str,
|
|
format_type: str = "markdown",
|
|
preprocess: bool = True,
|
|
custom_prompt: str | None = None,
|
|
language: str = "en",
|
|
) -> str:
|
|
"""
|
|
Process an image (or PDF) and extract text in the specified format
|
|
|
|
Args:
|
|
image_path: Path to the image file or PDF file
|
|
format_type: One of ["markdown", "text", "json", "structured", "key_value","custom"]
|
|
preprocess: Whether to apply image preprocessing
|
|
custom_prompt: If provided, this prompt overrides the default based on format_type
|
|
language: Language code to apply language specific OCR preprocessing
|
|
"""
|
|
try:
|
|
# If the input is a PDF, process all pages
|
|
if image_path.lower().endswith(".pdf"):
|
|
image_pages = self._pdf_to_images(image_path)
|
|
responses: list[str] = []
|
|
if image_pages:
|
|
for idx, page_file in enumerate(image_pages):
|
|
# Process each page with preprocessing if enabled
|
|
if preprocess:
|
|
preprocessed_path = self._preprocess_image(
|
|
page_file, language
|
|
)
|
|
else:
|
|
preprocessed_path = page_file
|
|
|
|
image_base64 = self._encode_image(preprocessed_path)
|
|
|
|
if custom_prompt and custom_prompt.strip():
|
|
prompt = custom_prompt
|
|
else:
|
|
prompts = {
|
|
"markdown": f"""Extract all text content from this image in {language} **exactly as it appears**, without modification, summarization, or omission.
|
|
Format the output in markdown:
|
|
- Use headers (#, ##, ###) **only if they appear in the image**
|
|
- Preserve original lists (-, *, numbered lists) as they are
|
|
- Maintain all text formatting (bold, italics, underlines) exactly as seen
|
|
- **Do not add, interpret, or restructure any content**
|
|
""",
|
|
"text": f"""Extract all visible text from this image in {language} **without any changes**.
|
|
- **Do not summarize, paraphrase, or infer missing text.**
|
|
- Retain all spacing, punctuation, and formatting exactly as in the image.
|
|
- If text is unclear or partially visible, extract as much as possible without guessing.
|
|
- **Include all text, even if it seems irrelevant or repeated.**
|
|
""",
|
|
"json": f"""Extract all text from this image in {language} and format it as JSON, **strictly preserving** the structure.
|
|
- **Do not summarize, add, or modify any text.**
|
|
- Maintain hierarchical sections and subsections as they appear.
|
|
- Use keys that reflect the document's actual structure (e.g., "title", "body", "footer").
|
|
- Include all text, even if fragmented, blurry, or unclear.
|
|
""",
|
|
"structured": f"""Extract all text from this image in {language}, **ensuring complete structural accuracy**:
|
|
- Identify and format tables **without altering content**.
|
|
- Preserve list structures (bulleted, numbered) **exactly as shown**.
|
|
- Maintain all section headings, indents, and alignments.
|
|
- **Do not add, infer, or restructure the content in any way.**
|
|
""",
|
|
"key_value": f"""Extract all key-value pairs from this image in {language} **exactly as they appear**:
|
|
- Identify and extract labels and their corresponding values without modification.
|
|
- Maintain the exact wording, punctuation, and order.
|
|
- Format each pair as 'key: value' **only if clearly structured that way in the image**.
|
|
- **Do not infer missing values or add any extra text.**
|
|
""",
|
|
"table": f"""Extract all tabular data from this image in {language} **exactly as it appears**, without modification, summarization, or omission.
|
|
- **Preserve the table structure** (rows, columns, headers) as closely as possible.
|
|
- **Do not add missing values or infer content**—if a cell is empty, leave it empty.
|
|
- Maintain all numerical, textual, and special character formatting.
|
|
- If the table contains merged cells, indicate them clearly without altering their meaning.
|
|
- Output the table in a structured format such as Markdown, CSV, or JSON, based on the intended use.
|
|
""",
|
|
}
|
|
prompt = prompts.get(format_type, prompts["text"])
|
|
|
|
# Route to chosen provider
|
|
if self.provider == "openai":
|
|
res = self._call_openai_vision(prompt, image_base64)
|
|
else:
|
|
res = self._call_ollama_vision(prompt, image_base64)
|
|
|
|
responses.append(f"Page {idx + 1}:\n{res}")
|
|
|
|
# Clean up temporary files
|
|
if preprocess and preprocessed_path.endswith(
|
|
"_preprocessed.jpg"
|
|
):
|
|
try:
|
|
os.remove(preprocessed_path)
|
|
except OSError:
|
|
pass
|
|
if page_file.endswith((".png", ".jpg", ".jp2")):
|
|
try:
|
|
os.remove(page_file)
|
|
except OSError:
|
|
pass
|
|
|
|
final_result = "\n".join(responses)
|
|
if format_type == "json":
|
|
try:
|
|
json_data = json.loads(final_result)
|
|
return json.dumps(json_data, indent=2)
|
|
except json.JSONDecodeError:
|
|
return final_result
|
|
return final_result
|
|
else:
|
|
# Fallback: no images found; extract raw text per page
|
|
text_pages = self._pdf_extract_text(image_path)
|
|
combined = []
|
|
for i, t in enumerate(text_pages):
|
|
combined.append(f"Page {i + 1}:\n{t}")
|
|
return "\n".join(combined)
|
|
|
|
# Process non-PDF images as before.
|
|
if preprocess:
|
|
image_path = self._preprocess_image(image_path, language)
|
|
|
|
image_base64 = self._encode_image(image_path)
|
|
|
|
# Clean up temporary files
|
|
if image_path.endswith(("_preprocessed.jpg", "_temp.jpg")):
|
|
os.remove(image_path)
|
|
|
|
if custom_prompt and custom_prompt.strip():
|
|
prompt = custom_prompt
|
|
print("Using custom prompt:", prompt)
|
|
else:
|
|
prompts = {
|
|
"markdown": f"""Extract all text content from this image in {language} **exactly as it appears**, without modification, summarization, or omission.
|
|
Format the output in markdown:
|
|
- Use headers (#, ##, ###) **only if they appear in the image**
|
|
- Preserve original lists (-, *, numbered lists) as they are
|
|
- Maintain all text formatting (bold, italics, underlines) exactly as seen
|
|
- **Do not add, interpret, or restructure any content**
|
|
""",
|
|
"text": f"""Extract all visible text from this image in {language} **without any changes**.
|
|
- **Do not summarize, paraphrase, or infer missing text.**
|
|
- Retain all spacing, punctuation, and formatting exactly as in the image.
|
|
- If text is unclear or partially visible, extract as much as possible without guessing.
|
|
- **Include all text, even if it seems irrelevant or repeated.**
|
|
""",
|
|
"json": f"""Extract all text from this image in {language} and format it as JSON, **strictly preserving** the structure.
|
|
- **Do not summarize, add, or modify any text.**
|
|
- Maintain hierarchical sections and subsections as they appear.
|
|
- Use keys that reflect the document's actual structure (e.g., "title", "body", "footer").
|
|
- Include all text, even if fragmented, blurry, or unclear.
|
|
""",
|
|
"structured": f"""Extract all text from this image in {language}, **ensuring complete structural accuracy**:
|
|
- Identify and format tables **without altering content**.
|
|
- Preserve list structures (bulleted, numbered) **exactly as shown**.
|
|
- Maintain all section headings, indents, and alignments.
|
|
- **Do not add, infer, or restructure the content in any way.**
|
|
""",
|
|
"key_value": f"""Extract all key-value pairs from this image in {language} **exactly as they appear**:
|
|
- Identify and extract labels and their corresponding values without modification.
|
|
- Maintain the exact wording, punctuation, and order.
|
|
- Format each pair as 'key: value' **only if clearly structured that way in the image**.
|
|
- **Do not infer missing values or add any extra text.**
|
|
""",
|
|
"table": f"""Extract all tabular data from this image in {language} **exactly as it appears**, without modification, summarization, or omission.
|
|
- **Preserve the table structure** (rows, columns, headers) as closely as possible.
|
|
- **Do not add missing values or infer content**—if a cell is empty, leave it empty.
|
|
- Maintain all numerical, textual, and special character formatting.
|
|
- If the table contains merged cells, indicate them clearly without altering their meaning.
|
|
- Output the table in a structured format such as Markdown, CSV, or JSON, based on the intended use.
|
|
""",
|
|
}
|
|
prompt = prompts.get(format_type, prompts["text"])
|
|
print("Using default prompt:", prompt) # Debug print
|
|
|
|
# Call chosen provider with single image
|
|
if self.provider == "openai":
|
|
result = self._call_openai_vision(prompt, image_base64)
|
|
else:
|
|
result = self._call_ollama_vision(prompt, image_base64)
|
|
|
|
if format_type == "json":
|
|
try:
|
|
json_data = json.loads(result)
|
|
return json.dumps(json_data, indent=2)
|
|
except json.JSONDecodeError:
|
|
return str(result)
|
|
|
|
return str(result)
|
|
except Exception as e:
|
|
return f"Error processing image: {str(e)}"
|
|
|
|
def process_batch(
|
|
self,
|
|
input_path: str | list[str],
|
|
format_type: str = "markdown",
|
|
recursive: bool = False,
|
|
preprocess: bool = True,
|
|
custom_prompt: str | None = None,
|
|
language: str = "en",
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Process multiple images in batch
|
|
|
|
Args:
|
|
input_path: Path to directory or list of image paths
|
|
format_type: Output format type
|
|
recursive: Whether to search directories recursively
|
|
preprocess: Whether to apply image preprocessing
|
|
custom_prompt: If provided, this prompt overrides the default for each image
|
|
language: Language code to apply language specific OCR preprocessing
|
|
|
|
Returns:
|
|
Dictionary with results and statistics
|
|
"""
|
|
# Collect all image paths
|
|
image_paths: list[str | Path] = []
|
|
if isinstance(input_path, str):
|
|
base_path = Path(input_path)
|
|
if base_path.is_dir():
|
|
pattern = "**/*" if recursive else "*"
|
|
for ext in [".png", ".jpg", ".jpeg", ".pdf", ".tiff"]:
|
|
image_paths.extend(base_path.glob(f"{pattern}{ext}"))
|
|
else:
|
|
image_paths = [base_path]
|
|
else:
|
|
image_paths = [Path(p) for p in input_path]
|
|
|
|
results = {}
|
|
errors = {}
|
|
|
|
# Process images in parallel
|
|
with concurrent.futures.ThreadPoolExecutor(
|
|
max_workers=self.max_workers
|
|
) as executor:
|
|
future_to_path = {
|
|
executor.submit(
|
|
self.process_image,
|
|
str(path),
|
|
format_type,
|
|
preprocess,
|
|
custom_prompt,
|
|
language,
|
|
): path
|
|
for path in image_paths
|
|
}
|
|
|
|
for future in concurrent.futures.as_completed(future_to_path):
|
|
path = future_to_path[future]
|
|
try:
|
|
results[str(path)] = future.result()
|
|
except Exception as e:
|
|
errors[str(path)] = str(e)
|
|
# pbar.update(1)
|
|
|
|
return {
|
|
"results": results,
|
|
"errors": errors,
|
|
"statistics": {
|
|
"total": len(image_paths),
|
|
"successful": len(results),
|
|
"failed": len(errors),
|
|
},
|
|
}
|