-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
135 lines (109 loc) · 4.97 KB
/
Copy pathtest.py
File metadata and controls
135 lines (109 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import time
import cv2
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
from PIL import Image
from utils.utils import cvtColor, preprocess_input, resize_image
class TensorRTPredictor:
    """Semantic-segmentation predictor backed by a serialized TensorRT engine.

    The constructor deserializes the engine, allocates one page-locked host
    buffer and one device buffer per I/O tensor, and creates a CUDA stream.
    ``predict`` then runs a single image through the engine and returns the
    input blended with a colour-coded class map.

    Attributes:
        input_shape: ``[height, width]`` the image is resized to before
            inference (a private copy of the constructor argument).
        engine_path: Path of the serialized engine file.
        colors: RGB triplet painted for each class index.
        num_classes: Number of channels the first output tensor is reshaped to.
        use_fp16: Informational flag; it only triggers a console note.
        inputs / outputs: Lists of ``{'name', 'host', 'device'}`` buffer dicts.
        bindings: Integer device addresses in engine tensor order.
    """

    def __init__(self, engine_path='weights/model_complex.trt', input_shape=(512, 512), use_fp16=True):
        """Store the configuration and load the engine.

        Args:
            engine_path: Path to the serialized TensorRT engine.
            input_shape: ``(height, width)`` fed to the network.
            use_fp16: When true, a note is printed while loading. The flag
                does not alter the engine; precision is whatever the
                serialized engine was built with.
        """
        # Copy the sequence so instances never share (or mutate) the default
        # value or a list owned by the caller.
        self.input_shape = list(input_shape)
        self.engine_path = engine_path
        self.colors = [(0, 0, 0), (128, 0, 0), (0, 128, 0), (128, 128, 0)]
        self.num_classes = 4
        self.use_fp16 = use_fp16
        self.initialize_engine()

    def initialize_engine(self):
        """Deserialize the engine and allocate host/device buffers.

        Raises:
            RuntimeError: If the engine file cannot be deserialized.
            ValueError: If an I/O tensor has an unresolved (negative) dimension,
                because fixed-size buffers cannot be allocated for it.
        """
        # Load the TensorRT engine.
        logger = trt.Logger(trt.Logger.WARNING)
        trt.init_libnvinfer_plugins(logger, '')
        with open(self.engine_path, 'rb') as f, trt.Runtime(logger) as runtime:
            if self.use_fp16:
                print("Note: Using FP16 precision")
            self.engine = runtime.deserialize_cuda_engine(f.read())
        if self.engine is None:
            raise RuntimeError(
                f"Failed to deserialize TensorRT engine from {self.engine_path!r}"
            )
        self.context = self.engine.create_execution_context()

        # Allocate input/output memory.
        self.inputs = []
        self.outputs = []
        self.bindings = []
        self.stream = cuda.Stream()
        for index in range(self.engine.num_io_tensors):
            name = self.engine.get_tensor_name(index)
            shape = self.engine.get_tensor_shape(name)
            if any(dim < 0 for dim in shape):
                raise ValueError(
                    f"Tensor {name!r} has a dynamic shape {tuple(shape)}; "
                    "static buffers cannot be allocated for it"
                )
            size = trt.volume(shape)
            dtype = trt.nptype(self.engine.get_tensor_dtype(name))
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            self.bindings.append(int(device_mem))
            # Keep the tensor name with its buffers so predict() can bind by
            # name instead of assuming a fixed input/output tensor order.
            buffers = {'name': name, 'host': host_mem, 'device': device_mem}
            if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                self.inputs.append(buffers)
            else:
                self.outputs.append(buffers)

    def preprocess_gpu(self, image):
        """Convert an image into the network input array.

        Despite the name, every step here runs on the host with
        NumPy/OpenCV.

        Args:
            image: Input image; it is passed through the project helper
                ``cvtColor`` and must support ``.copy()`` afterwards
                (presumably a PIL image — confirm against ``utils.utils``).

        Returns:
            Tuple ``(image_data, old_img, original_h, original_w)`` where
            ``image_data`` is a float32 array of shape ``(1, C, H, W)`` scaled
            to ``[0, 1]``, ``old_img`` is a copy of the converted input, and
            the last two items are the input's height and width.
        """
        # Image preprocessing.
        image = cvtColor(image)
        old_img = image.copy()
        # Convert the image to a NumPy array.
        image = np.array(image)
        original_h = image.shape[0]
        original_w = image.shape[1]
        # Resize and normalise with OpenCV; cv2.resize takes (width, height).
        image_data = cv2.resize(image, (self.input_shape[1], self.input_shape[0]))
        # NOTE(review): this swaps the R and B channels. If the array above is
        # already RGB (as it would be for a PIL image), the network receives
        # BGR — confirm which channel order the engine was trained with.
        image_data = cv2.cvtColor(image_data, cv2.COLOR_BGR2RGB)
        image_data = np.float32(image_data) / 255.0
        # HWC -> CHW, then add the batch axis.
        image_data = np.transpose(image_data, (2, 0, 1))
        image_data = np.expand_dims(image_data, 0)
        return image_data, old_img, original_h, original_w

    def predict(self, image, batch_size=1):
        """Segment one image and blend the class map over it.

        Args:
            image: Input image, forwarded to ``preprocess_gpu``.
            batch_size: Unused; kept for interface compatibility.

        Returns:
            Tuple ``(image, inference_time)``: the blended result as a PIL
            image and the wall-clock seconds spent on the host-to-device copy,
            engine execution and device-to-host copy.
        """
        # Image preprocessing.
        image_data, old_img, original_h, original_w = self.preprocess_gpu(image)

        start_time = time.time()
        # Stage the input in page-locked memory and copy it to the GPU
        # asynchronously on our stream.
        input_buffers = self.inputs[0]
        np.copyto(input_buffers['host'], image_data.ravel())
        cuda.memcpy_htod_async(input_buffers['device'], input_buffers['host'], self.stream)

        # Bind every I/O tensor by its own name and run inference.
        for buffers in self.inputs + self.outputs:
            self.context.set_tensor_address(buffers['name'], int(buffers['device']))
        self.context.execute_async_v3(stream_handle=self.stream.handle)

        # Copy the outputs back to the host asynchronously.
        for out in self.outputs:
            cuda.memcpy_dtoh_async(out['host'], out['device'], self.stream)

        # Wait for ALL queued work, including the device-to-host copies.
        # Synchronising on an event recorded before those copies were enqueued
        # would let the host read the output buffer while it is still being
        # written.
        self.stream.synchronize()
        inference_time = time.time() - start_time

        # Post-processing: (1, classes, H, W) -> (H, W, classes).
        pred = self.outputs[0]['host'].reshape(
            1, self.num_classes, self.input_shape[0], self.input_shape[1]
        )
        pred = np.transpose(pred[0], (1, 2, 0))
        # Detach from the reusable host buffer and force float32; guards
        # against an engine whose output dtype is half precision, which
        # cv2.resize may not accept.
        pred = np.ascontiguousarray(pred, dtype=np.float32)
        # Resize the per-class scores back to the original resolution.
        pred = cv2.resize(pred, (original_w, original_h))
        pred = pred.argmax(axis=-1)

        # Build the colour-coded segmentation image.
        seg_img = np.zeros((original_h, original_w, 3), dtype=np.uint8)
        for class_index, color in enumerate(self.colors):
            seg_img[pred == class_index] = color

        # Blend: 30% original image, 70% segmentation colours.
        result = cv2.addWeighted(np.array(old_img), 0.3, seg_img, 0.7, 0)
        image = Image.fromarray(result)
        return image, inference_time
if __name__ == "__main__":
    # Script entry point: segment one image and save the blended result.
    # Usage: python test.py [image_path]
    import sys

    # Initialise the predictor with the FP16 flag enabled.
    predictor = TensorRTPredictor(use_fp16=True)
    # Read the test image: the first command-line argument when given,
    # otherwise the original hard-coded sample path.
    default_image_path = "E:\\one\\Downloads\\VOC2007\\JPEGImages\\4771_42.jpg"
    image_path = sys.argv[1] if len(sys.argv) > 1 else default_image_path
    image = Image.open(image_path)
    # Run the prediction.
    result_image, inference_time = predictor.predict(image)
    # Print the inference time (predict returns seconds; shown in ms).
    print(f'Inference time: {inference_time*1000:.2f}ms')
    # Save the result.
    result_image.save('result_trt.jpg')