-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathview_logs.py
More file actions
254 lines (195 loc) · 7.48 KB
/
Copy pathview_logs.py
File metadata and controls
254 lines (195 loc) · 7.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
"""
日志查看和分析工具
"""
import sys
from pathlib import Path
from datetime import datetime, timedelta
import argparse
def list_log_files(log_dir="./logs"):
"""列出所有日志文件"""
log_path = Path(log_dir)
if not log_path.exists():
print(f"❌ 日志目录不存在: {log_dir}")
return []
log_files = sorted(log_path.glob("*.log"), reverse=True)
return log_files
def show_log_files(log_dir="./logs"):
"""显示日志文件列表"""
log_files = list_log_files(log_dir)
if not log_files:
print("📭 没有找到日志文件")
return
print("=" * 70)
print("📋 日志文件列表")
print("=" * 70)
total_size = 0
for i, log_file in enumerate(log_files, 1):
size = log_file.stat().st_size
total_size += size
size_mb = size / (1024 * 1024)
mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
print(f"{i}. {log_file.name}")
print(f" 大小: {size_mb:.2f} MB")
print(f" 修改时间: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")
print()
print(f"总共 {len(log_files)} 个日志文件, 占用 {total_size / (1024 * 1024):.2f} MB")
print("=" * 70)
def view_log(log_file, lines=50, level=None, search=None):
"""查看日志内容"""
log_path = Path(log_file)
if not log_path.exists():
print(f"❌ 日志文件不存在: {log_file}")
return
print("=" * 70)
print(f"📄 {log_path.name}")
print("=" * 70)
print()
try:
with open(log_path, 'r', encoding='utf-8') as f:
all_lines = f.readlines()
# 过滤日志级别
if level:
all_lines = [line for line in all_lines if level.upper() in line]
# 搜索关键词
if search:
all_lines = [line for line in all_lines if search in line]
# 显示最后 N 行
display_lines = all_lines[-lines:] if lines else all_lines
for line in display_lines:
print(line, end='')
print()
print("=" * 70)
print(f"显示 {len(display_lines)}/{len(all_lines)} 行")
except Exception as e:
print(f"❌ 读取日志文件失败: {e}")
def analyze_log(log_file):
"""分析日志统计"""
log_path = Path(log_file)
if not log_path.exists():
print(f"❌ 日志文件不存在: {log_file}")
return
try:
with open(log_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 统计各级别日志
stats = {
"DEBUG": 0,
"INFO": 0,
"WARNING": 0,
"ERROR": 0,
"CRITICAL": 0,
}
for line in lines:
for level in stats.keys():
if level in line:
stats[level] += 1
break
print("=" * 70)
print(f"📊 日志统计: {log_path.name}")
print("=" * 70)
print()
print(f"总行数: {len(lines)}")
print()
print("日志级别分布:")
for level, count in stats.items():
if count > 0:
percentage = (count / len(lines)) * 100
print(f" {level:10s}: {count:6d} ({percentage:5.1f}%)")
print()
print("=" * 70)
except Exception as e:
print(f"❌ 分析日志文件失败: {e}")
def clean_old_logs(log_dir="./logs", days=7, dry_run=True):
"""清理旧日志"""
log_path = Path(log_dir)
if not log_path.exists():
print(f"❌ 日志目录不存在: {log_dir}")
return
cutoff_date = datetime.now() - timedelta(days=days)
log_files = list_log_files(log_dir)
to_delete = []
for log_file in log_files:
mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
if mtime < cutoff_date:
to_delete.append(log_file)
if not to_delete:
print(f"✅ 没有找到 {days} 天前的日志文件")
return
print("=" * 70)
print(f"🗑️ 清理 {days} 天前的日志 ({cutoff_date.strftime('%Y-%m-%d')} 之前)")
print("=" * 70)
print()
total_size = 0
for log_file in to_delete:
size = log_file.stat().st_size
total_size += size
print(f" - {log_file.name} ({size / 1024:.1f} KB)")
print()
print(f"共 {len(to_delete)} 个文件, {total_size / (1024 * 1024):.2f} MB")
if dry_run:
print()
print("⚠️ 这是预览模式,使用 --execute 参数实际删除文件")
else:
print()
confirm = input("确认删除? (yes/no): ")
if confirm.lower() == 'yes':
for log_file in to_delete:
log_file.unlink()
print(f"✅ 已删除 {len(to_delete)} 个日志文件")
else:
print("❌ 取消删除")
print("=" * 70)
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="LLM-Local 日志查看工具")
subparsers = parser.add_subparsers(dest="command", help="命令")
# 列出日志文件
list_parser = subparsers.add_parser("list", help="列出日志文件")
list_parser.add_argument("--dir", default="./logs", help="日志目录")
# 查看日志
view_parser = subparsers.add_parser("view", help="查看日志内容")
view_parser.add_argument("file", nargs="?", help="日志文件路径(留空查看最新)")
view_parser.add_argument("-n", "--lines", type=int, default=50, help="显示行数")
view_parser.add_argument("-l", "--level", help="过滤日志级别")
view_parser.add_argument("-s", "--search", help="搜索关键词")
view_parser.add_argument("--dir", default="./logs", help="日志目录")
# 分析日志
analyze_parser = subparsers.add_parser("analyze", help="分析日志统计")
analyze_parser.add_argument("file", nargs="?", help="日志文件路径(留空分析最新)")
analyze_parser.add_argument("--dir", default="./logs", help="日志目录")
# 清理日志
clean_parser = subparsers.add_parser("clean", help="清理旧日志")
clean_parser.add_argument("--days", type=int, default=7, help="保留天数")
clean_parser.add_argument("--execute", action="store_true", help="实际执行删除")
clean_parser.add_argument("--dir", default="./logs", help="日志目录")
args = parser.parse_args()
if args.command == "list":
show_log_files(args.dir)
elif args.command == "view":
if args.file:
log_file = args.file
else:
# 使用最新的日志文件
log_files = list_log_files(args.dir)
if not log_files:
print("❌ 没有找到日志文件")
return
log_file = log_files[0]
view_log(log_file, args.lines, args.level, args.search)
elif args.command == "analyze":
if args.file:
log_file = args.file
else:
# 使用最新的日志文件
log_files = list_log_files(args.dir)
if not log_files:
print("❌ 没有找到日志文件")
return
log_file = log_files[0]
analyze_log(log_file)
elif args.command == "clean":
clean_old_logs(args.dir, args.days, not args.execute)
else:
parser.print_help()
if __name__ == "__main__":
main()