forked from yzt000000/device_config_python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript_thread.py
More file actions
109 lines (95 loc) · 3.75 KB
/
script_thread.py
File metadata and controls
109 lines (95 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import functools
import html
import io
import sys
import threading
import time
import traceback

from PyQt5.QtCore import QThread, pyqtSignal
class ScriptThread(QThread):
    """Execute a user-supplied script in a worker thread with pause/stop support.

    The script text is run with ``exec`` in a namespace that exposes the
    callables handed to the constructor plus a replacement ``print``.  Most
    of those callables are wrapped so that every call first passes through
    ``check_pause``; that is where a pending pause or stop request takes
    effect.  A script that never calls one of the wrapped helpers (or
    ``print``) therefore cannot be paused or stopped by this class.

    SECURITY NOTE: ``exec`` runs the script with full interpreter privileges;
    only run scripts from a trusted source.
    """

    # Carries text to display: plain text from the script, or an HTML
    # ``<span>`` describing an error.
    output = pyqtSignal(str)
    # Emitted from the worker just before it blocks on a pause request.
    paused = pyqtSignal()
    # Emitted from the worker when it continues after a pause.
    resumed = pyqtSignal()
    # NOTE(review): this attribute shadows the ``finished`` signal that
    # QThread already provides; it is emitted manually at the end of run().
    # Kept as-is because callers may already be connected to it.
    finished = pyqtSignal()

    def __init__(
        self,
        script,
        read_func,
        write_func,
        power_switch_function,
        dmm,
        uart_read_func,
        uart_write_func,
        i2c_read_disp_func,
        i2c_write_disp_func,
    ):
        """Store the script and the helpers that will be exposed to it.

        Args:
            script: Source text (or code object) passed to ``exec``.
            read_func: Exposed to the script as ``read_i2c`` (wrapped).
            write_func: Exposed to the script as ``write_i2c`` (wrapped).
            power_switch_function: Exposed as ``power_control`` (not wrapped).
            dmm: Exposed to the script as ``dmm`` (not wrapped).
            uart_read_func: Exposed to the script as ``read_uart`` (wrapped).
            uart_write_func: Exposed to the script as ``write_uart`` (wrapped).
            i2c_read_disp_func: Exposed as ``read_i2c_disp`` (wrapped).
            i2c_write_disp_func: Exposed as ``write_i2c_disp`` (wrapped).
        """
        super().__init__()
        self.script = script
        self.read_func = self.wrap_function(read_func)
        self.write_func = self.wrap_function(write_func)
        self.uart_write_func = self.wrap_function(uart_write_func)
        self.uart_read_func = self.wrap_function(uart_read_func)
        self.i2c_read_disp_func = self.wrap_function(i2c_read_disp_func)
        self.i2c_write_disp_func = self.wrap_function(i2c_write_disp_func)
        self.power_switch_function = power_switch_function
        self.dmm = dmm
        self.is_paused = False
        self.should_exit = False
        # The condition is built on pause_lock, so holding either one is
        # holding the same underlying lock.
        self.pause_lock = threading.Lock()
        self.pause_cond = threading.Condition(self.pause_lock)

    def _build_namespace(self):
        """Return the dict of names made available to the executed script."""
        return {
            'read_i2c': self.read_func,
            'write_i2c': self.write_func,
            'write_uart': self.uart_write_func,
            'read_uart': self.uart_read_func,
            'read_i2c_disp': self.i2c_read_disp_func,
            'write_i2c_disp': self.i2c_write_disp_func,
            'power_control': self.power_switch_function,
            'dmm': self.dmm,
            'print': self.custom_print,
            'time': time,  # lets scripts call time.sleep() without importing
        }

    def run(self):
        """Execute the script and report its outcome through the signals.

        ``sys.stdout`` is replaced by an in-memory buffer for the duration of
        the script (this assignment is process-wide, not per-thread) and is
        restored afterwards; anything written to the buffer is emitted on
        ``output``.  ``SystemExit`` is reported as a user-requested stop, any
        other ``Exception`` as a red HTML error message.  ``finished`` is
        always emitted last.
        """
        # Initialised up front so the ``finally`` clause can tell whether the
        # redirection actually took place.
        saved_stdout = None
        captured = None
        try:
            namespace = self._build_namespace()
            # Redirect standard output.
            saved_stdout = sys.stdout
            captured = io.StringIO()
            sys.stdout = captured
            # A single namespace is used on purpose: with separate globals
            # and locals the script would run as if inside a class body, and
            # functions defined in the script could not see each other or
            # the script's own top-level variables.
            exec(self.script, namespace)
        except SystemExit:
            self.output.emit("脚本执行被用户终止\n")
        except Exception as e:
            # Escape the message so characters such as '<' or '&' in the
            # exception text are shown literally instead of being parsed as
            # markup inside the span.
            message = html.escape(str(e), quote=False)
            self.output.emit(
                f'<span style="color: red;">Error: {message}</span>'
            )
        finally:
            # Restore standard output.
            if saved_stdout is not None:
                sys.stdout = saved_stdout
            if captured is not None:
                text = captured.getvalue()
                if text:
                    self.output.emit(text)
            self.finished.emit()

    def wrap_function(self, func):
        """Return a callable that runs ``check_pause`` and then calls ``func``.

        Positional and keyword arguments, the return value and any exception
        raised by ``func`` are passed through unchanged.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.check_pause()
            return func(*args, **kwargs)

        return wrapper

    def check_pause(self):
        """Honour a pending pause or stop request from the worker thread.

        If a pause is pending, emits ``paused`` and blocks until ``resume``
        or ``exit`` is called; emits ``resumed`` when execution continues.

        Raises:
            SystemExit: If a stop was requested, either before the call or
                while waiting in the paused state.
        """
        if self.is_paused:
            self.paused.emit()
            with self.pause_cond:
                # Loop to guard against wake-ups that do not correspond to a
                # state change.
                while self.is_paused and not self.should_exit:
                    self.pause_cond.wait()
            if self.should_exit:
                raise SystemExit("脚本执行被用户终止")
            self.resumed.emit()
        elif self.should_exit:
            raise SystemExit("脚本执行被用户终止")

    def custom_print(self, *args, **kwargs):
        """Replacement for ``print`` inside the script; emits on ``output``.

        Honours the ``sep`` and ``end`` keyword arguments with the same
        defaults as the built-in ``print`` (``" "`` and ``"\\n"``).  Other
        keyword arguments such as ``file`` and ``flush`` are accepted and
        ignored: the text always goes to the ``output`` signal.
        """
        self.check_pause()
        sep = kwargs.get("sep")
        end = kwargs.get("end")
        if sep is None:
            sep = " "
        if end is None:
            end = "\n"
        self.output.emit(sep.join(map(str, args)) + end)

    def interrupt(self):
        """Request a pause; it takes effect at the next ``check_pause``."""
        with self.pause_lock:
            self.is_paused = True

    def resume(self):
        """Clear the pause request and wake the worker if it is waiting."""
        with self.pause_lock:
            self.is_paused = False
            self.pause_cond.notify_all()

    def exit(self):
        """Request that the script stop and wake the worker if it is paused.

        The stop takes effect at the next ``check_pause``, which raises
        ``SystemExit`` inside the script.

        NOTE(review): this shadows ``QThread.exit`` with a different meaning
        and signature; kept for compatibility with existing callers.
        """
        with self.pause_lock:
            self.should_exit = True
            self.is_paused = False
            self.pause_cond.notify_all()