-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
494 lines (415 loc) · 18.9 KB
/
main.py
File metadata and controls
494 lines (415 loc) · 18.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
import time
import cv2
import numpy as np
import base64
import json
import socket
import os
from picamera2 import Picamera2
def loadSettingsFromJson(file_path="thresholds.json"):
    """Load detector settings from a JSON file.

    The file is looked up first at ``file_path`` as given (relative to the
    current working directory) and then next to this script. The first
    existing candidate is used; later candidates are not tried, even when
    the first one turns out to be invalid.

    Post-processing applied to the loaded mapping:
      * ``minThreshold`` / ``maxThreshold`` lists become ``np.uint8`` arrays.
      * ``ballDetectRadius`` is coerced to ``int``.
      * ``circularityThreshold`` is coerced to ``float``.

    Args:
        file_path: Name or path of the JSON settings file.

    Returns:
        dict: The settings mapping, or an empty dict when the file is
        missing, unreadable, not valid JSON, or does not contain a JSON
        object at the top level. Callers fall back to their own defaults
        for missing keys.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [
        file_path,
        os.path.join(base_dir, file_path),
    ]

    for path in candidates:
        if not os.path.exists(path):
            continue

        print(f"Reading JSON settings file: {path}")
        try:
            # JSON text is UTF-8 by specification; do not depend on the locale.
            with open(path, "r", encoding="utf-8") as f:
                settings = json.load(f)

            # Every consumer calls settings.get(...); a list/number/string at
            # the top level would only blow up later, far from the cause.
            if not isinstance(settings, dict):
                raise ValueError(
                    f"top-level JSON value must be an object, got {type(settings).__name__}"
                )

            # Threshold triples are stored as uint8 arrays.
            for key in ("minThreshold", "maxThreshold"):
                if isinstance(settings.get(key), list):
                    settings[key] = np.array(settings[key], dtype=np.uint8)

            # JSON numbers may arrive as int or float; normalise the types.
            if "ballDetectRadius" in settings:
                settings["ballDetectRadius"] = int(settings["ballDetectRadius"])
            if "circularityThreshold" in settings:
                settings["circularityThreshold"] = float(
                    settings["circularityThreshold"]
                )
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON from {path}: {e}")
            return {}
        except Exception as e:
            # Best effort: any other failure (I/O, bad value types) falls back
            # to defaults instead of aborting start-up.
            print(f"An unexpected error occurred while reading {path}: {e}")
            return {}

        print("JSON settings loaded successfully.")
        return settings

    print(
        f"Warning: Settings file '{file_path}' not found in checked locations. Using default values."
    )
    return {}
class ImageProcessor:
    """Builds a binary mask of the pixels that fall inside an HSV range.

    Pipeline used by ``extractColors``: Gaussian blur -> BGR to HSV ->
    CLAHE on the V channel -> ``cv2.inRange`` -> morphological opening.

    Recognised settings keys (all optional): ``minThreshold``,
    ``maxThreshold``, ``gaussianKernelSize``, ``gaussianSigmaX``,
    ``morphKernelSize``.
    """

    def __init__(self, settings=None):
        """Read tunables from ``settings`` and pre-build reusable OpenCV objects.

        Args:
            settings: Mapping of configuration values; ``None`` means
                "use defaults for everything".
        """
        if settings is None:
            settings = {}
        # .get() supplies defaults when a key is missing or no file was loaded.
        self._minThreshold = settings.get(
            "minThreshold", np.array([1, 120, 100], dtype=np.uint8)
        )
        self._maxThreshold = settings.get(
            "maxThreshold", np.array([15, 255, 255], dtype=np.uint8)
        )
        # JSON delivers lists; OpenCV expects tuples for sizes.
        self._ksize = tuple(settings.get("gaussianKernelSize", (5, 5)))
        self._sigmaX = settings.get("gaussianSigmaX", 0)
        self._shape = cv2.MORPH_RECT
        self._size = tuple(settings.get("morphKernelSize", (3, 3)))
        self._operation = cv2.MORPH_OPEN
        # The kernel and the CLAHE operator depend only on values fixed above,
        # so they are created once here instead of once per processed frame.
        self._kernel = cv2.getStructuringElement(self._shape, self._size)
        self._clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

    def extractColors(self, frame):
        """Return a mask of the pixels of ``frame`` inside the HSV range.

        Args:
            frame: Image converted with ``cv2.COLOR_BGR2HSV``, i.e. treated
                as BGR.

        Returns:
            The single-channel mask produced by ``cv2.inRange`` after
            morphological opening.
        """
        filtered = self._filterFrame(frame)
        hsv = cv2.cvtColor(filtered, cv2.COLOR_BGR2HSV)
        # Shadow masking (_detectShadows) is available but currently not applied.
        hsv = self._equalizeHist(hsv)
        mask = cv2.inRange(hsv, self._minThreshold, self._maxThreshold)
        return self._applyMorphologicalTransformations(mask)

    def _filterFrame(self, frame):
        """Smooth ``frame`` with a Gaussian blur."""
        return cv2.GaussianBlur(frame, self._ksize, self._sigmaX)

    def _detectShadows(self, hsv):
        """Return a binary mask that is 255 where the V channel exceeds 50.

        Not used by ``extractColors`` at the moment; the fixed threshold may
        need tuning for different lighting.
        """
        v = hsv[:, :, 2]
        _, shadowMask = cv2.threshold(v, 50, 255, cv2.THRESH_BINARY)
        return shadowMask

    def _equalizeHist(self, hsv):
        """Apply CLAHE to the V channel only; H and S are passed through."""
        h, s, v = cv2.split(hsv)
        v_equalized = self._clahe.apply(v)
        return cv2.merge((h, s, v_equalized))

    def _applyMorphologicalTransformations(self, mask):
        """Run the configured morphological operation (opening) on ``mask``."""
        # Opening (erosion followed by dilation) removes small noise specks.
        return cv2.morphologyEx(mask, self._operation, self._kernel)
class BallDetector:
    """Finds the largest sufficiently circular colour blob in a frame.

    After a successful detection the next search is restricted to a square
    region of interest (ROI) around the last centre; whenever no ball is
    found the detector falls back to searching the whole frame.

    Recognised settings keys (all optional): ``ballDetectRadius``,
    ``circularityThreshold``, ``minContourArea`` plus everything
    ``ImageProcessor`` understands.
    """

    # Numerator of the distance estimate (distance = scale / diameter_px).
    # NOTE(review): presumably focal length in pixels times the real ball
    # size; the units are not stated anywhere in this file - confirm.
    _DISTANCE_SCALE = 320 * 40

    def __init__(self, settings=None):
        """Create the detector and its ``ImageProcessor`` from ``settings``."""
        if settings is None:
            settings = {}
        self.imageProcessor = ImageProcessor(settings)
        self._radius = int(settings.get("ballDetectRadius", 150))
        self._circularityThreshold = float(settings.get("circularityThreshold", 0.2))
        self._minContourArea = float(settings.get("minContourArea", 100))
        # Centre of the last detection in frame coordinates, or None.
        self._previousCenter = None

    def detect(self, frame):
        """Locate the ball in ``frame``.

        Args:
            frame: Image array handed to ``ImageProcessor.extractColors``.

        Returns:
            tuple: ``(center, circleContour, vertices, distance)`` where
            ``center`` is an ``(x, y)`` int tuple in frame coordinates,
            ``circleContour`` an ``(N, 1, 2)`` int32 array approximating the
            enclosing circle, ``vertices`` the searched ROI as
            ``(xMin, yMin, xMax, yMax)`` and ``distance`` the estimate from
            ``_estimateDistance``. When nothing is detected ``center``,
            ``circleContour`` and ``distance`` are ``None``; ``vertices`` is
            ``None`` only when the ROI itself is empty.
        """
        roi, offset, vertices = self._focus(frame, self._previousCenter)
        if roi.size == 0:
            print("Warning: ROI is empty.")
            self._previousCenter = None
            return None, None, None, None

        mask = self.imageProcessor.extractColors(roi)
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        valid_contours = [
            cnt for cnt in contours if cv2.contourArea(cnt) > self._minContourArea
        ]

        if not valid_contours:
            # Drop the stale focus point: keeping it would pin the ROI to the
            # last known position forever, so a ball that moved out of that
            # window could never be found again.
            self._previousCenter = None
            return None, None, vertices, None

        bestContour = max(valid_contours, key=cv2.contourArea)
        if not self._isCircular(bestContour):
            self._previousCenter = None
            return None, None, vertices, None

        (x, y), radius = cv2.minEnclosingCircle(bestContour)
        # Contour coordinates are relative to the ROI; shift them back into
        # full-frame coordinates.
        absX = x + offset[0]
        absY = y + offset[1]
        center = (int(absX), int(absY))
        circleContour = self._createCircleContour(absX, absY, radius)
        self._previousCenter = center
        return center, circleContour, vertices, self._estimateDistance(radius)

    def _estimateDistance(self, radius):
        """Return ``_DISTANCE_SCALE`` divided by the integer pixel diameter.

        The diameter is clamped to at least 1 so that a sub-pixel enclosing
        circle cannot cause a division by zero.
        """
        diameter = max(1, int(radius * 2))
        return self._DISTANCE_SCALE / diameter

    def _isCircular(self, contour):
        """Return True when 4*pi*area / perimeter**2 exceeds the threshold."""
        perimeter = cv2.arcLength(contour, True)
        area = cv2.contourArea(contour)
        if perimeter == 0 or area == 0:
            return False
        circularity = (4 * np.pi * area) / (perimeter**2)
        return circularity > self._circularityThreshold

    def _focus(self, frame, center):
        """Cut the search region out of ``frame``.

        Args:
            frame: Image array; only its first two dimensions are inspected.
            center: ``(x, y)`` of the previous detection, or ``None`` to use
                the whole frame.

        Returns:
            tuple: ``(roi, offset, vertices)`` - the sliced view, its
            top-left corner ``(xMin, yMin)`` and its bounds
            ``(xMin, yMin, xMax, yMax)`` in frame coordinates.
        """
        height, width = frame.shape[:2]
        radius = self._radius

        if center is not None:
            cx, cy = center
            # Clamp the square window to the frame borders.
            xMin = max(0, cx - radius)
            yMin = max(0, cy - radius)
            xMax = min(width, cx + radius)
            yMax = min(height, cy + radius)
        else:
            xMin, yMin, xMax, yMax = 0, 0, width, height

        # Slicing needs plain integers.
        xMin, yMin, xMax, yMax = int(xMin), int(yMin), int(xMax), int(yMax)

        if yMin >= yMax or xMin >= xMax:
            print(
                f"Warning: Invalid ROI calculated: ({xMin},{yMin}) to ({xMax},{yMax}). Using full frame."
            )
            xMin, yMin, xMax, yMax = 0, 0, width, height

        roi = frame[yMin:yMax, xMin:xMax]
        offset = (xMin, yMin)
        vertices = (xMin, yMin, xMax, yMax)
        return roi, offset, vertices

    def _createCircleContour(self, centerX, centerY, radius, num_points=36):
        """Return ``num_points`` points on a circle as an ``(N, 1, 2)`` int32 array.

        The shape matches what ``cv2.drawContours`` expects; coordinates are
        truncated to integers.
        """
        angles = np.linspace(0, 2 * np.pi, num_points, endpoint=False)
        xs = centerX + radius * np.cos(angles)
        ys = centerY + radius * np.sin(angles)
        points = np.stack((xs, ys), axis=1).astype(np.int32)
        return points.reshape((-1, 1, 2))
class Visualizer:
    """Paints detection overlays onto a frame.

    No window is opened: the on-screen display calls are disabled, so this
    class only annotates the image it is given.
    """

    def __init__(self, radius=5, windowName="Frame"):
        """Remember the centre-dot radius and the (currently unused) window name."""
        self._windowName = windowName
        self._radius = radius

    def draw(self, frame, center, circleContour, vertices):
        """Annotate ``frame`` in place and return it.

        Args:
            frame: Image to draw on.
            center: ``(x, y)`` of the detection, or ``None``.
            circleContour: Contour array for the ball outline, or ``None``.
            vertices: ROI as ``(xMin, yMin, xMax, yMax)``; skipped when falsy.

        Returns:
            The same ``frame`` object that was passed in.
        """
        # Search window: thin red rectangle.
        if vertices:
            topLeft = (vertices[0], vertices[1])
            bottomRight = (vertices[2], vertices[3])
            cv2.rectangle(frame, topLeft, bottomRight, (0, 0, 255), 1)

        # Ball outline: blue, 2 px.
        if circleContour is not None:
            cv2.drawContours(frame, [circleContour], -1, (255, 0, 0), 2)

        # Ball centre: filled green dot.
        if center is not None:
            cv2.circle(frame, center, self._radius, (0, 255, 0), -1)

        return frame

    def destroy(self):
        """Do nothing; kept so callers have a uniform cleanup hook."""
        return None
class VideoCapture:
    """Thin wrapper around ``Picamera2`` with a ``cv2.VideoCapture``-like API.

    Recognised settings keys (all optional): ``fps``, ``bufferSize``,
    ``frameWidth``, ``frameHeight``. These values are stored and reported by
    ``setProperties`` but are NOT applied to the camera; the camera runs
    with the preview configuration created in ``__init__``.
    """

    def __init__(self, device=0, settings=None):
        """Validate the settings, then open and start the camera.

        Args:
            device: Unused; kept for call compatibility.
            settings: Mapping of configuration values, or ``None``.

        Raises:
            ValueError/TypeError: If a setting cannot be converted to ``int``.
            Exception: Whatever ``Picamera2`` raises while configuring or
                starting; the camera is closed before the error propagates.
        """
        if settings is None:
            settings = {}

        # Convert the settings before touching the hardware so that a bad
        # value cannot leave an opened camera behind.
        self._fps = int(settings.get("fps", 30))
        self._bufferSize = int(settings.get("bufferSize", 4))
        self._width = int(settings.get("frameWidth", 640))
        self._height = int(settings.get("frameHeight", 480))

        self.cap = Picamera2()
        try:
            # NOTE(review): downstream code converts frames with
            # cv2.COLOR_BGR2HSV; presumably Picamera2's "RGB888" arrays are
            # laid out in B, G, R order - confirm against the Picamera2 manual.
            config = self.cap.create_preview_configuration({"format": "RGB888"})
            self.cap.configure(config)
            self.cap.start()
        except Exception:
            # The caller never receives this object when __init__ fails, so
            # nobody else could release the camera.
            self.cap.close()
            raise

    def setProperties(self):
        """Report the configured properties.

        Only prints the values; nothing is applied to the camera.
        """
        print(
            f"Attempting to set camera properties: Resolution={self._width}x{self._height}, FPS={self._fps}, BufferSize={self._bufferSize}"
        )

    def read(self):
        """Capture one frame.

        Returns:
            tuple: ``(True, frame)`` where ``frame`` is the array returned by
            ``Picamera2.capture_array()``. The flag is always ``True``.
        """
        return (True, self.cap.capture_array())

    def release(self):
        """Close the camera."""
        self.cap.close()
        print("Video capture released.")
class Encoder:
    """Serialises a frame and the detection result into one JSON message."""

    @staticmethod
    def encodeData(frame, center=None, distance=None, quality=90):
        """Pack ``frame`` and the detection result into a JSON string.

        The message has four keys: ``frame`` (Base64 text of the JPEG-encoded
        image), ``x`` (``center[0]``), ``y`` (``distance``) and ``isball``.
        When ``center`` is ``None`` both ``x`` and ``y`` are ``null`` and
        ``isball`` is ``false``.

        Args:
            frame: Image array to encode.
            center: ``(x, y)`` of the detected ball, or ``None``.
            distance: Value reported under ``y`` when a ball was detected.
            quality: JPEG quality passed to ``cv2.imencode``.

        Returns:
            str | None: The JSON text, or ``None`` when the frame is empty,
            JPEG encoding fails, or the payload is not JSON serialisable.
        """
        if frame is None or frame.size == 0:
            print("Error: Cannot encode empty frame.")
            return None

        jpegOptions = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
        ok, jpeg = cv2.imencode(".jpg", frame, jpegOptions)
        if not ok:
            print("Error: Failed to encode image to JPEG.")
            return None

        ballFound = center is not None
        payload = {
            "frame": base64.b64encode(jpeg.tobytes()).decode("utf-8"),
            "x": center[0] if ballFound else None,
            "y": distance if ballFound else None,
            "isball": ballFound,
        }

        try:
            return json.dumps(payload)
        except TypeError as e:
            print(f"Error serializing data to JSON: {e}")
            return None
class UDPClient:
    """Fire-and-forget sender of text messages over a UDP datagram socket."""

    def __init__(self, host="127.0.0.1", port=31133):
        """Create the datagram socket and remember the destination."""
        self.host = host
        self.port = port
        # No timeout is configured on the socket.
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        print(f"UDP Client initialized for {self.host}:{self.port}")

    def send(self, data_json_string):
        """Send ``data_json_string`` as one UTF-8 encoded datagram.

        Returns:
            bool: ``True`` when ``sendto`` completed, ``False`` when there was
            nothing to send or any error occurred (the error is printed).
        """
        if not data_json_string:
            print("UDP Send Error: No data to send.")
            return False

        try:
            self.socket.sendto(data_json_string.encode("utf-8"), (self.host, self.port))
        except OSError as e:
            # socket.error is an alias of OSError.
            print(f"UDP Send Error: {e}")
            return False
        except Exception as e:
            print(f"An unexpected error occurred during UDP send: {e}")
            return False
        else:
            return True

    def close(self):
        """Close the socket once; later calls are no-ops."""
        if not self.socket:
            return
        self.socket.close()
        # Cleared so that a closed socket is never reused.
        self.socket = None
        print("UDP Client socket closed.")
def main():
    """Run the capture -> detect -> annotate -> encode -> send loop.

    Settings come from ``loadSettingsFromJson()``. Each frame is searched
    for the ball, annotated, resized to the configured output size, packed
    by ``Encoder.encodeData`` and sent through ``UDPClient``. The loop runs
    until an exception (including Ctrl+C) stops it; the camera, the socket
    and the visualizer are then cleaned up.
    """
    settings = loadSettingsFromJson()

    # Handles start as None so the cleanup below knows what was created.
    videoCapture = None
    udpClient = None
    visualizer = None

    try:
        ballDetector = BallDetector(settings)
        visualizer = Visualizer()
        videoCapture = VideoCapture(0, settings)
        videoCapture.setProperties()
        # Only the port is configurable; the host stays at UDPClient's default.
        udpClient = UDPClient(port=int(settings.get("udpPort", 31133)))

        output_width = int(settings.get("outputFrameWidth", 160))
        output_height = int(settings.get("outputFrameHeight", 96))
        jpeg_quality = int(settings.get("jpegQuality", 90))
        output_size = (output_width, output_height)

        while True:
            ret, frame = videoCapture.read()
            if not ret or frame is None:
                print("Error: Failed to capture frame from camera.")
                # Back off instead of spinning while the camera is failing.
                time.sleep(0.5)
                continue

            # Detection gets its own copy of the frame.
            detection = ballDetector.detect(frame.copy())
            center, circleContour, vertices, distance = detection
            print(center, distance)

            frame = visualizer.draw(frame, center, circleContour, vertices)

            # Shrink the annotated frame before encoding.
            frame_resized = cv2.resize(
                frame, output_size, interpolation=cv2.INTER_AREA
            )
            encoded_json_string = Encoder.encodeData(
                frame_resized, center, distance, quality=jpeg_quality
            )

            # Frames that failed to encode are skipped.
            if encoded_json_string:
                udpClient.send(encoded_json_string)
    except IOError as e:
        print(f"Initialization Error: {e}")
    except KeyboardInterrupt:
        print("Program interrupted by user.")
    except Exception as e:
        print(f"An unexpected error occurred in main loop: {e}")
        import traceback

        # Full traceback for debugging.
        traceback.print_exc()
    finally:
        print("Cleaning up resources...")
        if videoCapture:
            videoCapture.release()
        if udpClient:
            udpClient.close()
        if visualizer:
            visualizer.destroy()
        print("Cleanup complete.")
# Script entry point: start the pipeline only when this file is executed
# directly, not when it is imported.
# The settings path is fixed; main() takes no arguments, so choosing another
# file from the command line would need an argument parser and a parameter
# on main().
if __name__ == "__main__":
    main()