diff --git a/app/api/annotation_api.py b/app/api/annotation_api.py new file mode 100644 index 0000000..aa5ce45 --- /dev/null +++ b/app/api/annotation_api.py @@ -0,0 +1,59 @@ +from fastapi import APIRouter, Depends + +from app.core.response import ResponseMessage +from app.core.status import CommonCode +from app.schemas.annotation.request_model import AnnotationCreateRequest +from app.schemas.annotation.response_model import AnnotationDeleteResponse, FullAnnotationResponse +from app.services.annotation_service import AnnotationService, annotation_service + +annotation_service_dependency = Depends(lambda: annotation_service) + +router = APIRouter() + + +@router.post( + "/create", + response_model=ResponseMessage[FullAnnotationResponse], + summary="새로운 어노테이션 생성", +) +async def create_annotation( + request: AnnotationCreateRequest, + service: AnnotationService = annotation_service_dependency, +) -> ResponseMessage[FullAnnotationResponse]: + """ + `db_profile_id`를 받아 AI를 통해 DB 스키마를 분석하고 어노테이션을 생성하여 반환합니다. + """ + new_annotation = await service.create_annotation(request) + return ResponseMessage.success(value=new_annotation, code=CommonCode.SUCCESS_CREATE_ANNOTATION) + + +@router.get( + "/find/{annotation_id}", + response_model=ResponseMessage[FullAnnotationResponse], + summary="특정 어노테이션 상세 정보 조회", +) +def get_annotation( + annotation_id: str, + service: AnnotationService = annotation_service_dependency, +) -> ResponseMessage[FullAnnotationResponse]: + """ + `annotation_id`에 해당하는 어노테이션의 전체 상세 정보를 조회합니다. + """ + annotation = service.get_full_annotation(annotation_id) + return ResponseMessage.success(value=annotation, code=CommonCode.SUCCESS_FIND_ANNOTATION) + + +@router.delete( + "/remove/{annotation_id}", + response_model=ResponseMessage[AnnotationDeleteResponse], + summary="특정 어노테이션 삭제", +) +def delete_annotation( + annotation_id: str, + service: AnnotationService = annotation_service_dependency, +) -> ResponseMessage[AnnotationDeleteResponse]: + """ + `annotation_id`에 해당하는 어노테이션 및 하위 데이터를 모두 삭제합니다. + """ + result = service.delete_annotation(annotation_id) + return ResponseMessage.success(value=result, code=CommonCode.SUCCESS_DELETE_ANNOTATION) diff --git a/app/api/api_key_api.py b/app/api/api_key_api.py index 53ea9f2..7a26140 100644 --- a/app/api/api_key_api.py +++ b/app/api/api_key_api.py @@ -14,7 +14,7 @@ @router.post( - "/actions", + "/create", response_model=ResponseMessage[APIKeyResponse], summary="API KEY 저장 (처음 한 번)", description="외부 AI 서비스의 API Key를 암호화하여 로컬 데이터베이스에 저장합니다.", diff --git a/app/api/api_router.py b/app/api/api_router.py index 346914a..da2d854 100644 --- a/app/api/api_router.py +++ b/app/api/api_router.py @@ -2,7 +2,7 @@ from fastapi import APIRouter -from app.api import api_key_api, chat_tab_api, driver_api, test_api, user_db_api +from app.api import annotation_api, api_key_api, chat_tab_api, driver_api, test_api, user_db_api api_router = APIRouter() @@ -14,3 +14,4 @@ api_router.include_router(user_db_api.router, prefix="/user/db", tags=["UserDb"]) api_router.include_router(api_key_api.router, prefix="/keys", tags=["API Key"]) api_router.include_router(chat_tab_api.router, prefix="/chats", tags=["AI Chat"]) +api_router.include_router(annotation_api.router, prefix="/annotations", tags=["Annotation"]) diff --git a/app/api/user_db_api.py b/app/api/user_db_api.py index 957d431..ffd6656 100644 --- a/app/api/user_db_api.py +++ b/app/api/user_db_api.py @@ -5,8 +5,9 @@ from app.core.exceptions import APIException from app.core.response import ResponseMessage +from app.core.status import CommonCode from app.schemas.user_db.db_profile_model import DBProfileInfo, UpdateOrCreateDBProfile -from app.schemas.user_db.result_model import ColumnInfo, DBProfile +from app.schemas.user_db.result_model import ColumnInfo, DBProfile, TableInfo from app.services.user_db_service import UserDbService, user_db_service user_db_service_dependency = Depends(lambda: user_db_service) @@ -89,6 +90,7 @@ def delete_profile( def find_all_profile( service: UserDbService = user_db_service_dependency, ) -> ResponseMessage[list[DBProfile]]: + result = service.find_all_profile() if not result.is_successful: @@ -102,6 +104,7 @@ def find_all_profile( summary="특정 DB의 전체 스키마 조회", ) def find_schemas(profile_id: str, service: UserDbService = user_db_service_dependency) -> ResponseMessage[list[str]]: + db_info = service.find_profile(profile_id) result = service.find_schemas(db_info) @@ -118,6 +121,7 @@ def find_schemas(profile_id: str, service: UserDbService = user_db_service_depen def find_tables( profile_id: str, schema_name: str, service: UserDbService = user_db_service_dependency ) -> ResponseMessage[list[str]]: + db_info = service.find_profile(profile_id) result = service.find_tables(db_info, schema_name) @@ -134,9 +138,26 @@ def find_tables( def find_columns( profile_id: str, schema_name: str, table_name: str, service: UserDbService = user_db_service_dependency ) -> ResponseMessage[list[ColumnInfo]]: + db_info = service.find_profile(profile_id) result = service.find_columns(db_info, schema_name, table_name) if not result.is_successful: raise APIException(result.code) return ResponseMessage.success(value=result.columns, code=result.code) + + +@router.get( + "/find/all-schemas/{profile_id}", + response_model=ResponseMessage[list[TableInfo]], + summary="특정 DB의 전체 스키마의 상세 정보 조회", + description="테이블, 컬럼, 제약조건, 인덱스를 포함한 모든 스키마 정보를 반환합니다.", +) +def find_all_schema_info( + profile_id: str, service: UserDbService = user_db_service_dependency +) -> ResponseMessage[list[TableInfo]]: + + db_info = service.find_profile(profile_id) + full_schema_info = service.get_full_schema_info(db_info) + + return ResponseMessage.success(value=full_schema_info, code=CommonCode.SUCCESS) diff --git a/app/core/enum/constraint_type.py b/app/core/enum/constraint_type.py new file mode 100644 index 0000000..d7a3bcb --- /dev/null +++ b/app/core/enum/constraint_type.py @@ -0,0 +1,16 @@ +from enum import Enum + + +class ConstraintTypeEnum(str, Enum): + """ + 데이터베이스 제약 조건의 유형을 정의하는 Enum 클래스입니다. + - str을 상속하여 Enum 멤버를 문자열 값처럼 사용할 수 있습니다. + """ + + PRIMARY_KEY = "PRIMARY KEY" + FOREIGN_KEY = "FOREIGN KEY" + UNIQUE = "UNIQUE" + CHECK = "CHECK" + NOT_NULL = "NOT NULL" # 일부 DB에서는 제약조건으로 취급 + DEFAULT = "DEFAULT" # 일부 DB에서는 제약조건으로 취급 + INDEX = "INDEX" # 제약조건은 아니지만, 관련 정보로 포함 diff --git a/app/core/enum/db_key_prefix_name.py b/app/core/enum/db_key_prefix_name.py index 83aed3b..0e3fa18 100644 --- a/app/core/enum/db_key_prefix_name.py +++ b/app/core/enum/db_key_prefix_name.py @@ -8,4 +8,13 @@ class DBSaveIdEnum(Enum): user_db = "USER-DB" driver = "DRIVER" api_key = "API-KEY" - chat_tab = "CHAT-TAB" + chat_tab = "CHAT_TAB" + + database_annotation = "DB-ANNO" + table_annotation = "TBL-ANNO" + column_annotation = "COL-ANNO" + table_constraint = "TC-ANNO" + constraint_column = "CC-ANNO" + index_annotation = "IDX-ANNO" + index_column = "IC-ANNO" + table_relationship = "TR-ANNO" diff --git a/app/core/status.py b/app/core/status.py index 953b8b4..22d4488 100644 --- a/app/core/status.py +++ b/app/core/status.py @@ -41,6 +41,9 @@ class CommonCode(Enum): SUCCESS_GET_CHAT_MESSAGES = (status.HTTP_200_OK, "2304", "채팅 탭의 모든 메시지를 성공적으로 불러왔습니다.") """ ANNOTATION 성공 코드 - 24xx """ + SUCCESS_CREATE_ANNOTATION = (status.HTTP_201_CREATED, "2400", "어노테이션을 성공적으로 생성하였습니다.") + SUCCESS_FIND_ANNOTATION = (status.HTTP_200_OK, "2401", "어노테이션 정보를 성공적으로 조회하였습니다.") + SUCCESS_DELETE_ANNOTATION = (status.HTTP_200_OK, "2402", "어노테이션을 성공적으로 삭제하였습니다.") """ SQL 성공 코드 - 25xx """ @@ -82,6 +85,7 @@ class CommonCode(Enum): NO_CHAT_TAB_DATA = (status.HTTP_404_NOT_FOUND, "4304", "해당 ID를 가진 채팅 탭을 찾을 수 없습니다.") """ ANNOTATION 클라이언트 에러 코드 - 44xx """ + INVALID_ANNOTATION_REQUEST = (status.HTTP_400_BAD_REQUEST, "4400", "어노테이션 요청 데이터가 유효하지 않습니다.") """ SQL 클라이언트 에러 코드 - 45xx """ @@ -107,6 +111,11 @@ class CommonCode(Enum): FAIL_FIND_SCHEMAS = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5102", "디비 스키마 정보 조회 중 에러가 발생했습니다.") FAIL_FIND_TABLES = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5103", "디비 테이블 정보 조회 중 에러가 발생했습니다.") FAIL_FIND_COLUMNS = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5104", "디비 컬럼 정보 조회 중 에러가 발생했습니다.") + FAIL_FIND_CONSTRAINTS_OR_INDEXES = ( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "5105", + "디비 제약조건 또는 인덱스 정보 조회 중 에러가 발생했습니다.", + ) FAIL_SAVE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5130", "디비 정보 저장 중 에러가 발생했습니다.") FAIL_UPDATE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5150", "디비 정보 업데이트 중 에러가 발생했습니다.") FAIL_DELETE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5170", "디비 정보 삭제 중 에러가 발생했습니다.") @@ -116,6 +125,15 @@ class CommonCode(Enum): """ AI CHAT, DB 서버 에러 코드 - 53xx """ """ ANNOTATION 서버 에러 코드 - 54xx """ + FAIL_CREATE_ANNOTATION = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5400", "어노테이션 생성 중 에러가 발생했습니다.") + FAIL_FIND_ANNOTATION = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5401", "어노테이션 조회 중 에러가 발생했습니다.") + FAIL_DELETE_ANNOTATION = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5402", "어노테이션 삭제 중 에러가 발생했습니다.") + FAIL_AI_SERVER_CONNECTION = (status.HTTP_503_SERVICE_UNAVAILABLE, "5403", "AI 서버 연결에 실패했습니다.") + FAIL_AI_SERVER_PROCESSING = ( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "5404", + "AI 서버가 요청을 처리하는 데 실패했습니다.", + ) """ SQL 서버 에러 코드 - 55xx """ diff --git a/app/repository/annotation_repository.py b/app/repository/annotation_repository.py new file mode 100644 index 0000000..c32ec61 --- /dev/null +++ b/app/repository/annotation_repository.py @@ -0,0 +1,270 @@ +import sqlite3 + +from app.core.utils import get_db_path +from app.schemas.annotation.db_model import ( + ColumnAnnotationInDB, + ConstraintColumnInDB, + DatabaseAnnotationInDB, + IndexAnnotationInDB, + IndexColumnInDB, + TableAnnotationInDB, + TableConstraintInDB, +) +from app.schemas.annotation.response_model import ( + ColumnAnnotationDetail, + ConstraintDetail, + FullAnnotationResponse, + IndexDetail, + TableAnnotationDetail, +) + + +class AnnotationRepository: + def create_full_annotation( + self, + db_conn: sqlite3.Connection, + db_annotation: DatabaseAnnotationInDB, + table_annotations: list[TableAnnotationInDB], + column_annotations: list[ColumnAnnotationInDB], + constraint_annotations: list[TableConstraintInDB], + constraint_column_annotations: list[ConstraintColumnInDB], + index_annotations: list[IndexAnnotationInDB], + index_column_annotations: list[IndexColumnInDB], + ) -> None: + """ + 하나의 트랜잭션 내에서 전체 어노테이션 데이터를 저장합니다. + - 서비스 계층에서 트랜잭션을 관리하므로 connection을 인자로 받습니다. + - 실패 시 sqlite3.Error를 발생시킵니다. + """ + cursor = db_conn.cursor() + + # Database, Table, Column Annotations 저장 + db_data = ( + db_annotation.id, + db_annotation.db_profile_id, + db_annotation.database_name, + db_annotation.description, + db_annotation.created_at, + db_annotation.updated_at, + ) + cursor.execute( + """ + INSERT INTO database_annotation (id, db_profile_id, database_name, description, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + db_data, + ) + table_data = [ + (t.id, t.database_annotation_id, t.table_name, t.description, t.created_at, t.updated_at) + for t in table_annotations + ] + cursor.executemany( + """ + INSERT INTO table_annotation (id, database_annotation_id, table_name, description, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + table_data, + ) + column_data = [ + ( + c.id, + c.table_annotation_id, + c.column_name, + c.data_type, + c.is_nullable, + c.default_value, + c.check_expression, + c.ordinal_position, + c.description, + c.created_at, + c.updated_at, + ) + for c in column_annotations + ] + cursor.executemany( + """ + INSERT INTO column_annotation (id, table_annotation_id, column_name, data_type, is_nullable, default_value, check_expression, ordinal_position, description, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + column_data, + ) + + # Constraint Annotations 저장 + constraint_data = [ + ( + c.id, + c.table_annotation_id, + c.constraint_type, + c.name, + c.expression, + c.ref_table, + c.on_update_action, + c.on_delete_action, + c.created_at, + c.updated_at, + ) + for c in constraint_annotations + ] + cursor.executemany( + """ + INSERT INTO table_constraint (id, table_annotation_id, constraint_type, name, expression, ref_table, on_update_action, on_delete_action, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + constraint_data, + ) + constraint_column_data = [ + ( + cc.id, + cc.constraint_id, + cc.column_annotation_id, + cc.position, + cc.referenced_column_name, + cc.created_at, + cc.updated_at, + ) + for cc in constraint_column_annotations + ] + cursor.executemany( + """ + INSERT INTO constraint_column (id, constraint_id, column_annotation_id, position, referenced_column_name, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + constraint_column_data, + ) + + # Index Annotations 저장 + index_data = [ + (i.id, i.table_annotation_id, i.name, i.is_unique, i.created_at, i.updated_at) for i in index_annotations + ] + cursor.executemany( + """ + INSERT INTO index_annotation (id, table_annotation_id, name, is_unique, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + index_data, + ) + index_column_data = [ + (ic.id, ic.index_id, ic.column_annotation_id, ic.position, ic.created_at, ic.updated_at) + for ic in index_column_annotations + ] + cursor.executemany( + """ + INSERT INTO index_column (id, index_id, column_annotation_id, position, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + index_column_data, + ) + + def find_full_annotation_by_id(self, annotation_id: str) -> FullAnnotationResponse | None: + """ + annotationId로 전체 어노테이션 상세 정보를 조회합니다. + - 여러 테이블을 JOIN하여 구조화된 데이터를 반환합니다. + - 실패 시 sqlite3.Error를 발생시킵니다. + """ + db_path = get_db_path() + conn = None + try: + conn = sqlite3.connect(str(db_path), timeout=10) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute("SELECT * FROM database_annotation WHERE id = ?", (annotation_id,)) + db_row = cursor.fetchone() + if not db_row: + return None + + cursor.execute("SELECT * FROM table_annotation WHERE database_annotation_id = ?", (annotation_id,)) + table_rows = cursor.fetchall() + + tables_details = [] + for table_row in table_rows: + table_id = table_row["id"] + + # 컬럼 정보 + cursor.execute( + "SELECT id, column_name, description FROM column_annotation WHERE table_annotation_id = ?", + (table_id,), + ) + columns = [ColumnAnnotationDetail.model_validate(dict(c)) for c in cursor.fetchall()] + + # 제약조건 정보 + cursor.execute( + """ + SELECT tc.name, tc.constraint_type, ca.column_name + FROM table_constraint tc + JOIN constraint_column cc ON tc.id = cc.constraint_id + JOIN column_annotation ca ON cc.column_annotation_id = ca.id + WHERE tc.table_annotation_id = ? + """, + (table_id,), + ) + constraint_map = {} + for row in cursor.fetchall(): + if row["name"] not in constraint_map: + constraint_map[row["name"]] = {"type": row["constraint_type"], "columns": []} + constraint_map[row["name"]]["columns"].append(row["column_name"]) + constraints = [ + ConstraintDetail(name=k, type=v["type"], columns=v["columns"]) for k, v in constraint_map.items() + ] + + # 인덱스 정보 + cursor.execute( + """ + SELECT ia.name, ia.is_unique, ca.column_name + FROM index_annotation ia + JOIN index_column ic ON ia.id = ic.index_id + JOIN column_annotation ca ON ic.column_annotation_id = ca.id + WHERE ia.table_annotation_id = ? + """, + (table_id,), + ) + index_map = {} + for row in cursor.fetchall(): + if row["name"] not in index_map: + index_map[row["name"]] = {"is_unique": bool(row["is_unique"]), "columns": []} + index_map[row["name"]]["columns"].append(row["column_name"]) + indexes = [ + IndexDetail(name=k, is_unique=v["is_unique"], columns=v["columns"]) for k, v in index_map.items() + ] + + tables_details.append( + TableAnnotationDetail( + id=table_id, + table_name=table_row["table_name"], + description=table_row["description"], + created_at=table_row["created_at"], + updated_at=table_row["updated_at"], + columns=columns, + constraints=constraints, + indexes=indexes, + ) + ) + + db_row_dict = dict(db_row) + db_row_dict["tables"] = tables_details + return FullAnnotationResponse.model_validate(db_row_dict) + finally: + if conn: + conn.close() + + def delete_annotation_by_id(self, annotation_id: str) -> bool: + """ + annotationId로 특정 어노테이션을 삭제합니다. + ON DELETE CASCADE에 의해 하위 데이터도 모두 삭제됩니다. + 성공 시 True, 대상이 없으면 False를 반환합니다. + 실패 시 sqlite3.Error를 발생시킵니다. + """ + db_path = get_db_path() + conn = None + try: + conn = sqlite3.connect(str(db_path), timeout=10) + cursor = conn.cursor() + cursor.execute("DELETE FROM database_annotation WHERE id = ?", (annotation_id,)) + conn.commit() + return cursor.rowcount > 0 + finally: + if conn: + conn.close() + + +annotation_repository = AnnotationRepository() diff --git a/app/repository/user_db_repository.py b/app/repository/user_db_repository.py index cd7a0e0..b437572 100644 --- a/app/repository/user_db_repository.py +++ b/app/repository/user_db_repository.py @@ -13,7 +13,9 @@ ChangeProfileResult, ColumnInfo, ColumnListResult, + ConstraintInfo, DBProfile, + IndexInfo, SchemaListResult, TableListResult, ) @@ -259,6 +261,94 @@ def find_columns( if connection: connection.close() + def find_constraints( + self, driver_module: Any, db_type: str, table_name: str, **kwargs: Any + ) -> list[ConstraintInfo]: + """ + 테이블의 제약 조건 정보를 조회합니다. + - 현재는 SQLite만 지원합니다. + - 실패 시 DB 드라이버의 예외를 직접 발생시킵니다. + """ + connection = None + try: + connection = self._connect(driver_module, **kwargs) + cursor = connection.cursor() + constraints = [] + + if db_type == "sqlite": + # Foreign Key 제약 조건 조회 + fk_list_sql = f"PRAGMA foreign_key_list('{table_name}')" + cursor.execute(fk_list_sql) + fks = cursor.fetchall() + + # Foreign Key 정보를 그룹화 + fk_groups = {} + for fk in fks: + fk_id = fk[0] + if fk_id not in fk_groups: + fk_groups[fk_id] = {"referenced_table": fk[2], "columns": [], "referenced_columns": []} + fk_groups[fk_id]["columns"].append(fk[3]) + fk_groups[fk_id]["referenced_columns"].append(fk[4]) + + for _, group in fk_groups.items(): + constraints.append( + ConstraintInfo( + name=f"fk_{table_name}_{'_'.join(group['columns'])}", + type="FOREIGN KEY", + columns=group["columns"], + referenced_table=group["referenced_table"], + referenced_columns=group["referenced_columns"], + ) + ) + + # 다른 DB 타입에 대한 제약 조건 조회 로직 추가 가능 + # elif db_type == "postgresql": ... + + return constraints + finally: + if connection: + connection.close() + + def find_indexes(self, driver_module: Any, db_type: str, table_name: str, **kwargs: Any) -> list[IndexInfo]: + """ + 테이블의 인덱스 정보를 조회합니다. + - 현재는 SQLite만 지원합니다. + - 실패 시 DB 드라이버의 예외를 직접 발생시킵니다. + """ + connection = None + try: + connection = self._connect(driver_module, **kwargs) + cursor = connection.cursor() + indexes = [] + + if db_type == "sqlite": + index_list_sql = f"PRAGMA index_list('{table_name}')" + cursor.execute(index_list_sql) + raw_indexes = cursor.fetchall() + + for idx in raw_indexes: + index_name = idx[1] + is_unique = idx[2] == 1 + + # "sqlite_autoindex_"로 시작하는 인덱스는 PK에 의해 자동 생성된 것이므로 제외 + if index_name.startswith("sqlite_autoindex_"): + continue + + index_info_sql = f"PRAGMA index_info('{index_name}')" + cursor.execute(index_info_sql) + index_columns = [row[2] for row in cursor.fetchall()] + + if index_columns: + indexes.append(IndexInfo(name=index_name, columns=index_columns, is_unique=is_unique)) + + # 다른 DB 타입에 대한 인덱스 조회 로직 추가 가능 + # elif db_type == "postgresql": ... + + return indexes + finally: + if connection: + connection.close() + # ───────────────────────────── # DB 연결 메서드 # ───────────────────────────── diff --git a/app/schemas/annotation/base_model.py b/app/schemas/annotation/base_model.py new file mode 100644 index 0000000..970ea5e --- /dev/null +++ b/app/schemas/annotation/base_model.py @@ -0,0 +1,25 @@ +from datetime import datetime + +from pydantic import BaseModel, Field + +from app.core.exceptions import APIException +from app.core.status import CommonCode + + +class AnnotationBase(BaseModel): + """어노테이션 스키마의 기본 모델""" + + id: str = Field(..., description="고유 ID") + created_at: datetime = Field(..., description="생성 시각") + updated_at: datetime = Field(..., description="마지막 수정 시각") + + +class RequestBase(BaseModel): + """요청 스키마의 기본 모델""" + + def validate_required_fields(self, fields: list[str]): + """필수 필드가 비어있는지 검사하는 공통 유효성 검사 메서드""" + for field_name in fields: + value = getattr(self, field_name, None) + if not value or (isinstance(value, str) and not value.strip()): + raise APIException(CommonCode.INVALID_PARAMETER, detail=f"'{field_name}' 필드는 비워둘 수 없습니다.") diff --git a/app/schemas/annotation/db_model.py b/app/schemas/annotation/db_model.py new file mode 100644 index 0000000..667765b --- /dev/null +++ b/app/schemas/annotation/db_model.py @@ -0,0 +1,64 @@ +from pydantic import Field + +from app.core.enum.constraint_type import ConstraintTypeEnum +from app.schemas.annotation.base_model import AnnotationBase + + +class DatabaseAnnotationInDB(AnnotationBase): + db_profile_id: str + database_name: str + description: str | None = Field(None, description="AI가 생성한 설명") + + +class TableAnnotationInDB(AnnotationBase): + database_annotation_id: str + table_name: str + description: str | None = Field(None, description="AI가 생성한 설명") + + +class ColumnAnnotationInDB(AnnotationBase): + table_annotation_id: str + column_name: str + data_type: str | None = None + is_nullable: int = 1 + default_value: str | None = None + check_expression: str | None = None + ordinal_position: int | None = None + description: str | None = Field(None, description="AI가 생성한 설명") + + +class TableRelationshipInDB(AnnotationBase): + database_annotation_id: str + from_table_id: str + to_table_id: str + relationship_type: str + description: str | None = Field(None, description="AI가 생성한 설명") + + +class TableConstraintInDB(AnnotationBase): + table_annotation_id: str + constraint_type: ConstraintTypeEnum + name: str | None = None + expression: str | None = None + ref_table: str | None = None + on_update_action: str | None = None + on_delete_action: str | None = None + + +class ConstraintColumnInDB(AnnotationBase): + constraint_id: str + column_annotation_id: str + position: int | None = None + referenced_column_name: str | None = None + + +class IndexAnnotationInDB(AnnotationBase): + table_annotation_id: str + name: str | None = None + is_unique: int = 0 + + +class IndexColumnInDB(AnnotationBase): + index_id: str + column_annotation_id: str + position: int | None = None diff --git a/app/schemas/annotation/request_model.py b/app/schemas/annotation/request_model.py new file mode 100644 index 0000000..1c5c40e --- /dev/null +++ b/app/schemas/annotation/request_model.py @@ -0,0 +1,12 @@ +from pydantic import Field + +from app.schemas.annotation.base_model import RequestBase + + +class AnnotationCreateRequest(RequestBase): + """어노테이션 생성 요청 스키마""" + + db_profile_id: str = Field(..., description="어노테이션을 생성할 DB 프로필의 고유 ID") + + def validate(self): + self.validate_required_fields(["db_profile_id"]) diff --git a/app/schemas/annotation/response_model.py b/app/schemas/annotation/response_model.py new file mode 100644 index 0000000..5602e0e --- /dev/null +++ b/app/schemas/annotation/response_model.py @@ -0,0 +1,61 @@ +from datetime import datetime + +from pydantic import BaseModel, Field + +from app.schemas.annotation.base_model import AnnotationBase + + +# 상세 정보 모델 (조회 시 사용) +class ColumnAnnotationDetail(BaseModel): + id: str + column_name: str + description: str | None = None + + +class ConstraintDetail(BaseModel): + name: str | None = None + type: str + columns: list[str] + description: str | None = None # AI가 생성해줄 수 있음 + + +class IndexDetail(BaseModel): + name: str | None = None + columns: list[str] + is_unique: bool + description: str | None = None # AI가 생성해줄 수 있음 + + +class TableAnnotationDetail(AnnotationBase): + table_name: str + description: str | None = None + columns: list[ColumnAnnotationDetail] + constraints: list[ConstraintDetail] + indexes: list[IndexDetail] + + +class FullAnnotationResponse(AnnotationBase): + """전체 어노테이션 상세 정보 응답 스키마""" + + db_profile_id: str = Field(..., description="DB 프로필의 고유 ID") + database_name: str = Field(..., description="데이터베이스 이름") + description: str | None = Field(None, description="데이터베이스 전체에 대한 설명") + tables: list[TableAnnotationDetail] = Field([], description="테이블 어노테이션 목록") + + +# 간단한 생성/삭제 응답 모델 +# 필요할지는 모르겠음 +class AnnotationCreationSummary(BaseModel): + """어노테이션 생성 결과 요약 응답 스키마""" + + id: str = Field(..., description="생성된 어노테이션의 고유 ID") + db_profile_id: str = Field(..., description="DB 프로필의 고유 ID") + database_name: str = Field(..., description="데이터베이스 이름") + created_at: datetime = Field(..., description="어노테이션 생성 시각") + + +class AnnotationDeleteResponse(BaseModel): + """어노테이션 삭제 API 응답 스키마""" + + id: str = Field(..., description="삭제된 어노테이션의 고유 ID") + message: str = Field("성공적으로 삭제되었습니다.", description="삭제 결과 메시지") diff --git a/app/schemas/user_db/result_model.py b/app/schemas/user_db/result_model.py index ea8ad73..64d9190 100644 --- a/app/schemas/user_db/result_model.py +++ b/app/schemas/user_db/result_model.py @@ -55,11 +55,34 @@ class ColumnInfo(BaseModel): is_pk: bool = Field(False, description="기본 키(Primary Key) 여부") +class ConstraintInfo(BaseModel): + """테이블 제약 조건 정보를 담는 모델""" + + name: str | None = Field(None, description="제약 조건 이름") + type: str = Field(..., description="제약 조건 타입 (PRIMARY KEY, FOREIGN KEY, UNIQUE, CHECK)") + columns: list[str] = Field(..., description="제약 조건에 포함된 컬럼 목록") + # FOREIGN KEY 관련 필드 + referenced_table: str | None = Field(None, description="참조하는 테이블 (FK)") + referenced_columns: list[str] | None = Field(None, description="참조하는 테이블의 컬럼 (FK)") + # CHECK 관련 필드 + check_expression: str | None = Field(None, description="CHECK 제약 조건 표현식") + + +class IndexInfo(BaseModel): + """테이블 인덱스 정보를 담는 모델""" + + name: str | None = Field(None, description="인덱스 이름") + columns: list[str] = Field(..., description="인덱스에 포함된 컬럼 목록") + is_unique: bool = Field(False, description="고유 인덱스 여부") + + class TableInfo(BaseModel): - """단일 테이블의 이름과 컬럼 목록을 담는 모델""" + """단일 테이블의 이름과 상세 정보를 담는 모델""" name: str = Field(..., description="테이블 이름") columns: list[ColumnInfo] = Field([], description="컬럼 목록") + constraints: list[ConstraintInfo] = Field([], description="제약 조건 목록") + indexes: list[IndexInfo] = Field([], description="인덱스 목록") comment: str | None = Field(None, description="테이블 코멘트") diff --git a/app/services/annotation_service.py b/app/services/annotation_service.py new file mode 100644 index 0000000..b217586 --- /dev/null +++ b/app/services/annotation_service.py @@ -0,0 +1,267 @@ +import sqlite3 +from datetime import datetime +from typing import Any + +from fastapi import Depends + +from app.core.enum.constraint_type import ConstraintTypeEnum +from app.core.enum.db_key_prefix_name import DBSaveIdEnum +from app.core.exceptions import APIException +from app.core.status import CommonCode +from app.core.utils import generate_prefixed_uuid, get_db_path +from app.repository.annotation_repository import AnnotationRepository, annotation_repository +from app.schemas.annotation.db_model import ( + ColumnAnnotationInDB, + ConstraintColumnInDB, + DatabaseAnnotationInDB, + IndexAnnotationInDB, + IndexColumnInDB, + TableAnnotationInDB, + TableConstraintInDB, +) +from app.schemas.annotation.request_model import AnnotationCreateRequest +from app.schemas.annotation.response_model import AnnotationDeleteResponse, FullAnnotationResponse +from app.schemas.user_db.db_profile_model import AllDBProfileInfo +from app.schemas.user_db.result_model import TableInfo as UserDBTableInfo +from app.services.user_db_service import UserDbService, user_db_service + +annotation_repository_dependency = Depends(lambda: annotation_repository) +user_db_service_dependency = Depends(lambda: user_db_service) + +# AI 서버의 주소 (임시) +AI_SERVER_URL = "http://localhost:8001/api/v1/annotate/database" + + +class AnnotationService: + def __init__( + self, repository: AnnotationRepository = annotation_repository, user_db_serv: UserDbService = user_db_service + ): + self.repository = repository + self.user_db_service = user_db_serv + + async def create_annotation(self, request: AnnotationCreateRequest) -> FullAnnotationResponse: + """ + 어노테이션 생성을 위한 전체 프로세스를 관장합니다. + 1. DB 프로필 및 전체 스키마 정보 조회 + 2. TODO: AI 서버에 요청 (현재는 Mock 데이터 사용) + 3. 트랜잭션 내에서 전체 어노테이션 정보 저장 + """ + try: + request.validate() + except ValueError as e: + raise APIException(CommonCode.INVALID_ANNOTATION_REQUEST, detail=str(e)) from e + + # 1. DB 프로필 및 전체 스키마 정보 조회 + db_profile = self.user_db_service.find_profile(request.db_profile_id) + full_schema_info = self.user_db_service.get_full_schema_info(db_profile) + + # 2. AI 서버에 요청 (현재는 Mock 데이터 사용) + ai_response = await self._request_annotation_to_ai_server(full_schema_info) + + # 3. 트랜잭션 내에서 전체 어노테이션 정보 저장 + db_path = get_db_path() + conn = None + try: + conn = sqlite3.connect(str(db_path), timeout=10) + conn.execute("BEGIN") + + db_models = self._transform_ai_response_to_db_models(ai_response, db_profile, request.db_profile_id) + self.repository.create_full_annotation(db_conn=conn, **db_models) + + conn.commit() + annotation_id = db_models["db_annotation"].id + + except sqlite3.Error as e: + if conn: + conn.rollback() + raise APIException(CommonCode.FAIL_CREATE_ANNOTATION, detail=f"Database transaction failed: {e}") from e + finally: + if conn: + conn.close() + + return self.get_full_annotation(annotation_id) + + def _transform_ai_response_to_db_models( + self, ai_response: dict[str, Any], db_profile: AllDBProfileInfo, db_profile_id: str + ) -> dict[str, Any]: + now = datetime.now() + annotation_id = generate_prefixed_uuid(DBSaveIdEnum.database_annotation.value) + + db_anno = DatabaseAnnotationInDB( + id=annotation_id, + db_profile_id=db_profile_id, + database_name=db_profile.name, + description=ai_response.get("database_annotation"), + created_at=now, + updated_at=now, + ) + + table_annos, col_annos, constraint_annos, constraint_col_annos, index_annos, index_col_annos = ( + [], + [], + [], + [], + [], + [], + ) + + for tbl_data in ai_response.get("tables", []): + table_id = generate_prefixed_uuid(DBSaveIdEnum.table_annotation.value) + table_annos.append( + TableAnnotationInDB( + id=table_id, + database_annotation_id=annotation_id, + table_name=tbl_data["table_name"], + description=tbl_data.get("annotation"), + created_at=now, + updated_at=now, + ) + ) + + col_map = { + col["column_name"]: generate_prefixed_uuid(DBSaveIdEnum.column_annotation.value) + for col in tbl_data.get("columns", []) + } + + for col_data in tbl_data.get("columns", []): + col_annos.append( + ColumnAnnotationInDB( + id=col_map[col_data["column_name"]], + table_annotation_id=table_id, + column_name=col_data["column_name"], + description=col_data.get("annotation"), + created_at=now, + updated_at=now, + ) + ) + + for const_data in tbl_data.get("constraints", []): + const_id = generate_prefixed_uuid(DBSaveIdEnum.table_constraint.value) + constraint_annos.append( + TableConstraintInDB( + id=const_id, + table_annotation_id=table_id, + name=const_data["name"], + constraint_type=ConstraintTypeEnum(const_data["type"]), + created_at=now, + updated_at=now, + ) + ) + for col_name in const_data.get("columns", []): + constraint_col_annos.append( + ConstraintColumnInDB( + id=generate_prefixed_uuid(DBSaveIdEnum.constraint_column.value), + constraint_id=const_id, + column_annotation_id=col_map[col_name], + created_at=now, + updated_at=now, + ) + ) + + for idx_data in tbl_data.get("indexes", []): + idx_id = generate_prefixed_uuid(DBSaveIdEnum.index_annotation.value) + index_annos.append( + IndexAnnotationInDB( + id=idx_id, + table_annotation_id=table_id, + name=idx_data["name"], + is_unique=1 if idx_data.get("is_unique") else 0, + created_at=now, + updated_at=now, + ) + ) + for col_name in idx_data.get("columns", []): + index_col_annos.append( + IndexColumnInDB( + id=generate_prefixed_uuid(DBSaveIdEnum.index_column.value), + index_id=idx_id, + column_annotation_id=col_map[col_name], + created_at=now, + updated_at=now, + ) + ) + + return { + "db_annotation": db_anno, + "table_annotations": table_annos, + "column_annotations": col_annos, + "constraint_annotations": constraint_annos, + "constraint_column_annotations": constraint_col_annos, + "index_annotations": index_annos, + "index_column_annotations": index_col_annos, + } + + def get_full_annotation(self, annotation_id: str) -> FullAnnotationResponse: + try: + annotation = self.repository.find_full_annotation_by_id(annotation_id) + if not annotation: + raise APIException(CommonCode.NO_SEARCH_DATA) + return annotation + except sqlite3.Error as e: + raise APIException(CommonCode.FAIL_FIND_ANNOTATION) from e + + def delete_annotation(self, annotation_id: str) -> AnnotationDeleteResponse: + try: + is_deleted = self.repository.delete_annotation_by_id(annotation_id) + if not is_deleted: + raise APIException(CommonCode.NO_SEARCH_DATA) + return AnnotationDeleteResponse(id=annotation_id) + except sqlite3.Error as e: + raise APIException(CommonCode.FAIL_DELETE_ANNOTATION) from e + + async def _request_annotation_to_ai_server(self, schema_info: list[UserDBTableInfo]) -> dict: + """AI 서버에 스키마 정보를 보내고 어노테이션을 받아옵니다.""" + # 우선은 목업 데이터 활용 + return self._get_mock_ai_response(schema_info) + + # Real implementation below + # request_body = {"database_schema": {"tables": [table.model_dump() for table in schema_info]}} + # async with httpx.AsyncClient() as client: + # try: + # response = await client.post(AI_SERVER_URL, json=request_body, timeout=60.0) + # response.raise_for_status() + # return response.json() + # except httpx.HTTPStatusError as e: + # raise APIException(CommonCode.FAIL_AI_SERVER_PROCESSING, detail=f"AI server error: {e.response.text}") from e + # except httpx.RequestError as e: + # raise APIException(CommonCode.FAIL_AI_SERVER_CONNECTION, detail=f"AI server connection failed: {e}") from e + + def _get_mock_ai_response(self, schema_info: list[UserDBTableInfo]) -> dict: + """테스트를 위한 Mock AI 서버 응답 생성""" + mock_response = { + "database_annotation": "Mock: 데이터베이스 전체에 대한 설명입니다.", + "tables": [], + "relationships": [], + } + for table in schema_info: + mock_table = { + "table_name": table.name, + "annotation": f"Mock: '{table.name}' 테이블에 대한 설명입니다.", + "columns": [ + {"column_name": col.name, "annotation": f"Mock: '{col.name}' 컬럼에 대한 설명입니다."} + for col in table.columns + ], + "constraints": [ + { + "name": c.name, + "type": c.type, + "columns": c.columns, + "annotation": f"Mock: 제약조건 '{c.name}' 설명.", + } + for c in table.constraints + ], + "indexes": [ + { + "name": i.name, + "columns": i.columns, + "is_unique": i.is_unique, + "annotation": f"Mock: 인덱스 '{i.name}' 설명.", + } + for i in table.indexes + ], + } + mock_response["tables"].append(mock_table) + return mock_response + + +annotation_service = AnnotationService() diff --git a/app/services/user_db_service.py b/app/services/user_db_service.py index 9463d85..3afe6fd 100644 --- a/app/services/user_db_service.py +++ b/app/services/user_db_service.py @@ -19,6 +19,7 @@ ChangeProfileResult, ColumnListResult, SchemaInfoResult, + TableInfo, TableListResult, ) @@ -149,6 +150,73 @@ def find_columns( except Exception as e: raise APIException(CommonCode.FAIL) from e + def get_full_schema_info( + self, db_info: AllDBProfileInfo, repository: UserDbRepository = user_db_repository + ) -> SchemaInfoResult: + """ + DB 프로필 정보를 받아 해당 데이터베이스의 전체 스키마 정보 + (테이블, 컬럼, 제약조건, 인덱스)를 조회하여 반환합니다. + """ + try: + driver_module = self._get_driver_module(db_info.type) + connect_kwargs = self._prepare_connection_args(db_info) + + # 1. 모든 스키마(DB) 목록 조회 + schemas_result = repository.find_schemas( + driver_module, self._get_schema_query(db_info.type), **connect_kwargs + ) + if not schemas_result.is_successful: + raise APIException(schemas_result.code) + + full_schema_info = [] + + # 2. 각 스키마의 모든 테이블 목록 조회 + for schema_name in schemas_result.schemas: + tables_result = repository.find_tables( + driver_module, self._get_table_query(db_info.type), schema_name, **connect_kwargs + ) + if not tables_result.is_successful: + # 특정 스키마에서 테이블 조회 실패 시 건너뛰거나 로깅 + continue + + # 3. 각 테이블의 상세 정보 조회 + for table_name in tables_result.tables: + columns_result = repository.find_columns( + driver_module, + self._get_column_query(db_info.type), + schema_name, + db_info.type, + table_name, + **connect_kwargs, + ) + + try: + constraints = repository.find_constraints( + driver_module, db_info.type, table_name, **connect_kwargs + ) + indexes = repository.find_indexes(driver_module, db_info.type, table_name, **connect_kwargs) + except sqlite3.Error as e: + # 레포지토리에서 발생한 DB 예외를 서비스에서 처리 + raise APIException(CommonCode.FAIL_FIND_CONSTRAINTS_OR_INDEXES) from e + + table_info = TableInfo( + name=table_name, + columns=columns_result.columns if columns_result.is_successful else [], + constraints=constraints, + indexes=indexes, + comment=None, # 테이블 코멘트는 현재 조회 로직에 없음 + ) + full_schema_info.append(table_info) + + return full_schema_info + + except APIException: + # 이미 APIException인 경우 그대로 전달 + raise + except Exception as e: + # 그 외 모든 예외는 일반 실패로 처리 + raise APIException(CommonCode.FAIL) from e + def _get_driver_module(self, db_type: str): """ DB 타입에 따라 동적으로 드라이버 모듈을 로드합니다.