Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rag-engine/src/layers/chunking_embedding/chunk_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def chunk_document(

# ---- FINAL CLEANUP ----
chunks = _deduplicate_chunks(chunks)
if len(chunks) == 0:
raise ValueError("No text found in your pdf!, make sure it is not image")

return chunks

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
# ===============================
# PUBLIC ENTRY
# ===============================
def pdf(pdf_bytes: bytes) -> tuple[list[Page], dict]:
def extract_data(pdf_bytes: bytes) -> tuple[list[Page], dict]:
pages_output: list[Page] = []
metadata = {}

try:
with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf_doc:
metadata["_document_id"] = str(uuid.uuid4())
metadata["_file_type"] = "pdf"
metadata["_page_count"] = len(pdf_doc.pages)
metadata["_file_metadata"] = pdf_doc.metadata
Expand Down
Empty file.
7 changes: 4 additions & 3 deletions rag-engine/src/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from dotenv import load_dotenv
from fastapi import FastAPI
from src.process.controller import router as process
from src.store.routers import store_upload_router, store_url_router
from .logging import configure_logging, LogLevels
from pathlib import Path


env_path = Path(__file__).parent / '.env'
env_path = Path(__file__).parent.parent / '.env'

load_dotenv(dotenv_path=env_path)
configure_logging(LogLevels.info)
app = FastAPI()
app.include_router(process)
app.include_router(store_upload_router)
app.include_router(store_url_router)
72 changes: 0 additions & 72 deletions rag-engine/src/process/controller.py

This file was deleted.

21 changes: 0 additions & 21 deletions rag-engine/src/process/models.py

This file was deleted.

38 changes: 0 additions & 38 deletions rag-engine/src/process/service.py

This file was deleted.

Empty file.
Empty file.
62 changes: 62 additions & 0 deletions rag-engine/src/store/controllers/pdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import hashlib
import os
from fastapi import File, Form, HTTPException, UploadFile, status
from qdrant_client.models import Optional
import requests
from src.common.utils import document_exists
from src.store.controllers.utils import parse_metadata
from src.store.services import pdf
from urllib.parse import urlparse

async def upload(
upload: UploadFile = File(..., description="The file to upload"),
metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"),
):
meta = parse_metadata(metadata)
meta["_source_file"] = upload.filename

data_bytes = await upload.read()
file_hash = hashlib.sha256(data_bytes).hexdigest()
meta["_file_hash"] = file_hash
user_id = meta.get("_user_id")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing '_user_id' in metadata",
)
if document_exists(user_id, file_hash):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Document already uploaded",
)
return pdf.handle(data_bytes, meta)



async def with_url(
url: str = Form(..., description="Link to fetch"),
metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"),
):
resp = requests.get(url, timeout=10)
resp.raise_for_status()
if "application/pdf" not in resp.headers.get("Content-Type", ""):
raise HTTPException(status.HTTP_400_BAD_REQUEST,"URL does not point to a PDF file")
parsed = urlparse(url)
meta = parse_metadata(metadata)
filename = os.path.basename(parsed.path) or "unkown"
data_bytes = resp.content
meta["_source_file"] = filename
file_hash = hashlib.sha256(data_bytes).hexdigest()
meta["_file_hash"] = file_hash
user_id = meta.get("_user_id")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing '_user_id' in metadata",
)
if document_exists(user_id, file_hash):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Document already uploaded",
)
return pdf.handle(data_bytes, meta)
16 changes: 16 additions & 0 deletions rag-engine/src/store/controllers/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import json
from fastapi import HTTPException, status
from qdrant_client.models import Optional


def parse_metadata(metadata_str: Optional[str]) -> dict:
"""Parse JSON metadata string, return empty dict if None."""
if metadata_str:
try:
return json.loads(metadata_str)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Invalid JSON in metadata field",
)
return {}
8 changes: 8 additions & 0 deletions rag-engine/src/store/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from pydantic import BaseModel


class StoreResponse(BaseModel):
document_id: str
file_type:str
page_count: int
chunks: list[str]
33 changes: 33 additions & 0 deletions rag-engine/src/store/routers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from fastapi import APIRouter, File, Form, UploadFile, status
from qdrant_client.models import Optional
from src.store.model import StoreResponse
from src.store.controllers import pdf

store_upload_router = APIRouter(prefix="/store/upload", tags=["Store_Upload"])
store_url_router = APIRouter(prefix="/store/url", tags=["Store_URL"])


@store_upload_router.post(
"/pdf",
summary="Store an uploaded PDF file",
response_model=StoreResponse,
status_code=status.HTTP_200_OK,
)
async def store_pdf_upload(
upload: UploadFile = File(..., description="The file to upload"),
metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"),
):
return pdf.upload(upload, metadata)


@store_url_router.post(
"/pdf",
summary="Store an uploaded PDF file",
response_model=StoreResponse,
status_code=status.HTTP_200_OK,
)
async def store_pdf_with_url(
url: str = Form(..., description="Link to fetch"),
metadata: Optional[str] = Form(..., description="Metadata for chunks (JSON)"),
):
return pdf.with_url(url, metadata)
Empty file.
29 changes: 29 additions & 0 deletions rag-engine/src/store/services/pdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging
from src.layers.chunking_embedding.chunk_document import chunk_document
from src.layers.chunking_embedding.embedding import embed_chunks
from src.layers.data_extractor.extractor.pdf import extract_data
from src.layers.qdrant_store.store import store_chunks
from src.layers.structure_analyzer.analyzer.pdf import analyze_layout
from src.store.services.utils import makeResponse, process_with_error_handling


def handle(file_bytes: bytes, metadata: dict):
return process_with_error_handling(_handleFile, file_bytes, metadata)


def _handleFile(file_bytes: bytes, metadata: dict):
pages, extractor_meta = extract_data(file_bytes)
logging.info(f"pdf data extracted pages: {len(pages)}")
structured_document = analyze_layout(pages)
logging.info("analyzed pdf structured")
chunks = chunk_document(
structured_document,
metadata | extractor_meta,
max_tokens=400,
)
logging.info(f"chunked pdf to : {len(chunks)} chunks")
chunks = embed_chunks(chunks)
logging.info(f"embedding chunks: {len(chunks[0].embedding)}")
store_chunks(chunks)
logging.info("stored chunked")
return makeResponse(metadata | extractor_meta, chunks)
34 changes: 34 additions & 0 deletions rag-engine/src/store/services/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from fastapi import HTTPException, status
import logging

from src.layers.chunking_embedding.models import Chunk
from src.store.model import StoreResponse


def makeResponse(metadata: dict, chunks: list[Chunk]) -> StoreResponse:
return StoreResponse(
document_id=metadata["_file_hash"],
file_type=metadata["_file_type"],
page_count=metadata["_page_count"],
chunks=[chunk.id for chunk in chunks],
)


def process_with_error_handling(process_func, *args, **kwargs):
"""
Execute process_func and convert exceptions to HTTPException.
- ValueError -> 400 Bad Request
- Other Exception -> 500 Internal Server Error (with generic message)
"""
try:
return process_func(*args, **kwargs)
except ValueError as e:
# Client error – include the message
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
except Exception as e:
# Server error – log and return generic message
logging.error(f"Unexpected error in processing, {str(e)} ")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An internal error occurred while processing the file.",
)