From b5557129e0425b99b768d114aba8a8b6aad3dbec Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Wed, 3 Jun 2026 16:23:46 +0100 Subject: [PATCH] chore: apply black formatting to src and test Normalize string quotes and line breaks per black defaults; no behaviour changes (226 tests pass). --- src/api/BaseAPIWrapper.py | 8 +- src/base_feature.py | 8 +- src/bot.py | 16 +- src/commands/admin.py | 12 +- src/commands/enroll.py | 4 +- src/commands/factory.py | 37 +-- src/commands/nodes.py | 4 +- src/helpers.py | 2 +- src/main.py | 1 + src/meshcore/channel_sync.py | 7 +- src/meshcore/radio.py | 11 +- src/meshtastic/radio.py | 16 +- src/meshtastic/tcp_interface.py | 83 +++--- src/meshtastic/traceroute.py | 8 +- src/persistence/commands_logger.py | 132 +++++++--- src/persistence/node_db.py | 242 +++++++++++++----- src/persistence/node_info.py | 41 ++- src/persistence/packet_dump.py | 4 +- src/persistence/user_prefs.py | 45 +++- src/radio/interface.py | 8 +- src/responders/responder_factory.py | 2 +- src/utils/stopwatch.py | 36 +-- test/commands/__init__.py | 4 +- test/commands/test_admin.py | 96 ++++--- test/commands/test_command.py | 30 ++- test/commands/test_enroll.py | 16 +- test/commands/test_hello.py | 4 +- test/commands/test_help.py | 32 ++- test/commands/test_nodes.py | 28 +- test/commands/test_ping.py | 8 +- test/commands/test_prefs.py | 29 ++- test/commands/test_template.py | 22 +- test/fake_radio.py | 5 +- test/meshcore/test_channels.py | 12 +- test/meshcore/test_flood_advert_scheduler.py | 37 ++- test/meshcore/test_initial_flood_advert.py | 4 +- test/meshcore/test_radio.py | 4 +- test/meshtastic/test_translation.py | 15 +- test/persistence/test_commands_logger.py | 64 +++-- test/persistence/test_node_db.py | 54 ++-- test/persistence/test_node_info.py | 11 +- test/persistence/test_user_prefs.py | 49 ++-- .../test_message_reaction_responder.py | 8 +- test/test_base_feature.py | 14 +- test/test_bot_integration.py | 6 +- test/test_data_classes.py | 98 +++---- test/test_setup_data.py | 29 ++- 47 files changed, 915 insertions(+), 491 deletions(-) diff --git a/src/api/BaseAPIWrapper.py b/src/api/BaseAPIWrapper.py index 294b02d..1b15f56 100644 --- a/src/api/BaseAPIWrapper.py +++ b/src/api/BaseAPIWrapper.py @@ -8,16 +8,16 @@ class BaseAPIWrapper(ABC): auth_token: str | None def __init__(self, base_url: str, auth_token: str = None): - self.base_url = base_url.rstrip('/') + self.base_url = base_url.rstrip("/") self.auth_token = auth_token def _get_headers(self) -> dict: headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/json', + "Content-Type": "application/json", + "Accept": "application/json", } if self.auth_token: - headers['Authorization'] = f'Token {self.auth_token}' + headers["Authorization"] = f"Token {self.auth_token}" return headers diff --git a/src/base_feature.py b/src/base_feature.py index 5f0a075..16c487f 100644 --- a/src/base_feature.py +++ b/src/base_feature.py @@ -41,7 +41,9 @@ def reply_in_channel( """Reply to ``message`` on the same channel it was received on.""" self.message_in_channel(message.channel, response, want_ack) - def message_in_channel(self, channel: int, response: str, want_ack: bool = False) -> None: + def message_in_channel( + self, channel: int, response: str, want_ack: bool = False + ) -> None: logging.debug("Sending message: '%s'", response) self.bot.radio.send_text( response, @@ -57,7 +59,9 @@ def reply_in_dm( ) -> None: self.message_in_dm(message.from_id, response, want_ack) - def message_in_dm(self, destination_id: str, response: str, want_ack: bool = False) -> None: + def message_in_dm( + self, destination_id: str, response: str, want_ack: bool = False + ) -> None: logging.debug("Sending DM: '%s'", response) self.bot.radio.send_text( response, diff --git a/src/bot.py b/src/bot.py index 23cf53e..645eb19 100644 --- a/src/bot.py +++ b/src/bot.py @@ -24,8 +24,12 @@ from src.persistence.packet_dump import dump_packet from src.persistence.user_prefs import AbstractUserPrefsPersistence from src.radio.errors import call_safely, get_global_error_counter -from src.radio.events import (ConnectionEstablished, IncomingPacket, - IncomingTextMessage, NodeUpdate) +from src.radio.events import ( + ConnectionEstablished, + IncomingPacket, + IncomingTextMessage, + NodeUpdate, +) from src.radio.interface import RadioHandlers, RadioInterface from src.responders.responder_factory import ResponderFactory @@ -93,9 +97,11 @@ def disconnect(self) -> None: def on_apply_mc_channel_config(self, channels: list) -> None: """Handle apply_mc_channel_config from WebSocket (MeshCore feeders).""" - from src.meshcore.channel_sync import (apply_channels_on_device, - sync_channels_after_apply, - sync_channels_to_api) + from src.meshcore.channel_sync import ( + apply_channels_on_device, + sync_channels_after_apply, + sync_channels_to_api, + ) if not hasattr(self.radio, "run_coroutine"): logger.warning( diff --git a/src/commands/admin.py b/src/commands/admin.py index 4c03947..1f2b436 100644 --- a/src/commands/admin.py +++ b/src/commands/admin.py @@ -8,7 +8,9 @@ from src.radio.events import IncomingTextMessage -def _rows_for_sender(rows: list[dict[str, Any]], sender_id: str) -> list[dict[str, Any]]: +def _rows_for_sender( + rows: list[dict[str, Any]], sender_id: str +) -> list[dict[str, Any]]: return [r for r in rows if r["sender_id"] == sender_id] @@ -42,9 +44,7 @@ def reset_packets(self, message: IncomingTextMessage, args: list[str]) -> None: self.bot.node_info.reset_packets_today() response = "Packet counter reset" else: - response = ( - f"reset: Unknown argument '{args[0]}' - options are: {available_options}" - ) + response = f"reset: Unknown argument '{args[0]}' - options are: {available_options}" self.reply(message, response) def show_users(self, message: IncomingTextMessage, args: list[str]) -> None: @@ -56,9 +56,7 @@ def show_users(self, message: IncomingTextMessage, args: list[str]) -> None: return self._show_user(message, req_user) return self._show_users(message) - def _show_user( - self, message: IncomingTextMessage, req_user: MeshNode.User - ) -> None: + def _show_user(self, message: IncomingTextMessage, req_user: MeshNode.User) -> None: since = datetime.now(timezone.utc) - timedelta(days=7) since = since.replace(hour=0, minute=0, second=0, microsecond=0) diff --git a/src/commands/enroll.py b/src/commands/enroll.py index 8cdb9a6..028b128 100644 --- a/src/commands/enroll.py +++ b/src/commands/enroll.py @@ -21,9 +21,7 @@ def show_help(self, message: IncomingTextMessage, args: list[str]) -> None: ) self.reply(message, response) - def enroll_testing( - self, message: IncomingTextMessage, args: list[str] - ) -> None: + def enroll_testing(self, message: IncomingTextMessage, args: list[str]) -> None: sender_id = message.from_id user_prefs = self.bot.user_prefs_persistence.get_user_prefs(sender_id) if user_prefs is None: diff --git a/src/commands/factory.py b/src/commands/factory.py index 0aa4437..6ad5b47 100644 --- a/src/commands/factory.py +++ b/src/commands/factory.py @@ -3,34 +3,13 @@ class CommandFactory: commands = { - "!ping": { - "class": "src.commands.ping.PingCommand", - "args": [] - }, - "!hello": { - "class": "src.commands.hello.HelloCommand", - "args": [] - }, - "!help": { - "class": "src.commands.help.HelpCommand", - "args": [] - }, - "!nodes": { - "class": "src.commands.nodes.NodesCommand", - "args": [] - }, - "!whoami": { - "class": "src.commands.template.WhoAmI", - "args": [] - }, - "!prefs": { - "class": "src.commands.prefs.PrefsCommandHandler", - "args": [] - }, - "!admin": { - "class": "src.commands.admin.AdminCommand", - "args": [] - }, + "!ping": {"class": "src.commands.ping.PingCommand", "args": []}, + "!hello": {"class": "src.commands.hello.HelloCommand", "args": []}, + "!help": {"class": "src.commands.help.HelpCommand", "args": []}, + "!nodes": {"class": "src.commands.nodes.NodesCommand", "args": []}, + "!whoami": {"class": "src.commands.template.WhoAmI", "args": []}, + "!prefs": {"class": "src.commands.prefs.PrefsCommandHandler", "args": []}, + "!admin": {"class": "src.commands.admin.AdminCommand", "args": []}, # "!enroll": { # "class": "src.commands.enroll.EnrollCommandHandler", # "args": ["enroll"] @@ -45,7 +24,7 @@ class CommandFactory: def create_command(command_name, bot): command_info = CommandFactory.commands.get(command_name) if command_info: - module_name, class_name = command_info["class"].rsplit('.', 1) + module_name, class_name = command_info["class"].rsplit(".", 1) module = importlib.import_module(module_name) command_class = getattr(module, class_name) args = [bot] + command_info["args"] diff --git a/src/commands/nodes.py b/src/commands/nodes.py index 9ce36ca..b6f5e80 100644 --- a/src/commands/nodes.py +++ b/src/commands/nodes.py @@ -72,9 +72,7 @@ def send_busy_node_list(self, sender: str) -> None: packets_today = self.bot.node_info.get_node_packets_today(node.id) response += f"- {node.short_name} ({packets_today} pkts)\n" - response += ( - f"(last reset at {self.bot.node_info.packet_counter_reset_time.strftime('%H:%M:%S')})" - ) + response += f"(last reset at {self.bot.node_info.packet_counter_reset_time.strftime('%H:%M:%S')})" self.reply_to(sender, response) def send_detailed_nodeinfo(self, sender: str, node_id: str) -> None: diff --git a/src/helpers.py b/src/helpers.py index 17ba343..cb8cdf0 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -34,4 +34,4 @@ def pretty_print_last_heard(last_heard_timestamp: int | datetime | None) -> str: def safe_encode_node_name(name): - return ''.join(c if c in _safe_chars else urllib.parse.quote(c) for c in name) + return "".join(c if c in _safe_chars else urllib.parse.quote(c) for c in name) diff --git a/src/main.py b/src/main.py index 7667994..a4d470c 100644 --- a/src/main.py +++ b/src/main.py @@ -28,6 +28,7 @@ logging.getLogger("mesh_interface").setLevel(logging.WARNING) from src.api.packet_serializer import PacketSerializer + # Now we can import the rest of our local files from src.api.StorageAPI import StorageAPIWrapper from src.bot import MeshflowBot diff --git a/src/meshcore/channel_sync.py b/src/meshcore/channel_sync.py index d9777eb..93813e2 100644 --- a/src/meshcore/channel_sync.py +++ b/src/meshcore/channel_sync.py @@ -8,8 +8,11 @@ CHANNEL_READ_DELAY_S = 2.0 -from src.meshcore.channels import (log_device_channels, read_device_channels, - snapshot_sync_body) +from src.meshcore.channels import ( + log_device_channels, + read_device_channels, + snapshot_sync_body, +) if TYPE_CHECKING: from src.api.StorageAPI import StorageAPIWrapper diff --git a/src/meshcore/radio.py b/src/meshcore/radio.py index 241e1fe..c6d1ce2 100644 --- a/src/meshcore/radio.py +++ b/src/meshcore/radio.py @@ -12,9 +12,11 @@ from meshcore import MeshCore from meshcore.events import Event, EventType from src.meshcore.dump import dump_meshcore_event -from src.meshcore.translation import (event_to_incoming_packet, - event_to_node_update, - event_to_text_message) +from src.meshcore.translation import ( + event_to_incoming_packet, + event_to_node_update, + event_to_text_message, +) from src.radio.errors import RadioError, call_safely, get_global_error_counter from src.radio.events import ConnectionEstablished from src.radio.interface import RadioHandlers, RadioInterface @@ -411,8 +413,7 @@ def schedule_channel_sync( return async def _task() -> None: - from src.meshcore.channel_sync import \ - sync_channels_to_storage_apis_async + from src.meshcore.channel_sync import sync_channels_to_storage_apis_async labels = [str(getattr(s, "base_url", "?")) for s in storage_apis] logger.info( diff --git a/src/meshtastic/radio.py b/src/meshtastic/radio.py index 8559da8..4d4ae74 100644 --- a/src/meshtastic/radio.py +++ b/src/meshtastic/radio.py @@ -72,7 +72,9 @@ def connect(self) -> None: packet_queue=old_packet_queue, ) - logger.info("MeshtasticRadio: TCP interface created; awaiting library connect event") + logger.info( + "MeshtasticRadio: TCP interface created; awaiting library connect event" + ) def disconnect(self) -> None: self._is_connected = False @@ -111,7 +113,9 @@ def send_text( kwargs["destinationId"] = destination_id if hop_limit is not None: kwargs["hopLimit"] = hop_limit - logger.debug("MeshtasticRadio: send_text dest=%s ch=%s", destination_id, channel) + logger.debug( + "MeshtasticRadio: send_text dest=%s ch=%s", destination_id, channel + ) self._interface.sendText(text, **kwargs) def send_reaction( @@ -137,7 +141,9 @@ def send_traceroute( channel_index: int = 0, ) -> None: if not self._interface or not self._is_connected: - logger.warning("MeshtasticRadio: send_traceroute called before connect; skipping") + logger.warning( + "MeshtasticRadio: send_traceroute called before connect; skipping" + ) return _send_traceroute(self._interface, target_node_id, channel_index=channel_index) @@ -149,7 +155,9 @@ def _ensure_pubsub_subscribed(self) -> None: pub.subscribe(self._on_receive, "meshtastic.receive") pub.subscribe(self._on_receive_text, "meshtastic.receive.text") pub.subscribe(self._on_node_updated, "meshtastic.node.updated") - pub.subscribe(self._on_connection_established, "meshtastic.connection.established") + pub.subscribe( + self._on_connection_established, "meshtastic.connection.established" + ) self._pubsub_subscribed = True def _require_interface(self) -> None: diff --git a/src/meshtastic/tcp_interface.py b/src/meshtastic/tcp_interface.py index d4eb5db..fc13e07 100644 --- a/src/meshtastic/tcp_interface.py +++ b/src/meshtastic/tcp_interface.py @@ -11,18 +11,18 @@ class SupportsMessageReactionInterface(TCPInterface): def sendReaction( - self, - emoji: str, - messageId: int, - destinationId: Union[int, str] = BROADCAST_ADDR, - wantAck: bool = False, - channelIndex: int = 0, - portNum: portnums_pb2.PortNum.ValueType = portnums_pb2.PortNum.TEXT_MESSAGE_APP, - hopLimit: Optional[int] = None, - pkiEncrypted: Optional[bool] = False, - publicKey: Optional[bytes] = None, + self, + emoji: str, + messageId: int, + destinationId: Union[int, str] = BROADCAST_ADDR, + wantAck: bool = False, + channelIndex: int = 0, + portNum: portnums_pb2.PortNum.ValueType = portnums_pb2.PortNum.TEXT_MESSAGE_APP, + hopLimit: Optional[int] = None, + pkiEncrypted: Optional[bool] = False, + publicKey: Optional[bytes] = None, ): - emoji_bytes = emoji.encode('utf-8') + emoji_bytes = emoji.encode("utf-8") packet = mesh_pb2.MeshPacket() packet.channel = channelIndex @@ -31,21 +31,27 @@ def sendReaction( packet.decoded.reply_id = messageId packet.decoded.emoji = 1 - self._sendPacket(packet, destinationId, - wantAck=wantAck, - hopLimit=hopLimit, - pkiEncrypted=pkiEncrypted, - publicKey=publicKey) + self._sendPacket( + packet, + destinationId, + wantAck=wantAck, + hopLimit=hopLimit, + pkiEncrypted=pkiEncrypted, + publicKey=publicKey, + ) return packet class AutoReconnectTcpInterface(SupportsMessageReactionInterface, TCPInterface): packet_queue: Queue - def __init__(self, *args, - error_handler: Optional[Callable[[Exception], None]] = None, - packet_queue: Optional[Queue] = None, - **kwargs): + def __init__( + self, + *args, + error_handler: Optional[Callable[[Exception], None]] = None, + packet_queue: Optional[Queue] = None, + **kwargs, + ): self.error_handler = error_handler self.packet_queue = packet_queue or Queue() super().__init__(*args, **kwargs) @@ -71,13 +77,13 @@ def sendHeartbeat(self): self._shutdown_and_call_error_handler() def _sendPacket( - self, - meshPacket: mesh_pb2.MeshPacket, - destinationId: Union[int, str] = BROADCAST_ADDR, - wantAck: bool = False, - hopLimit: Optional[int] = None, - pkiEncrypted: Optional[bool] = False, - publicKey: Optional[bytes] = None, + self, + meshPacket: mesh_pb2.MeshPacket, + destinationId: Union[int, str] = BROADCAST_ADDR, + wantAck: bool = False, + hopLimit: Optional[int] = None, + pkiEncrypted: Optional[bool] = False, + publicKey: Optional[bytes] = None, ): try: super()._sendPacket( @@ -86,11 +92,13 @@ def _sendPacket( wantAck=wantAck, hopLimit=hopLimit, pkiEncrypted=pkiEncrypted, - publicKey=publicKey + publicKey=publicKey, ) except (OSError, BrokenPipeError) as e: logging.error(f"sendPacket failed: {e}") - self.packet_queue.put((meshPacket, destinationId, wantAck, hopLimit, pkiEncrypted, publicKey)) + self.packet_queue.put( + (meshPacket, destinationId, wantAck, hopLimit, pkiEncrypted, publicKey) + ) # self._reconnect_with_backoff() self._shutdown_and_call_error_handler(e) @@ -101,7 +109,8 @@ def _shutdown_and_call_error_handler(self, conn_error: Optional[Exception] = Non except Exception as e: logging.warning( f"Failed to close connection. " - f"This might not be an issue since we've already disconnected: {e}") + f"This might not be an issue since we've already disconnected: {e}" + ) if self.error_handler: self.error_handler(conn_error) @@ -124,13 +133,17 @@ def _reconnect_with_backoff(self): if backoff_time == max_backoff_time: logging.error("Max backoff time reached. Exiting.") sys.exit(1) - backoff_time = min(backoff_time * backoff_rate, max_backoff_time) # Exponential back-off + backoff_time = min( + backoff_time * backoff_rate, max_backoff_time + ) # Exponential back-off logging.info(f"Next reconnection attempt in {backoff_time} seconds") time.sleep(backoff_time) def _replay_packet_queue(self): while not self.packet_queue.empty(): - packet, destinationId, wantAck, hopLimit, pkiEncrypted, publicKey = self.packet_queue.get() + packet, destinationId, wantAck, hopLimit, pkiEncrypted, publicKey = ( + self.packet_queue.get() + ) try: super()._sendPacket( meshPacket=packet, @@ -138,10 +151,12 @@ def _replay_packet_queue(self): wantAck=wantAck, hopLimit=hopLimit, pkiEncrypted=pkiEncrypted, - publicKey=publicKey + publicKey=publicKey, ) logging.info("Replayed packet successfully") except Exception as e: logging.error(f"Failed to replay packet: {e}") - self.packet_queue.put((packet, destinationId, wantAck, hopLimit, pkiEncrypted, publicKey)) + self.packet_queue.put( + (packet, destinationId, wantAck, hopLimit, pkiEncrypted, publicKey) + ) break diff --git a/src/meshtastic/traceroute.py b/src/meshtastic/traceroute.py index 819e78f..ce4346d 100644 --- a/src/meshtastic/traceroute.py +++ b/src/meshtastic/traceroute.py @@ -27,13 +27,17 @@ TR_HOPS_LIMIT = int(os.getenv("TR_HOPS_LIMIT", "5")) if TR_HOPS_LIMIT < 3: - logger.warning("TR_HOPS_LIMIT is less than 3, traceroutes are likely to fail. Capping at 3.") + logger.warning( + "TR_HOPS_LIMIT is less than 3, traceroutes are likely to fail. Capping at 3." + ) TR_HOPS_LIMIT = 3 elif TR_HOPS_LIMIT < 5: logger.warning("TR_HOPS_LIMIT is less than 5, traceroutes are likely to fail") if TR_HOPS_LIMIT > 7: - logger.warning("TR_HOPS_LIMIT is greater than the Meshtastic limit of 7. Capping at 7.") + logger.warning( + "TR_HOPS_LIMIT is greater than the Meshtastic limit of 7. Capping at 7." + ) TR_HOPS_LIMIT = 7 diff --git a/src/persistence/commands_logger.py b/src/persistence/commands_logger.py index 3af11fe..c642a18 100644 --- a/src/persistence/commands_logger.py +++ b/src/persistence/commands_logger.py @@ -6,7 +6,9 @@ from src.persistence import BaseSqlitePersistenceStore -def _sqlite_rows_to_dicts(rows: list[tuple], columns: list[str]) -> list[dict[str, Any]]: +def _sqlite_rows_to_dicts( + rows: list[tuple], columns: list[str] +) -> list[dict[str, Any]]: return [dict(zip(columns, row)) for row in rows] @@ -21,19 +23,27 @@ def log_unknown_request(self, sender_id: str, message: str) -> None: pass @abc.abstractmethod - def log_responder_handled(self, sender_id: str, responder_instance, message_text: str) -> None: + def log_responder_handled( + self, sender_id: str, responder_instance, message_text: str + ) -> None: pass @abc.abstractmethod - def get_command_history(self, since: datetime, sender_id: str = None) -> list[dict[str, Any]]: + def get_command_history( + self, since: datetime, sender_id: str = None + ) -> list[dict[str, Any]]: pass @abc.abstractmethod - def get_unknown_command_history(self, since: datetime, sender_id: str = None) -> list[dict[str, Any]]: + def get_unknown_command_history( + self, since: datetime, sender_id: str = None + ) -> list[dict[str, Any]]: pass @abc.abstractmethod - def get_responder_history(self, since: datetime, sender_id: str = None) -> list[dict[str, Any]]: + def get_responder_history( + self, since: datetime, sender_id: str = None + ) -> list[dict[str, Any]]: pass @@ -42,7 +52,7 @@ class SqliteCommandLogger(AbstractCommandLogger, BaseSqlitePersistenceStore): def _initialize_db(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute(""" CREATE TABLE IF NOT EXISTS command_log ( sender_id TEXT, base_command TEXT, @@ -51,102 +61,148 @@ def _initialize_db(self): timestamp TEXT, handler_class TEXT ) - ''') - cursor.execute(''' + """) + cursor.execute(""" CREATE TABLE IF NOT EXISTS unknown_requests ( sender_id TEXT, message TEXT, timestamp TEXT ) - ''') - cursor.execute(''' + """) + cursor.execute(""" CREATE TABLE IF NOT EXISTS responder_log ( sender_id TEXT, message TEXT, timestamp TEXT, responder_class TEXT ) - ''') + """) conn.commit() def log_command(self, sender_id: str, command_instance, message: str) -> None: base_cmd, subcommands, args = command_instance.get_command_for_logging(message) - subcommands_str = ' '.join(subcommands) if subcommands else None + subcommands_str = " ".join(subcommands) if subcommands else None with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ INSERT INTO command_log (sender_id, base_command, sub_commands, args, timestamp, handler_class) VALUES (?, ?, ?, ?, ?, ?) - ''', (sender_id, base_cmd, subcommands_str, args, datetime.now(timezone.utc).isoformat(), - command_instance.__class__.__name__)) + """, + ( + sender_id, + base_cmd, + subcommands_str, + args, + datetime.now(timezone.utc).isoformat(), + command_instance.__class__.__name__, + ), + ) conn.commit() - def log_responder_handled(self, sender_id: str, responder_instance, message_text: str) -> None: + def log_responder_handled( + self, sender_id: str, responder_instance, message_text: str + ) -> None: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ INSERT INTO responder_log (sender_id, message, timestamp, responder_class) VALUES (?, ?, ?, ?) - ''', (sender_id, message_text, datetime.now(timezone.utc).isoformat(), responder_instance.__class__.__name__)) + """, + ( + sender_id, + message_text, + datetime.now(timezone.utc).isoformat(), + responder_instance.__class__.__name__, + ), + ) conn.commit() def log_unknown_request(self, sender_id: str, message: str) -> None: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ INSERT INTO unknown_requests (sender_id, message, timestamp) VALUES (?, ?, ?) - ''', (sender_id, message, datetime.now(timezone.utc).isoformat())) + """, + (sender_id, message, datetime.now(timezone.utc).isoformat()), + ) conn.commit() - def get_command_history(self, since: datetime, sender_id: str = None) -> list[dict[str, Any]]: - columns = ['sender_id', 'base_command', 'timestamp'] + def get_command_history( + self, since: datetime, sender_id: str = None + ) -> list[dict[str, Any]]: + columns = ["sender_id", "base_command", "timestamp"] with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() if sender_id: - cursor.execute(''' + cursor.execute( + """ SELECT sender_id, base_command, timestamp FROM command_log WHERE sender_id = ? AND timestamp >= ? - ''', (sender_id, since.isoformat())) + """, + (sender_id, since.isoformat()), + ) else: - cursor.execute(''' + cursor.execute( + """ SELECT sender_id, base_command, timestamp FROM command_log WHERE timestamp >= ? - ''', (since.isoformat(),)) + """, + (since.isoformat(),), + ) rows = cursor.fetchall() return _sqlite_rows_to_dicts(rows, columns) - def get_unknown_command_history(self, since: datetime, sender_id: str = None) -> list[dict[str, Any]]: - columns = ['sender_id', 'message', 'timestamp'] + def get_unknown_command_history( + self, since: datetime, sender_id: str = None + ) -> list[dict[str, Any]]: + columns = ["sender_id", "message", "timestamp"] with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() if sender_id: - cursor.execute(''' + cursor.execute( + """ SELECT sender_id, message, timestamp FROM unknown_requests WHERE sender_id = ? AND timestamp >= ? - ''', (sender_id, since.isoformat())) + """, + (sender_id, since.isoformat()), + ) else: - cursor.execute(''' + cursor.execute( + """ SELECT sender_id, message, timestamp FROM unknown_requests WHERE timestamp >= ? - ''', (since.isoformat(),)) + """, + (since.isoformat(),), + ) rows = cursor.fetchall() return _sqlite_rows_to_dicts(rows, columns) - def get_responder_history(self, since: datetime, sender_id: str = None) -> list[dict[str, Any]]: - columns = ['sender_id', 'responder_class', 'timestamp'] + def get_responder_history( + self, since: datetime, sender_id: str = None + ) -> list[dict[str, Any]]: + columns = ["sender_id", "responder_class", "timestamp"] with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() if sender_id: - cursor.execute(''' + cursor.execute( + """ SELECT sender_id, responder_class, timestamp FROM responder_log WHERE sender_id = ? AND timestamp >= ? - ''', (sender_id, since.isoformat())) + """, + (sender_id, since.isoformat()), + ) else: - cursor.execute(''' + cursor.execute( + """ SELECT sender_id, responder_class, timestamp FROM responder_log WHERE timestamp >= ? - ''', (since.isoformat(),)) + """, + (since.isoformat(),), + ) rows = cursor.fetchall() return _sqlite_rows_to_dicts(rows, columns) diff --git a/src/persistence/node_db.py b/src/persistence/node_db.py index a7ea6f7..25d5103 100644 --- a/src/persistence/node_db.py +++ b/src/persistence/node_db.py @@ -10,9 +10,9 @@ class AbstractNodeDB(abc.ABC): def store_node(self, node: MeshNode): self.store_user(node.user) - if hasattr(node, 'position') and node.position: + if hasattr(node, "position") and node.position: self.store_position(node.user.id, node.position) - if hasattr(node, 'device_metrics') and node.device_metrics: + if hasattr(node, "device_metrics") and node.device_metrics: self.store_device_metrics(node.user.id, node.device_metrics) @abc.abstractmethod @@ -40,13 +40,15 @@ def get_last_position(self, node_id: str) -> MeshNode.Position | None: pass @abc.abstractmethod - def get_position_log(self, node_id: str, - start: datetime, - end: datetime) -> list[MeshNode.Position]: + def get_position_log( + self, node_id: str, start: datetime, end: datetime + ) -> list[MeshNode.Position]: pass @abc.abstractmethod - def store_device_metrics(self, node_id: str, device_metrics: MeshNode.DeviceMetrics): + def store_device_metrics( + self, node_id: str, device_metrics: MeshNode.DeviceMetrics + ): pass @abc.abstractmethod @@ -54,9 +56,9 @@ def get_last_device_metrics(self, node_id: str) -> MeshNode.DeviceMetrics | None pass @abc.abstractmethod - def get_device_metrics_log(self, node_id: str, - start: datetime, - end: datetime) -> list[MeshNode.DeviceMetrics]: + def get_device_metrics_log( + self, node_id: str, start: datetime, end: datetime + ) -> list[MeshNode.DeviceMetrics]: pass @@ -74,7 +76,9 @@ def store_position(self, node_id: str, position: MeshNode.Position): self.positions[node_id] = [] self.positions[node_id].append(position) - def store_device_metrics(self, node_id: str, device_metrics: MeshNode.DeviceMetrics): + def store_device_metrics( + self, node_id: str, device_metrics: MeshNode.DeviceMetrics + ): if node_id not in self.device_metrics: self.device_metrics[node_id] = [] self.device_metrics[node_id].append(device_metrics) @@ -96,10 +100,15 @@ def get_last_position(self, node_id: str) -> MeshNode.Position | None: return self.positions[node_id][-1] return None - def get_position_log(self, node_id: str, start: datetime, end: datetime) -> list[ - MeshNode.Position]: + def get_position_log( + self, node_id: str, start: datetime, end: datetime + ) -> list[MeshNode.Position]: if node_id in self.positions: - return [pos for pos in self.positions[node_id] if start <= pos.logged_time <= end] + return [ + pos + for pos in self.positions[node_id] + if start <= pos.logged_time <= end + ] return [] def get_last_device_metrics(self, node_id: str) -> MeshNode.DeviceMetrics | None: @@ -107,10 +116,15 @@ def get_last_device_metrics(self, node_id: str) -> MeshNode.DeviceMetrics | None return self.device_metrics[node_id][-1] return None - def get_device_metrics_log(self, node_id: str, start: datetime, end: datetime) -> list[ - MeshNode.DeviceMetrics]: + def get_device_metrics_log( + self, node_id: str, start: datetime, end: datetime + ) -> list[MeshNode.DeviceMetrics]: if node_id in self.device_metrics: - return [metrics for metrics in self.device_metrics[node_id] if start <= metrics.logged_time <= end] + return [ + metrics + for metrics in self.device_metrics[node_id] + if start <= metrics.logged_time <= end + ] return [] @@ -119,7 +133,7 @@ class SqliteNodeDB(BaseSqlitePersistenceStore, AbstractNodeDB): def _initialize_db(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute(""" CREATE TABLE IF NOT EXISTS nodes ( id TEXT PRIMARY KEY, short_name TEXT, @@ -128,8 +142,8 @@ def _initialize_db(self): hw_model TEXT, public_key TEXT ) - ''') - cursor.execute(''' + """) + cursor.execute(""" CREATE TABLE IF NOT EXISTS positions ( node_id TEXT, logged_time DATETIME, @@ -140,8 +154,8 @@ def _initialize_db(self): location_source TEXT, FOREIGN KEY(node_id) REFERENCES nodes(id) ) - ''') - cursor.execute(''' + """) + cursor.execute(""" CREATE TABLE IF NOT EXISTS device_metrics ( node_id TEXT, logged_time DATETIME, @@ -152,127 +166,227 @@ def _initialize_db(self): uptime_seconds INTEGER, FOREIGN KEY(node_id) REFERENCES nodes(id) ) - ''') + """) conn.commit() def store_user(self, node_user: MeshNode.User): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ INSERT OR REPLACE INTO nodes (id, short_name, long_name, macaddr, hw_model, public_key) VALUES (?, ?, ?, ?, ?, ?) - ''', (node_user.id, node_user.short_name, node_user.long_name, node_user.macaddr, node_user.hw_model, - node_user.public_key)) + """, + ( + node_user.id, + node_user.short_name, + node_user.long_name, + node_user.macaddr, + node_user.hw_model, + node_user.public_key, + ), + ) conn.commit() def store_position(self, node_id: str, position: MeshNode.Position): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ INSERT INTO positions (node_id, logged_time, reported_time, latitude, longitude, altitude, location_source) VALUES (?, ?, ?, ?, ?, ?, ?) - ''', (node_id, position.logged_time, position.reported_time, position.latitude, position.longitude, - position.altitude, position.location_source)) + """, + ( + node_id, + position.logged_time, + position.reported_time, + position.latitude, + position.longitude, + position.altitude, + position.location_source, + ), + ) conn.commit() - def store_device_metrics(self, node_id: str, device_metrics: MeshNode.DeviceMetrics): + def store_device_metrics( + self, node_id: str, device_metrics: MeshNode.DeviceMetrics + ): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ INSERT INTO device_metrics (node_id, logged_time, battery_level, voltage, channel_utilization, air_util_tx, uptime_seconds) VALUES (?, ?, ?, ?, ?, ?, ?) - ''', (node_id, device_metrics.logged_time, device_metrics.battery_level, device_metrics.voltage, - device_metrics.channel_utilization, device_metrics.air_util_tx, device_metrics.uptime_seconds)) + """, + ( + node_id, + device_metrics.logged_time, + device_metrics.battery_level, + device_metrics.voltage, + device_metrics.channel_utilization, + device_metrics.air_util_tx, + device_metrics.uptime_seconds, + ), + ) conn.commit() def get_by_radio_id(self, node_id: str) -> MeshNode.User | None: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute('SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes WHERE id = ?', - (node_id,)) + cursor.execute( + "SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes WHERE id = ?", + (node_id,), + ) row = cursor.fetchone() if row: - return MeshNode.User(node_id=row[0], short_name=row[1], long_name=row[2], macaddr=row[3], - hw_model=row[4], public_key=row[5]) + return MeshNode.User( + node_id=row[0], + short_name=row[1], + long_name=row[2], + macaddr=row[3], + hw_model=row[4], + public_key=row[5], + ) return None def get_by_short_name(self, short_name: str) -> MeshNode.User | None: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( - 'SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes WHERE short_name = ? COLLATE NOCASE', - (short_name,)) + "SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes WHERE short_name = ? COLLATE NOCASE", + (short_name,), + ) row = cursor.fetchone() if row: - return MeshNode.User(node_id=row[0], short_name=row[1], long_name=row[2], macaddr=row[3], - hw_model=row[4], public_key=row[5]) + return MeshNode.User( + node_id=row[0], + short_name=row[1], + long_name=row[2], + macaddr=row[3], + hw_model=row[4], + public_key=row[5], + ) return None def list_nodes(self) -> list[MeshNode.User]: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute('SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes') + cursor.execute( + "SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes" + ) rows = cursor.fetchall() - return [MeshNode.User(node_id=row[0], short_name=row[1], long_name=row[2], macaddr=row[3], - hw_model=row[4], public_key=row[5]) for row in rows] + return [ + MeshNode.User( + node_id=row[0], + short_name=row[1], + long_name=row[2], + macaddr=row[3], + hw_model=row[4], + public_key=row[5], + ) + for row in rows + ] def get_last_position(self, node_id: str) -> MeshNode.Position | None: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ SELECT logged_time, reported_time, latitude, longitude, altitude, location_source FROM positions WHERE node_id = ? ORDER BY logged_time DESC LIMIT 1 - ''', (node_id,)) + """, + (node_id,), + ) row = cursor.fetchone() if row: - return MeshNode.Position(logged_time=row[0], reported_time=row[1], latitude=row[2], longitude=row[3], - altitude=row[4], location_source=row[5]) + return MeshNode.Position( + logged_time=row[0], + reported_time=row[1], + latitude=row[2], + longitude=row[3], + altitude=row[4], + location_source=row[5], + ) return None - def get_position_log(self, node_id: str, start: datetime, end: datetime) -> list[ - MeshNode.Position]: + def get_position_log( + self, node_id: str, start: datetime, end: datetime + ) -> list[MeshNode.Position]: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ SELECT logged_time, reported_time, latitude, longitude, altitude, location_source FROM positions WHERE node_id = ? AND logged_time BETWEEN ? AND ? ORDER BY logged_time - ''', (node_id, start, end)) + """, + (node_id, start, end), + ) rows = cursor.fetchall() - return [MeshNode.Position(logged_time=row[0], reported_time=row[1], latitude=row[2], longitude=row[3], - altitude=row[4], location_source=row[5]) for row in rows] + return [ + MeshNode.Position( + logged_time=row[0], + reported_time=row[1], + latitude=row[2], + longitude=row[3], + altitude=row[4], + location_source=row[5], + ) + for row in rows + ] def get_last_device_metrics(self, node_id: str) -> MeshNode.DeviceMetrics | None: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ SELECT logged_time, battery_level, voltage, channel_utilization, air_util_tx, uptime_seconds FROM device_metrics WHERE node_id = ? ORDER BY logged_time DESC LIMIT 1 - ''', (node_id,)) + """, + (node_id,), + ) row = cursor.fetchone() if row: - return MeshNode.DeviceMetrics(logged_time=row[0], battery_level=row[1], voltage=row[2], - channel_utilization=row[3], air_util_tx=row[4], uptime_seconds=row[5]) + return MeshNode.DeviceMetrics( + logged_time=row[0], + battery_level=row[1], + voltage=row[2], + channel_utilization=row[3], + air_util_tx=row[4], + uptime_seconds=row[5], + ) return None - def get_device_metrics_log(self, node_id: str, start: datetime, end: datetime) -> list[ - MeshNode.DeviceMetrics]: + def get_device_metrics_log( + self, node_id: str, start: datetime, end: datetime + ) -> list[MeshNode.DeviceMetrics]: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ SELECT logged_time, battery_level, voltage, channel_utilization, air_util_tx, uptime_seconds FROM device_metrics WHERE node_id = ? AND logged_time BETWEEN ? AND ? ORDER BY logged_time - ''', (node_id, start, end)) + """, + (node_id, start, end), + ) rows = cursor.fetchall() - return [MeshNode.DeviceMetrics(logged_time=row[0], battery_level=row[1], voltage=row[2], - channel_utilization=row[3], air_util_tx=row[4], uptime_seconds=row[5]) for - row in rows] + return [ + MeshNode.DeviceMetrics( + logged_time=row[0], + battery_level=row[1], + voltage=row[2], + channel_utilization=row[3], + air_util_tx=row[4], + uptime_seconds=row[5], + ) + for row in rows + ] diff --git a/src/persistence/node_info.py b/src/persistence/node_info.py index 960c7ce..7cb8d6d 100644 --- a/src/persistence/node_info.py +++ b/src/persistence/node_info.py @@ -92,19 +92,29 @@ def node_packet_received(self, node_id: str, packet_type: str) -> None: self.node_packets_today_breakdown[node_id][packet_type] += 1 def update_last_heard(self, node_id: str, last_heard: datetime = None) -> None: - self.nodes_last_heard[node_id] = last_heard if last_heard else datetime.now(timezone.utc) + self.nodes_last_heard[node_id] = ( + last_heard if last_heard else datetime.now(timezone.utc) + ) def reset_packets_today(self) -> None: self.node_packets_today = {} self.node_packets_today_breakdown = {} def get_online_nodes(self) -> dict[str, datetime]: - return {node_id: last_heard for node_id, last_heard in self.nodes_last_heard.items() - if last_heard > datetime.now(timezone.utc) - timedelta(seconds=self.online_threshold_sec)} + return { + node_id: last_heard + for node_id, last_heard in self.nodes_last_heard.items() + if last_heard + > datetime.now(timezone.utc) - timedelta(seconds=self.online_threshold_sec) + } def get_offline_nodes(self) -> dict[str, datetime]: - return {node_id: last_heard for node_id, last_heard in self.nodes_last_heard.items() - if last_heard <= datetime.now(timezone.utc) - timedelta(seconds=self.online_threshold_sec)} + return { + node_id: last_heard + for node_id, last_heard in self.nodes_last_heard.items() + if last_heard + <= datetime.now(timezone.utc) - timedelta(seconds=self.online_threshold_sec) + } def get_all_nodes(self) -> dict[str, datetime]: return self.nodes_last_heard @@ -113,18 +123,23 @@ def load_from_file(self, node_info_file: str) -> None: if not os.path.exists(node_info_file): return - with open(node_info_file, 'r') as file: + with open(node_info_file, "r") as file: data = json.load(file) - self.nodes_last_heard = {k: datetime.fromisoformat(v) for k, v in data['nodes_last_heard'].items()} - self.node_packets_today = data['node_packets_today'] - self.node_packets_today_breakdown = data['node_packets_today_breakdown'] + self.nodes_last_heard = { + k: datetime.fromisoformat(v) + for k, v in data["nodes_last_heard"].items() + } + self.node_packets_today = data["node_packets_today"] + self.node_packets_today_breakdown = data["node_packets_today_breakdown"] def persist_to_file(self, node_info_file: str) -> None: - with open(node_info_file, 'w') as file: + with open(node_info_file, "w") as file: data = { - 'nodes_last_heard': {k: v.isoformat() for k, v in self.nodes_last_heard.items()}, - 'node_packets_today': self.node_packets_today, - 'node_packets_today_breakdown': self.node_packets_today_breakdown + "nodes_last_heard": { + k: v.isoformat() for k, v in self.nodes_last_heard.items() + }, + "node_packets_today": self.node_packets_today, + "node_packets_today_breakdown": self.node_packets_today_breakdown, } json.dump(data, file, indent=4) logging.info(f"Node info persisted to {node_info_file}") diff --git a/src/persistence/packet_dump.py b/src/persistence/packet_dump.py index ca09f3f..4ce9193 100644 --- a/src/persistence/packet_dump.py +++ b/src/persistence/packet_dump.py @@ -14,7 +14,9 @@ logging.info(f"Will dump all packets for portnums to JSON: {dump_portnums}") else: - logging.info("Not dumping packets - set DUMP_PACKETS_PORTNUMS to comma separated list of portnums to dump") + logging.info( + "Not dumping packets - set DUMP_PACKETS_PORTNUMS to comma separated list of portnums to dump" + ) def dump_packet(packet: dict): diff --git a/src/persistence/user_prefs.py b/src/persistence/user_prefs.py index 5888fbd..0a56cc5 100644 --- a/src/persistence/user_prefs.py +++ b/src/persistence/user_prefs.py @@ -48,12 +48,14 @@ def persist_user_prefs(self, user_id: str, user_prefs: UserPrefs): pass -class SqliteUserPrefsPersistence(AbstractUserPrefsPersistence, BaseSqlitePersistenceStore): +class SqliteUserPrefsPersistence( + AbstractUserPrefsPersistence, BaseSqlitePersistenceStore +): def _initialize_db(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(''' + cursor.execute(""" CREATE TABLE IF NOT EXISTS user_prefs ( user_id TEXT, setting_name TEXT, @@ -62,18 +64,21 @@ def _initialize_db(self): num_changes INTEGER, PRIMARY KEY (user_id, setting_name) ) - ''') + """) conn.commit() def get_user_prefs(self, user_id: str) -> UserPrefs: with sqlite3.connect(self.db_path) as conn: # Fetch the data cursor = conn.cursor() - cursor.execute(''' + cursor.execute( + """ SELECT setting_name, setting_value, time_set, num_changes FROM user_prefs WHERE user_id = ? - ''', (user_id,)) + """, + (user_id,), + ) rows = cursor.fetchall() # Parse the data @@ -81,9 +86,13 @@ def get_user_prefs(self, user_id: str) -> UserPrefs: for row in rows: setting_name, setting_value, time_set, num_changes = row preference = UserPrefs.Preference.from_values( - value=True if setting_value == 'True' else False if setting_value == 'False' else setting_value, + value=( + True + if setting_value == "True" + else False if setting_value == "False" else setting_value + ), time_set=datetime.fromisoformat(time_set), - num_changes=num_changes + num_changes=num_changes, ) setattr(user_prefs, setting_name, preference) @@ -94,14 +103,17 @@ def persist_user_prefs(self, user_id: str, user_prefs: UserPrefs) -> UserPrefs: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() for key, preference in user_prefs.__dict__.items(): - if key == 'user_id': + if key == "user_id": continue - cursor.execute(''' + cursor.execute( + """ SELECT setting_value, num_changes FROM user_prefs WHERE user_id = ? AND setting_name = ? - ''', (user_id, key)) + """, + (user_id, key), + ) row = cursor.fetchone() if row: @@ -117,14 +129,23 @@ def persist_user_prefs(self, user_id: str, user_prefs: UserPrefs) -> UserPrefs: time_set = datetime.now(timezone.utc) # Store this value - cursor.execute(''' + cursor.execute( + """ INSERT INTO user_prefs (user_id, setting_name, setting_value, time_set, num_changes) VALUES (?, ?, ?, ?, ?) ON CONFLICT(user_id, setting_name) DO UPDATE SET setting_value=excluded.setting_value, time_set=excluded.time_set, num_changes=excluded.num_changes - ''', (user_id, key, str(preference.value), time_set.isoformat(), num_changes)) + """, + ( + user_id, + key, + str(preference.value), + time_set.isoformat(), + num_changes, + ), + ) # Commit all values conn.commit() diff --git a/src/radio/interface.py b/src/radio/interface.py index 8320285..6fce5e8 100644 --- a/src/radio/interface.py +++ b/src/radio/interface.py @@ -11,8 +11,12 @@ from dataclasses import dataclass from typing import Callable, Optional -from src.radio.events import (ConnectionEstablished, IncomingPacket, - IncomingTextMessage, NodeUpdate) +from src.radio.events import ( + ConnectionEstablished, + IncomingPacket, + IncomingTextMessage, + NodeUpdate, +) @dataclass diff --git a/src/responders/responder_factory.py b/src/responders/responder_factory.py index 9bbe07a..6be6f2a 100644 --- a/src/responders/responder_factory.py +++ b/src/responders/responder_factory.py @@ -26,7 +26,7 @@ def match_responder(message: str, bot): @staticmethod def create_responder(responder_info, bot): - module_name, class_name = responder_info["class"].rsplit('.', 1) + module_name, class_name = responder_info["class"].rsplit(".", 1) module = importlib.import_module(module_name) responder_class = getattr(module, class_name) args = [bot] + responder_info["args"] diff --git a/src/utils/stopwatch.py b/src/utils/stopwatch.py index 41ba82c..04495f8 100644 --- a/src/utils/stopwatch.py +++ b/src/utils/stopwatch.py @@ -6,12 +6,14 @@ logger = logging.getLogger(__name__) + @dataclass class Lap: timestamp: float duration: float comment: Optional[str] = None + class Stopwatch: def __init__(self): self._start_time: Optional[float] = None @@ -24,12 +26,12 @@ def start(self, description: Optional[str] = None) -> None: if self._is_running: logger.warning("Stopwatch is already running") return - + self._start_time = time.time() self._description = description self._is_running = True self._laps = [] - + if description: logger.info(f"Stopwatch started: {description}") else: @@ -40,16 +42,18 @@ def lap(self, comment: Optional[str] = None) -> float: if not self._is_running: logger.warning("Stopwatch is not running") return 0.0 - + current_time = time.time() duration = current_time - self._start_time - self._laps.append(Lap(timestamp=current_time, duration=duration, comment=comment)) - + self._laps.append( + Lap(timestamp=current_time, duration=duration, comment=comment) + ) + if comment: logger.info(f"Lap recorded: {comment} - {duration:.3f}s") else: logger.info(f"Lap recorded: {duration:.3f}s") - + return duration def stop(self) -> float: @@ -57,36 +61,38 @@ def stop(self) -> float: if not self._is_running: logger.warning("Stopwatch is not running") return 0.0 - + total_duration = time.time() - self._start_time self._is_running = False - + if self._description: - logger.info(f"Stopwatch stopped: {self._description} - Total: {total_duration:.3f}s") + logger.info( + f"Stopwatch stopped: {self._description} - Total: {total_duration:.3f}s" + ) else: logger.info(f"Stopwatch stopped - Total: {total_duration:.3f}s") - + return total_duration def get_summary(self) -> str: """Get a formatted summary of all laps and total duration.""" if not self._laps: return "No laps recorded" - + summary = [] if self._description: summary.append(f"Stopwatch: {self._description}") - + for i, lap in enumerate(self._laps, 1): lap_info = f"Lap {i}: {lap.duration:.3f}s" if lap.comment: lap_info += f" - {lap.comment}" summary.append(lap_info) - + if not self._is_running and self._laps: total_duration = self._laps[-1].duration summary.append(f"Total duration: {total_duration:.3f}s") - + return "\n".join(summary) def reset(self) -> None: @@ -95,4 +101,4 @@ def reset(self) -> None: self._laps = [] self._description = None self._is_running = False - logger.info("Stopwatch reset") \ No newline at end of file + logger.info("Stopwatch reset") diff --git a/test/commands/__init__.py b/test/commands/__init__.py index b964cdc..9c5d85f 100644 --- a/test/commands/__init__.py +++ b/test/commands/__init__.py @@ -25,10 +25,10 @@ def assert_show_help_for_command(self, message: IncomingTextMessage): self.assertIn(want, response) def test_show_help(self): - if self.__class__.__name__ == 'CommandWSCTestCase': + if self.__class__.__name__ == "CommandWSCTestCase": return base_cmd = self.command.base_command message = build_test_text_message( - f'!{base_cmd} help', self.test_nodes[1].user.id, self.bot.my_id + f"!{base_cmd} help", self.test_nodes[1].user.id, self.bot.my_id ) self.assert_show_help_for_command(message) diff --git a/test/commands/test_admin.py b/test/commands/test_admin.py index 61a6c35..ae1d024 100644 --- a/test/commands/test_admin.py +++ b/test/commands/test_admin.py @@ -19,40 +19,48 @@ def setUp(self): self._setup_nodes_mock_data(self.test_nodes) def _setup_nodes_mock_data(self, node_list: list[MeshNode]): - unknown_messages = ['unknown1', 'unknown2', 'unknown3'] + unknown_messages = ["unknown1", "unknown2", "unknown3"] self.mock_command_history = [ { - 'sender_id': node.user.id, - 'base_command': 'cmd1' if j == 0 else 'cmd2', - 'timestamp': datetime.now(timezone.utc) - timedelta(days=i * 2 + j), + "sender_id": node.user.id, + "base_command": "cmd1" if j == 0 else "cmd2", + "timestamp": datetime.now(timezone.utc) - timedelta(days=i * 2 + j), } for i, node in enumerate(node_list) for j in range(2) ] self.mock_unknown_command_history = [ { - 'sender_id': node.user.id, - 'message': unknown_messages[i], - 'timestamp': datetime.now(timezone.utc) - timedelta(days=i), + "sender_id": node.user.id, + "message": unknown_messages[i], + "timestamp": datetime.now(timezone.utc) - timedelta(days=i), } for i, node in enumerate(node_list) ] self.mock_responder_history = [ { - 'sender_id': node.user.id, - 'responder_class': 'Responder1' if j == 0 else 'Responder2', - 'timestamp': datetime.now(timezone.utc) - timedelta(days=i * 2 + j), + "sender_id": node.user.id, + "responder_class": "Responder1" if j == 0 else "Responder2", + "timestamp": datetime.now(timezone.utc) - timedelta(days=i * 2 + j), } for i, node in enumerate(node_list) for j in range(2) ] - self.bot.command_logger.get_command_history = MagicMock(return_value=self.mock_command_history) - self.bot.command_logger.get_unknown_command_history = MagicMock(return_value=self.mock_unknown_command_history) - self.bot.command_logger.get_responder_history = MagicMock(return_value=self.mock_responder_history) + self.bot.command_logger.get_command_history = MagicMock( + return_value=self.mock_command_history + ) + self.bot.command_logger.get_unknown_command_history = MagicMock( + return_value=self.mock_unknown_command_history + ) + self.bot.command_logger.get_responder_history = MagicMock( + return_value=self.mock_responder_history + ) def test_handle_packet_not_authorized(self): - msg = build_test_text_message('!admin', self.test_non_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin", self.test_non_admin_nodes[0].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent( f"Sorry {self.test_non_admin_nodes[0].user.long_name}, you are not authorized to use this command", @@ -60,7 +68,9 @@ def test_handle_packet_not_authorized(self): ) def test_handle_packet_authorized(self): - msg = build_test_text_message('!admin', self.test_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin", self.test_admin_nodes[0].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent( "Invalid command format - expected !admin ", @@ -68,7 +78,9 @@ def test_handle_packet_authorized(self): ) def test_handle_base_command(self): - msg = build_test_text_message('!admin', self.test_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin", self.test_admin_nodes[0].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent( "Invalid command format - expected !admin ", @@ -76,14 +88,18 @@ def test_handle_base_command(self): ) def test_reset_packets(self): - msg = build_test_text_message('!admin reset packets', self.test_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin reset packets", self.test_admin_nodes[0].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assertEqual(self.bot.node_info.get_all_nodes_packets_today(), {}) self.assertEqual(self.bot.node_info.get_all_nodes_packets_today_breakdown(), {}) self.assert_message_sent("Packet counter reset", self.test_admin_nodes[0]) def test_reset_packets_missing_argument(self): - msg = build_test_text_message('!admin reset', self.test_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin reset", self.test_admin_nodes[0].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent( "reset: Missing argument - options are: ['packets']", @@ -91,7 +107,9 @@ def test_reset_packets_missing_argument(self): ) def test_reset_packets_unknown_argument(self): - msg = build_test_text_message('!admin reset unknown', self.test_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin reset unknown", self.test_admin_nodes[0].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent( "reset: Unknown argument 'unknown' - options are: ['packets']", @@ -99,7 +117,9 @@ def test_reset_packets_unknown_argument(self): ) def test_show_users_user_not_found(self): - msg = build_test_text_message('!admin users !ffffff', self.test_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin users !ffffff", self.test_admin_nodes[0].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent("User '!ffffff' not found", self.test_admin_nodes[0]) @@ -107,35 +127,49 @@ def test_show_users_user_found(self): target_node = self.test_nodes[1] # 0 is my_id self._setup_nodes_mock_data([target_node]) midnight_7_days_ago = datetime.now(timezone.utc) - timedelta(days=7) - midnight_7_days_ago = midnight_7_days_ago.replace(hour=0, minute=0, second=0, microsecond=0) + midnight_7_days_ago = midnight_7_days_ago.replace( + hour=0, minute=0, second=0, microsecond=0 + ) msg = build_test_text_message( - f'!admin users {target_node.user.short_name}', + f"!admin users {target_node.user.short_name}", self.test_admin_nodes[0].user.id, self.bot.my_id, ) self.command.handle_packet(msg) - summary_response = f"{target_node.user.long_name} - 2 cmds, 2 responders, 1 unknown cmds\n" + summary_response = ( + f"{target_node.user.long_name} - 2 cmds, 2 responders, 1 unknown cmds\n" + ) summary_response += f"Since {midnight_7_days_ago.strftime('%Y-%m-%d %H:%M:%S')}" - self.assert_message_sent(summary_response, self.test_admin_nodes[0], multi_response=True) + self.assert_message_sent( + summary_response, self.test_admin_nodes[0], multi_response=True + ) known_commands_response = "Commands:\n" known_commands_response += "- cmd1: 1\n" known_commands_response += "- cmd2: 1\n" - self.assert_message_sent(known_commands_response, self.test_admin_nodes[0], multi_response=True) + self.assert_message_sent( + known_commands_response, self.test_admin_nodes[0], multi_response=True + ) unknown_commands_response = "Unknown Commands:\n" unknown_commands_response += "- unknown1\n" - self.assert_message_sent(unknown_commands_response, self.test_admin_nodes[0], multi_response=True) + self.assert_message_sent( + unknown_commands_response, self.test_admin_nodes[0], multi_response=True + ) responders_response = "Responders:\n" responders_response += "- Responder1: 1\n" responders_response += "- Responder2: 1\n" - self.assert_message_sent(responders_response, self.test_admin_nodes[0], multi_response=True) + self.assert_message_sent( + responders_response, self.test_admin_nodes[0], multi_response=True + ) def test_show_users_all_users(self): - msg = build_test_text_message('!admin users', self.test_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin users", self.test_admin_nodes[0].user.id, self.bot.my_id + ) self.command.handle_packet(msg) expected_response = f"Users: {len(self.test_nodes)}\n" @@ -146,9 +180,11 @@ def test_show_users_all_users(self): self.assert_message_sent(expected_response, self.test_admin_nodes[0]) def test_show_help(self): - msg = build_test_text_message('!admin help', self.test_admin_nodes[0].user.id, self.bot.my_id) + msg = build_test_text_message( + "!admin help", self.test_admin_nodes[0].user.id, self.bot.my_id + ) self.assert_show_help_for_command(msg) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/commands/test_command.py b/test/commands/test_command.py index 5689230..dd9ae76 100644 --- a/test/commands/test_command.py +++ b/test/commands/test_command.py @@ -15,7 +15,9 @@ def get_command_for_logging(self, message: str): class ConcreteCommandWithSubcommands(AbstractCommandWithSubcommands): - def handle_base_command(self, message: IncomingTextMessage, args: list[str]) -> None: + def handle_base_command( + self, message: IncomingTextMessage, args: list[str] + ) -> None: self.reply(message, "Base command handled") def show_help(self, message: IncomingTextMessage, args: list[str]) -> None: @@ -33,12 +35,16 @@ def setUp(self): self.command = ConcreteCommand(bot=self.bot, base_command="test") def test_handle_packet(self): - msg = build_test_text_message('!test', self.test_nodes[1].user.id, self.bot.my_id) + msg = build_test_text_message( + "!test", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent("Handled", self.test_nodes[1]) def test_reply(self): - msg = build_test_text_message('!test', self.test_nodes[1].user.id, self.bot.my_id) + msg = build_test_text_message( + "!test", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.reply(msg, "Reply message") self.assert_message_sent("Reply message", self.test_nodes[1]) @@ -48,23 +54,31 @@ class TestAbstractCommandWithSubcommands(CommandWSCTestCase): def setUp(self): super().setUp() - self.command = ConcreteCommandWithSubcommands(bot=self.bot, base_command_str="test") + self.command = ConcreteCommandWithSubcommands( + bot=self.bot, base_command_str="test" + ) def test_handle_base_command(self): - msg = build_test_text_message('!test', self.test_nodes[1].user.id, self.bot.my_id) + msg = build_test_text_message( + "!test", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent("Base command handled", self.test_nodes[1]) def test_show_help(self): - msg = build_test_text_message('!test help', self.test_nodes[1].user.id, self.bot.my_id) + msg = build_test_text_message( + "!test help", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent("Help shown", self.test_nodes[1]) def test_unknown_subcommand(self): - msg = build_test_text_message('!test unknown', self.test_nodes[1].user.id, self.bot.my_id) + msg = build_test_text_message( + "!test unknown", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent("Unknown command 'unknown'", self.test_nodes[1]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/commands/test_enroll.py b/test/commands/test_enroll.py index 7afc3c7..f25d635 100644 --- a/test/commands/test_enroll.py +++ b/test/commands/test_enroll.py @@ -11,14 +11,16 @@ class TestEnrollCommandHandler(CommandWSCTestCase): def setUp(self): super().setUp() - self.command = EnrollCommandHandler(self.bot, 'enroll') + self.command = EnrollCommandHandler(self.bot, "enroll") self.bot.user_prefs_persistence = MagicMock() def test_enroll_testing(self): - command = EnrollCommandHandler(self.bot, 'enroll') + command = EnrollCommandHandler(self.bot, "enroll") sender_node = self.test_nodes[1] - packet = build_test_text_message('!enroll testing', sender_node.user.id, self.bot.my_id) + packet = build_test_text_message( + "!enroll testing", sender_node.user.id, self.bot.my_id + ) command.handle_packet(packet) self.bot.user_prefs_persistence.persist_user_prefs.assert_called_once() @@ -26,10 +28,12 @@ def test_enroll_testing(self): self.assertTrue(user_prefs.respond_to_testing.value) def test_leave_testing(self): - command = EnrollCommandHandler(self.bot, 'leave') + command = EnrollCommandHandler(self.bot, "leave") sender_node = self.test_nodes[1] - packet = build_test_text_message('!leave testing', sender_node.user.id, self.bot.my_id) + packet = build_test_text_message( + "!leave testing", sender_node.user.id, self.bot.my_id + ) command.handle_packet(packet) self.bot.user_prefs_persistence.persist_user_prefs.assert_called_once() @@ -37,5 +41,5 @@ def test_leave_testing(self): self.assertFalse(user_prefs.respond_to_testing.value) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/commands/test_hello.py b/test/commands/test_hello.py index d54e694..ff0552b 100644 --- a/test/commands/test_hello.py +++ b/test/commands/test_hello.py @@ -14,7 +14,7 @@ def setUp(self): def test_handle_packet(self): sender_node = self.test_nodes[1] - message = build_test_text_message('!hello', sender_node.user.id, self.bot.my_id) + message = build_test_text_message("!hello", sender_node.user.id, self.bot.my_id) expected = ( f"Hello, {sender_node.user.long_name}! How can I help you? (tip: try !help). " "I'm a bot maintained by PDY4 / pskillen@gmail.com" @@ -23,5 +23,5 @@ def test_handle_packet(self): self.assert_message_sent(expected, sender_node) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/commands/test_help.py b/test/commands/test_help.py index 082af67..fc30ba5 100644 --- a/test/commands/test_help.py +++ b/test/commands/test_help.py @@ -14,11 +14,13 @@ def setUp(self): self.command = HelpCommand(bot=self.bot) def test_handle_packet_no_additional_message(self): - message = build_test_text_message('!help', self.test_nodes[1].user.id, self.bot.my_id) + message = build_test_text_message( + "!help", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(message) response = self.fake_radio.send_text.call_args[0][0] - skipped_commands = ['!admin'] + skipped_commands = ["!admin"] for command in CommandFactory.commands.keys(): if command in skipped_commands: self.assertNotIn(command, response) @@ -26,12 +28,16 @@ def test_handle_packet_no_additional_message(self): self.assertIn(command, response) def test_handle_packet_hello_command(self): - message = build_test_text_message('!help hello', self.test_nodes[1].user.id, self.bot.my_id) + message = build_test_text_message( + "!help hello", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(message) self.assert_message_sent("!hello: responds with a greeting", self.test_nodes[1]) def test_handle_packet_ping_command(self): - message = build_test_text_message('!help ping', self.test_nodes[1].user.id, self.bot.my_id) + message = build_test_text_message( + "!help ping", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(message) self.assert_message_sent( "!ping (+ optional correlation message): responds with a pong", @@ -39,18 +45,26 @@ def test_handle_packet_ping_command(self): ) def test_handle_packet_help_command(self): - message = build_test_text_message('!help help', self.test_nodes[1].user.id, self.bot.my_id) + message = build_test_text_message( + "!help help", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(message) self.assert_message_sent("!help: show this help message", self.test_nodes[1]) def test_handle_packet_unknown_command(self): - message = build_test_text_message('!help unknown', self.test_nodes[1].user.id, self.bot.my_id) + message = build_test_text_message( + "!help unknown", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(message) self.assert_message_sent("Unknown command 'unknown'", self.test_nodes[1]) def test_handle_packet_ping_with_and_without_exclamation(self): - with_excl = build_test_text_message('!help !ping', self.test_nodes[1].user.id, self.bot.my_id) - without_excl = build_test_text_message('!help ping', self.test_nodes[1].user.id, self.bot.my_id) + with_excl = build_test_text_message( + "!help !ping", self.test_nodes[1].user.id, self.bot.my_id + ) + without_excl = build_test_text_message( + "!help ping", self.test_nodes[1].user.id, self.bot.my_id + ) expected = "!ping (+ optional correlation message): responds with a pong" self.command.handle_packet(with_excl) @@ -65,5 +79,5 @@ def test_show_help(self): pass -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/commands/test_nodes.py b/test/commands/test_nodes.py index 9391f9a..5543162 100644 --- a/test/commands/test_nodes.py +++ b/test/commands/test_nodes.py @@ -16,7 +16,9 @@ def setUp(self): self.offline_count = len(self.bot.node_info.get_offline_nodes()) def test_handle_base_command(self): - msg = build_test_text_message('!nodes', self.test_nodes[1].user.id, self.bot.my_id) + msg = build_test_text_message( + "!nodes", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(msg) expected_response = ( @@ -38,7 +40,9 @@ def test_handle_base_command(self): self.assert_message_sent(expected_response, self.test_nodes[1]) def test_handle_busy_command(self): - msg = build_test_text_message('!nodes busy', self.test_nodes[1].user.id, self.bot.my_id) + msg = build_test_text_message( + "!nodes busy", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(msg) sorted_nodes = sorted( @@ -52,13 +56,17 @@ def test_handle_busy_command(self): node = self.bot.node_db.get_by_radio_id(node_id) expected_response += f"- {node.short_name} ({packet_count} pkts)\n" - last_reset_time = self.bot.node_info.packet_counter_reset_time.strftime("%H:%M:%S") + last_reset_time = self.bot.node_info.packet_counter_reset_time.strftime( + "%H:%M:%S" + ) expected_response += f"(last reset at {last_reset_time})" self.assert_message_sent(expected_response, self.test_nodes[1]) def test_handle_busy_detailed_command(self): - msg = build_test_text_message('!nodes busy detailed', self.test_nodes[1].user.id, self.bot.my_id) + msg = build_test_text_message( + "!nodes busy detailed", self.test_nodes[1].user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.fake_radio.send_text.assert_called() @@ -67,7 +75,7 @@ def test_handle_busy_detailed_command(self): def test_handle_busy_specific_node(self): target_node = self.test_nodes[1] msg = build_test_text_message( - f'!nodes busy {target_node.user.short_name}', + f"!nodes busy {target_node.user.short_name}", self.test_nodes[1].user.id, self.bot.my_id, ) @@ -79,16 +87,20 @@ def test_handle_busy_specific_node(self): ) last_heard = self.bot.node_info.get_last_heard(target_node.user.id) - expected_response = f"{target_node.user.long_name} ({target_node.user.short_name})\n" + expected_response = ( + f"{target_node.user.long_name} ({target_node.user.short_name})\n" + ) expected_response += f"Last heard: {pretty_print_last_heard(last_heard)}\n" expected_response += f"Pkts today: {packets_today}\n" - sorted_breakdown = sorted(packet_breakdown_today.items(), key=lambda x: x[1], reverse=True) + sorted_breakdown = sorted( + packet_breakdown_today.items(), key=lambda x: x[1], reverse=True + ) for packet_type, count in sorted_breakdown: expected_response += f"- {packet_type}: {count}\n" self.assert_message_sent(expected_response, self.test_nodes[1]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/commands/test_ping.py b/test/commands/test_ping.py index 4d1bddb..4106104 100644 --- a/test/commands/test_ping.py +++ b/test/commands/test_ping.py @@ -14,14 +14,14 @@ def setUp(self): def test_handle_packet_no_additional_message(self): message = build_test_text_message( - '!ping', self.test_nodes[1].user.id, self.bot.my_id, max_hops=3, hops_left=3 + "!ping", self.test_nodes[1].user.id, self.bot.my_id, max_hops=3, hops_left=3 ) self.command.handle_packet(message) self.assert_message_sent("!pong (ping took 0 hops)", self.test_nodes[1]) def test_handle_packet_with_additional_message(self): message = build_test_text_message( - '!ping extra message', + "!ping extra message", self.test_nodes[1].user.id, self.bot.my_id, max_hops=3, @@ -34,11 +34,11 @@ def test_handle_packet_with_additional_message(self): def test_handle_packet_with_hop_count(self): message = build_test_text_message( - '!ping', self.test_nodes[1].user.id, self.bot.my_id, max_hops=3, hops_left=2 + "!ping", self.test_nodes[1].user.id, self.bot.my_id, max_hops=3, hops_left=2 ) self.command.handle_packet(message) self.assert_message_sent("!pong (ping took 1 hops)", self.test_nodes[1]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/commands/test_prefs.py b/test/commands/test_prefs.py index a47ad68..586af64 100644 --- a/test/commands/test_prefs.py +++ b/test/commands/test_prefs.py @@ -17,20 +17,21 @@ def setUp(self): def test_base_command(self): sender_node = self.test_nodes[1] - self.bot.user_prefs_persistence.get_user_prefs.return_value = UserPrefs(sender_node.user.id) + self.bot.user_prefs_persistence.get_user_prefs.return_value = UserPrefs( + sender_node.user.id + ) - msg = build_test_text_message('!prefs', sender_node.user.id, self.bot.my_id) + msg = build_test_text_message("!prefs", sender_node.user.id, self.bot.my_id) self.command.handle_packet(msg) - expected_response = ( - "Your preferences:\n" - "Respond to 'testing': disabled\n" - ) + expected_response = "Your preferences:\n" "Respond to 'testing': disabled\n" self.assert_message_sent(expected_response, sender_node) def test_enable_testing(self): sender_node = self.test_nodes[1] - msg = build_test_text_message('!prefs testing enable', sender_node.user.id, self.bot.my_id) + msg = build_test_text_message( + "!prefs testing enable", sender_node.user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.bot.user_prefs_persistence.persist_user_prefs.assert_called_once() @@ -44,7 +45,9 @@ def test_enable_testing(self): def test_disable_testing(self): sender_node = self.test_nodes[1] - msg = build_test_text_message('!prefs testing disable', sender_node.user.id, self.bot.my_id) + msg = build_test_text_message( + "!prefs testing disable", sender_node.user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.bot.user_prefs_persistence.persist_user_prefs.assert_called_once() @@ -58,7 +61,9 @@ def test_disable_testing(self): def test_invalid_args(self): sender_node = self.test_nodes[1] - msg = build_test_text_message('!prefs testing invalid', sender_node.user.id, self.bot.my_id) + msg = build_test_text_message( + "!prefs testing invalid", sender_node.user.id, self.bot.my_id + ) self.command.handle_packet(msg) self.assert_message_sent( "Invalid mode for 'testing'. Please specify 'enable' or 'disable'.", @@ -67,7 +72,9 @@ def test_invalid_args(self): def test_invalid_pref(self): sender_node = self.test_nodes[1] - msg = build_test_text_message('!prefs invalid_pref enable', sender_node.user.id, self.bot.my_id) + msg = build_test_text_message( + "!prefs invalid_pref enable", sender_node.user.id, self.bot.my_id + ) self.command.handle_packet(msg) expected_response = ( "!prefs: configure bot settings related to your node:\n" @@ -77,5 +84,5 @@ def test_invalid_pref(self): self.assert_message_sent(expected_response, sender_node, multi_response=True) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/commands/test_template.py b/test/commands/test_template.py index b9555ab..c7a77ff 100644 --- a/test/commands/test_template.py +++ b/test/commands/test_template.py @@ -14,12 +14,16 @@ class TestTemplateCommand(CommandTestCase): def setUp(self): super().setUp() self.template = "Hello, {{ sender_name }}! You sent: {{ rx_message }}" - self.command = TemplateCommand(self.bot, 'template', self.template) + self.command = TemplateCommand(self.bot, "template", self.template) def test_handle_packet(self): sender = self.test_nodes[1] msg = build_test_text_message( - '!template test message', sender.user.id, self.bot.my_id, max_hops=5, hops_left=3 + "!template test message", + sender.user.id, + self.bot.my_id, + max_hops=5, + hops_left=3, ) self.command.handle_packet(msg) self.assert_message_sent( @@ -29,10 +33,14 @@ def test_handle_packet(self): def test_handle_packet_no_sender(self): sender = MeshNode() sender.user = MeshNode.User() - sender.user.id = '1234567890' + sender.user.id = "1234567890" msg = build_test_text_message( - '!template test message', sender.user.id, self.bot.my_id, max_hops=5, hops_left=3 + "!template test message", + sender.user.id, + self.bot.my_id, + max_hops=5, + hops_left=3, ) self.command.handle_packet(msg) self.assert_message_sent( @@ -41,7 +49,7 @@ def test_handle_packet_no_sender(self): def test_user_prefs_rendering(self): sender = self.test_nodes[1] - msg = build_test_text_message('!myuserprefs', sender.user.id, self.bot.my_id) + msg = build_test_text_message("!myuserprefs", sender.user.id, self.bot.my_id) custom_user_prefs = UserPrefs(sender.user.id) custom_user_prefs.respond_to_testing = True @@ -50,11 +58,11 @@ def test_user_prefs_rendering(self): self.bot.user_prefs_persistence.get_user_prefs.return_value = custom_user_prefs template = "{{ 'Respond to testing: ' ~ user_prefs.respond_to_testing }}" - command = TemplateCommand(self.bot, 'myuserprefs', template) + command = TemplateCommand(self.bot, "myuserprefs", template) command.handle_packet(msg) self.assert_message_sent("Respond to testing: True", sender) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/fake_radio.py b/test/fake_radio.py index 30e8019..7112f0d 100644 --- a/test/fake_radio.py +++ b/test/fake_radio.py @@ -39,7 +39,10 @@ def send_traceroute(self, *args, **kwargs) -> None: # pragma: no cover pass def __init__( - self, *, local_node_id: Optional[str] = None, local_nodenum: Optional[int] = None + self, + *, + local_node_id: Optional[str] = None, + local_nodenum: Optional[int] = None, ): self._local_node_id = local_node_id self._local_nodenum = local_nodenum diff --git a/test/meshcore/test_channels.py b/test/meshcore/test_channels.py index 7b117cd..16f0119 100644 --- a/test/meshcore/test_channels.py +++ b/test/meshcore/test_channels.py @@ -6,10 +6,14 @@ from unittest.mock import AsyncMock, MagicMock from meshcore.events import Event, EventType -from src.meshcore.channels import (_channel_entry_from_info, - apply_device_channels, log_device_channels, - merge_channel_region_scopes, - read_device_channels, snapshot_sync_body) +from src.meshcore.channels import ( + _channel_entry_from_info, + apply_device_channels, + log_device_channels, + merge_channel_region_scopes, + read_device_channels, + snapshot_sync_body, +) def test_channel_entry_public(): diff --git a/test/meshcore/test_flood_advert_scheduler.py b/test/meshcore/test_flood_advert_scheduler.py index 5e06adc..af57180 100644 --- a/test/meshcore/test_flood_advert_scheduler.py +++ b/test/meshcore/test_flood_advert_scheduler.py @@ -12,20 +12,35 @@ def test_parse_flood_advert_interval_hours_defaults() -> None: - assert MeshCoreRadio.parse_flood_advert_interval_hours(None) == DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS - assert MeshCoreRadio.parse_flood_advert_interval_hours({}) == DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS + assert ( + MeshCoreRadio.parse_flood_advert_interval_hours(None) + == DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS + ) + assert ( + MeshCoreRadio.parse_flood_advert_interval_hours({}) + == DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS + ) def test_parse_flood_advert_interval_hours_clamps() -> None: - assert MeshCoreRadio.parse_flood_advert_interval_hours( - {"mc_flood_advert_interval_hours": 1} - ) == 2.0 - assert MeshCoreRadio.parse_flood_advert_interval_hours( - {"mc_flood_advert_interval_hours": 48} - ) == 24.0 - assert MeshCoreRadio.parse_flood_advert_interval_hours( - {"mc_flood_advert_interval_hours": 12} - ) == 12.0 + assert ( + MeshCoreRadio.parse_flood_advert_interval_hours( + {"mc_flood_advert_interval_hours": 1} + ) + == 2.0 + ) + assert ( + MeshCoreRadio.parse_flood_advert_interval_hours( + {"mc_flood_advert_interval_hours": 48} + ) + == 24.0 + ) + assert ( + MeshCoreRadio.parse_flood_advert_interval_hours( + {"mc_flood_advert_interval_hours": 12} + ) + == 12.0 + ) def test_schedule_flood_advert_from_config() -> None: diff --git a/test/meshcore/test_initial_flood_advert.py b/test/meshcore/test_initial_flood_advert.py index 6342be9..2132825 100644 --- a/test/meshcore/test_initial_flood_advert.py +++ b/test/meshcore/test_initial_flood_advert.py @@ -19,9 +19,7 @@ async def _runner() -> None: mc = MagicMock() mc.is_connected = True - mc.commands.send_advert = AsyncMock( - return_value=Event(EventType.OK, {}, {}) - ) + mc.commands.send_advert = AsyncMock(return_value=Event(EventType.OK, {}, {})) radio._meshcore = mc radio.schedule_initial_flood_advert() diff --git a/test/meshcore/test_radio.py b/test/meshcore/test_radio.py index 2a86450..1cf4624 100644 --- a/test/meshcore/test_radio.py +++ b/test/meshcore/test_radio.py @@ -47,7 +47,9 @@ def on_node(u: NodeUpdate) -> None: radio.dispatch_meshcore_event_for_tests(ev) assert (tmp_path / "meshcore_packets" / "contact_message").exists() - json_files = list((tmp_path / "meshcore_packets" / "contact_message").glob("*.json")) + json_files = list( + (tmp_path / "meshcore_packets" / "contact_message").glob("*.json") + ) assert len(json_files) == 1 kinds = [k for k, _ in received] assert "packet" in kinds diff --git a/test/meshtastic/test_translation.py b/test/meshtastic/test_translation.py index f9d6f53..c983f0f 100644 --- a/test/meshtastic/test_translation.py +++ b/test/meshtastic/test_translation.py @@ -14,16 +14,16 @@ class TestIdConversions(unittest.TestCase): def test_round_trip(self): - for nodenum in (1, 0xdeadbeef, 0xaabbccdd): + for nodenum in (1, 0xDEADBEEF, 0xAABBCCDD): self.assertEqual(id_to_nodenum(nodenum_to_id(nodenum)), nodenum) def test_id_format_is_canonical_hex(self): - self.assertEqual(nodenum_to_id(0xaabbccdd), "!aabbccdd") + self.assertEqual(nodenum_to_id(0xAABBCCDD), "!aabbccdd") self.assertEqual(nodenum_to_id(1), "!00000001") def test_id_to_nodenum_strips_leading_bang(self): - self.assertEqual(id_to_nodenum("!aabbccdd"), 0xaabbccdd) - self.assertEqual(id_to_nodenum("aabbccdd"), 0xaabbccdd) + self.assertEqual(id_to_nodenum("!aabbccdd"), 0xAABBCCDD) + self.assertEqual(id_to_nodenum("aabbccdd"), 0xAABBCCDD) class TestPacketToIncoming(unittest.TestCase): @@ -106,7 +106,12 @@ def test_full_node(self): "hwModel": "T-BEAM", "publicKey": "pk", }, - "position": {"latitude": 1.0, "longitude": 2.0, "altitude": 100, "time": 1700000000}, + "position": { + "latitude": 1.0, + "longitude": 2.0, + "altitude": 100, + "time": 1700000000, + }, "deviceMetrics": {"batteryLevel": 75, "voltage": 3.9}, "lastHeard": 1700000000, "isFavorite": True, diff --git a/test/persistence/test_commands_logger.py b/test/persistence/test_commands_logger.py index fa0fbf8..8524c93 100644 --- a/test/persistence/test_commands_logger.py +++ b/test/persistence/test_commands_logger.py @@ -11,7 +11,7 @@ class TestSqliteCommandLogger(unittest.TestCase): def setUp(self): - self.db_path = 'test_commands.sqlite' + self.db_path = "test_commands.sqlite" self.logger = SqliteCommandLogger(self.db_path) def tearDown(self): @@ -21,80 +21,98 @@ def tearDown(self): def test_initialize_db(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='command_log'") + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='command_log'" + ) self.assertIsNotNone(cursor.fetchone()) - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='unknown_requests'") + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='unknown_requests'" + ) self.assertIsNotNone(cursor.fetchone()) - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='responder_log'") + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='responder_log'" + ) self.assertIsNotNone(cursor.fetchone()) def test_log_command(self): command_instance = MagicMock(spec=AbstractCommand) - command_instance.get_command_for_logging.return_value = ('base_cmd', ['sub_cmd1', 'sub_cmd2'], 'arg1 arg2') - self.logger.log_command('sender1', command_instance, 'message') + command_instance.get_command_for_logging.return_value = ( + "base_cmd", + ["sub_cmd1", "sub_cmd2"], + "arg1 arg2", + ) + self.logger.log_command("sender1", command_instance, "message") with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM command_log WHERE sender_id='sender1'") row = cursor.fetchone() self.assertIsNotNone(row) - self.assertEqual(row[1], 'base_cmd') - self.assertEqual(row[2], 'sub_cmd1 sub_cmd2') - self.assertEqual(row[3], 'arg1 arg2') - self.assertEqual(row[5], 'AbstractCommand') + self.assertEqual(row[1], "base_cmd") + self.assertEqual(row[2], "sub_cmd1 sub_cmd2") + self.assertEqual(row[3], "arg1 arg2") + self.assertEqual(row[5], "AbstractCommand") def test_log_unknown_request(self): - self.logger.log_unknown_request('sender1', 'unknown message') + self.logger.log_unknown_request("sender1", "unknown message") with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM unknown_requests WHERE sender_id='sender1'") row = cursor.fetchone() self.assertIsNotNone(row) - self.assertEqual(row[1], 'unknown message') + self.assertEqual(row[1], "unknown message") def test_log_responder_handled(self): responder_instance = MagicMock(spec=AbstractResponder) - self.logger.log_responder_handled('sender1', responder_instance, 'response message') + self.logger.log_responder_handled( + "sender1", responder_instance, "response message" + ) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM responder_log WHERE sender_id='sender1'") row = cursor.fetchone() self.assertIsNotNone(row) - self.assertEqual(row[1], 'response message') - self.assertEqual(row[3], 'AbstractResponder') + self.assertEqual(row[1], "response message") + self.assertEqual(row[3], "AbstractResponder") def test_get_command_history(self): command_instance = MagicMock(spec=AbstractCommand) - command_instance.get_command_for_logging.return_value = ('base_cmd', ['sub_cmd1', 'sub_cmd2'], 'arg1 arg2') - self.logger.log_command('sender1', command_instance, 'message') + command_instance.get_command_for_logging.return_value = ( + "base_cmd", + ["sub_cmd1", "sub_cmd2"], + "arg1 arg2", + ) + self.logger.log_command("sender1", command_instance, "message") since = datetime.now(timezone.utc) - timedelta(days=1) history = self.logger.get_command_history(since=since) self.assertGreater(len(history), 0) - for key in ('sender_id', 'base_command', 'timestamp'): + for key in ("sender_id", "base_command", "timestamp"): self.assertIn(key, history[0]) def test_get_unknown_command_history(self): - self.logger.log_unknown_request('sender1', 'unknown message') + self.logger.log_unknown_request("sender1", "unknown message") since = datetime.now(timezone.utc) - timedelta(days=1) history = self.logger.get_unknown_command_history(since=since) self.assertGreater(len(history), 0) - for key in ('sender_id', 'message', 'timestamp'): + for key in ("sender_id", "message", "timestamp"): self.assertIn(key, history[0]) def test_get_responder_history(self): responder_instance = MagicMock(spec=AbstractResponder) - self.logger.log_responder_handled('sender1', responder_instance, 'response message') + self.logger.log_responder_handled( + "sender1", responder_instance, "response message" + ) since = datetime.now(timezone.utc) - timedelta(days=1) history = self.logger.get_responder_history(since=since) self.assertGreater(len(history), 0) - for key in ('sender_id', 'responder_class', 'timestamp'): + for key in ("sender_id", "responder_class", "timestamp"): self.assertIn(key, history[0]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/persistence/test_node_db.py b/test/persistence/test_node_db.py index 3a47d49..5e7f9c1 100644 --- a/test/persistence/test_node_db.py +++ b/test/persistence/test_node_db.py @@ -9,11 +9,18 @@ class TestInMemoryNodeDB(unittest.TestCase): def setUp(self): self.db = InMemoryNodeDB() - self.node_user = MeshNode.User(node_id='node1', short_name='Node1', long_name='Test Node 1') - self.position = MeshNode.Position(logged_time=datetime.now(timezone.utc), - latitude=10.0, longitude=20.0, altitude=100.0) - self.device_metrics = MeshNode.DeviceMetrics(logged_time=datetime.now(timezone.utc), - battery_level=90) + self.node_user = MeshNode.User( + node_id="node1", short_name="Node1", long_name="Test Node 1" + ) + self.position = MeshNode.Position( + logged_time=datetime.now(timezone.utc), + latitude=10.0, + longitude=20.0, + altitude=100.0, + ) + self.device_metrics = MeshNode.DeviceMetrics( + logged_time=datetime.now(timezone.utc), battery_level=90 + ) def test_store_and_get_user(self): self.db.store_user(self.node_user) @@ -32,7 +39,9 @@ def test_store_and_get_position(self): def test_store_and_get_device_metrics(self): self.db.store_device_metrics(self.node_user.id, self.device_metrics) retrieved_metrics = self.db.get_last_device_metrics(self.node_user.id) - self.assertEqual(retrieved_metrics.battery_level, self.device_metrics.battery_level) + self.assertEqual( + retrieved_metrics.battery_level, self.device_metrics.battery_level + ) def test_list_nodes(self): self.db.store_user(self.node_user) @@ -52,20 +61,29 @@ def test_get_device_metrics_log(self): start_time = datetime.now(timezone.utc) - timedelta(days=1) end_time = datetime.now(timezone.utc) + timedelta(days=1) self.db.store_device_metrics(self.node_user.id, self.device_metrics) - metrics = self.db.get_device_metrics_log(self.node_user.id, start_time, end_time) + metrics = self.db.get_device_metrics_log( + self.node_user.id, start_time, end_time + ) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0].battery_level, self.device_metrics.battery_level) class TestSqliteNodeDB(unittest.TestCase): def setUp(self): - self.db_path = 'test_node_db.sqlite' + self.db_path = "test_node_db.sqlite" self.db = SqliteNodeDB(self.db_path) - self.node_user = MeshNode.User(node_id='node1', short_name='Node1', long_name='Test Node 1') - self.position = MeshNode.Position(logged_time=datetime.now(timezone.utc), - latitude=10.0, longitude=20.0, altitude=100.0) - self.device_metrics = MeshNode.DeviceMetrics(logged_time=datetime.now(timezone.utc), - battery_level=90) + self.node_user = MeshNode.User( + node_id="node1", short_name="Node1", long_name="Test Node 1" + ) + self.position = MeshNode.Position( + logged_time=datetime.now(timezone.utc), + latitude=10.0, + longitude=20.0, + altitude=100.0, + ) + self.device_metrics = MeshNode.DeviceMetrics( + logged_time=datetime.now(timezone.utc), battery_level=90 + ) def tearDown(self): if os.path.exists(self.db_path): @@ -88,7 +106,9 @@ def test_store_and_get_position(self): def test_store_and_get_device_metrics(self): self.db.store_device_metrics(self.node_user.id, self.device_metrics) retrieved_metrics = self.db.get_last_device_metrics(self.node_user.id) - self.assertEqual(retrieved_metrics.battery_level, self.device_metrics.battery_level) + self.assertEqual( + retrieved_metrics.battery_level, self.device_metrics.battery_level + ) def test_list_nodes(self): self.db.store_user(self.node_user) @@ -108,10 +128,12 @@ def test_get_device_metrics_log(self): start_time = datetime.now(timezone.utc) - timedelta(days=1) end_time = datetime.now(timezone.utc) + timedelta(days=1) self.db.store_device_metrics(self.node_user.id, self.device_metrics) - metrics = self.db.get_device_metrics_log(self.node_user.id, start_time, end_time) + metrics = self.db.get_device_metrics_log( + self.node_user.id, start_time, end_time + ) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0].battery_level, self.device_metrics.battery_level) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/persistence/test_node_info.py b/test/persistence/test_node_info.py index 84b2939..4f6bb57 100644 --- a/test/persistence/test_node_info.py +++ b/test/persistence/test_node_info.py @@ -7,13 +7,16 @@ class TestInMemoryNodeInfoStore(unittest.TestCase): def setUp(self): self.store = InMemoryNodeInfoStore() - self.node_id = 'node1' - self.packet_type = 'data' + self.node_id = "node1" + self.packet_type = "data" def test_node_packet_received(self): self.store.node_packet_received(self.node_id, self.packet_type) self.assertEqual(self.store.get_node_packets_today(self.node_id), 1) - self.assertEqual(self.store.get_node_packets_today_breakdown(self.node_id)[self.packet_type], 1) + self.assertEqual( + self.store.get_node_packets_today_breakdown(self.node_id)[self.packet_type], + 1, + ) def test_update_last_heard(self): now = datetime.now(timezone.utc) @@ -55,5 +58,5 @@ def test_get_all_nodes_packets_today_breakdown(self): self.assertEqual(all_packets_breakdown[self.node_id][self.packet_type], 1) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/persistence/test_user_prefs.py b/test/persistence/test_user_prefs.py index 7d6d5ba..e19e878 100644 --- a/test/persistence/test_user_prefs.py +++ b/test/persistence/test_user_prefs.py @@ -8,7 +8,7 @@ class TestSqliteUserPrefsPersistence(unittest.TestCase): def setUp(self): - self.db_path = 'test_user_prefs.sqlite' + self.db_path = "test_user_prefs.sqlite" self.persistence = SqliteUserPrefsPersistence(self.db_path) self.persistence._initialize_db() @@ -19,14 +19,17 @@ def tearDown(self): def test_initialize_db(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='user_prefs'") + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='user_prefs'" + ) self.assertIsNotNone(cursor.fetchone()) def test_persist_and_get_user_prefs(self): - user_id = 'test_user' + user_id = "test_user" user_prefs = UserPrefs(user_id) user_prefs.respond_to_testing = UserPrefs.Preference.from_values( - value=True, time_set=datetime.now(timezone.utc), num_changes=0) + value=True, time_set=datetime.now(timezone.utc), num_changes=0 + ) self.persistence.persist_user_prefs(user_id, user_prefs) retrieved_prefs = self.persistence.get_user_prefs(user_id) @@ -36,10 +39,11 @@ def test_persist_and_get_user_prefs(self): self.assertEqual(retrieved_prefs.respond_to_testing.num_changes, 0) def test_update_user_prefs(self): - user_id = 'test_user' + user_id = "test_user" user_prefs = UserPrefs(user_id) user_prefs.respond_to_testing = UserPrefs.Preference.from_values( - value=True, time_set=datetime.now(timezone.utc), num_changes=0) + value=True, time_set=datetime.now(timezone.utc), num_changes=0 + ) self.persistence.persist_user_prefs(user_id, user_prefs) user_prefs.respond_to_testing.value = False @@ -50,27 +54,31 @@ def test_update_user_prefs(self): self.assertEqual(retrieved_prefs.respond_to_testing.num_changes, 1) def test_multiple_settings(self): - user_id = 'test_user' + user_id = "test_user" user_prefs = UserPrefs(user_id) user_prefs.respond_to_testing = UserPrefs.Preference.from_values( - value=True, time_set=datetime.now(timezone.utc), num_changes=0) + value=True, time_set=datetime.now(timezone.utc), num_changes=0 + ) user_prefs.another_setting = UserPrefs.Preference.from_values( - value='value', time_set=datetime.now(timezone.utc), num_changes=0) + value="value", time_set=datetime.now(timezone.utc), num_changes=0 + ) self.persistence.persist_user_prefs(user_id, user_prefs) retrieved_prefs = self.persistence.get_user_prefs(user_id) self.assertEqual(retrieved_prefs.user_id, user_id) self.assertTrue(retrieved_prefs.respond_to_testing.value) - self.assertEqual(retrieved_prefs.another_setting.value, 'value') + self.assertEqual(retrieved_prefs.another_setting.value, "value") def test_field_change_does_not_update_others(self): - user_id = 'test_user' + user_id = "test_user" user_prefs = UserPrefs(user_id) user_prefs.respond_to_testing = UserPrefs.Preference.from_values( - value=True, time_set=datetime.now(timezone.utc), num_changes=0) + value=True, time_set=datetime.now(timezone.utc), num_changes=0 + ) user_prefs.another_setting = UserPrefs.Preference.from_values( - value='value', time_set=datetime.now(timezone.utc), num_changes=0) + value="value", time_set=datetime.now(timezone.utc), num_changes=0 + ) original_time_set_testing = user_prefs.respond_to_testing.time_set original_time_set_another = user_prefs.another_setting.time_set @@ -84,12 +92,17 @@ def test_field_change_does_not_update_others(self): self.assertFalse(retrieved_prefs.respond_to_testing.value) self.assertEqual(retrieved_prefs.respond_to_testing.num_changes, 1) - self.assertNotEqual(retrieved_prefs.respond_to_testing.time_set, original_time_set_testing, - 'Expected time set to have been updated') - self.assertEqual(retrieved_prefs.another_setting.value, 'value') + self.assertNotEqual( + retrieved_prefs.respond_to_testing.time_set, + original_time_set_testing, + "Expected time set to have been updated", + ) + self.assertEqual(retrieved_prefs.another_setting.value, "value") self.assertEqual(retrieved_prefs.another_setting.num_changes, 0) - self.assertEqual(retrieved_prefs.another_setting.time_set, original_time_set_another) + self.assertEqual( + retrieved_prefs.another_setting.time_set, original_time_set_another + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/responders/test_message_reaction_responder.py b/test/responders/test_message_reaction_responder.py index 9d9c1c8..d8f8d94 100644 --- a/test/responders/test_message_reaction_responder.py +++ b/test/responders/test_message_reaction_responder.py @@ -13,11 +13,11 @@ def setUp(self): super().setUp() self.responder = MessageReactionResponder(bot=self.bot, emoji="👍😊🎉") - @patch('random.choice', return_value="👍") + @patch("random.choice", return_value="👍") def test_handle_packet(self, _mock_random_choice): sender_node = self.test_nodes[1] msg = build_test_text_message( - 'Hello', sender_node.user.id, self.bot.my_id, channel=1, is_dm=False + "Hello", sender_node.user.id, self.bot.my_id, channel=1, is_dm=False ) self.responder.handle_packet(msg) self.assert_reaction_sent("👍", msg.message_id, channel=1) @@ -25,12 +25,12 @@ def test_handle_packet(self, _mock_random_choice): def test_handle_packet_not_enrolled(self): sender_node = self.test_nodes[1] msg = build_test_text_message( - 'Hello', sender_node.user.id, self.bot.my_id, channel=1, is_dm=False + "Hello", sender_node.user.id, self.bot.my_id, channel=1, is_dm=False ) self.bot.user_prefs_persistence.get_user_prefs.return_value = None self.responder.handle_packet(msg) self.fake_radio.send_reaction.assert_not_called() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_base_feature.py b/test/test_base_feature.py index 6777469..450a1de 100644 --- a/test/test_base_feature.py +++ b/test/test_base_feature.py @@ -18,7 +18,9 @@ def setUp(self): def test_reply_in_channel(self): sender = self.test_non_admin_nodes[1] - msg = build_test_text_message('!test', sender.user.id, self.bot.my_id, channel=1) + msg = build_test_text_message( + "!test", sender.user.id, self.bot.my_id, channel=1 + ) self.feature.reply_in_channel(msg, "Test message") self.fake_radio.send_text.assert_called_once_with( "Test message", channel=1, want_ack=False, hop_limit=5 @@ -32,7 +34,7 @@ def test_message_in_channel(self): def test_reply_in_dm(self): sender = self.test_non_admin_nodes[1] - msg = build_test_text_message('!test', sender.user.id, self.bot.my_id) + msg = build_test_text_message("!test", sender.user.id, self.bot.my_id) self.feature.reply_in_dm(msg, "Test message") self.fake_radio.send_text.assert_called_once_with( "Test message", destination_id=sender.user.id, want_ack=False, hop_limit=5 @@ -47,7 +49,9 @@ def test_message_in_dm(self): def test_react_in_channel(self): sender = self.test_non_admin_nodes[1] - msg = build_test_text_message('!test', sender.user.id, self.bot.my_id, channel=1) + msg = build_test_text_message( + "!test", sender.user.id, self.bot.my_id, channel=1 + ) self.feature.react_in_channel(msg, "👍") self.fake_radio.send_reaction.assert_called_once_with( "👍", msg.message_id, channel=1 @@ -55,12 +59,12 @@ def test_react_in_channel(self): def test_react_in_dm(self): sender = self.test_non_admin_nodes[1] - msg = build_test_text_message('!test', sender.user.id, self.bot.my_id) + msg = build_test_text_message("!test", sender.user.id, self.bot.my_id) self.feature.react_in_dm(msg, "👍") self.fake_radio.send_reaction.assert_called_once_with( "👍", msg.message_id, destination_id=sender.user.id ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_bot_integration.py b/test/test_bot_integration.py index c1f928d..a1e7e51 100644 --- a/test/test_bot_integration.py +++ b/test/test_bot_integration.py @@ -21,7 +21,7 @@ def _build_bot(): - radio = FakeRadio(local_node_id="!aabbccdd", local_nodenum=0xaabbccdd) + radio = FakeRadio(local_node_id="!aabbccdd", local_nodenum=0xAABBCCDD) bot = MeshflowBot(radio=radio) bot.node_db = InMemoryNodeDB() bot.node_info = InMemoryNodeInfoStore() @@ -65,7 +65,9 @@ def test_node_update_stores_node_and_uploads(self): node = MeshNode() node.user = MeshNode.User(node_id="!11112222", long_name="Alice") - radio.deliver_node_update(NodeUpdate(node=node, last_heard=datetime.now(timezone.utc))) + radio.deliver_node_update( + NodeUpdate(node=node, last_heard=datetime.now(timezone.utc)) + ) self.assertIsNotNone(bot.node_db.get_by_radio_id("!11112222")) storage.store_node.assert_called_once_with(node) diff --git a/test/test_data_classes.py b/test/test_data_classes.py index 38ba331..57a865a 100644 --- a/test/test_data_classes.py +++ b/test/test_data_classes.py @@ -7,74 +7,74 @@ class TestMeshNode(unittest.TestCase): def setUp(self): self.node_data = { - 'num': 1, - 'user': { - 'id': 'user123', - 'longName': 'Test User', - 'shortName': 'TU', - 'macaddr': '00:11:22:33:44:55', - 'hwModel': 'ModelX', - 'publicKey': 'public_key_123' + "num": 1, + "user": { + "id": "user123", + "longName": "Test User", + "shortName": "TU", + "macaddr": "00:11:22:33:44:55", + "hwModel": "ModelX", + "publicKey": "public_key_123", }, - 'position': { - 'altitude': 100, - 'time': 1234567890, - 'locationSource': 'GPS', - 'latitude': 37.7749, - 'longitude': -122.4194 + "position": { + "altitude": 100, + "time": 1234567890, + "locationSource": "GPS", + "latitude": 37.7749, + "longitude": -122.4194, }, - 'lastHeard': 1234567890, - 'deviceMetrics': { - 'batteryLevel': 80, - 'voltage': 3.7, - 'channelUtilization': 0.5, - 'airUtilTx': 0.1, - 'uptimeSeconds': 3600 + "lastHeard": 1234567890, + "deviceMetrics": { + "batteryLevel": 80, + "voltage": 3.7, + "channelUtilization": 0.5, + "airUtilTx": 0.1, + "uptimeSeconds": 3600, }, - 'isFavorite': True, + "isFavorite": True, } self.mesh_node = MeshNode.from_dict(self.node_data) def test_from_dict(self): raw_data = { - 'num': 1129933592, - 'user': { - 'id': '!43596b18', - 'longName': 'PDY base (unattended)', - 'shortName': 'PDYb', - 'macaddr': 'SMpDWWsY', - 'hwModel': 'HELTEC_V3', - 'publicKey': 'JWs3gfXaE1XvUm9Z2MNiC1gcGvkiFLT1/69CDCtqsxQ=' + "num": 1129933592, + "user": { + "id": "!43596b18", + "longName": "PDY base (unattended)", + "shortName": "PDYb", + "macaddr": "SMpDWWsY", + "hwModel": "HELTEC_V3", + "publicKey": "JWs3gfXaE1XvUm9Z2MNiC1gcGvkiFLT1/69CDCtqsxQ=", }, - 'position': { - 'latitudeI': 558243766, - 'longitudeI': -41218138, - 'altitude': 41, - 'time': 1737237050, - 'locationSource': 'LOC_EXTERNAL', - 'latitude': 55.8243766, - 'longitude': -4.1218138 + "position": { + "latitudeI": 558243766, + "longitudeI": -41218138, + "altitude": 41, + "time": 1737237050, + "locationSource": "LOC_EXTERNAL", + "latitude": 55.8243766, + "longitude": -4.1218138, }, - 'lastHeard': 1737237050, - 'deviceMetrics': { - 'batteryLevel': 101, - 'voltage': 4.302, - 'channelUtilization': 2.915, - 'airUtilTx': 0.6213055, - 'uptimeSeconds': 372627 + "lastHeard": 1737237050, + "deviceMetrics": { + "batteryLevel": 101, + "voltage": 4.302, + "channelUtilization": 2.915, + "airUtilTx": 0.6213055, + "uptimeSeconds": 372627, }, - 'isFavorite': True + "isFavorite": True, } mesh_node = MeshNode.from_dict(raw_data) - self.assertEqual(mesh_node.user.id, '!43596b18') - self.assertEqual(mesh_node.user.long_name, 'PDY base (unattended)') + self.assertEqual(mesh_node.user.id, "!43596b18") + self.assertEqual(mesh_node.user.long_name, "PDY base (unattended)") self.assertEqual(mesh_node.position.latitude, 55.8243766) self.assertEqual(mesh_node.device_metrics.battery_level, 101) self.assertTrue(mesh_node.is_favorite) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_setup_data.py b/test/test_setup_data.py index bae3f67..97223ae 100644 --- a/test/test_setup_data.py +++ b/test/test_setup_data.py @@ -27,15 +27,15 @@ def generate_random_rssi(): def generate_random_packet_id(): - return random.randint(0, 2 ** 32 - 1) # 32-bit unsigned int + return random.randint(0, 2**32 - 1) # 32-bit unsigned int _packet_types = [ - 'TEXT_MESSAGE_APP', - 'POSITION_APP', - 'TRACKER_APP', - 'PRIVATE_APP', - 'BROADCAST_APP', + "TEXT_MESSAGE_APP", + "POSITION_APP", + "TRACKER_APP", + "PRIVATE_APP", + "BROADCAST_APP", ] @@ -44,7 +44,7 @@ def generate_random_packet_type() -> str: def random_node_id(): - return random.randint(0, 2 ** 32 - 1) # 32-bit unsigned int + return random.randint(0, 2**32 - 1) # 32-bit unsigned int def random_node_id_hex(): @@ -56,7 +56,7 @@ def make_node(): node.user = MeshNode.User() node.user.id = random_node_id_hex() node.user.short_name = node.user.id[-4:] - node.user.long_name = 'Node ' + node.user.id + node.user.long_name = "Node " + node.user.id return node @@ -73,7 +73,10 @@ def get_test_bot(node_count=2, admin_node_count=1): admin_nodes: list[MeshNode] = [make_node() for _ in range(admin_node_count)] all_nodes = nodes + admin_nodes - fake_radio = FakeRadio(local_node_id=nodes[0].user.id, local_nodenum=meshtastic_hex_to_int(nodes[0].user.id)) + fake_radio = FakeRadio( + local_node_id=nodes[0].user.id, + local_nodenum=meshtastic_hex_to_int(nodes[0].user.id), + ) bot = MeshflowBot(radio=fake_radio) bot.admin_nodes = [node.user.id for node in admin_nodes] @@ -88,7 +91,9 @@ def get_test_bot(node_count=2, admin_node_count=1): bot.node_db.store_node(node) for _ in range(random.randint(1, 10)): - bot.node_info.node_packet_received(node.user.id, generate_random_packet_type()) + bot.node_info.node_packet_received( + node.user.id, generate_random_packet_type() + ) bot.node_info.update_last_heard(node.user.id, last_heard) return bot, nodes, admin_nodes @@ -117,7 +122,9 @@ def build_test_text_message( from_id=sender_id, to_id=to_id, channel=channel, - message_id=message_id if message_id is not None else generate_random_packet_id(), + message_id=( + message_id if message_id is not None else generate_random_packet_id() + ), hop_start=max_hops, hop_limit=hops_left, is_dm=is_dm,