-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
188 lines (161 loc) · 6.32 KB
/
main.py
File metadata and controls
188 lines (161 loc) · 6.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
import json
import logging
from sqlalchemy.orm import Session
# Configure logging: records at DEBUG and above are written both to the
# ``app.log`` file and to the console stream.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # The log messages emitted by this module contain non-ASCII (Chinese)
        # text, so pin the file encoding instead of relying on the platform's
        # locale default, which may be unable to encode them.
        logging.FileHandler('app.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
# Module-level logger used by the cache decorator in this file.
logger = logging.getLogger(__name__)
from database import get_db, redis_client, engine
import models
# Create the database tables for the models registered on models.Base
# (presumably a SQLAlchemy declarative base — confirm in models.py).
# NOTE: this runs at import time, so importing this module touches the database.
models.Base.metadata.create_all(bind=engine)
# Application object; the route decorators in this module register on it.
app = FastAPI(title="图书管理系统API")
# Data model (Pydantic schema) for a book. It is used as the request body of
# the create/update routes and as the response model of the read routes.
class Book(BaseModel):
    # Identifier; optional in requests (the create/update handlers exclude it
    # from the data they write).
    id: Optional[int] = None
    # Required descriptive fields.
    title: str
    author: str
    price: float
    # Optional free-text description.
    description: Optional[str] = None
    # Creation timestamp; optional in requests (excluded by the create/update
    # handlers, like ``id``).
    create_time: Optional[datetime] = None

    class Config:
        # Let the schema be populated from ORM objects by attribute access,
        # so handlers can return database rows directly.
        orm_mode = True
# Cache decorator (with debug logging) for async handlers.
def _to_cacheable(value):
    """Convert one result item into a JSON-serialisable value.

    Objects exposing ``__dict__`` are flattened into a dict of their public
    attributes (names not starting with ``_``), with ``datetime`` values
    rendered through ``str()``. Any other value is returned unchanged.
    """
    if not hasattr(value, '__dict__'):
        return value
    return {
        key: str(attr) if isinstance(attr, datetime) else attr
        for key, attr in vars(value).items()
        if not key.startswith('_')
    }


def cache_response(expire=300):
    """Build a decorator that caches an async function's result in Redis.

    The cache key is ``"<function name>:<book_id>:<keyword>"``, where
    ``book_id`` and ``keyword`` are read from the call's keyword arguments
    and default to the empty string when absent.

    Args:
        expire: Lifetime of a cache entry, passed to ``redis_client.setex``.

    Returns:
        A decorator. The decorated coroutine returns the JSON-decoded cached
        payload on a cache hit; on a miss it awaits the wrapped function,
        stores a serialised copy of the result and returns the original
        (unserialised) result.

    Raises:
        Whatever the wrapped function, the serialisation step or the Redis
        calls raise; failures in the first three are logged and re-raised.
    """
    from functools import wraps

    def decorator(func):
        # wraps() keeps the wrapped function's name and signature (through
        # ``__wrapped__``) visible to anything that introspects the handler;
        # without it callers only see ``wrapper(*args, **kwargs)``.
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Record the call for debugging.
            logger.debug(f"调用函数: {func.__name__}")
            logger.debug(f"参数: args={args}, kwargs={kwargs}")

            # Build a simple cache key from the function name and the two
            # request parameters that distinguish responses.
            func_name = func.__name__
            path_params = kwargs.get('book_id', '')
            query_params = kwargs.get('keyword', '')
            cache_key = f"{func_name}:{path_params}:{query_params}"
            logger.debug(f"缓存键: {cache_key}")

            # Serve from the cache when an entry exists.
            # NOTE(review): the redis_client calls are not awaited; if the
            # client is a blocking one they stall the event loop — confirm.
            cached_data = redis_client.get(cache_key)
            if cached_data:
                logger.info(f"从缓存中获取数据: {cache_key}")
                return json.loads(cached_data)

            # Cache miss: run the wrapped function.
            try:
                result = await func(*args, **kwargs)
                logger.debug(f"函数执行结果类型: {type(result)}")
            except Exception as e:
                logger.error(f"函数执行错误: {str(e)}", exc_info=True)
                raise

            # Serialise the result. Lists are converted item by item, and
            # items without ``__dict__`` (plain values) are passed through
            # instead of raising AttributeError.
            try:
                if isinstance(result, list):
                    serialized_data = [_to_cacheable(item) for item in result]
                else:
                    serialized_data = _to_cacheable(result)
                logger.debug(f"序列化数据: {serialized_data}")
            except Exception as e:
                logger.error(f"序列化错误: {str(e)}", exc_info=True)
                raise

            # Store the serialised copy with the configured lifetime.
            try:
                redis_client.setex(
                    cache_key,
                    expire,
                    json.dumps(serialized_data)
                )
                logger.info(f"数据已缓存: {cache_key}")
            except Exception as e:
                logger.error(f"缓存存储错误: {str(e)}", exc_info=True)
                raise

            return result
        return wrapper
    return decorator
# GET / — landing endpoint that returns a fixed welcome message.
@app.get("/")
def read_root():
    welcome = {"message": "欢迎使用图书管理系统API"}
    return welcome
# GET /books — return every book row, serialised through the Book schema.
# NOTE: response caching is switched off here; it was previously applied with
# ``@cache_response(expire=60)`` placed directly above the function.
@app.get("/books", response_model=List[Book])
async def get_books(db: Session = Depends(get_db)):
    book_query = db.query(models.BookModel)
    return book_query.all()
# GET /books/search/?keyword=... — case-insensitive substring search over
# book titles and authors.
# This route is declared here, ahead of the /books/{book_id} route, so that
# it is registered before the parameterised path.
# NOTE: response caching is switched off here; it was previously applied with
# ``@cache_response(expire=60)`` placed directly above the function.
@app.get("/books/search/", response_model=List[Book])
async def search_books(keyword: str, db: Session = Depends(get_db)):
    # Escape the LIKE metacharacters (and the escape character itself) so the
    # user's keyword is matched literally: without this a keyword such as
    # "%" or "_" acts as a wildcard and matches unrelated rows.
    escaped = keyword.replace("/", "//").replace("%", "/%").replace("_", "/_")
    pattern = f"%{escaped}%"
    books = db.query(models.BookModel).filter(
        models.BookModel.title.ilike(pattern, escape="/")
        | models.BookModel.author.ilike(pattern, escape="/")
    ).all()
    return books
# GET /books/{book_id} — return the book with the given id, or respond with
# HTTP 404 when no such row exists.
# NOTE: response caching is switched off here; it was previously applied with
# ``@cache_response(expire=60)`` placed directly above the function.
@app.get("/books/{book_id}", response_model=Book)
async def get_book(book_id: int, db: Session = Depends(get_db)):
    id_matches = models.BookModel.id == book_id
    found = db.query(models.BookModel).filter(id_matches).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="书籍未找到")
# POST /books — insert a new book built from the request body and return the
# stored row. ``id`` and ``create_time`` from the body are ignored.
@app.post("/books", response_model=Book)
async def create_book(book: Book, db: Session = Depends(get_db)):
    db_book = models.BookModel(**book.dict(exclude={'id', 'create_time'}))
    db.add(db_book)
    db.commit()
    # Reload the row so values filled in on insert are present on the object.
    db.refresh(db_book)
    # Invalidate the cached book list. cache_response builds its keys as
    # "<function name>:<book_id>:<keyword>", so the list entry is
    # "get_books::" (both parameters empty).
    # NOTE(review): cached search_books entries are keyed by keyword and are
    # not invalidated here; they only lapse when their expiry elapses.
    redis_client.delete("get_books::")
    return db_book
# PUT /books/{book_id} — overwrite the stored book's fields with the request
# body (``id`` and ``create_time`` are left untouched) and return the updated
# row. Responds with HTTP 404 when the book does not exist.
@app.put("/books/{book_id}", response_model=Book)
async def update_book(book_id: int, book_update: Book, db: Session = Depends(get_db)):
    book = db.query(models.BookModel).filter(models.BookModel.id == book_id).first()
    if book is None:
        raise HTTPException(status_code=404, detail="书籍未找到")
    for key, value in book_update.dict(exclude={'id', 'create_time'}).items():
        setattr(book, key, value)
    db.commit()
    db.refresh(book)
    # Invalidate the affected cache entries. cache_response builds its keys
    # as "<function name>:<book_id>:<keyword>", so the single-book entry is
    # "get_book:<id>:" and the list entry is "get_books::".
    # NOTE(review): cached search_books entries are keyed by keyword and are
    # not invalidated here; they only lapse when their expiry elapses.
    redis_client.delete(f"get_book:{book_id}:")
    redis_client.delete("get_books::")
    return book
# DELETE /books/{book_id} — remove the book with the given id and return a
# confirmation message. Responds with HTTP 404 when the book does not exist.
@app.delete("/books/{book_id}")
async def delete_book(book_id: int, db: Session = Depends(get_db)):
    book = db.query(models.BookModel).filter(models.BookModel.id == book_id).first()
    if book is None:
        raise HTTPException(status_code=404, detail="书籍未找到")
    db.delete(book)
    db.commit()
    # Invalidate the affected cache entries. cache_response builds its keys
    # as "<function name>:<book_id>:<keyword>", so the single-book entry is
    # "get_book:<id>:" and the list entry is "get_books::".
    # NOTE(review): cached search_books entries are keyed by keyword and are
    # not invalidated here; they only lapse when their expiry elapses.
    redis_client.delete(f"get_book:{book_id}:")
    redis_client.delete("get_books::")
    return {"message": "书籍已删除"}
# GET /books/search/?keyword=... — case-insensitive substring search over
# book titles and authors.
# NOTE(review): this is a second registration of /books/search/ and a second
# definition of search_books; an identical route is already declared earlier
# in this module, ahead of /books/{book_id}. This copy looks like a leftover
# from moving the route and is a candidate for removal — confirm.
# NOTE: response caching is switched off here; it was previously applied with
# ``@cache_response(expire=60)`` placed directly above the function.
@app.get("/books/search/", response_model=List[Book])
async def search_books(keyword: str, db: Session = Depends(get_db)):
    # Escape the LIKE metacharacters (and the escape character itself) so the
    # user's keyword is matched literally: without this a keyword such as
    # "%" or "_" acts as a wildcard and matches unrelated rows.
    escaped = keyword.replace("/", "//").replace("%", "/%").replace("_", "/_")
    pattern = f"%{escaped}%"
    books = db.query(models.BookModel).filter(
        models.BookModel.title.ilike(pattern, escape="/")
        | models.BookModel.author.ilike(pattern, escape="/")
    ).all()
    return books