diff --git a/rag-engine/src/layers/chunking_embedding/chunk_document.py b/rag-engine/src/layers/chunking_embedding/chunk_document.py index 58cb63a..2de8232 100644 --- a/rag-engine/src/layers/chunking_embedding/chunk_document.py +++ b/rag-engine/src/layers/chunking_embedding/chunk_document.py @@ -261,7 +261,9 @@ def _merge_small_chunks( # Merge if either side is small if prev.token_count < min_tokens or chunk.token_count < min_tokens: - combined_text = prev.text + "\n" + chunk.text + combined_text = ( + prev.text + "." + "\n" + chunk.section_path[-1] + ":\n" + chunk.text + ) combined_tokens = count_tokens(combined_text) if combined_tokens <= max_tokens: diff --git a/rag-engine/src/layers/chunking_embedding/embedding.py b/rag-engine/src/layers/chunking_embedding/embedding.py index f3b1f9e..5e70233 100644 --- a/rag-engine/src/layers/chunking_embedding/embedding.py +++ b/rag-engine/src/layers/chunking_embedding/embedding.py @@ -8,12 +8,25 @@ _executor = ThreadPoolExecutor(max_workers=os.cpu_count() or 4) -def embed_chunks(chunks: List[Chunk], batch_size: int = 64) -> List[Chunk]: +def embed_chunks(chunks: List[Chunk], batch_size: int = 64) -> List[Chunk]: for i in range(0, len(chunks), batch_size): - batch = chunks[i : i + batch_size] - texts = [f"passage: {c.text.strip()}" for c in batch] + + texts = [] + for c in batch: + path = " > ".join(c.section_path) if c.section_path else "" + title = c.section_title or "" + + text = f"""passage: +Title: {title} +Path: {path} +File Name : {c.metadata["_source_file"]} +page: {c.page_start} - {c.page_end} + +{c.text.strip()} +""" + texts.append(text) def dense_task(): return list(dense_embedding.embed(texts)) @@ -28,9 +41,7 @@ def sparse_task(): sparse_vectors = future_sparse.result() for chunk, dv, sv in zip(batch, dense_vectors, sparse_vectors): - chunk.dense_vectors = dv.tolist() - chunk.sparse_vectors = models.SparseVector( indices=sv.indices.tolist(), values=sv.values.tolist(), diff --git 
a/rag-engine/src/layers/data_extractor/extractor/csv.py b/rag-engine/src/layers/data_extractor/extractor/csv.py new file mode 100644 index 0000000..c2ab37d --- /dev/null +++ b/rag-engine/src/layers/data_extractor/extractor/csv.py @@ -0,0 +1,104 @@ +import csv +import io +import uuid +from src.layers.data_extractor.models import Line, Page, TablePage + + +def extract_data_csv(csv_bytes: bytes) -> tuple[list[Page], dict]: + metadata: dict[str, object] = { + "_file_type": "csv", + "_page_count": 1, + } + + text = csv_bytes.decode("utf-8", errors="replace") + + lines, tables = _parse_csv(text, metadata) + + page = Page( + page_number=1, + text="\n".join(line.text for line in lines), + lines=lines, + tables=tables, + images=[], + width=None, + height=None, + ) + + return [page], metadata + + +def _parse_csv( + csv_text: str, + metadata: dict[str, object], +) -> tuple[list[Line], list[TablePage]]: + + reader = csv.reader(io.StringIO(csv_text)) + rows = list(reader) + + if not rows: + return [], [] + + header = rows[0] + body = rows[1:] + + metadata["_csv_columns"] = header + metadata["_csv_row_count"] = len(body) + + lines: list[Line] = [] + tables: list[TablePage] = [] + + y = 0.0 + line_gap = 14.0 + + # ---------- Optional title line ---------- + lines.append( + Line( + text="CSV Table", + words=[], + top=y, + avg_size=22, + is_bold=True, + x0=0, + x1=300, + bottom=y + 22, + ) + ) + y += line_gap * 2 + + # ---------- Table ---------- + data: list[list[str | None]] = [[c if c != "" else None for c in header]] + + for row in body: + data.append([c if c != "" else None for c in row]) + + tables.append( + TablePage( + id=str(uuid.uuid4()), + bbox=(0, y, 0, y), + data=data, + top=y, + x0=0, + x1=800, + bottom=y, + page_number=1, + ) + ) + + # ---------- Text lines (for layout + headings) ---------- + for _, row in enumerate(body[:50]): # cap for readability + text = ", ".join(f"{header[i]}: {row[i]}" for i in range(len(header))) + lines.append( + Line( + text=text, + 
words=[], + top=y, + avg_size=12, + is_bold=False, + x0=20, + x1=800, + bottom=y + 12, + ) + ) + y += line_gap + + return lines, tables diff --git a/rag-engine/src/layers/data_extractor/extractor/json.py b/rag-engine/src/layers/data_extractor/extractor/json.py new file mode 100644 index 0000000..e995375 --- /dev/null +++ b/rag-engine/src/layers/data_extractor/extractor/json.py @@ -0,0 +1,120 @@ +import json +import uuid +from src.layers.data_extractor.models import Line, Page, TablePage + + +def extract_data_json(json_bytes: bytes) -> tuple[list[Page], dict]: + metadata: dict[str, object] = { + "_file_type": "json", + "_page_count": 1, + } + + text = json_bytes.decode("utf-8", errors="replace") + data = json.loads(text) + + lines: list[Line] = [] + tables: list[TablePage] = [] + + y = 0.0 + + _walk_json( + data=data, + path=[], + lines=lines, + tables=tables, + y_ref=[y], + page_number=1, + ) + + page = Page( + page_number=1, + text="\n".join(lin.text for lin in lines), + lines=lines, + tables=tables, + images=[], + width=None, + height=None, + ) + + return [page], metadata + + +def _walk_json( + data, + path: list[str], + lines: list[Line], + tables: list[TablePage], + y_ref: list[float], + page_number: int, +): + y = y_ref[0] + + # ---------- dict ---------- + if isinstance(data, dict): + for key, value in data.items(): # 🔒 insertion order preserved + title = ".".join(path + [str(key)]) + + lines.append( + Line( + text=title, + words=[], + top=y, + avg_size=18, + is_bold=True, + x0=len(path) * 20, + x1=800, + bottom=y + 18, + ) + ) + y += 18 + + _walk_json(value, path + [str(key)], lines, tables, [y], page_number) + y = y_ref[0] + + # ---------- list ---------- + elif isinstance(data, list): + if data and all(isinstance(x, dict) for x in data): + # table-like list + headers = list(data[0].keys()) + + rows: list[list[str | None]] = [] + for item in data: # 🔒 list order preserved + rows.append([ + str(item.get(h)) if item.get(h) is not None else None + for h in 
headers + ]) + + tables.append( + TablePage( + id=str(uuid.uuid4()), + bbox=(0, y, 0, y), + data=[headers] + rows, + top=y, + x0=0, + x1=800, + bottom=y, + page_number=page_number, + ) + ) + y += 14 * (len(rows) + 1) + + else: + for idx, item in enumerate(data): + _walk_json(item, path + [str(idx)], lines, tables, [y], page_number) + y = y_ref[0] + + # ---------- scalar ---------- + else: + lines.append( + Line( + text=str(data), + words=[], + top=y, + avg_size=12, + is_bold=False, + x0=len(path) * 20, + x1=800, + bottom=y + 12, + ) + ) + y += 12 diff --git a/rag-engine/src/layers/data_extractor/extractor/md.py b/rag-engine/src/layers/data_extractor/extractor/md.py new file mode 100644 index 0000000..2bb777e --- /dev/null +++ b/rag-engine/src/layers/data_extractor/extractor/md.py @@ -0,0 +1,196 @@ +import re +import uuid +from src.layers.data_extractor.models import ImagePage, Line, Page, TablePage +import yaml + +JSX_SELF_CLOSING = re.compile(r"<[A-Z][^>/]*/>") +JSX_OPEN_CLOSE = re.compile(r"]*>") +CODE_BLOCK_RE = re.compile( + r"```(\w+)?\n(.*?)\n```", + re.DOTALL, +) + + +def extract_data_md(file_bytes: bytes) -> tuple[list[Page], dict]: + md_text = file_bytes.decode("utf-8", errors="ignore") + metadata: dict[str, object] = { + "_page_count": 1, + } + + md_text, fm = _extract_frontmatter(md_text) + metadata.update(fm) + + md_text = _strip_mdx_jsx(md_text) + + lines, tables, images = _parse_md_stream(md_text) + + page = Page( + page_number=1, + text="\n".join(line.text for line in lines), + lines=lines, + tables=tables, + images=images, + width=None, + height=None, + ) + + return [page], metadata + + +def _parse_md_stream(md_text: str): + lines_out: list[Line] = [] + tables: list[TablePage] = [] + images: list[ImagePage] = [] + + y = 0.0 + line_gap = 14.0 + i = 0 + rows = md_text.splitlines() + + while i < len(rows): + raw = rows[i] + stripped = raw.strip() + + # ---------- Code blocks ---------- + if stripped.startswith("```"): + lang = stripped[3:].strip() or 
"text" + body = [] + i += 1 + while i < len(rows) and not rows[i].strip().startswith("```"): + body.append(rows[i]) + i += 1 + + tag = "DIAGRAM" if lang.lower() == "mermaid" else "CODE" + lines_out.append( + Line( + text=f"[{tag}:{lang}]", + words=[], + top=y, + avg_size=12, + is_bold=True, + x0=0, + x1=400, + bottom=y + 12, + ) + ) + y += line_gap + + for b in body: + lines_out.append( + Line( + text=b, + words=[], + top=y, + avg_size=11, + is_bold=False, + x0=20, + x1=600, + bottom=y + 11, + ) + ) + y += line_gap + + i += 1 + continue + + # ---------- Tables (GFM) ---------- + if "|" in raw and i + 1 < len(rows) and "---" in rows[i + 1]: + header = [c.strip() or None for c in raw.split("|") if c.strip()] + data: list[list[str | None]] = [header] + i += 2 + + while i < len(rows) and "|" in rows[i]: + row = [c.strip() or None for c in rows[i].split("|") if c.strip()] + data.append(row) + i += 1 + + tables.append( + TablePage( + id=str(uuid.uuid4()), + bbox=(0, y, 0, y), + data=data, + top=y, + x0=0, + x1=600, + bottom=y, + page_number=1, + ) + ) + y += line_gap + continue + + # ---------- Images ---------- + img_match = re.search(r"!\[.*?\]\((.*?)\)", raw) + if img_match: + images.append( + ImagePage( + id=str(uuid.uuid4()), + x0=0, + top=y, + x1=0, + bottom=y, + width=None, + height=None, + ) + ) + + # ---------- Normal text / headings ---------- + text = raw.rstrip() + if not stripped: + y += line_gap + i += 1 + continue + + level = len(stripped) - len(stripped.lstrip("#")) + is_heading = level > 0 and stripped[level : level + 1] == " " + + clean = stripped[level + 1 :] if is_heading else stripped + avg_size = 26 - level * 2 if is_heading else 12 + is_bold = is_heading or "**" in stripped or "__" in stripped + indent = len(text) - len(text.lstrip()) + x0 = indent * 4 + + lines_out.append( + Line( + text=clean, + words=[], + top=y, + avg_size=avg_size, + is_bold=is_bold, + x0=x0, + x1=x0 + len(clean) * 6, + bottom=y + avg_size, + ) + ) + + y += line_gap + i += 1 + 
+ return lines_out, tables, images + + +def _extract_frontmatter(md_text: str) -> tuple[str, dict[str, object]]: + if not md_text.startswith("---"): + return md_text, {} + + match = re.match(r"^---\n(.*?)\n---\n?", md_text, re.DOTALL) + if not match: + return md_text, {} + + raw_yaml = match.group(1) + body = md_text[match.end() :] + + try: + data = yaml.safe_load(raw_yaml) or {} + if not isinstance(data, dict): + data = {} + except Exception: + data = {} + + return body, data + + +def _strip_mdx_jsx(md_text: str) -> str: + md_text = JSX_SELF_CLOSING.sub("", md_text) + md_text = JSX_OPEN_CLOSE.sub("", md_text) + return md_text diff --git a/rag-engine/src/layers/data_extractor/extractor/pdf.py b/rag-engine/src/layers/data_extractor/extractor/pdf.py index 0b838f4..9015417 100644 --- a/rag-engine/src/layers/data_extractor/extractor/pdf.py +++ b/rag-engine/src/layers/data_extractor/extractor/pdf.py @@ -17,13 +17,12 @@ # =============================== # PUBLIC ENTRY # =============================== -def extract_data(pdf_bytes: bytes) -> tuple[list[Page], dict]: +def extract_data_pdf(pdf_bytes: bytes) -> tuple[list[Page], dict]: pages_output: list[Page] = [] metadata = {} try: - with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf_doc: - metadata["_file_type"] = "pdf" + with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf_doc: metadata["_page_count"] = len(pdf_doc.pages) metadata["_file_metadata"] = pdf_doc.metadata diff --git a/rag-engine/src/layers/data_extractor/extractor/xls.py b/rag-engine/src/layers/data_extractor/extractor/xls.py new file mode 100644 index 0000000..f9e889f --- /dev/null +++ b/rag-engine/src/layers/data_extractor/extractor/xls.py @@ -0,0 +1,193 @@ +from io import BytesIO +import uuid + +from openpyxl import load_workbook +import xlrd + +from src.layers.data_extractor.models import Line, Page, TablePage + + +def extract_data_excel(excel_bytes: bytes) -> tuple[list[Page], dict]: + """ + Extract XLS / XLSX while preserving strict stream order: + 
workbook → sheets → rows → cells + """ + + pages: list[Page] = [] + page_number = 1 + + metadata: dict[str, object] = { + "_file_type": "excel", + "_page_count": 0, + } + + # -------- Try XLSX first -------- + try: + wb = load_workbook( + filename=BytesIO(excel_bytes), + read_only=True, + data_only=True, + ) + + for sheet in wb.worksheets: # 🔒 sheet order preserved + page = _extract_xlsx_sheet(sheet, page_number) + pages.append(page) + page_number += 1 + + # -------- Fallback to XLS -------- + except Exception: + book = xlrd.open_workbook(file_contents=excel_bytes) + + for idx in range(book.nsheets): # 🔒 sheet order preserved + sheet = book.sheet_by_index(idx) + page = _extract_xls_sheet(sheet, page_number) + pages.append(page) + page_number += 1 + + metadata["_page_count"] = len(pages) + return pages, metadata + + +def _extract_xlsx_sheet(sheet, page_number: int) -> Page: + lines: list[Line] = [] + tables: list[TablePage] = [] + + y = 0.0 + + rows = list(sheet.iter_rows(values_only=True)) # 🔒 row order preserved + + if not rows: + return Page( + page_number=page_number, + text="", + lines=[], + tables=[], + images=[], + width=None, + height=None, + ) + + headers: list[str | None] = [str(c) if c is not None else None for c in rows[0]] + + table_rows: list[list[str | None]] = [] + + for row in rows[1:]: # 🔒 row order preserved + table_rows.append([ + str(cell) if cell is not None else None + for cell in row # 🔒 cell order preserved + ]) + + tables.append( + TablePage( + id=str(uuid.uuid4()), + bbox=(0, y, 0, y), + data=[headers] + table_rows, + top=y, + x0=0, + x1=800, + bottom=y + 14 * (len(table_rows) + 1), + page_number=page_number, + ) + ) + + # Optional text representation (useful for search) + for row in rows: + line_text = " | ".join(str(cell) if cell is not None else "" for cell in row) + + lines.append( + Line( + text=line_text, + words=[], + top=y, + avg_size=12, + is_bold=False, + x0=0, + x1=800, + bottom=y + 12, + ) + ) + y += 12 + + return Page( + 
page_number=page_number, + text="\n".join(lin.text for lin in lines), + lines=lines, + tables=tables, + images=[], + width=None, + height=None, + ) + + +def _extract_xls_sheet(sheet, page_number: int) -> Page: + lines: list[Line] = [] + tables: list[TablePage] = [] + + y = 0.0 + + if sheet.nrows == 0: + return Page( + page_number=page_number, + text="", + lines=[], + tables=[], + images=[], + width=None, + height=None, + ) + + headers: list[str | None] = [ + str(sheet.cell_value(0, col)) if sheet.cell_value(0, col) != "" else None + for col in range(sheet.ncols) + ] + + table_rows: list[list[str | None]] = [] + + for r in range(1, sheet.nrows): # 🔒 row order preserved + table_rows.append([ + str(sheet.cell_value(r, c)) if sheet.cell_value(r, c) != "" else None + for c in range(sheet.ncols) # 🔒 cell order preserved + ]) + + tables.append( + TablePage( + id=str(uuid.uuid4()), + bbox=(0, y, 0, y), + data=[headers] + table_rows, + top=y, + x0=0, + x1=800, + bottom=y + 14 * (len(table_rows) + 1), + page_number=page_number, + ) + ) + + for r in range(sheet.nrows): + row_text = " | ".join( + str(sheet.cell_value(r, c)) if sheet.cell_value(r, c) != "" else "" + for c in range(sheet.ncols) + ) + + lines.append( + Line( + text=row_text, + words=[], + top=y, + avg_size=12, + is_bold=False, + x0=0, + x1=800, + bottom=y + 12, + ) + ) + y += 12 + + return Page( + page_number=page_number, + text="\n".join(lin.text for lin in lines), + lines=lines, + tables=tables, + images=[], + width=None, + height=None, + ) diff --git a/rag-engine/src/layers/structure_analyzer/analyzer/pdf.py b/rag-engine/src/layers/structure_analyzer/analyzer.py similarity index 100% rename from rag-engine/src/layers/structure_analyzer/analyzer/pdf.py rename to rag-engine/src/layers/structure_analyzer/analyzer.py diff --git a/rag-engine/src/layers/structure_analyzer/analyzer/__init__.py b/rag-engine/src/layers/structure_analyzer/analyzer/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git 
a/rag-engine/src/query/service.py b/rag-engine/src/query/service.py index 49106d0..2bafd43 100644 --- a/rag-engine/src/query/service.py +++ b/rag-engine/src/query/service.py @@ -59,7 +59,7 @@ def query( hits = _normalize_scores(hits) logging.info(f"hits after normalized scores : {len(hits)}") - hits = _rerank(expanded_queries[0], hits, math.floor(final_top_k / 2) + 1) + hits = _rerank(expanded_queries[0], hits, final_top_k) logging.info( f"hits after reranking : {len(hits)} with final_top: {math.floor(final_top_k / 2) + 1}" ) diff --git a/rag-engine/src/store/controllers/csv.py b/rag-engine/src/store/controllers/csv.py new file mode 100644 index 0000000..d9fc92f --- /dev/null +++ b/rag-engine/src/store/controllers/csv.py @@ -0,0 +1,83 @@ +import os +from urllib.parse import urlparse +import requests +import hashlib +from typing import Optional +from fastapi import File, Form, HTTPException, UploadFile, status +from src.common.utils import document_exists, parse_metadata +from src.layers.data_extractor.extractor.csv import extract_data_csv +from src.store import service +from src.store.controllers.utils import assert_csv + + +async def upload( + upload: UploadFile = File(..., description="CSV file to upload"), + metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), +): + meta = parse_metadata(metadata) + meta["_source_file"] = upload.filename + + data_bytes = await upload.read() + assert_csv(data_bytes) + + file_hash = hashlib.sha256(data_bytes).hexdigest() + + meta["_file_hash"] = file_hash + meta["_file_type"] = "csv" + + user_id = meta.get("_user_id") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing '_user_id' in metadata", + ) + + if document_exists(user_id, file_hash): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Document already uploaded", + ) + + return service.handle(data_bytes, meta, extract_data_csv) + + +def with_url( + url: str = Form(..., 
description="Link to fetch CSV"), + metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), +): + resp = requests.get(url, timeout=10) + resp.raise_for_status() + + content_type = resp.headers.get("Content-Type", "").lower() + if not any(t in content_type for t in ["text/csv", "text/plain"]): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "URL does not point to a CSV file", + ) + data_bytes = resp.content + assert_csv(data_bytes) + + parsed = urlparse(url) + filename = os.path.basename(parsed.path) or "unknown.csv" + + meta = parse_metadata(metadata) + meta["_source_file"] = filename + meta["_file_type"] = "csv" + + file_hash = hashlib.sha256(data_bytes).hexdigest() + meta["_file_hash"] = file_hash + + user_id = meta.get("_user_id") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing '_user_id' in metadata", + ) + + if document_exists(user_id, file_hash): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Document already uploaded", + ) + + return service.handle(data_bytes, meta, extract_data_csv) diff --git a/rag-engine/src/store/controllers/json.py b/rag-engine/src/store/controllers/json.py new file mode 100644 index 0000000..e27ae01 --- /dev/null +++ b/rag-engine/src/store/controllers/json.py @@ -0,0 +1,83 @@ +import os +from urllib.parse import urlparse +import requests +import hashlib +from typing import Optional +from fastapi import File, Form, HTTPException, UploadFile, status +from src.common.utils import document_exists, parse_metadata +from src.layers.data_extractor.extractor.json import extract_data_json +from src.store import service +from src.store.controllers.utils import assert_json + + +async def upload( + upload: UploadFile = File(..., description="Json file to upload"), + metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), +): + meta = parse_metadata(metadata) + meta["_source_file"] = upload.filename + + data_bytes 
= await upload.read() + assert_json(data_bytes) + + file_hash = hashlib.sha256(data_bytes).hexdigest() + meta["_file_hash"] = file_hash + meta["_file_type"] = "json" + + user_id = meta.get("_user_id") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing '_user_id' in metadata", + ) + + if document_exists(user_id, file_hash): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Document already uploaded", + ) + + return service.handle(data_bytes, meta, extract_data_json) + + +def with_url( + url: str = Form(..., description="Link to fetch Json"), + metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), +): + resp = requests.get(url, timeout=10) + resp.raise_for_status() + + content_type = resp.headers.get("Content-Type", "").lower() + if not any(t in content_type for t in ["application/json", "text/plain"]): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "URL does not point to a JSON file", + ) + + data_bytes = resp.content + assert_json(data_bytes) + + parsed = urlparse(url) + filename = os.path.basename(parsed.path) or "unknown.json" + + meta = parse_metadata(metadata) + meta["_source_file"] = filename + meta["_file_type"] = "json" + + file_hash = hashlib.sha256(data_bytes).hexdigest() + meta["_file_hash"] = file_hash + + user_id = meta.get("_user_id") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing '_user_id' in metadata", + ) + + if document_exists(user_id, file_hash): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Document already uploaded", + ) + + return service.handle(data_bytes, meta, extract_data_json) diff --git a/rag-engine/src/store/controllers/md.py b/rag-engine/src/store/controllers/md.py new file mode 100644 index 0000000..372381e --- /dev/null +++ b/rag-engine/src/store/controllers/md.py @@ -0,0 +1,83 @@ +import os +from urllib.parse import urlparse +import 
requests +import hashlib +from typing import Optional +from fastapi import File, Form, HTTPException, UploadFile, status +from src.common.utils import document_exists, parse_metadata +from src.layers.data_extractor.extractor.md import extract_data_md +from src.store import service +from src.store.controllers.utils import assert_markdown + + +async def upload( + upload: UploadFile = File(..., description="Markdown / MDX file to upload"), + metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), +): + meta = parse_metadata(metadata) + meta["_source_file"] = upload.filename + + data_bytes = await upload.read() + assert_markdown(data_bytes) + + file_hash = hashlib.sha256(data_bytes).hexdigest() + meta["_file_hash"] = file_hash + meta["_file_type"] = "md" + + user_id = meta.get("_user_id") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing '_user_id' in metadata", + ) + + if document_exists(user_id, file_hash): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Document already uploaded", + ) + + return service.handle(data_bytes, meta, extract_data_md) + + +def with_url( + url: str = Form(..., description="Link to fetch Markdown / MDX"), + metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), +): + resp = requests.get(url, timeout=10) + resp.raise_for_status() + + content_type = resp.headers.get("Content-Type", "").lower() + if not any(t in content_type for t in ["text/markdown", "text/plain"]): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "URL does not point to a Markdown / MDX file", + ) + + data_bytes = resp.content + assert_markdown(data_bytes) + + parsed = urlparse(url) + filename = os.path.basename(parsed.path) or "unknown.md" + + meta = parse_metadata(metadata) + meta["_source_file"] = filename + meta["_file_type"] = "md" + + file_hash = hashlib.sha256(data_bytes).hexdigest() + meta["_file_hash"] = file_hash + + user_id = 
meta.get("_user_id") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing '_user_id' in metadata", + ) + + if document_exists(user_id, file_hash): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Document already uploaded", + ) + + return service.handle(data_bytes, meta, extract_data_md) diff --git a/rag-engine/src/store/controllers/pdf.py b/rag-engine/src/store/controllers/pdf.py index 356834d..87b022d 100644 --- a/rag-engine/src/store/controllers/pdf.py +++ b/rag-engine/src/store/controllers/pdf.py @@ -5,8 +5,11 @@ import requests from src.common.utils import document_exists from src.common.utils import parse_metadata -from src.store.services import pdf +from src.layers.data_extractor.extractor.pdf import extract_data_pdf from urllib.parse import urlparse +from src.store import service +from src.store.controllers.utils import assert_pdf + async def upload( upload: UploadFile = File(..., description="The file to upload"), @@ -14,8 +17,11 @@ async def upload( ): meta = parse_metadata(metadata) meta["_source_file"] = upload.filename + meta["_file_type"] = "pdf" data_bytes = await upload.read() + assert_pdf(data_bytes) + file_hash = hashlib.sha256(data_bytes).hexdigest() meta["_file_hash"] = file_hash user_id = meta.get("_user_id") @@ -29,23 +35,27 @@ async def upload( status_code=status.HTTP_409_CONFLICT, detail="Document already uploaded", ) - return pdf.handle(data_bytes, meta) - + return service.handle(data_bytes, meta, extract_data_pdf) def with_url( - url: str = Form(..., description="Link to fetch"), + url: str = Form(..., description="Link to fetch pdf"), metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), ): resp = requests.get(url, timeout=10) resp.raise_for_status() if "application/pdf" not in resp.headers.get("Content-Type", ""): - raise HTTPException(status.HTTP_400_BAD_REQUEST,"URL does not point to a PDF file") + raise HTTPException( + 
status.HTTP_400_BAD_REQUEST, "URL does not point to a PDF file" ) + + data_bytes = resp.content + assert_pdf(data_bytes) parsed = urlparse(url) meta = parse_metadata(metadata) - filename = os.path.basename(parsed.path) or "unkown" - data_bytes = resp.content + filename = os.path.basename(parsed.path) or "unknown.pdf" + meta["_source_file"] = filename + meta["_file_type"] = "pdf" file_hash = hashlib.sha256(data_bytes).hexdigest() meta["_file_hash"] = file_hash user_id = meta.get("_user_id") @@ -59,4 +69,4 @@ def with_url( status_code=status.HTTP_409_CONFLICT, detail="Document already uploaded", ) - return pdf.handle(data_bytes, meta) + return service.handle(data_bytes, meta, extract_data_pdf) diff --git a/rag-engine/src/store/controllers/sheet.py b/rag-engine/src/store/controllers/sheet.py new file mode 100644 index 0000000..f0e82bc --- /dev/null +++ b/rag-engine/src/store/controllers/sheet.py @@ -0,0 +1,89 @@ +import hashlib +import os +from typing import Optional +from urllib.parse import urlparse +from fastapi import File, Form, HTTPException, UploadFile, status +import requests +from src.common.utils import document_exists, parse_metadata +from src.layers.data_extractor.extractor.xls import extract_data_excel +from src.store import service +from src.store.controllers.utils import assert_sheet + + +async def upload( + upload: UploadFile = File(..., description="Sheet file to upload"), + metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), +): + meta = parse_metadata(metadata) + meta["_source_file"] = upload.filename + + data_bytes = await upload.read() + + assert_sheet(data_bytes) + file_hash = hashlib.sha256(data_bytes).hexdigest() + meta["_file_hash"] = file_hash + meta["_file_type"] = "sheet" + + user_id = meta.get("_user_id") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing '_user_id' in metadata", + ) + + if document_exists(user_id, file_hash): + raise HTTPException( 
status_code=status.HTTP_409_CONFLICT, + detail="Document already uploaded", + ) + + return service.handle(data_bytes, meta, extract_data_excel) + + +def with_url( + url: str = Form(..., description="Link to fetch sheet"), + metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), +): + resp = requests.get(url, timeout=10) + resp.raise_for_status() + + content_type = resp.headers.get("Content-Type", "").lower() + if not any( + t in content_type + for t in [ + "application/vnd.ms-excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ] + ): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "URL does not point to a XLS/Sheet file", + ) + + data_bytes = resp.content + assert_sheet(data_bytes) + + parsed = urlparse(url) + filename = os.path.basename(parsed.path) or "unknown.xlsx" + + meta = parse_metadata(metadata) + meta["_source_file"] = filename + meta["_file_type"] = "sheet" + + file_hash = hashlib.sha256(data_bytes).hexdigest() + meta["_file_hash"] = file_hash + + user_id = meta.get("_user_id") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing '_user_id' in metadata", + ) + + if document_exists(user_id, file_hash): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Document already uploaded", + ) + + return service.handle(data_bytes, meta, extract_data_excel) diff --git a/rag-engine/src/store/controllers/utils.py b/rag-engine/src/store/controllers/utils.py index 1fcd97b..5a1c7f5 100644 --- a/rag-engine/src/store/controllers/utils.py +++ b/rag-engine/src/store/controllers/utils.py @@ -1,6 +1,9 @@ import json +import csv +import io from fastapi import HTTPException, status from qdrant_client.models import Optional +from openpyxl import load_workbook def parse_metadata(metadata_str: Optional[str]) -> dict: @@ -14,3 +17,54 @@ def parse_metadata(metadata_str: Optional[str]) -> dict: detail="Invalid JSON in metadata field", ) return {} + + 
+def assert_pdf(data: bytes): + if not data.startswith(b"%PDF-"): + raise HTTPException(400, "Invalid PDF file") + + +def assert_markdown(data: bytes): + if data.startswith(b"%PDF-") or data.startswith(b"\x50\x4b"): + raise HTTPException(400, "Binary file is not Markdown") + + try: + text = data.decode("utf-8") + except UnicodeDecodeError: + raise HTTPException(400, "Markdown must be UTF-8 text") + + if "\x00" in text: + raise HTTPException(400, "Invalid Markdown content") + + +def assert_csv(data: bytes): + try: + text = data.decode("utf-8") + except UnicodeDecodeError: + raise HTTPException(400, "CSV must be UTF-8 text") + + reader = csv.reader(io.StringIO(text)) + rows = list(reader) + if not rows: + raise HTTPException(400, "Empty CSV") + + cols = len(rows[0]) + if any(len(r) != cols for r in rows): + raise HTTPException(400, "Malformed CSV") + + +def assert_json(data: bytes): + try: + json.loads(data.decode("utf-8")) + except Exception: + raise HTTPException(400, "Invalid JSON file") + + +def assert_sheet(data: bytes): + if not data.startswith(b"\x50\x4b"): + raise HTTPException(400, "Invalid XLSX file") + + try: + load_workbook(io.BytesIO(data), read_only=True) + except Exception: + raise HTTPException(400, "Corrupted XLSX file") diff --git a/rag-engine/src/store/routers.py b/rag-engine/src/store/routers.py index d594cac..931ce18 100644 --- a/rag-engine/src/store/routers.py +++ b/rag-engine/src/store/routers.py @@ -1,12 +1,12 @@ from fastapi import APIRouter, File, Form, UploadFile, status from qdrant_client.models import Optional from src.store.model import StoreResponse -from src.store.controllers import pdf +from src.store.controllers import pdf, md, csv, json, sheet store_upload_router = APIRouter(prefix="/store/upload", tags=["Store_Upload"]) store_url_router = APIRouter(prefix="/store/url", tags=["Store_URL"]) - +# 1. 
PDF @store_upload_router.post( "/pdf", summary="Store an uploaded PDF file", @@ -19,7 +19,6 @@ async def store_pdf_upload( ): return await pdf.upload(upload, metadata) - @store_url_router.post( "/pdf", summary="Store an uploaded PDF file", @@ -31,3 +30,95 @@ def store_pdf_with_url( metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"), ): return pdf.with_url(url, metadata) + +# 2. MDX +@store_upload_router.post( + "/md", + summary="Store an uploaded Markdown / MDX file", + status_code=status.HTTP_200_OK, +) +async def store_md_upload( + upload: UploadFile = File(..., description="Markdown or MDX file to upload"), + metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"), +): + return await md.upload(upload, metadata) + +@store_url_router.post( + "/md", + summary="Store a Markdown / MDX file from URL", + status_code=status.HTTP_200_OK, +) +def store_md_with_url( + url: str = Form(..., description="Link to fetch Markdown / MDX"), + metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"), +): + return md.with_url(url, metadata) + +# 3. CSV +@store_upload_router.post( + "/csv", + summary="Store an uploaded CSV file", + status_code=status.HTTP_200_OK, +) +async def store_csv_upload( + upload: UploadFile = File(..., description="CSV file to upload"), + metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"), +): + return await csv.upload(upload, metadata) + +@store_url_router.post( + "/csv", + summary="Store a CSV file from URL", + status_code=status.HTTP_200_OK, +) +def store_csv_with_url( + url: str = Form(..., description="Link to fetch CSV"), + metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"), +): + return csv.with_url(url, metadata) + +# 4. 
# 4. JSON
@store_upload_router.post(
    "/json",
    summary="Store an uploaded JSON file",
    status_code=status.HTTP_200_OK,
)
async def store_json_upload(
    upload: UploadFile = File(..., description="JSON file to upload"),
    metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"),
):
    """Accept a JSON upload and hand it to the json controller for ingestion."""
    # NOTE(review): the `json` controller module shadows the stdlib json name
    # at this file's import site — confirm intentional.
    return await json.upload(upload, metadata)


@store_url_router.post(
    "/json",
    summary="Store a JSON file from URL",
    status_code=status.HTTP_200_OK,
)
def store_json_with_url(
    url: str = Form(..., description="Link to fetch JSON"),
    metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"),
):
    """Fetch a JSON document from *url* and store it via the json controller."""
    return json.with_url(url, metadata)


# 5. Sheet
@store_upload_router.post(
    "/sheet",
    summary="Store an uploaded sheet file",
    status_code=status.HTTP_200_OK,
)
async def store_sheet_upload(
    upload: UploadFile = File(..., description="Sheet file to upload"),
    metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"),
):
    """Accept a spreadsheet upload and hand it to the sheet controller."""
    return await sheet.upload(upload, metadata)


@store_url_router.post(
    "/sheet",
    summary="Store a Sheet file from URL",
    status_code=status.HTTP_200_OK,
)
def store_sheet_with_url(
    url: str = Form(..., description="Link to fetch Sheet"),
    metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"),
):
    """Fetch a spreadsheet from *url* and store it via the sheet controller."""
    return sheet.with_url(url, metadata)
def handle(file_bytes: bytes, metadata: dict, extract_data_func):
    """Run the full ingest pipeline for one document under shared error handling.

    Args:
        file_bytes: Raw bytes of the uploaded or fetched document.
        metadata: Caller-supplied chunk metadata; expected to carry "_file_type".
        extract_data_func: Format-specific extractor returning
            (pages, extractor_meta).
    """
    return process_with_error_handling(
        _handle_file, file_bytes, metadata, extract_data_func
    )


def _handle_file(file_bytes: bytes, metadata: dict, extract_data_func):
    """Extract -> analyze -> chunk -> embed -> store; return the store response."""
    file_type = metadata.get("_file_type")

    pages, extractor_meta = extract_data_func(file_bytes)
    # Lazy %-style logging args: formatting only happens if the record is emitted.
    logging.info("%s data extracted pages: %s", file_type, len(pages))

    structured_document = analyze_layout(pages)
    logging.info("analyzed %s structured", file_type)

    chunks = chunk_document(
        structured_document,
        metadata | extractor_meta,
        max_tokens=450,
        min_tokens=80,
    )
    logging.info("chunked %s to : %s chunks", file_type, len(chunks))

    chunks = embed_chunks(chunks)
    logging.info("embedding chunks")

    store_chunks(chunks)
    logging.info("stored chunked")

    return makeResponse(metadata | extractor_meta, chunks)
def handle(file_bytes: bytes, metadata: dict):
    """Entry point for PDF ingestion; wraps the pipeline in shared error handling."""
    return process_with_error_handling(_handleFile, file_bytes, metadata)


def _handleFile(file_bytes: bytes, metadata: dict):
    """Extract, analyze, chunk, embed, and store a single PDF document."""
    pages, extractor_meta = extract_data(file_bytes)
    logging.info(f"pdf data extracted pages: {len(pages)}")

    structured_document = analyze_layout(pages)
    logging.info("analyzed pdf structured")

    merged_meta = metadata | extractor_meta
    chunks = chunk_document(
        structured_document,
        merged_meta,
        max_tokens=450,
        min_tokens=80,
    )
    logging.info(f"chunked pdf to : {len(chunks)} chunks")

    chunks = embed_chunks(chunks)
    logging.info("embedding chunks")

    store_chunks(chunks)
    logging.info("stored chunked")

    return makeResponse(merged_meta, chunks)