import base64 import concurrent.futures import io import json import os from pathlib import Path from typing import Any import numpy as np import requests from PIL import Image, ImageFilter from PyPDF2 import PdfReader class OCRProcessor: def __init__( self, model_name: str = "llama3.2-vision:11b", base_url: str = "http://localhost:11434/api/generate", max_workers: int = 1, provider: str = "ollama", openai_api_key: str | None = None, openai_base_url: str = "https://api.openai.com/v1/chat/completions", ): self.model_name = model_name self.base_url = base_url self.max_workers = max_workers self.provider = provider.lower() self.openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY") self.openai_base_url = openai_base_url def _encode_image(self, image_path: str) -> str: """Convert image to base64 string""" with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode("utf-8") def _pdf_to_images(self, pdf_path: str) -> list[str]: """ Convert each page of a PDF to an image without PyMuPDF. Strategy: extract largest embedded image per page via PyPDF2. Saves each selected image as a temporary PNG and returns paths. Note: Text-only pages with no embedded images will be skipped here. Use _pdf_extract_text as a fallback for such pages. """ image_paths: list[str] = [] try: reader = PdfReader(pdf_path) for page_index, page in enumerate(reader.pages): try: resources = page.get("/Resources") if resources is None: continue xobject = resources.get("/XObject") if xobject is None: continue xobject = xobject.get_object() largest = None largest_area = -1 for _, obj_ref in xobject.items(): try: obj = obj_ref.get_object() if obj.get("/Subtype") != "/Image": continue width = int(obj.get("/Width", 0)) height = int(obj.get("/Height", 0)) area = width * height if area > largest_area: largest = obj largest_area = area except Exception: continue if largest is None: continue data = largest.get_data() filt = largest.get("/Filter") out_path = f"{pdf_path}_page{page_index}.png" # If JPEG/JPX, write bytes directly; else convert via PIL if filt in ("/DCTDecode",): # JPEG out_path = f"{pdf_path}_page{page_index}.jpg" with open(out_path, "wb") as f: f.write(data) elif filt in ("/JPXDecode",): out_path = f"{pdf_path}_page{page_index}.jp2" with open(out_path, "wb") as f: f.write(data) else: mode = "RGB" colorspace = largest.get("/ColorSpace") if colorspace in ("/DeviceGray",): mode = "L" width = int(largest.get("/Width", 0)) height = int(largest.get("/Height", 0)) try: img = Image.frombytes(mode, (width, height), data) except Exception: # Best-effort decode via Pillow img = Image.open(io.BytesIO(data)) img.save(out_path, format="PNG") image_paths.append(out_path) except Exception: # Continue gracefully for problematic pages/objects continue return image_paths except Exception as e: raise ValueError(f"Could not extract images from PDF: {e}") def _pdf_extract_text(self, pdf_path: str) -> list[str]: """Extract text per page using pdfplumber if available, else PyPDF2.""" texts: list[str] = [] try: try: import pdfplumber with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: texts.append(page.extract_text() or "") return texts except Exception: # Fallback to PyPDF2 reader = PdfReader(pdf_path) for page in reader.pages: # type: ignore texts.append(page.extract_text() or "") return texts except Exception as e: raise ValueError(f"Could not extract text from PDF: {e}") def _call_ollama_vision(self, prompt: str, image_base64: str) -> str: payload = { "model": self.model_name, "prompt": prompt, "stream": False, "images": [image_base64], } response = requests.post(self.base_url, json=payload) response.raise_for_status() return response.json().get("response", "") # type: ignore def _call_openai_vision(self, prompt: str, image_base64: str) -> str: if not self.openai_api_key: raise ValueError("OPENAI_API_KEY not set") # Compose chat.completions payload for GPT-4o/mini vision payload = { "model": self.model_name or "gpt-4o-mini", "messages": [ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_base64}", }, }, ], } ], "temperature": 0, } headers = { "Authorization": f"Bearer {self.openai_api_key}", "Content-Type": "application/json", } response = requests.post(self.openai_base_url, headers=headers, json=payload) response.raise_for_status() data = response.json() try: return data["choices"][0]["message"]["content"] # type: ignore except Exception: return json.dumps(data) def _preprocess_image(self, image_path: str, language: str = "en") -> str: """ Preprocess image before OCR using Pillow + NumPy: - Convert to grayscale - Histogram equalization (contrast) - Median denoise - Otsu threshold and invert """ try: with Image.open(image_path) as img: if img.mode in ("RGBA", "LA"): img = img.convert("RGB") gray = img.convert("L") # Histogram equalization via cumulative distribution arr = np.asarray(gray) hist, _ = np.histogram(arr.flatten(), 256, [0, 256]) # type: ignore cdf = hist.cumsum() cdf_masked = np.ma.masked_equal(cdf, 0) # type: ignore cdf_min = cdf_masked.min() if cdf_masked.size else 0 cdf_max = cdf_masked.max() if cdf_masked.size else 0 if cdf_max == cdf_min: eq = arr else: cdf_scaled = (cdf_masked - cdf_min) * 255 / (cdf_max - cdf_min) lut = np.ma.filled(cdf_scaled, 0).astype("uint8") eq = lut[arr] eq_img = Image.fromarray(eq, mode="L") # Median filter (3x3) to reduce noise eq_img = eq_img.filter(ImageFilter.MedianFilter(size=3)) arr_eq = np.asarray(eq_img) # Otsu threshold hist2, _ = np.histogram(arr_eq, 256, [0, 256]) # type: ignore total = arr_eq.size sum_total = (np.arange(256) * hist2).sum() sum_b = 0.0 w_b = 0.0 max_var = 0.0 thr = 0 for t in range(256): w_b += hist2[t] if w_b == 0: continue w_f = total - w_b if w_f == 0: break sum_b += t * hist2[t] m_b = sum_b / w_b m_f = (sum_total - sum_b) / w_f var_between = w_b * w_f * (m_b - m_f) ** 2 if var_between > max_var: max_var = var_between thr = t binary = (arr_eq > thr).astype(np.uint8) * 255 # Invert: black text on white background binary = 255 - binary out_img = Image.fromarray(binary, mode="L") preprocessed_path = f"{image_path}_preprocessed.jpg" out_img.save(preprocessed_path, format="JPEG", quality=95) return preprocessed_path except Exception as e: raise ValueError(f"Failed to preprocess image {image_path}: {e}") def process_image( self, image_path: str, format_type: str = "markdown", preprocess: bool = True, custom_prompt: str | None = None, language: str = "en", ) -> str: """ Process an image (or PDF) and extract text in the specified format Args: image_path: Path to the image file or PDF file format_type: One of ["markdown", "text", "json", "structured", "key_value","custom"] preprocess: Whether to apply image preprocessing custom_prompt: If provided, this prompt overrides the default based on format_type language: Language code to apply language specific OCR preprocessing """ try: # If the input is a PDF, process all pages if image_path.lower().endswith(".pdf"): image_pages = self._pdf_to_images(image_path) responses: list[str] = [] if image_pages: for idx, page_file in enumerate(image_pages): # Process each page with preprocessing if enabled if preprocess: preprocessed_path = self._preprocess_image( page_file, language ) else: preprocessed_path = page_file image_base64 = self._encode_image(preprocessed_path) if custom_prompt and custom_prompt.strip(): prompt = custom_prompt else: prompts = { "markdown": f"""Extract all text content from this image in {language} **exactly as it appears**, without modification, summarization, or omission. Format the output in markdown: - Use headers (#, ##, ###) **only if they appear in the image** - Preserve original lists (-, *, numbered lists) as they are - Maintain all text formatting (bold, italics, underlines) exactly as seen - **Do not add, interpret, or restructure any content** """, "text": f"""Extract all visible text from this image in {language} **without any changes**. - **Do not summarize, paraphrase, or infer missing text.** - Retain all spacing, punctuation, and formatting exactly as in the image. - If text is unclear or partially visible, extract as much as possible without guessing. - **Include all text, even if it seems irrelevant or repeated.** """, "json": f"""Extract all text from this image in {language} and format it as JSON, **strictly preserving** the structure. - **Do not summarize, add, or modify any text.** - Maintain hierarchical sections and subsections as they appear. - Use keys that reflect the document's actual structure (e.g., "title", "body", "footer"). - Include all text, even if fragmented, blurry, or unclear. """, "structured": f"""Extract all text from this image in {language}, **ensuring complete structural accuracy**: - Identify and format tables **without altering content**. - Preserve list structures (bulleted, numbered) **exactly as shown**. - Maintain all section headings, indents, and alignments. - **Do not add, infer, or restructure the content in any way.** """, "key_value": f"""Extract all key-value pairs from this image in {language} **exactly as they appear**: - Identify and extract labels and their corresponding values without modification. - Maintain the exact wording, punctuation, and order. - Format each pair as 'key: value' **only if clearly structured that way in the image**. - **Do not infer missing values or add any extra text.** """, "table": f"""Extract all tabular data from this image in {language} **exactly as it appears**, without modification, summarization, or omission. - **Preserve the table structure** (rows, columns, headers) as closely as possible. - **Do not add missing values or infer content**—if a cell is empty, leave it empty. - Maintain all numerical, textual, and special character formatting. - If the table contains merged cells, indicate them clearly without altering their meaning. - Output the table in a structured format such as Markdown, CSV, or JSON, based on the intended use. """, } prompt = prompts.get(format_type, prompts["text"]) # Route to chosen provider if self.provider == "openai": res = self._call_openai_vision(prompt, image_base64) else: res = self._call_ollama_vision(prompt, image_base64) responses.append(f"Page {idx + 1}:\n{res}") # Clean up temporary files if preprocess and preprocessed_path.endswith( "_preprocessed.jpg" ): try: os.remove(preprocessed_path) except OSError: pass if page_file.endswith((".png", ".jpg", ".jp2")): try: os.remove(page_file) except OSError: pass final_result = "\n".join(responses) if format_type == "json": try: json_data = json.loads(final_result) return json.dumps(json_data, indent=2) except json.JSONDecodeError: return final_result return final_result else: # Fallback: no images found; extract raw text per page text_pages = self._pdf_extract_text(image_path) combined = [] for i, t in enumerate(text_pages): combined.append(f"Page {i + 1}:\n{t}") return "\n".join(combined) # Process non-PDF images as before. if preprocess: image_path = self._preprocess_image(image_path, language) image_base64 = self._encode_image(image_path) # Clean up temporary files if image_path.endswith(("_preprocessed.jpg", "_temp.jpg")): os.remove(image_path) if custom_prompt and custom_prompt.strip(): prompt = custom_prompt print("Using custom prompt:", prompt) else: prompts = { "markdown": f"""Extract all text content from this image in {language} **exactly as it appears**, without modification, summarization, or omission. Format the output in markdown: - Use headers (#, ##, ###) **only if they appear in the image** - Preserve original lists (-, *, numbered lists) as they are - Maintain all text formatting (bold, italics, underlines) exactly as seen - **Do not add, interpret, or restructure any content** """, "text": f"""Extract all visible text from this image in {language} **without any changes**. - **Do not summarize, paraphrase, or infer missing text.** - Retain all spacing, punctuation, and formatting exactly as in the image. - If text is unclear or partially visible, extract as much as possible without guessing. - **Include all text, even if it seems irrelevant or repeated.** """, "json": f"""Extract all text from this image in {language} and format it as JSON, **strictly preserving** the structure. - **Do not summarize, add, or modify any text.** - Maintain hierarchical sections and subsections as they appear. - Use keys that reflect the document's actual structure (e.g., "title", "body", "footer"). - Include all text, even if fragmented, blurry, or unclear. """, "structured": f"""Extract all text from this image in {language}, **ensuring complete structural accuracy**: - Identify and format tables **without altering content**. - Preserve list structures (bulleted, numbered) **exactly as shown**. - Maintain all section headings, indents, and alignments. - **Do not add, infer, or restructure the content in any way.** """, "key_value": f"""Extract all key-value pairs from this image in {language} **exactly as they appear**: - Identify and extract labels and their corresponding values without modification. - Maintain the exact wording, punctuation, and order. - Format each pair as 'key: value' **only if clearly structured that way in the image**. - **Do not infer missing values or add any extra text.** """, "table": f"""Extract all tabular data from this image in {language} **exactly as it appears**, without modification, summarization, or omission. - **Preserve the table structure** (rows, columns, headers) as closely as possible. - **Do not add missing values or infer content**—if a cell is empty, leave it empty. - Maintain all numerical, textual, and special character formatting. - If the table contains merged cells, indicate them clearly without altering their meaning. - Output the table in a structured format such as Markdown, CSV, or JSON, based on the intended use. """, } prompt = prompts.get(format_type, prompts["text"]) print("Using default prompt:", prompt) # Debug print # Call chosen provider with single image if self.provider == "openai": result = self._call_openai_vision(prompt, image_base64) else: result = self._call_ollama_vision(prompt, image_base64) if format_type == "json": try: json_data = json.loads(result) return json.dumps(json_data, indent=2) except json.JSONDecodeError: return str(result) return str(result) except Exception as e: return f"Error processing image: {str(e)}" def process_batch( self, input_path: str | list[str], format_type: str = "markdown", recursive: bool = False, preprocess: bool = True, custom_prompt: str | None = None, language: str = "en", ) -> dict[str, Any]: """ Process multiple images in batch Args: input_path: Path to directory or list of image paths format_type: Output format type recursive: Whether to search directories recursively preprocess: Whether to apply image preprocessing custom_prompt: If provided, this prompt overrides the default for each image language: Language code to apply language specific OCR preprocessing Returns: Dictionary with results and statistics """ # Collect all image paths image_paths: list[str | Path] = [] if isinstance(input_path, str): base_path = Path(input_path) if base_path.is_dir(): pattern = "**/*" if recursive else "*" for ext in [".png", ".jpg", ".jpeg", ".pdf", ".tiff"]: image_paths.extend(base_path.glob(f"{pattern}{ext}")) else: image_paths = [base_path] else: image_paths = [Path(p) for p in input_path] results = {} errors = {} # Process images in parallel with concurrent.futures.ThreadPoolExecutor( max_workers=self.max_workers ) as executor: future_to_path = { executor.submit( self.process_image, str(path), format_type, preprocess, custom_prompt, language, ): path for path in image_paths } for future in concurrent.futures.as_completed(future_to_path): path = future_to_path[future] try: results[str(path)] = future.result() except Exception as e: errors[str(path)] = str(e) # pbar.update(1) return { "results": results, "errors": errors, "statistics": { "total": len(image_paths), "successful": len(results), "failed": len(errors), }, }