diff --git a/rag-engine/src/layers/chunking_embedding/chunk_document.py b/rag-engine/src/layers/chunking_embedding/chunk_document.py index 4ed3eb7..ce0c875 100644 --- a/rag-engine/src/layers/chunking_embedding/chunk_document.py +++ b/rag-engine/src/layers/chunking_embedding/chunk_document.py @@ -51,6 +51,8 @@ def chunk_document( # ---- FINAL CLEANUP ---- chunks = _deduplicate_chunks(chunks) + if len(chunks) == 0: + raise ValueError("No text found in your pdf!, make sure it is not image") return chunks diff --git a/rag-engine/src/process/__init__.py b/rag-engine/src/layers/data_extractor/extractor/__init__.py similarity index 100% rename from rag-engine/src/process/__init__.py rename to rag-engine/src/layers/data_extractor/extractor/__init__.py diff --git a/rag-engine/src/layers/data_extractor/extractor.py b/rag-engine/src/layers/data_extractor/extractor/pdf.py similarity index 98% rename from rag-engine/src/layers/data_extractor/extractor.py rename to rag-engine/src/layers/data_extractor/extractor/pdf.py index df3d4a2..d417dd6 100644 --- a/rag-engine/src/layers/data_extractor/extractor.py +++ b/rag-engine/src/layers/data_extractor/extractor/pdf.py @@ -17,13 +17,12 @@ # =============================== # PUBLIC ENTRY # =============================== -def pdf(pdf_bytes: bytes) -> tuple[list[Page], dict]: +def extract_data(pdf_bytes: bytes) -> tuple[list[Page], dict]: pages_output: list[Page] = [] metadata = {} try: with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf_doc: - metadata["_document_id"] = str(uuid.uuid4()) metadata["_file_type"] = "pdf" metadata["_page_count"] = len(pdf_doc.pages) metadata["_file_metadata"] = pdf_doc.metadata diff --git a/rag-engine/src/layers/structure_analyzer/analyzer/__init__.py b/rag-engine/src/layers/structure_analyzer/analyzer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rag-engine/src/layers/structure_analyzer/analyzer.py b/rag-engine/src/layers/structure_analyzer/analyzer/pdf.py similarity index 100% rename 
from dotenv import load_dotenv
from fastapi import FastAPI
from src.store.routers import store_upload_router, store_url_router
from .logging import configure_logging, LogLevels
from pathlib import Path

# Environment variables live in the project-level .env one directory above
# this package; load them before any configuration is read.
env_path = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)

configure_logging(LogLevels.info)

# Application wiring: one router per ingestion mode (multipart upload vs. URL).
app = FastAPI()
for _router in (store_upload_router, store_url_router):
    app.include_router(_router)
import models -from urllib.parse import urlparse -import hashlib -router = APIRouter(prefix="/process", tags=["Process"]) - - -@router.post( - "/{file_type}/{input_mode}", - summary="Process an uploaded file or URL", - status_code=status.HTTP_200_OK, -) -async def process( - file_type: models.FileType = Path(..., description="Type of file to process"), - input_mode: models.InputMode = Path(..., description="How content is passed"), - metadata: str | None = Form(None, description="metadata for chunks"), - upload: UploadFile | None = File(None, description="The file to upload"), - url: str | None = Form(None, description="Link to fetch"), -): - meta = {} - if metadata: - try: - meta = json.loads(metadata) - except json.JSONDecodeError: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Invalid JSON in metadata field", - ) - try: - if input_mode == models.InputMode.url: - if not url: - raise HTTPException( - status.HTTP_422_UNPROCESSABLE_CONTENT, - "Must provide a URL when input_mode is 'url'", - ) - resp = requests.get(url, timeout=10) - resp.raise_for_status() - if file_type == models.FileType.pdf: - if "application/pdf" not in resp.headers.get("Content-Type", ""): - raise HTTPException( - status.HTTP_400_BAD_REQUEST, "URL does not point to a PDF file" - ) - parsed = urlparse(url) - filename = os.path.basename(parsed.path) or "unkown" - meta["_source_file"] = filename - meta["_file_hash"] = hashlib.sha256(resp.content).hexdigest() - data = processFile(models.FileType.pdf, resp.content, meta) - return JSONResponse(content=data, status_code=status.HTTP_200_OK) - if input_mode == models.InputMode.file: - if not upload: - raise HTTPException( - status.HTTP_422_UNPROCESSABLE_CONTENT, - "Must upload a file when input_mode is 'file'", - ) - meta["_source_file"] = upload.filename - data_bytes = await upload.read() - meta["_file_hash"] = hashlib.sha256(data_bytes).hexdigest() - data = processFile(models.FileType.pdf, data_bytes, meta) - 
return JSONResponse(content=data, status_code=status.HTTP_200_OK) - - except ValueError as e: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) - except Exception as e: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Internal server error: {str(e)}", - ) diff --git a/rag-engine/src/process/models.py b/rag-engine/src/process/models.py deleted file mode 100644 index 95ecf10..0000000 --- a/rag-engine/src/process/models.py +++ /dev/null @@ -1,21 +0,0 @@ -from enum import Enum -from pydantic import BaseModel - - -class FileType(str, Enum): - pdf = "pdf" - md = "md" - - -class InputMode(str, Enum): - file = "file" - url = "url" - - -class PageContent(BaseModel): - text: str - tables: list[list[list[str]]] - - -class SupportUrlFile(str, Enum): - pdf = "application/pdf" diff --git a/rag-engine/src/process/service.py b/rag-engine/src/process/service.py deleted file mode 100644 index 2967e87..0000000 --- a/rag-engine/src/process/service.py +++ /dev/null @@ -1,38 +0,0 @@ -import logging -from src.common.utils import document_exists -from src.layers.chunking_embedding.chunk_document import chunk_document -from src.layers.chunking_embedding.embedding import embed_chunks -from src.layers.data_extractor import extractor -from src.layers.qdrant_store.store import store_chunks -from src.layers.structure_analyzer.analyzer import analyze_layout - -from . 
import hashlib
import os
from typing import Optional
from urllib.parse import urlparse

import requests
from fastapi import File, Form, HTTPException, UploadFile, status

from src.common.utils import document_exists
from src.store.controllers.utils import parse_metadata
from src.store.services import pdf


def _prepare_meta(metadata: Optional[str], source_file: str, data_bytes: bytes) -> dict:
    """Parse metadata JSON, stamp provenance fields, and run the shared
    pre-ingestion checks. Previously duplicated in both handlers.

    Raises:
        HTTPException 400: '_user_id' missing from metadata.
        HTTPException 409: same user already stored a document with an
            identical SHA-256 content hash.
    """
    meta = parse_metadata(metadata)
    meta["_source_file"] = source_file
    file_hash = hashlib.sha256(data_bytes).hexdigest()
    meta["_file_hash"] = file_hash

    user_id = meta.get("_user_id")
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing '_user_id' in metadata",
        )
    if document_exists(user_id, file_hash):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Document already uploaded",
        )
    return meta


async def upload(
    upload: UploadFile = File(..., description="The file to upload"),
    # FIX: Form(...) made this field required despite the Optional annotation;
    # Form(None) restores the old controller's "metadata is optional" contract.
    metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"),
):
    """Ingest a PDF sent as a multipart upload."""
    data_bytes = await upload.read()
    meta = _prepare_meta(metadata, upload.filename, data_bytes)
    return pdf.handle(data_bytes, meta)


async def with_url(
    url: str = Form(..., description="Link to fetch"),
    metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"),
):
    """Ingest a PDF fetched from a URL."""
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
    except requests.RequestException as e:
        # FIX: fetch failures (DNS, timeout, 4xx/5xx) previously escaped as raw
        # exceptions and surfaced as a generic 500; they are client errors here.
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST, f"Could not fetch URL: {e}"
        ) from e
    if "application/pdf" not in resp.headers.get("Content-Type", ""):
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST, "URL does not point to a PDF file"
        )

    # FIX: typo "unkown" -> "unknown" in the fallback filename.
    filename = os.path.basename(urlparse(url).path) or "unknown"
    data_bytes = resp.content
    meta = _prepare_meta(metadata, filename, data_bytes)
    return pdf.handle(data_bytes, meta)
from typing import Optional

from fastapi import APIRouter, File, Form, UploadFile, status

from src.store.controllers import pdf
from src.store.model import StoreResponse

store_upload_router = APIRouter(prefix="/store/upload", tags=["Store_Upload"])
store_url_router = APIRouter(prefix="/store/url", tags=["Store_URL"])


@store_upload_router.post(
    "/pdf",
    summary="Store an uploaded PDF file",
    response_model=StoreResponse,
    status_code=status.HTTP_200_OK,
)
async def store_pdf_upload(
    upload: UploadFile = File(..., description="The file to upload"),
    metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"),
):
    """Thin routing layer: delegate to the upload controller.

    BUG FIX: the controller is an ``async def``; without ``await`` this
    handler returned a coroutine object to FastAPI and the controller
    body never executed.
    """
    return await pdf.upload(upload, metadata)


@store_url_router.post(
    "/pdf",
    # FIX: summary was copy-pasted from the upload endpoint.
    summary="Store a PDF fetched from a URL",
    response_model=StoreResponse,
    status_code=status.HTTP_200_OK,
)
async def store_pdf_with_url(
    url: str = Form(..., description="Link to fetch"),
    metadata: Optional[str] = Form(None, description="Metadata for chunks (JSON)"),
):
    """Thin routing layer: delegate to the URL controller (awaited — same
    missing-await bug as above)."""
    return await pdf.with_url(url, metadata)
import logging

from fastapi import HTTPException, status

from src.layers.chunking_embedding.chunk_document import chunk_document
from src.layers.chunking_embedding.embedding import embed_chunks
from src.layers.chunking_embedding.models import Chunk
from src.layers.data_extractor.extractor.pdf import extract_data
from src.layers.qdrant_store.store import store_chunks
from src.layers.structure_analyzer.analyzer.pdf import analyze_layout
from src.store.model import StoreResponse


def handle(file_bytes: bytes, metadata: dict) -> StoreResponse:
    """Public entry point: run the PDF ingestion pipeline with HTTP error
    mapping applied around it."""
    return process_with_error_handling(_handleFile, file_bytes, metadata)


def _handleFile(file_bytes: bytes, metadata: dict) -> StoreResponse:
    """Pipeline: extract -> analyze layout -> chunk -> embed -> store,
    then shape the response."""
    pages, extractor_meta = extract_data(file_bytes)
    logging.info("pdf data extracted pages: %d", len(pages))

    structured_document = analyze_layout(pages)
    logging.info("analyzed pdf structured")

    # Right operand of | wins: extractor-derived fields (_file_type,
    # _page_count, ...) take precedence over caller-supplied metadata.
    merged_meta = metadata | extractor_meta
    chunks = chunk_document(
        structured_document,
        merged_meta,
        max_tokens=400,
    )
    logging.info("chunked pdf to : %d chunks", len(chunks))

    chunks = embed_chunks(chunks)
    # Guarded: chunk_document is expected to raise on empty output, but do not
    # let a diagnostic log line be the thing that IndexErrors.
    if chunks:
        logging.info("embedding chunks: %d", len(chunks[0].embedding))

    store_chunks(chunks)
    logging.info("stored chunked")
    return makeResponse(merged_meta, chunks)


def makeResponse(metadata: dict, chunks: list[Chunk]) -> StoreResponse:
    """Shape merged pipeline metadata + stored chunks into the public
    StoreResponse model (document_id is the content hash)."""
    return StoreResponse(
        document_id=metadata["_file_hash"],
        file_type=metadata["_file_type"],
        page_count=metadata["_page_count"],
        chunks=[chunk.id for chunk in chunks],
    )


def process_with_error_handling(process_func, *args, **kwargs):
    """
    Execute process_func, converting failures to HTTPException:
      - HTTPException -> re-raised untouched (already an HTTP error)
      - ValueError    -> 400 Bad Request carrying the message
      - anything else -> logged with traceback, generic 500
    """
    try:
        return process_func(*args, **kwargs)
    except HTTPException:
        # BUG FIX: without this clause an HTTPException raised inside the
        # pipeline would fall into the generic handler below and be
        # re-surfaced as an uninformative 500.
        raise
    except ValueError as e:
        # Client error - include the message, keep the cause chained.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)
        ) from e
    except Exception as e:
        # Server error - log full traceback, return generic message.
        logging.exception("Unexpected error in processing")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing the file.",
        ) from e