diff --git a/rag-engine/.gitignore b/rag-engine/.gitignore index 033df5f..c97ebf1 100644 --- a/rag-engine/.gitignore +++ b/rag-engine/.gitignore @@ -1,2 +1,4 @@ .venv +models_cache +qdrant_storage __pycache__ diff --git a/rag-engine/src/common/__init__.py b/rag-engine/src/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rag-engine/src/common/utils.py b/rag-engine/src/common/utils.py new file mode 100644 index 0000000..e5f5130 --- /dev/null +++ b/rag-engine/src/common/utils.py @@ -0,0 +1,69 @@ +import os +from pathlib import Path as FilePath +from fastembed import TextEmbedding +from fastembed.common.model_description import ModelSource, PoolingType +from qdrant_client import QdrantClient, models +from qdrant_client.models import Distance, FieldCondition, MatchValue, VectorParams + +CACHE_DIR = FilePath("./models_cache") +CACHE_DIR.mkdir(exist_ok=True) +VECTOR_SIZE = 384 +COLLECTION_NAME = "dcup_documents" + +TextEmbedding.add_custom_model( + model="intfloat/multilingual-e5-small", + pooling=PoolingType.MEAN, + normalization=True, + sources=ModelSource(hf="intfloat/multilingual-e5-small"), + dim=VECTOR_SIZE, + model_file="onnx/model.onnx", +) +embedding_model = TextEmbedding( + model_name="intfloat/multilingual-e5-small", + cache_dir=str(CACHE_DIR), +) + +qclient = QdrantClient( + url=os.getenv("QDRANT_DB_URL"), + api_key=os.getenv("QDRANT_DB_KEY"), +) + +if COLLECTION_NAME not in [c.name for c in qclient.get_collections().collections]: + qclient.create_collection( + collection_name=COLLECTION_NAME, + vectors_config=VectorParams( + size=VECTOR_SIZE, + distance=Distance.COSINE, + ), + ) + qclient.create_payload_index( + collection_name=COLLECTION_NAME, + field_name="_file_hash", + field_schema=models.PayloadSchemaType.KEYWORD, + ) + qclient.create_payload_index( + collection_name=COLLECTION_NAME, + field_name="_user_id", + field_schema=models.PayloadSchemaType.KEYWORD, + ) + + +def document_exists(user_id: str, file_hash: str) -> bool: + 
results = qclient.scroll( + collection_name=COLLECTION_NAME, + scroll_filter=models.Filter( + must=[ + FieldCondition( + key="_user_id", + match=MatchValue(value=user_id), + ), + FieldCondition( + key="_file_hash", + match=MatchValue(value=file_hash), + ), + ] + ), + limit=1, + ) + + return len(results[0]) > 0 diff --git a/rag-engine/src/layers/chunking_embedding/chunk_document.py b/rag-engine/src/layers/chunking_embedding/chunk_document.py index ab6d086..4ed3eb7 100644 --- a/rag-engine/src/layers/chunking_embedding/chunk_document.py +++ b/rag-engine/src/layers/chunking_embedding/chunk_document.py @@ -211,6 +211,7 @@ def _build_chunk( page_start=page_start, page_end=page_end, metadata=metadata, + embedding=None, ) @@ -286,7 +287,7 @@ def _build_table_chunks_from_section( chunks.append( Chunk( id=str(uuid.uuid4()), - text=table_json, + text=table_json, token_count=count_tokens(table_json), section_title=section.title, section_path=section_path, @@ -294,6 +295,7 @@ def _build_table_chunks_from_section( page_start=section.page_number, page_end=section.page_number, metadata=table_metadata, + embedding=None, ) ) diff --git a/rag-engine/src/layers/chunking_embedding/embedding.py b/rag-engine/src/layers/chunking_embedding/embedding.py index fd9b5ca..0eaa73d 100644 --- a/rag-engine/src/layers/chunking_embedding/embedding.py +++ b/rag-engine/src/layers/chunking_embedding/embedding.py @@ -1,25 +1,13 @@ -from fastembed import TextEmbedding from typing import List -from fastembed.common.model_description import ModelSource, PoolingType - from src.layers.chunking_embedding.models import Chunk +from src.common.utils import embedding_model -TextEmbedding.add_custom_model( - model="intfloat/multilingual-e5-small", - pooling=PoolingType.MEAN, - normalization=True, - sources=ModelSource(hf="intfloat/multilingual-e5-small"), - dim=384, - model_file="onnx/model.onnx", -) -_embedding_model = TextEmbedding(model_name="intfloat/multilingual-e5-small") - def embed_chunks(chunks: 
List[Chunk], batch_size: int = 64) -> List[Chunk]: for i in range(0, len(chunks), batch_size): batch = chunks[i : i + batch_size] texts = [c.text for c in batch] - vectors = list(_embedding_model.embed(texts)) + vectors = list(embedding_model.embed(texts)) for chunk, vector in zip(batch, vectors): - chunk.metadata["_embedding"] = vector.tolist() + chunk.embedding = vector.tolist() return chunks diff --git a/rag-engine/src/layers/chunking_embedding/models.py b/rag-engine/src/layers/chunking_embedding/models.py index edb00ea..a565925 100644 --- a/rag-engine/src/layers/chunking_embedding/models.py +++ b/rag-engine/src/layers/chunking_embedding/models.py @@ -17,5 +17,6 @@ class Chunk(BaseModel): page_start: int | None page_end: int | None + embedding: Any metadata: Dict[str, Any] = {} diff --git a/rag-engine/src/layers/qdrant_store/__init__.py b/rag-engine/src/layers/qdrant_store/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rag-engine/src/layers/qdrant_store/store.py b/rag-engine/src/layers/qdrant_store/store.py new file mode 100644 index 0000000..e1ee82f --- /dev/null +++ b/rag-engine/src/layers/qdrant_store/store.py @@ -0,0 +1,33 @@ +from typing import List +from qdrant_client.conversions.common_types import PointStruct, Points +from src.layers.chunking_embedding.models import Chunk +from src.common.utils import VECTOR_SIZE, qclient, COLLECTION_NAME + + +def store_chunks(chunks: List[Chunk], batch_size: int = 64) -> None: + for i in range(0, len(chunks), batch_size): + batch = chunks[i : i + batch_size] + points:Points = [] + for chunk in batch: + if chunk.embedding is None: + continue + payload = { + "text": chunk.text, + "token_count": chunk.token_count, + "section_title": chunk.section_title, + "section_path": chunk.section_path, + "level": chunk.level, + "page_start": chunk.page_start, + "page_end": chunk.page_end, + **chunk.metadata, + } + points.append( + PointStruct( + id=chunk.id, + vector=chunk.embedding, + payload=payload, + ), + ) + 
assert isinstance(chunk.embedding, list) + assert len(chunk.embedding) == VECTOR_SIZE + qclient.upsert(collection_name=COLLECTION_NAME, points=points) diff --git a/rag-engine/src/main.py b/rag-engine/src/main.py index f636fcf..cc373bc 100644 --- a/rag-engine/src/main.py +++ b/rag-engine/src/main.py @@ -1,7 +1,13 @@ +from dotenv import load_dotenv from fastapi import FastAPI from src.process.controller import router as process from .logging import configure_logging, LogLevels +from pathlib import Path + +env_path = Path(__file__).parent / '.env' + +load_dotenv(dotenv_path=env_path) configure_logging(LogLevels.info) app = FastAPI() app.include_router(process) diff --git a/rag-engine/src/process/controller.py b/rag-engine/src/process/controller.py index b06434d..a5c2b38 100644 --- a/rag-engine/src/process/controller.py +++ b/rag-engine/src/process/controller.py @@ -6,7 +6,7 @@ from src.process.service import processFile from . import models from urllib.parse import urlparse - +import hashlib router = APIRouter(prefix="/process", tags=["Process"]) @@ -48,6 +48,7 @@ async def process( parsed = urlparse(url) filename = os.path.basename(parsed.path) or "unkown" meta["_source_file"] = filename + meta["_file_hash"] = hashlib.sha256(resp.content).hexdigest() data = processFile(models.FileType.pdf, resp.content, meta) return JSONResponse(content=data, status_code=status.HTTP_200_OK) if input_mode == models.InputMode.file: @@ -58,6 +59,7 @@ async def process( ) meta["_source_file"] = upload.filename data_bytes = await upload.read() + meta["_file_hash"] = hashlib.sha256(data_bytes).hexdigest() data = processFile(models.FileType.pdf, data_bytes, meta) return JSONResponse(content=data, status_code=status.HTTP_200_OK) diff --git a/rag-engine/src/process/service.py b/rag-engine/src/process/service.py index e784c9a..2967e87 100644 --- a/rag-engine/src/process/service.py +++ b/rag-engine/src/process/service.py @@ -1,24 +1,38 @@ import logging +from src.common.utils import 
document_exists from src.layers.chunking_embedding.chunk_document import chunk_document from src.layers.chunking_embedding.embedding import embed_chunks from src.layers.data_extractor import extractor +from src.layers.qdrant_store.store import store_chunks from src.layers.structure_analyzer.analyzer import analyze_layout from . import models def processFile(fileType: models.FileType, file_bytes: bytes, metadata: dict): + userId = metadata.get("_user_id") + file_hash = metadata.get("_file_hash") + if userId is None or file_hash is None: + raise Exception("user_id or file_hash is missing") + + if document_exists(userId, file_hash): + raise Exception("Document already uploaded") + if fileType == models.FileType.pdf: - logging.info("start processing pdf files") pages, extractor_meta = extractor.pdf(file_bytes) + logging.info(f"pdf data extracted pages: {len(pages)}") structured_document = analyze_layout(pages) + logging.info("analyzed pdf structure") chunks = chunk_document( structured_document, extractor_meta | metadata, max_tokens=400, ) - logging.info(f"pdf data extracted pages: {len(pages)}") + logging.info(f"chunked pdf to : {len(chunks)} chunks") chunks = embed_chunks(chunks) - return [chunk.model_dump() for chunk in chunks] + logging.info(f"embedding chunks: {len(chunks[0].embedding)}") + store_chunks(chunks) + logging.info("stored chunks") + return {"ok": True} raise Exception("Unspported File type")