Fix sensor publish topics and add MQTT tests#45
Fix sensor publish topics and add MQTT tests#45Dinithi-Gunathilake wants to merge 16 commits intodevfrom
Conversation
Added error handling for invalid hex code formats in set_color_from_hex_code method.
Removed unnecessary import statements for math and json modules.
Added error handling to log exceptions during rotation.
There was a problem hiding this comment.
Pull request overview
This pull request ports robot control functionality from Java to Python, adding MQTT client tests and implementing sensor, communication, and motion control modules. The PR establishes the foundation for a robot control library with comprehensive MQTT-based communication.
Changes:
- Adds MQTT client tests with mock infrastructure for unit testing
- Implements sensor classes (distance, color, proximity) with MQTT communication
- Adds type definitions for RGB colors and proximity readings
- Implements helper modules for motion control, coordinates, and MQTT handling
Reviewed changes
Copilot reviewed 47 out of 48 changed files in this pull request and generated 24 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_mqtt_client.py | New comprehensive MQTT client tests with mocking framework |
| src/robot/virtual_robot.py | New virtual robot base class with sensor interrupt handlers |
| src/robot/types/rgb_color_type.py | RGB color type with multiple input format support |
| src/robot/types/proximity_reading_type.py | Proximity sensor reading type with distance and color parsing |
| src/robot/sensors/*.py | Sensor implementations (distance, color, proximity) with MQTT integration |
| src/robot/mqtt/robot_mqtt_client.py | MQTT client wrapper with channel prefix support and queue management |
| src/robot/robot_base.py | Complete robot base class with lifecycle management and subscription handling |
| src/robot/helpers/*.py | Helper classes for motion control, coordinates, and MQTT operations |
| src/robot/indicators/neopixel.py | NeoPixel LED controller with MQTT-based color changes |
| src/robot/communication/*.py | Simple and directed communication modules |
| docs/Makefile | Updated livehtml target with clean dependency |
| .flake8 | New flake8 configuration file |
| .github/workflows/ci.yml | Updated CI workflow to run on PR to dev/main branches |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/robot/virtual_robot.py
Outdated
|
|
||
| def sensor_interrupt(self, sensor: str, value: str) -> None: | ||
| if sensor == "distance": | ||
| print(f"Distance sensor interrupt on {self.id}with value{value}") |
There was a problem hiding this comment.
Missing spaces in the print statement. Should be "on {self.id} with value {value}" instead of "on {self.id}with value{value}".
src/robot/virtual_robot.py
Outdated
| if sensor == "distance": | ||
| print(f"Distance sensor interrupt on {self.id}with value{value}") | ||
| elif sensor == "color": | ||
| print(f"Color sensor interrupt on {self.id}with value{value}") |
There was a problem hiding this comment.
Missing spaces in the print statement. Should be "on {self.id} with value {value}" instead of "on {self.id}with value{value}".
src/robot/sensors/proximity.py
Outdated
| def __init__with_angles( | ||
| self, robot, angles: list[int], mqtt_client: RobotMqttClient | ||
| ): # helper to mirror overload | ||
| super().__init__(robot, mqtt_client) | ||
| self._topics_sub = {} | ||
| self._angles = list(angles) | ||
| self._subscribe("PROXIMITY_IN", f"sensor/proximity/{self.robot_id}") | ||
| self._proximity_lock = False | ||
| self._proximity = None | ||
|
|
There was a problem hiding this comment.
The __init__with_angles method is a duplicate of init with only slight differences. This is not a valid Python pattern. Python doesn't support method overloading in the same way as Java. Consider using a single init with a default parameter value for angles or using a class method as an alternative constructor pattern (e.g., @classmethod def with_angles(...)).
| class ProximityReadingType: | ||
| def __init__(self, angles: list[int] | tuple[int, ...], s: str): | ||
| reading_count = len(angles) | ||
| values = s.split() | ||
|
|
||
| self._distances: list[int] = [0] * reading_count | ||
| self._colors: list[RGBColorType] = [ | ||
| RGBColorType(0, 0, 0) for _ in range(reading_count) | ||
| ] | ||
|
|
||
| if len(values) != reading_count * 2: | ||
| raise ProximityException( | ||
| f"ProximityReadingType: length mismatch {len(values)}" | ||
| ) | ||
|
|
||
| for i in range(reading_count): | ||
| vi = values[i] | ||
| if vi == "Infinity": | ||
| print(f"Proximity: Infinity reading received for {i}") | ||
| self._distances[i] = -1 | ||
| else: | ||
| self._distances[i] = int(vi) | ||
|
|
||
| color = RGBColorType(0, 0, 0) | ||
| color.set_color_from_hex_code(values[reading_count + i]) | ||
| self._colors[i] = color | ||
|
|
||
| def distances(self) -> list[int]: | ||
| return self._distances | ||
|
|
||
| def colors(self) -> list[RGBColorType]: | ||
| return self._colors | ||
|
|
||
| def __str__(self) -> str: | ||
| parts: list[str] = [] | ||
| parts.extend(str(d) for d in self._distances) | ||
| parts.append(" ") | ||
| parts.extend(str(c) for c in self._colors) | ||
| return " ".join(parts) |
There was a problem hiding this comment.
The ProximityReadingType class lacks test coverage. The class handles complex parsing of distance and color data with special cases like "Infinity" values, and should be tested to ensure it correctly handles various input formats and edge cases.
| print(self._topics_sub.get("COMMUNICATION_IN_SIMP")) | ||
| print(f"Received (unknown simp): {topic}> {msg}") |
There was a problem hiding this comment.
Line 35 prints the expected topic but doesn't provide useful context. This debug print statement should either be removed or combined with line 36 to provide clearer information about what was expected vs what was received.
| print(self._topics_sub.get("COMMUNICATION_IN_SIMP")) | |
| print(f"Received (unknown simp): {topic}> {msg}") | |
| expected_topic = self._topics_sub.get("COMMUNICATION_IN_SIMP") | |
| print( | |
| f"Received (unknown simp): expected '{expected_topic}', " | |
| f"but got '{topic}' with message '{msg}'" | |
| ) |
| @pytest.fixture(autouse=True) | ||
| def patch_mqtt_client(monkeypatch): | ||
| """Replace paho.mqtt with a lightweight fake and skip real sleeps.""" | ||
| import robot.mqtt.robot_mqtt_client as mqtt_client_mod |
There was a problem hiding this comment.
Module 'robot.mqtt.robot_mqtt_client' is imported with both 'import' and 'import from'.
src/robot/motion.py
Outdated
| """Placeholder motion controller.""" | ||
|
|
||
| pass | ||
| from .helpers.motion_controller import MotionController # noqa: F401 |
There was a problem hiding this comment.
Import of 'MotionController' is not used.
src/robot/mqtt_client.py
Outdated
| """Placeholder MQTT client wrapper.""" | ||
|
|
||
| pass | ||
| from .mqtt.robot_mqtt_client import RobotMqttClient # noqa: F401 |
There was a problem hiding this comment.
Import of 'RobotMqttClient' is not used.
| def delay(self, milliseconds: int) -> None: | ||
| try: | ||
| time.sleep(max(0, milliseconds) / 1000.0) | ||
| except Exception: |
There was a problem hiding this comment.
'except' clause does nothing but pass and there is no explanatory comment.
3560baa to
fdb3a43
Compare
No description provided.