From 2e5dc532a08c33911c6fd7415c5f83ff0cbd1c00 Mon Sep 17 00:00:00 2001 From: Lan Date: Sun, 25 Jan 2026 21:28:16 +0800 Subject: [PATCH 1/4] perf: optimize memory usage for large file uploads --- core/storage.py | 355 ++++++++++++++++++++++++++++++------------------ 1 file changed, 220 insertions(+), 135 deletions(-) diff --git a/core/storage.py b/core/storage.py index 4c6b73347..a40562676 100644 --- a/core/storage.py +++ b/core/storage.py @@ -4,6 +4,8 @@ # @Software: PyCharm import base64 import hashlib +import os +import tempfile from core.logger import logger import shutil from typing import Optional @@ -14,7 +16,6 @@ import asyncio from pathlib import Path import datetime -import io import re import aioboto3 from botocore.config import Config @@ -285,11 +286,12 @@ async def save_file(self, file: UploadFile, save_path: str): region_name=self.region_name, config=Config(signature_version=self.signature_version), ) as s3: - await s3.put_object( - Bucket=self.bucket_name, - Key=save_path, - Body=await file.read(), - ContentType=file.content_type, + # 使用 upload_fileobj 流式上传,避免将整个文件加载到内存 + await s3.upload_fileobj( + file.file, + self.bucket_name, + save_path, + ExtraArgs={"ContentType": file.content_type or "application/octet-stream"}, ) async def delete_file(self, file_code: FileCodes): @@ -425,12 +427,11 @@ async def save_chunk(self, upload_id: str, chunk_index: int, chunk_data: bytes, async def merge_chunks(self, upload_id: str, chunk_info: UploadChunk, save_path: str) -> tuple[str, str]: """ 合并 S3 上的分片文件 - 由于分片是独立对象存储的,需要下载后合并再上传 + 使用 S3 的 multipart upload API 实现流式合并,避免内存问题 """ file_sha256 = hashlib.sha256() chunk_dir = str(Path(save_path).parent / "chunks" / upload_id) - merged_data = io.BytesIO() - + async with self.session.client( 's3', endpoint_url=self.endpoint_url, @@ -438,38 +439,70 @@ async def merge_chunks(self, upload_id: str, chunk_info: UploadChunk, save_path: region_name=self.region_name, config=Config(signature_version=self.signature_version), ) as s3: - # 按顺序读取并验证每个分片 - for i in range(chunk_info.total_chunks): - chunk_key = f"{chunk_dir}/{i}.part" - chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() - if not chunk_record: - raise ValueError(f"分片{i}记录不存在") - - try: - response = await s3.get_object( - Bucket=self.bucket_name, - Key=chunk_key - ) - chunk_data = await response['Body'].read() - except Exception as e: - raise ValueError(f"分片{i}文件不存在: {e}") - - current_hash = hashlib.sha256(chunk_data).hexdigest() - if current_hash != chunk_record.chunk_hash: - raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") - - file_sha256.update(chunk_data) - merged_data.write(chunk_data) - - # 上传合并后的文件 - merged_data.seek(0) - await s3.put_object( + # 创建 multipart upload + mpu = await s3.create_multipart_upload( Bucket=self.bucket_name, Key=save_path, - Body=merged_data.getvalue(), ContentType='application/octet-stream' ) - + mpu_id = mpu['UploadId'] + parts = [] + + try: + # 按顺序读取、验证并上传每个分片 + for i in range(chunk_info.total_chunks): + chunk_key = f"{chunk_dir}/{i}.part" + chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() + if not chunk_record: + raise ValueError(f"分片{i}记录不存在") + + try: + response = await s3.get_object( + Bucket=self.bucket_name, + Key=chunk_key + ) + chunk_data = await response['Body'].read() + except Exception as e: + raise ValueError(f"分片{i}文件不存在: {e}") + + current_hash = hashlib.sha256(chunk_data).hexdigest() + if current_hash != chunk_record.chunk_hash: + raise 
ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") + + file_sha256.update(chunk_data) + + # 上传分片到 multipart upload + part_response = await s3.upload_part( + Bucket=self.bucket_name, + Key=save_path, + UploadId=mpu_id, + PartNumber=i + 1, # S3 part numbers start at 1 + Body=chunk_data + ) + parts.append({ + 'PartNumber': i + 1, + 'ETag': part_response['ETag'] + }) + + # 释放内存 + del chunk_data + + # 完成 multipart upload + await s3.complete_multipart_upload( + Bucket=self.bucket_name, + Key=save_path, + UploadId=mpu_id, + MultipartUpload={'Parts': parts} + ) + except Exception as e: + # 出错时取消 multipart upload + await s3.abort_multipart_upload( + Bucket=self.bucket_name, + Key=save_path, + UploadId=mpu_id + ) + raise e + return save_path, file_sha256.hexdigest() async def clean_chunks(self, upload_id: str, save_path: str): @@ -762,33 +795,44 @@ def _upload_merged(self, save_path: str, data: bytes): current_folder.upload(filename, data).execute_query() async def merge_chunks(self, upload_id: str, chunk_info: UploadChunk, save_path: str) -> tuple[str, str]: - """合并 OneDrive 上的分片文件""" + """合并 OneDrive 上的分片文件,使用临时文件避免内存问题""" file_sha256 = hashlib.sha256() chunk_dir = str(Path(save_path).parent / "chunks" / upload_id) - merged_data = io.BytesIO() - - for i in range(chunk_info.total_chunks): - chunk_path = f"{chunk_dir}/{i}.part" - chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() - if not chunk_record: - raise ValueError(f"分片{i}记录不存在") - - try: - chunk_data = await asyncio.to_thread(self._read_chunk, chunk_path) - except Exception as e: - raise ValueError(f"分片{i}文件不存在: {e}") - - current_hash = hashlib.sha256(chunk_data).hexdigest() - if current_hash != chunk_record.chunk_hash: - raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") - - file_sha256.update(chunk_data) - merged_data.write(chunk_data) - - # 上传合并后的文件 - merged_data.seek(0) - await asyncio.to_thread(self._upload_merged, save_path, merged_data.getvalue()) - + + # 使用临时文件存储合并数据 + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_path = temp_file.name + + try: + async with aiofiles.open(temp_path, 'wb') as out_file: + for i in range(chunk_info.total_chunks): + chunk_path = f"{chunk_dir}/{i}.part" + chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() + if not chunk_record: + raise ValueError(f"分片{i}记录不存在") + + try: + chunk_data = await asyncio.to_thread(self._read_chunk, chunk_path) + except Exception as e: + raise ValueError(f"分片{i}文件不存在: {e}") + + current_hash = hashlib.sha256(chunk_data).hexdigest() + if current_hash != chunk_record.chunk_hash: + raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") + + file_sha256.update(chunk_data) + await out_file.write(chunk_data) + del chunk_data # 释放内存 + + # 读取临时文件并上传 + async with aiofiles.open(temp_path, 'rb') as f: + merged_content = await f.read() + await asyncio.to_thread(self._upload_merged, save_path, merged_content) + finally: + # 清理临时文件 + if os.path.exists(temp_path): + os.unlink(temp_path) + return save_path, file_sha256.hexdigest() def _delete_chunk_dir(self, chunk_dir: str): @@ -845,7 +889,9 @@ def __init__(self): ) async def save_file(self, file: UploadFile, save_path: str): - await self.operator.write(save_path, file.file.read()) + # 使用 asyncio.to_thread 避免阻塞事件循环 + content = await asyncio.to_thread(file.file.read) + await self.operator.write(save_path, content) async def delete_file(self, file_code: FileCodes): await 
self.operator.delete(await file_code.get_file_path()) @@ -915,32 +961,43 @@ async def save_chunk(self, upload_id: str, chunk_index: int, chunk_data: bytes, await self.operator.write(chunk_path, chunk_data) async def merge_chunks(self, upload_id: str, chunk_info: UploadChunk, save_path: str) -> tuple[str, str]: - """合并 OpenDAL 存储上的分片文件""" + """合并 OpenDAL 存储上的分片文件,使用临时文件避免内存问题""" file_sha256 = hashlib.sha256() chunk_dir = str(Path(save_path).parent / "chunks" / upload_id) - merged_data = io.BytesIO() - - for i in range(chunk_info.total_chunks): - chunk_path = f"{chunk_dir}/{i}.part" - chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() - if not chunk_record: - raise ValueError(f"分片{i}记录不存在") - - try: - chunk_data = await self.operator.read(chunk_path) - except Exception as e: - raise ValueError(f"分片{i}文件不存在: {e}") - - current_hash = hashlib.sha256(chunk_data).hexdigest() - if current_hash != chunk_record.chunk_hash: - raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") - - file_sha256.update(chunk_data) - merged_data.write(chunk_data) - - # 写入合并后的文件 - merged_data.seek(0) - await self.operator.write(save_path, merged_data.getvalue()) + + # 使用临时文件存储合并数据 + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_path = temp_file.name + + try: + async with aiofiles.open(temp_path, 'wb') as out_file: + for i in range(chunk_info.total_chunks): + chunk_path = f"{chunk_dir}/{i}.part" + chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() + if not chunk_record: + raise ValueError(f"分片{i}记录不存在") + + try: + chunk_data = await self.operator.read(chunk_path) + except Exception as e: + raise ValueError(f"分片{i}文件不存在: {e}") + + current_hash = hashlib.sha256(chunk_data).hexdigest() + if current_hash != chunk_record.chunk_hash: + raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") + + file_sha256.update(chunk_data) + await out_file.write(chunk_data) + del chunk_data # 释放内存 + + # 读取临时文件并写入存储 + async with aiofiles.open(temp_path, 'rb') as f: + merged_content = await f.read() + await self.operator.write(save_path, merged_content) + finally: + # 清理临时文件 + if os.path.exists(temp_path): + os.unlink(temp_path) return save_path, file_sha256.hexdigest() @@ -1033,7 +1090,7 @@ async def _delete_empty_dirs(self, file_path: str, session: aiohttp.ClientSessio current_path = current_path.parent async def save_file(self, file: UploadFile, save_path: str): - """保存文件(自动创建目录)""" + """保存文件(自动创建目录,流式上传)""" path_obj = Path(save_path) directory_path = str(path_obj.parent) # 提取原始文件名并进行清理 @@ -1044,13 +1101,23 @@ async def save_file(self, file: UploadFile, save_path: str): try: # 先创建目录结构 await self._mkdir_p(directory_path) - # 上传文件 + # 上传文件(流式) url = self._build_url(safe_save_path) + + async def file_sender(): + """流式读取文件内容""" + chunk_size = 256 * 1024 # 256KB chunks + while True: + chunk = await asyncio.to_thread(file.file.read, chunk_size) + if not chunk: + break + yield chunk + async with aiohttp.ClientSession(auth=self.auth) as session: - content = await file.read() async with session.put( - url, data=content, headers={ - "Content-Type": file.content_type} + url, + data=file_sender(), + headers={"Content-Type": file.content_type or "application/octet-stream"} ) as resp: if resp.status not in (200, 201, 204): content = await resp.text() @@ -1161,52 +1228,70 @@ async def save_chunk(self, upload_id: str, chunk_index: int, chunk_data: bytes, async def merge_chunks(self, upload_id: str, chunk_info: 
UploadChunk, save_path: str) -> tuple[str, str]: """ 合并 WebDAV 上的分片文件 - 由于大多数 WebDAV 服务器不支持 PATCH 追加,这里下载所有分片后合并上传 + 使用临时文件避免内存问题 """ file_sha256 = hashlib.sha256() chunk_dir = str(Path(save_path).parent / "chunks" / upload_id) - merged_data = io.BytesIO() - - async with aiohttp.ClientSession(auth=self.auth) as session: - # 按顺序读取并验证每个分片 - for i in range(chunk_info.total_chunks): - chunk_path = f"{chunk_dir}/{i}.part" - chunk_url = self._build_url(chunk_path) - - # 获取分片记录 - chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() - if not chunk_record: - raise ValueError(f"分片{i}记录不存在") - - # 下载分片数据 - async with session.get(chunk_url) as resp: - if resp.status != 200: - raise ValueError(f"分片{i}文件不存在或无法访问") - chunk_data = await resp.read() - - # 验证哈希 - current_hash = hashlib.sha256(chunk_data).hexdigest() - if current_hash != chunk_record.chunk_hash: - raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") - - file_sha256.update(chunk_data) - merged_data.write(chunk_data) - - # 确保目标目录存在 - output_dir = str(Path(save_path).parent) - await self._mkdir_p(output_dir) - - # 上传合并后的文件 - output_url = self._build_url(save_path) - merged_data.seek(0) - async with session.put(output_url, data=merged_data.getvalue()) as resp: - if resp.status not in (200, 201, 204): - content = await resp.text() - raise HTTPException( - status_code=resp.status, - detail=f"合并文件上传失败: {content[:200]}" - ) - + + # 使用临时文件存储合并数据,避免内存问题 + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_path = temp_file.name + + try: + async with aiohttp.ClientSession(auth=self.auth) as session: + # 按顺序读取并验证每个分片,写入临时文件 + async with aiofiles.open(temp_path, 'wb') as out_file: + for i in range(chunk_info.total_chunks): + chunk_path = f"{chunk_dir}/{i}.part" + chunk_url = self._build_url(chunk_path) + + # 获取分片记录 + chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() + if not chunk_record: + raise ValueError(f"分片{i}记录不存在") + + # 下载分片数据 + async with session.get(chunk_url) as resp: + if resp.status != 200: + raise ValueError(f"分片{i}文件不存在或无法访问") + chunk_data = await resp.read() + + # 验证哈希 + current_hash = hashlib.sha256(chunk_data).hexdigest() + if current_hash != chunk_record.chunk_hash: + raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") + + file_sha256.update(chunk_data) + await out_file.write(chunk_data) + del chunk_data # 释放内存 + + # 确保目标目录存在 + output_dir = str(Path(save_path).parent) + await self._mkdir_p(output_dir) + + # 流式上传合并后的文件 + output_url = self._build_url(save_path) + + async def file_sender(): + async with aiofiles.open(temp_path, 'rb') as f: + while True: + chunk = await f.read(256 * 1024) + if not chunk: + break + yield chunk + + async with session.put(output_url, data=file_sender()) as resp: + if resp.status not in (200, 201, 204): + content = await resp.text() + raise HTTPException( + status_code=resp.status, + detail=f"合并文件上传失败: {content[:200]}" + ) + finally: + # 清理临时文件 + if os.path.exists(temp_path): + os.unlink(temp_path) + return save_path, file_sha256.hexdigest() async def clean_chunks(self, upload_id: str, save_path: str): From a3f599fe517ea76313dd9f3aabe00e24e1ecc81e Mon Sep 17 00:00:00 2001 From: Lan Date: Sun, 25 Jan 2026 23:40:15 +0800 Subject: [PATCH 2/4] refactor: streamline admin authorization logic and improve token handling --- apps/admin/dependencies.py | 74 +++++++++++++++++--------------------- apps/admin/views.py | 15 +++----- 2 files changed, 37 insertions(+), 52 deletions(-) diff 
--git a/apps/admin/dependencies.py b/apps/admin/dependencies.py index 8309b0794..205779bbb 100644 --- a/apps/admin/dependencies.py +++ b/apps/admin/dependencies.py @@ -50,7 +50,7 @@ def verify_token(token: str) -> dict: ).digest() expected_signature_b64 = base64.b64encode(expected_signature).decode() - if signature_b64 != expected_signature_b64: + if not hmac.compare_digest(signature_b64, expected_signature_b64): raise ValueError("无效的签名") # 解码payload @@ -65,39 +65,41 @@ def verify_token(token: str) -> dict: raise ValueError(f"token验证失败: {str(e)}") +def _extract_bearer_token(authorization: str) -> str: + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="未授权或授权校验失败") + token = authorization.split(" ", 1)[1].strip() + if not token: + raise HTTPException(status_code=401, detail="未授权或授权校验失败") + return token + + +def _require_admin_payload(authorization: str) -> dict: + token = _extract_bearer_token(authorization) + try: + payload = verify_token(token) + except ValueError as e: + raise HTTPException(status_code=401, detail=str(e)) + if not payload.get("is_admin", False): + raise HTTPException(status_code=401, detail="未授权或授权校验失败") + return payload + + +ADMIN_PUBLIC_ENDPOINTS = {("POST", "/admin/login")} + + async def admin_required( authorization: str = Header(default=None), request: Request = None ): """ 验证管理员权限 """ - try: - if not authorization or not authorization.startswith("Bearer "): - is_admin = False - else: - try: - token = authorization.split(" ")[1] - payload = verify_token(token) - is_admin = payload.get("is_admin", False) - except ValueError as e: - is_admin = False - - if request.url.path.startswith("/share/"): - if not settings.openUpload and not is_admin: - raise HTTPException( - status_code=403, detail="本站未开启游客上传,如需上传请先登录后台" - ) - else: - if not is_admin: - raise HTTPException(status_code=401, detail="未授权或授权校验失败") - return is_admin - except ValueError as e: - raise HTTPException(status_code=401, detail=str(e)) + if request and (request.method, request.url.path) in ADMIN_PUBLIC_ENDPOINTS: + return None + return _require_admin_payload(authorization) -async def share_required_login( - authorization: str = Header(default=None), request: Request = None -): +async def share_required_login(authorization: str = Header(default=None)): """ 验证分享上传权限 @@ -109,21 +111,11 @@ async def share_required_login( :return: 验证结果 """ if not settings.openUpload: - try: - if not authorization or not authorization.startswith("Bearer "): - raise HTTPException( - status_code=403, detail="本站未开启游客上传,如需上传请先登录后台" - ) - - token = authorization.split(" ")[1] - try: - payload = verify_token(token) - if not payload.get("is_admin", False): - raise HTTPException(status_code=401, detail="未授权或授权校验失败") - except ValueError as e: - raise HTTPException(status_code=401, detail=str(e)) - except Exception as e: - raise HTTPException(status_code=401, detail="认证失败:" + str(e)) + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException( + status_code=403, detail="本站未开启游客上传,如需上传请先登录后台" + ) + _require_admin_payload(authorization) return True diff --git a/apps/admin/views.py b/apps/admin/views.py index b5314539f..9501c16f1 100644 --- a/apps/admin/views.py +++ b/apps/admin/views.py @@ -19,7 +19,9 @@ from core.settings import settings from core.utils import get_now, verify_password -admin_api = APIRouter(prefix="/admin", tags=["管理"]) +admin_api = APIRouter( + prefix="/admin", tags=["管理"], dependencies=[Depends(admin_required)] +) 
@admin_api.post("/login") @@ -32,7 +34,7 @@ async def login(data: LoginData): @admin_api.get("/dashboard") -async def dashboard(admin: bool = Depends(admin_required)): +async def dashboard(): all_codes = await FileCodes.all() all_size = str(sum([code.size for code in all_codes])) sys_start = await KeyValue.filter(key="sys_start").first() @@ -61,7 +63,6 @@ async def dashboard(admin: bool = Depends(admin_required)): async def file_delete( data: IDData, file_service: FileService = Depends(get_file_service), - admin: bool = Depends(admin_required), ): await file_service.delete_file(data.id) return APIResponse() @@ -73,7 +74,6 @@ async def file_list( size: int = 10, keyword: str = "", file_service: FileService = Depends(get_file_service), - admin: bool = Depends(admin_required), ): files, total = await file_service.list_files(page, size, keyword) return APIResponse( @@ -89,7 +89,6 @@ async def file_list( @admin_api.get("/config/get") async def get_config( config_service: ConfigService = Depends(get_config_service), - admin: bool = Depends(admin_required), ): return APIResponse(detail=config_service.get_config()) @@ -98,7 +97,6 @@ async def get_config( async def update_config( data: dict, config_service: ConfigService = Depends(get_config_service), - admin: bool = Depends(admin_required), ): data.pop("themesChoices") await config_service.update_config(data) @@ -109,7 +107,6 @@ async def update_config( async def file_download( id: int, file_service: FileService = Depends(get_file_service), - admin: bool = Depends(admin_required), ): file_content = await file_service.download_file(id) return file_content @@ -118,7 +115,6 @@ async def file_download( @admin_api.get("/local/lists") async def get_local_lists( local_file_service: LocalFileService = Depends(get_local_file_service), - admin: bool = Depends(admin_required), ): files = await local_file_service.list_files() return APIResponse(detail=files) @@ -128,7 +124,6 @@ async def get_local_lists( async def delete_local_file( item: DeleteItem, local_file_service: LocalFileService = Depends(get_local_file_service), - admin: bool = Depends(admin_required), ): result = await local_file_service.delete_file(item.filename) return APIResponse(detail=result) @@ -138,7 +133,6 @@ async def delete_local_file( async def share_local_file( item: ShareItem, file_service: FileService = Depends(get_file_service), - admin: bool = Depends(admin_required), ): share_info = await file_service.share_local_file(item) return APIResponse(detail=share_info) @@ -147,7 +141,6 @@ async def share_local_file( @admin_api.patch("/file/update") async def update_file( data: UpdateFileData, - admin: bool = Depends(admin_required), ): file_code = await FileCodes.filter(id=data.id).first() if not file_code: From 24d49e1c9abc264925e945f5e100d98b8b703f70 Mon Sep 17 00:00:00 2001 From: Lan Date: Tue, 27 Jan 2026 10:05:33 +0800 Subject: [PATCH 3/4] chore: reduce number of workers in Dockerfile from 4 to 2 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 4d63c203d..5b1bb0936 100644 --- a/Dockerfile +++ b/Dockerfile @@ -41,7 +41,7 @@ RUN pip install --no-cache-dir -r requirements.txt # 环境变量配置 ENV HOST="::" \ PORT=12345 \ - WORKERS=4 \ + WORKERS=2 \ LOG_LEVEL="info" EXPOSE 12345 From 0504e6e4dc18eb5f520b12c492c4cc1ef946c712 Mon Sep 17 00:00:00 2001 From: Lan Date: Tue, 27 Jan 2026 10:37:17 +0800 Subject: [PATCH 4/4] chore: update Dockerfile to set HOST to 0.0.0.0 and reduce WORKERS to 1 --- Dockerfile | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5b1bb0936..28453323e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -39,9 +39,9 @@ COPY --from=frontend-builder /build/fronted-2023/dist ./themes/2023 RUN pip install --no-cache-dir -r requirements.txt # 环境变量配置 -ENV HOST="::" \ +ENV HOST="0.0.0.0" \ PORT=12345 \ - WORKERS=2 \ + WORKERS=1 \ LOG_LEVEL="info" EXPOSE 12345
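
A few of the patterns in this series are worth sketching in isolation. The merge_chunks rewrite in PATCH 1/4 replaces the in-memory io.BytesIO buffer with S3's native multipart upload API. Below is a minimal standalone sketch of that pattern, assuming aioboto3 with default credentials; the bucket name, part keys, and destination key are placeholders rather than the project's values. Note that S3 requires every part except the last to be at least 5 MiB, so the client-side chunk size has to respect that floor for complete_multipart_upload to succeed; when it does, upload_part_copy could even stitch the existing chunk objects together server-side, although the SHA-256 computed here still needs the bytes locally.

import hashlib

import aioboto3


async def merge_parts(bucket: str, part_keys: list[str], dest_key: str) -> str:
    """Stream existing chunk objects into one destination object via a
    multipart upload, holding only a single part in memory at a time."""
    sha256 = hashlib.sha256()
    session = aioboto3.Session()
    async with session.client("s3") as s3:
        mpu = await s3.create_multipart_upload(Bucket=bucket, Key=dest_key)
        upload_id = mpu["UploadId"]
        parts = []
        try:
            # S3 part numbers are 1-based; uploading sequentially keeps the
            # running hash in the same order as the final object
            for number, key in enumerate(part_keys, start=1):
                obj = await s3.get_object(Bucket=bucket, Key=key)
                body = await obj["Body"].read()
                sha256.update(body)
                resp = await s3.upload_part(
                    Bucket=bucket,
                    Key=dest_key,
                    UploadId=upload_id,
                    PartNumber=number,
                    Body=body,
                )
                parts.append({"PartNumber": number, "ETag": resp["ETag"]})
            await s3.complete_multipart_upload(
                Bucket=bucket,
                Key=dest_key,
                UploadId=upload_id,
                MultipartUpload={"Parts": parts},
            )
        except Exception:
            # Abort so orphaned parts do not keep accruing storage charges
            await s3.abort_multipart_upload(
                Bucket=bucket, Key=dest_key, UploadId=upload_id
            )
            raise
    return sha256.hexdigest()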
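
The WebDAV save_file change in the same patch swaps a single await file.read() for an async generator handed to aiohttp, so the request body is sent in fixed-size chunks via chunked transfer encoding instead of being buffered whole. A condensed sketch of that pattern, with the URL, file path, and chunk size as illustrative assumptions:

import asyncio

import aiofiles
import aiohttp

CHUNK_SIZE = 256 * 1024  # 256 KiB per read keeps memory usage flat


async def stream_put(path: str, url: str) -> int:
    async def body():
        # Async generator: aiohttp pulls chunks on demand instead of buffering
        async with aiofiles.open(path, "rb") as f:
            while chunk := await f.read(CHUNK_SIZE):
                yield chunk

    async with aiohttp.ClientSession() as session:
        async with session.put(url, data=body()) as resp:
            return resp.status


if __name__ == "__main__":
    status = asyncio.run(stream_put("big.bin", "http://localhost/dav/big.bin"))
    print(status)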
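
PATCH 2/4 also switches the token signature check from != to hmac.compare_digest, which compares in time independent of where the strings first differ; a plain != short-circuits at the first mismatched character. A self-contained sketch of that style of HMAC-signed token, where the secret, payload fields, and base64 layout are illustrative assumptions rather than the project's exact format:

import base64
import hashlib
import hmac
import json
import time

SECRET = b"change-me"  # placeholder secret; load from configuration in practice


def sign(payload: dict) -> str:
    body = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
    sig = hmac.new(SECRET, body.encode(), hashlib.sha256).digest()
    return f"{body}.{base64.urlsafe_b64encode(sig).decode()}"


def verify(token: str) -> dict:
    body, _, sig_b64 = token.partition(".")
    expected = hmac.new(SECRET, body.encode(), hashlib.sha256).digest()
    expected_b64 = base64.urlsafe_b64encode(expected).decode()
    # Constant-time comparison closes the timing side channel of a plain !=
    if not hmac.compare_digest(sig_b64, expected_b64):
        raise ValueError("invalid signature")
    payload = json.loads(base64.urlsafe_b64decode(body))
    if payload.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return payload


if __name__ == "__main__":
    token = sign({"is_admin": True, "exp": time.time() + 3600})
    print(verify(token))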