Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 184 additions & 70 deletions rag-engine/src/layers/data_extractor/extractor.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,57 @@
import io
import re
from typing import List
import uuid
import pdfplumber

from src.process.models import PageContent
from src.layers.data_extractor.models import ImagePage, Line, Page, Word


# ===============================
# CONFIG
# ===============================
LINE_TOLERANCE = 3  # vertical tolerance (pt) for grouping words into lines
TABLE_PADDING = 1.5  # small padding around table bbox to catch overlaps


# ===============================
# PUBLIC ENTRY
# ===============================
def pdf(pdf_bytes: bytes) -> list[Page]:
    """Extract structured content (text lines, tables, images) from a PDF.

    Args:
        pdf_bytes: Raw bytes of the PDF document.

    Returns:
        One ``Page`` per PDF page. Words that fall inside a table bbox are
        excluded from the running ``text`` (their content is reported via
        ``tables`` instead).

    Raises:
        ValueError: If pdfplumber fails to open or process the document.
    """
    pages_output: list[Page] = []

    try:
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf_doc:
            for page_number, page in enumerate(pdf_doc.pages, start=1):
                tables_output = extract_tables(page)

                # Slightly enlarge each table bbox so words touching the
                # border are also treated as table content.
                table_bboxes = [
                    expand_bbox(table.bbox, padding=TABLE_PADDING)
                    for table in page.find_tables()
                ]

                # Drop words inside tables; they are already captured
                # in tables_output.
                words = extract_words(page)
                words = filter_table_words(words, table_bboxes)

                lines_output = group_words_into_lines(words)

                raw_text = "\n".join(line.text for line in lines_output)
                text = normalize_text(raw_text)

                images_output = extract_images(page)

                pages_output.append(
                    Page(
                        page_number=page_number,
                        text=text,
                        lines=lines_output,
                        tables=tables_output,
                        images=images_output,
                        width=page.width,
                        height=page.height,
                    )
                )

        return pages_output

    except Exception as e:
        # Surface any parsing problem as a single, predictable error type,
        # keeping the original exception chained for debugging.
        raise ValueError(f"Error processing PDF: {e}") from e

Expand All @@ -72,47 +70,163 @@ def normalize_text(text: str) -> str:
return text.strip()


def extract_words(page) -> List[Word]:
    """Pull positioned words (with font metadata) off a pdfplumber page."""
    raw_words = page.extract_words(
        x_tolerance=2,
        y_tolerance=2,
        keep_blank_chars=False,
        extra_attrs=["size", "fontname"],
    )

    return [
        Word(
            text=raw["text"],
            x0=raw["x0"],
            x1=raw["x1"],
            top=raw["top"],
            bottom=raw["bottom"],
            # size/fontname come from extra_attrs and may be absent.
            size=raw.get("size", 0.0),
            fontname=raw.get("fontname", ""),
        )
        for raw in raw_words
    ]


def group_words_into_lines(words: List[Word]) -> List[Line]:
    """Cluster words into visual text lines by vertical proximity.

    A word joins the first existing cluster whose anchor (first word's
    ``top``) is within LINE_TOLERANCE of the word's own ``top``;
    otherwise it starts a new cluster. Clusters become Line objects,
    returned sorted top-to-bottom.
    """
    if not words:
        return []

    clusters: List[List[Word]] = []

    # Scan in reading order; first matching cluster wins.
    for word in sorted(words, key=lambda w: (w.top, w.x0)):
        target = next(
            (c for c in clusters if abs(c[0].top - word.top) <= LINE_TOLERANCE),
            None,
        )
        if target is None:
            clusters.append([word])
        else:
            target.append(word)

    result: List[Line] = []

    for cluster in clusters:
        ordered = sorted(cluster, key=lambda w: w.x0)
        result.append(
            Line(
                text=" ".join(w.text for w in ordered),
                words=ordered,
                top=min(w.top for w in ordered),
                avg_size=sum(w.size for w in ordered) / len(ordered),
                is_bold=any("bold" in w.fontname.lower() for w in ordered),
                x0=min(w.x0 for w in ordered),
                x1=max(w.x1 for w in ordered),
            )
        )

    # Final vertical ordering of the assembled lines.
    return sorted(result, key=lambda lin: lin.top)


def extract_tables(page):
    """Return extracted table grids, skipping tables with no cell content."""
    results = []

    for table in page.find_tables():
        grid = table.extract()
        # Keep only grids where at least one cell holds truthy content.
        has_content = bool(grid) and any(any(cell for cell in row) for row in grid)
        if has_content:
            results.append(grid)

    return results


def extract_images(page):
    """Collect image bounding boxes on the page, each tagged with a fresh UUID."""
    return [
        ImagePage(
            id=str(uuid.uuid4()),
            x0=img.get("x0"),
            top=img.get("top"),
            x1=img.get("x1"),
            bottom=img.get("bottom"),
            width=img.get("width"),
            height=img.get("height"),
        )
        for img in page.images
    ]


def fix_hyphen_breaks(text: str) -> str:
    """Rejoin words split across lines with a trailing hyphen.

    'exam-\\nple' becomes 'example'; hyphens not followed by a newline
    plus a word character are left alone.
    """
    hyphen_break = re.compile(r"-\n(\w)")
    return hyphen_break.sub(r"\1", text)


def remove_page_numbers(text: str) -> str:
    """Drop lines consisting solely of digits (standalone page numbers).

    Lines mixing digits with other text (e.g. 'page 3') are kept.
    """
    return "\n".join(
        line for line in text.splitlines() if not line.strip().isdigit()
    )


def normalize_spaces(text: str) -> str:
    """Collapse runs of spaces/tabs into one space; newlines are untouched."""
    horizontal_ws = re.compile(r"[ \t]+")
    return horizontal_ws.sub(" ", text)


def remove_dot_lines(text: str) -> str:
    """Drop leader lines made of 5+ dots (e.g. TOC fillers like '.....').

    A line qualifies when, after stripping, it is only dots optionally
    separated by single whitespace characters.
    """
    dot_line = re.compile(r"^(\.\s?){5,}$")
    return "\n".join(
        line for line in text.splitlines() if not dot_line.match(line.strip())
    )


def remove_lonely_symbols(text: str) -> str:
    """Drop lines whose stripped content is 2 characters or fewer.

    Removes stray bullets, dashes, and similar isolated symbols left
    over from PDF extraction.
    """
    return "\n".join(line for line in text.splitlines() if len(line.strip()) > 2)


def fix_merged_words(text: str) -> str:
    """Insert a space at lowercase-to-uppercase boundaries.

    'helloWorld' becomes 'hello World'; all-caps runs are untouched.
    """
    boundary = re.compile(r"(?<=[a-z])(?=[A-Z])")
    return boundary.sub(" ", text)

def expand_bbox(bbox, padding=1.0):
    """Grow a (x0, top, x1, bottom) bbox outward by `padding` on every side."""
    left, upper, right, lower = bbox
    return (left - padding, upper - padding, right + padding, lower + padding)

def filter_table_words(words: list[Word], table_bboxes: list[tuple]) -> list[Word]:
filtered = []
for word in words:
if not any(is_inside_bbox(word, bbox) for bbox in table_bboxes):
filtered.append(word)
return filtered

def is_inside_bbox(word: Word, bbox) -> bool:
x0, top, x1, bottom = bbox
return (
word.x0 >= x0 and word.x1 <= x1 and word.top >= top and word.bottom <= bottom
)
44 changes: 36 additions & 8 deletions rag-engine/src/layers/data_extractor/models.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,44 @@
from pydantic import BaseModel

class Word(BaseModel):
    """A single word as placed on the page, with position and font info."""

    text: str
    # Bounding box as reported by pdfplumber (origin at page top-left).
    x0: float
    x1: float
    top: float
    bottom: float
    size: float  # font size; 0.0 when pdfplumber does not report one
    fontname: str  # e.g. 'Helvetica-Bold'; used for bold detection


class Line(BaseModel):
    """A horizontal text line reconstructed from vertically clustered words."""

    text: str  # word texts joined left-to-right with single spaces
    words: list[Word]  # constituent words, sorted by x0
    top: float  # topmost word's top coordinate
    avg_size: float  # mean font size across the line's words
    is_bold: bool  # True if any word's fontname contains 'bold'
    x0: float  # left edge of the line (min word x0)
    x1: float  # right edge of the line (max word x1)

class ImagePage(BaseModel):
    """Bounding box of an image found on a page.

    Geometry fields are optional because pdfplumber image dicts may omit
    any of them (the extractor reads them with ``dict.get``).
    """

    id: str | None  # fresh UUID assigned at extraction time
    x0: float | None
    top: float | None
    x1: float | None
    bottom: float | None
    width: float | None
    height: float | None


class Page(BaseModel):
    """Structured content of a single PDF page."""

    page_number: int  # 1-based page index
    text: str  # normalized running text (table words excluded)
    lines: list[Line]
    # Table grids from pdfplumber's table.extract(); individual cells can
    # be None (empty/merged cells), hence `str | None`.
    tables: list[list[list[str | None]]]
    images: list[ImagePage]
    width: float | None
    height: float | None


Page.model_rebuild()
Empty file.
Loading