🐍 Python API Reference

Welcome to the OmniDocs Python API Reference! This page provides auto-generated documentation for every major module, extractor, and utility in the OmniDocs ecosystem. Use it as the single source of truth for all classes, functions, and configuration options.


πŸ“¦ Core Package

The main OmniDocs package provides the top-level API, configuration, and shared utilities.

omnidocs


🧩 Tasks & Extractors

OmniDocs organizes all document AI into modular tasks. Each task ships its own extractors, which you can import and use directly; the sections below document the full API for each task.
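
For example, a typical workflow imports a detector, runs it on a page image, and saves the annotated result. A minimal sketch (YOLOLayoutDetector is one of several interchangeable detectors documented below; sample_page.png and annotated.png are placeholder paths):

from omnidocs.tasks.layout_analysis.extractors.doc_layout_yolo import YOLOLayoutDetector

# Construct a detector; model weights are downloaded on first use.
detector = YOLOLayoutDetector(show_log=True)

# Run detection and save the annotated page.
annotated_img, layout = detector.detect("sample_page.png")
detector.visualize((annotated_img, layout), "annotated.png")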

πŸ“ Layout Analysis

Detect and analyze document structure, regions, and reading order.

omnidocs.tasks.layout_analysis

FlorenceLayoutDetector

FlorenceLayoutDetector(device: Optional[str] = None, show_log: bool = False, trust_remote_code: bool = True, **kwargs)

Bases: BaseLayoutDetector

Florence-based layout detection implementation.

Initialize Florence Layout Detector.

Source code in omnidocs/tasks/layout_analysis/extractors/florence.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    trust_remote_code: bool = True,
    **kwargs
):
    """Initialize Florence Layout Detector."""
    super().__init__(show_log=show_log)

    # Initialize label mapper
    self._label_mapper = FlorenceLayoutMapper()

    logger.info("Initializing FlorenceLayoutDetector")

    if device:
        self.device = device
    logger.info(f"Using device: {self.device}")

    try:
        from transformers import AutoProcessor, AutoModelForCausalLM
    except ImportError as ex:
        logger.error("Failed to import transformers")
        raise ImportError(
            "transformers is not available. Please install it with: pip install transformers"
        ) from ex

    # Initialize the model and processor
    try:
        self.model = AutoModelForCausalLM.from_pretrained(
            self.MODEL_REPO,
            trust_remote_code=trust_remote_code,
            **kwargs
        )
        self.processor = AutoProcessor.from_pretrained(
            self.MODEL_REPO,
            trust_remote_code=trust_remote_code
        )
        self.model.to(self.device)
        logger.success("Model initialized successfully")
    except Exception as e:
        logger.error("Failed to initialize model", exc_info=True)
        raise

detect

detect(input_path: Union[str, Path], max_new_tokens: int = 1024, do_sample: bool = False, num_beams: int = 3, **kwargs) -> Tuple[Image.Image, LayoutOutput]

Run layout detection with standardized labels.

Source code in omnidocs/tasks/layout_analysis/extractors/florence.py
@log_execution_time
def detect(
    self,
    input_path: Union[str, Path],
    max_new_tokens: int = 1024,
    do_sample: bool = False,
    num_beams: int = 3,
    **kwargs
) -> Tuple[Image.Image, LayoutOutput]:
    """Run layout detection with standardized labels."""
    try:
        # Load and preprocess input
        image = Image.open(input_path).convert("RGB")

        # Prepare inputs
        prompt = "<OD>"
        inputs = self.processor(
            text=prompt,
            images=image,
            return_tensors="pt"
        ).to(self.device)

        # Generate predictions
        generated_ids = self.model.generate(
            input_ids=inputs["input_ids"],
            pixel_values=inputs["pixel_values"],
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            num_beams=num_beams,
            **kwargs
        )

        # Decode and post-process
        generated_text = self.processor.batch_decode(
            generated_ids,
            skip_special_tokens=False
        )[0]

        parsed_result = self.processor.post_process_generation(
            generated_text,
            task="<OD>",
            image_size=(image.width, image.height)
        )

        # Convert to standard format
        layout_boxes = []
        for bbox, label in zip(
            parsed_result["<OD>"]["bboxes"],
            parsed_result["<OD>"]["labels"]
        ):
            mapped_label = self.map_label(label.lower())
            if mapped_label:
                layout_boxes.append(
                    LayoutBox(
                        label=mapped_label,
                        bbox=[float(coord) for coord in bbox],
                        confidence=None  # Florence model doesn't provide confidence scores
                    )
                )

        # Create annotated image
        annotated_img = image.copy()
        draw = ImageDraw.Draw(annotated_img)

        # Draw boxes and labels
        for box in layout_boxes:
            color = self.color_map.get(box.label, 'gray')
            coords = box.bbox
            draw.rectangle(coords, outline=color, width=3)
            draw.text((coords[0], coords[1]-20), box.label, fill=color)

        return annotated_img, LayoutOutput(bboxes=layout_boxes)

    except Exception as e:
        logger.error("Error during prediction", exc_info=True)
        raise
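
A minimal usage sketch for this method (page.png is a placeholder path; the generation arguments shown simply repeat the documented defaults):

from omnidocs.tasks.layout_analysis.extractors.florence import FlorenceLayoutDetector

detector = FlorenceLayoutDetector(device="cuda")
# num_beams and max_new_tokens are the defaults, shown here for clarity.
annotated, layout = detector.detect("page.png", num_beams=3, max_new_tokens=1024)
print(f"Detected {len(layout.bboxes)} regions")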

visualize

visualize(detection_result: Tuple[Image.Image, LayoutOutput], output_path: Union[str, Path]) -> None

Save annotated image and layout data to files.

Parameters:

detection_result (Tuple[Image.Image, LayoutOutput]): Tuple containing (PIL Image, LayoutOutput). Required.
output_path (Union[str, Path]): Path to save visualization. Required.
Source code in omnidocs/tasks/layout_analysis/extractors/florence.py
def visualize(
    self,
    detection_result: Tuple[Image.Image, LayoutOutput],
    output_path: Union[str, Path],
) -> None:
    """
    Save annotated image and layout data to files.

    Args:
        detection_result: Tuple containing (PIL Image, LayoutOutput)
        output_path: Path to save visualization
    """
    super().visualize(detection_result, output_path)
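
In typical use, the tuple returned by detect is passed straight through. A minimal sketch (paths are placeholders):

from omnidocs.tasks.layout_analysis.extractors.florence import FlorenceLayoutDetector

detector = FlorenceLayoutDetector()
result = detector.detect("page.png")           # (annotated image, LayoutOutput)
detector.visualize(result, "page_layout.png")  # saves the visualization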

PaddleLayoutDetector

PaddleLayoutDetector(device: Optional[str] = None, show_log: bool = False, **kwargs)

Bases: BaseLayoutDetector

PaddleOCR-based layout detection implementation.

Initialize PaddleOCR Layout Detector.

Source code in omnidocs/tasks/layout_analysis/extractors/paddle.py
def __init__(
    self, 
    device: Optional[str] = None,
    show_log: bool = False,
    **kwargs
):
    """Initialize PaddleOCR Layout Detector."""
    super().__init__()

    # Initialize label mapper
    self._label_mapper = PaddleLayoutMapper()

    # Log initialization
    logger.info("Initializing PaddleLayoutDetector")

    # Set device if specified
    if device:
        self.device = device
    logger.info(f"Using device: {self.device}")

    try:
        from paddleocr import PPStructure
    except ImportError as ex:
        logger.error("Failed to import paddleocr")
        raise ImportError(
            "paddleocr is not available. Please install it with: pip install paddleocr"
        ) from ex


    # Initialize the model
    try:
        self.model = PPStructure(
            table=True,
            ocr=True,
            show_log=show_log,
            **kwargs
        )
        logger.success("Model initialized successfully")
    except Exception as e:
        logger.error("Failed to initialize model", exc_info=True)
        raise

detect

detect(input_path: Union[str, Path], **kwargs) -> Tuple[Image.Image, LayoutOutput]

Run layout detection with standardized labels.

Source code in omnidocs/tasks/layout_analysis/extractors/paddle.py
@log_execution_time
def detect(
    self, 
    input_path: Union[str, Path], 
    **kwargs
) -> Tuple[Image.Image, LayoutOutput]:
    """Run layout detection with standardized labels."""
    try:
        # Load and preprocess input
        images = self.preprocess_input(input_path)

        results = []
        for img in images:
            # Get detection results
            det_result = self.model(img)

            # Convert to PIL Image if needed
            if isinstance(img, np.ndarray):
                img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

            # Create annotated image
            annotated_img = img.copy()
            draw = ImageDraw.Draw(annotated_img)

            # Convert detection results to LayoutBox objects with standardized labels
            layout_boxes = []

            for block in det_result:
                # Extract coordinates and type
                x1, y1, x2, y2 = block['bbox']
                model_label = block['type']
                mapped_label = self.map_label(model_label)

                if mapped_label:  # Only include boxes with valid mapped labels
                    layout_boxes.append(
                        LayoutBox(
                            label=mapped_label,
                            bbox=[float(x1), float(y1), float(x2), float(y2)],
                            confidence=block.get('confidence', None)
                        )
                    )

                    # Draw with standardized colors
                    color = self.color_map.get(mapped_label, 'gray')
                    draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
                    draw.text((x1, y1-20), mapped_label, fill=color)

            results.append((
                annotated_img,
                LayoutOutput(bboxes=layout_boxes)
            ))

        return results[0] if results else (None, LayoutOutput(bboxes=[]))

    except Exception as e:
        logger.error("Error during prediction", exc_info=True)
        raise
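
A minimal usage sketch (requires paddleocr; page.png is a placeholder path):

from omnidocs.tasks.layout_analysis.extractors.paddle import PaddleLayoutDetector

detector = PaddleLayoutDetector(show_log=False)
annotated, layout = detector.detect("page.png")
for box in layout.bboxes:
    print(box.label, box.confidence)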

visualize

visualize(detection_result: Tuple[Image.Image, LayoutOutput], output_path: Union[str, Path]) -> None

Save annotated image and layout data to files.

Parameters:

detection_result (Tuple[Image.Image, LayoutOutput]): Tuple containing (PIL Image, LayoutOutput). Required.
output_path (Union[str, Path]): Path to save visualization. Required.
Source code in omnidocs/tasks/layout_analysis/extractors/paddle.py
def visualize(
    self,
    detection_result: Tuple[Image.Image, LayoutOutput],
    output_path: Union[str, Path],
) -> None:
    """
    Save annotated image and layout data to files.

    Args:
        detection_result: Tuple containing (PIL Image, LayoutOutput)
        output_path: Path to save visualization
    """
    super().visualize(detection_result, output_path)

RTDETRLayoutDetector

RTDETRLayoutDetector(device: Optional[str] = None, show_log: bool = False, model_path: Optional[Union[str, Path]] = None, num_threads: Optional[int] = 4, use_cpu_only: bool = True)

Bases: BaseLayoutDetector

RT-DETR-based layout detection implementation.

Initialize RT-DETR Layout Detector with careful device handling.

Source code in omnidocs/tasks/layout_analysis/extractors/rtdetr.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    model_path: Optional[Union[str, Path]] = None,
    num_threads: Optional[int] = 4,
    use_cpu_only: bool = True
):
    """Initialize RT-DETR Layout Detector with careful device handling."""
    super().__init__(show_log=show_log)

    self._label_mapper = RTDETRLayoutMapper()

    if self.show_log:
        logger.info("Initializing RTDETRLayoutDetector")

    # Set default paths
    if model_path is None:
        model_path = _MODELS_DIR / "rtdetr_layout" / self.MODEL_REPO.replace("/", "_")

    self.model_path = Path(model_path)
    self.num_threads = num_threads

    # Careful device handling
    if use_cpu_only:
        self.device = "cpu"
        if self.show_log:
            logger.info("Forced CPU usage due to use_cpu_only flag")
    elif device:
        self.device = device
        if self.show_log:
            logger.info(f"Using specified device: {device}")
    else:
        # Check CUDA availability with error handling
        try:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            if self.show_log:
                logger.info(f"Automatically selected device: {self.device}")
        except Exception as e:
            self.device = "cpu"
            if self.show_log:
                logger.warning(f"Error checking CUDA availability: {e}. Defaulting to CPU")

    self.num_threads = num_threads or int(os.environ.get("OMP_NUM_THREADS", 4))

    # Set thread count for CPU operations
    if self.device == "cpu":
        torch.set_num_threads(self.num_threads)
        if self.show_log:
            logger.info(f"Set CPU threads to {self.num_threads}")

    # Model parameters
    self.image_size = 640
    self.confidence_threshold = 0.6

    # Check dependencies
    self._check_dependencies()

    # Download model if needed
    if not self._model_exists():
        if self.show_log:
            logger.info(f"Model not found at {self.model_path}, will download from HuggingFace")
        self._download_model()

    # Load model
    try:
        self._load_model()
        if self.show_log:
            logger.success("Model initialized successfully")
    except Exception as e:
        if self.show_log:
            logger.error("Failed to initialize model", exc_info=True)
        raise

detect

detect(input_path: Union[str, Path], confidence_threshold: Optional[float] = None, **kwargs) -> Tuple[Image.Image, LayoutOutput]

Run layout detection using RT-DETR Transformers model.

Source code in omnidocs/tasks/layout_analysis/extractors/rtdetr.py
@log_execution_time
def detect(
    self,
    input_path: Union[str, Path],
    confidence_threshold: Optional[float] = None,
    **kwargs
) -> Tuple[Image.Image, LayoutOutput]:
    """Run layout detection using RT-DETR Transformers model."""
    if self.model is None:
        raise RuntimeError("Model not loaded. Initialization failed.")

    try:
        # Load and preprocess image
        if isinstance(input_path, (str, Path)):
            image = Image.open(input_path).convert("RGB")
        elif isinstance(input_path, Image.Image):
            image = input_path.convert("RGB")
        elif isinstance(input_path, np.ndarray):
            image = Image.fromarray(input_path).convert("RGB")
        else:
            raise ValueError("Unsupported input type")

        # Preprocess the image using the image processor
        resize = {"height": self.image_size, "width": self.image_size}
        inputs = self.image_processor(
            images=image,
            return_tensors="pt",
            size=resize,
        )

        # Move inputs to the correct device
        if self.device == "cuda":
            inputs = {k: v.cuda() if isinstance(v, torch.Tensor) else v for k, v in inputs.items()}

        # Run inference
        try:
            with torch.no_grad():
                outputs = self.model(**inputs)
        except Exception as e:
            raise RuntimeError(f"Error during model inference: {e}") from e 

        # Post-process results
        threshold = confidence_threshold or self.confidence_threshold
        results = self.image_processor.post_process_object_detection(
            outputs,
            target_sizes=torch.tensor([image.size[::-1]]),
            threshold=threshold
        )

        # Process predictions
        layout_boxes = []

        for result in results:
            for score, label_id, box in zip(result["scores"], result["labels"], result["boxes"]):
                score_val = float(score.item())
                label_idx = int(label_id.item())

                # Look up the label in the model config (post-processed ids are 0-indexed, while the config's id2label starts at 1, hence the +1)
                model_label = self.model.config.id2label.get(label_idx + 1)
                if not model_label:
                    continue

                # Map to standardized label
                mapped_label = self.map_label(model_label)
                if not mapped_label:
                    continue

                # Convert box coordinates (already in image space)
                box = [round(i, 2) for i in box.tolist()]
                left, top, right, bottom = box

                layout_boxes.append(
                    LayoutBox(
                        label=mapped_label,
                        bbox=[left, top, right, bottom],
                        confidence=score_val
                    )
                )

        # Create annotated image
        annotated_img = image.copy()
        draw = ImageDraw.Draw(annotated_img)

        # Draw boxes with standardized colors
        for box in layout_boxes:
            color = self.color_map.get(box.label, 'gray')
            coords = box.bbox
            draw.rectangle(coords, outline=color, width=3)
            draw.text((coords[0], coords[1]-20), box.label, fill=color)

        return annotated_img, LayoutOutput(bboxes=layout_boxes)

    except Exception as e:
        if self.show_log:
            logger.error("Error during prediction", exc_info=True)
        raise
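
A minimal usage sketch (CPU-only by default via use_cpu_only=True; scan.png is a placeholder path):

from omnidocs.tasks.layout_analysis.extractors.rtdetr import RTDETRLayoutDetector

detector = RTDETRLayoutDetector(use_cpu_only=True, num_threads=4)
# Override the 0.6 default threshold per page if needed.
annotated, layout = detector.detect("scan.png", confidence_threshold=0.5)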

SuryaLayoutDetector

SuryaLayoutDetector(device: Optional[str] = None, show_log: bool = False, **kwargs)

Bases: BaseLayoutDetector

Surya-based layout detection implementation.

Initialize Surya Layout Detector.

Source code in omnidocs/tasks/layout_analysis/extractors/surya.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    **kwargs
):
    """Initialize Surya Layout Detector."""
    super().__init__(show_log=show_log)

    # Initialize label mapper
    self._label_mapper = SuryaLayoutMapper()

    if self.show_log:
        logger.info("Initializing SuryaLayoutDetector")

    # Set device if specified, otherwise use default from parent
    if device:
        self.device = device

    if self.show_log:
        logger.info(f"Using device: {self.device}")

    try:
        # Import required libraries - use new API
        import surya
        if self.show_log:
            logger.info(f"Found surya package at: {surya.__file__}")
    except ImportError as ex:
        if self.show_log:
            logger.error("Failed to import surya")
        raise ImportError(
            "surya is not available. Please install it with: pip install surya-ocr"
        ) from ex

    try:
        # Initialize detection and layout models using new API
        from surya.layout import LayoutPredictor

        self.layout_predictor = LayoutPredictor()

        if self.show_log:
            logger.success("Models initialized successfully")

    except Exception as e:
        if self.show_log:
            logger.error("Failed to initialize models", exc_info=True)
        raise

detect

detect(input_path: Union[str, Path], **kwargs) -> Tuple[Image.Image, LayoutOutput]

Run layout detection with standardized labels.

Source code in omnidocs/tasks/layout_analysis/extractors/surya.py
@log_execution_time
def detect(
    self,
    input_path: Union[str, Path],
    **kwargs
) -> Tuple[Image.Image, LayoutOutput]:
    """Run layout detection with standardized labels."""
    try:
        # Load and preprocess input
        if isinstance(input_path, (str, Path)):
            image = Image.open(input_path).convert("RGB")
        elif isinstance(input_path, Image.Image):
            image = input_path.convert("RGB")
        elif isinstance(input_path, np.ndarray):
            image = Image.fromarray(input_path).convert("RGB")
        else:
            raise ValueError("Unsupported input type")

        # Run layout detection using new API
        layout_predictions = self.layout_predictor([image])

        # Process the layout prediction (take first since we only processed one image)
        layout_pred = layout_predictions[0]

        # Convert to standardized format
        layout_boxes = []
        for box in layout_pred.bboxes:
            mapped_label = self.map_label(box.label)
            if mapped_label:
                layout_boxes.append(
                    LayoutBox(
                        label=mapped_label,
                        bbox=box.bbox,  # Already in [x1, y1, x2, y2] format
                        confidence=box.confidence
                    )
                )

        # Create annotated image
        annotated_img = image.copy()
        draw = ImageDraw.Draw(annotated_img)

        # Draw boxes with standardized colors
        for box in layout_boxes:
            color = self.color_map.get(box.label, 'gray')
            coords = box.bbox
            draw.rectangle(coords, outline=color, width=3)
            draw.text((coords[0], coords[1]-20), box.label, fill=color)

        # Create LayoutOutput with image size
        layout_output = LayoutOutput(
            bboxes=layout_boxes,
            image_size=image.size
        )

        return annotated_img, layout_output

    except Exception as e:
        if self.show_log:
            logger.error("Error during prediction", exc_info=True)
        raise
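
A minimal usage sketch (requires the surya-ocr package; page.png is a placeholder path):

from omnidocs.tasks.layout_analysis.extractors.surya import SuryaLayoutDetector

detector = SuryaLayoutDetector()
annotated, layout = detector.detect("page.png")
print(layout.image_size, len(layout.bboxes))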

visualize

visualize(detection_result: Tuple[Image.Image, LayoutOutput], output_path: Union[str, Path]) -> None

Save annotated image and layout data to files.

Parameters:

detection_result (Tuple[Image.Image, LayoutOutput]): Tuple containing (PIL Image, LayoutOutput). Required.
output_path (Union[str, Path]): Path to save visualization. Required.
Source code in omnidocs/tasks/layout_analysis/extractors/surya.py
def visualize(
    self,
    detection_result: Tuple[Image.Image, LayoutOutput],
    output_path: Union[str, Path],
) -> None:
    """
    Save annotated image and layout data to files.

    Args:
        detection_result: Tuple containing (PIL Image, LayoutOutput)
        output_path: Path to save visualization
    """
    super().visualize(detection_result, output_path)

YOLOLayoutDetector

YOLOLayoutDetector(device: Optional[str] = None, show_log: bool = False, model_path: Optional[Union[str, Path]] = None)

Bases: BaseLayoutDetector

YOLO-based layout detection implementation.

Initialize YOLO Layout Detector.

Source code in omnidocs/tasks/layout_analysis/extractors/doc_layout_yolo.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    model_path: Optional[Union[str, Path]] = None
):
    """Initialize YOLO Layout Detector."""
    super().__init__(show_log=show_log)

    self._label_mapper = YOLOLayoutMapper()
    if self.show_log:
        logger.info(f"Initializing YOLOLayoutDetector")

    if device:
        self.device = device
    if self.show_log:
        logger.info(f"Using device: {self.device}")

    # Set default paths
    if model_path is None:
        model_path = _MODELS_DIR / "yolo_layout" / self.MODEL_REPO.replace("/", "_")

    self.model_path = Path(model_path)
    if self.show_log:
        logger.info(f"Model directory: {self.model_path}")

    self.conf_threshold = 0.2
    self.img_size = 1024

    # Check dependencies
    self._check_dependencies()

    # Download model if needed
    if not self._model_exists():
        if self.show_log:
            logger.info(f"Model not found at {self.model_path}, will download from HuggingFace")
        self._download_model()

    # Load model
    try:
        self._load_model()
        if self.show_log:
            logger.success("Model initialized successfully")
    except Exception as e:
        if self.show_log:
            logger.error("Failed to initialize model", exc_info=True)
        raise

detect

detect(input_path: Union[str, Path], conf_threshold: float = None, img_size: int = None, **kwargs) -> Tuple[Image.Image, LayoutOutput]

Run layout detection with standardized labels.

Source code in omnidocs/tasks/layout_analysis/extractors/doc_layout_yolo.py
@log_execution_time
def detect(
    self,
    input_path: Union[str, Path],
    conf_threshold: float = None,
    img_size: int = None,
    **kwargs,
) -> Tuple[Image.Image, LayoutOutput]:
    """Run layout detection with standardized labels."""
    if self.model is None:
        raise RuntimeError("Model not loaded. Initialization failed.")

    conf = conf_threshold if conf_threshold else self.conf_threshold
    imgsz = img_size if img_size else self.img_size

    try:
        images = self.preprocess_input(input_path)

        results = []
        for img in images:
            # Get detection results
            det_result = self.model.predict(
                img, imgsz=imgsz, conf=conf, device=self.device, **kwargs
            )

            # Convert detection results to LayoutBox objects
            layout_boxes = []
            for box in det_result[0].boxes:
                model_label = det_result[0].names[int(box.cls[0])]
                mapped_label = self.map_label(model_label)

                if mapped_label:
                    layout_boxes.append(
                        LayoutBox(
                            label=mapped_label,
                            bbox=box.xyxy[0].tolist(),
                            confidence=float(box.conf[0]) if box.conf is not None else None
                        )
                    )

            # Get the annotated image (will be a numpy array)
            annotated_img_array = det_result[0].plot(labels=False)  # Disable YOLO's default labels

            # Convert numpy array to PIL Image
            annotated_img = Image.fromarray(cv2.cvtColor(annotated_img_array, cv2.COLOR_BGR2RGB))

            # Draw standardized labels on the image
            draw = ImageDraw.Draw(annotated_img)
            for box in layout_boxes:
                color = self.color_map.get(box.label, 'gray')
                coords = box.bbox
                draw.rectangle(coords, outline=color, width=3)
                draw.text((coords[0], coords[1]-20), box.label, fill=color)

            results.append((
                annotated_img,
                LayoutOutput(bboxes=layout_boxes)
            ))

        return results[0] if results else (None, LayoutOutput(bboxes=[]))

    except Exception as e:
        if self.show_log:
            logger.error("Error during prediction", exc_info=True)
        raise
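
A minimal usage sketch overriding the 0.2 confidence and 1024 image-size defaults (page.png is a placeholder path):

from omnidocs.tasks.layout_analysis.extractors.doc_layout_yolo import YOLOLayoutDetector

detector = YOLOLayoutDetector(device="cuda")
annotated, layout = detector.detect("page.png", conf_threshold=0.3, img_size=1280)
for box in layout.bboxes:
    print(box.label, [round(c) for c in box.bbox])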

visualize

visualize(detection_result: Tuple[Image.Image, LayoutOutput], output_path: Union[str, Path]) -> None

Save the annotated image to file.

Parameters:

detection_result (Tuple[Image.Image, LayoutOutput]): Tuple containing (PIL Image, LayoutOutput). Required.
output_path (Union[str, Path]): Path to save visualization. Required.
Source code in omnidocs/tasks/layout_analysis/extractors/doc_layout_yolo.py
def visualize(
    self,
    detection_result: Tuple[Image.Image, LayoutOutput],
    output_path: Union[str, Path],
) -> None:
    """
    Save the annotated image to file.

    Args:
        detection_result: Tuple containing (PIL Image, LayoutOutput)
        output_path: Path to save visualization
    """
    annotated_image, _ = detection_result

    # Convert numpy array to PIL Image if necessary
    if isinstance(annotated_image, np.ndarray):
        annotated_image = Image.fromarray(annotated_image)

    if annotated_image is not None:
        annotated_image.save(str(output_path))

omnidocs.tasks.layout_analysis.extractors.doc_layout_yolo

YOLOLayoutDetector: documented in full above under Layout Analysis.

YOLOLayoutMapper

YOLOLayoutMapper()

Bases: BaseLayoutMapper

Label mapper for YOLO layout detection model.

Source code in omnidocs/tasks/layout_analysis/base.py
def __init__(self):
    self._mapping: Dict[str, LayoutLabel] = {}
    self._reverse_mapping: Dict[LayoutLabel, str] = {}
    self._setup_mapping()

omnidocs.tasks.layout_analysis.extractors.florence

FlorenceLayoutDetector: documented in full above under Layout Analysis.

FlorenceLayoutMapper

FlorenceLayoutMapper()

Bases: BaseLayoutMapper

Label mapper for Florence layout detection model.

Source code in omnidocs/tasks/layout_analysis/base.py
def __init__(self):
    self._mapping: Dict[str, LayoutLabel] = {}
    self._reverse_mapping: Dict[LayoutLabel, str] = {}
    self._setup_mapping()

omnidocs.tasks.layout_analysis.extractors.paddle

PaddleLayoutDetector: documented in full above under Layout Analysis.

PaddleLayoutMapper

PaddleLayoutMapper()

Bases: BaseLayoutMapper

Label mapper for PaddleOCR layout detection model.

Source code in omnidocs/tasks/layout_analysis/base.py
def __init__(self):
    self._mapping: Dict[str, LayoutLabel] = {}
    self._reverse_mapping: Dict[LayoutLabel, str] = {}
    self._setup_mapping()

omnidocs.tasks.layout_analysis.extractors.rtdetr

RTDETRLayoutDetector: documented in full above under Layout Analysis.

RTDETRLayoutMapper

RTDETRLayoutMapper()

Bases: BaseLayoutMapper

Label mapper for RT-DETR layout detection model.

Source code in omnidocs/tasks/layout_analysis/base.py
def __init__(self):
    self._mapping: Dict[str, LayoutLabel] = {}
    self._reverse_mapping: Dict[LayoutLabel, str] = {}
    self._setup_mapping()

omnidocs.tasks.layout_analysis.extractors.surya

SuryaLayoutDetector: documented in full above under Layout Analysis.

SuryaLayoutMapper

SuryaLayoutMapper()

Bases: BaseLayoutMapper

Label mapper for Surya layout detection model.

Source code in omnidocs/tasks/layout_analysis/base.py
def __init__(self):
    self._mapping: Dict[str, LayoutLabel] = {}
    self._reverse_mapping: Dict[LayoutLabel, str] = {}
    self._setup_mapping()

πŸ“ Text Extraction

Extract raw and structured text from PDFs and images using classic and deep learning methods.

omnidocs.tasks.text_extraction

Text extraction module for OmniDocs.

This module provides base classes and implementations for text extraction from documents (PDFs, images, etc.).

BaseTextExtractor

BaseTextExtractor(device: Optional[str] = None, show_log: bool = False, engine_name: Optional[str] = None, extract_images: bool = False)

Bases: ABC

Base class for text extraction models.

Initialize the text extractor.

Parameters:

device (Optional[str]): Device to run model on ('cuda' or 'cpu'). Default: None.
show_log (bool): Whether to show detailed logs. Default: False.
engine_name (Optional[str]): Name of the text extraction engine. Default: None.
extract_images (bool): Whether to extract images alongside text. Default: False.
Source code in omnidocs/tasks/text_extraction/base.py
def __init__(self, 
             device: Optional[str] = None, 
             show_log: bool = False,
             engine_name: Optional[str] = None,
             extract_images: bool = False):
    """Initialize the text extractor.

    Args:
        device: Device to run model on ('cuda' or 'cpu')
        show_log: Whether to show detailed logs
        engine_name: Name of the text extraction engine
        extract_images: Whether to extract images alongside text
    """
    self.show_log = show_log
    self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
    self.engine_name = engine_name or self.__class__.__name__.lower().replace('extractor', '')
    self.extract_images = extract_images
    self.model = None
    self.model_path = None
    self._label_mapper: Optional[BaseTextMapper] = None

    # Initialize mapper if engine name is provided
    if self.engine_name:
        self._label_mapper = BaseTextMapper(self.engine_name)

    if self.show_log:
        logger.info(f"Initializing {self.__class__.__name__}")
        logger.info(f"Using device: {self.device}")
        logger.info(f"Engine: {self.engine_name}")
        logger.info(f"Extract images: {self.extract_images}")

label_mapper property

label_mapper: BaseTextMapper

Get the label mapper for this extractor.

extract abstractmethod

extract(input_path: Union[str, Path], **kwargs) -> TextOutput

Extract text from input document.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |

Source code in omnidocs/tasks/text_extraction/base.py
@abstractmethod
def extract(
    self,
    input_path: Union[str, Path],
    **kwargs
) -> TextOutput:
    """Extract text from input document.

    Args:
        input_path: Path to input document
        **kwargs: Additional model-specific parameters

    Returns:
        TextOutput containing extracted text
    """
    pass

extract_all

extract_all(input_paths: List[Union[str, Path]], **kwargs) -> List[TextOutput]

Extract text from multiple documents.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_paths | List[Union[str, Path]] | List of document paths | required |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| List[TextOutput] | List of TextOutput objects |

Source code in omnidocs/tasks/text_extraction/base.py
def extract_all(
    self,
    input_paths: List[Union[str, Path]],
    **kwargs
) -> List[TextOutput]:
    """Extract text from multiple documents.

    Args:
        input_paths: List of document paths
        **kwargs: Additional model-specific parameters

    Returns:
        List of TextOutput objects
    """
    results = []
    for input_path in input_paths:
        try:
            result = self.extract(input_path, **kwargs)
            results.append(result)
        except Exception as e:
            if self.show_log:
                logger.error(f"Error processing {input_path}: {str(e)}")
            raise
    return results
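
As the source shows, extract_all re-raises on the first failing document, so callers that want partial results must catch per file themselves. A minimal batch sketch, assuming the PyMuPDFTextExtractor documented below and two placeholder file paths:

```python
from omnidocs.tasks.text_extraction.extractors.pymupdf import PyMuPDFTextExtractor

extractor = PyMuPDFTextExtractor(show_log=True)
outputs = extractor.extract_all(["a.pdf", "b.pdf"])  # raises on the first bad file
for out in outputs:
    print(out.page_count, "pages,", len(out.text_blocks), "blocks")
```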

extract_from_pages

extract_from_pages(input_path: Union[str, Path], page_range: Optional[Tuple[int, int]] = None, **kwargs) -> TextOutput

Extract text from specific pages of a document.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |
| page_range | Optional[Tuple[int, int]] | Optional tuple of (start_page, end_page) (1-based, inclusive) | None |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text from specified pages |

Source code in omnidocs/tasks/text_extraction/base.py
def extract_from_pages(
    self,
    input_path: Union[str, Path],
    page_range: Optional[Tuple[int, int]] = None,
    **kwargs
) -> TextOutput:
    """Extract text from specific pages of a document.

    Args:
        input_path: Path to input document
        page_range: Optional tuple of (start_page, end_page) (1-based, inclusive)
        **kwargs: Additional model-specific parameters

    Returns:
        TextOutput containing extracted text from specified pages
    """
    # Default implementation extracts all pages then filters
    # Child classes can override for more efficient page-specific extraction
    full_output = self.extract(input_path, **kwargs)

    if page_range is None:
        return full_output

    start_page, end_page = page_range
    filtered_blocks = [
        block for block in full_output.text_blocks
        if start_page <= block.page_num <= end_page
    ]

    # Rebuild full text from filtered blocks
    full_text = '\n'.join(block.text for block in filtered_blocks)

    return TextOutput(
        text_blocks=filtered_blocks,
        full_text=full_text,
        metadata=full_output.metadata,
        source_info=full_output.source_info,
        processing_time=full_output.processing_time,
        page_count=end_page - start_page + 1
    )
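
Page numbers are 1-based and the range is inclusive at both ends. A short sketch, again assuming a concrete subclass such as PyMuPDFTextExtractor and a placeholder report.pdf:

```python
from omnidocs.tasks.text_extraction.extractors.pymupdf import PyMuPDFTextExtractor

extractor = PyMuPDFTextExtractor()
# (2, 3) keeps exactly pages 2 and 3: 1-based, inclusive on both ends.
subset = extractor.extract_from_pages("report.pdf", page_range=(2, 3))
print(len(subset.text_blocks), "blocks from pages 2-3")
```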

extract_with_layout

extract_with_layout(input_path: Union[str, Path], layout_regions: Optional[List[Dict]] = None, **kwargs) -> TextOutput

Extract text with optional layout information.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |
| layout_regions | Optional[List[Dict]] | Optional list of layout regions to focus extraction on | None |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |

Source code in omnidocs/tasks/text_extraction/base.py
def extract_with_layout(
    self,
    input_path: Union[str, Path],
    layout_regions: Optional[List[Dict]] = None,
    **kwargs
) -> TextOutput:
    """Extract text with optional layout information.

    Args:
        input_path: Path to input document
        layout_regions: Optional list of layout regions to focus extraction on
        **kwargs: Additional model-specific parameters

    Returns:
        TextOutput containing extracted text
    """
    # Default implementation just calls extract, can be overridden by child classes
    return self.extract(input_path, **kwargs)

get_supported_formats

get_supported_formats() -> List[str]

Get list of supported document formats.

Source code in omnidocs/tasks/text_extraction/base.py
def get_supported_formats(self) -> List[str]:
    """Get list of supported document formats."""
    # Default formats - child classes should override
    return ['.txt', '.pdf']

postprocess_output

postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput

Convert raw text extraction output to standardized TextOutput format.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| raw_output | Any | Raw output from text extraction engine | required |
| source_info | Optional[Dict] | Optional source document information | None |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | Standardized TextOutput object |

Source code in omnidocs/tasks/text_extraction/base.py
def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
    """Convert raw text extraction output to standardized TextOutput format.

    Args:
        raw_output: Raw output from text extraction engine
        source_info: Optional source document information

    Returns:
        Standardized TextOutput object
    """
    raise NotImplementedError("Child classes must implement postprocess_output method")

preprocess_input

preprocess_input(input_path: Union[str, Path]) -> Any

Preprocess input document for text extraction.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |

Returns:

| Type | Description |
|------|-------------|
| Any | Preprocessed document object |

Source code in omnidocs/tasks/text_extraction/base.py
def preprocess_input(self, input_path: Union[str, Path]) -> Any:
    """Preprocess input document for text extraction.

    Args:
        input_path: Path to input document

    Returns:
        Preprocessed document object
    """
    # Default implementation - child classes should override for specific formats
    return input_path

BaseTextMapper

BaseTextMapper(engine_name: str)

Base class for mapping text extraction engine-specific outputs to standardized format.

Initialize mapper for specific text extraction engine.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| engine_name | str | Name of the text extraction engine | required |
Source code in omnidocs/tasks/text_extraction/base.py
def __init__(self, engine_name: str):
    """Initialize mapper for specific text extraction engine.

    Args:
        engine_name: Name of the text extraction engine
    """
    self.engine_name = engine_name.lower()
    self._block_type_mapping: Dict[str, str] = {}
    self._setup_block_type_mapping()

extract_font_info

extract_font_info(raw_font_data: Any) -> Dict[str, Any]

Extract and normalize font information.

Source code in omnidocs/tasks/text_extraction/base.py
def extract_font_info(self, raw_font_data: Any) -> Dict[str, Any]:
    """Extract and normalize font information."""
    font_info = {}

    if isinstance(raw_font_data, dict):
        font_info.update({
            'font_name': raw_font_data.get('name', raw_font_data.get('font_name')),
            'font_size': raw_font_data.get('size', raw_font_data.get('font_size')),
            'bold': raw_font_data.get('bold', raw_font_data.get('is_bold', False)),
            'italic': raw_font_data.get('italic', raw_font_data.get('is_italic', False)),
            'color': raw_font_data.get('color', raw_font_data.get('font_color'))
        })

    return {k: v for k, v in font_info.items() if v is not None}

normalize_bbox

normalize_bbox(bbox: List[float], page_width: int, page_height: int) -> List[float]

Normalize bounding box coordinates to absolute values.

Source code in omnidocs/tasks/text_extraction/base.py
def normalize_bbox(self, bbox: List[float], page_width: int, page_height: int) -> List[float]:
    """Normalize bounding box coordinates to absolute values."""
    if all(0 <= coord <= 1 for coord in bbox):
        return [
            bbox[0] * page_width,
            bbox[1] * page_height,
            bbox[2] * page_width,
            bbox[3] * page_height
        ]
    return bbox
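
Concretely, boxes whose coordinates all fall in [0, 1] are treated as relative and scaled; anything else is assumed to already be absolute and passes through. A tiny sketch, assuming BaseTextMapper can be instantiated directly with an arbitrary engine name:

```python
from omnidocs.tasks.text_extraction.base import BaseTextMapper

mapper = BaseTextMapper("demo")  # "demo" is an arbitrary engine name
# Relative coords on a US-Letter page (612 x 792 pt) get scaled:
mapper.normalize_bbox([0.1, 0.2, 0.5, 0.9], 612, 792)  # [61.2, 158.4, 306.0, 712.8]
# Absolute coords pass through unchanged:
mapper.normalize_bbox([50, 60, 300, 700], 612, 792)    # [50, 60, 300, 700]
```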

normalize_block_type

normalize_block_type(engine_type: str) -> str

Convert engine-specific block type to standardized format.

Source code in omnidocs/tasks/text_extraction/base.py
def normalize_block_type(self, engine_type: str) -> str:
    """Convert engine-specific block type to standardized format."""
    return self._block_type_mapping.get(engine_type.lower(), engine_type)

TextBlock

Bases: BaseModel

Container for individual text block.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| text | str | Text content |
| bbox | Optional[List[float]] | Bounding box coordinates [x1, y1, x2, y2] |
| confidence | Optional[float] | Confidence score for text extraction |
| page_num | int | Page number (for multi-page documents) |
| block_type | Optional[str] | Type of text block (paragraph, heading, list, etc.) |
| font_info | Optional[Dict[str, Any]] | Optional font information |
| reading_order | Optional[int] | Reading order index |
| language | Optional[str] | Detected language of the text |

to_dict

to_dict() -> Dict

Convert to dictionary representation.

Source code in omnidocs/tasks/text_extraction/base.py
def to_dict(self) -> Dict:
    """Convert to dictionary representation."""
    return {
        'text': self.text,
        'bbox': self.bbox,
        'confidence': self.confidence,
        'page_num': self.page_num,
        'block_type': self.block_type,
        'font_info': self.font_info,
        'reading_order': self.reading_order,
        'language': self.language
    }

TextOutput

Bases: BaseModel

Container for text extraction results.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| text_blocks | List[TextBlock] | List of extracted text blocks |
| full_text | str | Combined text from all blocks |
| metadata | Optional[Dict[str, Any]] | Additional metadata from extraction |
| source_info | Optional[Dict[str, Any]] | Information about the source document |
| processing_time | Optional[float] | Time taken for text extraction |
| page_count | int | Number of pages in the document |

get_sorted_by_reading_order

get_sorted_by_reading_order() -> List[TextBlock]

Get text blocks sorted by reading order.

Source code in omnidocs/tasks/text_extraction/base.py
def get_sorted_by_reading_order(self) -> List[TextBlock]:
    """Get text blocks sorted by reading order."""
    blocks_with_order = [block for block in self.text_blocks if block.reading_order is not None]
    blocks_without_order = [block for block in self.text_blocks if block.reading_order is None]

    # Sort blocks with reading order
    blocks_with_order.sort(key=lambda x: (x.page_num, x.reading_order))

    # Sort blocks without reading order by page and bbox
    if blocks_without_order:
        blocks_without_order.sort(key=lambda x: (
            x.page_num,
            x.bbox[1] if x.bbox else 0,  # Sort by y coordinate (top to bottom)
            x.bbox[0] if x.bbox else 0   # Then by x coordinate (left to right)
        ))

    return blocks_with_order + blocks_without_order

get_text_by_confidence

get_text_by_confidence(min_confidence: float = 0.5) -> List[TextBlock]

Filter text blocks by minimum confidence threshold.

Source code in omnidocs/tasks/text_extraction/base.py
def get_text_by_confidence(self, min_confidence: float = 0.5) -> List[TextBlock]:
    """Filter text blocks by minimum confidence threshold."""
    return [block for block in self.text_blocks if block.confidence is None or block.confidence >= min_confidence]

get_text_by_page

get_text_by_page(page_num: int) -> List[TextBlock]

Get text blocks from a specific page.

Source code in omnidocs/tasks/text_extraction/base.py
def get_text_by_page(self, page_num: int) -> List[TextBlock]:
    """Get text blocks from a specific page."""
    return [block for block in self.text_blocks if block.page_num == page_num]

get_text_by_type

get_text_by_type(block_type: str) -> List[TextBlock]

Get text blocks of a specific type.

Source code in omnidocs/tasks/text_extraction/base.py
def get_text_by_type(self, block_type: str) -> List[TextBlock]:
    """Get text blocks of a specific type."""
    return [block for block in self.text_blocks if block.block_type == block_type]

save_json

save_json(output_path: Union[str, Path]) -> None

Save output to JSON file.

Source code in omnidocs/tasks/text_extraction/base.py
def save_json(self, output_path: Union[str, Path]) -> None:
    """Save output to JSON file."""
    import json
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)

save_markdown

save_markdown(output_path: Union[str, Path]) -> None

Save text as markdown with basic formatting.

Source code in omnidocs/tasks/text_extraction/base.py
def save_markdown(self, output_path: Union[str, Path]) -> None:
    """Save text as markdown with basic formatting."""
    markdown_content = []

    for block in self.get_sorted_by_reading_order():
        if block.block_type == 'heading':
            # Convert to markdown heading
            markdown_content.append(f"# {block.text}\n")
        elif block.block_type == 'subheading':
            markdown_content.append(f"## {block.text}\n")
        elif block.block_type == 'list':
            # Convert to markdown list
            lines = block.text.split('\n')
            for line in lines:
                if line.strip():
                    markdown_content.append(f"- {line.strip()}")
            markdown_content.append("")
        else:
            # Regular paragraph
            markdown_content.append(f"{block.text}\n")

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(markdown_content))

save_text

save_text(output_path: Union[str, Path]) -> None

Save full text to a text file.

Source code in omnidocs/tasks/text_extraction/base.py
def save_text(self, output_path: Union[str, Path]) -> None:
    """Save full text to a text file."""
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(self.full_text)

to_dict

to_dict() -> Dict

Convert to dictionary representation.

Source code in omnidocs/tasks/text_extraction/base.py
def to_dict(self) -> Dict:
    """Convert to dictionary representation."""
    return {
        'text_blocks': [block.to_dict() for block in self.text_blocks],
        'full_text': self.full_text,
        'metadata': self.metadata,
        'source_info': self.source_info,
        'processing_time': self.processing_time,
        'page_count': self.page_count
    }
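
Taken together, these helpers cover the usual post-processing loop: slice the output, then persist it. A usage sketch, assuming any concrete BaseTextExtractor subclass (PyMuPDFTextExtractor here) and a placeholder paper.pdf:

```python
from omnidocs.tasks.text_extraction.extractors.pymupdf import PyMuPDFTextExtractor

result = PyMuPDFTextExtractor().extract("paper.pdf")
page_one = result.get_text_by_page(1)
confident = result.get_text_by_confidence(0.8)  # blocks with no confidence are kept
result.save_json("paper.json")
result.save_markdown("paper.md")
```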

omnidocs.tasks.text_extraction.extractors.pymupdf

PyMuPDFTextExtractor

PyMuPDFTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, extract_tables: bool = False, flags: int = 0, clip: Optional[tuple] = None)

Bases: BaseTextExtractor

Text extractor using PyMuPDF (fitz).

Initialize PyMuPDF text extractor.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| device | Optional[str] | Device to run on (not used for PyMuPDF) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| extract_tables | bool | Whether to extract tables | False |
| flags | int | Text extraction flags (fitz.TEXT_PRESERVE_LIGATURES, etc.) | 0 |
| clip | Optional[tuple] | Optional clipping rectangle (x0, y0, x1, y1) | None |
Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
def __init__(self, 
             device: Optional[str] = None, 
             show_log: bool = False,
             extract_images: bool = False,
             extract_tables: bool = False,
             flags: int = 0,
             clip: Optional[tuple] = None):
    """Initialize PyMuPDF text extractor.

    Args:
        device: Device to run on (not used for PyMuPDF)
        show_log: Whether to show detailed logs
        extract_images: Whether to extract images alongside text
        extract_tables: Whether to extract tables
        flags: Text extraction flags (fitz.TEXT_PRESERVE_LIGATURES, etc.)
        clip: Optional clipping rectangle (x0, y0, x1, y1)
    """
    super().__init__(device, show_log, "pymupdf", extract_images)
    self.extract_tables = extract_tables
    self.flags = flags
    self.clip = clip
    self._label_mapper = PyMuPDFTextMapper()
    self._load_model()

extract

extract(input_path: Union[str, Path], use_layout: bool = True, **kwargs) -> TextOutput

Extract text from document using PyMuPDF.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |
| use_layout | bool | Whether to use layout information for extraction | True |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |

Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
def extract(
    self,
    input_path: Union[str, Path],
    use_layout: bool = True,
    **kwargs
) -> TextOutput:
    """Extract text from document using PyMuPDF.

    Args:
        input_path: Path to input document
        use_layout: Whether to use layout information for extraction
        **kwargs: Additional parameters

    Returns:
        TextOutput containing extracted text
    """
    start_time = time.time()

    # Preprocess input
    input_path = self.preprocess_input(input_path)

    if self.show_log:
        logger.info(f"Extracting text from {input_path}")

    try:
        all_text_blocks = []

        # Open document
        doc = fitz.open(str(input_path))

        try:
            total_pages = len(doc)

            for page_num in range(total_pages):
                page = doc[page_num]

                # Extract text blocks
                if use_layout:
                    page_blocks = self._extract_text_blocks(page)
                else:
                    page_blocks = self._extract_text_simple(page)

                all_text_blocks.extend(page_blocks)

                # Extract tables if requested
                if self.extract_tables:
                    table_blocks = self._extract_tables(page)
                    all_text_blocks.extend(table_blocks)

            # Create source info
            source_info = {
                'file_path': str(input_path),
                'file_name': input_path.name,
                'file_size': input_path.stat().st_size,
                'engine': 'pymupdf',
                'total_pages': total_pages,
                'metadata': doc.metadata
            }

        finally:
            doc.close()

        # Post-process output
        output = self.postprocess_output(all_text_blocks, source_info)
        output.processing_time = time.time() - start_time

        if self.show_log:
            logger.info(f"Extracted {len(output.text_blocks)} text blocks from {total_pages} pages in {output.processing_time:.2f}s")

        return output

    except Exception as e:
        logger.error(f"Error extracting text from {input_path}: {str(e)}")
        raise

extract_from_pages

extract_from_pages(input_path: Union[str, Path], page_range: Optional[tuple] = None, use_layout: bool = True, **kwargs) -> TextOutput

Extract text from specific pages.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |
| page_range | Optional[tuple] | Optional tuple of (start_page, end_page) (1-based, inclusive) | None |
| use_layout | bool | Whether to use layout information | True |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text from specified pages |

Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
def extract_from_pages(
    self,
    input_path: Union[str, Path],
    page_range: Optional[tuple] = None,
    use_layout: bool = True,
    **kwargs
) -> TextOutput:
    """Extract text from specific pages.

    Args:
        input_path: Path to input document
        page_range: Optional tuple of (start_page, end_page) (1-based, inclusive)
        use_layout: Whether to use layout information
        **kwargs: Additional parameters

    Returns:
        TextOutput containing extracted text from specified pages
    """
    start_time = time.time()

    # Preprocess input
    input_path = self.preprocess_input(input_path)

    if self.show_log:
        logger.info(f"Extracting text from {input_path}, pages {page_range}")

    try:
        all_text_blocks = []

        # Open document
        doc = fitz.open(str(input_path))

        try:
            total_pages = len(doc)

            if page_range is None:
                start_page, end_page = 1, total_pages
            else:
                start_page, end_page = page_range

            # Convert to 0-based indexing
            start_idx = max(0, start_page - 1)
            end_idx = min(total_pages - 1, end_page - 1)

            for page_num in range(start_idx, end_idx + 1):
                page = doc[page_num]

                # Extract text blocks
                if use_layout:
                    page_blocks = self._extract_text_blocks(page)
                else:
                    page_blocks = self._extract_text_simple(page)

                all_text_blocks.extend(page_blocks)

                # Extract tables if requested
                if self.extract_tables:
                    table_blocks = self._extract_tables(page)
                    all_text_blocks.extend(table_blocks)

            # Create source info
            source_info = {
                'file_path': str(input_path),
                'file_name': input_path.name,
                'file_size': input_path.stat().st_size,
                'engine': 'pymupdf',
                'total_pages': total_pages,
                'page_range': page_range,
                'metadata': doc.metadata
            }

        finally:
            doc.close()

        # Post-process output
        output = self.postprocess_output(all_text_blocks, source_info)
        output.processing_time = time.time() - start_time

        if self.show_log:
            logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")

        return output

    except Exception as e:
        logger.error(f"Error extracting text from {input_path}: {str(e)}")
        raise

get_supported_formats

get_supported_formats() -> List[str]

Get list of supported document formats.

Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
def get_supported_formats(self) -> List[str]:
    """Get list of supported document formats."""
    return ['.pdf', '.xps', '.oxps', '.epub', '.mobi', '.fb2', '.cbz', '.svg']

postprocess_output

postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput

Convert PyMuPDF output to standardized TextOutput format.

Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
    """Convert PyMuPDF output to standardized TextOutput format."""
    text_blocks = raw_output  # raw_output is already a list of TextBlocks

    # Sort blocks by page and reading order
    text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))

    # Combine all text
    full_text = '\n\n'.join(block.text for block in text_blocks if block.text.strip())

    # Get metadata
    metadata = {
        'engine': 'pymupdf',
        'extract_tables': self.extract_tables,
        'flags': self.flags,
        'clip': self.clip,
        'total_blocks': len(text_blocks)
    }

    return TextOutput(
        text_blocks=text_blocks,
        full_text=full_text,
        metadata=metadata,
        source_info=source_info,
        page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
    )

preprocess_input

preprocess_input(input_path: Union[str, Path]) -> Path

Preprocess input document.

Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
def preprocess_input(self, input_path: Union[str, Path]) -> Path:
    """Preprocess input document."""
    input_path = Path(input_path)
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    supported_formats = ['.pdf', '.xps', '.oxps', '.epub', '.mobi', '.fb2', '.cbz', '.svg']
    if input_path.suffix.lower() not in supported_formats:
        raise ValueError(f"Unsupported format: {input_path.suffix}. Supported: {supported_formats}")

    return input_path

PyMuPDFTextMapper

PyMuPDFTextMapper()

Bases: BaseTextMapper

Mapper for PyMuPDF text extraction output.

Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
def __init__(self):
    super().__init__("pymupdf")

omnidocs.tasks.text_extraction.extractors.pdfplumber

PdfplumberTextExtractor

PdfplumberTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, extract_tables: bool = False, use_layout: bool = True)

Bases: BaseTextExtractor

Text extractor using pdfplumber.

Initialize pdfplumber text extractor.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| device | Optional[str] | Device to run on (not used for pdfplumber) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| extract_tables | bool | Whether to extract tables | False |
| use_layout | bool | Whether to use layout information for text extraction | True |
Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
def __init__(self, 
             device: Optional[str] = None, 
             show_log: bool = False,
             extract_images: bool = False,
             extract_tables: bool = False,
             use_layout: bool = True):
    """Initialize pdfplumber text extractor.

    Args:
        device: Device to run on (not used for pdfplumber)
        show_log: Whether to show detailed logs
        extract_images: Whether to extract images alongside text
        extract_tables: Whether to extract tables
        use_layout: Whether to use layout information for text extraction
    """
    super().__init__(device, show_log, "pdfplumber", extract_images)
    self.extract_tables = extract_tables
    self.use_layout = use_layout
    self._label_mapper = PdfplumberTextMapper()
    self._load_model()

extract

extract(input_path: Union[str, Path], **kwargs) -> TextOutput

Extract text from PDF using pdfplumber.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input PDF | required |
| **kwargs | | Additional parameters (ignored for pdfplumber) | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |

Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
def extract(
    self,
    input_path: Union[str, Path],
    **kwargs
) -> TextOutput:
    """Extract text from PDF using pdfplumber.

    Args:
        input_path: Path to input PDF
        **kwargs: Additional parameters (ignored for pdfplumber)

    Returns:
        TextOutput containing extracted text
    """
    start_time = time.time()

    # Preprocess input
    input_path = self.preprocess_input(input_path)

    if self.show_log:
        logger.info(f"Extracting text from {input_path}")

    try:
        all_text_blocks = []

        with pdfplumber.open(input_path) as pdf:
            total_pages = len(pdf.pages)

            for page in pdf.pages:
                if self.use_layout:
                    page_blocks = self._extract_text_with_layout(page)
                else:
                    page_blocks = self._extract_text_simple(page)

                all_text_blocks.extend(page_blocks)

                # Extract tables if requested
                if self.extract_tables:
                    table_blocks = self._extract_tables(page)
                    all_text_blocks.extend(table_blocks)

        # Create source info
        source_info = {
            'file_path': str(input_path),
            'file_name': input_path.name,
            'file_size': input_path.stat().st_size,
            'engine': 'pdfplumber',
            'total_pages': total_pages
        }

        # Post-process output
        output = self.postprocess_output(all_text_blocks, source_info)
        output.processing_time = time.time() - start_time

        if self.show_log:
            logger.info(f"Extracted {len(output.text_blocks)} text blocks from {total_pages} pages in {output.processing_time:.2f}s")

        return output

    except Exception as e:
        logger.error(f"Error extracting text from {input_path}: {str(e)}")
        raise

get_supported_formats

get_supported_formats() -> List[str]

Get list of supported document formats.

Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
def get_supported_formats(self) -> List[str]:
    """Get list of supported document formats."""
    return ['.pdf']

postprocess_output

postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput

Convert pdfplumber output to standardized TextOutput format.

Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
    """Convert pdfplumber output to standardized TextOutput format."""
    text_blocks = raw_output  # raw_output is already a list of TextBlocks

    # Sort blocks by page and reading order
    text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))

    # Combine all text
    full_text = '\n\n'.join(block.text for block in text_blocks)

    # Get metadata
    metadata = {
        'engine': 'pdfplumber',
        'extract_tables': self.extract_tables,
        'use_layout': self.use_layout,
        'total_blocks': len(text_blocks)
    }

    return TextOutput(
        text_blocks=text_blocks,
        full_text=full_text,
        metadata=metadata,
        source_info=source_info,
        page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
    )

preprocess_input

preprocess_input(input_path: Union[str, Path]) -> Path

Preprocess input document.

Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
def preprocess_input(self, input_path: Union[str, Path]) -> Path:
    """Preprocess input document."""
    input_path = Path(input_path)
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    if input_path.suffix.lower() != '.pdf':
        raise ValueError(f"pdfplumber only supports PDF files. Got: {input_path.suffix}")

    return input_path

PdfplumberTextMapper

PdfplumberTextMapper()

Bases: BaseTextMapper

Mapper for pdfplumber text extraction output.

Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
def __init__(self):
    super().__init__("pdfplumber")

omnidocs.tasks.text_extraction.extractors.pypdf2

PyPDF2TextExtractor

PyPDF2TextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, ignore_images: bool = True, extract_forms: bool = False)

Bases: BaseTextExtractor

Text extractor using PyPDF2.

Initialize PyPDF2 text extractor.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| device | Optional[str] | Device to run on (not used for PyPDF2) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| ignore_images | bool | Whether to ignore images during text extraction | True |
| extract_forms | bool | Whether to extract form fields | False |
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def __init__(self, 
             device: Optional[str] = None, 
             show_log: bool = False,
             extract_images: bool = False,
             ignore_images: bool = True,
             extract_forms: bool = False):
    """Initialize PyPDF2 text extractor.

    Args:
        device: Device to run on (not used for PyPDF2)
        show_log: Whether to show detailed logs
        extract_images: Whether to extract images alongside text
        ignore_images: Whether to ignore images during text extraction
        extract_forms: Whether to extract form fields
    """
    super().__init__(device, show_log, "pypdf2", extract_images)
    self.ignore_images = ignore_images
    self.extract_forms = extract_forms
    self._label_mapper = PyPDF2TextMapper()
    self._load_model()

extract

extract(input_path: Union[str, Path], password: Optional[str] = None, **kwargs) -> TextOutput

Extract text from PDF using PyPDF2.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input PDF | required |
| password | Optional[str] | Optional password for encrypted PDFs | None |
| **kwargs | | Additional parameters (ignored for PyPDF2) | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |

Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def extract(
    self,
    input_path: Union[str, Path],
    password: Optional[str] = None,
    **kwargs
) -> TextOutput:
    """Extract text from PDF using PyPDF2.

    Args:
        input_path: Path to input PDF
        password: Optional password for encrypted PDFs
        **kwargs: Additional parameters (ignored for PyPDF2)

    Returns:
        TextOutput containing extracted text
    """
    start_time = time.time()

    # Preprocess input
    input_path = self.preprocess_input(input_path)

    if self.show_log:
        logger.info(f"Extracting text from {input_path}")

    try:
        all_text_blocks = []

        # Open PDF
        with open(input_path, 'rb') as file:
            reader = PdfReader(file)

            # Check if PDF is encrypted
            if reader.is_encrypted:
                if password:
                    if not reader.decrypt(password):
                        raise ValueError("Invalid password for encrypted PDF")
                else:
                    raise ValueError("PDF is encrypted but no password provided")

            total_pages = len(reader.pages)

            # Extract text from each page
            for page_num, page in enumerate(reader.pages, 1):
                page_blocks = self._extract_page_text(page, page_num)
                all_text_blocks.extend(page_blocks)

            # Extract form fields if requested
            if self.extract_forms:
                form_blocks = self._extract_form_fields(reader)
                all_text_blocks.extend(form_blocks)

            # Get PDF metadata
            pdf_metadata = self._get_pdf_metadata(reader)

            # Create source info
            source_info = {
                'file_path': str(input_path),
                'file_name': input_path.name,
                'file_size': input_path.stat().st_size,
                'engine': 'pypdf2',
                'total_pages': total_pages,
                'is_encrypted': reader.is_encrypted,
                'pdf_metadata': pdf_metadata
            }

        # Post-process output
        output = self.postprocess_output(all_text_blocks, source_info)
        output.processing_time = time.time() - start_time

        if self.show_log:
            logger.info(f"Extracted {len(output.text_blocks)} text blocks from {total_pages} pages in {output.processing_time:.2f}s")

        return output

    except Exception as e:
        logger.error(f"Error extracting text from {input_path}: {str(e)}")
        raise

extract_from_pages

extract_from_pages(input_path: Union[str, Path], page_range: Optional[tuple] = None, password: Optional[str] = None, **kwargs) -> TextOutput

Extract text from specific pages.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input PDF | required |
| page_range | Optional[tuple] | Optional tuple of (start_page, end_page) (1-based, inclusive) | None |
| password | Optional[str] | Optional password for encrypted PDFs | None |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text from specified pages |

Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def extract_from_pages(
    self,
    input_path: Union[str, Path],
    page_range: Optional[tuple] = None,
    password: Optional[str] = None,
    **kwargs
) -> TextOutput:
    """Extract text from specific pages.

    Args:
        input_path: Path to input PDF
        page_range: Optional tuple of (start_page, end_page) (1-based, inclusive)
        password: Optional password for encrypted PDFs
        **kwargs: Additional parameters

    Returns:
        TextOutput containing extracted text from specified pages
    """
    start_time = time.time()

    # Preprocess input
    input_path = self.preprocess_input(input_path)

    if self.show_log:
        logger.info(f"Extracting text from {input_path}, pages {page_range}")

    try:
        all_text_blocks = []

        # Open PDF
        with open(input_path, 'rb') as file:
            reader = PdfReader(file)

            # Check if PDF is encrypted
            if reader.is_encrypted:
                if password:
                    if not reader.decrypt(password):
                        raise ValueError("Invalid password for encrypted PDF")
                else:
                    raise ValueError("PDF is encrypted but no password provided")

            total_pages = len(reader.pages)

            if page_range is None:
                start_page, end_page = 1, total_pages
            else:
                start_page, end_page = page_range

            # Validate page range
            start_page = max(1, start_page)
            end_page = min(total_pages, end_page)

            # Extract text from specified pages
            for page_num in range(start_page, end_page + 1):
                page = reader.pages[page_num - 1]  # Convert to 0-based index
                page_blocks = self._extract_page_text(page, page_num)
                all_text_blocks.extend(page_blocks)

            # Get PDF metadata
            pdf_metadata = self._get_pdf_metadata(reader)

            # Create source info
            source_info = {
                'file_path': str(input_path),
                'file_name': input_path.name,
                'file_size': input_path.stat().st_size,
                'engine': 'pypdf2',
                'total_pages': total_pages,
                'page_range': page_range,
                'is_encrypted': reader.is_encrypted,
                'pdf_metadata': pdf_metadata
            }

        # Post-process output
        output = self.postprocess_output(all_text_blocks, source_info)
        output.processing_time = time.time() - start_time

        if self.show_log:
            logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")

        return output

    except Exception as e:
        logger.error(f"Error extracting text from {input_path}: {str(e)}")
        raise

extract_with_password

extract_with_password(input_path: Union[str, Path], password: str, **kwargs) -> TextOutput

Extract text from password-protected PDF.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input PDF | required |
| password | str | Password for encrypted PDF | required |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |

Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def extract_with_password(
    self,
    input_path: Union[str, Path],
    password: str,
    **kwargs
) -> TextOutput:
    """Extract text from password-protected PDF.

    Args:
        input_path: Path to input PDF
        password: Password for encrypted PDF
        **kwargs: Additional parameters

    Returns:
        TextOutput containing extracted text
    """
    return self.extract(input_path, password=password, **kwargs)
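
As the source shows, this is a thin convenience wrapper over extract, so both calls below are equivalent (locked.pdf and the password are placeholders):

```python
from omnidocs.tasks.text_extraction.extractors.pypdf2 import PyPDF2TextExtractor

extractor = PyPDF2TextExtractor()
out1 = extractor.extract_with_password("locked.pdf", password="s3cret")
out2 = extractor.extract("locked.pdf", password="s3cret")  # same result
```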

get_supported_formats

get_supported_formats() -> List[str]

Get list of supported document formats.

Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def get_supported_formats(self) -> List[str]:
    """Get list of supported document formats."""
    return ['.pdf']

postprocess_output

postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput

Convert PyPDF2 output to standardized TextOutput format.

Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
    """Convert PyPDF2 output to standardized TextOutput format."""
    text_blocks = raw_output  # raw_output is already a list of TextBlocks

    # Sort blocks by page and reading order
    text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))

    # Combine all text
    full_text = '\n\n'.join(block.text for block in text_blocks if block.text.strip())

    # Get metadata
    metadata = {
        'engine': 'pypdf2',
        'ignore_images': self.ignore_images,
        'extract_forms': self.extract_forms,
        'total_blocks': len(text_blocks)
    }

    # Make everything JSON serializable
    metadata = sanitize_for_json(metadata)
    source_info = sanitize_for_json(source_info)

    return TextOutput(
        text_blocks=text_blocks,
        full_text=full_text,
        metadata=metadata,
        source_info=source_info,
        page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
    )

preprocess_input

preprocess_input(input_path: Union[str, Path]) -> Path

Preprocess input document.

Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def preprocess_input(self, input_path: Union[str, Path]) -> Path:
    """Preprocess input document."""
    input_path = Path(input_path)
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    if input_path.suffix.lower() != '.pdf':
        raise ValueError(f"PyPDF2 only supports PDF files. Got: {input_path.suffix}")

    return input_path

PyPDF2TextMapper

PyPDF2TextMapper()

Bases: BaseTextMapper

Mapper for PyPDF2 text extraction output.

Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def __init__(self):
    super().__init__("pypdf2")

sanitize_for_json

sanitize_for_json(obj: Any) -> Any

Recursively convert PyPDF2 objects (like IndirectObject) to JSON-serializable types.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| obj | Any | Input object that might contain non-serializable types | required |

Returns:

| Type | Description |
|------|-------------|
| Any | JSON-serializable version of the input object |

Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
def sanitize_for_json(obj: Any) -> Any:
    """
    Recursively convert PyPDF2 objects (like IndirectObject) to JSON-serializable types.

    Args:
        obj: Input object that might contain non-serializable types

    Returns:
        JSON-serializable version of the input object
    """
    if obj is None:
        return None

    # Handle common collection types recursively
    if isinstance(obj, dict):
        return {k: sanitize_for_json(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [sanitize_for_json(item) for item in obj]
    elif isinstance(obj, tuple):
        return tuple(sanitize_for_json(item) for item in obj)

    # Try to determine if this is a PyPDF2 IndirectObject or similar custom type
    # that's not JSON-serializable
    try:
        # This will work for built-in types that are JSON-serializable
        if isinstance(obj, (str, int, float, bool)):
            return obj

        # Check if it's a custom class from PyPDF2
        class_name = obj.__class__.__name__
        if "PyPDF2" in str(obj.__class__) or class_name in [
            "IndirectObject", "DictionaryObject", "ArrayObject", 
            "PdfObject", "NullObject", "NameObject"
        ]:
            return str(obj)

        # If we got here, it might be a normal object, let's try to serialize it
        return obj
    except Exception:
        # If all else fails, convert to string
        return str(obj)
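
To see the name-based branch in action without a real PDF, the sketch below uses a stand-in class that merely shares IndirectObject's name; in practice such objects arrive inside a reader's metadata.

```python
import json
from omnidocs.tasks.text_extraction.extractors.pypdf2 import sanitize_for_json

class IndirectObject:            # stand-in for the demo, NOT PyPDF2's class
    def __str__(self):
        return "IndirectObject(12, 0)"

meta = {"author": "Ada", "ref": IndirectObject(), "pages": (1, 2)}
clean = sanitize_for_json(meta)
json.dumps(clean)                # works: "ref" is now a plain string
```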

omnidocs.tasks.text_extraction.extractors.pdftext

PdftextTextExtractor

PdftextTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, keep_layout: bool = False, physical_layout: bool = False)

Bases: BaseTextExtractor

Text extractor using pdftext.

Initialize pdftext text extractor.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| device | Optional[str] | Device to run on (not used for pdftext) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| keep_layout | bool | Whether to keep original layout formatting | False |
| physical_layout | bool | Whether to use physical layout analysis | False |
Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
def __init__(self, 
             device: Optional[str] = None, 
             show_log: bool = False,
             extract_images: bool = False,
             keep_layout: bool = False,
             physical_layout: bool = False):
    """Initialize pdftext text extractor.

    Args:
        device: Device to run on (not used for pdftext)
        show_log: Whether to show detailed logs
        extract_images: Whether to extract images alongside text
        keep_layout: Whether to keep original layout formatting
        physical_layout: Whether to use physical layout analysis
    """
    super().__init__(device, show_log, "pdftext", extract_images)
    self.keep_layout = keep_layout
    self.physical_layout = physical_layout
    self._label_mapper = PdftextTextMapper()
    self._load_model()

extract

extract(input_path: Union[str, Path], **kwargs) -> TextOutput

Extract text from PDF using pdftext.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input PDF | required |
| **kwargs | | Additional parameters (ignored for pdftext) | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |

Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
def extract(
    self,
    input_path: Union[str, Path],
    **kwargs
) -> TextOutput:
    """Extract text from PDF using pdftext.

    Args:
        input_path: Path to input PDF
        **kwargs: Additional parameters (ignored for pdftext)

    Returns:
        TextOutput containing extracted text
    """
    start_time = time.time()

    # Preprocess input
    input_path = self.preprocess_input(input_path)

    if self.show_log:
        logger.info(f"Extracting text from {input_path}")

    try:
        # Extract text blocks
        text_blocks = self._extract_text_by_page(input_path)

        # Create source info
        source_info = {
            'file_path': str(input_path),
            'file_name': input_path.name,
            'file_size': input_path.stat().st_size,
            'engine': 'pdftext'
        }

        # Post-process output
        output = self.postprocess_output(text_blocks, source_info)
        output.processing_time = time.time() - start_time

        if self.show_log:
            logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")

        return output

    except Exception as e:
        logger.error(f"Error extracting text from {input_path}: {str(e)}")
        raise

extract_from_pages

extract_from_pages(input_path: Union[str, Path], page_range: Optional[tuple] = None, **kwargs) -> TextOutput

Extract text from specific pages.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input PDF | required |
| page_range | Optional[tuple] | Optional tuple of (start_page, end_page) (1-based, inclusive) | None |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text from specified pages |

Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
def extract_from_pages(
    self,
    input_path: Union[str, Path],
    page_range: Optional[tuple] = None,
    **kwargs
) -> TextOutput:
    """Extract text from specific pages.

    Args:
        input_path: Path to input PDF
        page_range: Optional tuple of (start_page, end_page) (1-based, inclusive)
        **kwargs: Additional parameters

    Returns:
        TextOutput containing extracted text from specified pages
    """
    start_time = time.time()

    # Preprocess input
    input_path = self.preprocess_input(input_path)

    if self.show_log:
        logger.info(f"Extracting text from {input_path}, pages {page_range}")

    try:
        text_blocks = []

        if page_range is None:
            # Extract all pages
            text_blocks = self._extract_text_by_page(input_path)
        else:
            start_page, end_page = page_range

            for page_num in range(start_page, end_page + 1):
                try:
                    page_text = pdftext.pdf_text(
                        str(input_path), 
                        page_num=page_num,
                        keep_layout=self.keep_layout,
                        physical_layout=self.physical_layout
                    )

                    if page_text and page_text.strip():
                        paragraphs = page_text.split('\n\n')

                        for para_idx, paragraph in enumerate(paragraphs):
                            if paragraph.strip():
                                block = TextBlock(
                                    text=paragraph.strip(),
                                    bbox=None,
                                    confidence=1.0,
                                    page_num=page_num,
                                    block_type='paragraph',
                                    reading_order=para_idx
                                )
                                text_blocks.append(block)

                except Exception as e:
                    logger.warning(f"Error extracting page {page_num}: {str(e)}")
                    continue

        # Create source info
        source_info = {
            'file_path': str(input_path),
            'file_name': input_path.name,
            'file_size': input_path.stat().st_size,
            'engine': 'pdftext',
            'page_range': page_range
        }

        # Post-process output
        output = self.postprocess_output(text_blocks, source_info)
        output.processing_time = time.time() - start_time

        if self.show_log:
            logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")

        return output

    except Exception as e:
        logger.error(f"Error extracting text from {input_path}: {str(e)}")
        raise

get_supported_formats

get_supported_formats() -> List[str]

Get list of supported document formats.

Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
def get_supported_formats(self) -> List[str]:
    """Get list of supported document formats."""
    return ['.pdf']

postprocess_output

postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput

Convert pdftext output to standardized TextOutput format.

Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
    """Convert pdftext output to standardized TextOutput format."""
    text_blocks = raw_output  # raw_output is already a list of TextBlocks

    # Sort blocks by page and reading order
    text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))

    # Combine all text
    full_text = '\n\n'.join(block.text for block in text_blocks if block.text.strip())

    # Get metadata
    metadata = {
        'engine': 'pdftext',
        'keep_layout': self.keep_layout,
        'physical_layout': self.physical_layout,
        'total_blocks': len(text_blocks)
    }

    return TextOutput(
        text_blocks=text_blocks,
        full_text=full_text,
        metadata=metadata,
        source_info=source_info,
        page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
    )

preprocess_input

preprocess_input(input_path: Union[str, Path]) -> Path

Preprocess input document.

Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
def preprocess_input(self, input_path: Union[str, Path]) -> Path:
    """Preprocess input document."""
    input_path = Path(input_path)
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    if input_path.suffix.lower() != '.pdf':
        raise ValueError(f"pdftext only supports PDF files. Got: {input_path.suffix}")

    return input_path

PdftextTextMapper

PdftextTextMapper()

Bases: BaseTextMapper

Mapper for pdftext text extraction output.

Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
def __init__(self):
    super().__init__("pdftext")

omnidocs.tasks.text_extraction.extractors.surya_text

SuryaTextExtractor

SuryaTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, model_path: Optional[Union[str, Path]] = None, **kwargs)

Bases: BaseTextExtractor

Surya-based text extraction implementation for images and documents.

Initialize Surya Text Extractor.

Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    extract_images: bool = False,
    model_path: Optional[Union[str, Path]] = None,
    **kwargs
):
    """Initialize Surya Text Extractor."""
    super().__init__(device=device, show_log=show_log, engine_name='surya', extract_images=extract_images)

    self._label_mapper = SuryaTextMapper()

    if self.show_log:
        logger.info("Initializing SuryaTextExtractor")

    # Set device if specified, otherwise use default from parent
    if device:
        self.device = device

    if self.show_log:
        logger.info(f"Using device: {self.device}")

    # Set default paths
    if model_path is None:
        model_path = _MODELS_DIR / "surya_text"

    self.model_path = Path(model_path)

    # Check dependencies and load model
    self._check_dependencies()
    self._load_model()

extract

extract(input_path: Union[str, Path, Image.Image], **kwargs) -> TextOutput

Extract text using Surya OCR.

Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> TextOutput:
    """Extract text using Surya OCR."""
    start_time = time.time()

    try:
        # Preprocess input
        images = self.preprocess_input(input_path)

        predictions = []

        for img in images:
            # Run text detection and recognition
            try:
                from surya.common.surya.schema import TaskNames

                # Use recognition predictor for text extraction
                prediction = self.rec_predictor(
                    [img],
                    task_names=[TaskNames.ocr_with_boxes],
                    det_predictor=self.det_predictor,
                    math_mode=False  # Standard text mode
                )

                if prediction and len(prediction) > 0:
                    predictions.append(prediction[0])

            except Exception as e:
                if self.show_log:
                    logger.warning(f"Error processing image with Surya: {e}")
                continue

        # Prepare source info
        source_info = {
            'source_path': str(input_path) if not isinstance(input_path, Image.Image) else 'PIL_Image',
            'num_images': len(images),
            'processing_time': time.time() - start_time
        }

        # Convert to standardized format
        result = self.postprocess_output({
            'predictions': predictions,
            'processing_info': {
                'total_images': len(images),
                'successful_predictions': len(predictions)
            }
        }, source_info)

        if self.show_log:
            logger.info(f"Extracted {len(result.text_blocks)} text blocks using Surya")

        return result

    except Exception:
        if self.show_log:
            logger.error("Error during Surya text extraction", exc_info=True)
        raise

postprocess_output

postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput

Convert Surya output to standardized TextOutput format.

Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
    """Convert Surya output to standardized TextOutput format."""
    text_blocks = []
    full_text_parts = []

    if 'predictions' in raw_output:
        for page_idx, prediction in enumerate(raw_output['predictions']):
            if hasattr(prediction, 'text_lines'):
                for line_idx, text_line in enumerate(prediction.text_lines):
                    # Create text block
                    block = TextBlock(
                        text=text_line.text.strip(),
                        bbox=text_line.bbox if hasattr(text_line, 'bbox') else None,
                        confidence=getattr(text_line, 'confidence', 1.0),
                        page_num=page_idx + 1,
                        block_type='text_line',
                        reading_order=line_idx
                    )
                    text_blocks.append(block)
                    full_text_parts.append(text_line.text.strip())

    # Build metadata
    metadata = {
        'engine': 'surya',
        'total_blocks': len(text_blocks),
        'processing_info': raw_output.get('processing_info', {})
    }

    if source_info:
        metadata.update(source_info)

    return TextOutput(
        text_blocks=text_blocks,
        full_text='\n'.join(full_text_parts),
        metadata=metadata,
        source_info=source_info,
        page_count=len(raw_output.get('predictions', []))
    )

preprocess_input

preprocess_input(input_path: Union[str, Path, Image]) -> List[Image.Image]

Preprocess input for Surya text extraction.

Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
def preprocess_input(self, input_path: Union[str, Path, Image.Image]) -> List[Image.Image]:
    """Preprocess input for Surya text extraction."""
    if isinstance(input_path, Image.Image):
        return [input_path.convert("RGB")]
    elif isinstance(input_path, (str, Path)):
        # Handle image files
        if str(input_path).lower().endswith(('.png', '.jpg', '.jpeg', '.tiff', '.bmp')):
            image = Image.open(input_path).convert("RGB")
            return [image]
        else:
            # For PDF files, we'd need to convert to images first
            # This is a simplified implementation - you might want to use pdf2image
            raise ValueError(f"Unsupported file type: {input_path}. Surya text extractor works with images.")
    else:
        raise ValueError("Unsupported input type for Surya text extractor")

SuryaTextMapper

SuryaTextMapper()

Bases: BaseTextMapper

Label mapper for Surya text model output.

Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
def __init__(self):
    super().__init__('surya')
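
Example usage (a minimal sketch; "page.png" is a placeholder path, and Surya accepts image inputs only):

from omnidocs.tasks.text_extraction.extractors.surya_text import SuryaTextExtractor

# Initialize on the default device; pass device="cuda" to pin the GPU explicitly.
extractor = SuryaTextExtractor(show_log=True)

# Run detection + recognition on a single image.
result = extractor.extract("page.png")
print(result.full_text)
for block in result.text_blocks:
    print(block.page_num, block.bbox, block.text)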

omnidocs.tasks.text_extraction.extractors.docling_parse

DoclingTextExtractor

DoclingTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, ocr_enabled: bool = True, table_structure_enabled: bool = True)

Bases: BaseTextExtractor

Text extractor using Docling.

Initialize Docling text extractor.

Parameters:

- device (Optional[str], default None): Device to run on (not used for Docling)
- show_log (bool, default False): Whether to show detailed logs
- extract_images (bool, default False): Whether to extract images alongside text
- ocr_enabled (bool, default True): Whether to enable OCR for scanned documents
- table_structure_enabled (bool, default True): Whether to enable table structure detection
Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
def __init__(self, 
             device: Optional[str] = None, 
             show_log: bool = False,
             extract_images: bool = False,
             ocr_enabled: bool = True,
             table_structure_enabled: bool = True):
    """Initialize Docling text extractor.

    Args:
        device: Device to run on (not used for Docling)
        show_log: Whether to show detailed logs
        extract_images: Whether to extract images alongside text
        ocr_enabled: Whether to enable OCR for scanned documents
        table_structure_enabled: Whether to enable table structure detection
    """
    super().__init__(device, show_log, "docling", extract_images)
    self.ocr_enabled = ocr_enabled
    self.table_structure_enabled = table_structure_enabled
    self._label_mapper = DoclingTextMapper()
    self._load_model()

extract

extract(input_path: Union[str, Path], **kwargs) -> TextOutput

Extract text from document using Docling.

Parameters:

- input_path (Union[str, Path], required): Path to input document
- **kwargs: Additional parameters (ignored for Docling)

Returns:

- TextOutput containing extracted text

Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
def extract(
    self,
    input_path: Union[str, Path],
    **kwargs
) -> TextOutput:
    """Extract text from document using Docling.

    Args:
        input_path: Path to input document
        **kwargs: Additional parameters (ignored for Docling)

    Returns:
        TextOutput containing extracted text
    """
    start_time = time.time()

    # Preprocess input
    input_path = self.preprocess_input(input_path)

    if self.show_log:
        logger.info(f"Extracting text from {input_path}")

    try:
        # Convert document
        result = self.model.convert(input_path)

        # Create source info
        source_info = {
            'file_path': str(input_path),
            'file_name': input_path.name,
            'file_size': input_path.stat().st_size,
            'engine': 'docling'
        }

        # Post-process output
        output = self.postprocess_output(result, source_info)
        output.processing_time = time.time() - start_time

        if self.show_log:
            logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")

        return output

    except Exception as e:
        logger.error(f"Error extracting text from {input_path}: {str(e)}")
        raise

get_supported_formats

get_supported_formats() -> List[str]

Get list of supported document formats.

Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
def get_supported_formats(self) -> List[str]:
    """Get list of supported document formats."""
    return ['.pdf', '.docx', '.pptx', '.html', '.md']

postprocess_output

postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput

Convert Docling output to standardized TextOutput format.

Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
    """Convert Docling output to standardized TextOutput format."""
    text_blocks = []

    # Process document elements
    for element in raw_output.document.texts:
        # Get bounding box if available
        bbox = None
        if hasattr(element, 'prov') and element.prov:
            for prov in element.prov:
                if hasattr(prov, 'bbox'):
                    bbox = [prov.bbox.l, prov.bbox.t, prov.bbox.r, prov.bbox.b]
                    break

        # Get page number
        page_num = 1
        if hasattr(element, 'prov') and element.prov:
            for prov in element.prov:
                if hasattr(prov, 'page'):
                    page_num = prov.page + 1  # Convert to 1-based
                    break

        # Create text block
        block = TextBlock(
            text=element.text,
            bbox=bbox,
            confidence=1.0,  # Docling doesn't provide confidence scores
            page_num=page_num,
            block_type=self._label_mapper.normalize_block_type(element.label),
            reading_order=getattr(element, 'reading_order', None)
        )
        text_blocks.append(block)

    # Sort blocks by reading order
    text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))

    # Combine all text
    full_text = '\n\n'.join(block.text for block in text_blocks)

    # Get metadata
    metadata = {
        'engine': 'docling',
        'ocr_enabled': self.ocr_enabled,
        'table_structure_enabled': self.table_structure_enabled,
        'total_elements': len(text_blocks)
    }

    return TextOutput(
        text_blocks=text_blocks,
        full_text=full_text,
        metadata=metadata,
        source_info=source_info,
        page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
    )

preprocess_input

preprocess_input(input_path: Union[str, Path]) -> Path

Preprocess input document.

Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
def preprocess_input(self, input_path: Union[str, Path]) -> Path:
    """Preprocess input document."""
    input_path = Path(input_path)
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    supported_formats = ['.pdf', '.docx', '.pptx', '.html', '.md']
    if input_path.suffix.lower() not in supported_formats:
        raise ValueError(f"Unsupported format: {input_path.suffix}. Supported: {supported_formats}")

    return input_path

DoclingTextMapper

DoclingTextMapper()

Bases: BaseTextMapper

Mapper for Docling text extraction output.

Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
def __init__(self):
    super().__init__("docling")

🔒 Math Expression Extraction

Recognize and extract LaTeX math expressions from images and PDFs.

omnidocs.tasks.math_expression_extraction

Math expression extraction module for OmniDocs.

This module provides base classes and implementations for mathematical expression extraction and LaTeX recognition from images and documents.

BaseLatexExtractor

BaseLatexExtractor(device: Optional[str] = None, show_log: bool = False)

Bases: ABC

Base class for LaTeX expression extraction models.

Initialize the LaTeX extractor.

Parameters:

- device (Optional[str], default None): Device to run model on ('cuda' or 'cpu')
- show_log (bool, default False): Whether to show detailed logs
Source code in omnidocs/tasks/math_expression_extraction/base.py
def __init__(self, device: Optional[str] = None, show_log: bool = False):
    """Initialize the LaTeX extractor.

    Args:
        device: Device to run model on ('cuda' or 'cpu')
        show_log: Whether to show detailed logs
    """
    self.show_log = show_log
    self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
    self.model = None
    self.model_path = None
    self._label_mapper: Optional[BaseLatexMapper] = None

    if self.show_log:
        logger.info(f"Initializing {self.__class__.__name__}")
        logger.info(f"Using device: {self.device}")

label_mapper property

label_mapper: BaseLatexMapper

Get the label mapper for this extractor.

extract abstractmethod

extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput

Extract LaTeX expressions from input image.

Parameters:

- input_path (Union[str, Path, Image], required): Path to input image or image data
- **kwargs: Additional model-specific parameters

Returns:

- LatexOutput containing extracted expressions

Source code in omnidocs/tasks/math_expression_extraction/base.py
@abstractmethod
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> LatexOutput:
    """Extract LaTeX expressions from input image.

    Args:
        input_path: Path to input image or image data
        **kwargs: Additional model-specific parameters

    Returns:
        LatexOutput containing extracted expressions
    """
    pass

extract_all

extract_all(input_paths: List[Union[str, Path, Image]], **kwargs) -> List[LatexOutput]

Extract LaTeX from multiple images.

Parameters:

- input_paths (List[Union[str, Path, Image]], required): List of image paths or image data
- **kwargs: Additional model-specific parameters

Returns:

- List of LatexOutput objects

Source code in omnidocs/tasks/math_expression_extraction/base.py
def extract_all(
    self,
    input_paths: List[Union[str, Path, Image.Image]],
    **kwargs
) -> List[LatexOutput]:
    """Extract LaTeX from multiple images.

    Args:
        input_paths: List of image paths or image data
        **kwargs: Additional model-specific parameters

    Returns:
        List of LatexOutput objects
    """
    results = []
    for input_path in input_paths:
        try:
            result = self.extract(input_path, **kwargs)
            results.append(result)
        except Exception as e:
            if self.show_log:
                logger.error(f"Error processing {input_path}: {str(e)}")
            raise
    return results

map_expression

map_expression(expression: str) -> str

Map model-specific LaTeX to standardized format.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def map_expression(self, expression: str) -> str:
    """Map model-specific LaTeX to standardized format."""
    if self._label_mapper is None:
        return expression
    return self._label_mapper.to_standard(expression)

preprocess_input

preprocess_input(input_path: Union[str, Path, Image, ndarray]) -> List[Image.Image]

Convert input to list of PIL Images.

Parameters:

- input_path (Union[str, Path, Image, ndarray], required): Input image path or image data

Returns:

- List[Image]: List of PIL Images

Source code in omnidocs/tasks/math_expression_extraction/base.py
def preprocess_input(self, input_path: Union[str, Path, Image.Image, np.ndarray]) -> List[Image.Image]:
    """Convert input to list of PIL Images.

    Args:
        input_path: Input image path or image data

    Returns:
        List of PIL Images
    """
    if isinstance(input_path, (str, Path)):
        image = Image.open(input_path).convert('RGB')
        return [image]
    elif isinstance(input_path, Image.Image):
        return [input_path.convert('RGB')]
    elif isinstance(input_path, np.ndarray):
        return [Image.fromarray(cv2.cvtColor(input_path, cv2.COLOR_BGR2RGB))]
    else:
        raise ValueError(f"Unsupported input type: {type(input_path)}")

BaseLatexMapper

BaseLatexMapper()

Base class for mapping model-specific outputs to standardized format.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def __init__(self):
    self._mapping: Dict[str, str] = {}
    self._reverse_mapping: Dict[str, str] = {}
    self._setup_mapping()

from_standard

from_standard(standard_latex: str) -> str

Convert standardized LaTeX to model-specific format.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def from_standard(self, standard_latex: str) -> str:
    """Convert standardized LaTeX to model-specific format."""
    return self._reverse_mapping.get(standard_latex, standard_latex)

to_standard

to_standard(model_output: str) -> str

Convert model-specific LaTeX to standardized format.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def to_standard(self, model_output: str) -> str:
    """Convert model-specific LaTeX to standardized format."""
    return self._mapping.get(model_output, model_output)

LatexOutput

Bases: BaseModel

Container for extracted LaTeX expressions.

Attributes:

- expressions (List[str]): List of extracted LaTeX expressions
- confidences (Optional[List[float]]): Optional confidence scores for each expression
- bboxes (Optional[List[List[float]]]): Optional bounding boxes for each expression
- source_img_size (Optional[Tuple[int, int]]): Optional tuple of source image dimensions

save_json

save_json(output_path: Union[str, Path]) -> None

Save output to JSON file.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def save_json(self, output_path: Union[str, Path]) -> None:
    """Save output to JSON file."""
    import json
    with open(output_path, 'w') as f:
        json.dump(self.to_dict(), f, indent=2)

to_dict

to_dict() -> Dict

Convert to dictionary representation.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def to_dict(self) -> Dict:
    """Convert to dictionary representation."""
    return {
        'expressions': self.expressions,
        'confidences': self.confidences,
        'bboxes': self.bboxes,
        'source_img_size': self.source_img_size
    }
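
Concrete mappers populate these dictionaries in _setup_mapping, which BaseLatexMapper.__init__ invokes. A minimal sketch of a custom mapper (the mapping entries are illustrative, not taken from any shipped mapper):

from omnidocs.tasks.math_expression_extraction.base import BaseLatexMapper

class ExampleLatexMapper(BaseLatexMapper):
    def _setup_mapping(self):
        # Model-specific token -> standardized LaTeX (illustrative entry).
        self._mapping[r"\sf"] = r"\mathsf"
        self._reverse_mapping[r"\mathsf"] = r"\sf"

mapper = ExampleLatexMapper()
print(mapper.to_standard(r"\sf"))        # -> \mathsf
print(mapper.from_standard(r"\mathsf"))  # -> \sf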

omnidocs.tasks.math_expression_extraction.extractors.donut

DonutExtractor

DonutExtractor(device: Optional[str] = None, show_log: bool = False, model_name: str = 'naver-clova-ix/donut-base-finetuned-cord-v2', model_path: Optional[Union[str, Path]] = None, **kwargs)

Bases: BaseLatexExtractor

Donut (NAVER CLOVA) based expression extraction implementation.

Initialize Donut Extractor.

Source code in omnidocs/tasks/math_expression_extraction/extractors/donut.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    model_name: str = "naver-clova-ix/donut-base-finetuned-cord-v2",
    model_path: Optional[Union[str, Path]] = None,
    **kwargs
):
    """Initialize Donut Extractor."""
    super().__init__(device=device, show_log=show_log)

    self._label_mapper = DonutMapper()
    self.model_name = model_name

    # Set default paths
    if model_path is None:
        model_path = _MODELS_DIR / "donut_models" / model_name.replace("/", "_")

    self.model_path = Path(model_path)

    # Check dependencies
    self._check_dependencies()

    # Download model if needed
    if not self._model_exists():
        if self.show_log:
            logger.info(f"Model not found at {self.model_path}, will download from HuggingFace")
        self._download_model()

    try:
        self._load_model()
        if self.show_log:
            logger.success("Donut model initialized successfully")
    except Exception as e:
        logger.error("Failed to initialize Donut model", exc_info=True)
        raise

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput

Extract LaTeX expressions using Donut.

Source code in omnidocs/tasks/math_expression_extraction/extractors/donut.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> LatexOutput:
    """Extract LaTeX expressions using Donut."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)

        expressions = []
        for img in images:
            # Prepare image for Donut
            pixel_values = self.processor(img, return_tensors="pt").pixel_values
            pixel_values = pixel_values.to(self.device)

            # Prepare task prompt (adjust based on your specific task)
            task_prompt = "<s_cord-v2>"  # Default CORD v2 task (receipt/invoice parsing)
            decoder_input_ids = self.processor.tokenizer(
                task_prompt,
                add_special_tokens=False,
                return_tensors="pt"  # return PyTorch tensors
            ).input_ids
            decoder_input_ids = decoder_input_ids.to(self.device)

            # Generate
            with torch.no_grad():
                outputs = self.model.generate(
                    pixel_values,
                    decoder_input_ids=decoder_input_ids,
                    max_length=self.model.decoder.config.max_position_embeddings,
                    early_stopping=True,
                    pad_token_id=self.processor.tokenizer.pad_token_id,
                    eos_token_id=self.processor.tokenizer.eos_token_id,
                    use_cache=True,
                    num_beams=1,
                    bad_words_ids=[[self.processor.tokenizer.unk_token_id]],
                    return_dict_in_generate=True,
                )

            # Decode output: convert the generated token IDs back into a string
            sequence = self.processor.batch_decode(outputs.sequences)[0]
            # Remove any PAD and EOS tokens
            sequence = sequence.replace(self.processor.tokenizer.eos_token, "").replace(self.processor.tokenizer.pad_token, "")
            # Remove the task prompt
            sequence = sequence.replace(task_prompt, "")

            # Extract math content from JSON-like output
            math_content = self._extract_math_from_json(sequence)

            # Map to standard format
            mapped_expr = self.map_expression(math_content)
            expressions.append(mapped_expr)

        return LatexOutput(
            expressions=expressions,
            source_img_size=images[0].size if images else None
        )

    except Exception as e:
        logger.error("Error during Donut extraction", exc_info=True)
        raise

DonutMapper

DonutMapper()

Bases: BaseLatexMapper

Label mapper for Donut model output.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def __init__(self):
    self._mapping: Dict[str, str] = {}
    self._reverse_mapping: Dict[str, str] = {}
    self._setup_mapping()
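
Example usage (a minimal sketch; "receipt.png" is a placeholder path, and note that the default checkpoint targets CORD-style receipt parsing rather than LaTeX):

from omnidocs.tasks.math_expression_extraction.extractors.donut import DonutExtractor

extractor = DonutExtractor(show_log=True)
output = extractor.extract("receipt.png")
print(output.expressions)
output.save_json("donut_output.json")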

omnidocs.tasks.math_expression_extraction.extractors.nougat

Nougat (Neural Optical Understanding for Academic Documents) LaTeX Expression Extractor

This module provides LaTeX expression extraction using Facebook's Nougat model via Hugging Face transformers.

NougatExtractor

NougatExtractor(model_type: str = 'small', device: Optional[str] = None, show_log: bool = False, model_path: Optional[str] = None, **kwargs)

Bases: BaseLatexExtractor

Nougat (Neural Optical Understanding for Academic Documents) based expression extraction.

Initialize Nougat Extractor.

Source code in omnidocs/tasks/math_expression_extraction/extractors/nougat.py
def __init__(
    self,
    model_type: str = "small",
    device: Optional[str] = None,
    show_log: bool = False,
    model_path: Optional[str] = None,
    **kwargs
):
    """Initialize Nougat Extractor."""
    super().__init__(device=device, show_log=show_log)

    self._label_mapper = NougatMapper()
    self.model_type = model_type

    # Set default model path if not provided
    if model_path is None:
        model_path = _MODELS_DIR / f"nougat_{model_type}"
    self.model_path = Path(model_path)

    # Check dependencies
    self._check_dependencies()

    try:
        # Check if model exists locally, download if needed
        if not self._model_exists():
            if self.show_log:
                logger.info("Model not found locally, will download from Hugging Face")
            self._download_model()
        else:
            if self.show_log:
                logger.info("Model found locally, using that version")

        self._load_model()
        if self.show_log:
            logger.success("Nougat model initialized successfully")
    except Exception as e:
        logger.error("Failed to initialize Nougat model", exc_info=True)
        raise

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput

Extract LaTeX expressions using Nougat.

Source code in omnidocs/tasks/math_expression_extraction/extractors/nougat.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> LatexOutput:
    """Extract LaTeX expressions using Nougat."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)

        all_expressions = []
        for img in images:
            # Add padding to make it look more like a document page
            from PIL import ImageOps
            padded_image = ImageOps.expand(img, border=100, fill='white')

            # Process image with Nougat processor
            pixel_values = self.processor(padded_image, return_tensors="pt").pixel_values
            pixel_values = pixel_values.to(self.device)

            # Generate text using the model
            with torch.no_grad():
                outputs = self.model.generate(
                    pixel_values,
                    max_length=512,
                    num_beams=1,  # Use greedy decoding for faster inference
                    do_sample=False,
                    early_stopping=False
                )

            # Decode the generated text
            generated_text = self.processor.batch_decode(outputs, skip_special_tokens=True)[0]

            # Extract mathematical expressions from the text
            expressions = self._extract_math_expressions(generated_text)

            # Map expressions to standard format
            mapped_expressions = [self.map_expression(expr) for expr in expressions]
            all_expressions.extend(mapped_expressions)

        return LatexOutput(
            expressions=all_expressions,
            source_img_size=images[0].size if images else None
        )

    except Exception as e:
        logger.error("Error during Nougat extraction", exc_info=True)
        raise

NougatMapper

NougatMapper()

Bases: BaseLatexMapper

Label mapper for Nougat model output.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def __init__(self):
    self._mapping: Dict[str, str] = {}
    self._reverse_mapping: Dict[str, str] = {}
    self._setup_mapping()
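
Example usage (a minimal sketch; "paper_page.png" is a placeholder path):

from omnidocs.tasks.math_expression_extraction.extractors.nougat import NougatExtractor

# model_type selects the checkpoint size; "small" is the default.
extractor = NougatExtractor(model_type="small")
output = extractor.extract("paper_page.png")
for expr in output.expressions:
    print(expr)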

omnidocs.tasks.math_expression_extraction.extractors.surya_math

SuryaMathExtractor

SuryaMathExtractor(device: Optional[str] = None, show_log: bool = False, model_path: Optional[Union[str, Path]] = None, **kwargs)

Bases: BaseLatexExtractor

Surya-based mathematical expression extraction implementation.

Initialize Surya Math Extractor.

Source code in omnidocs/tasks/math_expression_extraction/extractors/surya_math.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    model_path: Optional[Union[str, Path]] = None,
    **kwargs
):
    """Initialize Surya Math Extractor."""
    super().__init__(device=device, show_log=show_log)

    self._label_mapper = SuryaMathMapper()

    if self.show_log:
        logger.info("Initializing SuryaMathExtractor")

    # Set device if specified, otherwise use default from parent
    if device:
        self.device = device

    if self.show_log:
        logger.info(f"Using device: {self.device}")

    # Set default paths
    if model_path is None:
        model_path = _MODELS_DIR / "surya_math"

    self.model_path = Path(model_path)

    # Check dependencies and load model
    self._check_dependencies()
    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput

Extract LaTeX expressions using Surya.

Source code in omnidocs/tasks/math_expression_extraction/extractors/surya_math.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> LatexOutput:
    """Extract LaTeX expressions using Surya."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)

        expressions = []
        confidences = []
        bboxes = []

        for img in images:
            # Convert PIL to RGB if needed
            if isinstance(img, Image.Image):
                img_rgb = img.convert("RGB")
            else:
                img_rgb = Image.fromarray(img).convert("RGB")

            # Run math detection and recognition
            try:
                # Import TaskNames for proper task specification
                from surya.common.surya.schema import TaskNames

                # Use recognition predictor with math mode enabled
                predictions = self.rec_predictor(
                    [img_rgb],
                    task_names=[TaskNames.ocr_with_boxes],
                    det_predictor=self.det_predictor,
                    math_mode=True  # Enable math mode for LaTeX output
                )

                # Process predictions
                if predictions and len(predictions) > 0:
                    prediction = predictions[0]

                    # Extract text regions that contain math
                    for text_line in prediction.text_lines:
                        text_content = text_line.text.strip()

                        # Check if this looks like math content
                        if self._is_math_content(text_content):
                            # Map to standard format
                            mapped_expr = self.map_expression(text_content)
                            expressions.append(mapped_expr)

                            # Add confidence if available
                            if hasattr(text_line, 'confidence'):
                                confidences.append(text_line.confidence)
                            else:
                                confidences.append(1.0)

                            # Add bounding box if available
                            if hasattr(text_line, 'bbox'):
                                bboxes.append(text_line.bbox)
                            else:
                                bboxes.append([0, 0, img_rgb.width, img_rgb.height])

            except Exception as e:
                if self.show_log:
                    logger.warning(f"Error processing image with Surya: {e}")
                # Fallback: return empty result for this image
                continue

        return LatexOutput(
            expressions=expressions,
            confidences=confidences if confidences else None,
            bboxes=bboxes if bboxes else None,
            source_img_size=images[0].size if images else None
        )

    except Exception as e:
        if self.show_log:
            logger.error("Error during Surya math extraction", exc_info=True)
        raise

SuryaMathMapper

SuryaMathMapper()

Bases: BaseLatexMapper

Label mapper for Surya math model output.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def __init__(self):
    self._mapping: Dict[str, str] = {}
    self._reverse_mapping: Dict[str, str] = {}
    self._setup_mapping()
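
Example usage (a minimal sketch; "formula.png" is a placeholder path):

from omnidocs.tasks.math_expression_extraction.extractors.surya_math import SuryaMathExtractor

extractor = SuryaMathExtractor()
output = extractor.extract("formula.png")
# confidences and bboxes are populated only when the model provides them.
for i, expr in enumerate(output.expressions):
    conf = output.confidences[i] if output.confidences else None
    print(conf, expr)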

omnidocs.tasks.math_expression_extraction.extractors.unimernet

UniMERNet (Universal Mathematical Expression Recognition Network) extractor for LaTeX expressions.

UniMERNetExtractor

UniMERNetExtractor(model_path: Optional[str] = None, cfg_path: Optional[str] = None, device: Optional[str] = None, show_log: bool = False, **kwargs)

Bases: BaseLatexExtractor

UniMERNet (Universal Mathematical Expression Recognition Network) based expression extraction.

Initialize UniMERNet Extractor.

Source code in omnidocs/tasks/math_expression_extraction/extractors/unimernet.py
def __init__(
    self,
    model_path: Optional[str] = None,
    cfg_path: Optional[str] = None,
    device: Optional[str] = None,
    show_log: bool = False,
    **kwargs
):
    """Initialize UniMERNet Extractor."""
    super().__init__(device=device, show_log=show_log)

    self._label_mapper = UniMERNetMapper()

    # Set default paths
    if model_path is None:
        model_path = "omnidocs/models/unimernet_base"
    if cfg_path is None:
        cfg_path = str(Path(__file__).parent / "UniMERNet" / "configs" / "demo.yaml")

    self.model_path = Path(model_path)
    self.cfg_path = Path(cfg_path)

    # Check dependencies
    self._check_dependencies()

    # Download model if needed
    if not self.model_path.exists():
        self._download_model()

    try:
        self._load_model()
        if self.show_log:
            logger.success("UniMERNet model initialized successfully")
    except Exception as e:
        logger.error("Failed to initialize UniMERNet model", exc_info=True)
        raise

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput

Extract LaTeX expressions using UniMERNet.

Source code in omnidocs/tasks/math_expression_extraction/extractors/unimernet.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> LatexOutput:
    """Extract LaTeX expressions using UniMERNet."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)

        expressions = []
        for img in images:
            # Process image with UniMERNet
            image_tensor = self.vis_processor(img).unsqueeze(0).to(self.device)

            # Generate LaTeX
            with torch.no_grad():
                output = self.model.generate({"image": image_tensor})
                pred = output["pred_str"][0]

            # Map to standard format
            mapped_expr = self.map_expression(pred)
            expressions.append(mapped_expr)

        return LatexOutput(
            expressions=expressions,
            source_img_size=images[0].size if images else None
        )

    except Exception as e:
        logger.error("Error during UniMERNet extraction", exc_info=True)
        raise

UniMERNetMapper

UniMERNetMapper()

Bases: BaseLatexMapper

Label mapper for UniMERNet model output.

Source code in omnidocs/tasks/math_expression_extraction/base.py
def __init__(self):
    self._mapping: Dict[str, str] = {}
    self._reverse_mapping: Dict[str, str] = {}
    self._setup_mapping()
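
Example usage (a minimal sketch; "equation_crop.png" is a placeholder path, and weights are downloaded on first use):

from omnidocs.tasks.math_expression_extraction.extractors.unimernet import UniMERNetExtractor

extractor = UniMERNetExtractor(show_log=True)
output = extractor.extract("equation_crop.png")
print(output.expressions[0] if output.expressions else "no expression recognized")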

🖹 OCR (Optical Character Recognition)

Extract text from scanned documents and images using OCR models.

omnidocs.tasks.ocr_extraction.extractors.paddle

PaddleOCRExtractor

PaddleOCRExtractor(device: Optional[str] = None, show_log: bool = False, languages: Optional[List[str]] = None, use_angle_cls: bool = True, use_gpu: bool = True, drop_score: float = 0.5, model_path: Optional[str] = None, **kwargs)

Bases: BaseOCRExtractor

PaddleOCR based text extraction implementation.

Initialize PaddleOCR Extractor.

Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    languages: Optional[List[str]] = None,
    use_angle_cls: bool = True,
    use_gpu: bool = True,
    drop_score: float = 0.5,
    model_path: Optional[str] = None,
    **kwargs
):
    """Initialize PaddleOCR Extractor."""
    super().__init__(
        device=device, 
        show_log=show_log, 
        languages=languages or ['en'],
        engine_name='paddle'
    )

    self.use_angle_cls = use_angle_cls
    self.use_gpu = use_gpu
    self.drop_score = drop_score
    self._label_mapper = PaddleOCRMapper()

    # Set default paths
    if model_path is None:
        model_path = "omnidocs/models/paddleocr"
    self.model_path = Path(model_path)

    # Check dependencies first
    self._check_dependencies()

    # Set up model directory and download if needed
    if self.model_path.exists() and any(self.model_path.iterdir()):
        if self.show_log:
            logger.info(f"Using existing PaddleOCR models from: {self.model_path}")
    elif not self.model_path.exists():
        self._download_model()

    # Load model
    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> OCROutput

Extract text using PaddleOCR.

Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> OCROutput:
    """Extract text using PaddleOCR."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)
        img = images[0]

        # Convert PIL to cv2 format if needed
        if isinstance(img, Image.Image):
            img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)

        # Perform OCR
        result = self.model.ocr(img, cls=self.use_angle_cls)

        # Convert to standardized format
        texts = self._process_ocr_results(result)
        full_text_parts = [text.text for text in texts]

        img_size = img.shape[:2][::-1]  # (width, height)

        ocr_output = OCROutput(
            texts=texts,
            full_text=' '.join(full_text_parts),
            source_img_size=img_size
        )

        if self.show_log:
            logger.info(f"Extracted {len(texts)} text regions")

        return ocr_output

    except Exception as e:
        logger.error("Error during PaddleOCR extraction", exc_info=True)
        return OCROutput(
            texts=[],
            full_text="",
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

predict

predict(img, **kwargs)

Predict method for compatibility with original interface.

Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
def predict(self, img, **kwargs):
    """Predict method for compatibility with original interface."""
    try:
        result = self.extract(img, **kwargs)

        # Convert to original format
        ocr_res = []
        for text_obj in result.texts:
            # Convert bbox back to points format
            x0, y0, x1, y1 = text_obj.bbox
            points = [[x0, y0], [x1, y0], [x1, y1], [x0, y1]]
            poly = [coord for point in points for coord in point]

            ocr_res.append({
                "category_type": "text",
                'poly': poly,
                'score': text_obj.confidence,
                'text': text_obj.text,
            })

        return ocr_res

    except Exception as e:
        logger.error("Error during prediction", exc_info=True)
        return []

preprocess_image

preprocess_image(image, alpha_color=(255, 255, 255), inv=False, bin=False)

Preprocess image for OCR.

Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
def preprocess_image(self, image, alpha_color=(255, 255, 255), inv=False, bin=False):
    """Preprocess image for OCR."""
    image = alpha_to_color(image, alpha_color)
    if inv:
        image = cv2.bitwise_not(image)
    if bin:
        image = binarize_img(image)
    return image

PaddleOCRMapper

PaddleOCRMapper()

Bases: BaseOCRMapper

Label mapper for PaddleOCR model output.

Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
def __init__(self):
    super().__init__('paddleocr')
    self._mapping = {
        'en': 'en',
        'ch': 'ch',
        'chinese_cht': 'chinese_cht',
        'ta': 'ta',
        'te': 'te',
        'ka': 'ka',
        'ja': 'japan',
        'ko': 'korean',
        'hi': 'hi',
        'ar': 'ar',
        'cyrillic': 'cyrillic',
        'devanagari': 'devanagari',
        'fr': 'fr',
        'de': 'german',
        'es': 'es',
        'pt': 'pt',
        'ru': 'ru',
        'it': 'it',
    }
    self._reverse_mapping = {v: k for k, v in self._mapping.items()}

alpha_to_color

alpha_to_color(img, alpha_color=(255, 255, 255))

Convert transparent pixels to specified color.

Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
def alpha_to_color(img, alpha_color=(255, 255, 255)):
    """Convert transparent pixels to specified color."""
    if len(img.shape) == 3 and img.shape[2] == 4:  # RGBA image (H, W, 4)
        alpha_channel = img[:, :, 3]
        rgb_channels = img[:, :, :3]
        transparent_mask = alpha_channel == 0

        for i in range(3):
            rgb_channels[:, :, i][transparent_mask] = alpha_color[i]

        return rgb_channels
    return img

binarize_img

binarize_img(img)

Convert image to binary (black and white).

Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
def binarize_img(img):
    """Convert image to binary (black and white)."""
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if len(img.shape) == 3 else img
    _, binary = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
    return cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)

points_to_bbox

points_to_bbox(points)

Convert polygon points to a bounding box [x_min, y_min, x_max, y_max].

Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
def points_to_bbox(points):
    """Change polygon(shape: N * 8) to bbox(shape: N * 4)."""
    x_coords = [p[0] for p in points]
    y_coords = [p[1] for p in points]
    return [min(x_coords), min(y_coords), max(x_coords), max(y_coords)]
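
Example usage (a minimal sketch; "scan.png" is a placeholder path):

from omnidocs.tasks.ocr_extraction.extractors.paddle import PaddleOCRExtractor

# drop_score filters out detections below the given confidence.
extractor = PaddleOCRExtractor(languages=["en"], use_gpu=False, drop_score=0.5)
result = extractor.extract("scan.png")
for t in result.texts:
    print(f"{t.confidence:.2f} {t.bbox} {t.text}")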

omnidocs.tasks.ocr_extraction.extractors.tesseract_ocr

TesseractOCRExtractor

TesseractOCRExtractor(device: Optional[str] = None, show_log: bool = False, languages: Optional[List[str]] = None, psm: int = 6, oem: int = 3, config: str = '', **kwargs)

Bases: BaseOCRExtractor

Tesseract OCR based text extraction implementation.

Initialize Tesseract OCR Extractor.

Source code in omnidocs/tasks/ocr_extraction/extractors/tesseract_ocr.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    languages: Optional[List[str]] = None,
    psm: int = 6,
    oem: int = 3,
    config: str = "",
    **kwargs
):
    """Initialize Tesseract OCR Extractor."""
    super().__init__(
        device=device, 
        show_log=show_log, 
        languages=languages or ['en'],
        engine_name='tesseract'
    )

    self.psm = psm  # Page segmentation mode
    self.oem = oem  # OCR engine mode
    self.config = config
    self._label_mapper = TesseractOCRMapper()

    try:
        import pytesseract
        from pytesseract import Output
        self.pytesseract = pytesseract
        self.Output = Output

        # Set Tesseract executable path for Windows
        import os
        tesseract_paths = [
            r"C:\Program Files\Tesseract-OCR\tesseract.exe",
            r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
            r"C:\Users\{}\AppData\Local\Tesseract-OCR\tesseract.exe".format(os.getenv('USERNAME', '')),
        ]

        for path in tesseract_paths:
            if os.path.exists(path):
                pytesseract.pytesseract.tesseract_cmd = path
                if self.show_log:
                    logger.info(f"Found Tesseract at: {path}")
                break
        else:
            # Try to find in PATH
            import shutil
            tesseract_cmd = shutil.which('tesseract')
            if tesseract_cmd:
                pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
                if self.show_log:
                    logger.info(f"Found Tesseract in PATH: {tesseract_cmd}")

    except ImportError as e:
        logger.error("Failed to import pytesseract")
        raise ImportError(
            "pytesseract is not available. Please install it with: pip install pytesseract"
        ) from e

    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> OCROutput

Extract text using Tesseract OCR.

Source code in omnidocs/tasks/ocr_extraction/extractors/tesseract_ocr.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> OCROutput:
    """Extract text using Tesseract OCR."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)
        img = images[0]

        # Convert PIL to numpy array
        img_array = np.array(img)

        # Run OCR with detailed output
        raw_output = self.pytesseract.image_to_data(
            img_array,
            lang=self.lang_string,
            config=self.tesseract_config,
            output_type=self.Output.DICT
        )

        # Convert to standardized format
        result = self.postprocess_output(raw_output, img.size)

        if self.show_log:
            logger.info(f"Extracted {len(result.texts)} text regions")

        return result

    except Exception as e:
        logger.error("Error during Tesseract extraction", exc_info=True)
        return OCROutput(
            texts=[],
            full_text="",
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

postprocess_output

postprocess_output(raw_output: dict, img_size: Tuple[int, int]) -> OCROutput

Convert Tesseract output to standardized OCROutput format.

Source code in omnidocs/tasks/ocr_extraction/extractors/tesseract_ocr.py
def postprocess_output(self, raw_output: dict, img_size: Tuple[int, int]) -> OCROutput:
    """Convert Tesseract output to standardized OCROutput format."""
    texts = []
    full_text_parts = []

    n_boxes = len(raw_output['text'])

    for i in range(n_boxes):
        text = raw_output['text'][i].strip()

        if not text:
            continue

        confidence = float(raw_output['conf'][i])

        if confidence < 0:
            continue

        x = int(raw_output['left'][i])
        y = int(raw_output['top'][i])
        w = int(raw_output['width'][i])
        h = int(raw_output['height'][i])
        bbox = [float(x), float(y), float(x + w), float(y + h)]

        # Create polygon from bbox
        polygon = [[float(x), float(y)], [float(x + w), float(y)], 
                   [float(x + w), float(y + h)], [float(x), float(y + h)]]

        detected_lang = self.detect_text_language(text)

        ocr_text = OCRText(
            text=text,
            confidence=confidence / 100.0,
            bbox=bbox,
            polygon=polygon,
            language=detected_lang,
            reading_order=i
        )

        texts.append(ocr_text)
        full_text_parts.append(text)

    return OCROutput(
        texts=texts,
        full_text=' '.join(full_text_parts),
        source_img_size=img_size
    )

TesseractOCRMapper

TesseractOCRMapper()

Bases: BaseOCRMapper

Label mapper for Tesseract OCR model output.

Source code in omnidocs/tasks/ocr_extraction/extractors/tesseract_ocr.py
def __init__(self):
    super().__init__('tesseract')
    self._setup_mapping()
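
Example usage (a minimal sketch; requires a local Tesseract install, and "scan.png" is a placeholder path):

from omnidocs.tasks.ocr_extraction.extractors.tesseract_ocr import TesseractOCRExtractor

# psm=6 assumes a single uniform block of text; oem=3 uses the default engine mode.
extractor = TesseractOCRExtractor(languages=["en"], psm=6, oem=3)
result = extractor.extract("scan.png")
print(result.full_text)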

omnidocs.tasks.ocr_extraction.extractors.easy_ocr

EasyOCRExtractor

EasyOCRExtractor(device: Optional[str] = None, show_log: bool = False, languages: Optional[List[str]] = None, gpu: bool = True, **kwargs)

Bases: BaseOCRExtractor

EasyOCR based text extraction implementation.

Initialize EasyOCR Extractor.

Source code in omnidocs/tasks/ocr_extraction/extractors/easy_ocr.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    languages: Optional[List[str]] = None,
    gpu: bool = True,
    **kwargs
):
    """Initialize EasyOCR Extractor."""
    super().__init__(
        device=device, 
        show_log=show_log, 
        languages=languages or ['en'],
        engine_name='easyocr'
    )

    self.gpu = gpu 
    self._label_mapper = EasyOCRMapper()

    # Set default model path
    self.model_path = Path("omnidocs/models/easyocr")

    # Check dependencies
    self._check_dependencies()

    # Download model if needed
    if not self.model_path.exists():
        self._download_model()

    self._load_model()

extract

extract(input_path: Union[str, Path, Image], detail: int = 1, paragraph: bool = False, width_ths: float = 0.7, height_ths: float = 0.7, **kwargs) -> OCROutput

Extract text using EasyOCR.

Source code in omnidocs/tasks/ocr_extraction/extractors/easy_ocr.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    detail: int = 1,  # Changed default to 1 for bbox and confidence
    paragraph: bool = False,
    width_ths: float = 0.7,
    height_ths: float = 0.7,
    **kwargs
) -> OCROutput:
    """Extract text using EasyOCR."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)
        img = images[0]

        # Convert PIL to numpy array
        img_array = np.array(img)

        # Run OCR
        raw_output = self.model.readtext(
            img_array,
            detail=detail,
            paragraph=paragraph,
            width_ths=width_ths,
            height_ths=height_ths,
            **kwargs
        )

        # Convert to standardized format
        result = self.postprocess_output(raw_output, img.size)

        if self.show_log:
            logger.info(f"Extracted {len(result.texts)} text regions")

        return result

    except Exception as e:
        logger.error("Error during EasyOCR extraction", exc_info=True)
        return OCROutput(
            texts=[],
            full_text="",
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

postprocess_output

postprocess_output(raw_output: List, img_size: Tuple[int, int]) -> OCROutput

Convert EasyOCR output to standardized OCROutput format.

Source code in omnidocs/tasks/ocr_extraction/extractors/easy_ocr.py
def postprocess_output(self, raw_output: List, img_size: Tuple[int, int]) -> OCROutput:
    """Convert EasyOCR output to standardized OCROutput format."""
    texts = []
    full_text_parts = []

    for i, detection in enumerate(raw_output):
        if isinstance(detection, str):
            text = detection
            confidence = 0.9
            bbox = [0, 0, img_size[0], img_size[1]]
            polygon = [[0, 0], [img_size[0], 0], [img_size[0], img_size[1]], [0, img_size[1]]]
        elif isinstance(detection, (list, tuple)) and len(detection) == 3:
            bbox_coords, text, confidence = detection

            bbox_array = np.array(bbox_coords)
            x1, y1 = bbox_array.min(axis=0)
            x2, y2 = bbox_array.max(axis=0)
            bbox = [float(x1), float(y1), float(x2), float(y2)]

            polygon = [[float(x), float(y)] for x, y in bbox_coords]
        else:
            continue

        detected_lang = self.detect_text_language(text)

        ocr_text = OCRText(
            text=text,
            confidence=float(confidence),
            bbox=bbox,
            polygon=polygon,
            language=detected_lang,
            reading_order=i
        )

        texts.append(ocr_text)
        full_text_parts.append(text)

    return OCROutput(
        texts=texts,
        full_text=' '.join(full_text_parts),
        source_img_size=img_size
    )

EasyOCRMapper

EasyOCRMapper()

Bases: BaseOCRMapper

Label mapper for EasyOCR model output.

Source code in omnidocs/tasks/ocr_extraction/extractors/easy_ocr.py
def __init__(self):
    super().__init__('easyocr')
    self._setup_mapping()
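
Example usage (a minimal sketch; "photo.jpg" is a placeholder path):

from omnidocs.tasks.ocr_extraction.extractors.easy_ocr import EasyOCRExtractor

extractor = EasyOCRExtractor(languages=["en"], gpu=False)
# detail=1 keeps bounding boxes and confidences in the raw output.
result = extractor.extract("photo.jpg", detail=1, paragraph=False)
print(result.full_text)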

omnidocs.tasks.ocr_extraction.extractors.surya_ocr

SuryaOCRExtractor

SuryaOCRExtractor(device: Optional[str] = None, show_log: bool = False, languages: Optional[List[str]] = None, **kwargs)

Bases: BaseOCRExtractor

Surya OCR based text extraction implementation.

Initialize Surya OCR Extractor.

Source code in omnidocs/tasks/ocr_extraction/extractors/surya_ocr.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    languages: Optional[List[str]] = None,
    **kwargs
):
    """Initialize Surya OCR Extractor."""
    super().__init__(
        device=device, 
        show_log=show_log, 
        languages=languages or ['en'],
        engine_name='surya'
    )

    self._label_mapper = SuryaOCRMapper()

    # Set default model path
    self.model_path = Path("omnidocs/models/surya")

    # Check dependencies
    self._check_dependencies()

    # Download model if needed
    if not self.model_path.exists():
        self._download_model()

    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> OCROutput

Extract text using Surya OCR.

Source code in omnidocs/tasks/ocr_extraction/extractors/surya_ocr.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> OCROutput:
    """Extract text using Surya OCR."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)
        img = images[0]

        # Map languages to Surya format
        surya_languages = []
        for lang in self.languages:
            mapped_lang = self._label_mapper.from_standard_language(lang)
            surya_languages.append(mapped_lang)

        # Use the new Predictor-based API
        predictions = None

        if hasattr(self, 'use_new_api') and self.use_new_api:
            # Use the new Predictor-based API based on surya scripts
            try:
                # Convert image to RGB if needed (function expects a list)
                img_rgb_list = self.convert_if_not_rgb([img])
                img_rgb = img_rgb_list[0]

                # Import TaskNames for proper task specification
                from surya.common.surya.schema import TaskNames

                # Call RecognitionPredictor directly with det_predictor parameter
                # This is how it's done in surya/scripts/ocr_text.py
                predictions = self.rec_predictor(
                    [img_rgb],
                    task_names=[TaskNames.ocr_with_boxes],
                    det_predictor=self.det_predictor,
                    math_mode=False
                )

            except Exception as e:
                if self.show_log:
                    logger.warning(f"New API failed: {e}")

        else:
            # Fallback to old API (shouldn't happen with current version)
            if hasattr(self, 'run_ocr'):
                try:
                    predictions = self.run_ocr(
                        [img],
                        [surya_languages],
                        self.det_model,
                        self.det_processor,
                        self.rec_model,
                        self.rec_processor
                    )
                except Exception as e:
                    if self.show_log:
                        logger.warning(f"run_ocr failed: {e}")

        if predictions is None:
            raise RuntimeError("Failed to run OCR with available Surya API functions")

        # Convert to standardized format
        result = self.postprocess_output(predictions, img.size)

        if self.show_log:
            logger.info(f"Extracted {len(result.texts)} text regions")

        return result

    except Exception as e:
        logger.error("Error during Surya OCR extraction", exc_info=True)
        return OCROutput(
            texts=[],
            full_text="",
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

postprocess_output

postprocess_output(raw_output: Union[List, Any], img_size: Tuple[int, int]) -> OCROutput

Convert Surya OCR output to standardized OCROutput format.

Source code in omnidocs/tasks/ocr_extraction/extractors/surya_ocr.py
def postprocess_output(self, raw_output: Union[List, Any], img_size: Tuple[int, int]) -> OCROutput:
    """Convert Surya OCR output to standardized OCROutput format."""
    texts = []
    full_text_parts = []

    if not raw_output:
        return OCROutput(
            texts=[],
            full_text="",
            source_img_size=img_size
        )

    try:
        # Handle different output formats from different Surya versions
        if isinstance(raw_output, list) and len(raw_output) > 0:
            prediction = raw_output[0]

            # Check for different attribute names based on version
            text_lines = None
            if hasattr(prediction, 'text_lines'):
                text_lines = prediction.text_lines
            elif hasattr(prediction, 'bboxes') and hasattr(prediction, 'text'):
                # Handle case where we have separate bboxes and text
                if hasattr(prediction, 'text') and isinstance(prediction.text, list):
                    text_lines = []
                    for i, (bbox, text) in enumerate(zip(prediction.bboxes, prediction.text)):
                        # Create a mock text_line object
                        class MockTextLine:
                            def __init__(self, text, bbox):
                                self.text = text
                                self.bbox = bbox
                                self.confidence = 0.9  # Default confidence
                        text_lines.append(MockTextLine(text, bbox))

            if text_lines:
                for i, text_line in enumerate(text_lines):
                    if hasattr(text_line, 'text') and hasattr(text_line, 'bbox'):
                        text = text_line.text.strip() if text_line.text else ""
                        if not text:
                            continue

                        bbox = text_line.bbox
                        # Ensure bbox is in the correct format [x1, y1, x2, y2]
                        if len(bbox) >= 4:
                            bbox_list = [float(bbox[0]), float(bbox[1]), float(bbox[2]), float(bbox[3])]
                        else:
                            continue

                        # Create polygon from bbox
                        polygon = [
                            [float(bbox[0]), float(bbox[1])], 
                            [float(bbox[2]), float(bbox[1])],
                            [float(bbox[2]), float(bbox[3])], 
                            [float(bbox[0]), float(bbox[3])]
                        ]

                        confidence = getattr(text_line, 'confidence', 0.9)
                        detected_lang = self.detect_text_language(text)

                        ocr_text = OCRText(
                            text=text,
                            confidence=float(confidence),
                            bbox=bbox_list,
                            polygon=polygon,
                            language=detected_lang,
                            reading_order=i
                        )

                        texts.append(ocr_text)
                        full_text_parts.append(text)

    except Exception as e:
        logger.error(f"Error processing Surya OCR output: {e}", exc_info=True)

    return OCROutput(
        texts=texts,
        full_text=' '.join(full_text_parts),
        source_img_size=img_size
    )
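
A minimal consumption sketch for the standardized output above. The import path and class name (SuryaOCRExtractor) are assumptions inferred from the module path shown here; the field names come from the OCRText and OCROutput containers documented on this page.

from omnidocs.tasks.ocr_extraction.extractors.surya_ocr import SuryaOCRExtractor

extractor = SuryaOCRExtractor(show_log=True)
result = extractor.extract("page.png")          # placeholder image path

for line in result.texts:                       # OCRText objects, in reading order
    x1, y1, x2, y2 = line.bbox                  # absolute pixel coordinates
    print(f"#{line.reading_order} ({line.language}): {line.text}")

print(result.full_text)                         # all lines joined with spaces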

SuryaOCRMapper

SuryaOCRMapper()

Bases: BaseOCRMapper

Label mapper for Surya OCR model output.

Source code in omnidocs/tasks/ocr_extraction/extractors/surya_ocr.py
def __init__(self):
    super().__init__('surya')
    self._setup_mapping()

πŸ“Š Table Extraction

Extract tabular data from PDFs and images using classical rule-based parsers and deep learning models.

omnidocs.tasks.table_extraction

Table extraction module for OmniDocs.

This module provides base classes and implementations for table detection and extraction from images and documents.

BaseTableExtractor

BaseTableExtractor(device: Optional[str] = None, show_log: bool = False, engine_name: Optional[str] = None)

Bases: ABC

Base class for table extraction models.

Initialize the table extractor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `device` | `Optional[str]` | Device to run model on (`'cuda'` or `'cpu'`) | `None` |
| `show_log` | `bool` | Whether to show detailed logs | `False` |
| `engine_name` | `Optional[str]` | Name of the table extraction engine | `None` |
Source code in omnidocs/tasks/table_extraction/base.py
def __init__(self, 
             device: Optional[str] = None, 
             show_log: bool = False,
             engine_name: Optional[str] = None):
    """Initialize the table extractor.

    Args:
        device: Device to run model on ('cuda' or 'cpu')
        show_log: Whether to show detailed logs
        engine_name: Name of the table extraction engine
    """
    self.show_log = show_log
    self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
    self.engine_name = engine_name or self.__class__.__name__.lower().replace('extractor', '')
    self.model = None
    self.model_path = None
    self._label_mapper: Optional[BaseTableMapper] = None

    # Initialize mapper if engine name is provided
    if self.engine_name:
        self._label_mapper = BaseTableMapper(self.engine_name)

    if self.show_log:
        logger.info(f"Initializing {self.__class__.__name__}")
        logger.info(f"Using device: {self.device}")
        logger.info(f"Engine: {self.engine_name}")

label_mapper property

label_mapper: BaseTableMapper

Get the label mapper for this extractor.

extract abstractmethod

extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput

Extract tables from input image.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_path` | `Union[str, Path, Image]` | Path to input image or image data | *required* |
| `**kwargs` | | Additional model-specific parameters | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `TableOutput` | TableOutput containing extracted tables |

Source code in omnidocs/tasks/table_extraction/base.py
@abstractmethod
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> TableOutput:
    """Extract tables from input image.

    Args:
        input_path: Path to input image or image data
        **kwargs: Additional model-specific parameters

    Returns:
        TableOutput containing extracted tables
    """
    pass

extract_all

extract_all(input_paths: List[Union[str, Path, Image]], **kwargs) -> List[TableOutput]

Extract tables from multiple images.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_paths` | `List[Union[str, Path, Image]]` | List of image paths or image data | *required* |
| `**kwargs` | | Additional model-specific parameters | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `List[TableOutput]` | List of TableOutput objects |

Source code in omnidocs/tasks/table_extraction/base.py
def extract_all(
    self,
    input_paths: List[Union[str, Path, Image.Image]],
    **kwargs
) -> List[TableOutput]:
    """Extract tables from multiple images.

    Args:
        input_paths: List of image paths or image data
        **kwargs: Additional model-specific parameters

    Returns:
        List of TableOutput objects
    """
    results = []
    for input_path in input_paths:
        try:
            result = self.extract(input_path, **kwargs)
            results.append(result)
        except Exception as e:
            if self.show_log:
                logger.error(f"Error processing {input_path}: {str(e)}")
            raise
    return results
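
extract_all re-raises on the first failing input rather than skipping it, so wrap the call if partial results are acceptable. A short sketch, continuing the DummyTableExtractor example above (paths are placeholders):

extractor = DummyTableExtractor(show_log=True)
try:
    outputs = extractor.extract_all(["page_1.png", "page_2.png"])
except Exception as err:
    outputs = []
    print(f"Batch aborted: {err}")

for out in outputs:
    print(f"{len(out.tables)} table(s) found")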

extract_with_layout

extract_with_layout(input_path: Union[str, Path, Image], layout_regions: Optional[List[Dict]] = None, **kwargs) -> TableOutput

Extract tables with optional layout information.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_path` | `Union[str, Path, Image]` | Path to input image or image data | *required* |
| `layout_regions` | `Optional[List[Dict]]` | Optional list of layout regions containing tables | `None` |
| `**kwargs` | | Additional model-specific parameters | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `TableOutput` | TableOutput containing extracted tables |

Source code in omnidocs/tasks/table_extraction/base.py
def extract_with_layout(
    self,
    input_path: Union[str, Path, Image.Image],
    layout_regions: Optional[List[Dict]] = None,
    **kwargs
) -> TableOutput:
    """Extract tables with optional layout information.

    Args:
        input_path: Path to input image or image data
        layout_regions: Optional list of layout regions containing tables
        **kwargs: Additional model-specific parameters

    Returns:
        TableOutput containing extracted tables
    """
    # Default implementation just calls extract, can be overridden by child classes
    return self.extract(input_path, **kwargs)

postprocess_output

postprocess_output(raw_output: Any, img_size: Tuple[int, int]) -> TableOutput

Convert raw table extraction output to standardized TableOutput format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `raw_output` | `Any` | Raw output from table extraction engine | *required* |
| `img_size` | `Tuple[int, int]` | Original image size (width, height) | *required* |

Returns:

| Type | Description |
| --- | --- |
| `TableOutput` | Standardized TableOutput object |

Source code in omnidocs/tasks/table_extraction/base.py
def postprocess_output(self, raw_output: Any, img_size: Tuple[int, int]) -> TableOutput:
    """Convert raw table extraction output to standardized TableOutput format.

    Args:
        raw_output: Raw output from table extraction engine
        img_size: Original image size (width, height)

    Returns:
        Standardized TableOutput object
    """
    raise NotImplementedError("Child classes must implement postprocess_output method")

preprocess_input

preprocess_input(input_path: Union[str, Path, Image, ndarray]) -> List[Image.Image]

Convert input to list of PIL Images.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_path` | `Union[str, Path, Image, ndarray]` | Input image path or image data | *required* |

Returns:

| Type | Description |
| --- | --- |
| `List[Image]` | List of PIL Images |

Source code in omnidocs/tasks/table_extraction/base.py
def preprocess_input(self, input_path: Union[str, Path, Image.Image, np.ndarray]) -> List[Image.Image]:
    """Convert input to list of PIL Images.

    Args:
        input_path: Input image path or image data

    Returns:
        List of PIL Images
    """
    if isinstance(input_path, (str, Path)):
        image = Image.open(input_path).convert('RGB')
        return [image]
    elif isinstance(input_path, Image.Image):
        return [input_path.convert('RGB')]
    elif isinstance(input_path, np.ndarray):
        return [Image.fromarray(cv2.cvtColor(input_path, cv2.COLOR_BGR2RGB))]
    else:
        raise ValueError(f"Unsupported input type: {type(input_path)}")

visualize

visualize(table_result: TableOutput, image_path: Union[str, Path, Image], output_path: str = 'visualized_tables.png', table_color: str = 'red', cell_color: str = 'blue', box_width: int = 2, show_text: bool = False, text_color: str = 'green', font_size: int = 12, show_table_ids: bool = True) -> None

Visualize table extraction results by drawing bounding boxes on the original image.

This method lets users compare extractors at a glance by drawing the detected tables and cells as bounding boxes on the source image.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_result` | `TableOutput` | TableOutput containing extracted tables | *required* |
| `image_path` | `Union[str, Path, Image]` | Path to original image or PIL Image object | *required* |
| `output_path` | `str` | Path to save the annotated image | `'visualized_tables.png'` |
| `table_color` | `str` | Color for table bounding boxes | `'red'` |
| `cell_color` | `str` | Color for cell bounding boxes | `'blue'` |
| `box_width` | `int` | Width of bounding box lines | `2` |
| `show_text` | `bool` | Whether to overlay cell text | `False` |
| `text_color` | `str` | Color for text overlay | `'green'` |
| `font_size` | `int` | Font size for text overlay | `12` |
| `show_table_ids` | `bool` | Whether to show table IDs | `True` |
Source code in omnidocs/tasks/table_extraction/base.py
def visualize(self,
              table_result: 'TableOutput',
              image_path: Union[str, Path, Image.Image],
              output_path: str = "visualized_tables.png",
              table_color: str = 'red',
              cell_color: str = 'blue',
              box_width: int = 2,
              show_text: bool = False,
              text_color: str = 'green',
              font_size: int = 12,
              show_table_ids: bool = True) -> None:
    """Visualize table extraction results by drawing bounding boxes on the original image.

    This method allows users to easily see which extractor is working better
    by visualizing the detected tables and cells with bounding boxes.

    Args:
        table_result: TableOutput containing extracted tables
        image_path: Path to original image or PIL Image object
        output_path: Path to save the annotated image
        table_color: Color for table bounding boxes
        cell_color: Color for cell bounding boxes
        box_width: Width of bounding box lines
        show_text: Whether to overlay cell text
        text_color: Color for text overlay
        font_size: Font size for text overlay
        show_table_ids: Whether to show table IDs
    """
    try:
        from PIL import Image, ImageDraw, ImageFont

        # Handle different input types
        if isinstance(image_path, (str, Path)):
            image_path = Path(image_path)

            # Check if it's a PDF file
            if image_path.suffix.lower() == '.pdf':
                # Convert PDF to image
                image = self._convert_pdf_to_image(image_path)
            else:
                # Regular image file
                image = Image.open(image_path).convert("RGB")
        elif isinstance(image_path, Image.Image):
            image = image_path.convert("RGB")
        else:
            raise ValueError(f"Unsupported image input type: {type(image_path)}")

        # Create a copy to draw on
        annotated_image = image.copy()
        draw = ImageDraw.Draw(annotated_image)

        # Just use original coordinates - no transformation needed

        # Try to load a font for text overlay
        font = None
        if show_text or show_table_ids:
            try:
                # Try to use a better font if available
                font = ImageFont.truetype("arial.ttf", font_size)
            except (OSError, IOError):
                try:
                    # Fallback to default font
                    font = ImageFont.load_default()
                except Exception:
                    font = None

        # Draw tables and cells if table results exist
        if hasattr(table_result, "tables") and table_result.tables:
            for table_idx, table in enumerate(table_result.tables):
                # Draw table bounding box
                if table.bbox and len(table.bbox) == 4:
                    x1, y1, x2, y2 = table.bbox
                    draw.rectangle(
                        [(x1, y1), (x2, y2)],
                        outline=table_color,
                        width=box_width + 1
                    )

                    # Draw table ID (only if requested)
                    if show_table_ids and font:
                        table_id = getattr(table, 'table_id', f'Table {table_idx}')
                        draw.text((x1, y1 - font_size - 2), table_id,
                                fill=table_color, font=font)

                # Draw cell bounding boxes
                if hasattr(table, "cells") and table.cells:
                    for cell in table.cells:
                        if cell.bbox and len(cell.bbox) == 4:
                            x1, y1, x2, y2 = cell.bbox

                            # Draw cell rectangle - no text overlay
                            draw.rectangle(
                                [(x1, y1), (x2, y2)],
                                outline=cell_color,
                                width=box_width
                            )

        # Save the annotated image
        annotated_image.save(output_path)

        if self.show_log:
            logger.info(f"Table visualization saved to {output_path}")
            num_tables = len(table_result.tables) if table_result.tables else 0
            total_cells = sum(len(table.cells) for table in table_result.tables) if table_result.tables else 0
            logger.info(f"Visualized {num_tables} tables with {total_cells} cells")

    except Exception as e:
        error_msg = f"Error creating table visualization: {str(e)}"
        if self.show_log:
            logger.error(error_msg)
        raise RuntimeError(error_msg) from e
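
A quick rendering sketch, reusing the DummyTableExtractor defined earlier (the image path is a placeholder):

extractor = DummyTableExtractor()
result = extractor.extract("invoice.png")
extractor.visualize(
    result,
    "invoice.png",
    output_path="invoice_tables.png",
    table_color="red",
    cell_color="blue",
    box_width=3,
)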

visualize_from_json

visualize_from_json(image_path: Union[str, Path, Image], json_path: Union[str, Path], output_path: str = 'visualized_tables_from_json.png', **kwargs) -> None

Load table extraction results from JSON file and visualize them.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `image_path` | `Union[str, Path, Image]` | Path to original image, PDF file, or PIL Image object | *required* |
| `json_path` | `Union[str, Path]` | Path to JSON file containing table extraction results | *required* |
| `output_path` | `str` | Path to save the annotated image | `'visualized_tables_from_json.png'` |
| `**kwargs` | | Additional arguments passed to visualize method | `{}` |
Source code in omnidocs/tasks/table_extraction/base.py
def visualize_from_json(self,
                       image_path: Union[str, Path, Image.Image],
                       json_path: Union[str, Path],
                       output_path: str = "visualized_tables_from_json.png",
                       **kwargs) -> None:
    """
    Load table extraction results from JSON file and visualize them.

    Args:
        image_path: Path to original image, PDF file, or PIL Image object
        json_path: Path to JSON file containing table extraction results
        output_path: Path to save the annotated image
        **kwargs: Additional arguments passed to visualize method
    """
    import json

    try:
        # Load table results from JSON
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Reconstruct TableOutput from JSON data
        tables = []
        if isinstance(data, list):
            # Handle list of tables format
            for table_data in data:
                cells = []
                if 'cells' in table_data:
                    for cell_data in table_data['cells']:
                        cell = TableCell(**cell_data)
                        cells.append(cell)

                table = Table(
                    cells=cells,
                    num_rows=table_data.get('num_rows', 0),
                    num_cols=table_data.get('num_cols', 0),
                    bbox=table_data.get('bbox'),
                    confidence=table_data.get('confidence'),
                    table_id=table_data.get('table_id', ''),
                    structure_confidence=table_data.get('structure_confidence')
                )
                tables.append(table)

        # Create TableOutput object
        table_result = TableOutput(
            tables=tables,
            source_img_size=data[0].get('source_img_size') if data else None,
            metadata=data[0].get('metadata', {}) if data else {}
        )

        # Visualize the loaded results
        self.visualize(table_result, image_path, output_path, **kwargs)

    except Exception as e:
        error_msg = f"Error loading and visualizing tables from JSON: {str(e)}"
        if self.show_log:
            logger.error(error_msg)
        raise RuntimeError(error_msg) from e
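
Note that this method expects the JSON file to contain a list of table dicts (the isinstance(data, list) branch above), which is not the shape written by TableOutput.save_json (a single dict keyed by 'tables'). A compatible file can be written from Table.to_dict directly; continuing the sketch above:

import json

with open("tables.json", "w", encoding="utf-8") as f:
    json.dump([t.to_dict() for t in result.tables], f, indent=2)

extractor.visualize_from_json(
    "invoice.png",
    "tables.json",
    output_path="invoice_tables_from_json.png",
)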

BaseTableMapper

BaseTableMapper(engine_name: str)

Base class for mapping table extraction engine-specific outputs to a standardized format.

Initialize mapper for specific table extraction engine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `engine_name` | `str` | Name of the table extraction engine | *required* |
Source code in omnidocs/tasks/table_extraction/base.py
def __init__(self, engine_name: str):
    """Initialize mapper for specific table extraction engine.

    Args:
        engine_name: Name of the table extraction engine
    """
    self.engine_name = engine_name.lower()

detect_header_rows

detect_header_rows(cells: List[TableCell]) -> List[TableCell]

Detect and mark header cells based on position and formatting.

Source code in omnidocs/tasks/table_extraction/base.py
def detect_header_rows(self, cells: List[TableCell]) -> List[TableCell]:
    """Detect and mark header cells based on position and formatting."""
    # Simple heuristic: first row is likely header
    if not cells:
        return cells

    first_row_cells = [cell for cell in cells if cell.row == 0]
    for cell in first_row_cells:
        cell.is_header = True

    return cells

normalize_bbox

normalize_bbox(bbox: List[float], img_width: int, img_height: int) -> List[float]

Normalize bounding box coordinates to absolute pixel values.

Source code in omnidocs/tasks/table_extraction/base.py
def normalize_bbox(self, bbox: List[float], img_width: int, img_height: int) -> List[float]:
    """Normalize bounding box coordinates to absolute pixel values."""
    if all(0 <= coord <= 1 for coord in bbox):
        return [
            bbox[0] * img_width,
            bbox[1] * img_height,
            bbox[2] * img_width,
            bbox[3] * img_height
        ]
    return bbox
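
The check treats a box as relative only when every coordinate lies in [0, 1]; anything else is assumed to already be in absolute pixels and is returned unchanged. A worked example (the engine name is arbitrary):

mapper = BaseTableMapper("demo")

mapper.normalize_bbox([0.1, 0.2, 0.5, 0.9], img_width=1000, img_height=800)
# -> [100.0, 160.0, 500.0, 720.0]

mapper.normalize_bbox([10, 20, 500, 720], img_width=1000, img_height=800)
# -> [10, 20, 500, 720]  (already absolute, returned as-is)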

Table

Bases: BaseModel

Container for extracted table.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `cells` | `List[TableCell]` | List of table cells |
| `num_rows` | `int` | Number of rows in the table |
| `num_cols` | `int` | Number of columns in the table |
| `bbox` | `Optional[List[float]]` | Bounding box of the entire table `[x1, y1, x2, y2]` |
| `confidence` | `Optional[float]` | Overall table detection confidence |
| `table_id` | `Optional[str]` | Optional table identifier |
| `caption` | `Optional[str]` | Optional table caption |
| `structure_confidence` | `Optional[float]` | Confidence score for table structure detection |

to_csv

to_csv() -> str

Convert table to CSV format.

Source code in omnidocs/tasks/table_extraction/base.py
def to_csv(self) -> str:
    """Convert table to CSV format."""
    import csv
    import io

    # Create a grid to store cell values
    grid = [[''] * self.num_cols for _ in range(self.num_rows)]

    # Fill the grid with cell values
    for cell in self.cells:
        for r in range(cell.row, cell.row + cell.rowspan):
            for c in range(cell.col, cell.col + cell.colspan):
                if r < self.num_rows and c < self.num_cols:
                    grid[r][c] = cell.text

    # Convert to CSV
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerows(grid)
    return output.getvalue()

to_dict

to_dict() -> Dict

Convert to dictionary representation.

Source code in omnidocs/tasks/table_extraction/base.py
def to_dict(self) -> Dict:
    """Convert to dictionary representation."""
    return {
        'cells': [cell.to_dict() for cell in self.cells],
        'num_rows': self.num_rows,
        'num_cols': self.num_cols,
        'bbox': self.bbox,
        'confidence': self.confidence,
        'table_id': self.table_id,
        'caption': self.caption,
        'structure_confidence': self.structure_confidence
    }

to_html

to_html() -> str

Convert table to HTML format.

Source code in omnidocs/tasks/table_extraction/base.py
def to_html(self) -> str:
    """Convert table to HTML format."""
    html = ['<table>']

    # Create a grid to track cell positions and spans
    grid = [[None for _ in range(self.num_cols)] for _ in range(self.num_rows)]

    # Mark occupied cells
    for cell in self.cells:
        for r in range(cell.row, cell.row + cell.rowspan):
            for c in range(cell.col, cell.col + cell.colspan):
                if r < self.num_rows and c < self.num_cols:
                    grid[r][c] = cell if r == cell.row and c == cell.col else 'occupied'

    # Generate HTML rows
    for row_idx in range(self.num_rows):
        html.append('  <tr>')
        for col_idx in range(self.num_cols):
            cell_data = grid[row_idx][col_idx]
            if isinstance(cell_data, TableCell):
                tag = 'th' if cell_data.is_header else 'td'
                attrs = []
                if cell_data.rowspan > 1:
                    attrs.append(f'rowspan="{cell_data.rowspan}"')
                if cell_data.colspan > 1:
                    attrs.append(f'colspan="{cell_data.colspan}"')
                attr_str = ' ' + ' '.join(attrs) if attrs else ''
                html.append(f'    <{tag}{attr_str}>{cell_data.text}</{tag}>')
            elif cell_data is None:
                html.append('    <td></td>')
            # Skip 'occupied' cells as they're part of a span
        html.append('  </tr>')

    html.append('</table>')
    return '\n'.join(html)
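
A small constructed example showing both converters on a 2x2 table (values are illustrative):

cells = [
    TableCell(text="Name",  row=0, col=0, rowspan=1, colspan=1, is_header=True),
    TableCell(text="Score", row=0, col=1, rowspan=1, colspan=1, is_header=True),
    TableCell(text="Ada",   row=1, col=0, rowspan=1, colspan=1, is_header=False),
    TableCell(text="42",    row=1, col=1, rowspan=1, colspan=1, is_header=False),
]
table = Table(cells=cells, num_rows=2, num_cols=2, table_id="demo")

print(table.to_csv())    # Name,Score / Ada,42
print(table.to_html())   # first row rendered as <th> cells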

TableCell

Bases: BaseModel

Container for individual table cell.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `text` | `str` | Cell text content |
| `row` | `int` | Row index (0-based) |
| `col` | `int` | Column index (0-based) |
| `rowspan` | `int` | Number of rows the cell spans |
| `colspan` | `int` | Number of columns the cell spans |
| `bbox` | `Optional[List[float]]` | Bounding box coordinates `[x1, y1, x2, y2]` |
| `confidence` | `Optional[float]` | Confidence score for cell detection |
| `is_header` | `bool` | Whether the cell is a header cell |

to_dict

to_dict() -> Dict

Convert to dictionary representation.

Source code in omnidocs/tasks/table_extraction/base.py
def to_dict(self) -> Dict:
    """Convert to dictionary representation."""
    return {
        'text': self.text,
        'row': self.row,
        'col': self.col,
        'rowspan': self.rowspan,
        'colspan': self.colspan,
        'bbox': self.bbox,
        'confidence': self.confidence,
        'is_header': self.is_header
    }

TableOutput

Bases: BaseModel

Container for table extraction results.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `tables` | `List[Table]` | List of extracted tables |
| `source_img_size` | `Optional[Tuple[int, int]]` | Original image dimensions (width, height) |
| `processing_time` | `Optional[float]` | Time taken for table extraction |
| `metadata` | `Optional[Dict[str, Any]]` | Additional metadata from the extraction engine |

get_tables_by_confidence

get_tables_by_confidence(min_confidence: float = 0.5) -> List[Table]

Filter tables by minimum confidence threshold.

Source code in omnidocs/tasks/table_extraction/base.py
def get_tables_by_confidence(self, min_confidence: float = 0.5) -> List[Table]:
    """Filter tables by minimum confidence threshold."""
    return [table for table in self.tables if table.confidence is None or table.confidence >= min_confidence]

save_json

save_json(output_path: Union[str, Path]) -> None

Save output to JSON file.

Source code in omnidocs/tasks/table_extraction/base.py
def save_json(self, output_path: Union[str, Path]) -> None:
    """Save output to JSON file."""
    import json
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)

save_tables_as_csv

save_tables_as_csv(output_dir: Union[str, Path]) -> List[Path]

Save all tables as separate CSV files.

Source code in omnidocs/tasks/table_extraction/base.py
def save_tables_as_csv(self, output_dir: Union[str, Path]) -> List[Path]:
    """Save all tables as separate CSV files."""
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    saved_files = []
    for i, table in enumerate(self.tables):
        filename = f"table_{table.table_id or i}.csv"
        file_path = output_dir / filename
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(table.to_csv())
        saved_files.append(file_path)

    return saved_files
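
Putting the output helpers together; output is any TableOutput (for example, from one of the extractors below), and the threshold and paths are illustrative:

confident = output.get_tables_by_confidence(min_confidence=0.8)
print(f"kept {len(confident)} of {len(output.tables)} tables")

output.save_json("tables.json")                    # full result as one JSON dict
csv_paths = output.save_tables_as_csv("csv_out")   # one CSV file per table
print(csv_paths)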

to_dict

to_dict() -> Dict

Convert to dictionary representation.

Source code in omnidocs/tasks/table_extraction/base.py
def to_dict(self) -> Dict:
    """Convert to dictionary representation."""
    return {
        'tables': [table.to_dict() for table in self.tables],
        'source_img_size': self.source_img_size,
        'processing_time': self.processing_time,
        'metadata': self.metadata
    }

omnidocs.tasks.table_extraction.extractors.camelot

CamelotExtractor

CamelotExtractor(device: Optional[str] = None, show_log: bool = False, method: str = 'lattice', pages: str = '1', flavor: str = 'lattice', **kwargs)

Bases: BaseTableExtractor

Camelot based table extraction implementation.

TODO: Bbox coordinate transformation from PDF to image space is still broken. Current issues:

- Coordinate transformation accuracy issues between PDF points and image pixels
- Cell bbox estimation doesn't account for actual cell sizes from Camelot
- Need better integration with Camelot's internal coordinate data
- Grid-based estimation fallback is inaccurate for real table layouts

Initialize Camelot Table Extractor.

Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    method: str = 'lattice',
    pages: str = '1',
    flavor: str = 'lattice',
    **kwargs
):
    """Initialize Camelot Table Extractor."""
    super().__init__(
        device=device,
        show_log=show_log,
        engine_name='camelot'
    )

    self._label_mapper = CamelotMapper()
    self.method = method
    self.pages = pages
    self.flavor = flavor

    try:
        import camelot
        self.camelot = camelot

    except ImportError as e:
        logger.error("Failed to import Camelot")
        raise ImportError(
            "Camelot is not available. Please install it with: pip install camelot-py[cv]"
        ) from e

    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput

Extract tables using Camelot.

Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> TableOutput:
    """Extract tables using Camelot."""
    try:
        # Camelot works with PDF files
        if isinstance(input_path, (str, Path)):
            pdf_path = Path(input_path)
            if pdf_path.suffix.lower() != '.pdf':
                raise ValueError("Camelot only works with PDF files")

            # Extract tables from PDF
            tables = self.camelot.read_pdf(
                str(pdf_path),
                pages=self.pages,
                flavor=self.flavor,
                **kwargs
            )

            # Get image size (estimate from first page)
            try:
                images = self._convert_pdf_to_image(pdf_path)
                img_size = images[0].size if images else (612, 792)  # Default PDF size
            except Exception:
                img_size = (612, 792)  # Default PDF size

        else:
            raise ValueError("Camelot requires PDF file path, not image data")

        # Convert to standardized format
        result = self.postprocess_output(tables, img_size)

        if self.show_log:
            logger.info(f"Extracted {len(result.tables)} tables using Camelot")

        return result

    except Exception as e:
        logger.error("Error during Camelot extraction", exc_info=True)
        return TableOutput(
            tables=[],
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

postprocess_output

postprocess_output(raw_output: Any, img_size: Tuple[int, int]) -> TableOutput

Convert Camelot output to standardized TableOutput format.

Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
def postprocess_output(self, raw_output: Any, img_size: Tuple[int, int]) -> TableOutput:
    """Convert Camelot output to standardized TableOutput format."""
    tables = []

    for i, camelot_table in enumerate(raw_output):
        # Get table data
        df = camelot_table.df

        # Convert DataFrame to cells
        cells = []
        num_rows, num_cols = df.shape

        for row_idx in range(num_rows):
            for col_idx in range(num_cols):
                cell_text = str(df.iloc[row_idx, col_idx]).strip()

                # Create cell with basic info
                cell = TableCell(
                    text=cell_text,
                    row=row_idx,
                    col=col_idx,
                    rowspan=1,
                    colspan=1,
                    confidence=camelot_table.accuracy / 100.0,  # Convert percentage to decimal
                    is_header=(row_idx == 0)  # Assume first row is header
                )
                cells.append(cell)

        # Get table bounding box if available
        bbox = None
        if hasattr(camelot_table, '_bbox'):
            bbox = list(camelot_table._bbox)

        # Create table object
        table = Table(
            cells=cells,
            num_rows=num_rows,
            num_cols=num_cols,
            bbox=bbox,
            confidence=camelot_table.accuracy / 100.0,
            table_id=f"table_{i}",
            structure_confidence=camelot_table.accuracy / 100.0
        )

        tables.append(table)

    return TableOutput(
        tables=tables,
        source_img_size=img_size,
        metadata={
            'engine': 'camelot',
            'method': self.method,
            'flavor': self.flavor
        }
    )

predict

predict(pdf_path: Union[str, Path], **kwargs)

Predict method for compatibility with original interface.

Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
def predict(self, pdf_path: Union[str, Path], **kwargs):
    """Predict method for compatibility with original interface."""
    try:
        result = self.extract(pdf_path, **kwargs)

        # Convert to original format
        table_res = []
        for table in result.tables:
            table_data = {
                "table_id": table.table_id,
                "bbox": table.bbox,
                "confidence": table.confidence,
                "cells": [cell.to_dict() for cell in table.cells],
                "num_rows": table.num_rows,
                "num_cols": table.num_cols
            }
            table_res.append(table_data)

        return table_res

    except Exception as e:
        logger.error("Error during Camelot prediction", exc_info=True)
        return []

CamelotMapper

CamelotMapper()

Bases: BaseTableMapper

Label mapper for Camelot table extraction output.

Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
def __init__(self):
    super().__init__('camelot')
    self._setup_mapping()
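
A hedged usage sketch (the PDF path is a placeholder; note the TODO above about unreliable bbox coordinates from this engine):

from omnidocs.tasks.table_extraction.extractors.camelot import CamelotExtractor

extractor = CamelotExtractor(flavor="lattice", pages="1", show_log=True)
output = extractor.extract("report.pdf")           # PDF input only

for table in output.tables:
    print(table.table_id, f"{table.num_rows}x{table.num_cols}",
          f"accuracy={table.confidence:.2f}" if table.confidence else "")
    print(table.to_csv())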

omnidocs.tasks.table_extraction.extractors.pdfplumber

PDFPlumberExtractor

PDFPlumberExtractor(device: Optional[str] = None, show_log: bool = False, table_settings: Optional[Dict] = None, **kwargs)

Bases: BaseTableExtractor

PDFPlumber based table extraction implementation.

Initialize PDFPlumber Table Extractor.

Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    table_settings: Optional[Dict] = None,
    **kwargs
):
    """Initialize PDFPlumber Table Extractor."""
    super().__init__(
        device=device,
        show_log=show_log,
        engine_name='pdfplumber'
    )

    self._label_mapper = PDFPlumberMapper()
    self.table_settings = table_settings or self._label_mapper._table_settings

    try:
        import pdfplumber
        self.pdfplumber = pdfplumber

    except ImportError as e:
        logger.error("Failed to import PDFPlumber")
        raise ImportError(
            "PDFPlumber is not available. Please install it with: pip install pdfplumber"
        ) from e

    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput

Extract tables using PDFPlumber.

Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> TableOutput:
    """Extract tables using PDFPlumber."""
    try:
        # PDFPlumber works with PDF files
        if isinstance(input_path, (str, Path)):
            pdf_path = Path(input_path)
            if pdf_path.suffix.lower() != '.pdf':
                raise ValueError("PDFPlumber only works with PDF files")

            all_tables = []

            # Open PDF and extract tables from all pages
            with self.pdfplumber.open(str(pdf_path)) as pdf:
                for page in pdf.pages:
                    page_tables = self._extract_tables_from_page(page)
                    all_tables.extend(page_tables)

            # Get image size and PDF size for coordinate transformation
            try:
                # Get actual PDF page size first
                import fitz
                doc = fitz.open(str(pdf_path))
                page = doc[0]
                pdf_size = (page.rect.width, page.rect.height)
                doc.close()

                # Convert PDF to image to get actual image size
                images = self._convert_pdf_to_image(pdf_path)
                img_size = images[0].size if images else pdf_size
            except Exception:
                pdf_size = (612, 792)  # Default PDF size
                img_size = (612, 792)  # Default image size

        else:
            raise ValueError("PDFPlumber requires PDF file path, not image data")

        # Convert to standardized format
        result = self.postprocess_output(all_tables, img_size, pdf_size)

        if self.show_log:
            logger.info(f"Extracted {len(result.tables)} tables using PDFPlumber")

        return result

    except Exception as e:
        logger.error("Error during PDFPlumber extraction", exc_info=True)
        return TableOutput(
            tables=[],
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

postprocess_output

postprocess_output(raw_output: List[Dict], img_size: Tuple[int, int], pdf_size: Tuple[int, int] = None) -> TableOutput

Convert PDFPlumber output to standardized TableOutput format.

Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
def postprocess_output(
    self,
    raw_output: List[Dict],
    img_size: Tuple[int, int],
    pdf_size: Tuple[int, int] = None,
) -> TableOutput:
    """Convert PDFPlumber output to standardized TableOutput format."""
    tables: List[Table] = []

    for i, table_data in enumerate(raw_output):
        table_bbox = table_data.get("bbox")
        if table_bbox is None:
            table_bbox = [0, 0, img_size[0], img_size[1]]

        if pdf_size:
            table_bbox_img = self._transform_pdf_to_image_coords(
                table_bbox, pdf_size, img_size
            )
        else:
            table_bbox_img = table_bbox

        # Get max row/col indexes to know dimensions
        max_row = max(c["row"] for c in table_data["cells"])
        max_col = max(c["col"] for c in table_data["cells"])
        num_rows = max_row + 1
        num_cols = max_col + 1

        # Pre-compute equally spaced cell rectangles inside the table bbox
        x0, y0, x1, y1 = table_bbox_img
        cell_w = (x1 - x0) / num_cols
        cell_h = (y1 - y0) / num_rows

        cells: List[TableCell] = []
        for c in table_data["cells"]:
            r, cidx = c["row"], c["col"]

            # exact rectangle in image space
            cx0 = x0 + cidx * cell_w
            cy0 = y0 + r * cell_h
            cx1 = cx0 + cell_w
            cy1 = cy0 + cell_h
            cell_bbox_img = [cx0, cy0, cx1, cy1]

            cells.append(
                TableCell(
                    text=c["text"].strip(),
                    row=r,
                    col=cidx,
                    rowspan=c.get("rowspan", 1),
                    colspan=c.get("colspan", 1),
                    bbox=cell_bbox_img,
                    confidence=0.9,
                    is_header=(r == 0),
                )
            )

        tables.append(
            Table(
                cells=cells,
                num_rows=num_rows,
                num_cols=num_cols,
                bbox=table_bbox_img,
                confidence=0.9,
                table_id=f"table_{i}",
                structure_confidence=0.9,
            )
        )

    return TableOutput(
        tables=tables,
        source_img_size=img_size,
        metadata={"engine": "pdfplumber", "table_settings": self.table_settings},
    )

predict

predict(pdf_path: Union[str, Path], **kwargs)

Predict method for compatibility with original interface.

Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
def predict(self, pdf_path: Union[str, Path], **kwargs):
    """Predict method for compatibility with original interface."""
    try:
        result = self.extract(pdf_path, **kwargs)

        # Convert to original format
        table_res = []
        for table in result.tables:
            table_data = {
                "table_id": table.table_id,
                "bbox": table.bbox,
                "confidence": table.confidence,
                "cells": [cell.to_dict() for cell in table.cells],
                "num_rows": table.num_rows,
                "num_cols": table.num_cols
            }
            table_res.append(table_data)

        return table_res

    except Exception as e:
        logger.error("Error during PDFPlumber prediction", exc_info=True)
        return []

PDFPlumberMapper

PDFPlumberMapper()

Bases: BaseTableMapper

Label mapper for PDFPlumber table extraction output.

Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
def __init__(self):
    super().__init__('pdfplumber')
    self._setup_mapping()

omnidocs.tasks.table_extraction.extractors.surya_table

SuryaTableExtractor

SuryaTableExtractor(device: Optional[str] = None, show_log: bool = False, model_path: Optional[Union[str, Path]] = None, **kwargs)

Bases: BaseTableExtractor

Surya-based table extraction implementation.

Initialize Surya Table Extractor.

Source code in omnidocs/tasks/table_extraction/extractors/surya_table.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    model_path: Optional[Union[str, Path]] = None,
    **kwargs
):
    """Initialize Surya Table Extractor."""
    super().__init__(device=device, show_log=show_log, engine_name='surya')

    self._label_mapper = SuryaTableMapper()

    if self.show_log:
        logger.info("Initializing SuryaTableExtractor")

    # Set device if specified, otherwise use default from parent
    if device:
        self.device = device

    if self.show_log:
        logger.info(f"Using device: {self.device}")

    # Set default paths
    if model_path is None:
        model_path = _MODELS_DIR / "surya_table"

    self.model_path = Path(model_path)

    # Check dependencies and load model
    self._check_dependencies()
    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput

Extract tables using Surya.

Source code in omnidocs/tasks/table_extraction/extractors/surya_table.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> TableOutput:
    """Extract tables using Surya."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)
        image = images[0]
        img_size = image.size

        # Convert PIL to RGB if needed
        if isinstance(image, Image.Image):
            img_rgb = image.convert("RGB")
        else:
            img_rgb = Image.fromarray(image).convert("RGB")

        # Step 1: Use layout detection to find table regions
        layout_predictions = self.layout_predictor([img_rgb])

        tables_data = []

        if layout_predictions and len(layout_predictions) > 0:
            layout_pred = layout_predictions[0]

            # Find table regions from layout
            table_regions = []
            for bbox_obj in layout_pred.bboxes:
                if hasattr(bbox_obj, 'label') and 'table' in bbox_obj.label.lower():
                    table_regions.append({
                        'bbox': bbox_obj.bbox,
                        'confidence': getattr(bbox_obj, 'confidence', 1.0)
                    })

            # Step 2: For each table region, extract text and structure
            for table_region in table_regions:
                bbox = table_region['bbox']

                # Crop table region
                table_img = img_rgb.crop(bbox)

                # Step 3: Run OCR on table region
                try:
                    from surya.common.surya.schema import TaskNames

                    # Use recognition predictor for table text extraction
                    predictions = self.rec_predictor(
                        [table_img],
                        task_names=[TaskNames.ocr_with_boxes],
                        det_predictor=self.det_predictor,
                        math_mode=False
                    )

                    # Process OCR results into table structure
                    if predictions and len(predictions) > 0:
                        prediction = predictions[0]

                        # Extract text lines and organize into table structure
                        cells = self._organize_text_into_table(prediction.text_lines, bbox)

                        table_data = {
                            'bbox': bbox,
                            'confidence': table_region['confidence'],
                            'cells': cells,
                            'num_rows': len(set(c['row'] for c in cells)) if cells else 0,
                            'num_cols': len(set(c['col'] for c in cells)) if cells else 0
                        }
                        tables_data.append(table_data)

                except Exception as e:
                    if self.show_log:
                        logger.warning(f"Error processing table region: {e}")
                    continue

        # Convert to standardized format
        result = self.postprocess_output({'tables': tables_data}, img_size)

        if self.show_log:
            logger.info(f"Extracted {len(result.tables)} tables using Surya")

        return result

    except Exception as e:
        if self.show_log:
            logger.error("Error during Surya table extraction", exc_info=True)
        raise

postprocess_output

postprocess_output(raw_output: Any, img_size: Tuple[int, int]) -> TableOutput

Convert Surya output to standardized TableOutput format.

Source code in omnidocs/tasks/table_extraction/extractors/surya_table.py
def postprocess_output(self, raw_output: Any, img_size: Tuple[int, int]) -> TableOutput:
    """Convert Surya output to standardized TableOutput format."""
    tables = []

    if 'tables' in raw_output:
        for table_idx, table_data in enumerate(raw_output['tables']):
            # Extract table cells with proper mapping
            cells = []

            # Handle different possible structures from Surya
            if 'cells' in table_data:
                # Direct cell data
                for cell_data in table_data['cells']:
                    cell = self._create_table_cell(cell_data, table_idx)
                    if cell:
                        cells.append(cell)
            elif 'text_lines' in table_data:
                # Convert text lines to cells
                cells = self._text_lines_to_cells(table_data['text_lines'], table_data.get('bbox', [0, 0, img_size[0], img_size[1]]))

            if cells:
                # Calculate table dimensions
                num_rows = max(c.row for c in cells) + 1 if cells else 0
                num_cols = max(c.col for c in cells) + 1 if cells else 0

                # Create table
                table = Table(
                    cells=cells,
                    bbox=table_data.get('bbox', [0, 0, img_size[0], img_size[1]]),
                    confidence=table_data.get('confidence', 1.0),
                    num_rows=num_rows,
                    num_cols=num_cols,
                    table_id=f"surya_table_{table_idx}"
                )
                tables.append(table)

    return TableOutput(
        tables=tables,
        source_img_size=img_size,
        metadata={'engine': 'surya', 'raw_output': raw_output}
    )

SuryaTableMapper

SuryaTableMapper()

Bases: BaseTableMapper

Label mapper for Surya table model output.

Source code in omnidocs/tasks/table_extraction/extractors/surya_table.py
def __init__(self):
    super().__init__('surya')
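
A hedged usage sketch (the image path is a placeholder; unlike the PDF-only engines above, this extractor accepts images):

from omnidocs.tasks.table_extraction.extractors.surya_table import SuryaTableExtractor

extractor = SuryaTableExtractor(show_log=True)
output = extractor.extract("scanned_page.png")
for table in output.tables:
    print(table.table_id, f"{table.num_rows}x{table.num_cols}", table.confidence)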

omnidocs.tasks.table_extraction.extractors.tabula

TabulaExtractor

TabulaExtractor(device: Optional[str] = None, show_log: bool = False, method: str = 'lattice', pages: Optional[Union[str, List[int]]] = None, multiple_tables: bool = True, guess: bool = True, area: Optional[List[float]] = None, columns: Optional[List[float]] = None, **kwargs)

Bases: BaseTableExtractor

Tabula based table extraction implementation.

Initialize Tabula Table Extractor.

Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    method: str = 'lattice',
    pages: Optional[Union[str, List[int]]] = None,
    multiple_tables: bool = True,
    guess: bool = True,
    area: Optional[List[float]] = None,
    columns: Optional[List[float]] = None,
    **kwargs
):
    """Initialize Tabula Table Extractor."""
    super().__init__(
        device=device,
        show_log=show_log,
        engine_name='tabula'
    )

    self._label_mapper = TabulaMapper()
    self.method = method
    self.pages = pages or 'all'
    self.multiple_tables = multiple_tables
    self.guess = guess
    self.area = area
    self.columns = columns

    try:
        import tabula
        self.tabula = tabula

    except ImportError as e:
        logger.error("Failed to import Tabula")
        raise ImportError(
            "Tabula is not available. Please install it with: pip install tabula-py"
        ) from e

    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput

Extract tables using Tabula.

Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> TableOutput:
    """Extract tables using Tabula."""
    try:
        # Tabula works with PDF files
        if isinstance(input_path, (str, Path)):
            pdf_path = Path(input_path)
            if pdf_path.suffix.lower() != '.pdf':
                raise ValueError("Tabula only works with PDF files")

            # Prepare extraction options
            options = self._prepare_tabula_options(**kwargs)

            # Extract tables from PDF
            try:
                tables_list = self.tabula.read_pdf(str(pdf_path), **options)

                # Ensure we have a list of DataFrames
                if not isinstance(tables_list, list):
                    tables_list = [tables_list]

            except Exception as e:
                if self.show_log:
                    logger.error(f"Tabula extraction failed: {str(e)}")
                tables_list = []

            # Get image size and PDF size for coordinate transformation
            try:
                # Get actual PDF page size first
                import fitz
                doc = fitz.open(str(pdf_path))
                page = doc[0]
                pdf_size = (page.rect.width, page.rect.height)
                doc.close()

                # Convert PDF to image to get actual image size
                images = self._convert_pdf_to_image(pdf_path)
                img_size = images[0].size if images else pdf_size
            except Exception:
                pdf_size = (612, 792)  # Default PDF size
                img_size = (612, 792)  # Default image size

        else:
            raise ValueError("Tabula requires PDF file path, not image data")

        # Convert to standardized format
        result = self.postprocess_output(tables_list, img_size, pdf_size)

        if self.show_log:
            logger.info(f"Extracted {len(result.tables)} tables using Tabula")

        return result

    except Exception as e:
        logger.error("Error during Tabula extraction", exc_info=True)
        return TableOutput(
            tables=[],
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

extract_with_area

extract_with_area(input_path: Union[str, Path], area: List[float], **kwargs) -> TableOutput

Extract tables from specific area of PDF.

Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
def extract_with_area(
    self,
    input_path: Union[str, Path],
    area: List[float],
    **kwargs
) -> TableOutput:
    """Extract tables from specific area of PDF."""
    original_area = self.area
    self.area = area

    try:
        result = self.extract(input_path, **kwargs)
        return result
    finally:
        self.area = original_area

extract_with_columns

extract_with_columns(input_path: Union[str, Path], columns: List[float], **kwargs) -> TableOutput

Extract tables with specified column positions.

Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
def extract_with_columns(
    self,
    input_path: Union[str, Path],
    columns: List[float],
    **kwargs
) -> TableOutput:
    """Extract tables with specified column positions."""
    original_columns = self.columns
    self.columns = columns

    try:
        result = self.extract(input_path, **kwargs)
        return result
    finally:
        self.columns = original_columns
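
A sketch of region-restricted extraction. The coordinates are placeholders and are assumed to be forwarded unchanged to tabula-py, whose area convention is [top, left, bottom, right] in PDF points:

from omnidocs.tasks.table_extraction.extractors.tabula import TabulaExtractor

extractor = TabulaExtractor(pages="1", show_log=True)
output = extractor.extract_with_area("report.pdf", area=[100, 50, 400, 550])
print(f"{len(output.tables)} table(s) found in region")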

postprocess_output

postprocess_output(raw_output: List, img_size: Tuple[int, int], pdf_size: Tuple[int, int] = None) -> TableOutput

Convert Tabula output to standardized TableOutput format.

Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
def postprocess_output(self, raw_output: List, img_size: Tuple[int, int], pdf_size: Tuple[int, int] = None) -> TableOutput:
    """Convert Tabula output to standardized TableOutput format."""
    tables = []

    for i, df in enumerate(raw_output):
        if df.empty:
            continue

        # Get table dimensions
        num_rows, num_cols = df.shape

        # Estimate table bbox
        bbox = self._estimate_table_bbox(df, img_size)

        # Transform PDF coordinates to image coordinates if needed
        if pdf_size and bbox:
            bbox = self._transform_pdf_to_image_coords(bbox, pdf_size, img_size)

        # Convert DataFrame to cells with estimated bboxes
        cells = self._dataframe_to_cells(df, i, bbox)

        # Create table object
        table = Table(
            cells=cells,
            num_rows=num_rows,
            num_cols=num_cols,
            bbox=bbox,
            confidence=None,  # Tabula doesn't provide confidence
            table_id=f"table_{i}",
            structure_confidence=None
        )

        tables.append(table)

    return TableOutput(
        tables=tables,
        source_img_size=img_size,
        metadata={
            'engine': 'tabula',
            'method': self.method,
            'pages': self.pages,
            'multiple_tables': self.multiple_tables,
            'guess': self.guess
        }
    )

predict

predict(pdf_path: Union[str, Path], **kwargs)

Predict method for compatibility with original interface.

Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
def predict(self, pdf_path: Union[str, Path], **kwargs):
    """Predict method for compatibility with original interface."""
    try:
        result = self.extract(pdf_path, **kwargs)

        # Convert to original format
        table_res = []
        for table in result.tables:
            table_data = {
                "table_id": table.table_id,
                "bbox": table.bbox,
                "confidence": table.confidence,
                "cells": [cell.to_dict() for cell in table.cells],
                "num_rows": table.num_rows,
                "num_cols": table.num_cols
            }
            table_res.append(table_data)

        return table_res

    except Exception as e:
        logger.error("Error during Tabula prediction", exc_info=True)
        return []

TabulaMapper

TabulaMapper()

Bases: BaseTableMapper

Label mapper for Tabula table extraction output.

Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
def __init__(self):
    super().__init__('tabula')
    self._setup_mapping()

omnidocs.tasks.table_extraction.extractors.table_transformer

TableTransformerExtractor

TableTransformerExtractor(device: Optional[str] = None, show_log: bool = False, detection_model_path: Optional[str] = None, structure_model_path: Optional[str] = None, detection_threshold: float = 0.7, structure_threshold: float = 0.7, **kwargs)

Bases: BaseTableExtractor

Table Transformer based table extraction implementation.

Initialize Table Transformer Extractor.

Source code in omnidocs/tasks/table_extraction/extractors/table_transformer.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    detection_model_path: Optional[str] = None,
    structure_model_path: Optional[str] = None,
    detection_threshold: float = 0.7,
    structure_threshold: float = 0.7,
    **kwargs
):
    """Initialize Table Transformer Extractor."""
    super().__init__(
        device=device,
        show_log=show_log,
        engine_name='table_transformer'
    )

    self._label_mapper = TableTransformerMapper()

    # Set default paths if not provided
    self.detection_model_path = Path(detection_model_path) if detection_model_path else \
        Path(self._label_mapper._model_configs['detection']['local_path'])
    self.structure_model_path = Path(structure_model_path) if structure_model_path else \
        Path(self._label_mapper._model_configs['structure']['local_path'])

    self.detection_threshold = detection_threshold
    self.structure_threshold = structure_threshold

    # Check dependencies
    self._check_dependencies()

    # Download model if needed (sets up model sources)
    self._download_model()

    # Load models
    self._load_model()

extract

extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput

Extract tables using Table Transformer.

Source code in omnidocs/tasks/table_extraction/extractors/table_transformer.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> TableOutput:
    """Extract tables using Table Transformer."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)
        image = images[0]
        img_size = image.size

        # Detect tables
        detected_tables = self._detect_tables(image)

        if not detected_tables:
            if self.show_log:
                logger.info("No tables detected in the image")
            return TableOutput(
                tables=[],
                source_img_size=img_size,
                metadata={'engine': 'table_transformer', 'message': 'No tables detected'}
            )

        # Analyze structure for each detected table
        table_results = []
        for table_detection in detected_tables:
            structure_data = self._analyze_table_structure(image, table_detection['bbox'])
            cells = self._create_table_cells(structure_data)

            table_results.append({
                'bbox': table_detection['bbox'],
                'confidence': table_detection['confidence'],
                'cells': cells,
                'structure_confidence': np.mean([e['confidence'] for e in structure_data['elements']]) if structure_data['elements'] else 0.0
            })

        # Convert to standardized format
        result = self._create_table_output(table_results, img_size)

        if self.show_log:
            logger.info(f"Extracted {len(result.tables)} tables using Table Transformer")

        return result

    except Exception as e:
        logger.error("Error during Table Transformer extraction", exc_info=True)
        return TableOutput(
            tables=[],
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

predict

predict(input_path: Union[str, Path, Image.Image], **kwargs)

Predict method for compatibility with original interface.

Source code in omnidocs/tasks/table_extraction/extractors/table_transformer.py
def predict(self, input_path: Union[str, Path, Image.Image], **kwargs):
    """Predict method for compatibility with original interface."""
    try:
        result = self.extract(input_path, **kwargs)

        # Convert to original format
        return [
            {
                "table_id": table.table_id,
                "bbox": table.bbox,
                "confidence": table.confidence,
                "cells": [cell.to_dict() for cell in table.cells],
                "num_rows": table.num_rows,
                "num_cols": table.num_cols
            }
            for table in result.tables
        ]

    except Exception as e:
        logger.error("Error during Table Transformer prediction", exc_info=True)
        return []
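
Unlike extract, predict returns plain dictionaries for backward compatibility. A short sketch (the input file name is an assumption):

from omnidocs.tasks.table_extraction.extractors.table_transformer import TableTransformerExtractor

extractor = TableTransformerExtractor()
for row in extractor.predict("page.png"):  # hypothetical input file
    print(row["table_id"], row["confidence"], row["num_rows"], row["num_cols"])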

TableTransformerMapper

TableTransformerMapper()

Bases: BaseTableMapper

Label mapper for Table Transformer model output.

Source code in omnidocs/tasks/table_extraction/extractors/table_transformer.py
def __init__(self):
    super().__init__('table_transformer')
    self._setup_mapping()

omnidocs.tasks.table_extraction.extractors.tableformer

TableFormerExtractor

TableFormerExtractor(device: Optional[str] = None, show_log: bool = False, model_path: Optional[str] = None, model_type: str = 'structure', confidence_threshold: float = 0.7, max_size: int = 1000, **kwargs)

Bases: BaseTableExtractor

TableFormer based table extraction implementation.

Initialize TableFormer Extractor.

Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
def __init__(
    self,
    device: Optional[str] = None,
    show_log: bool = False,
    model_path: Optional[str] = None,
    model_type: str = 'structure',
    confidence_threshold: float = 0.7,
    max_size: int = 1000,
    **kwargs
):
    """Initialize TableFormer Extractor."""
    super().__init__(
        device=device,
        show_log=show_log,
        engine_name='tableformer'
    )

    self._label_mapper = TableFormerMapper()
    self.model_type = model_type
    self.confidence_threshold = confidence_threshold
    self.max_size = max_size

    # Set default model paths
    if model_path is None:
        model_path = f"omnidocs/models/tableformer_{model_type}"

    self.model_path = Path(model_path)

    # Check dependencies
    self._check_dependencies()

    # Try to load from local path first, fallback to HuggingFace
    if self.model_path.exists() and any(self.model_path.iterdir()):
        if self.show_log:
            logger.info(f"Found local {self.model_type} model at: {self.model_path}")
        self.model_name_or_path = str(self.model_path)
    else:
        # Get HuggingFace model name from config
        hf_model_name = self._label_mapper._model_configs[self.model_type]['model_name']
        if self.show_log:
            logger.info(f"Local {self.model_type} model not found, will download from HuggingFace: {hf_model_name}")

        # Download model if needed
        if not self.model_path.exists():
            self._download_model()

        self.model_name_or_path = hf_model_name

    # Load model
    self._load_model()
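
A minimal usage sketch (the input file is an assumption; model_type and confidence_threshold mirror the constructor defaults shown above):

from omnidocs.tasks.table_extraction.extractors.tableformer import TableFormerExtractor

extractor = TableFormerExtractor(model_type="structure", confidence_threshold=0.7)
result = extractor.extract("page.png")  # hypothetical input file
print(result.metadata)  # engine name, model, and thresholds, as set in postprocess_output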

extract

extract(input_path: Union[str, Path, Image.Image], **kwargs) -> TableOutput

Extract tables using TableFormer.

Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
@log_execution_time
def extract(
    self,
    input_path: Union[str, Path, Image.Image],
    **kwargs
) -> TableOutput:
    """Extract tables using TableFormer."""
    try:
        # Preprocess input
        images = self.preprocess_input(input_path)
        image = images[0]
        img_size = image.size

        # Detect table structure
        detections = self._detect_table_structure(image)

        if not detections:
            if self.show_log:
                logger.info("No table structure detected in the image")
            return TableOutput(
                tables=[],
                source_img_size=img_size,
                metadata={'engine': 'tableformer', 'message': 'No table structure detected'}
            )

        # Convert to standardized format
        result = self.postprocess_output({'detections': detections}, img_size)

        if self.show_log:
            logger.info(f"Extracted {len(result.tables)} tables using TableFormer")

        return result

    except Exception as e:
        logger.error("Error during TableFormer extraction", exc_info=True)
        return TableOutput(
            tables=[],
            source_img_size=None,
            processing_time=None,
            metadata={"error": str(e)}
        )

postprocess_output

postprocess_output(raw_output: Dict, img_size: Tuple[int, int]) -> TableOutput

Convert TableFormer output to standardized TableOutput format.

Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
def postprocess_output(self, raw_output: Dict, img_size: Tuple[int, int]) -> TableOutput:
    """Convert TableFormer output to standardized TableOutput format."""
    tables = []

    # Extract table from detections
    detections = raw_output.get('detections', [])
    if detections:
        table = self._create_table_from_detections(detections, img_size)
        tables.append(table)

    return TableOutput(
        tables=tables,
        source_img_size=img_size,
        metadata={
            'engine': 'tableformer',
            'model_name': self.model_name_or_path,
            'confidence_threshold': self.confidence_threshold,
            'max_size': self.max_size
        }
    )

predict

predict(input_path: Union[str, Path, Image.Image], **kwargs)

Predict method for compatibility with original interface.

Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
def predict(self, input_path: Union[str, Path, Image.Image], **kwargs):
    """Predict method for compatibility with original interface."""
    try:
        result = self.extract(input_path, **kwargs)

        # Convert to original format
        table_res = []
        for table in result.tables:
            table_data = {
                "table_id": table.table_id,
                "bbox": table.bbox,
                "confidence": table.confidence,
                "cells": [cell.to_dict() for cell in table.cells],
                "num_rows": table.num_rows,
                "num_cols": table.num_cols
            }
            table_res.append(table_data)

        return table_res

    except Exception as e:
        logger.error("Error during TableFormer prediction", exc_info=True)
        return []

TableFormerMapper

TableFormerMapper()

Bases: BaseTableMapper

Label mapper for TableFormer model output.

Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
def __init__(self):
    super().__init__('tableformer')
    self._setup_mapping()

πŸ› οΈ Utilities & Helpers

Common utility functions, data structures, and helpers used throughout OmniDocs.

omnidocs.utils

Utilities module for OmniDocs.

This module provides common utilities used across different tasks and components.

GlobalLanguageMapper

GlobalLanguageMapper()

Global language mapper that handles different OCR engine formats.

Source code in omnidocs/utils/language.py
def __init__(self):
    self._engine_mappings: Dict[str, Dict[str, str]] = {}
    self._setup_default_mappings()

from_standard

from_standard(engine_name: str, standard_code: str) -> str

Convert standard language code to engine-specific format.

Parameters:

Name           Type  Description                       Default
engine_name    str   Name of the OCR engine            required
standard_code  str   Standard ISO 639-1 language code  required

Returns:

Type  Description
str   Engine-specific language code

Source code in omnidocs/utils/language.py
def from_standard(self, engine_name: str, standard_code: str) -> str:
    """Convert standard language code to engine-specific format.

    Args:
        engine_name: Name of the OCR engine
        standard_code: Standard ISO 639-1 language code

    Returns:
        Engine-specific language code
    """
    if engine_name not in self._engine_mappings:
        return standard_code

    mapping = self._engine_mappings[engine_name]
    reverse_mapping = {v: k for k, v in mapping.items()}
    return reverse_mapping.get(standard_code.lower(), standard_code)

get_engine_codes

get_engine_codes(engine_name: str) -> List[str]

Get list of engine-specific language codes.

Parameters:

Name         Type  Description             Default
engine_name  str   Name of the OCR engine  required

Returns:

Type       Description
List[str]  List of engine-specific language codes

Source code in omnidocs/utils/language.py
def get_engine_codes(self, engine_name: str) -> List[str]:
    """Get list of engine-specific language codes.

    Args:
        engine_name: Name of the OCR engine

    Returns:
        List of engine-specific language codes
    """
    if engine_name not in self._engine_mappings:
        return []

    return list(self._engine_mappings[engine_name].keys())

get_supported_engines

get_supported_engines() -> List[str]

Get list of supported OCR engines.

Source code in omnidocs/utils/language.py
def get_supported_engines(self) -> List[str]:
    """Get list of supported OCR engines."""
    return list(self._engine_mappings.keys())

get_supported_languages

get_supported_languages(engine_name: str) -> List[str]

Get list of supported languages for a specific engine.

Parameters:

Name         Type  Description             Default
engine_name  str   Name of the OCR engine  required

Returns:

Type       Description
List[str]  List of standard language codes supported by the engine

Source code in omnidocs/utils/language.py
def get_supported_languages(self, engine_name: str) -> List[str]:
    """Get list of supported languages for a specific engine.

    Args:
        engine_name: Name of the OCR engine

    Returns:
        List of standard language codes supported by the engine
    """
    if engine_name not in self._engine_mappings:
        return []

    return list(self._engine_mappings[engine_name].values())

register_engine_mapping

register_engine_mapping(engine_name: str, mapping: Dict[str, str]) -> None

Register a new engine's language mapping.

Parameters:

Name         Type            Description                                        Default
engine_name  str             Name of the OCR engine                             required
mapping      Dict[str, str]  Dictionary mapping engine codes to standard codes  required

Source code in omnidocs/utils/language.py
def register_engine_mapping(self, engine_name: str, mapping: Dict[str, str]) -> None:
    """Register a new engine's language mapping.

    Args:
        engine_name: Name of the OCR engine
        mapping: Dictionary mapping engine codes to standard codes
    """
    self._engine_mappings[engine_name] = mapping

to_standard

to_standard(engine_name: str, engine_code: str) -> str

Convert engine-specific language code to standard format.

Parameters:

Name         Type  Description                    Default
engine_name  str   Name of the OCR engine         required
engine_code  str   Engine-specific language code  required

Returns:

Type  Description
str   Standard ISO 639-1 language code

Source code in omnidocs/utils/language.py
def to_standard(self, engine_name: str, engine_code: str) -> str:
    """Convert engine-specific language code to standard format.

    Args:
        engine_name: Name of the OCR engine
        engine_code: Engine-specific language code

    Returns:
        Standard ISO 639-1 language code
    """
    if engine_name not in self._engine_mappings:
        return engine_code

    mapping = self._engine_mappings[engine_name]
    return mapping.get(engine_code.lower(), engine_code)
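
A round-trip sketch for the mapper ("my_ocr" and its codes are hypothetical; real engines get their mappings from _setup_default_mappings):

from omnidocs.utils.language import GlobalLanguageMapper

mapper = GlobalLanguageMapper()
# Engine-specific codes map to standard ISO 639-1 codes.
mapper.register_engine_mapping("my_ocr", {"eng": "en", "deu": "de"})

print(mapper.to_standard("my_ocr", "eng"))    # -> "en"
print(mapper.from_standard("my_ocr", "de"))   # -> "deu"
print(mapper.get_engine_codes("my_ocr"))      # -> ["eng", "deu"]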

LanguageCode

Bases: Enum

Standard ISO 639-1 language codes supported by OmniDocs.

get_all_codes classmethod

get_all_codes() -> List[str]

Get all supported language codes.

Source code in omnidocs/utils/language.py
@classmethod
def get_all_codes(cls) -> List[str]:
    """Get all supported language codes."""
    return [lang.value for lang in cls]

is_valid_code classmethod

is_valid_code(code: str) -> bool

Check if a language code is valid.

Source code in omnidocs/utils/language.py
@classmethod
def is_valid_code(cls, code: str) -> bool:
    """Check if a language code is valid."""
    return code.lower() in [lang.value.lower() for lang in cls]
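
A quick sketch (assuming "en" is among the enum values, as the English fallback in detect_script suggests):

from omnidocs.utils.language import LanguageCode

print(LanguageCode.is_valid_code("en"))   # True if "en" is a supported code
print(LanguageCode.get_all_codes()[:5])   # first few supported codes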

LanguageDetector

Simple language detection utilities.

detect_script classmethod

detect_script(text: str) -> Optional[str]

Detect the primary script/language of the given text.

Parameters:

Name  Type  Description            Default
text  str   Input text to analyze  required

Returns:

Type           Description
Optional[str]  Detected language code or None if unable to detect

Source code in omnidocs/utils/language.py
@classmethod
def detect_script(cls, text: str) -> Optional[str]:
    """Detect the primary script/language of the given text.

    Args:
        text: Input text to analyze

    Returns:
        Detected language code or None if unable to detect
    """
    if not text:
        return None

    # Count characters for each language
    language_scores = {}

    for char in text:
        char_code = ord(char)
        for language, ranges in cls.LANGUAGE_RANGES.items():
            for start, end in ranges:
                if start <= char_code <= end:
                    language_scores[language] = language_scores.get(language, 0) + 1
                    break

    if not language_scores:
        # Default to English for Latin script
        return LanguageCode.ENGLISH.value

    # Return language with highest score
    return max(language_scores, key=language_scores.get)

is_mixed_script classmethod

is_mixed_script(text: str, threshold: float = 0.1) -> bool

Check if text contains mixed scripts.

Parameters:

Name       Type   Description                                          Default
text       str    Input text to analyze                                required
threshold  float  Minimum ratio for considering a script significant   0.1

Returns:

Type  Description
bool  True if text contains multiple scripts above threshold

Source code in omnidocs/utils/language.py
@classmethod
def is_mixed_script(cls, text: str, threshold: float = 0.1) -> bool:
    """Check if text contains mixed scripts.

    Args:
        text: Input text to analyze
        threshold: Minimum ratio for considering a script significant

    Returns:
        True if text contains multiple scripts above threshold
    """
    if not text:
        return False

    language_scores = {}
    total_chars = 0

    for char in text:
        if char.isalnum():  # Only count alphanumeric characters
            total_chars += 1
            char_code = ord(char)
            for language, ranges in cls.LANGUAGE_RANGES.items():
                for start, end in ranges:
                    if start <= char_code <= end:
                        language_scores[language] = language_scores.get(language, 0) + 1
                        break

    if total_chars == 0:
        return False

    # Check how many languages exceed the threshold
    significant_languages = sum(
        1 for score in language_scores.values()
        if score / total_chars >= threshold
    )

    return significant_languages > 1
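
A short sketch (exact results depend on which scripts appear in LANGUAGE_RANGES, so the outputs shown are illustrative):

from omnidocs.utils.language import LanguageDetector

print(LanguageDetector.detect_script("Hello world"))   # Latin-only text falls back to English
print(LanguageDetector.is_mixed_script("Hello δΈ–η•Œ"))   # True if both scripts clear the 0.1 threshold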

detect_language

detect_language(text: str) -> Optional[str]

Convenience function to detect language from text.

Source code in omnidocs/utils/language.py
def detect_language(text: str) -> Optional[str]:
    """Convenience function to detect language from text."""
    return LanguageDetector.detect_script(text)

get_all_supported_languages

get_all_supported_languages() -> List[str]

Get all language codes supported by OmniDocs.

Source code in omnidocs/utils/language.py
def get_all_supported_languages() -> List[str]:
    """Get all language codes supported by OmniDocs."""
    return LanguageCode.get_all_codes()

get_language_mapper

get_language_mapper() -> GlobalLanguageMapper

Get the global language mapper instance.

Source code in omnidocs/utils/language.py
def get_language_mapper() -> GlobalLanguageMapper:
    """Get the global language mapper instance."""
    return global_language_mapper

get_logger

get_logger(name: str, level: Union[str, int] = logging.INFO, log_file: Optional[Union[str, Path]] = None, include_path: bool = True) -> logging.Logger

Get a configured logger instance.

Parameters:

Name          Type                        Description                                    Default
name          str                         Name of the logger                             required
level         Union[str, int]             Logging level                                  logging.INFO
log_file      Optional[Union[str, Path]]  Optional file path to save logs                None
include_path  bool                        Whether to include full path in log messages   True

Returns:

Type    Description
Logger  Configured logger instance

Source code in omnidocs/utils/logging.py
def get_logger(
    name: str,
    level: Union[str, int] = logging.INFO,
    log_file: Optional[Union[str, Path]] = None,
    include_path: bool = True,
) -> logging.Logger:
    """
    Get a configured logger instance.

    Args:
        name: Name of the logger
        level: Logging level
        log_file: Optional file path to save logs
        include_path: Whether to include full path in log messages

    Returns:
        Configured logger instance
    """
    # Create logger
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Remove existing handlers
    logger.handlers.clear()

    # Create console handler with rich support
    console_handler = RichHandler(
        console=console,
        show_time=False,
        show_path=False,
        rich_tracebacks=True,
        tracebacks_show_locals=True,
    )
    console_handler.setFormatter(CustomFormatter(include_path=include_path))
    logger.addHandler(console_handler)

    # Add file handler if log_file is specified
    if log_file:
        log_file = Path(log_file)
        log_file.parent.mkdir(parents=True, exist_ok=True)

        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(CustomFormatter(include_path=True))
        logger.addHandler(file_handler)

    return logger
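
A minimal sketch (the log file path is an assumption; parent directories are created automatically):

from omnidocs.utils.logging import get_logger

log = get_logger(__name__, level="INFO", log_file="logs/omnidocs.log")
log.info("extractor initialized")  # goes to the rich console and to logs/omnidocs.log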

get_model_path

get_model_path(extractor_name: str, model_name: str) -> Path

Get standardized model path for a specific extractor and model.

Parameters:

Name            Type  Description                                                Default
extractor_name  str   Name of the extractor (e.g., 'donut', 'nougat')            required
model_name      str   Name/ID of the model (e.g., 'naver-clova-ix/donut-base')   required

Returns:

Name  Type  Description
Path  Path  Full path where the model should be stored

Source code in omnidocs/utils/model_config.py
def get_model_path(extractor_name: str, model_name: str) -> Path:
    """
    Get standardized model path for a specific extractor and model.

    Args:
        extractor_name: Name of the extractor (e.g., 'donut', 'nougat')
        model_name: Name/ID of the model (e.g., 'naver-clova-ix/donut-base')

    Returns:
        Path: Full path where the model should be stored
    """
    models_dir = get_models_directory()
    # Replace slashes in model names to create valid directory names
    safe_model_name = model_name.replace("/", "_")
    return models_dir / extractor_name / safe_model_name
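
A sketch using the example names from the docstring above:

from omnidocs.utils.model_config import get_model_path

path = get_model_path("donut", "naver-clova-ix/donut-base")
# Slashes in the model name become underscores:
# <models_dir>/donut/naver-clova-ix_donut-base
print(path)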

get_models_directory

get_models_directory() -> Path

Get the models directory, setting up environment if needed.

Returns:

Name  Type  Description
Path  Path  The models directory path

Source code in omnidocs/utils/model_config.py
def get_models_directory() -> Path:
    """
    Get the models directory, setting up environment if needed.

    Returns:
        Path: The models directory path
    """
    return setup_model_environment()

is_supported_language

is_supported_language(code: str) -> bool

Check if a language code is supported by OmniDocs.

Source code in omnidocs/utils/language.py
def is_supported_language(code: str) -> bool:
    """Check if a language code is supported by OmniDocs."""
    return LanguageCode.is_valid_code(code)

setup_model_environment

setup_model_environment() -> Path

Setup model environment variables once for the entire application.

This function:

1. Calculates the omnidocs models directory dynamically
2. Creates the directory if it doesn't exist
3. Sets HuggingFace environment variables to use our models directory
4. Uses a flag to prevent multiple setups

Returns:

Name  Type  Description
Path  Path  The models directory path

Source code in omnidocs/utils/model_config.py
def setup_model_environment() -> Path:
    """
    Setup model environment variables once for the entire application.

    This function:
    1. Calculates the omnidocs models directory dynamically
    2. Creates the directory if it doesn't exist
    3. Sets HuggingFace environment variables to use our models directory
    4. Uses a flag to prevent multiple setups

    Returns:
        Path: The models directory path
    """
    # Check if already setup to prevent multiple calls
    if 'OMNIDOCS_MODELS_SETUP' in os.environ:
        # Return the already configured models directory
        return Path(os.environ["HF_HOME"])

    # Calculate omnidocs root dynamically
    current_file = Path(__file__)
    omnidocs_root = current_file.parent.parent  # Go up to omnidocs/ root
    models_dir = omnidocs_root / "models"
    models_dir.mkdir(exist_ok=True)

    # Set environment variables for HuggingFace to use our models directory
    os.environ["HF_HOME"] = str(models_dir)
    os.environ["TRANSFORMERS_CACHE"] = str(models_dir)
    os.environ["HF_HUB_CACHE"] = str(models_dir)

    # Set flag to prevent re-setup
    os.environ["OMNIDOCS_MODELS_SETUP"] = "true"

    return models_dir
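
A sketch of the effect on the environment (the exact path depends on where OmniDocs is installed):

import os
from omnidocs.utils.model_config import setup_model_environment

models_dir = setup_model_environment()
print(models_dir)                 # <omnidocs root>/models
print(os.environ["HF_HOME"])      # same directory; repeated calls are no-ops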

πŸ§‘β€πŸ’» Usage Tips

  • All extractors follow a consistent interface: extractor = ...Extractor(); result = extractor.extract(input) (see the sketch after this list)
  • Results are returned as structured objects (e.g., TableOutput, TextOutput, etc.)
  • See the Getting Started guide for real-world examples.
  • For advanced configuration, check each extractor’s docstring for parameters and options.
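
A minimal sketch of that shared pattern, using one of the extractors documented above (the input file name is an assumption):

from omnidocs.tasks.table_extraction.extractors.tableformer import TableFormerExtractor

extractor = TableFormerExtractor()
result = extractor.extract("document.png")  # hypothetical input file
print(len(result.tables))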

πŸ“š More Resources