Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rag-engine/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.venv
models_cache
qdrant_storage
__pycache__
Empty file.
69 changes: 69 additions & 0 deletions rag-engine/src/common/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
from pathlib import Path as FilePath
from fastembed import TextEmbedding
from fastembed.common.model_description import ModelSource, PoolingType
from qdrant_client import QdrantClient, models
from qdrant_client.models import Distance, FieldCondition, MatchValue, VectorParams

# Shared embedding-model and Qdrant wiring for the RAG engine.
# NOTE(review): everything below runs at import time, including network
# calls to Qdrant — importing this module has side effects.

# Local cache for downloaded ONNX model files.
CACHE_DIR = FilePath("./models_cache")
CACHE_DIR.mkdir(exist_ok=True)
# Dimension of the e5-small embeddings; must match the collection's vector size.
VECTOR_SIZE = 384
COLLECTION_NAME = "dcup_documents"

# Register the multilingual E5 model with fastembed so TextEmbedding can load it.
TextEmbedding.add_custom_model(
    model="intfloat/multilingual-e5-small",
    pooling=PoolingType.MEAN,
    normalization=True,
    sources=ModelSource(hf="intfloat/multilingual-e5-small"),
    dim=VECTOR_SIZE,
    model_file="onnx/model.onnx",
)
# Single shared embedding model instance used by the whole process.
embedding_model = TextEmbedding(
    model_name="intfloat/multilingual-e5-small",
    cache_dir=str(CACHE_DIR),
)

# NOTE(review): if QDRANT_DB_URL / QDRANT_DB_KEY are unset these are None —
# confirm the client's fallback behavior is acceptable in that case.
qclient = QdrantClient(
    url=os.getenv("QDRANT_DB_URL"),
    api_key=os.getenv("QDRANT_DB_KEY"),
)

# Bootstrap the collection and its payload indexes on first run.
# NOTE(review): the indexes are only created together with the collection;
# if the collection already exists without them they are never added.
if COLLECTION_NAME not in [c.name for c in qclient.get_collections().collections]:
    qclient.create_collection(
        collection_name=COLLECTION_NAME,
        vectors_config=VectorParams(
            size=VECTOR_SIZE,
            distance=Distance.COSINE,
        ),
    )
    # Keyword indexes used by document_exists() filters below.
    qclient.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="_file_hash",
        field_schema=models.PayloadSchemaType.KEYWORD,
    )
    qclient.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="_user_id",
        field_schema=models.PayloadSchemaType.KEYWORD,
    )


def document_exists(user_id: str, file_hash: str) -> bool:
    """Return True when at least one stored point carries both this
    user id and this file hash (i.e. the document was already ingested)."""
    by_user = FieldCondition(key="_user_id", match=MatchValue(value=user_id))
    by_hash = FieldCondition(key="_file_hash", match=MatchValue(value=file_hash))

    # scroll() returns (points, next_page_offset); one point is enough.
    points, _offset = qclient.scroll(
        collection_name=COLLECTION_NAME,
        scroll_filter=models.Filter(must=[by_user, by_hash]),
        limit=1,
    )
    return bool(points)
4 changes: 3 additions & 1 deletion rag-engine/src/layers/chunking_embedding/chunk_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def _build_chunk(
page_start=page_start,
page_end=page_end,
metadata=metadata,
embedding=None,
)


Expand Down Expand Up @@ -286,14 +287,15 @@ def _build_table_chunks_from_section(
chunks.append(
Chunk(
id=str(uuid.uuid4()),
text=table_json,
text=table_json,
token_count=count_tokens(table_json),
section_title=section.title,
section_path=section_path,
level=section.level,
page_start=section.page_number,
page_end=section.page_number,
metadata=table_metadata,
embedding=None,
)
)

Expand Down
18 changes: 3 additions & 15 deletions rag-engine/src/layers/chunking_embedding/embedding.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,13 @@
from fastembed import TextEmbedding
from typing import List
from fastembed.common.model_description import ModelSource, PoolingType

from src.layers.chunking_embedding.models import Chunk
from src.common.utils import embedding_model


TextEmbedding.add_custom_model(
model="intfloat/multilingual-e5-small",
pooling=PoolingType.MEAN,
normalization=True,
sources=ModelSource(hf="intfloat/multilingual-e5-small"),
dim=384,
model_file="onnx/model.onnx",
)
_embedding_model = TextEmbedding(model_name="intfloat/multilingual-e5-small")

def embed_chunks(chunks: List[Chunk], batch_size: int = 64) -> List[Chunk]:
    """Embed chunk texts in place using the shared embedding model.

    Args:
        chunks: chunks whose ``text`` will be embedded; each chunk's
            ``embedding`` attribute is populated with a list of floats.
        batch_size: number of texts embedded per model call.

    Returns:
        The same list, with embeddings filled in.
    """
    # Note: the pasted diff interleaved the removed `_embedding_model` /
    # metadata["_embedding"] lines with the new ones; this is the clean
    # post-change version using the shared model from src.common.utils.
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i : i + batch_size]
        texts = [c.text for c in batch]
        # embed() yields numpy vectors lazily; materialise before zipping.
        vectors = list(embedding_model.embed(texts))
        for chunk, vector in zip(batch, vectors):
            chunk.embedding = vector.tolist()
    return chunks
1 change: 1 addition & 0 deletions rag-engine/src/layers/chunking_embedding/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ class Chunk(BaseModel):

page_start: int | None
page_end: int | None
embedding: Any

metadata: Dict[str, Any] = {}
Empty file.
33 changes: 33 additions & 0 deletions rag-engine/src/layers/qdrant_store/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import List
from qdrant_client.conversions.common_types import PointStruct, Points
from src.layers.chunking_embedding.models import Chunk
from src.common.utils import VECTOR_SIZE, qclient, COLLECTION_NAME


def store_chunks(chunks: List[Chunk], batch_size: int = 64) -> None:
    """Upsert embedded chunks into the Qdrant collection in batches.

    Chunks with no embedding are skipped silently (they were never
    embedded). An embedding of the wrong dimension raises ValueError —
    previously this was an ``assert`` after the append, which only ever
    checked the last chunk of a batch and is stripped under ``python -O``.

    Args:
        chunks: chunks to persist; ``embedding`` must match VECTOR_SIZE.
        batch_size: number of points sent per upsert call.
    """
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start : start + batch_size]
        points: Points = []
        for chunk in batch:
            if chunk.embedding is None:
                continue  # never embedded; nothing to store
            # Validate before building the point so bad data fails loudly.
            if len(chunk.embedding) != VECTOR_SIZE:
                raise ValueError(
                    f"chunk {chunk.id} embedding has dimension "
                    f"{len(chunk.embedding)}, expected {VECTOR_SIZE}"
                )
            payload = {
                "text": chunk.text,
                "token_count": chunk.token_count,
                "section_title": chunk.section_title,
                "section_path": chunk.section_path,
                "level": chunk.level,
                "page_start": chunk.page_start,
                "page_end": chunk.page_end,
                **chunk.metadata,
            }
            points.append(
                PointStruct(
                    id=chunk.id,
                    vector=chunk.embedding,
                    payload=payload,
                ),
            )
        # Skip the network call entirely when the batch produced no points.
        if points:
            qclient.upsert(collection_name=COLLECTION_NAME, points=points)
6 changes: 6 additions & 0 deletions rag-engine/src/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
# Application entry module: loads environment, configures logging, and
# builds the FastAPI app with the /process router.
from dotenv import load_dotenv
from fastapi import FastAPI
from src.process.controller import router as process
from .logging import configure_logging, LogLevels
from pathlib import Path


# NOTE(review): this resolves .env next to this file (src/.env), not the
# repository root — confirm that is where the .env actually lives.
env_path = Path(__file__).parent / '.env'

load_dotenv(dotenv_path=env_path)
configure_logging(LogLevels.info)
app = FastAPI()
app.include_router(process)
4 changes: 3 additions & 1 deletion rag-engine/src/process/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from src.process.service import processFile
from . import models
from urllib.parse import urlparse

import hashlib
router = APIRouter(prefix="/process", tags=["Process"])


Expand Down Expand Up @@ -48,6 +48,7 @@ async def process(
parsed = urlparse(url)
filename = os.path.basename(parsed.path) or "unkown"
meta["_source_file"] = filename
meta["_file_hash"] = hashlib.sha256(resp.content).hexdigest()
data = processFile(models.FileType.pdf, resp.content, meta)
return JSONResponse(content=data, status_code=status.HTTP_200_OK)
if input_mode == models.InputMode.file:
Expand All @@ -58,6 +59,7 @@ async def process(
)
meta["_source_file"] = upload.filename
data_bytes = await upload.read()
meta["_file_hash"] = hashlib.sha256(data_bytes).hexdigest()
data = processFile(models.FileType.pdf, data_bytes, meta)
return JSONResponse(content=data, status_code=status.HTTP_200_OK)

Expand Down
20 changes: 17 additions & 3 deletions rag-engine/src/process/service.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,38 @@
import logging
from src.common.utils import document_exists
from src.layers.chunking_embedding.chunk_document import chunk_document
from src.layers.chunking_embedding.embedding import embed_chunks
from src.layers.data_extractor import extractor
from src.layers.qdrant_store.store import store_chunks
from src.layers.structure_analyzer.analyzer import analyze_layout

from . import models


def processFile(fileType: models.FileType, file_bytes: bytes, metadata: dict):
    """Run the ingest pipeline for one uploaded file.

    Validates required metadata, rejects duplicate uploads by file hash,
    then extracts, chunks, embeds, and stores a PDF.

    Args:
        fileType: only models.FileType.pdf is supported.
        file_bytes: raw file contents.
        metadata: must contain "_user_id" and "_file_hash".

    Returns:
        {"ok": True} on success.

    Raises:
        Exception: on missing metadata, duplicate document, or
            unsupported file type.
    """
    userId = metadata.get("_user_id")
    file_hash = metadata.get("_file_hash")
    if userId is None or file_hash is None:
        raise Exception("user_id or file_hash is missing")

    if document_exists(userId, file_hash):
        raise Exception("Document already uploaded")

    if fileType == models.FileType.pdf:
        logging.info("start processing pdf files")
        pages, extractor_meta = extractor.pdf(file_bytes)
        # Lazy %-style args avoid formatting when the level is disabled.
        logging.info("pdf data extracted pages: %d", len(pages))
        structured_document = analyze_layout(pages)
        logging.info("analyzed pdf structured")
        chunks = chunk_document(
            structured_document,
            extractor_meta | metadata,  # request metadata wins on key clashes
            max_tokens=400,
        )
        logging.info("chunked pdf to : %d chunks", len(chunks))
        chunks = embed_chunks(chunks)
        # Guard: indexing chunks[0] on an empty list would raise IndexError.
        if chunks and chunks[0].embedding is not None:
            logging.info("embedding chunks: %d", len(chunks[0].embedding))
        store_chunks(chunks)
        logging.info("stored chunked")
        return {"ok": True}

    # Fixed typo in the original message ("Unspported").
    raise Exception("Unsupported File type")