Python API Reference
Welcome to the OmniDocs Python API Reference! This page provides auto-generated documentation for every major module, extractor, and utility in the OmniDocs ecosystem. Use it as the single source of truth for all classes, functions, and configuration options.
Core Package
The main OmniDocs package provides the top-level API, configuration, and shared utilities.
OmniDocs organizes document AI into modular tasks. Each task has its own extractors, which you can import and use directly; the sections below document the full API for each task.
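For example, a typical layout-analysis workflow looks like this. This is a minimal sketch: the exact import path and the file names are illustrative, but the `detect`/`visualize` calls and the `LayoutOutput` fields follow the API documented below.

```python
from omnidocs.tasks.layout_analysis.extractors.florence import FlorenceLayoutDetector

# Instantiate a detector; device is chosen automatically when not given
detector = FlorenceLayoutDetector(show_log=True)

# detect() returns (annotated PIL image, LayoutOutput)
annotated_img, layout = detector.detect("document.png")

for box in layout.bboxes:
    print(box.label, box.bbox, box.confidence)

# Save the annotated image and layout data
detector.visualize((annotated_img, layout), "layout_result.png")
```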
Layout Analysis
Detect and analyze document structure, regions, and reading order.
omnidocs.tasks.layout_analysis
FlorenceLayoutDetector
FlorenceLayoutDetector(device: Optional[str] = None, show_log: bool = False, trust_remote_code: bool = True, **kwargs)
Bases: BaseLayoutDetector
Florence-based layout detection implementation.
Initialize Florence Layout Detector.
Source code in omnidocs/tasks/layout_analysis/extractors/florence.py
```python
def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
trust_remote_code: bool = True,
**kwargs
):
"""Initialize Florence Layout Detector."""
super().__init__(show_log=show_log)
# Initialize label mapper
self._label_mapper = FlorenceLayoutMapper()
logger.info("Initializing FlorenceLayoutDetector")
if device:
self.device = device
logger.info(f"Using device: {self.device}")
try:
from transformers import AutoProcessor, AutoModelForCausalLM
except ImportError as ex:
logger.error("Failed to import transformers")
raise ImportError(
"transformers is not available. Please install it with: pip install transformers"
) from ex
# Initialize the model and processor
try:
self.model = AutoModelForCausalLM.from_pretrained(
self.MODEL_REPO,
trust_remote_code=trust_remote_code,
**kwargs
)
self.processor = AutoProcessor.from_pretrained(
self.MODEL_REPO,
trust_remote_code=trust_remote_code
)
self.model.to(self.device)
logger.success("Model initialized successfully")
except Exception as e:
logger.error("Failed to initialize model", exc_info=True)
raise
```
detect
detect(input_path: Union[str, Path], max_new_tokens: int = 1024, do_sample: bool = False, num_beams: int = 3, **kwargs) -> Tuple[Image.Image, LayoutOutput]
Run layout detection with standardized labels.
Source code in omnidocs/tasks/layout_analysis/extractors/florence.py
```python
@log_execution_time
def detect(
self,
input_path: Union[str, Path],
max_new_tokens: int = 1024,
do_sample: bool = False,
num_beams: int = 3,
**kwargs
) -> Tuple[Image.Image, LayoutOutput]:
"""Run layout detection with standardized labels."""
try:
# Load and preprocess input
image = Image.open(input_path).convert("RGB")
# Prepare inputs
prompt = "<OD>"
inputs = self.processor(
text=prompt,
images=image,
return_tensors="pt"
).to(self.device)
# Generate predictions
generated_ids = self.model.generate(
input_ids=inputs["input_ids"],
pixel_values=inputs["pixel_values"],
max_new_tokens=max_new_tokens,
do_sample=do_sample,
num_beams=num_beams,
**kwargs
)
# Decode and post-process
generated_text = self.processor.batch_decode(
generated_ids,
skip_special_tokens=False
)[0]
parsed_result = self.processor.post_process_generation(
generated_text,
task="<OD>",
image_size=(image.width, image.height)
)
# Convert to standard format
layout_boxes = []
for bbox, label in zip(
parsed_result["<OD>"]["bboxes"],
parsed_result["<OD>"]["labels"]
):
mapped_label = self.map_label(label.lower())
if mapped_label:
layout_boxes.append(
LayoutBox(
label=mapped_label,
bbox=[float(coord) for coord in bbox],
confidence=None # Florence model doesn't provide confidence scores
)
)
# Create annotated image
annotated_img = image.copy()
draw = ImageDraw.Draw(annotated_img)
# Draw boxes and labels
for box in layout_boxes:
color = self.color_map.get(box.label, 'gray')
coords = box.bbox
draw.rectangle(coords, outline=color, width=3)
draw.text((coords[0], coords[1]-20), box.label, fill=color)
return annotated_img, LayoutOutput(bboxes=layout_boxes)
except Exception as e:
logger.error("Error during prediction", exc_info=True)
raise
```
visualize
visualize(detection_result: Tuple[Image, LayoutOutput], output_path: Union[str, Path]) -> None
Save annotated image and layout data to files.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| detection_result | Tuple[Image, LayoutOutput] | Tuple containing (PIL Image, LayoutOutput) | required |
| output_path | Union[str, Path] | Path to save visualization | required |
Source code in omnidocs/tasks/layout_analysis/extractors/florence.py
```python
def visualize(
self,
detection_result: Tuple[Image.Image, LayoutOutput],
output_path: Union[str, Path],
) -> None:
"""
Save annotated image and layout data to files.
Args:
detection_result: Tuple containing (PIL Image, LayoutOutput)
output_path: Path to save visualization
"""
super().visualize(detection_result, output_path)
```
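Since `detect` forwards `max_new_tokens`, `do_sample`, and `num_beams` to the model's `generate` call, generation can be tuned per invocation. A short sketch (import path and file name are illustrative):

```python
from omnidocs.tasks.layout_analysis.extractors.florence import FlorenceLayoutDetector

detector = FlorenceLayoutDetector(trust_remote_code=True)

# Generation options are forwarded to the underlying generate() call
annotated_img, layout = detector.detect("page.png", max_new_tokens=1024, num_beams=3)

# Florence does not emit confidence scores, so box.confidence is always None
for box in layout.bboxes:
    print(box.label, box.bbox)
```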
PaddleLayoutDetector
PaddleLayoutDetector(device: Optional[str] = None, show_log: bool = False, **kwargs)
Bases: BaseLayoutDetector
PaddleOCR-based layout detection implementation.
Initialize PaddleOCR Layout Detector.
Source code in omnidocs/tasks/layout_analysis/extractors/paddle.py
```python
def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
**kwargs
):
"""Initialize PaddleOCR Layout Detector."""
super().__init__()
# Initialize label mapper
self._label_mapper = PaddleLayoutMapper()
# Log initialization
logger.info("Initializing PaddleLayoutDetector")
# Set device if specified
if device:
self.device = device
logger.info(f"Using device: {self.device}")
try:
from paddleocr import PPStructure
except ImportError as ex:
logger.error("Failed to import paddleocr")
raise ImportError(
"paddleocr is not available. Please install it with: pip install paddleocr"
) from ex
# Initialize the model
try:
self.model = PPStructure(
table=True,
ocr=True,
show_log=show_log,
**kwargs
)
logger.success("Model initialized successfully")
except Exception as e:
logger.error("Failed to initialize model", exc_info=True)
raise
```
detect
detect(input_path: Union[str, Path], **kwargs) -> Tuple[Image.Image, LayoutOutput]
Run layout detection with standardized labels.
Source code in omnidocs/tasks/layout_analysis/extractors/paddle.py
```python
@log_execution_time
def detect(
self,
input_path: Union[str, Path],
**kwargs
) -> Tuple[Image.Image, LayoutOutput]:
"""Run layout detection with standardized labels."""
try:
# Load and preprocess input
images = self.preprocess_input(input_path)
results = []
for img in images:
# Get detection results
det_result = self.model(img)
# Convert to PIL Image if needed
if isinstance(img, np.ndarray):
img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
# Create annotated image
annotated_img = img.copy()
draw = ImageDraw.Draw(annotated_img)
# Convert detection results to LayoutBox objects with standardized labels
layout_boxes = []
for block in det_result:
# Extract coordinates and type
x1, y1, x2, y2 = block['bbox']
model_label = block['type']
mapped_label = self.map_label(model_label)
if mapped_label: # Only include boxes with valid mapped labels
layout_boxes.append(
LayoutBox(
label=mapped_label,
bbox=[float(x1), float(y1), float(x2), float(y2)],
confidence=block.get('confidence', None)
)
)
# Draw with standardized colors
color = self.color_map.get(mapped_label, 'gray')
draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
draw.text((x1, y1-20), mapped_label, fill=color)
results.append((
annotated_img,
LayoutOutput(bboxes=layout_boxes)
))
return results[0] if results else (None, LayoutOutput(bboxes=[]))
except Exception as e:
logger.error("Error during prediction", exc_info=True)
raise
```
visualize
visualize(detection_result: Tuple[Image, LayoutOutput], output_path: Union[str, Path]) -> None
Save annotated image and layout data to files.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| detection_result | Tuple[Image, LayoutOutput] | Tuple containing (PIL Image, LayoutOutput) | required |
| output_path | Union[str, Path] | Path to save visualization | required |
Source code in omnidocs/tasks/layout_analysis/extractors/paddle.py
```python
def visualize(
self,
detection_result: Tuple[Image.Image, LayoutOutput],
output_path: Union[str, Path],
) -> None:
"""
Save annotated image and layout data to files.
Args:
detection_result: Tuple containing (PIL Image, LayoutOutput)
output_path: Path to save visualization
"""
super().visualize(detection_result, output_path)
```
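Keyword arguments passed to the constructor are forwarded to `PPStructure`, so PaddleOCR-specific options can be set there. A usage sketch (import path and file name are illustrative):

```python
from omnidocs.tasks.layout_analysis.extractors.paddle import PaddleLayoutDetector

# Constructor kwargs are forwarded to PPStructure(table=True, ocr=True, ...)
detector = PaddleLayoutDetector(show_log=False)

annotated_img, layout = detector.detect("page.png")

# Paddle blocks may carry a per-block confidence score (None when absent)
for box in layout.bboxes:
    print(box.label, box.confidence)
```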
RTDETRLayoutDetector
RTDETRLayoutDetector(device: Optional[str] = None, show_log: bool = False, model_path: Optional[Union[str, Path]] = None, num_threads: Optional[int] = 4, use_cpu_only: bool = True)
Bases: BaseLayoutDetector
RT-DETR-based layout detection implementation.
Initialize RT-DETR Layout Detector with careful device handling.
Source code in omnidocs/tasks/layout_analysis/extractors/rtdetr.py
```python
def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
model_path: Optional[Union[str, Path]] = None,
num_threads: Optional[int] = 4,
use_cpu_only: bool = True
):
"""Initialize RT-DETR Layout Detector with careful device handling."""
super().__init__(show_log=show_log)
self._label_mapper = RTDETRLayoutMapper()
if self.show_log:
logger.info("Initializing RTDETRLayoutDetector")
# Set default paths
if model_path is None:
model_path = _MODELS_DIR / "rtdetr_layout" / self.MODEL_REPO.replace("/", "_")
self.model_path = Path(model_path)
self.num_threads = num_threads
# Careful device handling
if use_cpu_only:
self.device = "cpu"
if self.show_log:
logger.info("Forced CPU usage due to use_cpu_only flag")
elif device:
self.device = device
if self.show_log:
logger.info(f"Using specified device: {device}")
else:
# Check CUDA availability with error handling
try:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
if self.show_log:
logger.info(f"Automatically selected device: {self.device}")
except Exception as e:
self.device = "cpu"
if self.show_log:
logger.warning(f"Error checking CUDA availability: {e}. Defaulting to CPU")
self.num_threads = num_threads or int(os.environ.get("OMP_NUM_THREADS", 4))
# Set thread count for CPU operations
if self.device == "cpu":
torch.set_num_threads(self.num_threads)
if self.show_log:
logger.info(f"Set CPU threads to {self.num_threads}")
# Model parameters
self.image_size = 640
self.confidence_threshold = 0.6
# Check dependencies
self._check_dependencies()
# Download model if needed
if not self._model_exists():
if self.show_log:
logger.info(f"Model not found at {self.model_path}, will download from HuggingFace")
self._download_model()
# Load model
try:
self._load_model()
if self.show_log:
logger.success("Model initialized successfully")
except Exception as e:
if self.show_log:
logger.error("Failed to initialize model", exc_info=True)
raise
```
detect
detect(input_path: Union[str, Path], confidence_threshold: Optional[float] = None, **kwargs) -> Tuple[Image.Image, LayoutOutput]
Run layout detection using RT-DETR Transformers model.
Source code in omnidocs/tasks/layout_analysis/extractors/rtdetr.py
```python
@log_execution_time
def detect(
self,
input_path: Union[str, Path],
confidence_threshold: Optional[float] = None,
**kwargs
) -> Tuple[Image.Image, LayoutOutput]:
"""Run layout detection using RT-DETR Transformers model."""
if self.model is None:
raise RuntimeError("Model not loaded. Initialization failed.")
try:
# Load and preprocess image
if isinstance(input_path, (str, Path)):
image = Image.open(input_path).convert("RGB")
elif isinstance(input_path, Image.Image):
image = input_path.convert("RGB")
elif isinstance(input_path, np.ndarray):
image = Image.fromarray(input_path).convert("RGB")
else:
raise ValueError("Unsupported input type")
# Preprocess the image using the image processor
resize = {"height": self.image_size, "width": self.image_size}
inputs = self.image_processor(
images=image,
return_tensors="pt",
size=resize,
)
# Move inputs to the correct device
if self.device == "cuda":
inputs = {k: v.cuda() if isinstance(v, torch.Tensor) else v for k, v in inputs.items()}
# Run inference
try:
with torch.no_grad():
outputs = self.model(**inputs)
except Exception as e:
raise RuntimeError(f"Error during model inference: {e}") from e
# Post-process results
threshold = confidence_threshold or self.confidence_threshold
results = self.image_processor.post_process_object_detection(
outputs,
target_sizes=torch.tensor([image.size[::-1]]),
threshold=threshold
)
# Process predictions
layout_boxes = []
for result in results:
for score, label_id, box in zip(result["scores"], result["labels"], result["boxes"]):
score_val = float(score.item())
label_idx = int(label_id.item())
# Get label from model config (add 1 because model config is 0-indexed)
model_label = self.model.config.id2label.get(label_idx + 1)
if not model_label:
continue
# Map to standardized label
mapped_label = self.map_label(model_label)
if not mapped_label:
continue
# Convert box coordinates (already in image space)
box = [round(i, 2) for i in box.tolist()]
left, top, right, bottom = box
layout_boxes.append(
LayoutBox(
label=mapped_label,
bbox=[left, top, right, bottom],
confidence=score_val
)
)
# Create annotated image
annotated_img = image.copy()
draw = ImageDraw.Draw(annotated_img)
# Draw boxes with standardized colors
for box in layout_boxes:
color = self.color_map.get(box.label, 'gray')
coords = box.bbox
draw.rectangle(coords, outline=color, width=3)
draw.text((coords[0], coords[1]-20), box.label, fill=color)
return annotated_img, LayoutOutput(bboxes=layout_boxes)
except Exception as e:
if self.show_log:
logger.error("Error during prediction", exc_info=True)
raise
```
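Note that `use_cpu_only` defaults to `True`, so CUDA has to be enabled explicitly, and the default confidence threshold of 0.6 can be overridden per call. A sketch (import path and file name are illustrative):

```python
from omnidocs.tasks.layout_analysis.extractors.rtdetr import RTDETRLayoutDetector

# use_cpu_only defaults to True; pass False to allow CUDA auto-selection
detector = RTDETRLayoutDetector(use_cpu_only=False, num_threads=8)

# Override the default confidence threshold (0.6) for this call only
annotated_img, layout = detector.detect("page.png", confidence_threshold=0.4)

for box in layout.bboxes:
    print(box.label, box.confidence)
```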
SuryaLayoutDetector
SuryaLayoutDetector(device: Optional[str] = None, show_log: bool = False, **kwargs)
Bases: BaseLayoutDetector
Surya-based layout detection implementation.
Initialize Surya Layout Detector.
Source code in omnidocs/tasks/layout_analysis/extractors/surya.py
```python
def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
**kwargs
):
"""Initialize Surya Layout Detector."""
super().__init__(show_log=show_log)
# Initialize label mapper
self._label_mapper = SuryaLayoutMapper()
if self.show_log:
logger.info("Initializing SuryaLayoutDetector")
# Set device if specified, otherwise use default from parent
if device:
self.device = device
if self.show_log:
logger.info(f"Using device: {self.device}")
try:
# Import required libraries - use new API
import surya
if self.show_log:
logger.info(f"Found surya package at: {surya.__file__}")
except ImportError as ex:
if self.show_log:
logger.error("Failed to import surya")
raise ImportError(
"surya is not available. Please install it with: pip install surya-ocr"
) from ex
try:
# Initialize detection and layout models using new API
from surya.layout import LayoutPredictor
self.layout_predictor = LayoutPredictor()
if self.show_log:
logger.success("Models initialized successfully")
except Exception as e:
if self.show_log:
logger.error("Failed to initialize models", exc_info=True)
raise
```
detect
detect(input_path: Union[str, Path], **kwargs) -> Tuple[Image.Image, LayoutOutput]
Run layout detection with standardized labels.
Source code in omnidocs/tasks/layout_analysis/extractors/surya.py
```python
@log_execution_time
def detect(
self,
input_path: Union[str, Path],
**kwargs
) -> Tuple[Image.Image, LayoutOutput]:
"""Run layout detection with standardized labels."""
try:
# Load and preprocess input
if isinstance(input_path, (str, Path)):
image = Image.open(input_path).convert("RGB")
elif isinstance(input_path, Image.Image):
image = input_path.convert("RGB")
elif isinstance(input_path, np.ndarray):
image = Image.fromarray(input_path).convert("RGB")
else:
raise ValueError("Unsupported input type")
# Run layout detection using new API
layout_predictions = self.layout_predictor([image])
# Process the layout prediction (take first since we only processed one image)
layout_pred = layout_predictions[0]
# Convert to standardized format
layout_boxes = []
for box in layout_pred.bboxes:
mapped_label = self.map_label(box.label)
if mapped_label:
layout_boxes.append(
LayoutBox(
label=mapped_label,
bbox=box.bbox, # Already in [x1, y1, x2, y2] format
confidence=box.confidence
)
)
# Create annotated image
annotated_img = image.copy()
draw = ImageDraw.Draw(annotated_img)
# Draw boxes with standardized colors
for box in layout_boxes:
color = self.color_map.get(box.label, 'gray')
coords = box.bbox
draw.rectangle(coords, outline=color, width=3)
draw.text((coords[0], coords[1]-20), box.label, fill=color)
# Create LayoutOutput with image size
layout_output = LayoutOutput(
bboxes=layout_boxes,
image_size=image.size
)
return annotated_img, layout_output
except Exception as e:
if self.show_log:
logger.error("Error during prediction", exc_info=True)
raise
```
visualize
visualize(detection_result: Tuple[Image, LayoutOutput], output_path: Union[str, Path]) -> None
Save annotated image and layout data to files.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| detection_result | Tuple[Image, LayoutOutput] | Tuple containing (PIL Image, LayoutOutput) | required |
| output_path | Union[str, Path] | Path to save visualization | required |
Source code in omnidocs/tasks/layout_analysis/extractors/surya.py
```python
def visualize(
self,
detection_result: Tuple[Image.Image, LayoutOutput],
output_path: Union[str, Path],
) -> None:
"""
Save annotated image and layout data to files.
Args:
detection_result: Tuple containing (PIL Image, LayoutOutput)
output_path: Path to save visualization
"""
super().visualize(detection_result, output_path)
```
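Unlike the other detectors, Surya also records the source image size on its `LayoutOutput`. A usage sketch (import path and file name are illustrative):

```python
from omnidocs.tasks.layout_analysis.extractors.surya import SuryaLayoutDetector

detector = SuryaLayoutDetector(show_log=True)

annotated_img, layout = detector.detect("page.png")

# Surya populates image_size on the LayoutOutput
print(layout.image_size)
for box in layout.bboxes:
    print(box.label, box.bbox, box.confidence)
```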
YOLOLayoutDetector
YOLOLayoutDetector(device: Optional[str] = None, show_log: bool = False, model_path: Optional[Union[str, Path]] = None)
Bases: BaseLayoutDetector
YOLO-based layout detection implementation.
Initialize YOLO Layout Detector.
Source code in omnidocs/tasks/layout_analysis/extractors/doc_layout_yolo.py
```python
def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
model_path: Optional[Union[str, Path]] = None
):
"""Initialize YOLO Layout Detector."""
super().__init__(show_log=show_log)
self._label_mapper = YOLOLayoutMapper()
if self.show_log:
logger.info(f"Initializing YOLOLayoutDetector")
if device:
self.device = device
if self.show_log:
logger.info(f"Using device: {self.device}")
# Set default paths
if model_path is None:
model_path = _MODELS_DIR / "yolo_layout" / self.MODEL_REPO.replace("/", "_")
self.model_path = Path(model_path)
if self.show_log:
logger.info(f"Model directory: {self.model_path}")
self.conf_threshold = 0.2
self.img_size = 1024
# Check dependencies
self._check_dependencies()
# Download model if needed
if not self._model_exists():
if self.show_log:
logger.info(f"Model not found at {self.model_path}, will download from HuggingFace")
self._download_model()
# Load model
try:
self._load_model()
if self.show_log:
logger.success("Model initialized successfully")
except Exception as e:
if self.show_log:
logger.error("Failed to initialize model", exc_info=True)
raise
```
detect
detect(input_path: Union[str, Path], conf_threshold: float = None, img_size: int = None, **kwargs) -> Tuple[Image.Image, LayoutOutput]
Run layout detection with standardized labels.
Source code in omnidocs/tasks/layout_analysis/extractors/doc_layout_yolo.py
```python
@log_execution_time
def detect(
self,
input_path: Union[str, Path],
conf_threshold: float = None,
img_size: int = None,
**kwargs,
) -> Tuple[Image.Image, LayoutOutput]:
"""Run layout detection with standardized labels."""
if self.model is None:
raise RuntimeError("Model not loaded. Initialization failed.")
conf = conf_threshold if conf_threshold else self.conf_threshold
imgsz = img_size if img_size else self.img_size
try:
images = self.preprocess_input(input_path)
results = []
for img in images:
# Get detection results
det_result = self.model.predict(
img, imgsz=imgsz, conf=conf, device=self.device, **kwargs
)
# Convert detection results to LayoutBox objects
layout_boxes = []
for box in det_result[0].boxes:
model_label = det_result[0].names[int(box.cls[0])]
mapped_label = self.map_label(model_label)
if mapped_label:
layout_boxes.append(
LayoutBox(
label=mapped_label,
bbox=box.xyxy[0].tolist(),
confidence=float(box.conf[0]) if box.conf is not None else None
)
)
# Get the annotated image (will be a numpy array)
annotated_img_array = det_result[0].plot(labels=False) # Disable YOLO's default labels
# Convert numpy array to PIL Image
annotated_img = Image.fromarray(cv2.cvtColor(annotated_img_array, cv2.COLOR_BGR2RGB))
# Draw standardized labels on the image
draw = ImageDraw.Draw(annotated_img)
for box in layout_boxes:
color = self.color_map.get(box.label, 'gray')
coords = box.bbox
draw.rectangle(coords, outline=color, width=3)
draw.text((coords[0], coords[1]-20), box.label, fill=color)
results.append((
annotated_img,
LayoutOutput(bboxes=layout_boxes)
))
return results[0] if results else (None, LayoutOutput(bboxes=[]))
except Exception as e:
if self.show_log:
logger.error("Error during prediction", exc_info=True)
raise
```
visualize
visualize(detection_result: Tuple[Image, LayoutOutput], output_path: Union[str, Path]) -> None
Save the annotated image to file.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| detection_result | Tuple[Image, LayoutOutput] | Tuple containing (PIL Image, LayoutOutput) | required |
| output_path | Union[str, Path] | Path to save visualization | required |
Source code in omnidocs/tasks/layout_analysis/extractors/doc_layout_yolo.py
```python
def visualize(
self,
detection_result: Tuple[Image.Image, LayoutOutput],
output_path: Union[str, Path],
) -> None:
"""
Save the annotated image to file.
Args:
detection_result: Tuple containing (PIL Image, LayoutOutput)
output_path: Path to save visualization
"""
annotated_image, _ = detection_result
# Convert numpy array to PIL Image if necessary
if isinstance(annotated_image, np.ndarray):
annotated_image = Image.fromarray(annotated_image)
if annotated_image is not None:
annotated_image.save(str(output_path))
```
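The detector ships with `conf_threshold=0.2` and `img_size=1024` defaults, both of which can be overridden per call. A sketch (import path and file names are illustrative):

```python
from omnidocs.tasks.layout_analysis.extractors.doc_layout_yolo import YOLOLayoutDetector

detector = YOLOLayoutDetector()

# Override the defaults (conf_threshold=0.2, img_size=1024) for this call
annotated_img, layout = detector.detect("page.png", conf_threshold=0.3, img_size=1280)

detector.visualize((annotated_img, layout), "yolo_layout.png")
```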
YOLOLayoutMapper
Bases: BaseLayoutMapper
Label mapper for YOLO layout detection model.
Source code in omnidocs/tasks/layout_analysis/base.py
```python
def __init__(self):
self._mapping: Dict[str, LayoutLabel] = {}
self._reverse_mapping: Dict[LayoutLabel, str] = {}
self._setup_mapping()
```
FlorenceLayoutMapper
Bases: BaseLayoutMapper
Label mapper for Florence layout detection model.
Source code in omnidocs/tasks/layout_analysis/base.py
```python
def __init__(self):
self._mapping: Dict[str, LayoutLabel] = {}
self._reverse_mapping: Dict[LayoutLabel, str] = {}
self._setup_mapping()
```
PaddleLayoutMapper
Bases: BaseLayoutMapper
Label mapper for PaddleOCR layout detection model.
Source code in omnidocs/tasks/layout_analysis/base.py
```python
def __init__(self):
self._mapping: Dict[str, LayoutLabel] = {}
self._reverse_mapping: Dict[LayoutLabel, str] = {}
self._setup_mapping()
```
RTDETRLayoutMapper
Bases: BaseLayoutMapper
Label mapper for RT-DETR layout detection model.
Source code in omnidocs/tasks/layout_analysis/base.py
```python
def __init__(self):
self._mapping: Dict[str, LayoutLabel] = {}
self._reverse_mapping: Dict[LayoutLabel, str] = {}
self._setup_mapping()
```
SuryaLayoutMapper
Bases: BaseLayoutMapper
Label mapper for Surya layout detection model.
Source code in omnidocs/tasks/layout_analysis/base.py
```python
def __init__(self):
self._mapping: Dict[str, LayoutLabel] = {}
self._reverse_mapping: Dict[LayoutLabel, str] = {}
self._setup_mapping()
```
Text Extraction
Extract raw and structured text from PDFs and images using classic and deep learning methods.
omnidocs.tasks.text_extraction
Text extraction module for OmniDocs. This module provides base classes and implementations for text extraction from documents (PDFs, images, etc.).
BaseTextExtractor
BaseTextExtractor(device: Optional[str] = None, show_log: bool = False, engine_name: Optional[str] = None, extract_images: bool = False)
Bases: ABC
Base class for text extraction models.
Initialize the text extractor.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| device | Optional[str] | Device to run model on ('cuda' or 'cpu') | None |
| show_log | bool | Whether to show detailed logs | False |
| engine_name | Optional[str] | Name of the text extraction engine | None |
| extract_images | bool | Whether to extract images alongside text | False |
Source code in omnidocs/tasks/text_extraction/base.py
```python
def __init__(self,
device: Optional[str] = None,
show_log: bool = False,
engine_name: Optional[str] = None,
extract_images: bool = False):
"""Initialize the text extractor.
Args:
device: Device to run model on ('cuda' or 'cpu')
show_log: Whether to show detailed logs
engine_name: Name of the text extraction engine
extract_images: Whether to extract images alongside text
"""
self.show_log = show_log
self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
self.engine_name = engine_name or self.__class__.__name__.lower().replace('extractor', '')
self.extract_images = extract_images
self.model = None
self.model_path = None
self._label_mapper: Optional[BaseTextMapper] = None
# Initialize mapper if engine name is provided
if self.engine_name:
self._label_mapper = BaseTextMapper(self.engine_name)
if self.show_log:
logger.info(f"Initializing {self.__class__.__name__}")
logger.info(f"Using device: {self.device}")
logger.info(f"Engine: {self.engine_name}")
logger.info(f"Extract images: {self.extract_images}")
```
label_mapper: BaseTextMapper
Get the label mapper for this extractor.
extract(input_path: Union[str, Path], **kwargs) -> TextOutput
Extract text from input document.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |
Source code in omnidocs/tasks/text_extraction/base.py
```python
@abstractmethod
def extract(
self,
input_path: Union[str, Path],
**kwargs
) -> TextOutput:
"""Extract text from input document.
Args:
input_path: Path to input document
**kwargs: Additional model-specific parameters
Returns:
TextOutput containing extracted text
"""
pass
```
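Because `extract` is abstract, a concrete engine subclasses `BaseTextExtractor` and returns a `TextOutput`. A toy sketch of such a subclass; the class itself is hypothetical, and it assumes the remaining `TextBlock`/`TextOutput` fields are optional:

```python
from pathlib import Path
from typing import Union

from omnidocs.tasks.text_extraction.base import BaseTextExtractor, TextBlock, TextOutput

class PlainTextExtractor(BaseTextExtractor):
    """Toy engine that treats an entire .txt file as a single text block."""

    def extract(self, input_path: Union[str, Path], **kwargs) -> TextOutput:
        # Read the whole file and wrap it in the standardized containers
        text = Path(input_path).read_text(encoding="utf-8")
        block = TextBlock(text=text, page_num=1, block_type="paragraph")
        return TextOutput(text_blocks=[block], full_text=text, page_count=1)
```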
extract_all(input_paths: List[Union[str, Path]], **kwargs) -> List[TextOutput]
Extract text from multiple documents.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_paths | List[Union[str, Path]] | List of document paths | required |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| List[TextOutput] | List of TextOutput objects |
Source code in omnidocs/tasks/text_extraction/base.py
```python
def extract_all(
self,
input_paths: List[Union[str, Path]],
**kwargs
) -> List[TextOutput]:
"""Extract text from multiple documents.
Args:
input_paths: List of document paths
**kwargs: Additional model-specific parameters
Returns:
List of TextOutput objects
"""
results = []
for input_path in input_paths:
try:
result = self.extract(input_path, **kwargs)
results.append(result)
except Exception as e:
if self.show_log:
logger.error(f"Error processing {input_path}: {str(e)}")
raise
return results
```
extract_from_pages(input_path: Union[str, Path], page_range: Optional[Tuple[int, int]] = None, **kwargs) -> TextOutput
Extract text from specific pages of a document.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |
| page_range | Optional[Tuple[int, int]] | Optional tuple of (start_page, end_page) (1-based, inclusive) | None |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text from specified pages |
Source code in omnidocs/tasks/text_extraction/base.py
```python
def extract_from_pages(
self,
input_path: Union[str, Path],
page_range: Optional[Tuple[int, int]] = None,
**kwargs
) -> TextOutput:
"""Extract text from specific pages of a document.
Args:
input_path: Path to input document
page_range: Optional tuple of (start_page, end_page) (1-based, inclusive)
**kwargs: Additional model-specific parameters
Returns:
TextOutput containing extracted text from specified pages
"""
# Default implementation extracts all pages then filters
# Child classes can override for more efficient page-specific extraction
full_output = self.extract(input_path, **kwargs)
if page_range is None:
return full_output
start_page, end_page = page_range
filtered_blocks = [
block for block in full_output.text_blocks
if start_page <= block.page_num <= end_page
]
# Rebuild full text from filtered blocks
full_text = '\n'.join(block.text for block in filtered_blocks)
return TextOutput(
text_blocks=filtered_blocks,
full_text=full_text,
metadata=full_output.metadata,
source_info=full_output.source_info,
processing_time=full_output.processing_time,
page_count=end_page - start_page + 1
)
```
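For example, with a concrete extractor such as `PyMuPDFTextExtractor` (documented below; its import path here is an assumption):

```python
from omnidocs.tasks.text_extraction.extractors.pymupdf import PyMuPDFTextExtractor

extractor = PyMuPDFTextExtractor()

# Pages 2-4, 1-based and inclusive
output = extractor.extract_from_pages("report.pdf", page_range=(2, 4))

print(output.page_count)       # 3
print(output.full_text[:200])  # text rebuilt from the filtered blocks
```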
extract_with_layout(input_path: Union[str, Path], layout_regions: Optional[List[Dict]] = None, **kwargs) -> TextOutput
Extract text with optional layout information.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |
| layout_regions | Optional[List[Dict]] | Optional list of layout regions to focus extraction on | None |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | TextOutput containing extracted text |
Source code in omnidocs/tasks/text_extraction/base.py
```python
def extract_with_layout(
self,
input_path: Union[str, Path],
layout_regions: Optional[List[Dict]] = None,
**kwargs
) -> TextOutput:
"""Extract text with optional layout information.
Args:
input_path: Path to input document
layout_regions: Optional list of layout regions to focus extraction on
**kwargs: Additional model-specific parameters
Returns:
TextOutput containing extracted text
"""
# Default implementation just calls extract, can be overridden by child classes
return self.extract(input_path, **kwargs)
```
get_supported_formats() -> List[str]
Get list of supported document formats.
Source code in omnidocs/tasks/text_extraction/base.py
```python
def get_supported_formats(self) -> List[str]:
"""Get list of supported document formats."""
# Default formats - child classes should override
return ['.txt', '.pdf']
```
postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput
Convert raw text extraction output to standardized TextOutput format.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| raw_output | Any | Raw output from text extraction engine | required |
| source_info | Optional[Dict] | Optional source document information | None |

Returns:

| Type | Description |
|------|-------------|
| TextOutput | Standardized TextOutput object |
Source code in omnidocs/tasks/text_extraction/base.py
```python
def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
"""Convert raw text extraction output to standardized TextOutput format.
Args:
raw_output: Raw output from text extraction engine
source_info: Optional source document information
Returns:
Standardized TextOutput object
"""
raise NotImplementedError("Child classes must implement postprocess_output method")
```
preprocess_input(input_path: Union[str, Path]) -> Any
Preprocess input document for text extraction.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_path | Union[str, Path] | Path to input document | required |

Returns:

| Type | Description |
|------|-------------|
| Any | Preprocessed document object |
Source code in omnidocs/tasks/text_extraction/base.py
```python
def preprocess_input(self, input_path: Union[str, Path]) -> Any:
"""Preprocess input document for text extraction.
Args:
input_path: Path to input document
Returns:
Preprocessed document object
"""
# Default implementation - child classes should override for specific formats
return input_path
```
BaseTextMapper
BaseTextMapper(engine_name: str)
Base class for mapping text extraction engine-specific outputs to standardized format.
Initialize mapper for specific text extraction engine.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| engine_name | str | Name of the text extraction engine | required |
Source code in omnidocs/tasks/text_extraction/base.py
```python
def __init__(self, engine_name: str):
"""Initialize mapper for specific text extraction engine.
Args:
engine_name: Name of the text extraction engine
"""
self.engine_name = engine_name.lower()
self._block_type_mapping: Dict[str, str] = {}
self._setup_block_type_mapping()
```
extract_font_info(raw_font_data: Any) -> Dict[str, Any]
Extract and normalize font information.
Source code in omnidocs/tasks/text_extraction/base.py
```python
def extract_font_info(self, raw_font_data: Any) -> Dict[str, Any]:
"""Extract and normalize font information."""
font_info = {}
if isinstance(raw_font_data, dict):
font_info.update({
'font_name': raw_font_data.get('name', raw_font_data.get('font_name')),
'font_size': raw_font_data.get('size', raw_font_data.get('font_size')),
'bold': raw_font_data.get('bold', raw_font_data.get('is_bold', False)),
'italic': raw_font_data.get('italic', raw_font_data.get('is_italic', False)),
'color': raw_font_data.get('color', raw_font_data.get('font_color'))
})
return {k: v for k, v in font_info.items() if v is not None}
```
normalize_bbox(bbox: List[float], page_width: int, page_height: int) -> List[float]
Normalize bounding box coordinates to absolute values.
Source code in omnidocs/tasks/text_extraction/base.py
```python
def normalize_bbox(self, bbox: List[float], page_width: int, page_height: int) -> List[float]:
"""Normalize bounding box coordinates to absolute values."""
if all(0 <= coord <= 1 for coord in bbox):
return [
bbox[0] * page_width,
bbox[1] * page_height,
bbox[2] * page_width,
bbox[3] * page_height
]
return bbox
```
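A quick worked example of the heuristic: when every coordinate falls in [0, 1] the box is treated as relative and scaled to the page, otherwise it passes through unchanged (the engine name is illustrative):

```python
from omnidocs.tasks.text_extraction.base import BaseTextMapper

mapper = BaseTextMapper("pymupdf")

# All coordinates in [0, 1] -> interpreted as relative and scaled
print(mapper.normalize_bbox([0.1, 0.2, 0.5, 0.6], page_width=1000, page_height=800))
# [100.0, 160.0, 500.0, 480.0]

# Absolute coordinates are returned as-is
print(mapper.normalize_bbox([120, 80, 560, 240], page_width=1000, page_height=800))
# [120, 80, 560, 240]
```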
normalize_block_type(engine_type: str) -> str
Convert engine-specific block type to standardized format.
Source code in omnidocs/tasks/text_extraction/base.py
```python
def normalize_block_type(self, engine_type: str) -> str:
"""Convert engine-specific block type to standardized format."""
return self._block_type_mapping.get(engine_type.lower(), engine_type)
```
TextBlock
Bases: BaseModel
Container for individual text block.
Attributes:

| Name | Type | Description |
|------|------|-------------|
| text | str | The extracted text content |
| bbox | Optional[List[float]] | Bounding box coordinates [x1, y1, x2, y2] |
| confidence | Optional[float] | Confidence score for text extraction |
| page_num | int | Page number (for multi-page documents) |
| block_type | Optional[str] | Type of text block (paragraph, heading, list, etc.) |
| font_info | Optional[Dict[str, Any]] | Optional font information |
| reading_order | Optional[int] | Reading order index within the page |
| language | Optional[str] | Detected language of the text |
to_dict() -> Dict
Convert to dictionary representation.
Source code in omnidocs/tasks/text_extraction/base.py
```python
def to_dict(self) -> Dict:
"""Convert to dictionary representation."""
return {
'text': self.text,
'bbox': self.bbox,
'confidence': self.confidence,
'page_num': self.page_num,
'block_type': self.block_type,
'font_info': self.font_info,
'reading_order': self.reading_order,
'language': self.language
}
```
TextOutput
Bases: BaseModel
Container for text extraction results.
Attributes:

| Name | Type | Description |
|------|------|-------------|
| text_blocks | List[TextBlock] | List of extracted text blocks |
| full_text | str | Combined text from all blocks |
| metadata | Optional[Dict[str, Any]] | Additional metadata from extraction |
| source_info | Optional[Dict[str, Any]] | Information about the source document |
| processing_time | Optional[float] | Time taken for text extraction |
| page_count | int | Number of pages in the document |
get_sorted_by_reading_order() -> List[TextBlock]
Get text blocks sorted by reading order.
Source code in omnidocs/tasks/text_extraction/base.py
```python
def get_sorted_by_reading_order(self) -> List[TextBlock]:
"""Get text blocks sorted by reading order."""
blocks_with_order = [block for block in self.text_blocks if block.reading_order is not None]
blocks_without_order = [block for block in self.text_blocks if block.reading_order is None]
# Sort blocks with reading order
blocks_with_order.sort(key=lambda x: (x.page_num, x.reading_order))
# Sort blocks without reading order by page and bbox
if blocks_without_order:
blocks_without_order.sort(key=lambda x: (
x.page_num,
x.bbox[1] if x.bbox else 0, # Sort by y coordinate (top to bottom)
x.bbox[0] if x.bbox else 0 # Then by x coordinate (left to right)
))
return blocks_with_order + blocks_without_order
```
get_text_by_confidence(min_confidence: float = 0.5) -> List[TextBlock]
Filter text blocks by minimum confidence threshold.
Source code in omnidocs/tasks/text_extraction/base.py
| def get_text_by_confidence(self, min_confidence: float = 0.5) -> List[TextBlock]:
"""Filter text blocks by minimum confidence threshold."""
return [block for block in self.text_blocks if block.confidence is None or block.confidence >= min_confidence]
|
get_text_by_page(page_num: int) -> List[TextBlock]
Get text blocks from a specific page.
Source code in omnidocs/tasks/text_extraction/base.py
| def get_text_by_page(self, page_num: int) -> List[TextBlock]:
"""Get text blocks from a specific page."""
return [block for block in self.text_blocks if block.page_num == page_num]
|
get_text_by_type(block_type: str) -> List[TextBlock]
Get text blocks of a specific type.
Source code in omnidocs/tasks/text_extraction/base.py
| def get_text_by_type(self, block_type: str) -> List[TextBlock]:
"""Get text blocks of a specific type."""
return [block for block in self.text_blocks if block.block_type == block_type]
|
save_json(output_path: Union[str, Path]) -> None
Save output to JSON file.
Source code in omnidocs/tasks/text_extraction/base.py
| def save_json(self, output_path: Union[str, Path]) -> None:
"""Save output to JSON file."""
import json
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)
|
save_markdown(output_path: Union[str, Path]) -> None
Save text as markdown with basic formatting.
Source code in omnidocs/tasks/text_extraction/base.py
| def save_markdown(self, output_path: Union[str, Path]) -> None:
"""Save text as markdown with basic formatting."""
markdown_content = []
for block in self.get_sorted_by_reading_order():
if block.block_type == 'heading':
# Convert to markdown heading
markdown_content.append(f"# {block.text}\n")
elif block.block_type == 'subheading':
markdown_content.append(f"## {block.text}\n")
elif block.block_type == 'list':
# Convert to markdown list
lines = block.text.split('\n')
for line in lines:
if line.strip():
markdown_content.append(f"- {line.strip()}")
markdown_content.append("")
else:
# Regular paragraph
markdown_content.append(f"{block.text}\n")
with open(output_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(markdown_content))
|
save_text(output_path: Union[str, Path]) -> None
Save full text to a text file.
Source code in omnidocs/tasks/text_extraction/base.py
| def save_text(self, output_path: Union[str, Path]) -> None:
"""Save full text to a text file."""
with open(output_path, 'w', encoding='utf-8') as f:
f.write(self.full_text)
|
to_dict() -> Dict
Convert to dictionary representation.
Source code in omnidocs/tasks/text_extraction/base.py
| def to_dict(self) -> Dict:
"""Convert to dictionary representation."""
return {
'text_blocks': [block.to_dict() for block in self.text_blocks],
'full_text': self.full_text,
'metadata': self.metadata,
'source_info': self.source_info,
'processing_time': self.processing_time,
'page_count': self.page_count
}
|
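Illustrative sketch of how these TextOutput helpers compose; the file paths are placeholders, and the extractor is one concrete choice (any extractor that returns a TextOutput works the same way):

```python
from omnidocs.tasks.text_extraction.extractors.pymupdf import PyMuPDFTextExtractor

extractor = PyMuPDFTextExtractor()
output = extractor.extract("sample.pdf")  # placeholder input file

ordered = output.get_sorted_by_reading_order()   # reading order, then top-to-bottom fallback
page_one = output.get_text_by_page(1)            # blocks from page 1 only
headings = output.get_text_by_type("heading")    # standardized block type
confident = output.get_text_by_confidence(0.8)   # keeps blocks without a confidence score too

output.save_text("sample.txt")       # plain full_text
output.save_markdown("sample.md")    # heading/list-aware markdown
output.save_json("sample.json")      # full to_dict() dump
```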
PyMuPDFTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, extract_tables: bool = False, flags: int = 0, clip: Optional[tuple] = None)
Bases: BaseTextExtractor
Text extractor using PyMuPDF (fitz).
Initialize PyMuPDF text extractor.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device | Optional[str] | Device to run on (not used for PyMuPDF) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| extract_tables | bool | Whether to extract tables | False |
| flags | int | Text extraction flags (fitz.TEXT_PRESERVE_LIGATURES, etc.) | 0 |
| clip | Optional[tuple] | Optional clipping rectangle (x0, y0, x1, y1) | None |
Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
| def __init__(self,
device: Optional[str] = None,
show_log: bool = False,
extract_images: bool = False,
extract_tables: bool = False,
flags: int = 0,
clip: Optional[tuple] = None):
"""Initialize PyMuPDF text extractor.
Args:
device: Device to run on (not used for PyMuPDF)
show_log: Whether to show detailed logs
extract_images: Whether to extract images alongside text
extract_tables: Whether to extract tables
flags: Text extraction flags (fitz.TEXT_PRESERVE_LIGATURES, etc.)
clip: Optional clipping rectangle (x0, y0, x1, y1)
"""
super().__init__(device, show_log, "pymupdf", extract_images)
self.extract_tables = extract_tables
self.flags = flags
self.clip = clip
self._label_mapper = PyMuPDFTextMapper()
self._load_model()
|
extract(input_path: Union[str, Path], use_layout: bool = True, **kwargs) -> TextOutput
Extract text from document using PyMuPDF.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input document | required |
| use_layout | bool | Whether to use layout information for extraction | True |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text |
Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
| def extract(
self,
input_path: Union[str, Path],
use_layout: bool = True,
**kwargs
) -> TextOutput:
"""Extract text from document using PyMuPDF.
Args:
input_path: Path to input document
use_layout: Whether to use layout information for extraction
**kwargs: Additional parameters
Returns:
TextOutput containing extracted text
"""
start_time = time.time()
# Preprocess input
input_path = self.preprocess_input(input_path)
if self.show_log:
logger.info(f"Extracting text from {input_path}")
try:
all_text_blocks = []
# Open document
doc = fitz.open(str(input_path))
try:
total_pages = len(doc)
for page_num in range(total_pages):
page = doc[page_num]
# Extract text blocks
if use_layout:
page_blocks = self._extract_text_blocks(page)
else:
page_blocks = self._extract_text_simple(page)
all_text_blocks.extend(page_blocks)
# Extract tables if requested
if self.extract_tables:
table_blocks = self._extract_tables(page)
all_text_blocks.extend(table_blocks)
# Create source info
source_info = {
'file_path': str(input_path),
'file_name': input_path.name,
'file_size': input_path.stat().st_size,
'engine': 'pymupdf',
'total_pages': total_pages,
'metadata': doc.metadata
}
finally:
doc.close()
# Post-process output
output = self.postprocess_output(all_text_blocks, source_info)
output.processing_time = time.time() - start_time
if self.show_log:
logger.info(f"Extracted {len(output.text_blocks)} text blocks from {total_pages} pages in {output.processing_time:.2f}s")
return output
except Exception as e:
logger.error(f"Error extracting text from {input_path}: {str(e)}")
raise
|
extract_from_pages(input_path: Union[str, Path], page_range: Optional[tuple] = None, use_layout: bool = True, **kwargs) -> TextOutput
Extract text from specific pages.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input document | required |
| page_range | Optional[tuple] | Optional tuple of (start_page, end_page) (1-based, inclusive) | None |
| use_layout | bool | Whether to use layout information | True |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text from specified pages |
Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
| def extract_from_pages(
self,
input_path: Union[str, Path],
page_range: Optional[tuple] = None,
use_layout: bool = True,
**kwargs
) -> TextOutput:
"""Extract text from specific pages.
Args:
input_path: Path to input document
page_range: Optional tuple of (start_page, end_page) (1-based, inclusive)
use_layout: Whether to use layout information
**kwargs: Additional parameters
Returns:
TextOutput containing extracted text from specified pages
"""
start_time = time.time()
# Preprocess input
input_path = self.preprocess_input(input_path)
if self.show_log:
logger.info(f"Extracting text from {input_path}, pages {page_range}")
try:
all_text_blocks = []
# Open document
doc = fitz.open(str(input_path))
try:
total_pages = len(doc)
if page_range is None:
start_page, end_page = 1, total_pages
else:
start_page, end_page = page_range
# Convert to 0-based indexing
start_idx = max(0, start_page - 1)
end_idx = min(total_pages - 1, end_page - 1)
for page_num in range(start_idx, end_idx + 1):
page = doc[page_num]
# Extract text blocks
if use_layout:
page_blocks = self._extract_text_blocks(page)
else:
page_blocks = self._extract_text_simple(page)
all_text_blocks.extend(page_blocks)
# Extract tables if requested
if self.extract_tables:
table_blocks = self._extract_tables(page)
all_text_blocks.extend(table_blocks)
# Create source info
source_info = {
'file_path': str(input_path),
'file_name': input_path.name,
'file_size': input_path.stat().st_size,
'engine': 'pymupdf',
'total_pages': total_pages,
'page_range': page_range,
'metadata': doc.metadata
}
finally:
doc.close()
# Post-process output
output = self.postprocess_output(all_text_blocks, source_info)
output.processing_time = time.time() - start_time
if self.show_log:
logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")
return output
except Exception as e:
logger.error(f"Error extracting text from {input_path}: {str(e)}")
raise
|
get_supported_formats() -> List[str]
Get list of supported document formats.
Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
| def get_supported_formats(self) -> List[str]:
"""Get list of supported document formats."""
return ['.pdf', '.xps', '.oxps', '.epub', '.mobi', '.fb2', '.cbz', '.svg']
|
postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput
Convert PyMuPDF output to standardized TextOutput format.
Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
| def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
"""Convert PyMuPDF output to standardized TextOutput format."""
text_blocks = raw_output # raw_output is already a list of TextBlocks
# Sort blocks by page and reading order
text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))
# Combine all text
full_text = '\n\n'.join(block.text for block in text_blocks if block.text.strip())
# Get metadata
metadata = {
'engine': 'pymupdf',
'extract_tables': self.extract_tables,
'flags': self.flags,
'clip': self.clip,
'total_blocks': len(text_blocks)
}
return TextOutput(
text_blocks=text_blocks,
full_text=full_text,
metadata=metadata,
source_info=source_info,
page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
)
|
preprocess_input(input_path: Union[str, Path]) -> Path
Preprocess input document.
Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
| def preprocess_input(self, input_path: Union[str, Path]) -> Path:
"""Preprocess input document."""
input_path = Path(input_path)
if not input_path.exists():
raise FileNotFoundError(f"Input file not found: {input_path}")
supported_formats = ['.pdf', '.xps', '.oxps', '.epub', '.mobi', '.fb2', '.cbz', '.svg']
if input_path.suffix.lower() not in supported_formats:
raise ValueError(f"Unsupported format: {input_path.suffix}. Supported: {supported_formats}")
return input_path
|
PyMuPDFTextMapper
Bases: BaseTextMapper
Mapper for PyMuPDF text extraction output.
Source code in omnidocs/tasks/text_extraction/extractors/pymupdf.py
| def __init__(self):
super().__init__("pymupdf")
|
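Illustrative end-to-end sketch for the PyMuPDF extractor; PyMuPDF (fitz) must be installed, and the file paths are placeholders:

```python
from omnidocs.tasks.text_extraction.extractors.pymupdf import PyMuPDFTextExtractor

# Layout-aware extraction with table blocks included.
extractor = PyMuPDFTextExtractor(extract_tables=True, show_log=True)
output = extractor.extract("report.pdf", use_layout=True)
print(f"{output.page_count} pages, {len(output.text_blocks)} blocks")

# Pages 2-4 only (1-based, inclusive).
subset = extractor.extract_from_pages("report.pdf", page_range=(2, 4))
subset.save_json("report_p2-4.json")
```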
PdfplumberTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, extract_tables: bool = False, use_layout: bool = True)
Bases: BaseTextExtractor
Text extractor using pdfplumber.
Initialize pdfplumber text extractor.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device | Optional[str] | Device to run on (not used for pdfplumber) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| extract_tables | bool | Whether to extract tables | False |
| use_layout | bool | Whether to use layout information for text extraction | True |
Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
| def __init__(self,
device: Optional[str] = None,
show_log: bool = False,
extract_images: bool = False,
extract_tables: bool = False,
use_layout: bool = True):
"""Initialize pdfplumber text extractor.
Args:
device: Device to run on (not used for pdfplumber)
show_log: Whether to show detailed logs
extract_images: Whether to extract images alongside text
extract_tables: Whether to extract tables
use_layout: Whether to use layout information for text extraction
"""
super().__init__(device, show_log, "pdfplumber", extract_images)
self.extract_tables = extract_tables
self.use_layout = use_layout
self._label_mapper = PdfplumberTextMapper()
self._load_model()
|
extract(input_path: Union[str, Path], **kwargs) -> TextOutput
Extract text from PDF using pdfplumber.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input PDF | required |
| **kwargs | | Additional parameters (ignored for pdfplumber) | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text |
Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
| def extract(
self,
input_path: Union[str, Path],
**kwargs
) -> TextOutput:
"""Extract text from PDF using pdfplumber.
Args:
input_path: Path to input PDF
**kwargs: Additional parameters (ignored for pdfplumber)
Returns:
TextOutput containing extracted text
"""
start_time = time.time()
# Preprocess input
input_path = self.preprocess_input(input_path)
if self.show_log:
logger.info(f"Extracting text from {input_path}")
try:
all_text_blocks = []
with pdfplumber.open(input_path) as pdf:
total_pages = len(pdf.pages)
for page in pdf.pages:
if self.use_layout:
page_blocks = self._extract_text_with_layout(page)
else:
page_blocks = self._extract_text_simple(page)
all_text_blocks.extend(page_blocks)
# Extract tables if requested
if self.extract_tables:
table_blocks = self._extract_tables(page)
all_text_blocks.extend(table_blocks)
# Create source info
source_info = {
'file_path': str(input_path),
'file_name': input_path.name,
'file_size': input_path.stat().st_size,
'engine': 'pdfplumber',
'total_pages': total_pages
}
# Post-process output
output = self.postprocess_output(all_text_blocks, source_info)
output.processing_time = time.time() - start_time
if self.show_log:
logger.info(f"Extracted {len(output.text_blocks)} text blocks from {total_pages} pages in {output.processing_time:.2f}s")
return output
except Exception as e:
logger.error(f"Error extracting text from {input_path}: {str(e)}")
raise
|
get_supported_formats() -> List[str]
Get list of supported document formats.
Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
| def get_supported_formats(self) -> List[str]:
"""Get list of supported document formats."""
return ['.pdf']
|
postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput
Convert pdfplumber output to standardized TextOutput format.
Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
| def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
"""Convert pdfplumber output to standardized TextOutput format."""
text_blocks = raw_output # raw_output is already a list of TextBlocks
# Sort blocks by page and reading order
text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))
# Combine all text
full_text = '\n\n'.join(block.text for block in text_blocks)
# Get metadata
metadata = {
'engine': 'pdfplumber',
'extract_tables': self.extract_tables,
'use_layout': self.use_layout,
'total_blocks': len(text_blocks)
}
return TextOutput(
text_blocks=text_blocks,
full_text=full_text,
metadata=metadata,
source_info=source_info,
page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
)
|
preprocess_input(input_path: Union[str, Path]) -> Path
Preprocess input document.
Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
| def preprocess_input(self, input_path: Union[str, Path]) -> Path:
"""Preprocess input document."""
input_path = Path(input_path)
if not input_path.exists():
raise FileNotFoundError(f"Input file not found: {input_path}")
if input_path.suffix.lower() != '.pdf':
raise ValueError(f"pdfplumber only supports PDF files. Got: {input_path.suffix}")
return input_path
|
PdfplumberTextMapper
Bases: BaseTextMapper
Mapper for pdfplumber text extraction output.
Source code in omnidocs/tasks/text_extraction/extractors/pdfplumber.py
| def __init__(self):
super().__init__("pdfplumber")
|
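Illustrative sketch for the pdfplumber extractor; the path is a placeholder, and the "table" block type used for filtering is an assumption about how extracted table blocks are tagged:

```python
from omnidocs.tasks.text_extraction.extractors.pdfplumber import PdfplumberTextExtractor

extractor = PdfplumberTextExtractor(extract_tables=True, use_layout=True)
output = extractor.extract("invoice.pdf")  # placeholder path

# Assumed block type for extracted tables; adjust to whatever your blocks report.
for table_block in output.get_text_by_type("table"):
    print(table_block.page_num, table_block.text[:80])
```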
PyPDF2TextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, ignore_images: bool = True, extract_forms: bool = False)
Bases: BaseTextExtractor
Text extractor using PyPDF2.
Initialize PyPDF2 text extractor.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device | Optional[str] | Device to run on (not used for PyPDF2) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| ignore_images | bool | Whether to ignore images during text extraction | True |
| extract_forms | bool | Whether to extract form fields | False |
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def __init__(self,
device: Optional[str] = None,
show_log: bool = False,
extract_images: bool = False,
ignore_images: bool = True,
extract_forms: bool = False):
"""Initialize PyPDF2 text extractor.
Args:
device: Device to run on (not used for PyPDF2)
show_log: Whether to show detailed logs
extract_images: Whether to extract images alongside text
ignore_images: Whether to ignore images during text extraction
extract_forms: Whether to extract form fields
"""
super().__init__(device, show_log, "pypdf2", extract_images)
self.ignore_images = ignore_images
self.extract_forms = extract_forms
self._label_mapper = PyPDF2TextMapper()
self._load_model()
|
extract(input_path: Union[str, Path], password: Optional[str] = None, **kwargs) -> TextOutput
Extract text from PDF using PyPDF2.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input PDF | required |
| password | Optional[str] | Optional password for encrypted PDFs | None |
| **kwargs | | Additional parameters (ignored for PyPDF2) | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text |
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def extract(
self,
input_path: Union[str, Path],
password: Optional[str] = None,
**kwargs
) -> TextOutput:
"""Extract text from PDF using PyPDF2.
Args:
input_path: Path to input PDF
password: Optional password for encrypted PDFs
**kwargs: Additional parameters (ignored for PyPDF2)
Returns:
TextOutput containing extracted text
"""
start_time = time.time()
# Preprocess input
input_path = self.preprocess_input(input_path)
if self.show_log:
logger.info(f"Extracting text from {input_path}")
try:
all_text_blocks = []
# Open PDF
with open(input_path, 'rb') as file:
reader = PdfReader(file)
# Check if PDF is encrypted
if reader.is_encrypted:
if password:
if not reader.decrypt(password):
raise ValueError("Invalid password for encrypted PDF")
else:
raise ValueError("PDF is encrypted but no password provided")
total_pages = len(reader.pages)
# Extract text from each page
for page_num, page in enumerate(reader.pages, 1):
page_blocks = self._extract_page_text(page, page_num)
all_text_blocks.extend(page_blocks)
# Extract form fields if requested
if self.extract_forms:
form_blocks = self._extract_form_fields(reader)
all_text_blocks.extend(form_blocks)
# Get PDF metadata
pdf_metadata = self._get_pdf_metadata(reader)
# Create source info
source_info = {
'file_path': str(input_path),
'file_name': input_path.name,
'file_size': input_path.stat().st_size,
'engine': 'pypdf2',
'total_pages': total_pages,
'is_encrypted': reader.is_encrypted,
'pdf_metadata': pdf_metadata
}
# Post-process output
output = self.postprocess_output(all_text_blocks, source_info)
output.processing_time = time.time() - start_time
if self.show_log:
logger.info(f"Extracted {len(output.text_blocks)} text blocks from {total_pages} pages in {output.processing_time:.2f}s")
return output
except Exception as e:
logger.error(f"Error extracting text from {input_path}: {str(e)}")
raise
|
extract_from_pages(input_path: Union[str, Path], page_range: Optional[tuple] = None, password: Optional[str] = None, **kwargs) -> TextOutput
Extract text from specific pages.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input PDF | required |
| page_range | Optional[tuple] | Optional tuple of (start_page, end_page) (1-based, inclusive) | None |
| password | Optional[str] | Optional password for encrypted PDFs | None |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text from specified pages |
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def extract_from_pages(
self,
input_path: Union[str, Path],
page_range: Optional[tuple] = None,
password: Optional[str] = None,
**kwargs
) -> TextOutput:
"""Extract text from specific pages.
Args:
input_path: Path to input PDF
page_range: Optional tuple of (start_page, end_page) (1-based, inclusive)
password: Optional password for encrypted PDFs
**kwargs: Additional parameters
Returns:
TextOutput containing extracted text from specified pages
"""
start_time = time.time()
# Preprocess input
input_path = self.preprocess_input(input_path)
if self.show_log:
logger.info(f"Extracting text from {input_path}, pages {page_range}")
try:
all_text_blocks = []
# Open PDF
with open(input_path, 'rb') as file:
reader = PdfReader(file)
# Check if PDF is encrypted
if reader.is_encrypted:
if password:
if not reader.decrypt(password):
raise ValueError("Invalid password for encrypted PDF")
else:
raise ValueError("PDF is encrypted but no password provided")
total_pages = len(reader.pages)
if page_range is None:
start_page, end_page = 1, total_pages
else:
start_page, end_page = page_range
# Validate page range
start_page = max(1, start_page)
end_page = min(total_pages, end_page)
# Extract text from specified pages
for page_num in range(start_page, end_page + 1):
page = reader.pages[page_num - 1] # Convert to 0-based index
page_blocks = self._extract_page_text(page, page_num)
all_text_blocks.extend(page_blocks)
# Get PDF metadata
pdf_metadata = self._get_pdf_metadata(reader)
# Create source info
source_info = {
'file_path': str(input_path),
'file_name': input_path.name,
'file_size': input_path.stat().st_size,
'engine': 'pypdf2',
'total_pages': total_pages,
'page_range': page_range,
'is_encrypted': reader.is_encrypted,
'pdf_metadata': pdf_metadata
}
# Post-process output
output = self.postprocess_output(all_text_blocks, source_info)
output.processing_time = time.time() - start_time
if self.show_log:
logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")
return output
except Exception as e:
logger.error(f"Error extracting text from {input_path}: {str(e)}")
raise
|
extract_with_password(input_path: Union[str, Path], password: str, **kwargs) -> TextOutput
Extract text from password-protected PDF.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input PDF | required |
| password | str | Password for encrypted PDF | required |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text |
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def extract_with_password(
self,
input_path: Union[str, Path],
password: str,
**kwargs
) -> TextOutput:
"""Extract text from password-protected PDF.
Args:
input_path: Path to input PDF
password: Password for encrypted PDF
**kwargs: Additional parameters
Returns:
TextOutput containing extracted text
"""
return self.extract(input_path, password=password, **kwargs)
|
get_supported_formats() -> List[str]
Get list of supported document formats.
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def get_supported_formats(self) -> List[str]:
"""Get list of supported document formats."""
return ['.pdf']
|
postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput
Convert PyPDF2 output to standardized TextOutput format.
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
"""Convert PyPDF2 output to standardized TextOutput format."""
text_blocks = raw_output # raw_output is already a list of TextBlocks
# Sort blocks by page and reading order
text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))
# Combine all text
full_text = '\n\n'.join(block.text for block in text_blocks if block.text.strip())
# Get metadata
metadata = {
'engine': 'pypdf2',
'ignore_images': self.ignore_images,
'extract_forms': self.extract_forms,
'total_blocks': len(text_blocks)
}
# Make everything JSON serializable
metadata = sanitize_for_json(metadata)
source_info = sanitize_for_json(source_info)
return TextOutput(
text_blocks=text_blocks,
full_text=full_text,
metadata=metadata,
source_info=source_info,
page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
)
|
preprocess_input(input_path: Union[str, Path]) -> Path
Preprocess input document.
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def preprocess_input(self, input_path: Union[str, Path]) -> Path:
"""Preprocess input document."""
input_path = Path(input_path)
if not input_path.exists():
raise FileNotFoundError(f"Input file not found: {input_path}")
if input_path.suffix.lower() != '.pdf':
raise ValueError(f"PyPDF2 only supports PDF files. Got: {input_path.suffix}")
return input_path
|
PyPDF2TextMapper
Bases: BaseTextMapper
Mapper for PyPDF2 text extraction output.
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def __init__(self):
super().__init__("pypdf2")
|
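Illustrative sketch for the PyPDF2 extractor, including the encrypted-PDF path; the file names and password are placeholders:

```python
from omnidocs.tasks.text_extraction.extractors.pypdf2 import PyPDF2TextExtractor

extractor = PyPDF2TextExtractor(extract_forms=True)

# Unencrypted PDF.
output = extractor.extract("open.pdf")

# Encrypted PDF: pass the password to extract(), or use the convenience wrapper.
secured = extractor.extract_with_password("locked.pdf", password="secret")
print(secured.source_info["is_encrypted"], secured.page_count)
```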
sanitize_for_json(obj: Any) -> Any
Recursively convert PyPDF2 objects (like IndirectObject) to JSON-serializable types.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | Any | Input object that might contain non-serializable types | required |

Returns:

| Type | Description |
| --- | --- |
| Any | JSON-serializable version of the input object |
Source code in omnidocs/tasks/text_extraction/extractors/pypdf2.py
| def sanitize_for_json(obj: Any) -> Any:
"""
Recursively convert PyPDF2 objects (like IndirectObject) to JSON-serializable types.
Args:
obj: Input object that might contain non-serializable types
Returns:
JSON-serializable version of the input object
"""
if obj is None:
return None
# Handle common collection types recursively
if isinstance(obj, dict):
return {k: sanitize_for_json(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [sanitize_for_json(item) for item in obj]
elif isinstance(obj, tuple):
return tuple(sanitize_for_json(item) for item in obj)
# Try to determine if this is a PyPDF2 IndirectObject or similar custom type
# that's not JSON-serializable
try:
# This will work for built-in types that are JSON-serializable
if isinstance(obj, (str, int, float, bool)):
return obj
# Check if it's a custom class from PyPDF2
class_name = obj.__class__.__name__
if "PyPDF2" in str(obj.__class__) or class_name in [
"IndirectObject", "DictionaryObject", "ArrayObject",
"PdfObject", "NullObject", "NameObject"
]:
return str(obj)
# If we got here, it might be a normal object, let's try to serialize it
return obj
except Exception:
# If all else fails, convert to string
return str(obj)
|
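To illustrate the intent without a real PDF, here is a small sketch using a stand-in class named like PyPDF2's IndirectObject (the class below is defined only for this example):

```python
from omnidocs.tasks.text_extraction.extractors.pypdf2 import sanitize_for_json

class IndirectObject:  # stand-in for PyPDF2's type, defined only for this sketch
    def __repr__(self):
        return "IndirectObject(12, 0)"

raw = {"title": "Report", "ref": IndirectObject(), "ids": (1, 2), "pages": [1, None]}
print(sanitize_for_json(raw))
# {'title': 'Report', 'ref': 'IndirectObject(12, 0)', 'ids': (1, 2), 'pages': [1, None]}
```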
PdftextTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, keep_layout: bool = False, physical_layout: bool = False)
Bases: BaseTextExtractor
Text extractor using pdftext.
Initialize pdftext text extractor.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device | Optional[str] | Device to run on (not used for pdftext) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| keep_layout | bool | Whether to keep original layout formatting | False |
| physical_layout | bool | Whether to use physical layout analysis | False |
Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
| def __init__(self,
device: Optional[str] = None,
show_log: bool = False,
extract_images: bool = False,
keep_layout: bool = False,
physical_layout: bool = False):
"""Initialize pdftext text extractor.
Args:
device: Device to run on (not used for pdftext)
show_log: Whether to show detailed logs
extract_images: Whether to extract images alongside text
keep_layout: Whether to keep original layout formatting
physical_layout: Whether to use physical layout analysis
"""
super().__init__(device, show_log, "pdftext", extract_images)
self.keep_layout = keep_layout
self.physical_layout = physical_layout
self._label_mapper = PdftextTextMapper()
self._load_model()
|
extract(input_path: Union[str, Path], **kwargs) -> TextOutput
Extract text from PDF using pdftext.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input PDF | required |
| **kwargs | | Additional parameters (ignored for pdftext) | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text |
Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
| def extract(
self,
input_path: Union[str, Path],
**kwargs
) -> TextOutput:
"""Extract text from PDF using pdftext.
Args:
input_path: Path to input PDF
**kwargs: Additional parameters (ignored for pdftext)
Returns:
TextOutput containing extracted text
"""
start_time = time.time()
# Preprocess input
input_path = self.preprocess_input(input_path)
if self.show_log:
logger.info(f"Extracting text from {input_path}")
try:
# Extract text blocks
text_blocks = self._extract_text_by_page(input_path)
# Create source info
source_info = {
'file_path': str(input_path),
'file_name': input_path.name,
'file_size': input_path.stat().st_size,
'engine': 'pdftext'
}
# Post-process output
output = self.postprocess_output(text_blocks, source_info)
output.processing_time = time.time() - start_time
if self.show_log:
logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")
return output
except Exception as e:
logger.error(f"Error extracting text from {input_path}: {str(e)}")
raise
|
extract_from_pages(input_path: Union[str, Path], page_range: Optional[tuple] = None, **kwargs) -> TextOutput
Extract text from specific pages.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input PDF | required |
| page_range | Optional[tuple] | Optional tuple of (start_page, end_page) (1-based, inclusive) | None |
| **kwargs | | Additional parameters | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text from specified pages |
Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
| def extract_from_pages(
self,
input_path: Union[str, Path],
page_range: Optional[tuple] = None,
**kwargs
) -> TextOutput:
"""Extract text from specific pages.
Args:
input_path: Path to input PDF
page_range: Optional tuple of (start_page, end_page) (1-based, inclusive)
**kwargs: Additional parameters
Returns:
TextOutput containing extracted text from specified pages
"""
start_time = time.time()
# Preprocess input
input_path = self.preprocess_input(input_path)
if self.show_log:
logger.info(f"Extracting text from {input_path}, pages {page_range}")
try:
text_blocks = []
if page_range is None:
# Extract all pages
text_blocks = self._extract_text_by_page(input_path)
else:
start_page, end_page = page_range
for page_num in range(start_page, end_page + 1):
try:
page_text = pdftext.pdf_text(
str(input_path),
page_num=page_num,
keep_layout=self.keep_layout,
physical_layout=self.physical_layout
)
if page_text and page_text.strip():
paragraphs = page_text.split('\n\n')
for para_idx, paragraph in enumerate(paragraphs):
if paragraph.strip():
block = TextBlock(
text=paragraph.strip(),
bbox=None,
confidence=1.0,
page_num=page_num,
block_type='paragraph',
reading_order=para_idx
)
text_blocks.append(block)
except Exception as e:
logger.warning(f"Error extracting page {page_num}: {str(e)}")
continue
# Create source info
source_info = {
'file_path': str(input_path),
'file_name': input_path.name,
'file_size': input_path.stat().st_size,
'engine': 'pdftext',
'page_range': page_range
}
# Post-process output
output = self.postprocess_output(text_blocks, source_info)
output.processing_time = time.time() - start_time
if self.show_log:
logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")
return output
except Exception as e:
logger.error(f"Error extracting text from {input_path}: {str(e)}")
raise
|
get_supported_formats() -> List[str]
Get list of supported document formats.
Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
| def get_supported_formats(self) -> List[str]:
"""Get list of supported document formats."""
return ['.pdf']
|
postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput
Convert pdftext output to standardized TextOutput format.
Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
| def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
"""Convert pdftext output to standardized TextOutput format."""
text_blocks = raw_output # raw_output is already a list of TextBlocks
# Sort blocks by page and reading order
text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))
# Combine all text
full_text = '\n\n'.join(block.text for block in text_blocks if block.text.strip())
# Get metadata
metadata = {
'engine': 'pdftext',
'keep_layout': self.keep_layout,
'physical_layout': self.physical_layout,
'total_blocks': len(text_blocks)
}
return TextOutput(
text_blocks=text_blocks,
full_text=full_text,
metadata=metadata,
source_info=source_info,
page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
)
|
preprocess_input(input_path: Union[str, Path]) -> Path
Preprocess input document.
Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
| def preprocess_input(self, input_path: Union[str, Path]) -> Path:
"""Preprocess input document."""
input_path = Path(input_path)
if not input_path.exists():
raise FileNotFoundError(f"Input file not found: {input_path}")
if input_path.suffix.lower() != '.pdf':
raise ValueError(f"pdftext only supports PDF files. Got: {input_path.suffix}")
return input_path
|
PdftextTextMapper
Bases: BaseTextMapper
Mapper for pdftext text extraction output.
Source code in omnidocs/tasks/text_extraction/extractors/pdftext.py
| def __init__(self):
super().__init__("pdftext")
|
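Illustrative sketch for the pdftext extractor; the path is a placeholder:

```python
from omnidocs.tasks.text_extraction.extractors.pdftext import PdftextTextExtractor

# Preserve layout in the recovered text rather than reflowing paragraphs.
extractor = PdftextTextExtractor(keep_layout=True)
output = extractor.extract("paper.pdf")

# Or just the first two pages (1-based, inclusive).
first_pages = extractor.extract_from_pages("paper.pdf", page_range=(1, 2))
print(first_pages.full_text[:200])
```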
SuryaTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, model_path: Optional[Union[str, Path]] = None, **kwargs)
Bases: BaseTextExtractor
Surya-based text extraction implementation for images and documents.
Initialize Surya Text Extractor.
Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
extract_images: bool = False,
model_path: Optional[Union[str, Path]] = None,
**kwargs
):
"""Initialize Surya Text Extractor."""
super().__init__(device=device, show_log=show_log, engine_name='surya', extract_images=extract_images)
self._label_mapper = SuryaTextMapper()
if self.show_log:
logger.info("Initializing SuryaTextExtractor")
# Set device if specified, otherwise use default from parent
if device:
self.device = device
if self.show_log:
logger.info(f"Using device: {self.device}")
# Set default paths
if model_path is None:
model_path = _MODELS_DIR / "surya_text"
self.model_path = Path(model_path)
# Check dependencies and load model
self._check_dependencies()
self._load_model()
|
extract(input_path: Union[str, Path, Image], **kwargs) -> TextOutput
Extract text using Surya OCR.
Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> TextOutput:
"""Extract text using Surya OCR."""
start_time = time.time()
try:
# Preprocess input
images = self.preprocess_input(input_path)
predictions = []
for img in images:
# Run text detection and recognition
try:
from surya.common.surya.schema import TaskNames
# Use recognition predictor for text extraction
prediction = self.rec_predictor(
[img],
task_names=[TaskNames.ocr_with_boxes],
det_predictor=self.det_predictor,
math_mode=False # Standard text mode
)
if prediction and len(prediction) > 0:
predictions.append(prediction[0])
except Exception as e:
if self.show_log:
logger.warning(f"Error processing image with Surya: {e}")
continue
# Prepare source info
source_info = {
'source_path': str(input_path) if not isinstance(input_path, Image.Image) else 'PIL_Image',
'num_images': len(images),
'processing_time': time.time() - start_time
}
# Convert to standardized format
result = self.postprocess_output({
'predictions': predictions,
'processing_info': {
'total_images': len(images),
'successful_predictions': len(predictions)
}
}, source_info)
if self.show_log:
logger.info(f"Extracted {len(result.text_blocks)} text blocks using Surya")
return result
except Exception:
if self.show_log:
logger.error("Error during Surya text extraction", exc_info=True)
raise
|
postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput
Convert Surya output to standardized TextOutput format.
Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
| def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
"""Convert Surya output to standardized TextOutput format."""
text_blocks = []
full_text_parts = []
if 'predictions' in raw_output:
for page_idx, prediction in enumerate(raw_output['predictions']):
if hasattr(prediction, 'text_lines'):
for line_idx, text_line in enumerate(prediction.text_lines):
# Create text block
block = TextBlock(
text=text_line.text.strip(),
bbox=text_line.bbox if hasattr(text_line, 'bbox') else None,
confidence=getattr(text_line, 'confidence', 1.0),
page_num=page_idx + 1,
block_type='text_line',
reading_order=line_idx
)
text_blocks.append(block)
full_text_parts.append(text_line.text.strip())
# Build metadata
metadata = {
'engine': 'surya',
'total_blocks': len(text_blocks),
'processing_info': raw_output.get('processing_info', {})
}
if source_info:
metadata.update(source_info)
return TextOutput(
text_blocks=text_blocks,
full_text='\n'.join(full_text_parts),
metadata=metadata,
source_info=source_info,
page_count=len(raw_output.get('predictions', []))
)
|
preprocess_input(input_path: Union[str, Path, Image]) -> List[Image.Image]
Preprocess input for Surya text extraction.
Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
| def preprocess_input(self, input_path: Union[str, Path, Image.Image]) -> List[Image.Image]:
"""Preprocess input for Surya text extraction."""
if isinstance(input_path, Image.Image):
return [input_path.convert("RGB")]
elif isinstance(input_path, (str, Path)):
# Handle image files
if str(input_path).lower().endswith(('.png', '.jpg', '.jpeg', '.tiff', '.bmp')):
image = Image.open(input_path).convert("RGB")
return [image]
else:
# For PDF files, we'd need to convert to images first
# This is a simplified implementation - you might want to use pdf2image
raise ValueError(f"Unsupported file type: {input_path}. Surya text extractor works with images.")
else:
raise ValueError("Unsupported input type for Surya text extractor")
|
SuryaTextMapper
Bases: BaseTextMapper
Label mapper for Surya text model output.
Source code in omnidocs/tasks/text_extraction/extractors/surya_text.py
| def __init__(self):
super().__init__('surya')
|
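Illustrative sketch for the Surya extractor. Note from preprocess_input above that it accepts images only, so PDF pages must be rendered to images first (e.g. with pdf2image); the image path is a placeholder:

```python
from PIL import Image
from omnidocs.tasks.text_extraction.extractors.surya_text import SuryaTextExtractor

extractor = SuryaTextExtractor(show_log=True)

output = extractor.extract("scan.png")               # path input
output = extractor.extract(Image.open("scan.png"))   # or a PIL image directly

# Surya emits one block per recognized line, tagged 'text_line'.
for line in output.get_text_by_type("text_line"):
    print(line.confidence, line.text)
```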
DoclingTextExtractor(device: Optional[str] = None, show_log: bool = False, extract_images: bool = False, ocr_enabled: bool = True, table_structure_enabled: bool = True)
Bases: BaseTextExtractor
Text extractor using Docling.
Initialize Docling text extractor.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device | Optional[str] | Device to run on (not used for Docling) | None |
| show_log | bool | Whether to show detailed logs | False |
| extract_images | bool | Whether to extract images alongside text | False |
| ocr_enabled | bool | Whether to enable OCR for scanned documents | True |
| table_structure_enabled | bool | Whether to enable table structure detection | True |
Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
| def __init__(self,
device: Optional[str] = None,
show_log: bool = False,
extract_images: bool = False,
ocr_enabled: bool = True,
table_structure_enabled: bool = True):
"""Initialize Docling text extractor.
Args:
device: Device to run on (not used for Docling)
show_log: Whether to show detailed logs
extract_images: Whether to extract images alongside text
ocr_enabled: Whether to enable OCR for scanned documents
table_structure_enabled: Whether to enable table structure detection
"""
super().__init__(device, show_log, "docling", extract_images)
self.ocr_enabled = ocr_enabled
self.table_structure_enabled = table_structure_enabled
self._label_mapper = DoclingTextMapper()
self._load_model()
|
extract(input_path: Union[str, Path], **kwargs) -> TextOutput
Extract text from document using Docling.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path] | Path to input document | required |
| **kwargs | | Additional parameters (ignored for Docling) | {} |

Returns:

| Type | Description |
| --- | --- |
| TextOutput | TextOutput containing extracted text |
Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
| def extract(
self,
input_path: Union[str, Path],
**kwargs
) -> TextOutput:
"""Extract text from document using Docling.
Args:
input_path: Path to input document
**kwargs: Additional parameters (ignored for Docling)
Returns:
TextOutput containing extracted text
"""
start_time = time.time()
# Preprocess input
input_path = self.preprocess_input(input_path)
if self.show_log:
logger.info(f"Extracting text from {input_path}")
try:
# Convert document
result = self.model.convert(input_path)
# Create source info
source_info = {
'file_path': str(input_path),
'file_name': input_path.name,
'file_size': input_path.stat().st_size,
'engine': 'docling'
}
# Post-process output
output = self.postprocess_output(result, source_info)
output.processing_time = time.time() - start_time
if self.show_log:
logger.info(f"Extracted {len(output.text_blocks)} text blocks in {output.processing_time:.2f}s")
return output
except Exception as e:
logger.error(f"Error extracting text from {input_path}: {str(e)}")
raise
|
get_supported_formats() -> List[str]
Get list of supported document formats.
Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
| def get_supported_formats(self) -> List[str]:
"""Get list of supported document formats."""
return ['.pdf', '.docx', '.pptx', '.html', '.md']
|
postprocess_output(raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput
Convert Docling output to standardized TextOutput format.
Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
| def postprocess_output(self, raw_output: Any, source_info: Optional[Dict] = None) -> TextOutput:
"""Convert Docling output to standardized TextOutput format."""
text_blocks = []
# Process document elements
for element in raw_output.document.texts:
# Get bounding box if available
bbox = None
if hasattr(element, 'prov') and element.prov:
for prov in element.prov:
if hasattr(prov, 'bbox'):
bbox = [prov.bbox.l, prov.bbox.t, prov.bbox.r, prov.bbox.b]
break
# Get page number
page_num = 1
if hasattr(element, 'prov') and element.prov:
for prov in element.prov:
if hasattr(prov, 'page'):
page_num = prov.page + 1 # Convert to 1-based
break
# Create text block
block = TextBlock(
text=element.text,
bbox=bbox,
confidence=1.0, # Docling doesn't provide confidence scores
page_num=page_num,
block_type=self._label_mapper.normalize_block_type(element.label),
reading_order=getattr(element, 'reading_order', None)
)
text_blocks.append(block)
# Sort blocks by reading order
text_blocks.sort(key=lambda x: (x.page_num, x.reading_order or 0))
# Combine all text
full_text = '\n\n'.join(block.text for block in text_blocks)
# Get metadata
metadata = {
'engine': 'docling',
'ocr_enabled': self.ocr_enabled,
'table_structure_enabled': self.table_structure_enabled,
'total_elements': len(text_blocks)
}
return TextOutput(
text_blocks=text_blocks,
full_text=full_text,
metadata=metadata,
source_info=source_info,
page_count=max(block.page_num for block in text_blocks) if text_blocks else 1
)
|
preprocess_input(input_path: Union[str, Path]) -> Path
Preprocess input document.
Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
| def preprocess_input(self, input_path: Union[str, Path]) -> Path:
"""Preprocess input document."""
input_path = Path(input_path)
if not input_path.exists():
raise FileNotFoundError(f"Input file not found: {input_path}")
supported_formats = ['.pdf', '.docx', '.pptx', '.html', '.md']
if input_path.suffix.lower() not in supported_formats:
raise ValueError(f"Unsupported format: {input_path.suffix}. Supported: {supported_formats}")
return input_path
|
DoclingTextMapper
Bases: BaseTextMapper
Mapper for Docling text extraction output.
Source code in omnidocs/tasks/text_extraction/extractors/docling_parse.py
| def __init__(self):
super().__init__("docling")
|
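Illustrative sketch for the Docling extractor; the file names are placeholders:

```python
from omnidocs.tasks.text_extraction.extractors.docling_parse import DoclingTextExtractor

# OCR and table-structure detection are on by default; turning them off
# can speed things up for born-digital documents.
extractor = DoclingTextExtractor(ocr_enabled=False, table_structure_enabled=False)

output = extractor.extract("slides.pptx")  # also accepts .pdf, .docx, .html, .md
output.save_markdown("slides.md")
```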
Math Expression Extraction
Recognize and extract LaTeX math expressions from images and PDFs.
omnidocs.tasks.math_expression_extraction
Math expression extraction module for OmniDocs.
This module provides base classes and implementations for mathematical expression extraction and LaTeX recognition from images and documents.
BaseLatexExtractor(device: Optional[str] = None, show_log: bool = False)
Bases: ABC
Base class for LaTeX expression extraction models.
Initialize the LaTeX extractor.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device | Optional[str] | Device to run model on ('cuda' or 'cpu') | None |
| show_log | bool | Whether to show detailed logs | False |
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def __init__(self, device: Optional[str] = None, show_log: bool = False):
"""Initialize the LaTeX extractor.
Args:
device: Device to run model on ('cuda' or 'cpu')
show_log: Whether to show detailed logs
"""
self.show_log = show_log
self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
self.model = None
self.model_path = None
self._label_mapper: Optional[BaseLatexMapper] = None
if self.show_log:
logger.info(f"Initializing {self.__class__.__name__}")
logger.info(f"Using device: {self.device}")
|
label_mapper: BaseLatexMapper
Get the label mapper for this extractor.
extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput
Extract LaTeX expressions from input image.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path, Image] | Path to input image or image data | required |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
| --- | --- |
| LatexOutput | LatexOutput containing extracted expressions |
Source code in omnidocs/tasks/math_expression_extraction/base.py
| @abstractmethod
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> LatexOutput:
"""Extract LaTeX expressions from input image.
Args:
input_path: Path to input image or image data
**kwargs: Additional model-specific parameters
Returns:
LatexOutput containing extracted expressions
"""
pass
|
extract_all(input_paths: List[Union[str, Path, Image]], **kwargs) -> List[LatexOutput]
Extract LaTeX from multiple images.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_paths | List[Union[str, Path, Image]] | List of image paths or image data | required |
| **kwargs | | Additional model-specific parameters | {} |

Returns:

| Type | Description |
| --- | --- |
| List[LatexOutput] | List of LatexOutput objects |
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def extract_all(
self,
input_paths: List[Union[str, Path, Image.Image]],
**kwargs
) -> List[LatexOutput]:
"""Extract LaTeX from multiple images.
Args:
input_paths: List of image paths or image data
**kwargs: Additional model-specific parameters
Returns:
List of LatexOutput objects
"""
results = []
for input_path in input_paths:
try:
result = self.extract(input_path, **kwargs)
results.append(result)
except Exception as e:
if self.show_log:
logger.error(f"Error processing {input_path}: {str(e)}")
raise
return results
|
map_expression(expression: str) -> str
Map model-specific LaTeX to standardized format.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def map_expression(self, expression: str) -> str:
"""Map model-specific LaTeX to standardized format."""
if self._label_mapper is None:
return expression
return self._label_mapper.to_standard(expression)
|
preprocess_input(input_path: Union[str, Path, Image, ndarray]) -> List[Image.Image]
Convert input to list of PIL Images.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path, Image, ndarray] | Input image path or image data | required |

Returns:

| Type | Description |
| --- | --- |
| List[Image] | List of PIL Images |
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def preprocess_input(self, input_path: Union[str, Path, Image.Image, np.ndarray]) -> List[Image.Image]:
"""Convert input to list of PIL Images.
Args:
input_path: Input image path or image data
Returns:
List of PIL Images
"""
if isinstance(input_path, (str, Path)):
image = Image.open(input_path).convert('RGB')
return [image]
elif isinstance(input_path, Image.Image):
return [input_path.convert('RGB')]
elif isinstance(input_path, np.ndarray):
return [Image.fromarray(cv2.cvtColor(input_path, cv2.COLOR_BGR2RGB))]
else:
raise ValueError(f"Unsupported input type: {type(input_path)}")
|
BaseLatexMapper
Base class for mapping model-specific outputs to standardized format.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def __init__(self):
self._mapping: Dict[str, str] = {}
self._reverse_mapping: Dict[str, str] = {}
self._setup_mapping()
|
from_standard
from_standard(standard_latex: str) -> str
Convert standardized LaTeX to model-specific format.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def from_standard(self, standard_latex: str) -> str:
"""Convert standardized LaTeX to model-specific format."""
return self._reverse_mapping.get(standard_latex, standard_latex)
|
to_standard
to_standard(model_output: str) -> str
Convert model-specific LaTeX to standardized format.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def to_standard(self, model_output: str) -> str:
"""Convert model-specific LaTeX to standardized format."""
return self._mapping.get(model_output, model_output)
|
LatexOutput
Bases: BaseModel
Container for extracted LaTeX expressions.
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| expressions | List[str] | List of extracted LaTeX expressions |
| confidences | Optional[List[float]] | Optional confidence scores for each expression |
| bboxes | Optional[List[List[float]]] | Optional bounding boxes for each expression |
| source_img_size | Optional[Tuple[int, int]] | Optional tuple of source image dimensions |
save_json(output_path: Union[str, Path]) -> None
Save output to JSON file.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def save_json(self, output_path: Union[str, Path]) -> None:
"""Save output to JSON file."""
import json
with open(output_path, 'w') as f:
json.dump(self.to_dict(), f, indent=2)
|
to_dict() -> Dict
Convert to dictionary representation.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def to_dict(self) -> Dict:
"""Convert to dictionary representation."""
return {
'expressions': self.expressions,
'confidences': self.confidences,
'bboxes': self.bboxes,
'source_img_size': self.source_img_size
}
|
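Illustrative sketch of constructing and persisting a LatexOutput directly (values are made up; in practice an extractor builds this for you):

```python
from omnidocs.tasks.math_expression_extraction.base import LatexOutput

out = LatexOutput(
    expressions=[r"E = mc^2", r"\int_0^1 x^2 \, dx = \tfrac{1}{3}"],
    confidences=[0.98, 0.91],      # optional, parallel to expressions
    bboxes=None,                   # optional per-expression boxes
    source_img_size=(800, 600),    # optional (width, height) of the source image
)
out.save_json("equations.json")
print(out.to_dict()["expressions"])
```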
DonutExtractor(device: Optional[str] = None, show_log: bool = False, model_name: str = 'naver-clova-ix/donut-base-finetuned-cord-v2', model_path: Optional[Union[str, Path]] = None, **kwargs)
Bases: BaseLatexExtractor
Donut (NAVER CLOVA) based expression extraction implementation.
Initialize Donut Extractor.
Source code in omnidocs/tasks/math_expression_extraction/extractors/donut.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
model_name: str = "naver-clova-ix/donut-base-finetuned-cord-v2",
model_path: Optional[Union[str, Path]] = None,
**kwargs
):
"""Initialize Donut Extractor."""
super().__init__(device=device, show_log=show_log)
self._label_mapper = DonutMapper()
self.model_name = model_name
# Set default paths
if model_path is None:
model_path = _MODELS_DIR / "donut_models" / model_name.replace("/", "_")
self.model_path = Path(model_path)
# Check dependencies
self._check_dependencies()
# Download model if needed
if not self._model_exists():
if self.show_log:
logger.info(f"Model not found at {self.model_path}, will download from HuggingFace")
self._download_model()
try:
self._load_model()
if self.show_log:
logger.success("Donut model initialized successfully")
except Exception as e:
logger.error("Failed to initialize Donut model", exc_info=True)
raise
|
extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput
Extract LaTeX expressions using Donut.
Source code in omnidocs/tasks/math_expression_extraction/extractors/donut.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> LatexOutput:
"""Extract LaTeX expressions using Donut."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
expressions = []
for img in images:
# Prepare image for Donut
pixel_values = self.processor(img, return_tensors="pt").pixel_values
pixel_values = pixel_values.to(self.device)
# Prepare task prompt (adjust based on your specific task)
task_prompt = "<s_cord-v2>"  # default CORD-v2 task prompt (receipt/invoice parsing)
decoder_input_ids = self.processor.tokenizer(
task_prompt,
add_special_tokens=False,
return_tensors="pt"  # return PyTorch tensors
).input_ids
decoder_input_ids = decoder_input_ids.to(self.device)
# Generate
with torch.no_grad():
outputs = self.model.generate(
pixel_values,
decoder_input_ids=decoder_input_ids,
max_length=self.model.decoder.config.max_position_embeddings,
early_stopping=True,
pad_token_id=self.processor.tokenizer.pad_token_id,
eos_token_id=self.processor.tokenizer.eos_token_id,
use_cache=True,
num_beams=1,
bad_words_ids=[[self.processor.tokenizer.unk_token_id]],
return_dict_in_generate=True,
)
# Decode output: convert the generated token IDs back into a string
sequence = self.processor.batch_decode(outputs.sequences)[0]
# Strip eos and pad special tokens
sequence = sequence.replace(self.processor.tokenizer.eos_token, "").replace(self.processor.tokenizer.pad_token, "")
# Remove the task prompt itself
sequence = sequence.replace(task_prompt, "")
# Extract math content from JSON-like output
math_content = self._extract_math_from_json(sequence)
# Map to standard format
mapped_expr = self.map_expression(math_content)
expressions.append(mapped_expr)
return LatexOutput(
expressions=expressions,
source_img_size=images[0].size if images else None
)
except Exception as e:
logger.error("Error during Donut extraction", exc_info=True)
raise
|
DonutMapper
Bases: BaseLatexMapper
Label mapper for Donut model output.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def __init__(self):
self._mapping: Dict[str, str] = {}
self._reverse_mapping: Dict[str, str] = {}
self._setup_mapping()
|
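Illustrative sketch for the Donut extractor; the image path is a placeholder, and the checkpoint is downloaded from HuggingFace on first use if it is not cached locally:

```python
from omnidocs.tasks.math_expression_extraction.extractors.donut import DonutExtractor

extractor = DonutExtractor(show_log=True)

result = extractor.extract("formula.png")  # placeholder image path
for expr in result.expressions:
    print(expr)
```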
Nougat (Neural Optical Understanding for Academic Documents) LaTeX Expression Extractor
This module provides LaTeX expression extraction using Facebook's Nougat model via Hugging Face transformers.
NougatExtractor(model_type: str = 'small', device: Optional[str] = None, show_log: bool = False, model_path: Optional[str] = None, **kwargs)
Bases: BaseLatexExtractor
Nougat (Neural Optical Understanding for Academic Documents) based expression extraction.
Initialize Nougat Extractor.
Source code in omnidocs/tasks/math_expression_extraction/extractors/nougat.py
| def __init__(
self,
model_type: str = "small",
device: Optional[str] = None,
show_log: bool = False,
model_path: Optional[str] = None,
**kwargs
):
"""Initialize Nougat Extractor."""
super().__init__(device=device, show_log=show_log)
self._label_mapper = NougatMapper()
self.model_type = model_type
# Set default model path if not provided
if model_path is None:
model_path = _MODELS_DIR / f"nougat_{model_type}"
self.model_path = Path(model_path)
# Check dependencies
self._check_dependencies()
try:
# Check if model exists locally, download if needed
if not self._model_exists():
if self.show_log:
logger.info("Model not found locally, will download from Hugging Face")
self._download_model()
else:
if self.show_log:
logger.info("Model found locally, using that version")
self._load_model()
if self.show_log:
logger.success("Nougat model initialized successfully")
except Exception as e:
logger.error("Failed to initialize Nougat model", exc_info=True)
raise
|
extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput
Extract LaTeX expressions using Nougat.
Source code in omnidocs/tasks/math_expression_extraction/extractors/nougat.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> LatexOutput:
"""Extract LaTeX expressions using Nougat."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
all_expressions = []
for img in images:
# Add padding to make it look more like a document page
from PIL import ImageOps
padded_image = ImageOps.expand(img, border=100, fill='white')
# Process image with Nougat processor
pixel_values = self.processor(padded_image, return_tensors="pt").pixel_values
pixel_values = pixel_values.to(self.device)
# Generate text using the model
with torch.no_grad():
outputs = self.model.generate(
pixel_values,
max_length=512,
num_beams=1, # Use greedy decoding for faster inference
do_sample=False,
early_stopping=False
)
# Decode the generated text
generated_text = self.processor.batch_decode(outputs, skip_special_tokens=True)[0]
# Extract mathematical expressions from the text
expressions = self._extract_math_expressions(generated_text)
# Map expressions to standard format
mapped_expressions = [self.map_expression(expr) for expr in expressions]
all_expressions.extend(mapped_expressions)
return LatexOutput(
expressions=all_expressions,
source_img_size=images[0].size if images else None
)
except Exception as e:
logger.error("Error during Nougat extraction", exc_info=True)
raise
|
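A minimal usage sketch for NougatExtractor, based on the constructor and extract signatures above; the module path follows the source path shown and the file name is illustrative:
| from omnidocs.tasks.math_expression_extraction.extractors.nougat import NougatExtractor

extractor = NougatExtractor(model_type="small", show_log=True)  # downloads weights on first use
result = extractor.extract("paper_page.png")  # returns a LatexOutput
for expr in result.expressions:
    print(expr)
|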
NougatMapper()
Bases: BaseLatexMapper
Label mapper for Nougat model output.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def __init__(self):
self._mapping: Dict[str, str] = {}
self._reverse_mapping: Dict[str, str] = {}
self._setup_mapping()
|
SuryaMathExtractor(device: Optional[str] = None, show_log: bool = False, model_path: Optional[Union[str, Path]] = None, **kwargs)
Bases: BaseLatexExtractor
Surya-based mathematical expression extraction implementation.
Initialize Surya Math Extractor.
Source code in omnidocs/tasks/math_expression_extraction/extractors/surya_math.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
model_path: Optional[Union[str, Path]] = None,
**kwargs
):
"""Initialize Surya Math Extractor."""
super().__init__(device=device, show_log=show_log)
self._label_mapper = SuryaMathMapper()
if self.show_log:
logger.info("Initializing SuryaMathExtractor")
# Set device if specified, otherwise use default from parent
if device:
self.device = device
if self.show_log:
logger.info(f"Using device: {self.device}")
# Set default paths
if model_path is None:
model_path = _MODELS_DIR / "surya_math"
self.model_path = Path(model_path)
# Check dependencies and load model
self._check_dependencies()
self._load_model()
|
extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput
Extract LaTeX expressions using Surya.
Source code in omnidocs/tasks/math_expression_extraction/extractors/surya_math.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> LatexOutput:
"""Extract LaTeX expressions using Surya."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
expressions = []
confidences = []
bboxes = []
for img in images:
# Convert PIL to RGB if needed
if isinstance(img, Image.Image):
img_rgb = img.convert("RGB")
else:
img_rgb = Image.fromarray(img).convert("RGB")
# Run math detection and recognition
try:
# Import TaskNames for proper task specification
from surya.common.surya.schema import TaskNames
# Use recognition predictor with math mode enabled
predictions = self.rec_predictor(
[img_rgb],
task_names=[TaskNames.ocr_with_boxes],
det_predictor=self.det_predictor,
math_mode=True # Enable math mode for LaTeX output
)
# Process predictions
if predictions and len(predictions) > 0:
prediction = predictions[0]
# Extract text regions that contain math
for text_line in prediction.text_lines:
text_content = text_line.text.strip()
# Check if this looks like math content
if self._is_math_content(text_content):
# Map to standard format
mapped_expr = self.map_expression(text_content)
expressions.append(mapped_expr)
# Add confidence if available
if hasattr(text_line, 'confidence'):
confidences.append(text_line.confidence)
else:
confidences.append(1.0)
# Add bounding box if available
if hasattr(text_line, 'bbox'):
bboxes.append(text_line.bbox)
else:
bboxes.append([0, 0, img_rgb.width, img_rgb.height])
except Exception as e:
if self.show_log:
logger.warning(f"Error processing image with Surya: {e}")
# Fallback: return empty result for this image
continue
return LatexOutput(
expressions=expressions,
confidences=confidences if confidences else None,
bboxes=bboxes if bboxes else None,
source_img_size=images[0].size if images else None
)
except Exception as e:
if self.show_log:
logger.error("Error during Surya math extraction", exc_info=True)
raise
|
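A minimal usage sketch for SuryaMathExtractor; confidences and bounding boxes are populated when the underlying predictor provides them (file name illustrative):
| from omnidocs.tasks.math_expression_extraction.extractors.surya_math import SuryaMathExtractor

extractor = SuryaMathExtractor(show_log=True)
result = extractor.extract("equations.png")
for i, expr in enumerate(result.expressions):
    conf = result.confidences[i] if result.confidences else None
    bbox = result.bboxes[i] if result.bboxes else None
    print(expr, conf, bbox)
|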
SuryaMathMapper()
Bases: BaseLatexMapper
Label mapper for Surya math model output.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def __init__(self):
self._mapping: Dict[str, str] = {}
self._reverse_mapping: Dict[str, str] = {}
self._setup_mapping()
|
UniMERNet (Universal Mathematical Expression Recognition Network) extractor for LaTeX expressions.
UniMERNetExtractor(model_path: Optional[str] = None, cfg_path: Optional[str] = None, device: Optional[str] = None, show_log: bool = False, **kwargs)
Bases: BaseLatexExtractor
UniMERNet (Universal Mathematical Expression Recognition Network) based expression extraction.
Initialize UniMERNet Extractor.
Source code in omnidocs/tasks/math_expression_extraction/extractors/unimernet.py
| def __init__(
self,
model_path: Optional[str] = None,
cfg_path: Optional[str] = None,
device: Optional[str] = None,
show_log: bool = False,
**kwargs
):
"""Initialize UniMERNet Extractor."""
super().__init__(device=device, show_log=show_log)
self._label_mapper = UniMERNetMapper()
# Set default paths
if model_path is None:
model_path = "omnidocs/models/unimernet_base"
if cfg_path is None:
cfg_path = str(Path(__file__).parent / "UniMERNet" / "configs" / "demo.yaml")
self.model_path = Path(model_path)
self.cfg_path = Path(cfg_path)
# Check dependencies
self._check_dependencies()
# Download model if needed
if not self.model_path.exists():
self._download_model()
try:
self._load_model()
if self.show_log:
logger.success("UniMERNet model initialized successfully")
except Exception as e:
logger.error("Failed to initialize UniMERNet model", exc_info=True)
raise
|
extract(input_path: Union[str, Path, Image], **kwargs) -> LatexOutput
Extract LaTeX expressions using UniMERNet.
Source code in omnidocs/tasks/math_expression_extraction/extractors/unimernet.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> LatexOutput:
"""Extract LaTeX expressions using UniMERNet."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
expressions = []
for img in images:
# Process image with UniMERNet
image_tensor = self.vis_processor(img).unsqueeze(0).to(self.device)
# Generate LaTeX
with torch.no_grad():
output = self.model.generate({"image": image_tensor})
pred = output["pred_str"][0]
# Map to standard format
mapped_expr = self.map_expression(pred)
expressions.append(mapped_expr)
return LatexOutput(
expressions=expressions,
source_img_size=images[0].size if images else None
)
except Exception as e:
logger.error("Error during UniMERNet extraction", exc_info=True)
raise
|
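A minimal usage sketch for UniMERNetExtractor (file name illustrative):
| from omnidocs.tasks.math_expression_extraction.extractors.unimernet import UniMERNetExtractor

extractor = UniMERNetExtractor(device="cuda", show_log=True)
result = extractor.extract("formula.png")
print(result.expressions[0] if result.expressions else "no expression found")
|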
UniMERNetMapper()
Bases: BaseLatexMapper
Label mapper for UniMERNet model output.
Source code in omnidocs/tasks/math_expression_extraction/base.py
| def __init__(self):
self._mapping: Dict[str, str] = {}
self._reverse_mapping: Dict[str, str] = {}
self._setup_mapping()
|
OCR (Optical Character Recognition)
Extract text from scanned documents and images using OCR models.
PaddleOCRExtractor(device: Optional[str] = None, show_log: bool = False, languages: Optional[List[str]] = None, use_angle_cls: bool = True, use_gpu: bool = True, drop_score: float = 0.5, model_path: Optional[str] = None, **kwargs)
Bases: BaseOCRExtractor
PaddleOCR based text extraction implementation.
Initialize PaddleOCR Extractor.
Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
languages: Optional[List[str]] = None,
use_angle_cls: bool = True,
use_gpu: bool = True,
drop_score: float = 0.5,
model_path: Optional[str] = None,
**kwargs
):
"""Initialize PaddleOCR Extractor."""
super().__init__(
device=device,
show_log=show_log,
languages=languages or ['en'],
engine_name='paddle'
)
self.use_angle_cls = use_angle_cls
self.use_gpu = use_gpu
self.drop_score = drop_score
self._label_mapper = PaddleOCRMapper()
# Set default paths
if model_path is None:
model_path = "omnidocs/models/paddleocr"
self.model_path = Path(model_path)
# Check dependencies first
self._check_dependencies()
# Set up model directory and download if needed
if self.model_path.exists() and any(self.model_path.iterdir()):
if self.show_log:
logger.info(f"Using existing PaddleOCR models from: {self.model_path}")
elif not self.model_path.exists():
self._download_model()
# Load model
self._load_model()
|
extract(input_path: Union[str, Path, Image], **kwargs) -> OCROutput
Extract text using PaddleOCR.
Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> OCROutput:
"""Extract text using PaddleOCR."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
img = images[0]
# Convert PIL to cv2 format if needed
if isinstance(img, Image.Image):
img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
# Perform OCR
result = self.model.ocr(img, cls=self.use_angle_cls)
# Convert to standardized format
texts = self._process_ocr_results(result)
full_text_parts = [text.text for text in texts]
img_size = img.shape[:2][::-1] # (width, height)
ocr_output = OCROutput(
texts=texts,
full_text=' '.join(full_text_parts),
source_img_size=img_size
)
if self.show_log:
logger.info(f"Extracted {len(texts)} text regions")
return ocr_output
except Exception as e:
logger.error("Error during PaddleOCR extraction", exc_info=True)
return OCROutput(
texts=[],
full_text="",
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
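A minimal usage sketch for PaddleOCRExtractor, based on the signatures above (file name illustrative):
| from omnidocs.tasks.ocr_extraction.extractors.paddle import PaddleOCRExtractor

extractor = PaddleOCRExtractor(languages=["en"], use_gpu=False, show_log=True)
result = extractor.extract("scan.png")  # returns an OCROutput
print(result.full_text)
for text in result.texts:
    print(text.text, text.confidence, text.bbox)
|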
predict(img, **kwargs)
Predict method for compatibility with the original interface.
Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
| def predict(self, img, **kwargs):
"""Predict method for compatibility with original interface."""
try:
result = self.extract(img, **kwargs)
# Convert to original format
ocr_res = []
for text_obj in result.texts:
# Convert bbox back to points format
x0, y0, x1, y1 = text_obj.bbox
points = [[x0, y0], [x1, y0], [x1, y1], [x0, y1]]
poly = [coord for point in points for coord in point]
ocr_res.append({
"category_type": "text",
'poly': poly,
'score': text_obj.confidence,
'text': text_obj.text,
})
return ocr_res
except Exception as e:
logger.error("Error during prediction", exc_info=True)
return []
|
preprocess_image(image, alpha_color=(255, 255, 255), inv=False, bin=False)
Preprocess image for OCR.
Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
| def preprocess_image(self, image, alpha_color=(255, 255, 255), inv=False, bin=False):
"""Preprocess image for OCR."""
image = alpha_to_color(image, alpha_color)
if inv:
image = cv2.bitwise_not(image)
if bin:
image = binarize_img(image)
return image
|
PaddleOCRMapper()
Bases: BaseOCRMapper
Label mapper for PaddleOCR model output.
Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
| def __init__(self):
super().__init__('paddleocr')
self._mapping = {
'en': 'en',
'ch': 'ch',
'chinese_cht': 'chinese_cht',
'ta': 'ta',
'te': 'te',
'ka': 'ka',
'ja': 'japan',
'ko': 'korean',
'hi': 'hi',
'ar': 'ar',
'cyrillic': 'cyrillic',
'devanagari': 'devanagari',
'fr': 'fr',
'de': 'german',
'es': 'es',
'pt': 'pt',
'ru': 'ru',
'it': 'it',
}
self._reverse_mapping = {v: k for k, v in self._mapping.items()}
|
alpha_to_color(img, alpha_color=(255, 255, 255))
Convert transparent pixels to specified color.
Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
| def alpha_to_color(img, alpha_color=(255, 255, 255)):
"""Convert transparent pixels to specified color."""
if len(img.shape) == 3 and img.shape[2] == 4: # RGBA image (H, W, 4)
alpha_channel = img[:, :, 3]
rgb_channels = img[:, :, :3]
transparent_mask = alpha_channel == 0
for i in range(3):
rgb_channels[:, :, i][transparent_mask] = alpha_color[i]
return rgb_channels
return img
|
binarize_img(img)
Convert image to binary (black and white).
Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
| def binarize_img(img):
"""Convert image to binary (black and white)."""
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if len(img.shape) == 3 else img
_, binary = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
return cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)
|
points_to_bbox(points)
Convert a polygon (a list of [x, y] points) to a bounding box [x_min, y_min, x_max, y_max].
Source code in omnidocs/tasks/ocr_extraction/extractors/paddle.py
| def points_to_bbox(points):
"""Change polygon(shape: N * 8) to bbox(shape: N * 4)."""
x_coords = [p[0] for p in points]
y_coords = [p[1] for p in points]
return [min(x_coords), min(y_coords), max(x_coords), max(y_coords)]
|
TesseractOCRExtractor(device: Optional[str] = None, show_log: bool = False, languages: Optional[List[str]] = None, psm: int = 6, oem: int = 3, config: str = '', **kwargs)
Bases: BaseOCRExtractor
Tesseract OCR based text extraction implementation.
Initialize Tesseract OCR Extractor.
Source code in omnidocs/tasks/ocr_extraction/extractors/tesseract_ocr.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
languages: Optional[List[str]] = None,
psm: int = 6,
oem: int = 3,
config: str = "",
**kwargs
):
"""Initialize Tesseract OCR Extractor."""
super().__init__(
device=device,
show_log=show_log,
languages=languages or ['en'],
engine_name='tesseract'
)
self.psm = psm # Page segmentation mode
self.oem = oem # OCR engine mode
self.config = config
self._label_mapper = TesseractOCRMapper()
try:
import pytesseract
from pytesseract import Output
self.pytesseract = pytesseract
self.Output = Output
# Set Tesseract executable path for Windows
import os
tesseract_paths = [
r"C:\Program Files\Tesseract-OCR\tesseract.exe",
r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
r"C:\Users\{}\AppData\Local\Tesseract-OCR\tesseract.exe".format(os.getenv('USERNAME', '')),
]
for path in tesseract_paths:
if os.path.exists(path):
pytesseract.pytesseract.tesseract_cmd = path
if self.show_log:
logger.info(f"Found Tesseract at: {path}")
break
else:
# Try to find in PATH
import shutil
tesseract_cmd = shutil.which('tesseract')
if tesseract_cmd:
pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
if self.show_log:
logger.info(f"Found Tesseract in PATH: {tesseract_cmd}")
except ImportError as e:
logger.error("Failed to import pytesseract")
raise ImportError(
"pytesseract is not available. Please install it with: pip install pytesseract"
) from e
self._load_model()
|
extract(input_path: Union[str, Path, Image], **kwargs) -> OCROutput
Extract text using Tesseract OCR.
Source code in omnidocs/tasks/ocr_extraction/extractors/tesseract_ocr.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> OCROutput:
"""Extract text using Tesseract OCR."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
img = images[0]
# Convert PIL to numpy array
img_array = np.array(img)
# Run OCR with detailed output
raw_output = self.pytesseract.image_to_data(
img_array,
lang=self.lang_string,
config=self.tesseract_config,
output_type=self.Output.DICT
)
# Convert to standardized format
result = self.postprocess_output(raw_output, img.size)
if self.show_log:
logger.info(f"Extracted {len(result.texts)} text regions")
return result
except Exception as e:
logger.error("Error during Tesseract extraction", exc_info=True)
return OCROutput(
texts=[],
full_text="",
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
postprocess_output(raw_output: dict, img_size: Tuple[int, int]) -> OCROutput
Convert Tesseract output to standardized OCROutput format.
Source code in omnidocs/tasks/ocr_extraction/extractors/tesseract_ocr.py
| def postprocess_output(self, raw_output: dict, img_size: Tuple[int, int]) -> OCROutput:
"""Convert Tesseract output to standardized OCROutput format."""
texts = []
full_text_parts = []
n_boxes = len(raw_output['text'])
for i in range(n_boxes):
text = raw_output['text'][i].strip()
if not text:
continue
confidence = float(raw_output['conf'][i])
if confidence < 0:
continue
x = int(raw_output['left'][i])
y = int(raw_output['top'][i])
w = int(raw_output['width'][i])
h = int(raw_output['height'][i])
bbox = [float(x), float(y), float(x + w), float(y + h)]
# Create polygon from bbox
polygon = [[float(x), float(y)], [float(x + w), float(y)],
[float(x + w), float(y + h)], [float(x), float(y + h)]]
detected_lang = self.detect_text_language(text)
ocr_text = OCRText(
text=text,
confidence=confidence / 100.0,
bbox=bbox,
polygon=polygon,
language=detected_lang,
reading_order=i
)
texts.append(ocr_text)
full_text_parts.append(text)
return OCROutput(
texts=texts,
full_text=' '.join(full_text_parts),
source_img_size=img_size
)
|
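A minimal usage sketch for TesseractOCRExtractor. Note that psm=6 treats the page as a single uniform block of text and oem=3 selects the default engine mode (file name illustrative):
| from omnidocs.tasks.ocr_extraction.extractors.tesseract_ocr import TesseractOCRExtractor

extractor = TesseractOCRExtractor(languages=["en"], psm=6, oem=3)
result = extractor.extract("scan.png")
for text in result.texts:
    print(f"{text.confidence:.2f} {text.bbox}: {text.text}")
|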
TesseractOCRMapper()
Bases: BaseOCRMapper
Label mapper for Tesseract OCR model output.
Source code in omnidocs/tasks/ocr_extraction/extractors/tesseract_ocr.py
| def __init__(self):
super().__init__('tesseract')
self._setup_mapping()
|
EasyOCRExtractor(device: Optional[str] = None, show_log: bool = False, languages: Optional[List[str]] = None, gpu: bool = True, **kwargs)
Bases: BaseOCRExtractor
EasyOCR based text extraction implementation.
Initialize EasyOCR Extractor.
Source code in omnidocs/tasks/ocr_extraction/extractors/easy_ocr.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
languages: Optional[List[str]] = None,
gpu: bool = True,
**kwargs
):
"""Initialize EasyOCR Extractor."""
super().__init__(
device=device,
show_log=show_log,
languages=languages or ['en'],
engine_name='easyocr'
)
self.gpu = gpu
self._label_mapper = EasyOCRMapper()
# Set default model path
self.model_path = Path("omnidocs/models/easyocr")
# Check dependencies
self._check_dependencies()
# Download model if needed
if not self.model_path.exists():
self._download_model()
self._load_model()
|
extract(input_path: Union[str, Path, Image], detail: int = 1, paragraph: bool = False, width_ths: float = 0.7, height_ths: float = 0.7, **kwargs) -> OCROutput
Extract text using EasyOCR.
Source code in omnidocs/tasks/ocr_extraction/extractors/easy_ocr.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
detail: int = 1, # Changed default to 1 for bbox and confidence
paragraph: bool = False,
width_ths: float = 0.7,
height_ths: float = 0.7,
**kwargs
) -> OCROutput:
"""Extract text using EasyOCR."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
img = images[0]
# Convert PIL to numpy array
img_array = np.array(img)
# Run OCR
raw_output = self.model.readtext(
img_array,
detail=detail,
paragraph=paragraph,
width_ths=width_ths,
height_ths=height_ths,
**kwargs
)
# Convert to standardized format
result = self.postprocess_output(raw_output, img.size)
if self.show_log:
logger.info(f"Extracted {len(result.texts)} text regions")
return result
except Exception as e:
logger.error("Error during EasyOCR extraction", exc_info=True)
return OCROutput(
texts=[],
full_text="",
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
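A minimal usage sketch for EasyOCRExtractor; detail=1 returns bounding boxes and confidences, while paragraph=True merges adjacent lines (file name illustrative):
| from omnidocs.tasks.ocr_extraction.extractors.easy_ocr import EasyOCRExtractor

extractor = EasyOCRExtractor(languages=["en"], gpu=False)
result = extractor.extract("photo.jpg", detail=1, paragraph=False)
print(result.full_text)
|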
postprocess_output(raw_output: List, img_size: Tuple[int, int]) -> OCROutput
Convert EasyOCR output to standardized OCROutput format.
Source code in omnidocs/tasks/ocr_extraction/extractors/easy_ocr.py
| def postprocess_output(self, raw_output: List, img_size: Tuple[int, int]) -> OCROutput:
"""Convert EasyOCR output to standardized OCROutput format."""
texts = []
full_text_parts = []
for i, detection in enumerate(raw_output):
if isinstance(detection, str):
text = detection
confidence = 0.9
bbox = [0, 0, img_size[0], img_size[1]]
polygon = [[0, 0], [img_size[0], 0], [img_size[0], img_size[1]], [0, img_size[1]]]
elif isinstance(detection, (list, tuple)) and len(detection) == 3:
bbox_coords, text, confidence = detection
bbox_array = np.array(bbox_coords)
x1, y1 = bbox_array.min(axis=0)
x2, y2 = bbox_array.max(axis=0)
bbox = [float(x1), float(y1), float(x2), float(y2)]
polygon = [[float(x), float(y)] for x, y in bbox_coords]
else:
continue
detected_lang = self.detect_text_language(text)
ocr_text = OCRText(
text=text,
confidence=float(confidence),
bbox=bbox,
polygon=polygon,
language=detected_lang,
reading_order=i
)
texts.append(ocr_text)
full_text_parts.append(text)
return OCROutput(
texts=texts,
full_text=' '.join(full_text_parts),
source_img_size=img_size
)
|
EasyOCRMapper()
Bases: BaseOCRMapper
Label mapper for EasyOCR model output.
Source code in omnidocs/tasks/ocr_extraction/extractors/easy_ocr.py
| def __init__(self):
super().__init__('easyocr')
self._setup_mapping()
|
SuryaOCRExtractor(device: Optional[str] = None, show_log: bool = False, languages: Optional[List[str]] = None, **kwargs)
Bases: BaseOCRExtractor
Surya OCR based text extraction implementation.
Initialize Surya OCR Extractor.
Source code in omnidocs/tasks/ocr_extraction/extractors/surya_ocr.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
languages: Optional[List[str]] = None,
**kwargs
):
"""Initialize Surya OCR Extractor."""
super().__init__(
device=device,
show_log=show_log,
languages=languages or ['en'],
engine_name='surya'
)
self._label_mapper = SuryaOCRMapper()
# Set default model path
self.model_path = Path("omnidocs/models/surya")
# Check dependencies
self._check_dependencies()
# Download model if needed
if not self.model_path.exists():
self._download_model()
self._load_model()
|
extract(input_path: Union[str, Path, Image], **kwargs) -> OCROutput
Extract text using Surya OCR.
Source code in omnidocs/tasks/ocr_extraction/extractors/surya_ocr.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> OCROutput:
"""Extract text using Surya OCR."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
img = images[0]
# Map languages to Surya format
surya_languages = []
for lang in self.languages:
mapped_lang = self._label_mapper.from_standard_language(lang)
surya_languages.append(mapped_lang)
# Use the new Predictor-based API
predictions = None
if hasattr(self, 'use_new_api') and self.use_new_api:
# Use the new Predictor-based API based on surya scripts
try:
# Convert image to RGB if needed (function expects a list)
img_rgb_list = self.convert_if_not_rgb([img])
img_rgb = img_rgb_list[0]
# Import TaskNames for proper task specification
from surya.common.surya.schema import TaskNames
# Call RecognitionPredictor directly with det_predictor parameter
# This is how it's done in surya/scripts/ocr_text.py
predictions = self.rec_predictor(
[img_rgb],
task_names=[TaskNames.ocr_with_boxes],
det_predictor=self.det_predictor,
math_mode=False
)
except Exception as e:
if self.show_log:
logger.warning(f"New API failed: {e}")
else:
# Fallback to old API (shouldn't happen with current version)
if hasattr(self, 'run_ocr'):
try:
predictions = self.run_ocr(
[img],
[surya_languages],
self.det_model,
self.det_processor,
self.rec_model,
self.rec_processor
)
except Exception as e:
if self.show_log:
logger.warning(f"run_ocr failed: {e}")
if predictions is None:
raise RuntimeError("Failed to run OCR with available Surya API functions")
# Convert to standardized format
result = self.postprocess_output(predictions, img.size)
if self.show_log:
logger.info(f"Extracted {len(result.texts)} text regions")
return result
except Exception as e:
logger.error("Error during Surya OCR extraction", exc_info=True)
return OCROutput(
texts=[],
full_text="",
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
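A minimal usage sketch for SuryaOCRExtractor; texts are sorted by the reading_order field set during postprocessing (file name illustrative):
| from omnidocs.tasks.ocr_extraction.extractors.surya_ocr import SuryaOCRExtractor

extractor = SuryaOCRExtractor(languages=["en"], show_log=True)
result = extractor.extract("document.png")
for text in sorted(result.texts, key=lambda t: t.reading_order):
    print(text.text)
|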
postprocess_output(raw_output: Union[List, Any], img_size: Tuple[int, int]) -> OCROutput
Convert Surya OCR output to standardized OCROutput format.
Source code in omnidocs/tasks/ocr_extraction/extractors/surya_ocr.py
| def postprocess_output(self, raw_output: Union[List, Any], img_size: Tuple[int, int]) -> OCROutput:
"""Convert Surya OCR output to standardized OCROutput format."""
texts = []
full_text_parts = []
if not raw_output:
return OCROutput(
texts=[],
full_text="",
source_img_size=img_size
)
try:
# Handle different output formats from different Surya versions
if isinstance(raw_output, list) and len(raw_output) > 0:
prediction = raw_output[0]
# Check for different attribute names based on version
text_lines = None
if hasattr(prediction, 'text_lines'):
text_lines = prediction.text_lines
elif hasattr(prediction, 'bboxes') and hasattr(prediction, 'text'):
# Handle case where we have separate bboxes and text
if hasattr(prediction, 'text') and isinstance(prediction.text, list):
text_lines = []
for i, (bbox, text) in enumerate(zip(prediction.bboxes, prediction.text)):
# Create a mock text_line object
class MockTextLine:
def __init__(self, text, bbox):
self.text = text
self.bbox = bbox
self.confidence = 0.9 # Default confidence
text_lines.append(MockTextLine(text, bbox))
if text_lines:
for i, text_line in enumerate(text_lines):
if hasattr(text_line, 'text') and hasattr(text_line, 'bbox'):
text = text_line.text.strip() if text_line.text else ""
if not text:
continue
bbox = text_line.bbox
# Ensure bbox is in the correct format [x1, y1, x2, y2]
if len(bbox) >= 4:
bbox_list = [float(bbox[0]), float(bbox[1]), float(bbox[2]), float(bbox[3])]
else:
continue
# Create polygon from bbox
polygon = [
[float(bbox[0]), float(bbox[1])],
[float(bbox[2]), float(bbox[1])],
[float(bbox[2]), float(bbox[3])],
[float(bbox[0]), float(bbox[3])]
]
confidence = getattr(text_line, 'confidence', 0.9)
detected_lang = self.detect_text_language(text)
ocr_text = OCRText(
text=text,
confidence=float(confidence),
bbox=bbox_list,
polygon=polygon,
language=detected_lang,
reading_order=i
)
texts.append(ocr_text)
full_text_parts.append(text)
except Exception as e:
logger.error(f"Error processing Surya OCR output: {e}", exc_info=True)
return OCROutput(
texts=texts,
full_text=' '.join(full_text_parts),
source_img_size=img_size
)
|
SuryaOCRMapper()
Bases: BaseOCRMapper
Label mapper for Surya OCR model output.
Source code in omnidocs/tasks/ocr_extraction/extractors/surya_ocr.py
| def __init__(self):
super().__init__('surya')
self._setup_mapping()
|
Table Extraction
Extract tabular data from PDFs and images using classic and deep learning models.
Table extraction module for OmniDocs.
This module provides base classes and implementations for table detection and extraction
from images and documents.
BaseTableExtractor(device: Optional[str] = None, show_log: bool = False, engine_name: Optional[str] = None)
Bases: ABC
Base class for table extraction models.
Initialize the table extractor.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device | Optional[str] | Device to run model on ('cuda' or 'cpu') | None |
| show_log | bool | Whether to show detailed logs | False |
| engine_name | Optional[str] | Name of the table extraction engine | None |
Source code in omnidocs/tasks/table_extraction/base.py
| def __init__(self,
device: Optional[str] = None,
show_log: bool = False,
engine_name: Optional[str] = None):
"""Initialize the table extractor.
Args:
device: Device to run model on ('cuda' or 'cpu')
show_log: Whether to show detailed logs
engine_name: Name of the table extraction engine
"""
self.show_log = show_log
self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
self.engine_name = engine_name or self.__class__.__name__.lower().replace('extractor', '')
self.model = None
self.model_path = None
self._label_mapper: Optional[BaseTableMapper] = None
# Initialize mapper if engine name is provided
if self.engine_name:
self._label_mapper = BaseTableMapper(self.engine_name)
if self.show_log:
logger.info(f"Initializing {self.__class__.__name__}")
logger.info(f"Using device: {self.device}")
logger.info(f"Engine: {self.engine_name}")
|
label_mapper: BaseTableMapper
Get the label mapper for this extractor.
extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput
Extract tables from input image.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path, Image] | Path to input image or image data | required |
| **kwargs | | Additional model-specific parameters | {} |

Returns:
| Type | Description |
| --- | --- |
| TableOutput | TableOutput containing extracted tables |
Source code in omnidocs/tasks/table_extraction/base.py
| @abstractmethod
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> TableOutput:
"""Extract tables from input image.
Args:
input_path: Path to input image or image data
**kwargs: Additional model-specific parameters
Returns:
TableOutput containing extracted tables
"""
pass
|
extract_all(input_paths: List[Union[str, Path, Image]], **kwargs) -> List[TableOutput]
Extract tables from multiple images.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_paths | List[Union[str, Path, Image]] | List of image paths or image data | required |
| **kwargs | | Additional model-specific parameters | {} |

Returns:
| Type | Description |
| --- | --- |
| List[TableOutput] | List of TableOutput objects |
Source code in omnidocs/tasks/table_extraction/base.py
| def extract_all(
self,
input_paths: List[Union[str, Path, Image.Image]],
**kwargs
) -> List[TableOutput]:
"""Extract tables from multiple images.
Args:
input_paths: List of image paths or image data
**kwargs: Additional model-specific parameters
Returns:
List of TableOutput objects
"""
results = []
for input_path in input_paths:
try:
result = self.extract(input_path, **kwargs)
results.append(result)
except Exception as e:
if self.show_log:
logger.error(f"Error processing {input_path}: {str(e)}")
raise
return results
|
extract_with_layout(input_path: Union[str, Path, Image], layout_regions: Optional[List[Dict]] = None, **kwargs) -> TableOutput
Extract tables with optional layout information.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path, Image] | Path to input image or image data | required |
| layout_regions | Optional[List[Dict]] | Optional list of layout regions containing tables | None |
| **kwargs | | Additional model-specific parameters | {} |

Returns:
| Type | Description |
| --- | --- |
| TableOutput | TableOutput containing extracted tables |
Source code in omnidocs/tasks/table_extraction/base.py
| def extract_with_layout(
self,
input_path: Union[str, Path, Image.Image],
layout_regions: Optional[List[Dict]] = None,
**kwargs
) -> TableOutput:
"""Extract tables with optional layout information.
Args:
input_path: Path to input image or image data
layout_regions: Optional list of layout regions containing tables
**kwargs: Additional model-specific parameters
Returns:
TableOutput containing extracted tables
"""
# Default implementation just calls extract, can be overridden by child classes
return self.extract(input_path, **kwargs)
|
postprocess_output(raw_output: Any, img_size: Tuple[int, int]) -> TableOutput
Convert raw table extraction output to standardized TableOutput format.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| raw_output | Any | Raw output from table extraction engine | required |
| img_size | Tuple[int, int] | Original image size (width, height) | required |

Returns:
| Type | Description |
| --- | --- |
| TableOutput | Standardized TableOutput object |
Source code in omnidocs/tasks/table_extraction/base.py
| def postprocess_output(self, raw_output: Any, img_size: Tuple[int, int]) -> TableOutput:
"""Convert raw table extraction output to standardized TableOutput format.
Args:
raw_output: Raw output from table extraction engine
img_size: Original image size (width, height)
Returns:
Standardized TableOutput object
"""
raise NotImplementedError("Child classes must implement postprocess_output method")
|
preprocess_input(input_path: Union[str, Path, Image, ndarray]) -> List[Image.Image]
Convert input to list of PIL Images.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | Union[str, Path, Image, ndarray] | Input image path or image data | required |

Returns:
| Type | Description |
| --- | --- |
| List[Image] | List of PIL Images |
Source code in omnidocs/tasks/table_extraction/base.py
| def preprocess_input(self, input_path: Union[str, Path, Image.Image, np.ndarray]) -> List[Image.Image]:
"""Convert input to list of PIL Images.
Args:
input_path: Input image path or image data
Returns:
List of PIL Images
"""
if isinstance(input_path, (str, Path)):
image = Image.open(input_path).convert('RGB')
return [image]
elif isinstance(input_path, Image.Image):
return [input_path.convert('RGB')]
elif isinstance(input_path, np.ndarray):
return [Image.fromarray(cv2.cvtColor(input_path, cv2.COLOR_BGR2RGB))]
else:
raise ValueError(f"Unsupported input type: {type(input_path)}")
|
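Since BaseTableExtractor is abstract, a concrete extractor only needs to implement extract (and usually postprocess_output). A minimal toy subclass, assuming Table, TableCell, and TableOutput are importable from the base module shown above:
| from pathlib import Path
from typing import Union
from PIL import Image

from omnidocs.tasks.table_extraction.base import (
    BaseTableExtractor, Table, TableCell, TableOutput,
)

class DummyTableExtractor(BaseTableExtractor):
    """Toy extractor returning one hard-coded 1x1 table (illustration only)."""

    def extract(self, input_path: Union[str, Path, Image.Image], **kwargs) -> TableOutput:
        images = self.preprocess_input(input_path)  # inherited helper
        w, h = images[0].size
        cell = TableCell(text="example", row=0, col=0, rowspan=1, colspan=1,
                         bbox=[0.0, 0.0, float(w), float(h)], is_header=True)
        table = Table(cells=[cell], num_rows=1, num_cols=1,
                      bbox=[0.0, 0.0, float(w), float(h)], table_id="table_0")
        return TableOutput(tables=[table], source_img_size=(w, h))
|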
visualize(table_result: TableOutput, image_path: Union[str, Path, Image], output_path: str = 'visualized_tables.png', table_color: str = 'red', cell_color: str = 'blue', box_width: int = 2, show_text: bool = False, text_color: str = 'green', font_size: int = 12, show_table_ids: bool = True) -> None
Visualize table extraction results by drawing bounding boxes on the original image.
This method allows users to easily see which extractor is working better
by visualizing the detected tables and cells with bounding boxes.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table_result | TableOutput | TableOutput containing extracted tables | required |
| image_path | Union[str, Path, Image] | Path to original image or PIL Image object | required |
| output_path | str | Path to save the annotated image | 'visualized_tables.png' |
| table_color | str | Color for table bounding boxes | 'red' |
| cell_color | str | Color for cell bounding boxes | 'blue' |
| box_width | int | Width of bounding box lines | 2 |
| show_text | bool | Whether to overlay cell text | False |
| text_color | str | Color for text overlay | 'green' |
| font_size | int | Font size for text overlay | 12 |
| show_table_ids | bool | Whether to show table IDs | True |
Source code in omnidocs/tasks/table_extraction/base.py
| def visualize(self,
table_result: 'TableOutput',
image_path: Union[str, Path, Image.Image],
output_path: str = "visualized_tables.png",
table_color: str = 'red',
cell_color: str = 'blue',
box_width: int = 2,
show_text: bool = False,
text_color: str = 'green',
font_size: int = 12,
show_table_ids: bool = True) -> None:
"""Visualize table extraction results by drawing bounding boxes on the original image.
This method allows users to easily see which extractor is working better
by visualizing the detected tables and cells with bounding boxes.
Args:
table_result: TableOutput containing extracted tables
image_path: Path to original image or PIL Image object
output_path: Path to save the annotated image
table_color: Color for table bounding boxes
cell_color: Color for cell bounding boxes
box_width: Width of bounding box lines
show_text: Whether to overlay cell text
text_color: Color for text overlay
font_size: Font size for text overlay
show_table_ids: Whether to show table IDs
"""
try:
from PIL import Image, ImageDraw, ImageFont
# Handle different input types
if isinstance(image_path, (str, Path)):
image_path = Path(image_path)
# Check if it's a PDF file
if image_path.suffix.lower() == '.pdf':
# Convert PDF to image
image = self._convert_pdf_to_image(image_path)
else:
# Regular image file
image = Image.open(image_path).convert("RGB")
elif isinstance(image_path, Image.Image):
image = image_path.convert("RGB")
else:
raise ValueError(f"Unsupported image input type: {type(image_path)}")
# Create a copy to draw on
annotated_image = image.copy()
draw = ImageDraw.Draw(annotated_image)
# Just use original coordinates - no transformation needed
# Try to load a font for text overlay
font = None
if show_text or show_table_ids:
try:
# Try to use a better font if available
font = ImageFont.truetype("arial.ttf", font_size)
except (OSError, IOError):
try:
# Fallback to default font
font = ImageFont.load_default()
except Exception:
font = None
# Draw tables and cells if table results exist
if hasattr(table_result, "tables") and table_result.tables:
for table_idx, table in enumerate(table_result.tables):
# Draw table bounding box
if table.bbox and len(table.bbox) == 4:
x1, y1, x2, y2 = table.bbox
draw.rectangle(
[(x1, y1), (x2, y2)],
outline=table_color,
width=box_width + 1
)
# Draw table ID (only if requested)
if show_table_ids and font:
table_id = getattr(table, 'table_id', f'Table {table_idx}')
draw.text((x1, y1 - font_size - 2), table_id,
fill=table_color, font=font)
# Draw cell bounding boxes
if hasattr(table, "cells") and table.cells:
for cell in table.cells:
if cell.bbox and len(cell.bbox) == 4:
x1, y1, x2, y2 = cell.bbox
# Draw cell rectangle - no text overlay
draw.rectangle(
[(x1, y1), (x2, y2)],
outline=cell_color,
width=box_width
)
# Save the annotated image
annotated_image.save(output_path)
if self.show_log:
logger.info(f"Table visualization saved to {output_path}")
num_tables = len(table_result.tables) if table_result.tables else 0
total_cells = sum(len(table.cells) for table in table_result.tables) if table_result.tables else 0
logger.info(f"Visualized {num_tables} tables with {total_cells} cells")
except Exception as e:
error_msg = f"Error creating table visualization: {str(e)}"
if self.show_log:
logger.error(error_msg)
raise RuntimeError(error_msg)
|
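A short sketch of visualizing results from any extractor instance (the extractor and file names are illustrative):
| # extractor is any concrete BaseTableExtractor subclass instance
result = extractor.extract("report.pdf")
extractor.visualize(
    result,
    "report.pdf",
    output_path="tables_annotated.png",
    table_color="red",
    cell_color="blue",
    show_table_ids=True,
)
|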
visualize_from_json(image_path: Union[str, Path, Image], json_path: Union[str, Path], output_path: str = 'visualized_tables_from_json.png', **kwargs) -> None
Load table extraction results from JSON file and visualize them.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| image_path | Union[str, Path, Image] | Path to original image, PDF file, or PIL Image object | required |
| json_path | Union[str, Path] | Path to JSON file containing table extraction results | required |
| output_path | str | Path to save the annotated image | 'visualized_tables_from_json.png' |
| **kwargs | | Additional arguments passed to visualize method | {} |
Source code in omnidocs/tasks/table_extraction/base.py
| def visualize_from_json(self,
image_path: Union[str, Path, Image.Image],
json_path: Union[str, Path],
output_path: str = "visualized_tables_from_json.png",
**kwargs) -> None:
"""
Load table extraction results from JSON file and visualize them.
Args:
image_path: Path to original image, PDF file, or PIL Image object
json_path: Path to JSON file containing table extraction results
output_path: Path to save the annotated image
**kwargs: Additional arguments passed to visualize method
"""
import json
try:
# Load table results from JSON
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Reconstruct TableOutput from JSON data
tables = []
if isinstance(data, list):
# Handle list of tables format
for table_data in data:
cells = []
if 'cells' in table_data:
for cell_data in table_data['cells']:
cell = TableCell(**cell_data)
cells.append(cell)
table = Table(
cells=cells,
num_rows=table_data.get('num_rows', 0),
num_cols=table_data.get('num_cols', 0),
bbox=table_data.get('bbox'),
confidence=table_data.get('confidence'),
table_id=table_data.get('table_id', ''),
structure_confidence=table_data.get('structure_confidence')
)
tables.append(table)
# Create TableOutput object
table_result = TableOutput(
tables=tables,
source_img_size=data[0].get('source_img_size') if data else None,
metadata=data[0].get('metadata', {}) if data else {}
)
# Visualize the loaded results
self.visualize(table_result, image_path, output_path, **kwargs)
except Exception as e:
error_msg = f"Error loading and visualizing tables from JSON: {str(e)}"
if self.show_log:
logger.error(error_msg)
raise RuntimeError(error_msg)
|
BaseTableMapper(engine_name: str)
Base class for mapping table extraction engine-specific outputs to standardized format.
Initialize mapper for specific table extraction engine.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| engine_name | str | Name of the table extraction engine | required |
Source code in omnidocs/tasks/table_extraction/base.py
| def __init__(self, engine_name: str):
"""Initialize mapper for specific table extraction engine.
Args:
engine_name: Name of the table extraction engine
"""
self.engine_name = engine_name.lower()
|
detect_header_rows(cells: List[TableCell]) -> List[TableCell]
Detect and mark header cells based on position and formatting.
Source code in omnidocs/tasks/table_extraction/base.py
| def detect_header_rows(self, cells: List[TableCell]) -> List[TableCell]:
"""Detect and mark header cells based on position and formatting."""
# Simple heuristic: first row is likely header
if not cells:
return cells
first_row_cells = [cell for cell in cells if cell.row == 0]
for cell in first_row_cells:
cell.is_header = True
return cells
|
normalize_bbox(bbox: List[float], img_width: int, img_height: int) -> List[float]
Normalize bounding box coordinates to absolute pixel values.
Source code in omnidocs/tasks/table_extraction/base.py
| def normalize_bbox(self, bbox: List[float], img_width: int, img_height: int) -> List[float]:
"""Normalize bounding box coordinates to absolute pixel values."""
if all(0 <= coord <= 1 for coord in bbox):
return [
bbox[0] * img_width,
bbox[1] * img_height,
bbox[2] * img_width,
bbox[3] * img_height
]
return bbox
|
Table
Bases: BaseModel
Container for extracted table.
Attributes:
| Name | Type | Description |
| --- | --- | --- |
| cells | List[TableCell] | List of table cells |
| num_rows | int | Number of rows in the table |
| num_cols | int | Number of columns in the table |
| bbox | Optional[List[float]] | Bounding box of the entire table [x1, y1, x2, y2] |
| confidence | Optional[float] | Overall table detection confidence |
| table_id | Optional[str] | Optional table identifier |
| caption | Optional[str] | Optional table caption |
| structure_confidence | Optional[float] | Confidence score for table structure detection |
to_csv() -> str
Convert table to CSV format.
Source code in omnidocs/tasks/table_extraction/base.py
| def to_csv(self) -> str:
"""Convert table to CSV format."""
import csv
import io
# Create a grid to store cell values
grid = [[''] * self.num_cols for _ in range(self.num_rows)]
# Fill the grid with cell values
for cell in self.cells:
for r in range(cell.row, cell.row + cell.rowspan):
for c in range(cell.col, cell.col + cell.colspan):
if r < self.num_rows and c < self.num_cols:
grid[r][c] = cell.text
# Convert to CSV
output = io.StringIO()
writer = csv.writer(output)
writer.writerows(grid)
return output.getvalue()
|
to_dict() -> Dict
Convert to dictionary representation.
Source code in omnidocs/tasks/table_extraction/base.py
| def to_dict(self) -> Dict:
"""Convert to dictionary representation."""
return {
'cells': [cell.to_dict() for cell in self.cells],
'num_rows': self.num_rows,
'num_cols': self.num_cols,
'bbox': self.bbox,
'confidence': self.confidence,
'table_id': self.table_id,
'caption': self.caption,
'structure_confidence': self.structure_confidence
}
|
to_html() -> str
Convert table to HTML format.
Source code in omnidocs/tasks/table_extraction/base.py
| def to_html(self) -> str:
"""Convert table to HTML format."""
html = ['<table>']
# Create a grid to track cell positions and spans
grid = [[None for _ in range(self.num_cols)] for _ in range(self.num_rows)]
# Mark occupied cells
for cell in self.cells:
for r in range(cell.row, cell.row + cell.rowspan):
for c in range(cell.col, cell.col + cell.colspan):
if r < self.num_rows and c < self.num_cols:
grid[r][c] = cell if r == cell.row and c == cell.col else 'occupied'
# Generate HTML rows
for row_idx in range(self.num_rows):
html.append(' <tr>')
for col_idx in range(self.num_cols):
cell_data = grid[row_idx][col_idx]
if isinstance(cell_data, TableCell):
tag = 'th' if cell_data.is_header else 'td'
attrs = []
if cell_data.rowspan > 1:
attrs.append(f'rowspan="{cell_data.rowspan}"')
if cell_data.colspan > 1:
attrs.append(f'colspan="{cell_data.colspan}"')
attr_str = ' ' + ' '.join(attrs) if attrs else ''
html.append(f' <{tag}{attr_str}>{cell_data.text}</{tag}>')
elif cell_data is None:
html.append(' <td></td>')
# Skip 'occupied' cells as they're part of a span
html.append(' </tr>')
html.append('</table>')
return '\n'.join(html)
|
TableCell
Bases: BaseModel
Container for individual table cell.
Attributes:
| Name | Type | Description |
| --- | --- | --- |
| text | str | Text content of the cell |
| row | int | Row index of the cell (0-based) |
| col | int | Column index of the cell (0-based) |
| rowspan | int | Number of rows the cell spans |
| colspan | int | Number of columns the cell spans |
| bbox | Optional[List[float]] | Bounding box coordinates [x1, y1, x2, y2] |
| confidence | Optional[float] | Confidence score for cell detection |
| is_header | bool | Whether the cell is a header cell |
to_dict() -> Dict
Convert to dictionary representation.
Source code in omnidocs/tasks/table_extraction/base.py
| def to_dict(self) -> Dict:
"""Convert to dictionary representation."""
return {
'text': self.text,
'row': self.row,
'col': self.col,
'rowspan': self.rowspan,
'colspan': self.colspan,
'bbox': self.bbox,
'confidence': self.confidence,
'is_header': self.is_header
}
|
TableOutput
Bases: BaseModel
Container for table extraction results.
Attributes:
| Name | Type | Description |
| --- | --- | --- |
| tables | List[Table] | List of extracted tables |
| source_img_size | Optional[Tuple[int, int]] | Original image dimensions (width, height) |
| processing_time | Optional[float] | Time taken for table extraction |
| metadata | Optional[Dict[str, Any]] | Additional metadata from the extraction engine |
get_tables_by_confidence(min_confidence: float = 0.5) -> List[Table]
Filter tables by minimum confidence threshold.
Source code in omnidocs/tasks/table_extraction/base.py
| def get_tables_by_confidence(self, min_confidence: float = 0.5) -> List[Table]:
"""Filter tables by minimum confidence threshold."""
return [table for table in self.tables if table.confidence is None or table.confidence >= min_confidence]
|
save_json(output_path: Union[str, Path]) -> None
Save output to JSON file.
Source code in omnidocs/tasks/table_extraction/base.py
| def save_json(self, output_path: Union[str, Path]) -> None:
"""Save output to JSON file."""
import json
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)
|
save_tables_as_csv(output_dir: Union[str, Path]) -> List[Path]
Save all tables as separate CSV files.
Source code in omnidocs/tasks/table_extraction/base.py
| def save_tables_as_csv(self, output_dir: Union[str, Path]) -> List[Path]:
"""Save all tables as separate CSV files."""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
saved_files = []
for i, table in enumerate(self.tables):
filename = f"table_{table.table_id or i}.csv"
file_path = output_dir / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(table.to_csv())
saved_files.append(file_path)
return saved_files
|
to_dict() -> Dict
Convert to dictionary representation.
Source code in omnidocs/tasks/table_extraction/base.py
| def to_dict(self) -> Dict:
"""Convert to dictionary representation."""
return {
'tables': [table.to_dict() for table in self.tables],
'source_img_size': self.source_img_size,
'processing_time': self.processing_time,
'metadata': self.metadata
}
|
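A short sketch of consuming a TableOutput with the methods above (the extractor and file names are illustrative):
| result = extractor.extract("invoice.pdf")  # any extractor returning TableOutput
confident = result.get_tables_by_confidence(min_confidence=0.7)
for table in confident:
    print(table.to_csv())  # also available: table.to_html(), table.to_dict()
result.save_json("tables.json")
saved_files = result.save_tables_as_csv("tables_csv")
|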
CamelotExtractor(device: Optional[str] = None, show_log: bool = False, method: str = 'lattice', pages: str = '1', flavor: str = 'lattice', **kwargs)
Bases: BaseTableExtractor
Camelot based table extraction implementation.
TODO: Bbox coordinate transformation from PDF to image space is still broken.
Current issues:
- Coordinate transformation accuracy issues between PDF points and image pixels
- Cell bbox estimation doesn't account for actual cell sizes from Camelot
- Need better integration with Camelot's internal coordinate data
- Grid-based estimation fallback is inaccurate for real table layouts
Initialize Camelot Table Extractor.
Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
method: str = 'lattice',
pages: str = '1',
flavor: str = 'lattice',
**kwargs
):
"""Initialize Camelot Table Extractor."""
super().__init__(
device=device,
show_log=show_log,
engine_name='camelot'
)
self._label_mapper = CamelotMapper()
self.method = method
self.pages = pages
self.flavor = flavor
try:
import camelot
self.camelot = camelot
except ImportError as e:
logger.error("Failed to import Camelot")
raise ImportError(
"Camelot is not available. Please install it with: pip install camelot-py[cv]"
) from e
self._load_model()
|
extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput
Extract tables using Camelot.
Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> TableOutput:
"""Extract tables using Camelot."""
try:
# Camelot works with PDF files
if isinstance(input_path, (str, Path)):
pdf_path = Path(input_path)
if pdf_path.suffix.lower() != '.pdf':
raise ValueError("Camelot only works with PDF files")
# Extract tables from PDF
tables = self.camelot.read_pdf(
str(pdf_path),
pages=self.pages,
flavor=self.flavor,
**kwargs
)
# Get image size (estimate from first page)
try:
images = self._convert_pdf_to_image(pdf_path)
img_size = images[0].size if images else (612, 792) # Default PDF size
except Exception:
img_size = (612, 792) # Default PDF size
else:
raise ValueError("Camelot requires PDF file path, not image data")
# Convert to standardized format
result = self.postprocess_output(tables, img_size)
if self.show_log:
logger.info(f"Extracted {len(result.tables)} tables using Camelot")
return result
except Exception as e:
logger.error("Error during Camelot extraction", exc_info=True)
return TableOutput(
tables=[],
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
postprocess_output(raw_output: Any, img_size: Tuple[int, int]) -> TableOutput
Convert Camelot output to standardized TableOutput format.
Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
| def postprocess_output(self, raw_output: Any, img_size: Tuple[int, int]) -> TableOutput:
"""Convert Camelot output to standardized TableOutput format."""
tables = []
for i, camelot_table in enumerate(raw_output):
# Get table data
df = camelot_table.df
# Convert DataFrame to cells
cells = []
num_rows, num_cols = df.shape
for row_idx in range(num_rows):
for col_idx in range(num_cols):
cell_text = str(df.iloc[row_idx, col_idx]).strip()
# Create cell with basic info
cell = TableCell(
text=cell_text,
row=row_idx,
col=col_idx,
rowspan=1,
colspan=1,
confidence=camelot_table.accuracy / 100.0, # Convert percentage to decimal
is_header=(row_idx == 0) # Assume first row is header
)
cells.append(cell)
# Get table bounding box if available
bbox = None
if hasattr(camelot_table, '_bbox'):
bbox = list(camelot_table._bbox)
# Create table object
table = Table(
cells=cells,
num_rows=num_rows,
num_cols=num_cols,
bbox=bbox,
confidence=camelot_table.accuracy / 100.0,
table_id=f"table_{i}",
structure_confidence=camelot_table.accuracy / 100.0
)
tables.append(table)
return TableOutput(
tables=tables,
source_img_size=img_size,
metadata={
'engine': 'camelot',
'method': self.method,
'flavor': self.flavor
}
)
|
predict(pdf_path: Union[str, Path], **kwargs)
Predict method for compatibility with original interface.
Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
| def predict(self, pdf_path: Union[str, Path], **kwargs):
"""Predict method for compatibility with original interface."""
try:
result = self.extract(pdf_path, **kwargs)
# Convert to original format
table_res = []
for table in result.tables:
table_data = {
"table_id": table.table_id,
"bbox": table.bbox,
"confidence": table.confidence,
"cells": [cell.to_dict() for cell in table.cells],
"num_rows": table.num_rows,
"num_cols": table.num_cols
}
table_res.append(table_data)
return table_res
except Exception as e:
logger.error("Error during Camelot prediction", exc_info=True)
return []
|
CamelotMapper()
Bases: BaseTableMapper
Label mapper for Camelot table extraction output.
Source code in omnidocs/tasks/table_extraction/extractors/camelot.py
| def __init__(self):
super().__init__('camelot')
self._setup_mapping()
|
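A minimal usage sketch for CamelotExtractor. Camelot accepts only PDF paths; flavor='lattice' relies on ruling lines, while flavor='stream' infers structure from whitespace (file name illustrative):
| from omnidocs.tasks.table_extraction.extractors.camelot import CamelotExtractor

extractor = CamelotExtractor(pages="1-3", flavor="lattice")
result = extractor.extract("report.pdf")
for table in result.tables:
    print(table.table_id, f"{table.num_rows}x{table.num_cols}")
|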
PDFPlumberExtractor(device: Optional[str] = None, show_log: bool = False, table_settings: Optional[Dict] = None, **kwargs)
Bases: BaseTableExtractor
PDFPlumber based table extraction implementation.
Initialize PDFPlumber Table Extractor.
Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
table_settings: Optional[Dict] = None,
**kwargs
):
"""Initialize PDFPlumber Table Extractor."""
super().__init__(
device=device,
show_log=show_log,
engine_name='pdfplumber'
)
self._label_mapper = PDFPlumberMapper()
self.table_settings = table_settings or self._label_mapper._table_settings
try:
import pdfplumber
self.pdfplumber = pdfplumber
except ImportError as e:
logger.error("Failed to import PDFPlumber")
raise ImportError(
"PDFPlumber is not available. Please install it with: pip install pdfplumber"
) from e
self._load_model()
|
extract(input_path: Union[str, Path, Image], **kwargs) -> TableOutput
Extract tables using PDFPlumber.
Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> TableOutput:
"""Extract tables using PDFPlumber."""
try:
# PDFPlumber works with PDF files
if isinstance(input_path, (str, Path)):
pdf_path = Path(input_path)
if pdf_path.suffix.lower() != '.pdf':
raise ValueError("PDFPlumber only works with PDF files")
all_tables = []
# Open PDF and extract tables from all pages
with self.pdfplumber.open(str(pdf_path)) as pdf:
for page in pdf.pages:
page_tables = self._extract_tables_from_page(page)
all_tables.extend(page_tables)
# Get image size and PDF size for coordinate transformation
try:
# Get actual PDF page size first
import fitz
doc = fitz.open(str(pdf_path))
page = doc[0]
pdf_size = (page.rect.width, page.rect.height)
doc.close()
# Convert PDF to image to get actual image size
images = self._convert_pdf_to_image(pdf_path)
img_size = images[0].size if images else pdf_size
except Exception:
pdf_size = (612, 792) # Default PDF size
img_size = (612, 792) # Default image size
else:
raise ValueError("PDFPlumber requires PDF file path, not image data")
# Convert to standardized format
result = self.postprocess_output(all_tables, img_size, pdf_size)
if self.show_log:
logger.info(f"Extracted {len(result.tables)} tables using PDFPlumber")
return result
except Exception as e:
logger.error("Error during PDFPlumber extraction", exc_info=True)
return TableOutput(
tables=[],
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
postprocess_output(raw_output: List[Dict], img_size: Tuple[int, int], pdf_size: Tuple[int, int] = None) -> TableOutput
Convert PDFPlumber output to standardized TableOutput format.
Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
| def postprocess_output(
self,
raw_output: List[Dict],
img_size: Tuple[int, int],
pdf_size: Tuple[int, int] = None,
) -> TableOutput:
"""Convert PDFPlumber output to standardized TableOutput format."""
tables: List[Table] = []
for i, table_data in enumerate(raw_output):
table_bbox = table_data.get("bbox")
if table_bbox is None:
table_bbox = [0, 0, img_size[0], img_size[1]]
if pdf_size:
table_bbox_img = self._transform_pdf_to_image_coords(
table_bbox, pdf_size, img_size
)
else:
table_bbox_img = table_bbox
# Get max row/col indexes to know dimensions
max_row = max(c["row"] for c in table_data["cells"])
max_col = max(c["col"] for c in table_data["cells"])
num_rows = max_row + 1
num_cols = max_col + 1
# Pre-compute equally spaced cell rectangles inside the table bbox
x0, y0, x1, y1 = table_bbox_img
cell_w = (x1 - x0) / num_cols
cell_h = (y1 - y0) / num_rows
cells: List[TableCell] = []
for c in table_data["cells"]:
r, cidx = c["row"], c["col"]
# exact rectangle in image space
cx0 = x0 + cidx * cell_w
cy0 = y0 + r * cell_h
cx1 = cx0 + cell_w
cy1 = cy0 + cell_h
cell_bbox_img = [cx0, cy0, cx1, cy1]
cells.append(
TableCell(
text=c["text"].strip(),
row=r,
col=cidx,
rowspan=c.get("rowspan", 1),
colspan=c.get("colspan", 1),
bbox=cell_bbox_img,
confidence=0.9,
is_header=(r == 0),
)
)
tables.append(
Table(
cells=cells,
num_rows=num_rows,
num_cols=num_cols,
bbox=table_bbox_img,
confidence=0.9,
table_id=f"table_{i}",
structure_confidence=0.9,
)
)
return TableOutput(
tables=tables,
source_img_size=img_size,
metadata={"engine": "pdfplumber", "table_settings": self.table_settings},
)
|
predict(pdf_path: Union[str, Path], **kwargs)
Predict method for compatibility with original interface.
Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
| def predict(self, pdf_path: Union[str, Path], **kwargs):
"""Predict method for compatibility with original interface."""
try:
result = self.extract(pdf_path, **kwargs)
# Convert to original format
table_res = []
for table in result.tables:
table_data = {
"table_id": table.table_id,
"bbox": table.bbox,
"confidence": table.confidence,
"cells": [cell.to_dict() for cell in table.cells],
"num_rows": table.num_rows,
"num_cols": table.num_cols
}
table_res.append(table_data)
return table_res
except Exception as e:
logger.error("Error during PDFPlumber prediction", exc_info=True)
return []
|
Bases: BaseTableMapper
Label mapper for PDFPlumber table extraction output.
Source code in omnidocs/tasks/table_extraction/extractors/pdfplumber.py
| def __init__(self):
super().__init__('pdfplumber')
self._setup_mapping()
|
SuryaTableExtractor
SuryaTableExtractor(device: Optional[str] = None, show_log: bool = False, model_path: Optional[Union[str, Path]] = None, **kwargs)
Bases: BaseTableExtractor
Surya-based table extraction implementation.
Initialize Surya Table Extractor.
Source code in omnidocs/tasks/table_extraction/extractors/surya_table.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
model_path: Optional[Union[str, Path]] = None,
**kwargs
):
"""Initialize Surya Table Extractor."""
super().__init__(device=device, show_log=show_log, engine_name='surya')
self._label_mapper = SuryaTableMapper()
if self.show_log:
logger.info("Initializing SuryaTableExtractor")
# Set device if specified, otherwise use default from parent
if device:
self.device = device
if self.show_log:
logger.info(f"Using device: {self.device}")
# Set default paths
if model_path is None:
model_path = _MODELS_DIR / "surya_table"
self.model_path = Path(model_path)
# Check dependencies and load model
self._check_dependencies()
self._load_model()
|
extract
extract(input_path: Union[str, Path, Image.Image], **kwargs) -> TableOutput
Extract tables using Surya.
Source code in omnidocs/tasks/table_extraction/extractors/surya_table.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> TableOutput:
"""Extract tables using Surya."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
image = images[0]
img_size = image.size
# Convert PIL to RGB if needed
if isinstance(image, Image.Image):
img_rgb = image.convert("RGB")
else:
img_rgb = Image.fromarray(image).convert("RGB")
# Step 1: Use layout detection to find table regions
layout_predictions = self.layout_predictor([img_rgb])
tables_data = []
if layout_predictions and len(layout_predictions) > 0:
layout_pred = layout_predictions[0]
# Find table regions from layout
table_regions = []
for bbox_obj in layout_pred.bboxes:
if hasattr(bbox_obj, 'label') and 'table' in bbox_obj.label.lower():
table_regions.append({
'bbox': bbox_obj.bbox,
'confidence': getattr(bbox_obj, 'confidence', 1.0)
})
# Step 2: For each table region, extract text and structure
for table_region in table_regions:
bbox = table_region['bbox']
# Crop table region
table_img = img_rgb.crop(bbox)
# Step 3: Run OCR on table region
try:
from surya.common.surya.schema import TaskNames
# Use recognition predictor for table text extraction
predictions = self.rec_predictor(
[table_img],
task_names=[TaskNames.ocr_with_boxes],
det_predictor=self.det_predictor,
math_mode=False
)
# Process OCR results into table structure
if predictions and len(predictions) > 0:
prediction = predictions[0]
# Extract text lines and organize into table structure
cells = self._organize_text_into_table(prediction.text_lines, bbox)
table_data = {
'bbox': bbox,
'confidence': table_region['confidence'],
'cells': cells,
'num_rows': len(set(c['row'] for c in cells)) if cells else 0,
'num_cols': len(set(c['col'] for c in cells)) if cells else 0
}
tables_data.append(table_data)
except Exception as e:
if self.show_log:
logger.warning(f"Error processing table region: {e}")
continue
# Convert to standardized format
result = self.postprocess_output({'tables': tables_data}, img_size)
if self.show_log:
logger.info(f"Extracted {len(result.tables)} tables using Surya")
return result
except Exception as e:
if self.show_log:
logger.error("Error during Surya table extraction", exc_info=True)
raise
|
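A usage sketch; unlike the PDF-only extractors, this one accepts images directly, and on failure it re-raises rather than returning an empty TableOutput:
```python
from omnidocs.tasks.table_extraction.extractors.surya_table import SuryaTableExtractor

extractor = SuryaTableExtractor(device="cuda", show_log=True)
result = extractor.extract("page.png")  # layout detection, then per-region OCR

for table in result.tables:
    print(table.table_id, table.confidence, f"{table.num_rows}x{table.num_cols}")
```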
postprocess_output
postprocess_output(raw_output: Any, img_size: Tuple[int, int]) -> TableOutput
Convert Surya output to standardized TableOutput format.
Source code in omnidocs/tasks/table_extraction/extractors/surya_table.py
| def postprocess_output(self, raw_output: Any, img_size: Tuple[int, int]) -> TableOutput:
"""Convert Surya output to standardized TableOutput format."""
tables = []
if 'tables' in raw_output:
for table_idx, table_data in enumerate(raw_output['tables']):
# Extract table cells with proper mapping
cells = []
# Handle different possible structures from Surya
if 'cells' in table_data:
# Direct cell data
for cell_data in table_data['cells']:
cell = self._create_table_cell(cell_data, table_idx)
if cell:
cells.append(cell)
elif 'text_lines' in table_data:
# Convert text lines to cells
cells = self._text_lines_to_cells(table_data['text_lines'], table_data.get('bbox', [0, 0, img_size[0], img_size[1]]))
if cells:
# Calculate table dimensions
num_rows = max(c.row for c in cells) + 1 if cells else 0
num_cols = max(c.col for c in cells) + 1 if cells else 0
# Create table
table = Table(
cells=cells,
bbox=table_data.get('bbox', [0, 0, img_size[0], img_size[1]]),
confidence=table_data.get('confidence', 1.0),
num_rows=num_rows,
num_cols=num_cols,
table_id=f"surya_table_{table_idx}"
)
tables.append(table)
return TableOutput(
tables=tables,
source_img_size=img_size,
metadata={'engine': 'surya', 'raw_output': raw_output}
)
|
SuryaTableMapper
SuryaTableMapper()
Bases: BaseTableMapper
Label mapper for Surya table model output.
Source code in omnidocs/tasks/table_extraction/extractors/surya_table.py
| def __init__(self):
super().__init__('surya')
|
TabulaExtractor
TabulaExtractor(device: Optional[str] = None, show_log: bool = False, method: str = 'lattice', pages: Optional[Union[str, List[int]]] = None, multiple_tables: bool = True, guess: bool = True, area: Optional[List[float]] = None, columns: Optional[List[float]] = None, **kwargs)
Bases: BaseTableExtractor
Tabula based table extraction implementation.
Initialize Tabula Table Extractor.
Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
method: str = 'lattice',
pages: Optional[Union[str, List[int]]] = None,
multiple_tables: bool = True,
guess: bool = True,
area: Optional[List[float]] = None,
columns: Optional[List[float]] = None,
**kwargs
):
"""Initialize Tabula Table Extractor."""
super().__init__(
device=device,
show_log=show_log,
engine_name='tabula'
)
self._label_mapper = TabulaMapper()
self.method = method
self.pages = pages or 'all'
self.multiple_tables = multiple_tables
self.guess = guess
self.area = area
self.columns = columns
try:
import tabula
self.tabula = tabula
except ImportError as e:
logger.error("Failed to import Tabula")
raise ImportError(
"Tabula is not available. Please install it with: pip install tabula-py"
) from e
self._load_model()
|
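For example, the two classic Tabula strategies can be chosen at construction time (a sketch; 'lattice' generally suits ruled tables, 'stream' whitespace-aligned ones):
```python
from omnidocs.tasks.table_extraction.extractors.tabula import TabulaExtractor

# Lattice mode: tables delimited by visible cell borders
lattice = TabulaExtractor(method="lattice", pages="all", multiple_tables=True)

# Stream mode: borderless tables aligned by whitespace
stream = TabulaExtractor(method="stream", guess=False)
```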
extract
extract(input_path: Union[str, Path, Image.Image], **kwargs) -> TableOutput
Extract tables using Tabula.
Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> TableOutput:
"""Extract tables using Tabula."""
try:
# Tabula works with PDF files
if isinstance(input_path, (str, Path)):
pdf_path = Path(input_path)
if pdf_path.suffix.lower() != '.pdf':
raise ValueError("Tabula only works with PDF files")
# Prepare extraction options
options = self._prepare_tabula_options(**kwargs)
# Extract tables from PDF
try:
tables_list = self.tabula.read_pdf(str(pdf_path), **options)
# Ensure we have a list of DataFrames
if not isinstance(tables_list, list):
tables_list = [tables_list]
except Exception as e:
if self.show_log:
logger.error(f"Tabula extraction failed: {str(e)}")
tables_list = []
# Get image size and PDF size for coordinate transformation
try:
# Get actual PDF page size first
import fitz
doc = fitz.open(str(pdf_path))
page = doc[0]
pdf_size = (page.rect.width, page.rect.height)
doc.close()
# Convert PDF to image to get actual image size
images = self._convert_pdf_to_image(pdf_path)
img_size = images[0].size if images else pdf_size
except Exception:
pdf_size = (612, 792) # Default PDF size
img_size = (612, 792) # Default image size
else:
raise ValueError("Tabula requires PDF file path, not image data")
# Convert to standardized format
result = self.postprocess_output(tables_list, img_size, pdf_size)
if self.show_log:
logger.info(f"Extracted {len(result.tables)} tables using Tabula")
return result
except Exception as e:
logger.error("Error during Tabula extraction", exc_info=True)
return TableOutput(
tables=[],
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
extract_with_area
extract_with_area(input_path: Union[str, Path], area: List[float], **kwargs) -> TableOutput
Extract tables from specific area of PDF.
Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
| def extract_with_area(
self,
input_path: Union[str, Path],
area: List[float],
**kwargs
) -> TableOutput:
"""Extract tables from specific area of PDF."""
original_area = self.area
self.area = area
try:
result = self.extract(input_path, **kwargs)
return result
finally:
self.area = original_area
|
extract_with_columns
extract_with_columns(input_path: Union[str, Path], columns: List[float], **kwargs) -> TableOutput
Extract tables with specified column positions.
Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
| def extract_with_columns(
self,
input_path: Union[str, Path],
columns: List[float],
**kwargs
) -> TableOutput:
"""Extract tables with specified column positions."""
original_columns = self.columns
self.columns = columns
try:
result = self.extract(input_path, **kwargs)
return result
finally:
self.columns = original_columns
|
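Both helpers temporarily override the corresponding constructor setting and restore it in a finally block, so they are safe to interleave on one instance. A sketch (coordinates follow the tabula-py convention of PDF points):
```python
extractor = TabulaExtractor(method="stream")

# Restrict extraction to a page region: [top, left, bottom, right] in points
area_result = extractor.extract_with_area("report.pdf", area=[50.0, 30.0, 400.0, 560.0])

# Force explicit column boundaries at the given x positions
col_result = extractor.extract_with_columns("report.pdf", columns=[100.0, 250.0, 400.0])
```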
postprocess_output
postprocess_output(raw_output: List, img_size: Tuple[int, int], pdf_size: Tuple[int, int] = None) -> TableOutput
Convert Tabula output to standardized TableOutput format.
Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
| def postprocess_output(self, raw_output: List, img_size: Tuple[int, int], pdf_size: Tuple[int, int] = None) -> TableOutput:
"""Convert Tabula output to standardized TableOutput format."""
tables = []
for i, df in enumerate(raw_output):
if df.empty:
continue
# Get table dimensions
num_rows, num_cols = df.shape
# Estimate table bbox
bbox = self._estimate_table_bbox(df, img_size)
# Transform PDF coordinates to image coordinates if needed
if pdf_size and bbox:
bbox = self._transform_pdf_to_image_coords(bbox, pdf_size, img_size)
# Convert DataFrame to cells with estimated bboxes
cells = self._dataframe_to_cells(df, i, bbox)
# Create table object
table = Table(
cells=cells,
num_rows=num_rows,
num_cols=num_cols,
bbox=bbox,
confidence=None, # Tabula doesn't provide confidence
table_id=f"table_{i}",
structure_confidence=None
)
tables.append(table)
return TableOutput(
tables=tables,
source_img_size=img_size,
metadata={
'engine': 'tabula',
'method': self.method,
'pages': self.pages,
'multiple_tables': self.multiple_tables,
'guess': self.guess
}
)
|
predict
predict(pdf_path: Union[str, Path], **kwargs)
Predict method for compatibility with original interface.
Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
| def predict(self, pdf_path: Union[str, Path], **kwargs):
"""Predict method for compatibility with original interface."""
try:
result = self.extract(pdf_path, **kwargs)
# Convert to original format
table_res = []
for table in result.tables:
table_data = {
"table_id": table.table_id,
"bbox": table.bbox,
"confidence": table.confidence,
"cells": [cell.to_dict() for cell in table.cells],
"num_rows": table.num_rows,
"num_cols": table.num_cols
}
table_res.append(table_data)
return table_res
except Exception as e:
logger.error("Error during Tabula prediction", exc_info=True)
return []
|
TabulaMapper
TabulaMapper()
Bases: BaseTableMapper
Label mapper for Tabula table extraction output.
Source code in omnidocs/tasks/table_extraction/extractors/tabula.py
| def __init__(self):
super().__init__('tabula')
self._setup_mapping()
|
TableTransformerExtractor
TableTransformerExtractor(device: Optional[str] = None, show_log: bool = False, detection_model_path: Optional[str] = None, structure_model_path: Optional[str] = None, detection_threshold: float = 0.7, structure_threshold: float = 0.7, **kwargs)
Bases: BaseTableExtractor
Table Transformer based table extraction implementation.
Initialize Table Transformer Extractor.
Source code in omnidocs/tasks/table_extraction/extractors/table_transformer.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
detection_model_path: Optional[str] = None,
structure_model_path: Optional[str] = None,
detection_threshold: float = 0.7,
structure_threshold: float = 0.7,
**kwargs
):
"""Initialize Table Transformer Extractor."""
super().__init__(
device=device,
show_log=show_log,
engine_name='table_transformer'
)
self._label_mapper = TableTransformerMapper()
# Set default paths if not provided
self.detection_model_path = Path(detection_model_path) if detection_model_path else \
Path(self._label_mapper._model_configs['detection']['local_path'])
self.structure_model_path = Path(structure_model_path) if structure_model_path else \
Path(self._label_mapper._model_configs['structure']['local_path'])
self.detection_threshold = detection_threshold
self.structure_threshold = structure_threshold
# Check dependencies
self._check_dependencies()
# Download model if needed (sets up model sources)
self._download_model()
# Load models
self._load_model()
|
extract
extract(input_path: Union[str, Path, Image.Image], **kwargs) -> TableOutput
Extract tables using Table Transformer.
Source code in omnidocs/tasks/table_extraction/extractors/table_transformer.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> TableOutput:
"""Extract tables using Table Transformer."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
image = images[0]
img_size = image.size
# Detect tables
detected_tables = self._detect_tables(image)
if not detected_tables:
if self.show_log:
logger.info("No tables detected in the image")
return TableOutput(
tables=[],
source_img_size=img_size,
metadata={'engine': 'table_transformer', 'message': 'No tables detected'}
)
# Analyze structure for each detected table
table_results = []
for table_detection in detected_tables:
structure_data = self._analyze_table_structure(image, table_detection['bbox'])
cells = self._create_table_cells(structure_data)
table_results.append({
'bbox': table_detection['bbox'],
'confidence': table_detection['confidence'],
'cells': cells,
'structure_confidence': np.mean([e['confidence'] for e in structure_data['elements']]) if structure_data['elements'] else 0.0
})
# Convert to standardized format
result = self._create_table_output(table_results, img_size)
if self.show_log:
logger.info(f"Extracted {len(result.tables)} tables using Table Transformer")
return result
except Exception as e:
logger.error("Error during Table Transformer extraction", exc_info=True)
return TableOutput(
tables=[],
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
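A usage sketch; the thresholds shown are the constructor defaults, and an image without tables yields an empty result with an explanatory metadata message rather than an error:
```python
from PIL import Image
from omnidocs.tasks.table_extraction.extractors.table_transformer import TableTransformerExtractor

extractor = TableTransformerExtractor(
    detection_threshold=0.7,  # minimum confidence to keep a detected table
    structure_threshold=0.7,  # minimum confidence for row/column elements
)
result = extractor.extract(Image.open("scan.png"))

if not result.tables:
    print(result.metadata.get("message"))  # 'No tables detected'
```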
predict
predict(input_path: Union[str, Path, Image.Image], **kwargs)
Predict method for compatibility with original interface.
Source code in omnidocs/tasks/table_extraction/extractors/table_transformer.py
| def predict(self, input_path: Union[str, Path, Image.Image], **kwargs):
"""Predict method for compatibility with original interface."""
try:
result = self.extract(input_path, **kwargs)
# Convert to original format
return [
{
"table_id": table.table_id,
"bbox": table.bbox,
"confidence": table.confidence,
"cells": [cell.to_dict() for cell in table.cells],
"num_rows": table.num_rows,
"num_cols": table.num_cols
}
for table in result.tables
]
except Exception as e:
logger.error("Error during Table Transformer prediction", exc_info=True)
return []
|
TableTransformerMapper
TableTransformerMapper()
Bases: BaseTableMapper
Label mapper for Table Transformer model output.
Source code in omnidocs/tasks/table_extraction/extractors/table_transformer.py
| def __init__(self):
super().__init__('table_transformer')
self._setup_mapping()
|
TableFormerExtractor
TableFormerExtractor(device: Optional[str] = None, show_log: bool = False, model_path: Optional[str] = None, model_type: str = 'structure', confidence_threshold: float = 0.7, max_size: int = 1000, **kwargs)
Bases: BaseTableExtractor
TableFormer based table extraction implementation.
Initialize TableFormer Extractor.
Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
| def __init__(
self,
device: Optional[str] = None,
show_log: bool = False,
model_path: Optional[str] = None,
model_type: str = 'structure',
confidence_threshold: float = 0.7,
max_size: int = 1000,
**kwargs
):
"""Initialize TableFormer Extractor."""
super().__init__(
device=device,
show_log=show_log,
engine_name='tableformer'
)
self._label_mapper = TableFormerMapper()
self.model_type = model_type
self.confidence_threshold = confidence_threshold
self.max_size = max_size
# Set default model paths
if model_path is None:
model_path = f"omnidocs/models/tableformer_{model_type}"
self.model_path = Path(model_path)
# Check dependencies
self._check_dependencies()
# Try to load from local path first, fallback to HuggingFace
if self.model_path.exists() and any(self.model_path.iterdir()):
if self.show_log:
logger.info(f"Found local {self.model_type} model at: {self.model_path}")
self.model_name_or_path = str(self.model_path)
else:
# Get HuggingFace model name from config
hf_model_name = self._label_mapper._model_configs[self.model_type]['model_name']
if self.show_log:
logger.info(f"Local {self.model_type} model not found, will download from HuggingFace: {hf_model_name}")
# Download model if needed
if not self.model_path.exists():
self._download_model()
self.model_name_or_path = hf_model_name
# Load model
self._load_model()
|
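Initialization prefers a local checkpoint and falls back to a HuggingFace download; a sketch with the documented defaults:
```python
from omnidocs.tasks.table_extraction.extractors.tableformer import TableFormerExtractor

# Looks for omnidocs/models/tableformer_structure first, then downloads.
extractor = TableFormerExtractor(
    model_type="structure",
    confidence_threshold=0.7,
    max_size=1000,
)
result = extractor.extract("table_crop.png")
```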
extract
extract(input_path: Union[str, Path, Image.Image], **kwargs) -> TableOutput
Extract tables using TableFormer.
Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
| @log_execution_time
def extract(
self,
input_path: Union[str, Path, Image.Image],
**kwargs
) -> TableOutput:
"""Extract tables using TableFormer."""
try:
# Preprocess input
images = self.preprocess_input(input_path)
image = images[0]
img_size = image.size
# Detect table structure
detections = self._detect_table_structure(image)
if not detections:
if self.show_log:
logger.info("No table structure detected in the image")
return TableOutput(
tables=[],
source_img_size=img_size,
metadata={'engine': 'tableformer', 'message': 'No table structure detected'}
)
# Convert to standardized format
result = self.postprocess_output({'detections': detections}, img_size)
if self.show_log:
logger.info(f"Extracted {len(result.tables)} tables using TableFormer")
return result
except Exception as e:
logger.error("Error during TableFormer extraction", exc_info=True)
return TableOutput(
tables=[],
source_img_size=None,
processing_time=None,
metadata={"error": str(e)}
)
|
postprocess_output
postprocess_output(raw_output: Dict, img_size: Tuple[int, int]) -> TableOutput
Convert TableFormer output to standardized TableOutput format.
Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
| def postprocess_output(self, raw_output: Dict, img_size: Tuple[int, int]) -> TableOutput:
"""Convert TableFormer output to standardized TableOutput format."""
tables = []
# Extract table from detections
detections = raw_output.get('detections', [])
if detections:
table = self._create_table_from_detections(detections, img_size)
tables.append(table)
return TableOutput(
tables=tables,
source_img_size=img_size,
metadata={
'engine': 'tableformer',
'model_name': self.model_name_or_path,
'confidence_threshold': self.confidence_threshold,
'max_size': self.max_size
}
)
|
predict
predict(input_path: Union[str, Path, Image.Image], **kwargs)
Predict method for compatibility with original interface.
Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
| def predict(self, input_path: Union[str, Path, Image.Image], **kwargs):
"""Predict method for compatibility with original interface."""
try:
result = self.extract(input_path, **kwargs)
# Convert to original format
table_res = []
for table in result.tables:
table_data = {
"table_id": table.table_id,
"bbox": table.bbox,
"confidence": table.confidence,
"cells": [cell.to_dict() for cell in table.cells],
"num_rows": table.num_rows,
"num_cols": table.num_cols
}
table_res.append(table_data)
return table_res
except Exception as e:
logger.error("Error during TableFormer prediction", exc_info=True)
return []
|
TableFormerMapper
TableFormerMapper()
Bases: BaseTableMapper
Label mapper for TableFormer model output.
Source code in omnidocs/tasks/table_extraction/extractors/tableformer.py
| def __init__(self):
super().__init__('tableformer')
self._setup_mapping()
|
🛠️ Utilities & Helpers
Common utility functions, data structures, and helpers used throughout OmniDocs.
omnidocs.utils
Utilities module for OmniDocs.
This module provides common utilities used across different tasks and components.
GlobalLanguageMapper
Global language mapper that handles different OCR engine formats.
Source code in omnidocs/utils/language.py
| def __init__(self):
self._engine_mappings: Dict[str, Dict[str, str]] = {}
self._setup_default_mappings()
|
from_standard
from_standard(engine_name: str, standard_code: str) -> str
Convert standard language code to engine-specific format.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| engine_name | str | Name of the OCR engine | required |
| standard_code | str | Standard ISO 639-1 language code | required |
Returns:
| Type | Description |
| --- | --- |
| str | Engine-specific language code |
Source code in omnidocs/utils/language.py
| def from_standard(self, engine_name: str, standard_code: str) -> str:
"""Convert standard language code to engine-specific format.
Args:
engine_name: Name of the OCR engine
standard_code: Standard ISO 639-1 language code
Returns:
Engine-specific language code
"""
if engine_name not in self._engine_mappings:
return standard_code
mapping = self._engine_mappings[engine_name]
reverse_mapping = {v: k for k, v in mapping.items()}
return reverse_mapping.get(standard_code.lower(), standard_code)
|
get_engine_codes
get_engine_codes(engine_name: str) -> List[str]
Get list of engine-specific language codes.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| engine_name | str | Name of the OCR engine | required |
Returns:
| Type | Description |
| --- | --- |
| List[str] | List of engine-specific language codes |
Source code in omnidocs/utils/language.py
| def get_engine_codes(self, engine_name: str) -> List[str]:
"""Get list of engine-specific language codes.
Args:
engine_name: Name of the OCR engine
Returns:
List of engine-specific language codes
"""
if engine_name not in self._engine_mappings:
return []
return list(self._engine_mappings[engine_name].keys())
|
get_supported_engines
get_supported_engines() -> List[str]
Get list of supported OCR engines.
Source code in omnidocs/utils/language.py
| def get_supported_engines(self) -> List[str]:
"""Get list of supported OCR engines."""
return list(self._engine_mappings.keys())
|
get_supported_languages
get_supported_languages(engine_name: str) -> List[str]
Get list of supported languages for a specific engine.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| engine_name | str | Name of the OCR engine | required |
Returns:
| Type | Description |
| --- | --- |
| List[str] | List of standard language codes supported by the engine |
Source code in omnidocs/utils/language.py
| def get_supported_languages(self, engine_name: str) -> List[str]:
"""Get list of supported languages for a specific engine.
Args:
engine_name: Name of the OCR engine
Returns:
List of standard language codes supported by the engine
"""
if engine_name not in self._engine_mappings:
return []
return list(self._engine_mappings[engine_name].values())
|
register_engine_mapping
register_engine_mapping(engine_name: str, mapping: Dict[str, str]) -> None
Register a new engine's language mapping.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| engine_name | str | Name of the OCR engine | required |
| mapping | Dict[str, str] | Dictionary mapping engine codes to standard codes | required |
Source code in omnidocs/utils/language.py
| def register_engine_mapping(self, engine_name: str, mapping: Dict[str, str]) -> None:
"""Register a new engine's language mapping.
Args:
engine_name: Name of the OCR engine
mapping: Dictionary mapping engine codes to standard codes
"""
self._engine_mappings[engine_name] = mapping
|
to_standard
to_standard(engine_name: str, engine_code: str) -> str
Convert engine-specific language code to standard format.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| engine_name | str | Name of the OCR engine | required |
| engine_code | str | Engine-specific language code | required |
Returns:
| Type | Description |
| --- | --- |
| str | Standard ISO 639-1 language code |
Source code in omnidocs/utils/language.py
| def to_standard(self, engine_name: str, engine_code: str) -> str:
"""Convert engine-specific language code to standard format.
Args:
engine_name: Name of the OCR engine
engine_code: Engine-specific language code
Returns:
Standard ISO 639-1 language code
"""
if engine_name not in self._engine_mappings:
return engine_code
mapping = self._engine_mappings[engine_name]
return mapping.get(engine_code.lower(), engine_code)
|
LanguageCode
Bases: Enum
Standard ISO 639-1 language codes supported by OmniDocs.
get_all_codes
classmethod
get_all_codes() -> List[str]
Get all supported language codes.
Source code in omnidocs/utils/language.py
| @classmethod
def get_all_codes(cls) -> List[str]:
"""Get all supported language codes."""
return [lang.value for lang in cls]
|
is_valid_code
classmethod
is_valid_code(code: str) -> bool
Check if a language code is valid.
Source code in omnidocs/utils/language.py
| @classmethod
def is_valid_code(cls, code: str) -> bool:
"""Check if a language code is valid."""
return code.lower() in [lang.value.lower() for lang in cls]
|
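For example (the exact set of codes depends on the enum members; 'en' is shown because English is the detector's Latin-script fallback below):
```python
from omnidocs.utils.language import LanguageCode

LanguageCode.is_valid_code("EN")      # True -- comparison is case-insensitive
LanguageCode.is_valid_code("zz")      # False, assuming 'zz' is not a member
codes = LanguageCode.get_all_codes()  # list of all supported ISO 639-1 codes
```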
LanguageDetector
Simple language detection utilities.
detect_script
classmethod
detect_script(text: str) -> Optional[str]
Detect the primary script/language of the given text.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | str | Input text to analyze | required |
Returns:
| Type | Description |
| --- | --- |
| Optional[str] | Detected language code or None if unable to detect |
Source code in omnidocs/utils/language.py
| @classmethod
def detect_script(cls, text: str) -> Optional[str]:
"""Detect the primary script/language of the given text.
Args:
text: Input text to analyze
Returns:
Detected language code or None if unable to detect
"""
if not text:
return None
# Count characters for each language
language_scores = {}
for char in text:
char_code = ord(char)
for language, ranges in cls.LANGUAGE_RANGES.items():
for start, end in ranges:
if start <= char_code <= end:
language_scores[language] = language_scores.get(language, 0) + 1
break
if not language_scores:
# Default to English for Latin script
return LanguageCode.ENGLISH.value
# Return language with highest score
return max(language_scores, key=language_scores.get)
|
is_mixed_script
classmethod
is_mixed_script(text: str, threshold: float = 0.1) -> bool
Check if text contains mixed scripts.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | str | Input text to analyze | required |
| threshold | float | Minimum ratio for considering a script significant | 0.1 |
Returns:
| Type | Description |
| --- | --- |
| bool | True if text contains multiple scripts above threshold |
Source code in omnidocs/utils/language.py
| @classmethod
def is_mixed_script(cls, text: str, threshold: float = 0.1) -> bool:
"""Check if text contains mixed scripts.
Args:
text: Input text to analyze
threshold: Minimum ratio for considering a script significant
Returns:
True if text contains multiple scripts above threshold
"""
if not text:
return False
language_scores = {}
total_chars = 0
for char in text:
if char.isalnum(): # Only count alphanumeric characters
total_chars += 1
char_code = ord(char)
for language, ranges in cls.LANGUAGE_RANGES.items():
for start, end in ranges:
if start <= char_code <= end:
language_scores[language] = language_scores.get(language, 0) + 1
break
if total_chars == 0:
return False
# Check how many languages exceed the threshold
significant_languages = sum(
1 for score in language_scores.values()
if score / total_chars >= threshold
)
return significant_languages > 1
|
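A sketch of both classmethods; detection counts characters against the class's LANGUAGE_RANGES table, so plain Latin text falls back to English:
```python
from omnidocs.utils.language import LanguageDetector

LanguageDetector.detect_script("hello world")   # 'en' (Latin fallback)
LanguageDetector.detect_script("")              # None for empty input

# True only when two or more mapped scripts each cover at least
# `threshold` of the alphanumeric characters (default 10%).
LanguageDetector.is_mixed_script("abc 你好 def", threshold=0.1)
```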
detect_language
detect_language(text: str) -> Optional[str]
Convenience function to detect language from text.
Source code in omnidocs/utils/language.py
| def detect_language(text: str) -> Optional[str]:
"""Convenience function to detect language from text."""
return LanguageDetector.detect_script(text)
|
get_all_supported_languages
get_all_supported_languages() -> List[str]
Get all language codes supported by OmniDocs.
Source code in omnidocs/utils/language.py
| def get_all_supported_languages() -> List[str]:
"""Get all language codes supported by OmniDocs."""
return LanguageCode.get_all_codes()
|
get_language_mapper
get_language_mapper() -> GlobalLanguageMapper
Get the global language mapper instance.
Source code in omnidocs/utils/language.py
| def get_language_mapper() -> GlobalLanguageMapper:
"""Get the global language mapper instance."""
return global_language_mapper
|
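A round-trip sketch using the global instance (the 'tesseract' mapping is hypothetical; call get_supported_engines() to see what is actually registered):
```python
from omnidocs.utils.language import get_language_mapper

mapper = get_language_mapper()
print(mapper.get_supported_engines())  # engines registered by default

# Hypothetical engine mapping 'eng' <-> 'en'
std = mapper.to_standard("tesseract", "eng")   # 'en' if registered, else passthrough
eng = mapper.from_standard("tesseract", "en")  # 'eng' if registered, else passthrough

# Unknown engines always pass codes through unchanged
assert mapper.to_standard("no_such_engine", "abc") == "abc"
```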
get_logger
get_logger(name: str, level: Union[str, int] = logging.INFO, log_file: Optional[Union[str, Path]] = None, include_path: bool = True) -> logging.Logger
Get a configured logger instance.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the logger | required |
| level | Union[str, int] | Logging level | INFO |
| log_file | Optional[Union[str, Path]] | Optional file path to save logs | None |
| include_path | bool | Whether to include full path in log messages | True |
Returns:
| Type | Description |
| --- | --- |
| Logger | Configured logger instance |
Source code in omnidocs/utils/logging.py
| def get_logger(
name: str,
level: Union[str, int] = logging.INFO,
log_file: Optional[Union[str, Path]] = None,
include_path: bool = True,
) -> logging.Logger:
"""
Get a configured logger instance.
Args:
name: Name of the logger
level: Logging level
log_file: Optional file path to save logs
include_path: Whether to include full path in log messages
Returns:
Configured logger instance
"""
# Create logger
logger = logging.getLogger(name)
logger.setLevel(level)
# Remove existing handlers
logger.handlers.clear()
# Create console handler with rich support
console_handler = RichHandler(
console=console,
show_time=False,
show_path=False,
rich_tracebacks=True,
tracebacks_show_locals=True,
)
console_handler.setFormatter(CustomFormatter(include_path=include_path))
logger.addHandler(console_handler)
# Add file handler if log_file is specified
if log_file:
log_file = Path(log_file)
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(CustomFormatter(include_path=True))
logger.addHandler(file_handler)
return logger
|
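Typical usage; the parent directory of log_file is created automatically:
```python
from omnidocs.utils.logging import get_logger

logger = get_logger(__name__, level="DEBUG", log_file="logs/omnidocs.log")
logger.info("model loaded")  # rich console output plus the file sink
```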
get_model_path
get_model_path(extractor_name: str, model_name: str) -> Path
Get standardized model path for a specific extractor and model.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| extractor_name | str | Name of the extractor (e.g., 'donut', 'nougat') | required |
| model_name | str | Name/ID of the model (e.g., 'naver-clova-ix/donut-base') | required |
Returns:
| Name | Type | Description |
| --- | --- | --- |
| Path | Path | Full path where the model should be stored |
Source code in omnidocs/utils/model_config.py
| def get_model_path(extractor_name: str, model_name: str) -> Path:
"""
Get standardized model path for a specific extractor and model.
Args:
extractor_name: Name of the extractor (e.g., 'donut', 'nougat')
model_name: Name/ID of the model (e.g., 'naver-clova-ix/donut-base')
Returns:
Path: Full path where the model should be stored
"""
models_dir = get_models_directory()
# Replace slashes in model names to create valid directory names
safe_model_name = model_name.replace("/", "_")
return models_dir / extractor_name / safe_model_name
|
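For example, a HuggingFace-style model ID is flattened into a filesystem-safe directory name:
```python
from omnidocs.utils.model_config import get_model_path

path = get_model_path("donut", "naver-clova-ix/donut-base")
# -> <models_dir>/donut/naver-clova-ix_donut-base  (slashes become underscores)
```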
get_models_directory
get_models_directory() -> Path
Get the models directory, setting up environment if needed.
Returns:
| Name | Type | Description |
| --- | --- | --- |
| Path | Path | The models directory path |
Source code in omnidocs/utils/model_config.py
| def get_models_directory() -> Path:
"""
Get the models directory, setting up environment if needed.
Returns:
Path: The models directory path
"""
return setup_model_environment()
|
is_supported_language
is_supported_language(code: str) -> bool
Check if a language code is supported by OmniDocs.
Source code in omnidocs/utils/language.py
| def is_supported_language(code: str) -> bool:
"""Check if a language code is supported by OmniDocs."""
return LanguageCode.is_valid_code(code)
|
setup_model_environment
setup_model_environment() -> Path
Setup model environment variables once for the entire application.
This function:
1. Calculates the omnidocs models directory dynamically
2. Creates the directory if it doesn't exist
3. Sets HuggingFace environment variables to use our models directory
4. Uses a flag to prevent multiple setups
Returns:
| Name | Type | Description |
| --- | --- | --- |
| Path | Path | The models directory path |
Source code in omnidocs/utils/model_config.py
| def setup_model_environment() -> Path:
"""
Setup model environment variables once for the entire application.
This function:
1. Calculates the omnidocs models directory dynamically
2. Creates the directory if it doesn't exist
3. Sets HuggingFace environment variables to use our models directory
4. Uses a flag to prevent multiple setups
Returns:
Path: The models directory path
"""
# Check if already setup to prevent multiple calls
if 'OMNIDOCS_MODELS_SETUP' in os.environ:
# Return the already configured models directory
return Path(os.environ["HF_HOME"])
# Calculate omnidocs root dynamically
current_file = Path(__file__)
omnidocs_root = current_file.parent.parent # Go up to omnidocs/ root
models_dir = omnidocs_root / "models"
models_dir.mkdir(exist_ok=True)
# Set environment variables for HuggingFace to use our models directory
os.environ["HF_HOME"] = str(models_dir)
os.environ["TRANSFORMERS_CACHE"] = str(models_dir)
os.environ["HF_HUB_CACHE"] = str(models_dir)
# Set flag to prevent re-setup
os.environ["OMNIDOCS_MODELS_SETUP"] = "true"
return models_dir
|
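Because the redirect is expressed through environment variables, it affects every later HuggingFace download in the process, and repeated calls are no-ops:
```python
import os
from omnidocs.utils.model_config import setup_model_environment

models_dir = setup_model_environment()
assert os.environ["HF_HOME"] == str(models_dir)
assert setup_model_environment() == models_dir  # second call is a no-op
```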
🧑‍💻 Usage Tips
- All extractors follow a consistent interface: extractor = ...Extractor(); result = extractor.extract(input) (see the sketch below).
- Results are returned as structured objects (e.g., TableOutput, TextOutput).
- See the Getting Started guide for real-world examples.
- For advanced configuration, check each extractor’s docstring for parameters and options.
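The whole interface in one place (Tabula chosen for illustration; any extractor works the same way):
```python
from omnidocs.tasks.table_extraction.extractors.tabula import TabulaExtractor

extractor = TabulaExtractor()           # 1. construct with engine-specific options
result = extractor.extract("doc.pdf")   # 2. extract() returns a structured TableOutput
for table in result.tables:             # 3. iterate typed results
    for cell in table.cells:
        print(cell.row, cell.col, cell.text)
```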
📚 More Resources