From a3f5baf5724bcb536b0b52c10b741baa76cc7953 Mon Sep 17 00:00:00 2001 From: aliamerj Date: Sat, 14 Feb 2026 15:48:33 +0300 Subject: [PATCH] implement second layer of RAG --- .../src/layers/data_extractor/extractor.py | 254 +++++++--- .../src/layers/data_extractor/models.py | 44 +- .../src/layers/structure_analyzer/__init__.py | 0 .../src/layers/structure_analyzer/analyzer.py | 478 ++++++++++++++++++ .../src/layers/structure_analyzer/models.py | 31 ++ rag-engine/src/process/service.py | 9 +- 6 files changed, 735 insertions(+), 81 deletions(-) create mode 100644 rag-engine/src/layers/structure_analyzer/__init__.py create mode 100644 rag-engine/src/layers/structure_analyzer/analyzer.py create mode 100644 rag-engine/src/layers/structure_analyzer/models.py diff --git a/rag-engine/src/layers/data_extractor/extractor.py b/rag-engine/src/layers/data_extractor/extractor.py index 13a2170..be00a01 100644 --- a/rag-engine/src/layers/data_extractor/extractor.py +++ b/rag-engine/src/layers/data_extractor/extractor.py @@ -1,59 +1,57 @@ import io import re +from typing import List import uuid import pdfplumber -from src.process.models import PageContent +from src.layers.data_extractor.models import ImagePage, Line, Page, Word -def pdf(pdf_bytes: bytes) -> list[PageContent]: - pages_output = [] +# =============================== +# CONFIG +# =============================== +LINE_TOLERANCE = 3 # vertical tolerance for grouping words into lines +TABLE_PADDING = 1.5 # small padding around table bbox to catch overlaps + +# =============================== +# PUBLIC ENTRY +# =============================== +def pdf(pdf_bytes: bytes) -> list[Page]: + pages_output: list[Page] = [] + try: - with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: - for page_number, page in enumerate(pdf.pages, start=1): - words = page.extract_words( - x_tolerance=2, y_tolerance=2, keep_blank_chars=False + with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf_doc: + for page_number, page in enumerate(pdf_doc.pages, start=1): + tables_output = extract_tables(page) + table_bboxes = [ + expand_bbox(table.bbox, padding=TABLE_PADDING) + for table in page.find_tables() + ] + + words = extract_words(page) + words = filter_table_words(words, table_bboxes) + + lines_output = group_words_into_lines(words) + + raw_text = "\n".join(line.text for line in lines_output) + text = normalize_text(raw_text) + + images_output = extract_images(page) + + pages_output.append( + Page( + page_number=page_number, + text=text, + lines=lines_output, + tables=tables_output, + images=images_output, + width=page.width, + height=page.height, + ) ) - lines = {} - for w in words: - top = round(w["top"], 1) - lines.setdefault(top, []).append(w) - text_lines = [] - for top in sorted(lines.keys()): - line_words = sorted(lines[top], key=lambda x: x["x0"]) - line_text = " ".join(word["text"] for word in line_words) - text_lines.append(line_text) - text = normalize_text("\n".join(text_lines)) - - tables_output = [] - tables = page.find_tables() - for table in tables: - data = table.extract() - if data and any(any(cell for cell in row) for row in data): - tables_output.append(data) - - images_output = [] - for img in page.images: - images_output.append({ - "id": str(uuid.uuid4()), - "x0": img.get("x0"), - "top": img.get("top"), - "x1": img.get("x1"), - "bottom": img.get("bottom"), - "width": img.get("width"), - "height": img.get("height"), - }) - - pages_output.append({ - "page_number": page_number, - "text": text, - "tables": tables_output, - "images": images_output, - "width": 
page.width, - "height": page.height, - }) return pages_output + except Exception as e: raise ValueError(f"Error processing PDF: {e}") @@ -72,22 +70,129 @@ def normalize_text(text: str) -> str: return text.strip() +def extract_words(page) -> List[Word]: + + raw_words = page.extract_words( + x_tolerance=2, + y_tolerance=2, + keep_blank_chars=False, + extra_attrs=["size", "fontname"], + ) + + words: List[Word] = [] + + for w in raw_words: + words.append( + Word( + text=w["text"], + x0=w["x0"], + x1=w["x1"], + top=w["top"], + bottom=w["bottom"], + size=w.get("size", 0.0), + fontname=w.get("fontname", ""), + ) + ) + + return words + + +def group_words_into_lines(words: List[Word]) -> List[Line]: + + if not words: + return [] + + words_sorted = sorted(words, key=lambda w: (w.top, w.x0)) + + line_clusters: List[List[Word]] = [] + + for word in words_sorted: + placed = False + + for cluster in line_clusters: + if abs(cluster[0].top - word.top) <= LINE_TOLERANCE: + cluster.append(word) + placed = True + break + + if not placed: + line_clusters.append([word]) + + lines_output: List[Line] = [] + + for cluster in line_clusters: + cluster = sorted(cluster, key=lambda w: w.x0) + + line_text = " ".join(w.text for w in cluster) + + avg_size = sum(w.size for w in cluster) / len(cluster) + + is_bold = any("bold" in w.fontname.lower() for w in cluster) + + x0 = min(w.x0 for w in cluster) + x1 = max(w.x1 for w in cluster) + + top = min(w.top for w in cluster) + + lines_output.append( + Line( + text=line_text, + words=cluster, + top=top, + avg_size=avg_size, + is_bold=is_bold, + x0=x0, + x1=x1, + ) + ) + + # Sort final lines vertically + lines_output.sort(key=lambda lin: lin.top) + + return lines_output + + +def extract_tables(page): + + tables_output = [] + + tables = page.find_tables() + + for table in tables: + data = table.extract() + + if data and any(any(cell for cell in row) for row in data): + tables_output.append(data) + + return tables_output + + +def extract_images(page): + + images_output: list[ImagePage] = [] + + for img in page.images: + images_output.append( + ImagePage( + id=str(uuid.uuid4()), + x0=img.get("x0"), + top=img.get("top"), + x1=img.get("x1"), + bottom=img.get("bottom"), + width=img.get("width"), + height=img.get("height"), + ) + ) + + return images_output + + def fix_hyphen_breaks(text: str) -> str: - # Join words broken with hyphen + newline return re.sub(r"-\n(\w)", r"\1", text) def remove_page_numbers(text: str) -> str: - lines = text.splitlines() - cleaned = [] - - for line in lines: - stripped = line.strip() - if stripped.isdigit(): - continue - cleaned.append(line) - - return "\n".join(cleaned) + return "\n".join(line for line in text.splitlines() if not line.strip().isdigit()) def normalize_spaces(text: str) -> str: @@ -95,24 +200,33 @@ def normalize_spaces(text: str) -> str: def remove_dot_lines(text: str) -> str: - lines = text.splitlines() - cleaned = [] - for line in lines: - if re.match(r"^(\.\s?){5,}$", line.strip()): - continue - cleaned.append(line) - return "\n".join(cleaned) + return "\n".join( + line + for line in text.splitlines() + if not re.match(r"^(\.\s?){5,}$", line.strip()) + ) def remove_lonely_symbols(text: str) -> str: - lines = text.splitlines() - cleaned = [] - for line in lines: - if len(line.strip()) <= 2: - continue - cleaned.append(line) - return "\n".join(cleaned) + return "\n".join(line for line in text.splitlines() if len(line.strip()) > 2) def fix_merged_words(text: str) -> str: return re.sub(r"([a-z])([A-Z])", r"\1 \2", text) + +def 
expand_bbox(bbox, padding=1.0):
+    x0, top, x1, bottom = bbox
+    return (x0 - padding, top - padding, x1 + padding, bottom + padding)
+
+def filter_table_words(words: list[Word], table_bboxes: list[tuple]) -> list[Word]:
+    filtered = []
+    for word in words:
+        if not any(is_inside_bbox(word, bbox) for bbox in table_bboxes):
+            filtered.append(word)
+    return filtered
+
+def is_inside_bbox(word: Word, bbox) -> bool:
+    x0, top, x1, bottom = bbox
+    return (
+        word.x0 >= x0 and word.x1 <= x1 and word.top >= top and word.bottom <= bottom
+    )
diff --git a/rag-engine/src/layers/data_extractor/models.py b/rag-engine/src/layers/data_extractor/models.py
index 9253182..040353b 100644
--- a/rag-engine/src/layers/data_extractor/models.py
+++ b/rag-engine/src/layers/data_extractor/models.py
@@ -1,16 +1,44 @@
 from pydantic import BaseModel
 
-class ImagePage(BaseModel):
-    id: str
-    x0:float
-    top:float
+
+class Word(BaseModel):
+    text: str
+    x0: float
     x1: float
+    top: float
     bottom: float
-    width:float
-    height: float
+    size: float
+    fontname: str
+
+
+class Line(BaseModel):
+    text: str
+    words: list[Word]
+    top: float
+    avg_size: float
+    is_bold: bool
+    x0: float  # new
+    x1: float  # new
 
-class PageContent(BaseModel):
+
+class ImagePage(BaseModel):
+    id: str | None
+    x0: float | None
+    top: float | None
+    x1: float | None
+    bottom: float | None
+    width: float | None
+    height: float | None
+
+
+class Page(BaseModel):
     page_number: int
     text: str
+    lines: list[Line]
+    tables: list[list[list[str | None]]]
     images: list[ImagePage]
-    tables: list[list[list[str]]]
+    width: float | None
+    height: float | None
+
+
+Page.model_rebuild()
diff --git a/rag-engine/src/layers/structure_analyzer/__init__.py b/rag-engine/src/layers/structure_analyzer/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/rag-engine/src/layers/structure_analyzer/analyzer.py b/rag-engine/src/layers/structure_analyzer/analyzer.py
new file mode 100644
index 0000000..f1055f8
--- /dev/null
+++ b/rag-engine/src/layers/structure_analyzer/analyzer.py
@@ -0,0 +1,478 @@
+from collections import Counter, defaultdict
+import re
+import uuid
+from typing import List
+
+from src.layers.data_extractor.models import Line, Page
+from src.layers.structure_analyzer.models import Paragraph, Section, StructuredDocument
+
+
+
+# ==========================================================
+# PUBLIC API
+# ==========================================================
+def analyze_layout(pages: List[Page]) -> StructuredDocument:
+
+    document = StructuredDocument()
+    stack: List[Section] = []
+
+    if not pages:
+        return document
+
+    font_tiers = compute_font_tiers(pages)
+
+    for page in pages:
+
+        # ---- normalize reading order ----
+        page_lines = normalize_reading_order(page.lines)
+
+        # ---- detect columns ----
+        columns = cluster_columns(page_lines)
+
+        for column_lines in columns:
+
+            blocks = build_blocks(column_lines)
+
+            for block in blocks:
+
+                if is_garbage_block(block):
+                    continue
+
+                heading_level, confidence = detect_heading(block, font_tiers)
+
+                # -------------------------------
+                # SECTION CREATION
+                # -------------------------------
+                if heading_level > 0:
+
+                    section = Section(
+                        id=str(uuid.uuid4()),
+                        title=clean_title(block.text),
+                        level=heading_level,
+                        page_number=page.page_number,
+                        confidence=confidence,
+                    )
+
+                    while stack and stack[-1].level >= heading_level:
+                        stack.pop()
+
+                    if stack:
+                        stack[-1].children.append(section)
+                    else:
+                        document.sections.append(section)
+
+                    stack.append(section)
+                    continue
+
+                # 
------------------------------- + # PARAGRAPH CREATION + # ------------------------------- + paragraph = Paragraph( + text=block.text, + page_number=page.page_number, + ) + + if stack: + stack[-1].paragraphs.append(paragraph) + else: + document.preamble.append(paragraph) + + # ---- attach assets ---- + if stack: + stack[-1].tables.extend(page.tables) + stack[-1].images.extend(page.images) + + return document + +def should_merge(prev_line, current_line): + if not prev_line: + return False + + # Font size must match + if round(prev_line.avg_size, 1) != round(current_line.avg_size, 1): + return False + + # Bold style mismatch → new paragraph + if prev_line.is_bold != current_line.is_bold: + return False + + # Indentation difference + if abs(prev_line.x0 - current_line.x0) > 5: + return False + + # Large vertical gap → new paragraph + if has_large_vertical_gap(prev_line, current_line, multiplier=1.2): + return False + + # Bullet line → start new paragraph + if is_bullet_line(current_line.text): + return False + + return True + + +def is_garbage_fragment(line, body_size): + # Very small font → likely garbage + if line.avg_size < body_size * 0.85: + return True + + # Tiny orphan line → likely fragment + words = line.text.strip().split() + if len(words) <= 1: + return True + + # Single symbol lines → skip + if len(line.text.strip()) <= 2: + return True + + return False + + +def is_bullet_line(text: str): + + stripped = text.strip() + + return ( + stripped.startswith(("•", "-", "–", "*")) + or re.match(r"^\d+\.", stripped) + or re.match(r"^[a-zA-Z]\)", stripped) + ) + + +def is_page_number(line, page_width): + text = line.text.strip() + if not text.isdigit(): + return False + if not is_centered(line, page_width, tolerance_ratio=0.2): + return False + return True + + +# ========================================================== +# FONT ANALYSIS +# ========================================================== +def compute_font_stats(pages: List[Page]): + sizes = [] + + for page in pages: + for line in page.lines: + sizes.append(round(line.avg_size, 1)) + + counter = Counter(sizes) + + body_size = counter.most_common(1)[0][0] + unique_sizes = sorted(counter.keys(), reverse=True) + + return body_size, unique_sizes + + +def get_heading_level(line, body_size, size_levels): + size = round(line.avg_size, 1) + + if size <= body_size: + return 0 + + for idx, s in enumerate(size_levels): + if size == s: + return idx + 1 + + return 0 + + +# ========================================================== +# LAYOUT SIGNALS +# ========================================================== +def is_centered(line, page_width, tolerance_ratio=0.1): + line_center = (line.x0 + line.x1) / 2 + page_center = page_width / 2 + tolerance = page_width * tolerance_ratio + return abs(line_center - page_center) <= tolerance + + +def compute_body_indent(pages): + indents = [] + + for page in pages: + for line in page.lines: + indents.append(round(line.x0, 1)) + + counter = Counter(indents) + return counter.most_common(1)[0][0] + + +def is_indent_shift(line, body_indent, threshold=5): + return abs(line.x0 - body_indent) > threshold + + +def has_large_vertical_gap(prev_line, current_line, multiplier=1.5): + if not prev_line: + return False + + gap = current_line.top - prev_line.top + return gap > (prev_line.avg_size * multiplier) + + + +# ========================================================== +# HEADING SCORING +# ========================================================== +def compute_heading_score( + line, + body_size, + body_indent, + 
prev_line, + page_width, +): + score = 0.0 + + # Larger font + if line.avg_size > body_size: + score += 0.4 + + # Bold + if line.is_bold: + score += 0.2 + + # Centered + if is_centered(line, page_width): + score += 0.15 + + # Indent difference + if is_indent_shift(line, body_indent): + score += 0.1 + + # Vertical gap above + if has_large_vertical_gap(prev_line, line): + score += 0.15 + + # Short lines are more likely headings + if len(line.text.split()) <= 20: + score += 0.05 + + return score + + +def should_merge_lines(prev_line: Line | None, current_line: Line) -> bool: + + if not prev_line: + return False + + # Font match + if abs(prev_line.avg_size - current_line.avg_size) > 0.3: + return False + + # Bold mismatch + if prev_line.is_bold != current_line.is_bold: + return False + + # Indent tolerance + if abs(prev_line.x0 - current_line.x0) > 8: + return False + + # Vertical gap normalization + gap = current_line.top - prev_line.top + if gap > prev_line.avg_size * 1.6: + return False + + # Bullet always starts new paragraph + if is_bullet_line(current_line.text): + return False + + # Sentence continuation heuristic + if prev_line.text.rstrip().endswith((".", "!", "?", ":", ";")): + return False + + return True + + +def is_garbage_line(line, known_headings): + text = line.text.strip() + + # Numeric only + if text.isdigit(): + return True + + # Single symbol (→, -, etc.) + if len(text) <= 2 and not text.isalpha(): + return True + + # Repeated heading fragment + if text in known_headings: + return True + + # Tiny orphan word + if len(text.split()) == 1 and len(text) < 4: + return True + + return False + + + + + +def normalize_reading_order(lines: List[Line]) -> List[Line]: + return sorted(lines, key=lambda lin: (round(lin.top, 1), lin.x0)) + +def cluster_columns(lines: List[Line], tolerance=60): + + clusters = [] + + for line in sorted(lines, key=lambda lin: lin.x0): + + placed = False + + for cluster in clusters: + if abs(cluster["x_mean"] - line.x0) < tolerance: + cluster["lines"].append(line) + cluster["x_mean"] = sum(lin.x0 for lin in cluster["lines"]) / len(cluster["lines"]) + placed = True + break + + if not placed: + clusters.append({"x_mean": line.x0, "lines": [line]}) + + # maintain vertical order inside each column + return [ + sorted(cluster["lines"], key=lambda lin: lin.top) + for cluster in sorted(clusters, key=lambda c: c["x_mean"]) + ] + + +class Block: + def __init__(self, lines: List[Line]): + self.lines = lines + self.text = " ".join(lin.text for lin in lines) + self.avg_size = sum(lin.avg_size for lin in lines) / len(lines) + self.is_bold = any(lin.is_bold for lin in lines) + self.x0 = min(lin.x0 for lin in lines) + self.top = lines[0].top + + +def build_blocks(lines: List[Line]) -> List[Block]: + + blocks = [] + current = [lines[0]] + + for prev, line in zip(lines, lines[1:]): + + if should_start_new_block(prev, line): + blocks.append(Block(current)) + current = [line] + else: + current.append(line) + + if current: + blocks.append(Block(current)) + + return blocks + +def compute_font_tiers(pages: List[Page]): + + sizes = [] + + for page in pages: + for line in page.lines: + sizes.append(round(line.avg_size, 1)) + + unique = sorted(set(sizes), reverse=True) + + # map size → tier index + return {size: idx + 1 for idx, size in enumerate(unique)} + + +def detect_heading(block, font_tiers): + + size = round(block.avg_size, 1) + tier = font_tiers.get(size, 0) + + if tier <= 1: + return 0, 0.0 + + score = 0.0 + + # Larger tier weight + if tier >= 2: + score += 0.5 + + # Bold 
weight
+    if block.is_bold:
+        score += 0.2
+
+    word_count = len(block.text.split())
+
+    # Headings are usually short
+    if word_count <= 12:
+        score += 0.2
+    else:
+        score -= 0.3  # Penalize long paragraphs heavily
+
+    # Penalize sentence-like paragraphs
+    if block.text.count(".") > 1:
+        score -= 0.3
+
+    # Uppercase headings
+    if block.text.isupper():
+        score += 0.1
+
+    if score >= 0.6:
+        return tier - 1, round(score, 3)
+
+    return 0, round(score, 3)
+
+
+def is_garbage_block(block):
+
+    text = block.text.strip()
+
+    if not text:
+        return True
+
+    # pure symbols
+    if len(text) <= 2 and not text.isalpha():
+        return True
+
+    # extremely tiny font
+    if block.avg_size < 5:
+        return True
+
+    return False
+
+def detect_repeated_lines(pages):
+
+    freq = defaultdict(int)
+
+    for page in pages:
+        for line in page.lines:
+            key = (round(line.top, 0), line.text.strip())
+            freq[key] += 1
+
+    repeated = set()
+    total = len(pages)
+
+    for key, count in freq.items():
+        if count > total * 0.6:
+            repeated.add(key)
+
+    return repeated
+
+
+def clean_title(text: str) -> str:
+    return re.sub(r"^\d+(\.\d+)*\s*", "", text).strip()
+
+
+def should_start_new_block(prev: Line, current: Line):
+
+    # large vertical gap
+    gap = current.top - prev.top
+    if gap > prev.avg_size * 1.8:
+        return True
+
+    # strong font size change
+    if abs(prev.avg_size - current.avg_size) > 1.2:
+        return True
+
+    # strong indent shift
+    if abs(prev.x0 - current.x0) > 40:
+        return True
+
+    return False
diff --git a/rag-engine/src/layers/structure_analyzer/models.py b/rag-engine/src/layers/structure_analyzer/models.py
new file mode 100644
index 0000000..df001ac
--- /dev/null
+++ b/rag-engine/src/layers/structure_analyzer/models.py
@@ -0,0 +1,31 @@
+from pydantic import BaseModel, Field
+from typing import List
+
+from src.layers.data_extractor.models import ImagePage
+
+
+class Paragraph(BaseModel):
+    text: str
+    page_number: int
+
+
+class Section(BaseModel):
+    id: str
+    title: str
+    level: int
+    page_number: int
+
+    paragraphs: List[Paragraph] = Field(default_factory=list)
+    children: List["Section"] = Field(default_factory=list)
+
+    tables: list[list[list[str | None]]] = Field(default_factory=list)
+    images: List[ImagePage] = Field(default_factory=list)
+    confidence: float
+
+
+class StructuredDocument(BaseModel):
+    sections: List[Section] = Field(default_factory=list)
+    preamble: List[Paragraph] = Field(default_factory=list)
+
+
+Section.model_rebuild()
diff --git a/rag-engine/src/process/service.py b/rag-engine/src/process/service.py
index 5407db6..a10aa08 100644
--- a/rag-engine/src/process/service.py
+++ b/rag-engine/src/process/service.py
@@ -1,13 +1,16 @@
 import logging
 from src.layers.data_extractor import extractor
+from src.layers.structure_analyzer.analyzer import analyze_layout
+
 from . import models
 
 
 def processFile(fileType: models.FileType, file_bytes: bytes):
     if fileType == models.FileType.pdf:
         logging.info("start processing pdf files")
-        data = extractor.pdf(file_bytes)
-        logging.info(f"pdf data extracted pages: {len(data)}")
-        return data
+        pages = extractor.pdf(file_bytes)
+        data = analyze_layout(pages)
+        logging.info(f"pdf extracted {len(pages)} pages")
+        return data.model_dump()
-    raise Exception("Unspported File type")
+    raise ValueError("Unsupported file type")
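
---

How the new heading detector behaves, by way of a worked example: a block whose
rounded font size maps to tier 2, set in bold and 8 words long, scores
0.5 (tier >= 2) + 0.2 (bold) + 0.2 (<= 12 words) = 0.9, clears the 0.6
threshold, and is emitted as a level-1 section (level = tier - 1). A block at
tier 1 (the single largest size in the document) or at an unmapped size is
rejected before scoring, so such text falls through to paragraph handling.

A minimal end-to-end sketch of the two layers chained together, not part of the
diff itself: it assumes the rag-engine source root is on PYTHONPATH, and
"sample.pdf" is a hypothetical input path; the imports and fields match the
modules added above.

    from src.layers.data_extractor import extractor
    from src.layers.structure_analyzer.analyzer import analyze_layout

    def print_tree(sections, depth=0):
        # Walk the nested Section hierarchy produced by analyze_layout.
        for section in sections:
            print("  " * depth
                  + f"[L{section.level}] {section.title} "
                  + f"(page {section.page_number}, conf {section.confidence})")
            print_tree(section.children, depth + 1)

    with open("sample.pdf", "rb") as f:      # hypothetical sample document
        pages = extractor.pdf(f.read())      # layer 1: geometry-aware extraction
    document = analyze_layout(pages)         # layer 2: heading/section analysis
    print_tree(document.sections)
    print(f"preamble paragraphs: {len(document.preamble)}")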