From 00c6bc2b78ab975bab787c359f630b472f4936dc Mon Sep 17 00:00:00 2001 From: Adityya Kaushal Date: Thu, 16 Jan 2025 13:05:02 -0500 Subject: [PATCH 1/7] restart worker patch with fixed commit history --- .../cluster_estimation/cluster_estimation_worker.py | 13 +++++++++++++ modules/detect_target/detect_target_worker.py | 6 ++++++ modules/geolocation/geolocation_worker.py | 6 ++++++ utilities/workers/worker_manager.py | 6 +++--- 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/modules/cluster_estimation/cluster_estimation_worker.py b/modules/cluster_estimation/cluster_estimation_worker.py index f10c8313..66192a8d 100644 --- a/modules/cluster_estimation/cluster_estimation_worker.py +++ b/modules/cluster_estimation/cluster_estimation_worker.py @@ -5,6 +5,7 @@ import os import pathlib +from modules import detection_in_world from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import cluster_estimation @@ -71,6 +72,18 @@ def cluster_estimation_worker( input_data = input_queue.queue.get() if input_data is None: + local_logger.info("Recieved type None, exiting.") + break + + isInvalid = False + + for input in input_data: + if not isinstance(input, detection_in_world.DetectionInWorld): + local_logger.warning(f"Skipping unexpected input: {input}") + isInvalid = True + break + + if isInvalid: continue # TODO: When to override diff --git a/modules/detect_target/detect_target_worker.py b/modules/detect_target/detect_target_worker.py index 8ce5cbc4..17b5cf43 100644 --- a/modules/detect_target/detect_target_worker.py +++ b/modules/detect_target/detect_target_worker.py @@ -5,6 +5,7 @@ import os import pathlib +from modules import image_and_time from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import detect_target_factory @@ -64,8 +65,13 @@ def detect_target_worker( input_data = input_queue.queue.get() if input_data is None: + local_logger.info("Recieved type None, exiting.") break + if not isinstance(input_data, image_and_time.ImageAndTime): + local_logger.warning(f"Skipping unexpected input: {input}") + continue + result, value = detector.run(input_data) if not result: continue diff --git a/modules/geolocation/geolocation_worker.py b/modules/geolocation/geolocation_worker.py index 11781688..4e7ac6d7 100644 --- a/modules/geolocation/geolocation_worker.py +++ b/modules/geolocation/geolocation_worker.py @@ -5,6 +5,7 @@ import os import pathlib +from modules import merged_odometry_detections from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import camera_properties @@ -55,8 +56,13 @@ def geolocation_worker( input_data = input_queue.queue.get() if input_data is None: + local_logger.info("Recieved type None, exiting.") break + if not isinstance(input_data, merged_odometry_detections.MergedOdometryDetections): + local_logger.warning(f"Skipping unexpected input: {input}") + continue + result, value = locator.run(input_data) if not result: continue diff --git a/utilities/workers/worker_manager.py b/utilities/workers/worker_manager.py index be7480a7..fcf8cdfb 100644 --- a/utilities/workers/worker_manager.py +++ b/utilities/workers/worker_manager.py @@ -247,8 +247,8 @@ def check_and_restart_dead_workers(self) -> bool: # Draining the preceding queue ensures that the preceding queue data wasn't what # caused the worker to fail. Draining the succeeding queues is not needed # because a worker that died would not have put bad data into the queue. - input_queues = self.__worker_properties.get_input_queues() - for queue in input_queues: - queue.drain_queue() + # input_queues = self.__worker_properties.get_input_queues() + # for queue in input_queues: + # queue.drain_queue() return True From d9f44a880d3e159711aac3ab0f3cfd9a10a58807 Mon Sep 17 00:00:00 2001 From: Adityya Kaushal Date: Sat, 18 Jan 2025 09:52:29 -0500 Subject: [PATCH 2/7] fixed black error for CI/CD --- modules/cluster_estimation/cluster_estimation_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/cluster_estimation/cluster_estimation_worker.py b/modules/cluster_estimation/cluster_estimation_worker.py index 66192a8d..31fe4d13 100644 --- a/modules/cluster_estimation/cluster_estimation_worker.py +++ b/modules/cluster_estimation/cluster_estimation_worker.py @@ -75,14 +75,14 @@ def cluster_estimation_worker( local_logger.info("Recieved type None, exiting.") break - isInvalid = False + isInvalid = False for input in input_data: if not isinstance(input, detection_in_world.DetectionInWorld): local_logger.warning(f"Skipping unexpected input: {input}") isInvalid = True break - + if isInvalid: continue From a30cfd1ee638f6db53e6ae3f28b5abb14208cdf7 Mon Sep 17 00:00:00 2001 From: Adityya Kaushal Date: Sat, 18 Jan 2025 09:59:39 -0500 Subject: [PATCH 3/7] fixed pylint error for CI/CD --- modules/cluster_estimation/cluster_estimation_worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/cluster_estimation/cluster_estimation_worker.py b/modules/cluster_estimation/cluster_estimation_worker.py index 31fe4d13..099ecb7b 100644 --- a/modules/cluster_estimation/cluster_estimation_worker.py +++ b/modules/cluster_estimation/cluster_estimation_worker.py @@ -75,15 +75,15 @@ def cluster_estimation_worker( local_logger.info("Recieved type None, exiting.") break - isInvalid = False + is_invalid = False for input in input_data: if not isinstance(input, detection_in_world.DetectionInWorld): local_logger.warning(f"Skipping unexpected input: {input}") - isInvalid = True + is_invalid = True break - if isInvalid: + if is_invalid: continue # TODO: When to override From fc92af009ce0ef4801644e2f89d46faab1fb9942 Mon Sep 17 00:00:00 2001 From: Adityya Kaushal Date: Sat, 18 Jan 2025 10:07:01 -0500 Subject: [PATCH 4/7] fixed pylint error for CI/CD --- modules/cluster_estimation/cluster_estimation_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/cluster_estimation/cluster_estimation_worker.py b/modules/cluster_estimation/cluster_estimation_worker.py index 099ecb7b..a38db39f 100644 --- a/modules/cluster_estimation/cluster_estimation_worker.py +++ b/modules/cluster_estimation/cluster_estimation_worker.py @@ -77,8 +77,8 @@ def cluster_estimation_worker( is_invalid = False - for input in input_data: - if not isinstance(input, detection_in_world.DetectionInWorld): + for single_input in input_data: + if not isinstance(single_input, detection_in_world.DetectionInWorld): local_logger.warning(f"Skipping unexpected input: {input}") is_invalid = True break From 13bea175a4aaa985ebcd8261d6e822c24540dd5d Mon Sep 17 00:00:00 2001 From: Adityya Kaushal Date: Sun, 19 Jan 2025 08:14:15 -0500 Subject: [PATCH 5/7] added submodules --- modules/common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/common b/modules/common index 9b10a334..c7ab98a7 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit 9b10a334651b7cca5d014d4640e42d3a55d128f8 +Subproject commit c7ab98a75be0f78c2c17084d30c7fce5708897ff From 7aa0554e46c93b20b66907628e0e6dc6f85dbbcb Mon Sep 17 00:00:00 2001 From: Adityya Kaushal Date: Tue, 21 Jan 2025 09:14:42 -0500 Subject: [PATCH 6/7] added data checking to communication worker --- .../communications/communications_worker.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 8339587f..35b43282 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -6,6 +6,8 @@ import pathlib import queue +from modules import object_in_world + from . import communications from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller @@ -59,6 +61,23 @@ def communications_worker( while not controller.is_exit_requested(): controller.check_pause() + input_data = input_queue.queue.get() + + if input_data is None: + local_logger.info("Recieved type None, exiting.") + break + + is_invalid = False + + for single_input in input_data: + if not isinstance(single_input, object_in_world.ObjectInWorld): + local_logger.warning(f"Skipping unexpected input: {input}") + is_invalid = True + break + + if is_invalid: + continue + result, value = comm.run(input_queue.queue.get()) if not result: continue From 900661d61381f1553b887b5c0267721a7a102f92 Mon Sep 17 00:00:00 2001 From: Adityya Kaushal Date: Wed, 22 Jan 2025 10:49:38 -0500 Subject: [PATCH 7/7] fixed code that was commented on during review --- modules/communications/communications_worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 35b43282..6e5f3b66 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -7,7 +7,6 @@ import queue from modules import object_in_world - from . import communications from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller @@ -78,7 +77,7 @@ def communications_worker( if is_invalid: continue - result, value = comm.run(input_queue.queue.get()) + result, value = comm.run(input_data) if not result: continue