-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
459 lines (401 loc) · 19.1 KB
/
main.py
File metadata and controls
459 lines (401 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# main.py
import os
import logging
from io import BytesIO
from pathlib import Path
from uuid import uuid4
from datetime import datetime, timezone
from typing import Optional
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError
from fastapi import FastAPI, UploadFile, File, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from PIL import Image, UnidentifiedImageError
from dotenv import load_dotenv
# ─────────────────────────────────────
# .env 로드 (OS 환경변수가 있으면 OS 값 우선)
# ─────────────────────────────────────
load_dotenv(dotenv_path=Path(__file__).with_name(".env"), override=False)
# ─────────────────────────────────────
# 로깅
# ─────────────────────────────────────
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger("app")
# ─────────────────────────────────────
# 공통 설정
# ─────────────────────────────────────
S3_BUCKET = os.getenv("S3_BUCKET", "binbean-bucket")
AWS_REGION = os.getenv("AWS_REGION", "ap-northeast-2")
# 공개 경로를 CloudFront 등으로 쓰고 싶으면 지정 (예: https://cdn.example.com)
PUBLIC_BASE_URL = os.getenv("PUBLIC_BASE_URL") # 없으면 S3 기본 URL로 생성
# S3 서버사이드 암호화(조직 정책에 따라 필요할 수 있음)
S3_SSE = os.getenv("S3_SSE") # "AES256" 또는 "aws:kms"
S3_SSE_KMS_KEY_ID = os.getenv("S3_SSE_KMS_KEY_ID")
# 크기 제한
MAX_IMAGE_MB = int(os.getenv("MAX_IMAGE_MB", "50"))
MAX_IMAGE_BYTES = MAX_IMAGE_MB * 1024 * 1024
MAX_VIDEO_MB = int(os.getenv("MAX_VIDEO_MB", "500")) # 기본 500MB
MAX_VIDEO_BYTES = MAX_VIDEO_MB * 1024 * 1024
# Firestore 컬렉션
FIRESTORE_COLLECTION_IMAGES = os.getenv("FIRESTORE_COLLECTION_IMAGES", "images")
FIRESTORE_COLLECTION_VIDEOS = os.getenv("FIRESTORE_COLLECTION_VIDEOS", "videos")
# ─────────────────────────────────────
# S3 클라이언트
# ─────────────────────────────────────
s3 = boto3.client("s3", region_name=AWS_REGION)
def _resolve_bucket_region(bucket: str) -> Optional[str]:
try:
resp = s3.get_bucket_location(Bucket=bucket)
return resp.get("LocationConstraint") or "us-east-1"
except Exception as e:
log.warning(f"get_bucket_location failed: {e}")
return None
_actual = _resolve_bucket_region(S3_BUCKET)
if _actual and _actual != AWS_REGION:
log.warning(f"S3 region mismatch: env={AWS_REGION} actual={_actual} → reconfigure client")
AWS_REGION = _actual
s3 = boto3.client("s3", region_name=AWS_REGION)
# ─────────────────────────────────────
# Firestore 초기화(필수)
# ─────────────────────────────────────
try:
import firebase_admin
from firebase_admin import credentials, firestore
SA = str(Path(__file__).with_name("stan-dup-firebase-adminsdk-fbsvc-808a55fe4f.json"))
if not firebase_admin._apps:
if SA and os.path.exists(SA):
cred = credentials.Certificate(SA)
firebase_admin.initialize_app(cred)
else:
firebase_admin.initialize_app() # GOOGLE_APPLICATION_CREDENTIALS 사용 시
db = firestore.client()
log.info("Firestore initialized.")
except Exception as e:
log.error(f"Firestore init failed: {e}")
raise RuntimeError("Firestore가 초기화되지 않았습니다. 서비스계정 JSON 또는 GOOGLE_APPLICATION_CREDENTIALS를 설정하세요.") from e
# ─────────────────────────────────────
# FastAPI
# ─────────────────────────────────────
app = FastAPI(title="Upload to S3 + Save Path to Firestore (Image & Video)")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["POST", "GET", "OPTIONS"],
allow_headers=["*"],
)
# ─────────────────────────────────────
# 유틸
# ─────────────────────────────────────
# 이미지 포맷 매핑
FMT_TO_CT = {"JPEG": "image/jpeg", "PNG": "image/png", "WEBP": "image/webp"}
CT_TO_EXT_IMG = {"image/jpeg": "jpg", "image/png": "png", "image/webp": "webp"}
# 동영상 허용 MIME
CT_TO_EXT_VID = {
"video/mp4": "mp4",
"video/quicktime": "mov", # iPhone 등
"video/webm": "webm",
"video/x-matroska": "mkv",
"video/x-msvideo": "avi",
"video/mpeg": "mpeg",
}
def _build_object_url(key: str) -> str:
"""DB에 저장할 공개 URL(또는 CDN URL) 생성. 사설 버킷이면 열람은 403일 수 있음."""
if PUBLIC_BASE_URL:
return f"{PUBLIC_BASE_URL.rstrip('/')}/{key}"
if AWS_REGION == "us-east-1":
return f"https://{S3_BUCKET}.s3.amazonaws.com/{key}"
return f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/{key}"
def _s3_upload_stream(stream, key: str, content_type: str) -> None:
extra = {"ContentType": content_type, "CacheControl": "public, max-age=31536000"}
if S3_SSE:
extra["ServerSideEncryption"] = S3_SSE
if S3_SSE.lower() == "aws:kms" and S3_SSE_KMS_KEY_ID:
extra["SSEKMSKeyId"] = S3_SSE_KMS_KEY_ID
try:
s3.upload_fileobj(stream, S3_BUCKET, key, ExtraArgs=extra)
log.info(f"[S3] uploaded -> s3://{S3_BUCKET}/{key} (ct={content_type})")
except NoCredentialsError:
raise HTTPException(status_code=500, detail="No AWS credentials found")
except EndpointConnectionError as e:
raise HTTPException(status_code=502, detail=f"S3 endpoint error: {e}")
except ClientError as e:
msg = str(e)
if "NoSuchBucket" in msg:
raise HTTPException(status_code=500, detail=f"S3 bucket not found: {S3_BUCKET}")
if "PermanentRedirect" in msg or "AuthorizationHeaderMalformed" in msg:
raise HTTPException(status_code=500, detail=f"Wrong region for bucket (AWS_REGION='{AWS_REGION}')")
if "AccessDenied" in msg:
raise HTTPException(status_code=500, detail="AccessDenied: check IAM permissions for s3:PutObject")
raise HTTPException(status_code=500, detail=f"S3 upload failed: {e}")
def _save_record(collection: str, doc_id: str, *, s3_key: str, url: str,
content_type: str, size: int, filename: str) -> None:
now = datetime.now(timezone.utc)
db.collection(collection).document(doc_id).set({
"s3_key": s3_key,
"url": url,
"content_type": content_type,
"size_bytes": size,
"filename": filename,
"created_at": now,
"updated_at": now,
})
# ─────────────────────────────────────
# 이미지 업로드 (이전과 동일: S3 + DB 경로 저장, 응답 바디 없음)
# ─────────────────────────────────────
@app.post("/api/image", status_code=200)
async def upload_image(file: UploadFile = File(...)):
if file is None or not getattr(file, "filename", None):
raise HTTPException(status_code=400, detail="Empty file")
# 이미지 검증 + MIME 확정
raw = await file.read()
if len(raw) > MAX_IMAGE_BYTES:
raise HTTPException(status_code=413, detail=f"File too large (> {MAX_IMAGE_MB}MB)")
try:
bio = BytesIO(raw)
with Image.open(bio) as img:
img.verify()
bio.seek(0)
with Image.open(bio) as img2:
fmt = img2.format
except UnidentifiedImageError:
raise HTTPException(status_code=400, detail="Invalid image file")
ct = FMT_TO_CT.get(fmt)
if not ct:
raise HTTPException(status_code=415, detail=f"Unsupported image format: {fmt}")
ext = CT_TO_EXT_IMG[ct]
filename = Path(file.filename).name.replace("/", "_") or "upload"
if "." not in filename:
filename = f"{filename}.{ext}"
s3_key = f"images/{uuid4().hex}-{filename}"
_s3_upload_stream(BytesIO(raw), s3_key, ct)
url = _build_object_url(s3_key)
doc_id = uuid4().hex
_save_record(FIRESTORE_COLLECTION_IMAGES, doc_id,
s3_key=s3_key, url=url, content_type=ct, size=len(raw), filename=filename)
return Response(status_code=200, headers={"X-Image-Id": doc_id})
@app.get("/api/image/{doc_id}")
def get_image(doc_id: str):
snap = db.collection(FIRESTORE_COLLECTION_IMAGES).document(doc_id).get()
if not snap.exists:
raise HTTPException(status_code=404, detail="Not found")
data = snap.to_dict() or {}
return JSONResponse({
"id": doc_id,
"s3_key": data.get("s3_key"),
"url": data.get("url"),
"content_type": data.get("content_type"),
"size_bytes": data.get("size_bytes"),
"filename": data.get("filename"),
"created_at": data.get("created_at").isoformat() if data.get("created_at") else None,
"updated_at": data.get("updated_at").isoformat() if data.get("updated_at") else None,
})
# ─────────────────────────────────────
# 동영상 업로드 (S3 + DB 경로 저장, 응답 바디 없음)
# - 대용량 대비: 절대 메모리에 올리지 않고 스트리밍 업로드
# - Postman: form-data / key=file (Type=File)
# ─────────────────────────────────────
@app.post("/api/video", status_code=200)
async def upload_video(file: UploadFile = File(...)):
if file is None or not getattr(file, "filename", None):
raise HTTPException(status_code=400, detail="Empty file")
# 1) MIME 체크(유연): Postman이 정확히 못 줄 수도 있으니 없으면 기본 mp4로
content_type = (file.content_type or "").lower()
if content_type not in CT_TO_EXT_VID:
# 확장자 보고 추정(간단 보정)
name_lower = (file.filename or "").lower()
if name_lower.endswith(".mp4"): content_type = "video/mp4"
elif name_lower.endswith(".mov"): content_type = "video/quicktime"
elif name_lower.endswith(".webm"):content_type = "video/webm"
elif name_lower.endswith(".mkv"): content_type = "video/x-matroska"
elif name_lower.endswith(".avi"): content_type = "video/x-msvideo"
elif name_lower.endswith(".mpeg"):content_type = "video/mpeg"
else:
raise HTTPException(status_code=415, detail=f"Unsupported video type: {file.content_type or 'unknown'}")
ext = CT_TO_EXT_VID[content_type]
# 2) 파일 크기 확인(스트림에서 사이즈 추정: 가능한 경우에만)
try:
pos = file.file.tell()
file.file.seek(0, os.SEEK_END)
size = file.file.tell()
file.file.seek(pos)
if size and size > MAX_VIDEO_BYTES:
raise HTTPException(status_code=413, detail=f"Video too large (> {MAX_VIDEO_MB}MB)")
except Exception:
# 사이즈를 못 구해도 그냥 진행(멀티파트 스트림)
size = None
# 3) S3 업로드(스트리밍)
filename = Path(file.filename).name.replace("/", "_") or "upload"
if "." not in filename:
filename = f"{filename}.{ext}"
s3_key = f"videos/{uuid4().hex}-{filename}"
_s3_upload_stream(file.file, s3_key, content_type)
# 4) DB 저장 (경로 + 메타만)
url = _build_object_url(s3_key)
doc_id = uuid4().hex
_save_record(FIRESTORE_COLLECTION_VIDEOS, doc_id,
s3_key=s3_key, url=url, content_type=content_type,
size=(int(size) if size else None) or 0, filename=filename)
# 5) 응답: 바디 없이 200 + 헤더에 ID만
return Response(status_code=200, headers={"X-Video-Id": doc_id})
@app.get("/api/video/{doc_id}")
def get_video(doc_id: str):
snap = db.collection(FIRESTORE_COLLECTION_VIDEOS).document(doc_id).get()
if not snap.exists:
raise HTTPException(status_code=404, detail="Not found")
data = snap.to_dict() or {}
return JSONResponse({
"id": doc_id,
"s3_key": data.get("s3_key"),
"url": data.get("url"),
"content_type": data.get("content_type"),
"size_bytes": data.get("size_bytes"),
"filename": data.get("filename"),
"created_at": data.get("created_at").isoformat() if data.get("created_at") else None,
"updated_at": data.get("updated_at").isoformat() if data.get("updated_at") else None,
})
# ---- 아래를 main.py 하단에 추가 ----
# ---- REPLACE /api/media block ----
import json, base64
from typing import Optional, Literal, List, Dict, Any, Tuple
from datetime import datetime, timezone
from fastapi import Query, HTTPException
try:
from google.cloud.firestore_v1 import FieldPath as _FP
HAS_FIELDPATH = True
except Exception:
try:
from google.cloud.firestore_v1.field_path import FieldPath as _FP
HAS_FIELDPATH = True
except Exception:
_FP = None
HAS_FIELDPATH = False
from google.cloud import firestore
from google.cloud import firestore
def _encode_token(cursors: Dict[str, Optional[Dict[str, Any]]]) -> str:
"""{"image":{"t": float_ts, "id": str}, "video": {...}} → urlsafe b64"""
raw = json.dumps(cursors, separators=(",", ":"))
return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii")
def _decode_token(token: str) -> Dict[str, Optional[Dict[str, Any]]]:
try:
raw = base64.urlsafe_b64decode(token.encode("ascii")).decode("utf-8")
data = json.loads(raw)
for k in ("image", "video"):
v = data.get(k)
if v is not None:
if not isinstance(v, dict) or "t" not in v or "id" not in v:
raise ValueError("bad cursor")
return data
except Exception:
raise HTTPException(status_code=400, detail="Invalid page_token")
def _start_after_args(cursor: Optional[Dict[str, Any]]) -> Optional[Tuple[datetime, str]]:
"""커서 dict -> start_after 인자 (created_at(datetime), doc_id)"""
if not cursor:
return None
t = float(cursor["t"])
dt = datetime.fromtimestamp(t, tz=timezone.utc)
return (dt, str(cursor["id"]))
def _fetch_collection(collection_name: str, limit: int, order: Literal["asc","desc"], cursor: Optional[Dict[str, Any]]):
"""단일 컬렉션에서 created_at(+document_id) 기준 페이지 조회"""
direction = firestore.Query.DESCENDING if order == "desc" else firestore.Query.ASCENDING
q = db.collection(collection_name).order_by("created_at", direction=direction)
if HAS_FIELDPATH:
q = q.order_by(_FP.document_id(), direction=direction) # FieldPath 사용 가능할 때만
q = q.limit(limit)
sa = _start_after_args(cursor)
if sa:
if HAS_FIELDPATH:
# (created_at, doc_id) 둘 다 전달
q = q.start_after(*sa)
else:
# FieldPath 없으면 created_at만으로 페이지 이동 (동시간대 타이브레이커 없음)
q = q.start_after(sa[0])
docs = list(q.stream())
items = []
for d in docs:
data = d.to_dict() or {}
ca = data.get("created_at")
if isinstance(ca, datetime):
ts = ca.timestamp()
iso = ca.isoformat()
else:
ca = datetime.now(timezone.utc)
ts = ca.timestamp()
iso = ca.isoformat()
items.append({
"_id": d.id,
"_created_at": ca,
"_ts": ts,
"s3_key": data.get("s3_key"),
"url": data.get("url"),
"content_type": data.get("content_type"),
"size_bytes": data.get("size_bytes"),
"filename": data.get("filename"),
"created_at": iso,
"updated_at": data.get("updated_at").isoformat() if isinstance(data.get("updated_at"), datetime) else None,
})
return items
@app.get("/api/media")
def list_media(
limit: int = Query(20, ge=1, le=100),
order: Literal["asc", "desc"] = "desc",
page_token: Optional[str] = Query(None, description="이전 응답의 nextPageToken"),
):
"""
이미지와 영상을 하나의 목록으로 합쳐서 반환.
- 최신순/오름차순 정렬 지원
- 페이지네이션: page_token 사용 (내부적으로 이미지/영상 각자 커서 유지)
"""
# 1) 커서 파싱 (컬렉션별 독립 커서)
cursors = {"image": None, "video": None}
if page_token:
cursors = _decode_token(page_token)
# 2) 각 컬렉션에서 limit개씩 가져오기
imgs = _fetch_collection(FIRESTORE_COLLECTION_IMAGES, limit, order, cursors.get("image"))
vids = _fetch_collection(FIRESTORE_COLLECTION_VIDEOS, limit, order, cursors.get("video"))
# 3) 병합 정렬
def key_fn(item):
return (item["_created_at"], item["_id"])
reverse = (order == "desc")
imgs.sort(key=key_fn, reverse=reverse)
vids.sort(key=key_fn, reverse=reverse)
i = j = 0
out: List[Dict[str, Any]] = []
last_img_idx = -1
last_vid_idx = -1
while len(out) < limit and (i < len(imgs) or j < len(vids)):
# 어떤 쪽을 뽑을지 결정
if i < len(imgs) and (j >= len(vids) or
(key_fn(imgs[i]) >= key_fn(vids[j]) if reverse else key_fn(imgs[i]) <= key_fn(vids[j]))):
pick = imgs[i]; pick_kind = "image"; i += 1; last_img_idx = i - 1
elif j < len(vids):
pick = vids[j]; pick_kind = "video"; j += 1; last_vid_idx = j - 1
else:
break
out.append({
"id": pick["_id"],
"type": pick_kind, # image | video
"s3_key": pick["s3_key"],
"url": pick["url"],
"content_type": pick["content_type"],
"size_bytes": pick["size_bytes"],
"filename": pick["filename"],
"created_at": pick["created_at"],
"updated_at": pick["updated_at"],
})
# 4) 다음 페이지 토큰 구성 (각 컬렉션 마지막 소비 문서 기준)
new_cursors = {"image": cursors.get("image"), "video": cursors.get("video")}
if last_img_idx >= 0:
li = imgs[last_img_idx]
new_cursors["image"] = {"t": li["_ts"], "id": li["_id"]}
if last_vid_idx >= 0:
lv = vids[last_vid_idx]
new_cursors["video"] = {"t": lv["_ts"], "id": lv["_id"]}
has_more = (len(imgs) == limit) or (len(vids) == limit)
next_token = _encode_token(new_cursors) if (has_more and len(out) > 0) else None
return {
"items": out,
"nextPageToken": next_token,
"count": len(out),
"order": order,
}