From c36cb9df627b9d75f3ca82e404213416a0c25dcd Mon Sep 17 00:00:00 2001 From: Jane Date: Thu, 30 Jan 2025 22:49:49 -0500 Subject: [PATCH 1/6] Added new integration test for communication to ground --- .../test_communications_to_ground_station.py | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 tests/integration/test_communications_to_ground_station.py diff --git a/tests/integration/test_communications_to_ground_station.py b/tests/integration/test_communications_to_ground_station.py new file mode 100644 index 00000000..4a8ca1ff --- /dev/null +++ b/tests/integration/test_communications_to_ground_station.py @@ -0,0 +1,114 @@ +""" +Test MAVLink integration test +""" + +import multiprocessing as mp +import queue +import time + +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller + +from modules.common.modules import position_global +from modules.flight_interface import flight_interface_worker + + +MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550" +FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds +FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate +FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds + + +def apply_communications_test( + in_queue: queue_proxy_wrapper.QueueProxyWrapper, + out_queue: queue_proxy_wrapper.QueueProxyWrapper, +) -> None: + """ + Method to send in hardcoded GPS coordinates to the flight interface worker + """ + gps_coordinates = [ + position_global.PositionGlobal.create(43.47321268948186, -80.53950244232878, 10), # E7 + position_global.PositionGlobal.create(37.7749, 122.4194, 30), # San Francisco + position_global.PositionGlobal.create(40.7128, 74.0060, -5.6), # New York + position_global.PositionGlobal.create(51.5072, 0.1276, 20.1), # London UK + ] + + # Place the GPS coordinates + for success, gps_coordinate in gps_coordinates: + if not success: + return False + in_queue.queue.put(gps_coordinate) + + # Wait for processing + time.sleep(10) + + # Verify that stuff is sending + try: + # pylint: disable=unused-variable + for i in range(len(gps_coordinates)): + data = out_queue.queue.get_nowait() + print(f"MAVLink data sent by drone: {data}") + except queue.Empty: + print("Output queue is empty.") + return False + + print("apply_communications_test completed successfully") + return True + + +# pylint: disable=duplicate-code +def main() -> int: + """ + Main function + """ + # Setup + controller = worker_controller.WorkerController() + + mp_manager = mp.Manager() + + out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + worker = mp.Process( + target=flight_interface_worker.flight_interface_worker, + args=( + MAVLINK_CONNECTION_ADDRESS, + FLIGHT_INTERFACE_TIMEOUT, + FLIGHT_INTERFACE_BAUD_RATE, + FLIGHT_INTERFACE_WORKER_PERIOD, + in_queue, + out_queue, + home_position_out_queue, + controller, + ), + ) + + worker.start() + + time.sleep(3) + + # Test + home_position = home_position_out_queue.queue.get() + assert home_position is not None + + # Run the apply_communication tests + test_result = apply_communications_test(in_queue, out_queue) + if not test_result: + print("apply_communications test failed.") + worker.terminate() + return -1 + + # Teardown + controller.request_exit() + worker.join() + + return 0 + + +if __name__ == "__main__": + result_main = main() + if result_main < 0: + print(f"ERROR: Status code: {result_main}") + + print("Done!") From 7171b5077405e50f26589d128e726e8525735296 Mon Sep 17 00:00:00 2001 From: Jane Date: Sat, 1 Feb 2025 11:29:27 -0500 Subject: [PATCH 2/6] Changed to display actions required by the test operator --- .../test_communications_to_ground_station.py | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/tests/integration/test_communications_to_ground_station.py b/tests/integration/test_communications_to_ground_station.py index 4a8ca1ff..0717917a 100644 --- a/tests/integration/test_communications_to_ground_station.py +++ b/tests/integration/test_communications_to_ground_station.py @@ -3,7 +3,6 @@ """ import multiprocessing as mp -import queue import time from utilities.workers import queue_proxy_wrapper @@ -14,15 +13,14 @@ MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550" -FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds +FLIGHT_INTERFACE_TIMEOUT = 30.0 # seconds FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds def apply_communications_test( in_queue: queue_proxy_wrapper.QueueProxyWrapper, - out_queue: queue_proxy_wrapper.QueueProxyWrapper, -) -> None: +) -> bool: """ Method to send in hardcoded GPS coordinates to the flight interface worker """ @@ -43,16 +41,9 @@ def apply_communications_test( time.sleep(10) # Verify that stuff is sending - try: - # pylint: disable=unused-variable - for i in range(len(gps_coordinates)): - data = out_queue.queue.get_nowait() - print(f"MAVLink data sent by drone: {data}") - except queue.Empty: - print("Output queue is empty.") - return False - - print("apply_communications_test completed successfully") + print( + "TEST OPERATOR ACTION REQUIRED: Open mission planner's MAVLink inspector or the groundside repo (https://github.com/UWARG/statustext-parser-2025) to check for MAVLink messages" + ) return True @@ -93,7 +84,7 @@ def main() -> int: assert home_position is not None # Run the apply_communication tests - test_result = apply_communications_test(in_queue, out_queue) + test_result = apply_communications_test(in_queue) if not test_result: print("apply_communications test failed.") worker.terminate() From d2ab6ee162278bd9b424537f99391829465466c7 Mon Sep 17 00:00:00 2001 From: Jane Date: Sat, 1 Feb 2025 17:48:12 -0500 Subject: [PATCH 3/6] changed the queue --- .../test_communications_to_ground_station.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_communications_to_ground_station.py b/tests/integration/test_communications_to_ground_station.py index 0717917a..0053dcf3 100644 --- a/tests/integration/test_communications_to_ground_station.py +++ b/tests/integration/test_communications_to_ground_station.py @@ -19,7 +19,7 @@ def apply_communications_test( - in_queue: queue_proxy_wrapper.QueueProxyWrapper, + communications_input_queue: queue_proxy_wrapper.QueueProxyWrapper, ) -> bool: """ Method to send in hardcoded GPS coordinates to the flight interface worker @@ -35,7 +35,7 @@ def apply_communications_test( for success, gps_coordinate in gps_coordinates: if not success: return False - in_queue.queue.put(gps_coordinate) + communications_input_queue.queue.put(gps_coordinate) # Wait for processing time.sleep(10) @@ -57,9 +57,9 @@ def main() -> int: mp_manager = mp.Manager() - out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + communications_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) worker = mp.Process( target=flight_interface_worker.flight_interface_worker, @@ -70,7 +70,7 @@ def main() -> int: FLIGHT_INTERFACE_WORKER_PERIOD, in_queue, out_queue, - home_position_out_queue, + communications_input_queue, controller, ), ) @@ -80,11 +80,11 @@ def main() -> int: time.sleep(3) # Test - home_position = home_position_out_queue.queue.get() + home_position = communications_input_queue.queue.get() assert home_position is not None # Run the apply_communication tests - test_result = apply_communications_test(in_queue) + test_result = apply_communications_test(communications_input_queue) if not test_result: print("apply_communications test failed.") worker.terminate() From 248f2ea94ef3623042ab66c7c4d4df18577f47d2 Mon Sep 17 00:00:00 2001 From: Jane Date: Sat, 1 Feb 2025 18:38:42 -0500 Subject: [PATCH 4/6] update with different queue --- .../test_communications_to_ground_station.py | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_communications_to_ground_station.py b/tests/integration/test_communications_to_ground_station.py index 0053dcf3..3b6d67fd 100644 --- a/tests/integration/test_communications_to_ground_station.py +++ b/tests/integration/test_communications_to_ground_station.py @@ -9,6 +9,7 @@ from utilities.workers import worker_controller from modules.common.modules import position_global +from modules.common.modules.data_encoding import message_encoding_decoding from modules.flight_interface import flight_interface_worker @@ -32,10 +33,22 @@ def apply_communications_test( ] # Place the GPS coordinates + print(f"Inserting list of gps coordinates, length {len(gps_coordinates)}") + for success, gps_coordinate in gps_coordinates: if not success: + print("ERROR: GPS Coordinate not successfully generated") + return False + + success, message = message_encoding_decoding.encode_position_global( + "FLIGHT_INTERFACE_WORKER", gps_coordinate + ) + + if not success: + print("ERROR: Conversion from PositionGlobal to bytes failed") return False - communications_input_queue.queue.put(gps_coordinate) + + communications_input_queue.queue.put(message) # Wait for processing time.sleep(10) @@ -58,8 +71,9 @@ def main() -> int: mp_manager = mp.Manager() in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) communications_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) worker = mp.Process( target=flight_interface_worker.flight_interface_worker, @@ -69,8 +83,9 @@ def main() -> int: FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, in_queue, - out_queue, communications_input_queue, + out_queue, + home_position_out_queue, controller, ), ) @@ -80,7 +95,7 @@ def main() -> int: time.sleep(3) # Test - home_position = communications_input_queue.queue.get() + home_position = home_position_out_queue.queue.get() assert home_position is not None # Run the apply_communication tests From 042e8298d22f269d723c048fd3d6b6630382cbf7 Mon Sep 17 00:00:00 2001 From: Jane Date: Sat, 1 Feb 2025 22:32:05 -0500 Subject: [PATCH 5/6] Added metadata to queue --- .../test_communications_to_ground_station.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_communications_to_ground_station.py b/tests/integration/test_communications_to_ground_station.py index 3b6d67fd..79a940b0 100644 --- a/tests/integration/test_communications_to_ground_station.py +++ b/tests/integration/test_communications_to_ground_station.py @@ -10,6 +10,7 @@ from modules.common.modules import position_global from modules.common.modules.data_encoding import message_encoding_decoding +from modules.common.modules.data_encoding import metadata_encoding_decoding from modules.flight_interface import flight_interface_worker @@ -17,6 +18,7 @@ FLIGHT_INTERFACE_TIMEOUT = 30.0 # seconds FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds +WORKER_NAME = "FLIGHT_INTERFACE_WORKER" def apply_communications_test( @@ -34,6 +36,13 @@ def apply_communications_test( # Place the GPS coordinates print(f"Inserting list of gps coordinates, length {len(gps_coordinates)}") + success, metadata = metadata_encoding_decoding.encode_metadata( + WORKER_NAME, len(gps_coordinates) + ) + if not success: + return False + + communications_input_queue.queue.put(metadata) for success, gps_coordinate in gps_coordinates: if not success: @@ -41,7 +50,7 @@ def apply_communications_test( return False success, message = message_encoding_decoding.encode_position_global( - "FLIGHT_INTERFACE_WORKER", gps_coordinate + WORKER_NAME, gps_coordinate ) if not success: From 167478488f62215f39d9f89c08fe7b219378d54e Mon Sep 17 00:00:00 2001 From: Jane Date: Mon, 3 Feb 2025 18:55:01 -0500 Subject: [PATCH 6/6] Changed to use worker id --- .../integration/test_communications_to_ground_station.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_communications_to_ground_station.py b/tests/integration/test_communications_to_ground_station.py index 79a940b0..f83a1f8c 100644 --- a/tests/integration/test_communications_to_ground_station.py +++ b/tests/integration/test_communications_to_ground_station.py @@ -11,6 +11,7 @@ from modules.common.modules import position_global from modules.common.modules.data_encoding import message_encoding_decoding from modules.common.modules.data_encoding import metadata_encoding_decoding +from modules.common.modules.data_encoding import worker_enum from modules.flight_interface import flight_interface_worker @@ -18,7 +19,7 @@ FLIGHT_INTERFACE_TIMEOUT = 30.0 # seconds FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds -WORKER_NAME = "FLIGHT_INTERFACE_WORKER" +WORKER_ID = worker_enum.WorkerEnum.FLIGHT_INTERFACE_WORKER def apply_communications_test( @@ -36,9 +37,7 @@ def apply_communications_test( # Place the GPS coordinates print(f"Inserting list of gps coordinates, length {len(gps_coordinates)}") - success, metadata = metadata_encoding_decoding.encode_metadata( - WORKER_NAME, len(gps_coordinates) - ) + success, metadata = metadata_encoding_decoding.encode_metadata(WORKER_ID, len(gps_coordinates)) if not success: return False @@ -50,7 +49,7 @@ def apply_communications_test( return False success, message = message_encoding_decoding.encode_position_global( - WORKER_NAME, gps_coordinate + WORKER_ID, gps_coordinate ) if not success: