diff --git a/rag-engine/src/layers/chunking/chunk_document.py b/rag-engine/src/layers/chunking/chunk_document.py
index 06ce75a..b76798a 100644
--- a/rag-engine/src/layers/chunking/chunk_document.py
+++ b/rag-engine/src/layers/chunking/chunk_document.py
@@ -1,8 +1,11 @@
+import json
 from typing import List
 import uuid
 from src.layers.chunking.models import Chunk
 import tiktoken
+from src.layers.structure_analyzer.models import Section, StructuredDocument
+

 _encoder = tiktoken.get_encoding("cl100k_base")
@@ -11,17 +14,18 @@ def count_tokens(text: str) -> int:


 def chunk_document(
-    structured_document,
+    structured_document: StructuredDocument,
     metadata: dict,
     max_tokens: int = 400,
+    min_tokens: int = 80,
 ) -> List[Chunk]:

     chunks: List[Chunk] = []

-    # handle preamble
+    # ---- PREAMBLE ----
     if structured_document.preamble:
         preamble_text = "\n".join(p.text for p in structured_document.preamble)

-        if not _looks_like_toc(preamble_text):
+        if preamble_text.strip():
             chunks.extend(
                 _chunk_paragraphs(
                     paragraphs=structured_document.preamble,
@@ -29,18 +33,26 @@ def chunk_document(
                     section_path=["Preamble"],
                     level=0,
                     max_tokens=max_tokens,
+                    min_tokens=min_tokens,
                     metadata=metadata,
                 )
             )

-    # handle sections
+    # ---- SECTIONS ----
     for section in structured_document.sections:
         chunks.extend(
             _process_section(
-                section, parent_path=[], max_tokens=max_tokens, metadata=metadata
+                section,
+                parent_path=[],
+                max_tokens=max_tokens,
+                min_tokens=min_tokens,
+                metadata=metadata,
             )
         )

+    # ---- FINAL CLEANUP ----
+    chunks = _deduplicate_chunks(chunks)
+
     return chunks
@@ -50,6 +62,7 @@ def _chunk_paragraphs(
     section_path: List[str],
     level: int,
     max_tokens: int,
+    min_tokens: int,
     metadata: dict,
 ) -> List[Chunk]:

@@ -107,20 +120,21 @@ def _chunk_paragraphs(
             )
         )

-    return _merge_small_chunks(chunks, metadata)
+    return _merge_small_chunks(chunks, metadata, min_tokens, max_tokens)


 def _process_section(
-    section,
+    section: Section,
     parent_path: List[str],
     max_tokens: int,
+    min_tokens: int,
     metadata: dict,
 ) -> List[Chunk]:

     path = parent_path + [section.title]
     chunks: List[Chunk] = []

-    # chunk this section's paragraphs
+    # ---- TEXT CHUNKS ----
     if section.paragraphs:
         chunks.extend(
             _chunk_paragraphs(
@@ -129,18 +143,52 @@ def _process_section(
                 section_path=path,
                 level=section.level,
                 max_tokens=max_tokens,
+                min_tokens=min_tokens,
+                metadata=metadata,
+            )
+        )
+
+    # ---- TABLE CHUNKS ----
+    if section.tables:
+        chunks.extend(
+            _build_table_chunks_from_section(
+                section=section,
+                section_path=path,
                 metadata=metadata,
             )
         )

-    # recursively process children
+    # ---- CHILD SECTIONS ----
     for child in section.children:
         chunks.extend(
             _process_section(
-                child, parent_path=path, max_tokens=max_tokens, metadata=metadata
+                child,
+                parent_path=path,
+                max_tokens=max_tokens,
+                min_tokens=min_tokens,
+                metadata=metadata,
             )
         )

+    if (
+        not section.paragraphs
+        and not section.tables
+        and not section.children
+        and section.title.strip()
+    ):
+        if not _is_pure_category_title(section.title):
+            chunks.append(
+                _build_chunk(
+                    text=section.title.strip(),
+                    section_title=section.title,
+                    section_path=path,
+                    level=section.level,
+                    page_start=section.page_number,
+                    page_end=section.page_number,
+                    metadata=metadata,
+                )
+            )
+
     return chunks
@@ -168,40 +216,96 @@ def _build_chunk(


 def _merge_small_chunks(
-    chunks: List[Chunk], metadata: dict, min_tokens: int = 80
+    chunks: List[Chunk],
+    metadata: dict,
+    min_tokens: int,
+    max_tokens: int,
 ) -> List[Chunk]:
+
     if not chunks:
-        return chunks
+        return []

     merged = []
     buffer = chunks[0]
     for chunk in chunks[1:]:
+        # If buffer too small, try merging
         if buffer.token_count < min_tokens:
             combined_text = buffer.text + "\n" + chunk.text
-            buffer = _build_chunk(
-                combined_text,
-                buffer.section_title,
-                buffer.section_path,
-                buffer.level,
-                buffer.page_start,
-                chunk.page_end,
-                metadata=metadata,
-            )
-        else:
-            merged.append(buffer)
-            buffer = chunk
+            combined_tokens = count_tokens(combined_text)
+
+            # Only merge if we stay under max_tokens
+            if combined_tokens <= max_tokens:
+                buffer = _build_chunk(
+                    combined_text,
+                    buffer.section_title,
+                    buffer.section_path,
+                    buffer.level,
+                    buffer.page_start,
+                    chunk.page_end,
+                    metadata,
+                )
+                continue
+
+        # Otherwise flush buffer
+        merged.append(buffer)
+        buffer = chunk

     merged.append(buffer)
     return merged


-def _looks_like_toc(text: str) -> bool:
-    lines = text.split("\n")
-    digit_lines = sum(1 for lin in lines if lin.strip().split()[-1].isdigit())
-    dotted_lines = sum(1 for lin in lines if "..." in lin or ". ." in lin)
+def _deduplicate_chunks(chunks: List[Chunk]) -> List[Chunk]:
+    seen = set()
+    unique = []
+
+    for chunk in chunks:
+        normalized = chunk.text.strip()
+
+        if normalized in seen:
+            continue
+
+        seen.add(normalized)
+        unique.append(chunk)
+
+    return unique
+
+
+def _build_table_chunks_from_section(
+    section: Section,
+    section_path: List[str],
+    metadata: dict,
+) -> List[Chunk]:
+
+    chunks = []
+
+    for table in section.tables:
+        table_metadata = metadata.copy()
+        table_metadata["_content_type"] = "table"
+        table_json = json.dumps(table, ensure_ascii=False)
+
+        chunks.append(
+            Chunk(
+                id=str(uuid.uuid4()),
+                text=table_json,
+                token_count=count_tokens(table_json),
+                section_title=section.title,
+                section_path=section_path,
+                level=section.level,
+                page_start=section.page_number,
+                page_end=section.page_number,
+                metadata=table_metadata,
+            )
+        )
+
+    return chunks
+
+
+def _is_pure_category_title(title: str) -> bool:
+    clean = title.strip()

-    if len(lines) == 0:
-        return False
+    # If fully uppercase and short → likely category
+    if clean.isupper() and len(clean.split()) <= 3:
+        return True

-    return (digit_lines / len(lines)) > 0.4 or (dotted_lines / len(lines)) > 0.3
+    return False
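Note on the reworked _merge_small_chunks above: an undersized buffer is now folded into the next chunk only when the combined text still fits under max_tokens; otherwise the small chunk is flushed as-is. A minimal standalone sketch of that policy, using plain (text, token_count) tuples instead of the project's Chunk model and summing token counts instead of re-tokenizing (both simplifications are for illustration only):

def merge_small(items, min_tokens=80, max_tokens=400):
    # items: list of (text, token_count) tuples, already in document order
    if not items:
        return []
    merged = []
    buf_text, buf_tokens = items[0]
    for text, tokens in items[1:]:
        if buf_tokens < min_tokens and buf_tokens + tokens <= max_tokens:
            # buffer too small and the merge stays under the cap: keep growing it
            buf_text, buf_tokens = buf_text + "\n" + text, buf_tokens + tokens
            continue
        merged.append((buf_text, buf_tokens))  # flush the buffer
        buf_text, buf_tokens = text, tokens
    merged.append((buf_text, buf_tokens))  # flush the tail
    return merged

# merge_small([("a", 40), ("b", 50), ("c", 500)]) -> [("a\nb", 90), ("c", 500)]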
diff --git a/rag-engine/src/layers/data_extractor/extractor.py b/rag-engine/src/layers/data_extractor/extractor.py
index be00a01..df3d4a2 100644
--- a/rag-engine/src/layers/data_extractor/extractor.py
+++ b/rag-engine/src/layers/data_extractor/extractor.py
@@ -13,30 +13,37 @@
 LINE_TOLERANCE = 3  # vertical tolerance for grouping words into lines
 TABLE_PADDING = 1.5  # small padding around table bbox to catch overlaps

+
 # ===============================
 # PUBLIC ENTRY
 # ===============================
-def pdf(pdf_bytes: bytes) -> list[Page]:
+def pdf(pdf_bytes: bytes) -> tuple[list[Page], dict]:

     pages_output: list[Page] = []
+    metadata = {}

     try:
         with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf_doc:
+            metadata["_document_id"] = str(uuid.uuid4())
+            metadata["_file_type"] = "pdf"
+            metadata["_page_count"] = len(pdf_doc.pages)
+            metadata["_file_metadata"] = pdf_doc.metadata
+
             for page_number, page in enumerate(pdf_doc.pages, start=1):
-                tables_output = extract_tables(page)
+                tables_output = _extract_tables(page)

                 table_bboxes = [
-                    expand_bbox(table.bbox, padding=TABLE_PADDING)
+                    _expand_bbox(table.bbox, padding=TABLE_PADDING)
                     for table in page.find_tables()
                 ]

-                words = extract_words(page)
-                words = filter_table_words(words, table_bboxes)
+                words = _extract_words(page)
+                words = _filter_table_words(words, table_bboxes)

-                lines_output = group_words_into_lines(words)
+                lines_output = _group_words_into_lines(words)
                 raw_text = "\n".join(line.text for line in lines_output)
-                text = normalize_text(raw_text)
+                text = _normalize_text(raw_text)

-                images_output = extract_images(page)
+                images_output = _extract_images(page)

                 pages_output.append(
                     Page(
@@ -50,19 +57,19 @@ def pdf(pdf_bytes: bytes) -> list[Page]:
                     )
                 )

-        return pages_output
+        return pages_output, metadata

     except Exception as e:
         raise ValueError(f"Error processing PDF: {e}")


-def normalize_text(text: str) -> str:
-    text = fix_hyphen_breaks(text)
-    text = remove_page_numbers(text)
-    text = remove_dot_lines(text)
-    text = remove_lonely_symbols(text)
-    text = fix_merged_words(text)
-    text = normalize_spaces(text)
+def _normalize_text(text: str) -> str:
+    text = _fix_hyphen_breaks(text)
+    text = _remove_page_numbers(text)
+    text = _remove_dot_lines(text)
+    text = _remove_lonely_symbols(text)
+    text = _fix_merged_words(text)
+    text = _normalize_spaces(text)

     text = "\n".join(line.rstrip() for line in text.splitlines())
     text = re.sub(r"\n{3,}", "\n\n", text)
@@ -70,7 +77,7 @@ def normalize_text(text: str) -> str:
     return text.strip()


-def extract_words(page) -> List[Word]:
+def _extract_words(page) -> List[Word]:

     raw_words = page.extract_words(
         x_tolerance=2,
@@ -97,7 +104,7 @@ def extract_words(page) -> List[Word]:
     return words


-def group_words_into_lines(words: List[Word]) -> List[Line]:
+def _group_words_into_lines(words: List[Word]) -> List[Line]:

     if not words:
         return []
@@ -152,7 +159,7 @@ def group_words_into_lines(words: List[Word]) -> List[Line]:
     return lines_output


-def extract_tables(page):
+def _extract_tables(page):

     tables_output = []

@@ -167,7 +174,7 @@ def extract_tables(page):
     return tables_output


-def extract_images(page):
+def _extract_images(page):

     images_output: list[ImagePage] = []

@@ -187,19 +194,19 @@ def extract_images(page):
     return images_output


-def fix_hyphen_breaks(text: str) -> str:
+def _fix_hyphen_breaks(text: str) -> str:
     return re.sub(r"-\n(\w)", r"\1", text)


-def remove_page_numbers(text: str) -> str:
+def _remove_page_numbers(text: str) -> str:
     return "\n".join(line for line in text.splitlines() if not line.strip().isdigit())


-def normalize_spaces(text: str) -> str:
+def _normalize_spaces(text: str) -> str:
     return re.sub(r"[ \t]+", " ", text)


-def remove_dot_lines(text: str) -> str:
+def _remove_dot_lines(text: str) -> str:
     return "\n".join(
         line
         for line in text.splitlines()
@@ -207,26 +214,27 @@ def remove_dot_lines(text: str) -> str:
     )


-def remove_lonely_symbols(text: str) -> str:
+def _remove_lonely_symbols(text: str) -> str:
     return "\n".join(line for line in text.splitlines() if len(line.strip()) > 2)


-def fix_merged_words(text: str) -> str:
+def _fix_merged_words(text: str) -> str:
     return re.sub(r"([a-z])([A-Z])", r"\1 \2", text)

-def expand_bbox(bbox, padding=1.0):
+
+def _expand_bbox(bbox, padding=1.0):
     x0, top, x1, bottom = bbox
     return (x0 - padding, top - padding, x1 + padding, bottom + padding)

-def filter_table_words(words: list[Word], table_bboxes: list[tuple]) -> list[Word]:
+
+def _filter_table_words(words: list[Word], table_bboxes: list[tuple]) -> list[Word]:
     filtered = []
     for word in words:
-        if not any(is_inside_bbox(word, bbox) for bbox in table_bboxes):
+        if not any(_is_inside_bbox(word, bbox) for bbox in table_bboxes):
             filtered.append(word)
     return filtered

-def is_inside_bbox(word: Word, bbox) -> bool:
+
+def _is_inside_bbox(word: Word, bbox) -> bool:
     x0, top, x1, bottom = bbox
-    return (
-        word.x0 >= x0 and word.x1 <= x1 and word.top >= top and word.bottom <= bottom
-    )
+    return word.x0 >= x0 and word.x1 <= x1 and word.top >= top and word.bottom <= bottom
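The word filtering in extractor.py is plain bounding-box containment: each table bbox is padded slightly, and any word whose box falls entirely inside one of them is dropped from the running text so table content is not duplicated outside the table chunks. A self-contained sketch of the same geometry, with bare (x0, top, x1, bottom) tuples standing in for pdfplumber words and made-up coordinates:

def expand(bbox, pad=1.5):
    x0, top, x1, bottom = bbox
    return (x0 - pad, top - pad, x1 + pad, bottom + pad)

def inside(word, bbox):
    x0, top, x1, bottom = bbox
    wx0, wtop, wx1, wbottom = word
    return wx0 >= x0 and wx1 <= x1 and wtop >= top and wbottom <= bottom

table_bboxes = [expand((100, 200, 300, 400))]
words = [(110, 210, 150, 220), (10, 20, 40, 30)]  # first word sits inside the table
kept = [w for w in words if not any(inside(w, b) for b in table_bboxes)]
# kept == [(10, 20, 40, 30)]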
diff --git a/rag-engine/src/process/controller.py b/rag-engine/src/process/controller.py
index f8e38c0..b06434d 100644
--- a/rag-engine/src/process/controller.py
+++ b/rag-engine/src/process/controller.py
@@ -1,10 +1,11 @@
+import os
 from fastapi.responses import JSONResponse
 from fastapi import APIRouter, File, Form, HTTPException, Path, UploadFile, status
 import requests
 import json
 from src.process.service import processFile
 from . import models
-
+from urllib.parse import urlparse


 router = APIRouter(prefix="/process", tags=["Process"])
@@ -44,7 +45,9 @@ async def process(
             raise HTTPException(
                 status.HTTP_400_BAD_REQUEST, "URL does not point to a PDF file"
             )
-
+        parsed = urlparse(url)
+        filename = os.path.basename(parsed.path) or "unknown"
+        meta["_source_file"] = filename
         data = processFile(models.FileType.pdf, resp.content, meta)
         return JSONResponse(content=data, status_code=status.HTTP_200_OK)
     if input_mode == models.InputMode.file:
@@ -53,6 +56,7 @@ async def process(
             raise HTTPException(
                 status.HTTP_422_UNPROCESSABLE_CONTENT,
                 "Must upload a file when input_mode is 'file'",
             )
+        meta["_source_file"] = upload.filename
         data_bytes = await upload.read()
         data = processFile(models.FileType.pdf, data_bytes, meta)
         return JSONResponse(content=data, status_code=status.HTTP_200_OK)
diff --git a/rag-engine/src/process/service.py b/rag-engine/src/process/service.py
index 727acaf..4092cf2 100644
--- a/rag-engine/src/process/service.py
+++ b/rag-engine/src/process/service.py
@@ -9,9 +9,11 @@ def processFile(fileType: models.FileType, file_bytes: bytes, metadata: dict):

     if fileType == models.FileType.pdf:
         logging.info("start processing pdf files")
-        pages = extractor.pdf(file_bytes)
+        pages, extractor_meta = extractor.pdf(file_bytes)
         structured_document = analyze_layout(pages)
-        chunks = chunk_document(structured_document, metadata, max_tokens=400)
+        chunks = chunk_document(
+            structured_document, extractor_meta | metadata, max_tokens=400
+        )
         logging.info(f"pdf data extracted pages: {len(pages)}")
         return [chunk.model_dump() for chunk in chunks]
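One behavioural detail in service.py worth noting: extractor_meta | metadata gives precedence to the caller-supplied metadata whenever both dicts carry the same key, because the right-hand operand of the dict union operator wins (Python 3.9+). A quick illustration with made-up values (the keys mirror the ones set in this diff):

extractor_meta = {"_file_type": "pdf", "_page_count": 12}
metadata = {"_source_file": "report.pdf", "_file_type": "scanned-pdf"}

merged = extractor_meta | metadata
# {'_file_type': 'scanned-pdf', '_page_count': 12, '_source_file': 'report.pdf'}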