diff --git a/rag-engine/requirements.txt b/rag-engine/requirements.txt index 23e22d6..8dcc527 100644 --- a/rag-engine/requirements.txt +++ b/rag-engine/requirements.txt @@ -3,3 +3,4 @@ python-dotenv pydantic pdfplumber requests +tiktoken diff --git a/rag-engine/src/layers/chunking/__init__.py b/rag-engine/src/layers/chunking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rag-engine/src/layers/chunking/chunk_document.py b/rag-engine/src/layers/chunking/chunk_document.py new file mode 100644 index 0000000..06ce75a --- /dev/null +++ b/rag-engine/src/layers/chunking/chunk_document.py @@ -0,0 +1,207 @@ +from typing import List +import uuid +from src.layers.chunking.models import Chunk +import tiktoken + +_encoder = tiktoken.get_encoding("cl100k_base") + + +def count_tokens(text: str) -> int: + return len(_encoder.encode(text)) + + +def chunk_document( + structured_document, + metadata: dict, + max_tokens: int = 400, +) -> List[Chunk]: + + chunks: List[Chunk] = [] + + # handle preamble + if structured_document.preamble: + preamble_text = "\n".join(p.text for p in structured_document.preamble) + if not _looks_like_toc(preamble_text): + chunks.extend( + _chunk_paragraphs( + paragraphs=structured_document.preamble, + section_title="Preamble", + section_path=["Preamble"], + level=0, + max_tokens=max_tokens, + metadata=metadata, + ) + ) + + # handle sections + for section in structured_document.sections: + chunks.extend( + _process_section( + section, parent_path=[], max_tokens=max_tokens, metadata=metadata + ) + ) + + return chunks + + +def _chunk_paragraphs( + paragraphs, + section_title: str, + section_path: List[str], + level: int, + max_tokens: int, + metadata: dict, +) -> List[Chunk]: + + chunks: List[Chunk] = [] + + buffer = "" + page_start: int | None = None + page_end: int | None = None + + for p in paragraphs: + text = p.text.strip() + if not text: + continue + + if page_start is None: + page_start = p.page_number + + page_end = p.page_number + + candidate = f"{buffer}\n{text}" if buffer else text + token_count = count_tokens(candidate) + + if token_count <= max_tokens: + buffer = candidate + else: + # flush + if buffer: + chunks.append( + _build_chunk( + buffer, + section_title, + section_path, + level, + page_start, + page_end, + metadata=metadata, + ) + ) + + buffer = text + page_start = p.page_number + page_end = p.page_number + + # final flush + if buffer: + chunks.append( + _build_chunk( + buffer, + section_title, + section_path, + level, + page_start, + page_end, + metadata=metadata, + ) + ) + + return _merge_small_chunks(chunks, metadata) + + +def _process_section( + section, + parent_path: List[str], + max_tokens: int, + metadata: dict, +) -> List[Chunk]: + + path = parent_path + [section.title] + chunks: List[Chunk] = [] + + # chunk this section's paragraphs + if section.paragraphs: + chunks.extend( + _chunk_paragraphs( + paragraphs=section.paragraphs, + section_title=section.title, + section_path=path, + level=section.level, + max_tokens=max_tokens, + metadata=metadata, + ) + ) + + # recursively process children + for child in section.children: + chunks.extend( + _process_section( + child, parent_path=path, max_tokens=max_tokens, metadata=metadata + ) + ) + + return chunks + + +def _build_chunk( + text: str, + section_title: str, + section_path: List[str], + level: int, + page_start: int | None, + page_end: int | None, + metadata: dict, +) -> Chunk: + + return Chunk( + id=str(uuid.uuid4()), + text=text.strip(), + token_count=count_tokens(text), + section_title=section_title, + section_path=section_path, + level=level, + page_start=page_start, + page_end=page_end, + metadata=metadata, + ) + + +def _merge_small_chunks( + chunks: List[Chunk], metadata: dict, min_tokens: int = 80 +) -> List[Chunk]: + if not chunks: + return chunks + + merged = [] + buffer = chunks[0] + + for chunk in chunks[1:]: + if buffer.token_count < min_tokens: + combined_text = buffer.text + "\n" + chunk.text + buffer = _build_chunk( + combined_text, + buffer.section_title, + buffer.section_path, + buffer.level, + buffer.page_start, + chunk.page_end, + metadata=metadata, + ) + else: + merged.append(buffer) + buffer = chunk + + merged.append(buffer) + return merged + + +def _looks_like_toc(text: str) -> bool: + lines = text.split("\n") + digit_lines = sum(1 for lin in lines if lin.strip().split()[-1].isdigit()) + dotted_lines = sum(1 for lin in lines if "..." in lin or ". ." in lin) + + if len(lines) == 0: + return False + + return (digit_lines / len(lines)) > 0.4 or (dotted_lines / len(lines)) > 0.3 diff --git a/rag-engine/src/layers/chunking/models.py b/rag-engine/src/layers/chunking/models.py new file mode 100644 index 0000000..edb00ea --- /dev/null +++ b/rag-engine/src/layers/chunking/models.py @@ -0,0 +1,21 @@ +from pydantic import BaseModel +from typing import List, Dict, Any + +# ------------------------- +# Output Model +# ------------------------- + + +class Chunk(BaseModel): + id: str + text: str + token_count: int + + section_title: str + section_path: List[str] + level: int + + page_start: int | None + page_end: int | None + + metadata: Dict[str, Any] = {} diff --git a/rag-engine/src/layers/structure_analyzer/analyzer.py b/rag-engine/src/layers/structure_analyzer/analyzer.py index f1055f8..1efc336 100644 --- a/rag-engine/src/layers/structure_analyzer/analyzer.py +++ b/rag-engine/src/layers/structure_analyzer/analyzer.py @@ -1,13 +1,11 @@ -from collections import defaultdict import re import uuid -from typing import Counter, List +from typing import List from src.layers.data_extractor.models import Line, Page from src.layers.structure_analyzer.models import Paragraph, Section, StructuredDocument - # ========================================================== # PUBLIC API # ========================================================== @@ -19,35 +17,31 @@ def analyze_layout(pages: List[Page]) -> StructuredDocument: if not pages: return document - font_tiers = compute_font_tiers(pages) + font_tiers = _compute_font_tiers(pages) for page in pages: - # ---- normalize reading order ---- - page_lines = normalize_reading_order(page.lines) + page_lines = _normalize_reading_order(page.lines) # ---- detect columns ---- - columns = cluster_columns(page_lines) + columns = _cluster_columns(page_lines) for column_lines in columns: - - blocks = build_blocks(column_lines) + blocks = _build_blocks(column_lines) for block in blocks: - - if is_garbage_block(block): + if _is_garbage_block(block): continue - heading_level, confidence = detect_heading(block, font_tiers) + heading_level, confidence = _detect_heading(block, font_tiers) # ------------------------------- # SECTION CREATION # ------------------------------- if heading_level > 0: - section = Section( id=str(uuid.uuid4()), - title=clean_title(block.text), + title=_clean_title(block.text), level=heading_level, page_number=page.page_number, confidence=confidence, @@ -84,247 +78,24 @@ def analyze_layout(pages: List[Page]) -> StructuredDocument: return document -def should_merge(prev_line, current_line): - if not prev_line: - return False - - # Font size must match - if round(prev_line.avg_size, 1) != round(current_line.avg_size, 1): - return False - - # Bold style mismatch → new paragraph - if prev_line.is_bold != current_line.is_bold: - return False - - # Indentation difference - if abs(prev_line.x0 - current_line.x0) > 5: - return False - - # Large vertical gap → new paragraph - if has_large_vertical_gap(prev_line, current_line, multiplier=1.2): - return False - - # Bullet line → start new paragraph - if is_bullet_line(current_line.text): - return False - - return True - - -def is_garbage_fragment(line, body_size): - # Very small font → likely garbage - if line.avg_size < body_size * 0.85: - return True - - # Tiny orphan line → likely fragment - words = line.text.strip().split() - if len(words) <= 1: - return True - - # Single symbol lines → skip - if len(line.text.strip()) <= 2: - return True - - return False - - -def is_bullet_line(text: str): - - stripped = text.strip() - - return ( - stripped.startswith(("•", "-", "–", "*")) - or re.match(r"^\d+\.", stripped) - or re.match(r"^[a-zA-Z]\)", stripped) - ) - - -def is_page_number(line, page_width): - text = line.text.strip() - if not text.isdigit(): - return False - if not is_centered(line, page_width, tolerance_ratio=0.2): - return False - return True - - -# ========================================================== -# FONT ANALYSIS -# ========================================================== -def compute_font_stats(pages: List[Page]): - sizes = [] - - for page in pages: - for line in page.lines: - sizes.append(round(line.avg_size, 1)) - - counter = Counter(sizes) - - body_size = counter.most_common(1)[0][0] - unique_sizes = sorted(counter.keys(), reverse=True) - - return body_size, unique_sizes - - -def get_heading_level(line, body_size, size_levels): - size = round(line.avg_size, 1) - - if size <= body_size: - return 0 - - for idx, s in enumerate(size_levels): - if size == s: - return idx + 1 - - return 0 - - -# ========================================================== -# LAYOUT SIGNALS -# ========================================================== -def is_centered(line, page_width, tolerance_ratio=0.1): - line_center = (line.x0 + line.x1) / 2 - page_center = page_width / 2 - tolerance = page_width * tolerance_ratio - return abs(line_center - page_center) <= tolerance - - -def compute_body_indent(pages): - indents = [] - - for page in pages: - for line in page.lines: - indents.append(round(line.x0, 1)) - - counter = Counter(indents) - return counter.most_common(1)[0][0] - - -def is_indent_shift(line, body_indent, threshold=5): - return abs(line.x0 - body_indent) > threshold - - -def has_large_vertical_gap(prev_line, current_line, multiplier=1.5): - if not prev_line: - return False - - gap = current_line.top - prev_line.top - return gap > (prev_line.avg_size * multiplier) - - - -# ========================================================== -# HEADING SCORING -# ========================================================== -def compute_heading_score( - line, - body_size, - body_indent, - prev_line, - page_width, -): - score = 0.0 - - # Larger font - if line.avg_size > body_size: - score += 0.4 - - # Bold - if line.is_bold: - score += 0.2 - - # Centered - if is_centered(line, page_width): - score += 0.15 - - # Indent difference - if is_indent_shift(line, body_indent): - score += 0.1 - - # Vertical gap above - if has_large_vertical_gap(prev_line, line): - score += 0.15 - - # Short lines are more likely headings - if len(line.text.split()) <= 20: - score += 0.05 - - return score - -def should_merge_lines(prev_line: Line | None, current_line: Line) -> bool: - - if not prev_line: - return False - - # Font match - if abs(prev_line.avg_size - current_line.avg_size) > 0.3: - return False - - # Bold mismatch - if prev_line.is_bold != current_line.is_bold: - return False - - # Indent tolerance - if abs(prev_line.x0 - current_line.x0) > 8: - return False - - # Vertical gap normalization - gap = current_line.top - prev_line.top - if gap > prev_line.avg_size * 1.6: - return False - - # Bullet always starts new paragraph - if is_bullet_line(current_line.text): - return False - - # Sentence continuation heuristic - if prev_line.text.rstrip().endswith((".", "!", "?", ":", ";")): - return False - - return True - - -def is_garbage_line(line, known_headings): - text = line.text.strip() - - # Numeric only - if text.isdigit(): - return True - - # Single symbol (→, -, etc.) - if len(text) <= 2 and not text.isalpha(): - return True - - # Repeated heading fragment - if text in known_headings: - return True - - # Tiny orphan word - if len(text.split()) == 1 and len(text) < 4: - return True - - return False - - - - - -def normalize_reading_order(lines: List[Line]) -> List[Line]: +def _normalize_reading_order(lines: List[Line]) -> List[Line]: return sorted(lines, key=lambda lin: (round(lin.top, 1), lin.x0)) -def cluster_columns(lines: List[Line], tolerance=60): + +def _cluster_columns(lines: List[Line], tolerance=60): clusters = [] for line in sorted(lines, key=lambda lin: lin.x0): - placed = False for cluster in clusters: if abs(cluster["x_mean"] - line.x0) < tolerance: cluster["lines"].append(line) - cluster["x_mean"] = sum(lin.x0 for lin in cluster["lines"]) / len(cluster["lines"]) + cluster["x_mean"] = sum(lin.x0 for lin in cluster["lines"]) / len( + cluster["lines"] + ) placed = True break @@ -348,14 +119,13 @@ def __init__(self, lines: List[Line]): self.top = lines[0].top -def build_blocks(lines: List[Line]) -> List[Block]: +def _build_blocks(lines: List[Line]) -> List[Block]: blocks = [] current = [lines[0]] for prev, line in zip(lines, lines[1:]): - - if should_start_new_block(prev, line): + if _should_start_new_block(prev, line): blocks.append(Block(current)) current = [line] else: @@ -366,7 +136,8 @@ def build_blocks(lines: List[Line]) -> List[Block]: return blocks -def compute_font_tiers(pages: List[Page]): + +def _compute_font_tiers(pages: List[Page]): sizes = [] @@ -380,47 +151,55 @@ def compute_font_tiers(pages: List[Page]): return {size: idx + 1 for idx, size in enumerate(unique)} -def detect_heading(block, font_tiers): +def _detect_heading(block, font_tiers): + + if _is_code_like(block): + return 0, 0.0 + + if _looks_like_toc_block(block): + return 0, 0.0 size = round(block.avg_size, 1) tier = font_tiers.get(size, 0) - if tier <= 1: + if tier == 0: return 0, 0.0 score = 0.0 - # Larger tier weight - if tier >= 2: + # Tier weight (now includes largest font) + if tier >= 1: score += 0.5 - # Bold weight if block.is_bold: score += 0.2 word_count = len(block.text.split()) - # Headings are usually short if word_count <= 12: score += 0.2 else: - score -= 0.3 # Penalize long paragraphs heavily + score -= 0.4 - # Penalize sentence-like paragraphs if block.text.count(".") > 1: score -= 0.3 - # Uppercase headings if block.text.isupper(): score += 0.1 + # Chapter-number pattern + text = block.text.strip() + if text and text[0].isdigit() and "—" in text: + score += 0.2 + if score >= 0.6: - return tier - 1, round(score, 3) + level = min(tier, 6) + return level, round(score, 3) return 0, round(score, 3) -def is_garbage_block(block): +def _is_garbage_block(block): text = block.text.strip() @@ -437,30 +216,12 @@ def is_garbage_block(block): return False -def detect_repeated_lines(pages): - - freq = defaultdict(int) - - for page in pages: - for line in page.lines: - key = (round(line.top, 0), line.text.strip()) - freq[key] += 1 - - repeated = set() - total = len(pages) - - for key, count in freq.items(): - if count > total * 0.6: - repeated.add(key) - - return repeated - -def clean_title(text: str) -> str: +def _clean_title(text: str) -> str: return re.sub(r"^\d+(\.\d+)*\s*", "", text).strip() -def should_start_new_block(prev: Line, current: Line): +def _should_start_new_block(prev: Line, current: Line): # large vertical gap gap = current.top - prev.top @@ -476,3 +237,56 @@ def should_start_new_block(prev: Line, current: Line): return True return False + + +def _is_code_like(block) -> bool: + text = block.text.strip() + + if not text: + return False + + # High punctuation density + punct = sum(1 for c in text if c in "{}();=<>[]") + if len(text) > 0 and (punct / len(text)) > 0.08: + return True + + # Ends with semicolon (strong signal) + if text.endswith(";"): + return True + + # Many parentheses (function-like) + if text.count("(") >= 1 and text.count(")") >= 1: + if "{" in text or "=" in text: + return True + + # Very high digit ratio (often code or references) + digits = sum(1 for c in text if c.isdigit()) + if len(text) > 0 and (digits / len(text)) > 0.3: + return True + + return False + + +def _looks_like_toc_block(block) -> bool: + text = block.text.strip() + + if not text: + return False + + words = text.split() + + # Many dot leaders + if text.count(".") > 8: + return True + + # Ends with page number + if words and words[-1].isdigit(): + if text.count(".") >= 3: + return True + + # High digit ratio + digits = sum(1 for c in text if c.isdigit()) + if len(text) > 0 and (digits / len(text)) > 0.4: + return True + + return False diff --git a/rag-engine/src/process/controller.py b/rag-engine/src/process/controller.py index 5eade54..f8e38c0 100644 --- a/rag-engine/src/process/controller.py +++ b/rag-engine/src/process/controller.py @@ -1,12 +1,12 @@ from fastapi.responses import JSONResponse from fastapi import APIRouter, File, Form, HTTPException, Path, UploadFile, status import requests - +import json from src.process.service import processFile from . import models -router = APIRouter(prefix="/process", tags=["Todos"]) +router = APIRouter(prefix="/process", tags=["Process"]) @router.post( @@ -17,9 +17,19 @@ async def process( file_type: models.FileType = Path(..., description="Type of file to process"), input_mode: models.InputMode = Path(..., description="How content is passed"), + metadata: str | None = Form(None, description="metadata for chunks"), upload: UploadFile | None = File(None, description="The file to upload"), url: str | None = Form(None, description="Link to fetch"), ): + meta = {} + if metadata: + try: + meta = json.loads(metadata) + except json.JSONDecodeError: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Invalid JSON in metadata field", + ) try: if input_mode == models.InputMode.url: if not url: @@ -34,7 +44,8 @@ async def process( raise HTTPException( status.HTTP_400_BAD_REQUEST, "URL does not point to a PDF file" ) - data = processFile(models.FileType.pdf, resp.content) + + data = processFile(models.FileType.pdf, resp.content, meta) return JSONResponse(content=data, status_code=status.HTTP_200_OK) if input_mode == models.InputMode.file: if not upload: @@ -43,7 +54,7 @@ async def process( "Must upload a file when input_mode is 'file'", ) data_bytes = await upload.read() - data = processFile(models.FileType.pdf,data_bytes) + data = processFile(models.FileType.pdf, data_bytes, meta) return JSONResponse(content=data, status_code=status.HTTP_200_OK) except ValueError as e: diff --git a/rag-engine/src/process/service.py b/rag-engine/src/process/service.py index a10aa08..727acaf 100644 --- a/rag-engine/src/process/service.py +++ b/rag-engine/src/process/service.py @@ -1,16 +1,18 @@ import logging +from src.layers.chunking.chunk_document import chunk_document from src.layers.data_extractor import extractor from src.layers.structure_analyzer.analyzer import analyze_layout from . import models -def processFile(fileType: models.FileType, file_bytes: bytes): +def processFile(fileType: models.FileType, file_bytes: bytes, metadata: dict): if fileType == models.FileType.pdf: logging.info("start processing pdf files") pages = extractor.pdf(file_bytes) - data = analyze_layout(pages) + structured_document = analyze_layout(pages) + chunks = chunk_document(structured_document, metadata, max_tokens=400) logging.info(f"pdf data extracted pages: {len(pages)}") - return data.model_dump() + return [chunk.model_dump() for chunk in chunks] raise Exception("Unspported File type")