From fb4418d31f21e9ac3214ee85dfb262a55c729cdc Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Wed, 29 Jan 2025 12:15:56 -0600 Subject: [PATCH 01/18] Initial commit --- main_2024.py | 15 ++++++++++-- modules/common | 2 +- modules/communications/communications.py | 23 +++++++++++++++--- .../communications/communications_worker.py | 24 ++++++++++++++++--- modules/flight_interface/flight_interface.py | 6 ++++- .../flight_interface_worker.py | 9 ++++++- 6 files changed, 68 insertions(+), 11 deletions(-) diff --git a/main_2024.py b/main_2024.py index 25f68a72..2037001e 100644 --- a/main_2024.py +++ b/main_2024.py @@ -201,6 +201,10 @@ def main() -> int: mp_manager, QUEUE_MAX_SIZE, ) + communications_to_flight_interface_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + QUEUE_MAX_SIZE, + ) communications_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, @@ -285,7 +289,10 @@ def main() -> int: FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, ), - input_queues=[flight_interface_decision_queue], + input_queues=[ + flight_interface_decision_queue, + communications_to_flight_interface_queue, + ], output_queues=[ flight_interface_to_data_merge_queue, flight_interface_to_communications_queue, @@ -367,7 +374,10 @@ def main() -> int: flight_interface_to_communications_queue, cluster_estimation_to_communications_queue, ], - output_queues=[communications_to_main_queue], + output_queues=[ + communications_to_main_queue, + communications_to_flight_interface_queue, + ], controller=controller, local_logger=main_logger, ) @@ -505,6 +515,7 @@ def main() -> int: cluster_estimation_to_communications_queue.fill_and_drain_queue() communications_to_main_queue.fill_and_drain_queue() flight_interface_decision_queue.fill_and_drain_queue() + communications_to_flight_interface_queue.fill_and_drain_queue() for manager in worker_managers: manager.join_workers() diff --git a/modules/common b/modules/common index 9acf88b4..720f98c3 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit 9acf88b42dfdb145e7eabb1b09a55df102ee00ad +Subproject commit 720f98c3cb9df92d84898df6ea41103aadbedb09 diff --git a/modules/communications/communications.py b/modules/communications/communications.py index b1127475..a7f40c02 100644 --- a/modules/communications/communications.py +++ b/modules/communications/communications.py @@ -8,6 +8,8 @@ from ..common.modules import position_global from ..common.modules import position_local from ..common.modules.logger import logger +from ..common.modules.data_encoding import message_encoding_decoding +from ..common.modules.data_encoding import worker_enum from ..common.modules.mavlink import local_global_conversion @@ -23,6 +25,7 @@ def create( cls, home_position: position_global.PositionGlobal, local_logger: logger.Logger, + worker_id: int, ) -> "tuple[True, Communications] | tuple[False, None]": """ Logs data and forwards it. @@ -32,13 +35,14 @@ def create( Returns: Success, class object. """ - return True, Communications(cls.__create_key, home_position, local_logger) + return True, Communications(cls.__create_key, home_position, local_logger, worker_id) def __init__( self, class_private_create_key: object, home_position: position_global.PositionGlobal, local_logger: logger.Logger, + worker_id: int, ) -> None: """ Private constructor, use create() method. @@ -47,11 +51,12 @@ def __init__( self.__home_position = home_position self.__logger = local_logger + self.__worker_id = worker_id def run( self, objects_in_world: list[object_in_world.ObjectInWorld], - ) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]: + ) -> tuple[True, list[bytes]] | tuple[False, None]: objects_in_world_global = [] for object_in_world in objects_in_world: @@ -87,4 +92,16 @@ def run( self.__logger.info(f"{time.time()}: {objects_in_world_global}") - return True, objects_in_world + encoded_position_global_objects = [] + for object in object_in_world_global: + + result, message = message_encoding_decoding.encode_position_global( + self.__worker_id, object + ) + if not result: + self.__logger.warning("Conversion from PositionGlobal to bytes failed", True) + return False, None + + encoded_position_global_objects.append(message) + + return True, encoded_position_global_objects diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 3b25dc85..b0886b64 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -5,16 +5,20 @@ import os import pathlib import queue +import time from modules import object_in_world from . import communications from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller +from ..common.modules.data_encoding import metadata_encoding_decoding +from ..common.modules.data_encoding import worker_enum from ..common.modules.logger import logger def communications_worker( timeout: float, + period: float, home_position_queue: queue_proxy_wrapper.QueueProxyWrapper, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, @@ -29,6 +33,7 @@ def communications_worker( """ worker_name = pathlib.Path(__file__).stem + worker_id = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER process_id = os.getpid() result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) if not result: @@ -49,7 +54,7 @@ def communications_worker( local_logger.info(f"Home position received: {home_position}", True) - result, comm = communications.Communications.create(home_position, local_logger) + result, comm = communications.Communications.create(home_position, local_logger, worker_id) if not result: local_logger.error("Worker failed to create class object", True) return @@ -79,8 +84,21 @@ def communications_worker( if is_invalid: continue - result, value = comm.run(input_data) + result, list_of_messages = comm.run(input_data) if not result: continue - output_queue.queue.put(value) + result, metadata = metadata_encoding_decoding.encode_metadata( + worker_id, len(list_of_messages) + ) + if not result: + local_logger.error("Failed to encode metadata", True) + continue + + output_queue.queue.put(metadata) + + for message in list_of_messages: + + time.sleep(period) + + output_queue.queue.put(message) diff --git a/modules/flight_interface/flight_interface.py b/modules/flight_interface/flight_interface.py index ef976278..52131364 100644 --- a/modules/flight_interface/flight_interface.py +++ b/modules/flight_interface/flight_interface.py @@ -73,7 +73,7 @@ def get_home_position(self) -> position_global.PositionGlobal: """ return self.__home_position - def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": + def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": """ Returns a possible OdometryAndTime with current timestamp. """ @@ -103,6 +103,10 @@ def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": self.__logger.info(str(odometry_and_time_object), True) + result = self.controller.send_statustext_msg(message) + if not result: + self.__logger.error("Failed to send statustext message", True) + return True, odometry_and_time_object def apply_decision(self, cmd: decision_command.DecisionCommand) -> bool: diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index 41610a73..8f57c90f 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -20,6 +20,7 @@ def flight_interface_worker( input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper, + coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -62,7 +63,13 @@ def flight_interface_worker( time.sleep(period) - result, value = interface.run() + coordinate = coordinates_input_queue.queue.get_nowait() + + if not isinstance(coordinate, bytes): + local_logger.warning(f"Skipping unexpected input: {coordinate}") + continue + + result, value = interface.run(coordinate) if not result: continue From 4524414800ec307da26eeaa610e849872beaabd2 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Wed, 29 Jan 2025 12:31:29 -0600 Subject: [PATCH 02/18] common updates --- modules/common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/common b/modules/common index 720f98c3..9acf88b4 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit 720f98c3cb9df92d84898df6ea41103aadbedb09 +Subproject commit 9acf88b42dfdb145e7eabb1b09a55df102ee00ad From 882eb868a3750354af6d1e546ec1d46aa1657297 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Wed, 29 Jan 2025 12:38:24 -0600 Subject: [PATCH 03/18] Changed worker_id to enum type in communications --- modules/communications/communications.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/communications/communications.py b/modules/communications/communications.py index a7f40c02..7aa7f694 100644 --- a/modules/communications/communications.py +++ b/modules/communications/communications.py @@ -25,7 +25,7 @@ def create( cls, home_position: position_global.PositionGlobal, local_logger: logger.Logger, - worker_id: int, + worker_id: worker_enum.WorkerEnum, ) -> "tuple[True, Communications] | tuple[False, None]": """ Logs data and forwards it. @@ -42,7 +42,7 @@ def __init__( class_private_create_key: object, home_position: position_global.PositionGlobal, local_logger: logger.Logger, - worker_id: int, + worker_id: worker_enum.WorkerEnum, ) -> None: """ Private constructor, use create() method. From b37f535162598ec17cc5649e75fa621996dd2aa0 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Wed, 29 Jan 2025 14:00:30 -0600 Subject: [PATCH 04/18] Modified config.yaml to include new worker argument --- config.yaml | 1 + main_2024.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/config.yaml b/config.yaml index c6d4f59e..4dbf9c9c 100644 --- a/config.yaml +++ b/config.yaml @@ -77,3 +77,4 @@ cluster_estimation: communications: timeout: 30.0 # seconds + worker_period: 0.5 # seconds diff --git a/main_2024.py b/main_2024.py index 2037001e..81383697 100644 --- a/main_2024.py +++ b/main_2024.py @@ -156,6 +156,7 @@ def main() -> int: RANDOM_STATE = config["cluster_estimation"]["random_state"] COMMUNICATIONS_TIMEOUT = config["communications"]["timeout"] + COMMUNICATIONS_WORKER_PERIOD = config["communications"]["worker_period"] # pylint: enable=invalid-name except KeyError as exception: @@ -369,7 +370,7 @@ def main() -> int: result, communications_worker_properties = worker_manager.WorkerProperties.create( count=1, target=communications_worker.communications_worker, - work_arguments=(COMMUNICATIONS_TIMEOUT,), + work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD), input_queues=[ flight_interface_to_communications_queue, cluster_estimation_to_communications_queue, From 04d07673f1d2e51afe65e74566f54786a3b2cffc Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Wed, 29 Jan 2025 14:07:37 -0600 Subject: [PATCH 05/18] Updated common --- modules/common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/common b/modules/common index 9acf88b4..016e4827 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit 9acf88b42dfdb145e7eabb1b09a55df102ee00ad +Subproject commit 016e48277621e356bf5343169f1a78e5cffab44c From 1f99de0fa9b993bdddaaf377da7163c78ac36f66 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Wed, 29 Jan 2025 15:14:52 -0600 Subject: [PATCH 06/18] Fixed common (hopefully) --- modules/common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/common b/modules/common index 016e4827..9acf88b4 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit 016e48277621e356bf5343169f1a78e5cffab44c +Subproject commit 9acf88b42dfdb145e7eabb1b09a55df102ee00ad From dbb6446624ce7ddb330f0b28d3bd243201ea2e7f Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Wed, 29 Jan 2025 15:19:04 -0600 Subject: [PATCH 07/18] Updated common --- modules/common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/common b/modules/common index 9acf88b4..cec49cc9 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit 9acf88b42dfdb145e7eabb1b09a55df102ee00ad +Subproject commit cec49cc9501bbd81e86afbca3ed67bae3bbd086a From 313c324f0a170ec3f0915c1e0eae0845335c7d3e Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Thu, 30 Jan 2025 17:54:19 -0600 Subject: [PATCH 08/18] added argument to flight interface hardware integration test --- tests/integration/test_flight_interface_hardware.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_flight_interface_hardware.py b/tests/integration/test_flight_interface_hardware.py index e0050441..654d5134 100644 --- a/tests/integration/test_flight_interface_hardware.py +++ b/tests/integration/test_flight_interface_hardware.py @@ -34,7 +34,7 @@ def main() -> int: assert interface is not None # Run - result, odometry_time = interface.run() + result, odometry_time = interface.run(None) # Test assert result From 42e91c710e9ad3109afb9f1e8c9e23951f187a99 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Fri, 31 Jan 2025 16:05:35 -0600 Subject: [PATCH 09/18] Modified test_flight_interface_worker to handle new inputs, added exception handling. --- .../flight_interface_worker.py | 7 ++- .../test_flight_interface_worker.py | 47 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index 8f57c90f..7fd32c1d 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -4,6 +4,7 @@ import os import pathlib +import queue import time from utilities.workers import queue_proxy_wrapper @@ -63,7 +64,11 @@ def flight_interface_worker( time.sleep(period) - coordinate = coordinates_input_queue.queue.get_nowait() + try: + coordinate = coordinates_input_queue.queue.get_nowait() + except queue.Empty: + local_logger.warning("No more coordinates to process") + coordinate = None if not isinstance(coordinate, bytes): local_logger.warning(f"Skipping unexpected input: {coordinate}") diff --git a/tests/integration/test_flight_interface_worker.py b/tests/integration/test_flight_interface_worker.py index af52cc9d..82ec25bc 100644 --- a/tests/integration/test_flight_interface_worker.py +++ b/tests/integration/test_flight_interface_worker.py @@ -11,11 +11,34 @@ from modules import decision_command from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller +from modules.common.modules import position_global +from modules.common.modules.data_encoding import message_encoding_decoding +from modules.common.modules.data_encoding import worker_enum MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550" FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds +WORK_COUNT = 4 +COMMUNICATIONS_WORKER_ID = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER + + +def simulate_communications_worker( + in_queue: queue_proxy_wrapper.QueueProxyWrapper, + data_point: position_global.PositionGlobal, +) -> None: + """ + Encode coordinates and place into queue. + """ + result, message = message_encoding_decoding.encode_position_global( + COMMUNICATIONS_WORKER_ID, data_point + ) + assert result + assert message is not None + + in_queue.queue.put(message) + + return def apply_decision_test( @@ -105,6 +128,8 @@ def main() -> int: out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + communications_in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + communications_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) worker = mp.Process( target=flight_interface_worker.flight_interface_worker, @@ -116,6 +141,7 @@ def main() -> int: in_queue, # Added input_queue out_queue, home_position_out_queue, + communications_in_queue, controller, ), ) @@ -129,6 +155,27 @@ def main() -> int: home_position = home_position_out_queue.queue.get() assert home_position is not None + data_points = [position_global.PositionGlobal.create(43.471468, -80.544205, 335), + position_global.PositionGlobal.create(43.6629, -79.3957, 105), + position_global.PositionGlobal.create(43.2609, -79.9192, 100), + position_global.PositionGlobal.create(43.7735, -79.5019, 170) + ] + + # Simulate communications worker + for i in range(0, WORK_COUNT): + simulate_communications_worker(communications_in_queue, home_position, data_points[i]) + + # Test flight interface worker sending statustext messages + for i in range(0, WORK_COUNT): + try: + input_data: bytes = communications_out_queue.queue.get_nowait() + assert input_data is not None + except queue.Empty: + print("Output queue has no more messages to process, exiting") + break + + assert communications_out_queue.queue.empty() + # Run the apply_decision tests test_result = apply_decision_test(in_queue, out_queue) if not test_result: From e08f5d3df2f21d93ef97332437c1efb422c50011 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Fri, 31 Jan 2025 16:06:22 -0600 Subject: [PATCH 10/18] fixed linting --- tests/integration/test_flight_interface_worker.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_flight_interface_worker.py b/tests/integration/test_flight_interface_worker.py index 82ec25bc..40a5c28c 100644 --- a/tests/integration/test_flight_interface_worker.py +++ b/tests/integration/test_flight_interface_worker.py @@ -155,11 +155,12 @@ def main() -> int: home_position = home_position_out_queue.queue.get() assert home_position is not None - data_points = [position_global.PositionGlobal.create(43.471468, -80.544205, 335), - position_global.PositionGlobal.create(43.6629, -79.3957, 105), - position_global.PositionGlobal.create(43.2609, -79.9192, 100), - position_global.PositionGlobal.create(43.7735, -79.5019, 170) - ] + data_points = [ + position_global.PositionGlobal.create(43.471468, -80.544205, 335), + position_global.PositionGlobal.create(43.6629, -79.3957, 105), + position_global.PositionGlobal.create(43.2609, -79.9192, 100), + position_global.PositionGlobal.create(43.7735, -79.5019, 170), + ] # Simulate communications worker for i in range(0, WORK_COUNT): From 34ab226e42ef8f46789fbb4a198e0031a5fb22b3 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Sat, 1 Feb 2025 16:22:47 -0600 Subject: [PATCH 11/18] Added message queue to communications, moved checking to inside run() function, adjusted flight interface test to reflect new inputs --- modules/communications/communications.py | 7 +-- .../communications/communications_worker.py | 8 +-- modules/flight_interface/flight_interface.py | 7 +-- .../flight_interface_worker.py | 7 +-- .../test_flight_interface_worker.py | 50 +------------------ 5 files changed, 14 insertions(+), 65 deletions(-) diff --git a/modules/communications/communications.py b/modules/communications/communications.py index 7aa7f694..75010e65 100644 --- a/modules/communications/communications.py +++ b/modules/communications/communications.py @@ -25,7 +25,6 @@ def create( cls, home_position: position_global.PositionGlobal, local_logger: logger.Logger, - worker_id: worker_enum.WorkerEnum, ) -> "tuple[True, Communications] | tuple[False, None]": """ Logs data and forwards it. @@ -35,14 +34,13 @@ def create( Returns: Success, class object. """ - return True, Communications(cls.__create_key, home_position, local_logger, worker_id) + return True, Communications(cls.__create_key, home_position, local_logger) def __init__( self, class_private_create_key: object, home_position: position_global.PositionGlobal, local_logger: logger.Logger, - worker_id: worker_enum.WorkerEnum, ) -> None: """ Private constructor, use create() method. @@ -51,7 +49,6 @@ def __init__( self.__home_position = home_position self.__logger = local_logger - self.__worker_id = worker_id def run( self, @@ -96,7 +93,7 @@ def run( for object in object_in_world_global: result, message = message_encoding_decoding.encode_position_global( - self.__worker_id, object + worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, object ) if not result: self.__logger.warning("Conversion from PositionGlobal to bytes failed", True) diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index b0886b64..9a8da0d6 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -22,6 +22,7 @@ def communications_worker( home_position_queue: queue_proxy_wrapper.QueueProxyWrapper, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + message_output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -33,7 +34,6 @@ def communications_worker( """ worker_name = pathlib.Path(__file__).stem - worker_id = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER process_id = os.getpid() result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) if not result: @@ -54,7 +54,7 @@ def communications_worker( local_logger.info(f"Home position received: {home_position}", True) - result, comm = communications.Communications.create(home_position, local_logger, worker_id) + result, comm = communications.Communications.create(home_position, local_logger) if not result: local_logger.error("Worker failed to create class object", True) return @@ -89,16 +89,18 @@ def communications_worker( continue result, metadata = metadata_encoding_decoding.encode_metadata( - worker_id, len(list_of_messages) + worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, len(list_of_messages) ) if not result: local_logger.error("Failed to encode metadata", True) continue output_queue.queue.put(metadata) + message_output_queue.queue.put(metadata) for message in list_of_messages: time.sleep(period) output_queue.queue.put(message) + message_output_queue.queue.put(message) diff --git a/modules/flight_interface/flight_interface.py b/modules/flight_interface/flight_interface.py index 52131364..df747ebd 100644 --- a/modules/flight_interface/flight_interface.py +++ b/modules/flight_interface/flight_interface.py @@ -103,9 +103,10 @@ def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime self.__logger.info(str(odometry_and_time_object), True) - result = self.controller.send_statustext_msg(message) - if not result: - self.__logger.error("Failed to send statustext message", True) + if not isinstance(message, bytes): + self.__logger.warning(f"Skipping unexpected input: {message}") + else: + result = self.controller.send_statustext_msg(message) return True, odometry_and_time_object diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index 7fd32c1d..4bbda1a0 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -19,9 +19,9 @@ def flight_interface_worker( baud_rate: int, period: float, input_queue: queue_proxy_wrapper.QueueProxyWrapper, + coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper, - coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -67,13 +67,8 @@ def flight_interface_worker( try: coordinate = coordinates_input_queue.queue.get_nowait() except queue.Empty: - local_logger.warning("No more coordinates to process") coordinate = None - if not isinstance(coordinate, bytes): - local_logger.warning(f"Skipping unexpected input: {coordinate}") - continue - result, value = interface.run(coordinate) if not result: continue diff --git a/tests/integration/test_flight_interface_worker.py b/tests/integration/test_flight_interface_worker.py index 40a5c28c..0c45d4d1 100644 --- a/tests/integration/test_flight_interface_worker.py +++ b/tests/integration/test_flight_interface_worker.py @@ -11,35 +11,12 @@ from modules import decision_command from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller -from modules.common.modules import position_global -from modules.common.modules.data_encoding import message_encoding_decoding -from modules.common.modules.data_encoding import worker_enum + MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550" FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds -WORK_COUNT = 4 -COMMUNICATIONS_WORKER_ID = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER - - -def simulate_communications_worker( - in_queue: queue_proxy_wrapper.QueueProxyWrapper, - data_point: position_global.PositionGlobal, -) -> None: - """ - Encode coordinates and place into queue. - """ - result, message = message_encoding_decoding.encode_position_global( - COMMUNICATIONS_WORKER_ID, data_point - ) - assert result - assert message is not None - - in_queue.queue.put(message) - - return - def apply_decision_test( in_queue: queue_proxy_wrapper.QueueProxyWrapper, @@ -129,7 +106,6 @@ def main() -> int: home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) communications_in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - communications_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) worker = mp.Process( target=flight_interface_worker.flight_interface_worker, @@ -139,9 +115,9 @@ def main() -> int: FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, in_queue, # Added input_queue + communications_in_queue, out_queue, home_position_out_queue, - communications_in_queue, controller, ), ) @@ -155,28 +131,6 @@ def main() -> int: home_position = home_position_out_queue.queue.get() assert home_position is not None - data_points = [ - position_global.PositionGlobal.create(43.471468, -80.544205, 335), - position_global.PositionGlobal.create(43.6629, -79.3957, 105), - position_global.PositionGlobal.create(43.2609, -79.9192, 100), - position_global.PositionGlobal.create(43.7735, -79.5019, 170), - ] - - # Simulate communications worker - for i in range(0, WORK_COUNT): - simulate_communications_worker(communications_in_queue, home_position, data_points[i]) - - # Test flight interface worker sending statustext messages - for i in range(0, WORK_COUNT): - try: - input_data: bytes = communications_out_queue.queue.get_nowait() - assert input_data is not None - except queue.Empty: - print("Output queue has no more messages to process, exiting") - break - - assert communications_out_queue.queue.empty() - # Run the apply_decision tests test_result = apply_decision_test(in_queue, out_queue) if not test_result: From dc6f1f3336e193bb1054998f348686828ac28dd3 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Sat, 1 Feb 2025 16:26:10 -0600 Subject: [PATCH 12/18] fixed linting error --- tests/integration/test_flight_interface_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_flight_interface_worker.py b/tests/integration/test_flight_interface_worker.py index 0c45d4d1..97ea874a 100644 --- a/tests/integration/test_flight_interface_worker.py +++ b/tests/integration/test_flight_interface_worker.py @@ -18,6 +18,7 @@ FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds + def apply_decision_test( in_queue: queue_proxy_wrapper.QueueProxyWrapper, out_queue: queue_proxy_wrapper.QueueProxyWrapper, From 7c809a70ba2c868622d9ebbaaaca0ab88feac845 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Sun, 2 Feb 2025 16:37:31 -0600 Subject: [PATCH 13/18] removed conditional --- modules/flight_interface/flight_interface.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/modules/flight_interface/flight_interface.py b/modules/flight_interface/flight_interface.py index df747ebd..c8757d0c 100644 --- a/modules/flight_interface/flight_interface.py +++ b/modules/flight_interface/flight_interface.py @@ -103,10 +103,7 @@ def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime self.__logger.info(str(odometry_and_time_object), True) - if not isinstance(message, bytes): - self.__logger.warning(f"Skipping unexpected input: {message}") - else: - result = self.controller.send_statustext_msg(message) + self.controller.send_statustext_msg(message) return True, odometry_and_time_object From f056b62a07b90b760c176e67e29aa9a672378ac0 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Mon, 3 Feb 2025 09:01:29 -0600 Subject: [PATCH 14/18] updated common --- modules/common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/common b/modules/common index cec49cc9..a710405b 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit cec49cc9501bbd81e86afbca3ed67bae3bbd086a +Subproject commit a710405b4bc11bdad6bccfc50e0e495f21c8394f From 7070a1ed4a35b315dcc0d97ff8ca1b4453eb908d Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Tue, 4 Feb 2025 14:16:52 -0600 Subject: [PATCH 15/18] moved metadata encoding to run() --- modules/communications/communications.py | 18 +++++++++++++----- .../communications/communications_worker.py | 9 +-------- modules/flight_interface/flight_interface.py | 5 +++-- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/modules/communications/communications.py b/modules/communications/communications.py index 75010e65..3426a3c7 100644 --- a/modules/communications/communications.py +++ b/modules/communications/communications.py @@ -9,6 +9,7 @@ from ..common.modules import position_local from ..common.modules.logger import logger from ..common.modules.data_encoding import message_encoding_decoding +from ..common.modules.data_encoding import metadata_encoding_decoding from ..common.modules.data_encoding import worker_enum from ..common.modules.mavlink import local_global_conversion @@ -53,7 +54,7 @@ def __init__( def run( self, objects_in_world: list[object_in_world.ObjectInWorld], - ) -> tuple[True, list[bytes]] | tuple[False, None]: + ) -> tuple[True, list[bytes], bytes] | tuple[False, None, None]: objects_in_world_global = [] for object_in_world in objects_in_world: @@ -71,7 +72,7 @@ def run( self.__logger.warning( f"Could not convert ObjectInWorld to PositionLocal:\nobject in world: {object_in_world}" ) - return False, None + return False, None, None result, object_in_world_global = ( local_global_conversion.position_global_from_position_local( @@ -83,7 +84,7 @@ def run( self.__logger.warning( f"position_global_from_position_local conversion failed:\nhome_position: {self.__home_position}\nobject_position_local: {object_position_local}" ) - return False, None + return False, None, None objects_in_world_global.append(object_in_world_global) @@ -97,8 +98,15 @@ def run( ) if not result: self.__logger.warning("Conversion from PositionGlobal to bytes failed", True) - return False, None + return False, None, None encoded_position_global_objects.append(message) - return True, encoded_position_global_objects + result, metadata = metadata_encoding_decoding.encode_metadata( + worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, len(encoded_position_global_objects) + ) + if not result: + self.__logger.error("Failed to encode metadata", True) + return False, None, None + + return True, encoded_position_global_objects, metadata diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 9a8da0d6..57aae669 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -84,17 +84,10 @@ def communications_worker( if is_invalid: continue - result, list_of_messages = comm.run(input_data) + result, list_of_messages, metadata = comm.run(input_data) if not result: continue - result, metadata = metadata_encoding_decoding.encode_metadata( - worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, len(list_of_messages) - ) - if not result: - local_logger.error("Failed to encode metadata", True) - continue - output_queue.queue.put(metadata) message_output_queue.queue.put(metadata) diff --git a/modules/flight_interface/flight_interface.py b/modules/flight_interface/flight_interface.py index c8757d0c..581e7fd9 100644 --- a/modules/flight_interface/flight_interface.py +++ b/modules/flight_interface/flight_interface.py @@ -73,7 +73,7 @@ def get_home_position(self) -> position_global.PositionGlobal: """ return self.__home_position - def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": + def run(self, message: bytes | None) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": """ Returns a possible OdometryAndTime with current timestamp. """ @@ -103,7 +103,8 @@ def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime self.__logger.info(str(odometry_and_time_object), True) - self.controller.send_statustext_msg(message) + if message: + self.controller.send_statustext_msg(message) return True, odometry_and_time_object From 485b1fe4a6c38c65abe03539a4ee2f069bc86b4a Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Tue, 4 Feb 2025 14:17:24 -0600 Subject: [PATCH 16/18] removed unnecessary imports --- modules/communications/communications_worker.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 57aae669..616c05f5 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -11,8 +11,6 @@ from . import communications from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller -from ..common.modules.data_encoding import metadata_encoding_decoding -from ..common.modules.data_encoding import worker_enum from ..common.modules.logger import logger From f7092923dba73267980208c2f8c908b2ea407a6c Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Wed, 5 Feb 2025 19:06:01 -0600 Subject: [PATCH 17/18] updated --- modules/communications/communications.py | 4 ++-- modules/communications/communications_worker.py | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/communications/communications.py b/modules/communications/communications.py index 3426a3c7..dcb29595 100644 --- a/modules/communications/communications.py +++ b/modules/communications/communications.py @@ -54,7 +54,7 @@ def __init__( def run( self, objects_in_world: list[object_in_world.ObjectInWorld], - ) -> tuple[True, list[bytes], bytes] | tuple[False, None, None]: + ) -> tuple[True, bytes, list[bytes]] | tuple[False, None, None]: objects_in_world_global = [] for object_in_world in objects_in_world: @@ -109,4 +109,4 @@ def run( self.__logger.error("Failed to encode metadata", True) return False, None, None - return True, encoded_position_global_objects, metadata + return True, metadata, encoded_position_global_objects diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 616c05f5..860530e3 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -82,13 +82,16 @@ def communications_worker( if is_invalid: continue - result, list_of_messages, metadata = comm.run(input_data) + result, metadata, list_of_messages = comm.run(input_data) if not result: continue output_queue.queue.put(metadata) message_output_queue.queue.put(metadata) + if list_of_messages is None: + continue + for message in list_of_messages: time.sleep(period) From b6bd57c08167ab286ba089ed5194ac4256fe4938 Mon Sep 17 00:00:00 2001 From: Herman Gahra Date: Thu, 6 Feb 2025 10:20:38 -0600 Subject: [PATCH 18/18] removed redundant check --- modules/communications/communications_worker.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 860530e3..346d6e55 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -89,9 +89,6 @@ def communications_worker( output_queue.queue.put(metadata) message_output_queue.queue.put(metadata) - if list_of_messages is None: - continue - for message in list_of_messages: time.sleep(period)