diff --git a/.env.example b/.env.example index 02bdbfd..455b7ad 100644 --- a/.env.example +++ b/.env.example @@ -1,11 +1,19 @@ +# The IP address of your Meshtastic node MESHTASTIC_IP=192.168.123.123 ADMIN_NODES='!aae8900d' +# The root URL of the Meshflow API STORAGE_API_ROOT='http://localhost:8000' STORAGE_API_TOKEN=... - # Features ENABLE_TCP_PROXY=true +# Handshake Cache size for TCP proxy (how many historical packets to cache to allow new clients to catch up quickly) +PROXY_HANDSHAKE_CACHE_SIZE=100 +# Rolling Cache size for TCP proxy (how many recent packets to keep in the rolling buffer) +PROXY_ROLLING_CACHE_SIZE=100 +ENABLE_FEATURE_NODE_TOTALS=true +FREQUENCY_OF_NODE_REPORTS=3 +CHANNEL_FOR_NODE_TOTAL_BROADCAST=2 # Commands ENABLE_COMMAND_PING=true @@ -17,3 +25,25 @@ ENABLE_COMMAND_WHOAMI=true ENABLE_COMMAND_PREFS=true ENABLE_COMMAND_ADMIN=true ENABLE_COMMAND_STATUS=true + +# API Version (usually 1 or 2) +STORAGE_API_VERSION=2 + +# Use these if you want to upload to a second API (usually used during testing) +# STORAGE_API_2_ROOT=... +# STORAGE_API_2_VERSION=2 +# STORAGE_API_2_TOKEN=... + +# Use this if you want to receive commands from the Meshflow server (e.g. traceroute) +MESHFLOW_WS_URL=ws://localhost:8000 + +# Comma-separated portnums to skip when submitting packets to the API (e.g. 
custom or rejected portnums) +IGNORE_PORTNUMS=345,ROUTING_APP + +# Traceroute config (for WebSocket commands) +TR_HOPS_LIMIT=5 +# Min seconds between traceroutes (firmware enforces ~30s; we rate-limit client-side) +TR_MIN_INTERVAL_SEC=30 + +# Max hops for text messages sent by the bot (1-7, default 5) +TEXT_MESSAGE_MAX_HOPS=5 diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..4c0c392 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,26 @@ +# Summary + + + + + +## Testing performed + + diff --git a/.github/workflows/armv7-docker-base-image-build.yaml b/.github/workflows/armv7-docker-base-image-build.yaml index cceafff..3e682d2 100644 --- a/.github/workflows/armv7-docker-base-image-build.yaml +++ b/.github/workflows/armv7-docker-base-image-build.yaml @@ -20,10 +20,10 @@ jobs: steps: - name: Checkout Repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 + uses: docker/setup-buildx-action@v4 - name: Log in to GitHub Container Registry run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin diff --git a/.github/workflows/dev.yaml b/.github/workflows/dev.yaml new file mode 100644 index 0000000..c27c251 --- /dev/null +++ b/.github/workflows/dev.yaml @@ -0,0 +1,34 @@ +name: dev + +on: + push: + branches: + - dev + +permissions: + contents: read + packages: write + +jobs: + extract-dev-tag: + runs-on: ubuntu-latest + steps: + - name: Checkout Repository + uses: actions/checkout@v6 + + - name: Get short SHA + id: get_sha + run: | + SHORT_SHA=$(echo $GITHUB_SHA | cut -c1-7) + echo "VERSION_LABEL=dev-$SHORT_SHA" >> "$GITHUB_OUTPUT" + + outputs: + VERSION_LABEL: ${{ steps.get_sha.outputs.VERSION_LABEL }} + + build: + needs: + - extract-dev-tag + uses: ./.github/workflows/docker-build.yaml + with: + VERSION_LABEL: ${{ needs.extract-dev-tag.outputs.VERSION_LABEL }} + ENVIRONMENT: 
dev diff --git a/.github/workflows/docker-build.yaml b/.github/workflows/docker-build.yaml index a12ea71..5ca59ad 100644 --- a/.github/workflows/docker-build.yaml +++ b/.github/workflows/docker-build.yaml @@ -3,88 +3,130 @@ name: docker-build-and-push on: workflow_call: inputs: - VERSION_TAG: + VERSION_LABEL: required: true type: string - IS_LATEST: - required: false - type: boolean - default: false - IS_PRERELEASE: - required: false - type: boolean - default: false + ENVIRONMENT: + required: true + type: string + description: "dev | preprod | prod" +env: + IMAGE: ghcr.io/${{ github.repository }} permissions: contents: read packages: write - jobs: - build-and-push: - runs-on: ubuntu-latest - + build: strategy: + fail-fast: false matrix: - platform: [ linux/amd64, linux/arm/v7, linux/arm64 ] include: - platform: linux/amd64 - base_image: "python:3.12" - - platform: linux/arm/v7 - base_image: "ghcr.io/pskillen/meshtastic-bot-armv7-base:py3.12" + runner: ubuntu-latest + artifact: linux-amd64 - platform: linux/arm64 - base_image: "arm64v8/python:3.12" + runner: ubuntu-24.04-arm + artifact: linux-arm64 + runs-on: ${{ matrix.runner }} steps: - name: Checkout Repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 + + - name: Log in to GitHub Container Registry + uses: docker/login-action@v4 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 + uses: docker/setup-buildx-action@v4 - - name: Log in to GitHub Container Registry - run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin + - name: Build and push by digest + id: build + uses: docker/build-push-action@v6 + with: + context: . 
+ platforms: ${{ matrix.platform }} + outputs: type=image,name=${{ env.IMAGE }},push-by-digest=true,name-canonical=true,push=true + build-args: | + VERSION=${{ inputs.VERSION_LABEL }} + cache-from: type=gha,scope=${{ github.ref_name }}-meshtastic-bot + cache-to: type=gha,mode=max,scope=${{ github.ref_name }}-meshtastic-bot - - name: Setup vars + - name: Export digest run: | - PLATFORM_TAG=$(echo "${{ matrix.platform }}" | sed 's|/|-|g') - echo "PLATFORM_TAG=$PLATFORM_TAG" >> $GITHUB_ENV + mkdir -p ${{ runner.temp }}/digests + echo "${{ steps.build.outputs.digest }}" | sed 's/^sha256://' | xargs -I{} touch "${{ runner.temp }}/digests/{}" - - name: Build and Push Docker Image - run: | - docker buildx build \ - --platform ${{ matrix.platform }} \ - --build-arg BASE_IMAGE=${{ matrix.base_image }} \ - -t ghcr.io/${{ github.repository }}:${{ inputs.VERSION_TAG }}-${{ env.PLATFORM_TAG }} \ - --push . + - name: Upload digest artifact + uses: actions/upload-artifact@v4 + with: + name: digests-${{ matrix.artifact }} + path: ${{ runner.temp }}/digests/* + retention-days: 1 - create-manifest: + merge: + needs: build runs-on: ubuntu-latest - needs: - - build-and-push steps: - name: Log in to GitHub Container Registry - run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin + uses: docker/login-action@v4 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} - - name: Create and Push Multi-Arch Manifest - run: | - docker buildx imagetools create \ - -t ghcr.io/${{ github.repository }}:${{ inputs.VERSION_TAG }} \ - ghcr.io/${{ github.repository }}:${{ inputs.VERSION_TAG }}-linux-amd64 \ - ghcr.io/${{ github.repository }}:${{ inputs.VERSION_TAG }}-linux-arm-v7 \ - ghcr.io/${{ github.repository }}:${{ inputs.VERSION_TAG }}-linux-arm64 - - - name: Tag as Latest or RC - if: ${{ inputs.IS_LATEST == true }} + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v4 + + - name: Download 
digest artifacts + uses: actions/download-artifact@v4 + with: + path: ${{ runner.temp }}/digests + pattern: digests-* + merge-multiple: true + + - name: Create and push multi-arch manifest + working-directory: ${{ runner.temp }}/digests run: | - if [[ "${{ inputs.IS_PRERELEASE }}" == "true" ]]; then + DIGEST_REFS=$(printf "${{ env.IMAGE }}@sha256:%s " *) + if [ "${{ inputs.ENVIRONMENT }}" = "dev" ]; then + docker buildx imagetools create -t ${{ env.IMAGE }}:latest-dev ${DIGEST_REFS} + elif [ "${{ inputs.ENVIRONMENT }}" = "preprod" ]; then docker buildx imagetools create \ - -t ghcr.io/${{ github.repository }}:latest-rc \ - ghcr.io/${{ github.repository }}:${{ inputs.VERSION_TAG }} + -t ${{ env.IMAGE }}:${{ inputs.VERSION_LABEL }} \ + -t ${{ env.IMAGE }}:latest-rc \ + ${DIGEST_REFS} else + VERSION="${{ inputs.VERSION_LABEL }}" + MAJOR=$(echo "${VERSION}" | cut -d. -f1) + MINOR=$(echo "${VERSION}" | cut -d. -f2) docker buildx imagetools create \ - -t ghcr.io/${{ github.repository }}:latest \ - ghcr.io/${{ github.repository }}:${{ inputs.VERSION_TAG }} + -t ${{ env.IMAGE }}:${VERSION} \ + -t ${{ env.IMAGE }}:latest \ + -t ${{ env.IMAGE }}:${MAJOR} \ + -t ${{ env.IMAGE }}:${MAJOR}.${MINOR} \ + ${DIGEST_REFS} fi + + cleanup: + runs-on: ubuntu-latest + needs: + - build + - merge + if: always() && !cancelled() && needs.merge.result == 'success' + steps: + - name: Delete untagged images + uses: Chizkiyahu/delete-untagged-ghcr-action@v6.1.0 + with: + token: ${{ secrets.GITHUB_TOKEN }} + repository_owner: ${{ github.repository_owner }} + package_name: meshtastic-bot + owner_type: user + untagged_only: true diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml new file mode 100644 index 0000000..062f10f --- /dev/null +++ b/.github/workflows/main.yaml @@ -0,0 +1,34 @@ +name: Release main + +on: + push: + branches: + - main + +permissions: + contents: read + packages: write + +jobs: + extract-dev-tag: + runs-on: ubuntu-latest + steps: + - name: Checkout 
Repository + uses: actions/checkout@v6 + + - name: Get short SHA + id: get_sha + run: | + SHORT_SHA=$(echo $GITHUB_SHA | cut -c1-7) + echo "VERSION_LABEL=main-$SHORT_SHA" >> "$GITHUB_OUTPUT" + + outputs: + VERSION_LABEL: ${{ steps.get_sha.outputs.VERSION_LABEL }} + + build: + needs: + - extract-dev-tag + uses: ./.github/workflows/docker-build.yaml + with: + VERSION_LABEL: ${{ needs.extract-dev-tag.outputs.VERSION_LABEL }} + ENVIRONMENT: dev diff --git a/.github/workflows/manual-release.yaml b/.github/workflows/manual-release.yaml deleted file mode 100644 index d61044d..0000000 --- a/.github/workflows/manual-release.yaml +++ /dev/null @@ -1,29 +0,0 @@ -name: Manual release - -on: - workflow_dispatch: - inputs: - VERSION_TAG: - required: true - type: string - IS_LATEST: - required: true - type: boolean - default: false - IS_PRERELEASE: - required: true - type: boolean - default: false - -jobs: - unit-tests: - uses: ./.github/workflows/unit-tests.yaml - - build-and-push: - needs: - - unit-tests - uses: ./.github/workflows/docker-build.yaml - with: - VERSION_TAG: ${{ inputs.VERSION_TAG }} - IS_PRERELEASE: ${{ inputs.IS_PRERELEASE }} - IS_LATEST: ${{ inputs.IS_LATEST }} diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index c0aeaf3..8ec7353 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -3,6 +3,10 @@ name: pull request on: pull_request: +concurrency: + group: pr-${{ github.event.pull_request.number }} + cancel-in-progress: true + jobs: UnitTest: uses: ./.github/workflows/unit-tests.yaml diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 1617a6f..78a702b 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -2,7 +2,7 @@ name: Release on: release: - types: [ published ] + types: [published] jobs: unit-tests: @@ -13,13 +13,11 @@ jobs: steps: - name: Extract Release Tag id: get_tag - run: | - echo 
"VERSION_TAG=${GITHUB_REF#refs/tags/}" >> "$GITHUB_OUTPUT" - echo "IS_PRERELEASE=${{ github.event.release.prerelease }}" >> "$GITHUB_OUTPUT" + run: echo "VERSION_LABEL=${{ github.event.release.tag_name }}" >> "$GITHUB_OUTPUT" outputs: - VERSION_TAG: ${{ steps.get_tag.outputs.VERSION_TAG }} - IS_PRERELEASE: ${{ steps.get_tag.outputs.IS_PRERELEASE }} + VERSION_LABEL: ${{ steps.get_tag.outputs.VERSION_LABEL }} + IS_PRERELEASE: ${{ github.event.release.prerelease }} build-and-push: needs: @@ -27,6 +25,5 @@ jobs: - extract-tag uses: ./.github/workflows/docker-build.yaml with: - VERSION_TAG: ${{ needs.extract-tag.outputs.VERSION_TAG }} - IS_PRERELEASE: ${{ fromJson(needs.extract-tag.outputs.IS_PRERELEASE) }} # must be boolean - IS_LATEST: true + VERSION_LABEL: ${{ needs.extract-tag.outputs.VERSION_LABEL }} + ENVIRONMENT: ${{ needs.extract-tag.outputs.IS_PRERELEASE == 'true' && 'preprod' || 'prod' }} diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index ce5eca1..6659fbf 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -16,14 +16,14 @@ jobs: strategy: matrix: - python-version: [ "3.10", "3.11", "3.12", "3.13" ] + python-version: [ "3.12", "3.13", "3.14" ] steps: - name: Checkout Repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} @@ -36,7 +36,7 @@ jobs: pytest test/ --doctest-modules --junitxml=reports/test-results-${{ matrix.python-version }}.xml - name: Test Report - uses: dorny/test-reporter@v1 + uses: dorny/test-reporter@v2 with: name: pytest-results-${{ matrix.python-version }} path: reports/test-results-${{ matrix.python-version }}.xml diff --git a/.gitignore b/.gitignore index 20ee143..db8f5e0 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,11 @@ nodes.json all_state.json data/ docker-data/ +tmp/ +temp/ + +# OS files +.DS_Store ### Python 
template # Byte-compiled / optimized / DLL files diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..f6c8b54 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,13 @@ +{ + "configurations": [ + { + "name": "Meshtastic Bot", + "type": "debugpy", + "request": "launch", + "module": "src.main", + "python": "venv/bin/python", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}" + } + ] +} \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..4f5a722 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,105 @@ +# Meshtastic Bot – Agent Context + +Python bot for interacting with Meshtastic devices. Connects to a Meshtastic node over TCP, listens for messages, processes commands, and reports packets to the Meshflow API. Part of the Meshflow system alongside meshflow-api and meshtastic-bot-ui. + +## Project Structure + +``` +src/ +├── main.py # Entry point, env config, bot setup +├── bot.py # MeshtasticBot: pubsub handlers, connection, commands +├── tcp_interface.py # AutoReconnectTcpInterface (Meshtastic TCP connection) +├── ws_client.py # MeshflowWSClient – receives commands from API (e.g. traceroute) +├── traceroute.py # Traceroute command (triggered via WebSocket) +├── data_classes.py # MeshNode, packet data structures +├── helpers.py # pretty_print_last_heard, safe_encode_node_name, etc. +├── base_feature.py # AbstractBaseFeature – reply_in_channel, message_in_dm, etc. +├── commands/ # Text commands (!help, !nodes, !ping, etc.) 
+│ ├── factory.py # CommandFactory – registers commands +│ ├── command.py # AbstractCommand base class +│ ├── help.py, hello.py, nodes.py, ping.py, prefs.py, admin.py, template.py +│ └── enroll.py # (commented out) +├── responders/ # Non-command message handlers +│ ├── responder_factory.py # ResponderFactory +│ ├── responder.py # AbstractResponder base class +│ └── message_reaction_responder.py +├── api/ # Meshflow API integration +│ ├── StorageAPI.py # StorageAPIWrapper – packet ingestion, node sync +│ ├── BaseAPIWrapper.py # Base HTTP client +│ └── serializers.py # MeshNodeSerializer +└── persistence/ # Local storage + ├── node_db.py # AbstractNodeDB, SqliteNodeDB + ├── node_info.py # AbstractNodeInfoStore, InMemoryNodeInfoStore + ├── commands_logger.py # AbstractCommandLogger, SqliteCommandLogger + ├── user_prefs.py # AbstractUserPrefsPersistence, SqliteUserPrefsPersistence + └── packet_dump.py # Packet dump utilities + +test/ # pytest unit tests +deploy/ # Deployment scripts (Raspberry Pi, Docker) +``` + +## Key Concepts + +- **MeshtasticBot**: Central class. Subscribes to pubsub (`meshtastic.receive`, `meshtastic.receive.text`, `meshtastic.node.updated`, `meshtastic.connection.established`). Owns interface, node_db, node_info, storage_apis, ws_client. +- **Commands**: Text messages starting with `!` (e.g. `!help`, `!nodes`). Registered in `CommandFactory`; extend `AbstractCommand`. +- **Responders**: Handle public channel messages without `!` prefix. Extend `AbstractResponder`. +- **StorageAPIWrapper**: Reports raw packets and node data to Meshflow API. Supports v1 and v2 API paths. Uses `STORAGE_API_*` or `STORAGE_API_2_*` env vars. +- **MeshflowWSClient**: Connects to `ws/nodes/?api_key=...` to receive remote commands (e.g. traceroute). Started after connection; uses same token as storage API. + +## API Integration + +- **Packet ingestion**: `StorageAPIWrapper` posts to `/api/packets/{my_nodenum}/ingest/` (v2) or `/api/raw-packet/` (v1). 
+- **Node sync**: `StorageAPIWrapper` fetches nodes from API for reconciliation. +- **WebSocket**: `MeshflowWSClient` connects to Meshflow API; receives JSON commands (e.g. `{"type": "traceroute", "target_node_id": 123}`). Invokes `on_traceroute_command` on the bot. + +## Development + +```bash +# activate venv +source venv/bin/activate + +pip install -r requirements.txt +# Copy .env.example to .env and configure +python main.py +# or: python -m src.main (from project root) +``` + +## Testing + +- **Unit tests**: `pytest test/ --doctest-modules` +- Tests live under `test/` (commands, persistence, responders, etc.) +- CI runs on Python 3.12, 3.13, 3.14 + +## Tech Stack + +- Python 3.12+ +- meshtastic (protobuf, TCP interface) +- Pypubsub (pub/sub events) +- requests (HTTP to Meshflow API) +- websockets (MeshflowWSClient) +- schedule (periodic tasks) +- pytest + +## Configuration + +Environment variables (see `.env.example`): + +- `MESHTASTIC_IP` – Meshtastic node IP (TCP connection) +- `ADMIN_NODES` – Comma-separated node IDs (e.g. `!aae8900d`) for admin commands +- `STORAGE_API_ROOT`, `STORAGE_API_TOKEN`, `STORAGE_API_VERSION` – Primary Meshflow API +- `STORAGE_API_2_*` – Optional second API +- `MESHFLOW_WS_URL` – WebSocket URL (optional; derived from storage API if unset) +- `DATA_DIR` – Data directory (default `data/`) + +## Conventions + +- Commands: Add class to `src/commands/`, register in `CommandFactory.commands`. +- Responders: Add class to `src/responders/`, register in `ResponderFactory`. +- Use `reply_in_channel` / `reply_in_dm` from `AbstractBaseFeature`; avoid deprecated `reply` / `reply_to`. +- Node IDs: hex string format (e.g. `!12345678`). `my_nodenum` is decimal. +- Data persisted in `data/` (node_info.json, SQLite DBs, failed_packets). 
+ +## Source control + +When asked to create a pull request description, follow the template at +.github/pull_request_template.md, and output a markdown file named `tmp/PR.md` diff --git a/Dockerfile b/Dockerfile index 6329ff0..8d4832b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,17 +1,18 @@ -ARG BASE_IMAGE=python:3.12 -FROM ${BASE_IMAGE} +FROM python:3.14 + +ARG VERSION=development +ENV APP_VERSION=${VERSION} WORKDIR /app # Copy only the requirements file first to leverage Docker layer caching COPY requirements.txt . -# Install dependencies -RUN pip install --no-cache-dir -r requirements.txt +# Install dependencies (cache mount speeds up repeat builds) +RUN --mount=type=cache,target=/root/.cache/pip \ + pip install -r requirements.txt # Copy the rest of the application files COPY . . -RUN pip install -r requirements.txt - CMD ["python", "-m", "src.main"] diff --git a/README.md b/README.md index b175203..00359c8 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,8 @@ You can enable or disable specific features and commands using environment varia ### Feature Toggles - `ENABLE_TCP_PROXY`: Set to `false` to disable the internal TCP proxy. The bot will connect directly to `MESHTASTIC_IP`. +- `PROXY_HANDSHAKE_CACHE_SIZE`: Number of initial packets to cache for connecting proxy clients (default `100`). +- `PROXY_ROLLING_CACHE_SIZE`: Number of recent packets to cache in a rolling queue for connecting proxy clients (default `100`). 
### Command Toggles Set any of the following to `false` to disable the command and hide it from the `!help` menu: diff --git a/docker-compose-remote.yaml b/docker-compose-remote.yaml index 3ce8378..d20ae94 100644 --- a/docker-compose-remote.yaml +++ b/docker-compose-remote.yaml @@ -4,6 +4,8 @@ services: image: ghcr.io/pskillen/meshtastic-bot:latest container_name: meshtastic-bot restart: unless-stopped + ports: + - "4403:4403" env_file: - meshtastic-bot.env volumes: diff --git a/docker-compose.yaml b/docker-compose.yaml index c27965c..91b1ca6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -13,6 +13,9 @@ services: - STORAGE_API_TOKEN=${STORAGE_API_TOKEN} - STORAGE_API_VERSION=${STORAGE_API_VERSION} - ENABLE_TCP_PROXY=${ENABLE_TCP_PROXY:-true} + - ENABLE_FEATURE_NODE_TOTALS=${ENABLE_FEATURE_NODE_TOTALS:-true} + - FREQUENCY_OF_NODE_REPORTS=${FREQUENCY_OF_NODE_REPORTS:-3} + - CHANNEL_FOR_NODE_TOTAL_BROADCAST=${CHANNEL_FOR_NODE_TOTAL_BROADCAST:-2} - ENABLE_COMMAND_PING=${ENABLE_COMMAND_PING:-true} - ENABLE_COMMAND_TR=${ENABLE_COMMAND_TR:-true} - ENABLE_COMMAND_HELLO=${ENABLE_COMMAND_HELLO:-true} @@ -22,6 +25,8 @@ services: - ENABLE_COMMAND_PREFS=${ENABLE_COMMAND_PREFS:-true} - ENABLE_COMMAND_ADMIN=${ENABLE_COMMAND_ADMIN:-true} - ENABLE_COMMAND_STATUS=${ENABLE_COMMAND_STATUS:-true} + labels: + - "com.centurylinklabs.watchtower.enable=false" watchtower: image: containrrr/watchtower @@ -29,4 +34,6 @@ services: restart: unless-stopped volumes: - /var/run/docker.sock:/var/run/docker.sock - command: --interval 3600 meshtastic-bot # Check for updates every hour + environment: + - DOCKER_API_VERSION=1.44 + command: --interval 3600 --label-enable diff --git a/requirements.txt b/requirements.txt index c10f372..88ced90 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ # dependencies meshtastic>=2.5.0,<3.0.0 +websockets>=14.0 Pypubsub~=4.0.3 jinja2~=3.1.6 schedule~=1.2.2 diff --git a/src/api/StorageAPI.py b/src/api/StorageAPI.py index 
cda35d2..c05d2d6 100644 --- a/src/api/StorageAPI.py +++ b/src/api/StorageAPI.py @@ -29,6 +29,7 @@ def _get_url(self, path: str, args: dict = None): if args is None: args = {} + my_nodenum = self.bot.my_nodenum if self.api_version == 1: api_paths = { 'raw_packet': '/api/raw-packet/', @@ -36,7 +37,6 @@ def _get_url(self, path: str, args: dict = None): 'node_by_id': f'/api/nodes/{args.get("node_id", "")}', } else: - my_nodenum = self.bot.my_nodenum api_paths = { 'raw_packet': f'/api/packets/{my_nodenum}/ingest/', 'nodes': f'/api/packets/{my_nodenum}/nodes/', @@ -65,14 +65,28 @@ def store_raw_packet(self, packet: dict): """ Store a raw packet in the storage API """ + if self.api_version == 2 and (self.bot.my_nodenum is None or self.bot.my_nodenum <= 0): + logging.debug("Skipping store_raw_packet: Bot node number not yet initialized.") + return + + logging.info(f"store_raw_packet called for portnum: {packet.get('decoded', {}).get('portnum')}") # Filter out packet types that the API doesn't support or we don't want to store - ignored_ports = [345, 'ROUTING_APP', 'TRACEROUTE_APP', 'ADMIN_APP', 'NEIGHBORINFO_APP'] + ignored_ports = [345, 'TRACEROUTE_APP', 'ADMIN_APP', 'NEIGHBORINFO_APP', 'ROUTING_APP'] portnum = packet.get('decoded', {}).get('portnum') if portnum in ignored_ports: return # Additional filtering for Telemetry packets to avoid API errors # The API requires either 'deviceMetrics' or 'localStats' + if portnum == 'ROUTING_APP': + from_id = packet.get('from') + logging.info(f"DEBUG: ROUTING_APP Packet from {from_id}: {packet}") + + # Log all text messages + if portnum == 'TEXT_MESSAGE_APP': + from_id = packet.get('from') + logging.info(f"DEBUG: TEXT_MESSAGE_APP Packet from {from_id}: {packet}") + if portnum == 'TELEMETRY_APP': telemetry = packet.get('decoded', {}).get('telemetry', {}) if 'deviceMetrics' not in telemetry and 'localStats' not in telemetry: @@ -88,31 +102,44 @@ def store_raw_packet(self, packet: dict): if raw_packet: if 'channel' not in packet: 
packet['channel'] = raw_packet.channel + if 'id' not in packet: + packet['id'] = raw_packet.id + if 'from' not in packet: + packet['from'] = raw_packet.from_node - logging.debug(f"Storing packet: {packet}") + logging.info(f"Storing packet: {packet}") try: response = self._post(self._get_url('raw_packet'), json=packet) + + try: + response_json = response.json() + logging.info(f"API Response ({response.status_code}): {response_json}") + return response_json + except JSONDecodeError: + logging.info(f"API Response ({response.status_code}, not JSON): {response.text}") + return {'text': response.text} + except HTTPError as ex: - logging.error(f"Error storing packet: {ex.response.text}") + logging.error(f"HTTP error storing packet: {ex.response.text}") logging.error(f"Packet: {packet}") - - # Dump the packet to a .json file if self.failed_packets_dir: self._dump_failed_packet(packet, ex) return - try: - response_json = response.json() - logging.debug(f"Response: {response_json}") - return response_json - except JSONDecodeError: - logging.debug(f"Response (not JSON): {response.text}") - return {'text': response.text} + except Exception as ex: + logging.error(f"Error storing packet: {ex}") + logging.error(f"Packet: {packet}") + if self.failed_packets_dir: + self._dump_failed_packet(packet, ex) + return def list_nodes(self) -> list[MeshNode]: """ Get a list of all nodes stored in the storage API. This list generally does not include position or metrics data. 
""" + if self.api_version == 2 and (self.bot.my_nodenum is None or self.bot.my_nodenum <= 0): + return [] + response = self._get(self._get_url('nodes')) response_json = response.json() @@ -124,6 +151,9 @@ def store_node(self, node: MeshNode): If the node contains position or metrics data, it will be stored as well """ + if self.api_version == 2 and (self.bot.my_nodenum is None or self.bot.my_nodenum <= 0): + logging.debug("Skipping store_node: Bot node number not yet initialized.") + return node_data = MeshNodeSerializer.to_api_dict(node) diff --git a/src/api/serializers.py b/src/api/serializers.py index a5e8426..7476704 100644 --- a/src/api/serializers.py +++ b/src/api/serializers.py @@ -27,22 +27,25 @@ class PositionSerializer(AbstractModelSerializer): def to_api_dict(cls, position: MeshNode.Position) -> dict: return { "logged_time": cls.date_to_api(position.logged_time), # api v1 compatibility + "loggedTime": cls.date_to_api(position.logged_time), "reported_time": cls.date_to_api(position.reported_time), # api v2 compatibility + "reportedTime": cls.date_to_api(position.reported_time), "latitude": position.latitude, "longitude": position.longitude, "altitude": position.altitude, "location_source": position.location_source or "LOC_UNKNOWN", + "locationSource": position.location_source or "LOC_UNKNOWN", } @classmethod def from_api_dict(cls, position_data: dict) -> MeshNode.Position: return MeshNode.Position( - logged_time=cls.date_from_api(position_data['logged_time']), - reported_time=cls.date_from_api(position_data['reported_time']), + logged_time=cls.date_from_api(position_data.get('logged_time') or position_data.get('loggedTime')), + reported_time=cls.date_from_api(position_data.get('reported_time') or position_data.get('reportedTime')), latitude=position_data['latitude'], longitude=position_data['longitude'], altitude=position_data['altitude'], - location_source=position_data['location_source'] + location_source=position_data.get('location_source') or 
position_data.get('locationSource') ) @@ -51,23 +54,29 @@ class DeviceMetricsSerializer(AbstractModelSerializer): def to_api_dict(cls, device_metrics: MeshNode.DeviceMetrics) -> dict: return { "logged_time": cls.date_to_api(device_metrics.logged_time), # api v1 compatibility + "loggedTime": cls.date_to_api(device_metrics.logged_time), "reported_time": cls.date_to_api(device_metrics.logged_time), # api v2 compatibility + "reportedTime": cls.date_to_api(device_metrics.logged_time), "battery_level": device_metrics.battery_level, + "batteryLevel": device_metrics.battery_level, "voltage": device_metrics.voltage, "channel_utilization": device_metrics.channel_utilization, + "channelUtilization": device_metrics.channel_utilization, "air_util_tx": device_metrics.air_util_tx, - "uptime_seconds": device_metrics.uptime_seconds + "airUtilTx": device_metrics.air_util_tx, + "uptime_seconds": device_metrics.uptime_seconds, + "uptimeSeconds": device_metrics.uptime_seconds } @classmethod def from_api_dict(cls, device_metrics_data: dict) -> MeshNode.DeviceMetrics: return MeshNode.DeviceMetrics( - logged_time=cls.date_from_api(device_metrics_data['logged_time']), - battery_level=device_metrics_data['battery_level'], + logged_time=cls.date_from_api(device_metrics_data.get('logged_time') or device_metrics_data.get('loggedTime') or device_metrics_data.get('reported_time') or device_metrics_data.get('reportedTime')), + battery_level=device_metrics_data.get('battery_level') or device_metrics_data.get('batteryLevel'), voltage=device_metrics_data['voltage'], - channel_utilization=device_metrics_data['channel_utilization'], - air_util_tx=device_metrics_data['air_util_tx'], - uptime_seconds=device_metrics_data['uptime_seconds'] + channel_utilization=device_metrics_data.get('channel_utilization') or device_metrics_data.get('channelUtilization'), + air_util_tx=device_metrics_data.get('air_util_tx') or device_metrics_data.get('airUtilTx'), + 
uptime_seconds=device_metrics_data.get('uptime_seconds') or device_metrics_data.get('uptimeSeconds') ) @@ -80,10 +89,14 @@ def to_api_dict(cls, node: MeshNode) -> dict: "id": node.user.id, "macaddr": node.user.macaddr, "hw_model": node.user.hw_model, + "hwModel": node.user.hw_model, "public_key": node.user.public_key, + "publicKey": node.user.public_key, 'user': { "long_name": node.user.long_name, - "short_name": node.user.short_name + "longName": node.user.long_name, + "short_name": node.user.short_name, + "shortName": node.user.short_name } } @@ -94,6 +107,7 @@ def to_api_dict(cls, node: MeshNode) -> dict: if node.device_metrics: node_data['device_metrics'] = DeviceMetricsSerializer.to_api_dict(node.device_metrics) + node_data['deviceMetrics'] = DeviceMetricsSerializer.to_api_dict(node.device_metrics) return node_data @@ -103,10 +117,10 @@ def from_api_dict(cls, node_data: dict) -> MeshNode: user = MeshNode.User( node_id=node_data['id'], macaddr=node_data['macaddr'], - hw_model=node_data['hw_model'], - public_key=node_data['public_key'], - long_name=user_data['long_name'], - short_name=user_data['short_name'] + hw_model=node_data.get('hw_model') or node_data.get('hwModel'), + public_key=node_data.get('public_key') or node_data.get('publicKey'), + long_name=user_data.get('long_name') or user_data.get('longName'), + short_name=user_data.get('short_name') or user_data.get('shortName') ) position_data = node_data.get('position') @@ -114,7 +128,7 @@ def from_api_dict(cls, node_data: dict) -> MeshNode: if position_data: position = PositionSerializer.from_api_dict(position_data) - device_metrics_data = node_data.get('device_metrics') + device_metrics_data = node_data.get('device_metrics') or node_data.get('deviceMetrics') device_metrics = None if device_metrics_data: device_metrics = DeviceMetricsSerializer.from_api_dict(device_metrics_data) diff --git a/src/base_feature.py b/src/base_feature.py index fa12783..29adeb6 100644 --- a/src/base_feature.py +++ 
b/src/base_feature.py @@ -1,4 +1,5 @@ import logging +import os import time from abc import ABC @@ -6,6 +7,14 @@ from src.bot import MeshtasticBot +TEXT_MESSAGE_MAX_HOPS = int(os.getenv("TEXT_MESSAGE_MAX_HOPS", "5")) +if TEXT_MESSAGE_MAX_HOPS < 1: + logging.warning("TEXT_MESSAGE_MAX_HOPS is less than 1, capping at 1.") + TEXT_MESSAGE_MAX_HOPS = 1 +elif TEXT_MESSAGE_MAX_HOPS > 7: + logging.warning("TEXT_MESSAGE_MAX_HOPS is greater than the Meshtastic limit of 7. Capping at 7.") + TEXT_MESSAGE_MAX_HOPS = 7 + class AbstractBaseFeature(ABC): """ @@ -28,7 +37,9 @@ def message_in_channel(self, channel: int, message: str, want_ack=False) -> None Send a message in a channel """ logging.debug(f"Sending message: '{message}'") - self.bot.interface.sendText(message, channelIndex=channel, wantAck=want_ack) + self.bot.interface.sendText( + message, channelIndex=channel, wantAck=want_ack, hopLimit=TEXT_MESSAGE_MAX_HOPS + ) def reply_in_dm(self, packet: MeshPacket, message: str, want_ack=True) -> None: """ @@ -43,7 +54,12 @@ def message_in_dm(self, destination_id: str, message: str, want_ack=True) -> Non """ logging.debug(f"Sending DM: '{message}'") time.sleep(1) # Wait a second to let the radio settle - self.bot.interface.sendText(message, destinationId=destination_id, wantAck=want_ack) + self.bot.interface.sendText( + message, + destinationId=destination_id, + wantAck=want_ack, + hopLimit=TEXT_MESSAGE_MAX_HOPS, + ) def react_in_channel(self, packet: MeshPacket, emoji: str) -> None: """ diff --git a/src/bot.py b/src/bot.py index a8c6e3d..f3ab458 100644 --- a/src/bot.py +++ b/src/bot.py @@ -11,8 +11,12 @@ from src.api.StorageAPI import StorageAPIWrapper from src.commands.factory import CommandFactory +try: + from src.traceroute import on_traceroute_command +except ImportError: + on_traceroute_command = None from src.data_classes import MeshNode -from src.helpers import pretty_print_last_heard, safe_encode_node_name +from src.helpers import pretty_print_last_heard, 
safe_encode_node_name, get_env_bool, get_env_int from src.persistence.commands_logger import AbstractCommandLogger from src.persistence.node_db import AbstractNodeDB from src.persistence.node_info import AbstractNodeInfoStore @@ -24,6 +28,7 @@ class MeshtasticBot: admin_nodes: list[str] + ignore_portnums: frozenset # Portnums to skip when submitting to API (from IGNORE_PORTNUMS env) interface: SupportsMessageReactionInterface init_complete: bool @@ -37,6 +42,7 @@ class MeshtasticBot: user_prefs_persistence: AbstractUserPrefsPersistence storage_apis: list[StorageAPIWrapper] + ws_client: object | None # MeshflowWSClient when configured def __init__(self, address: str): self.address = address @@ -44,6 +50,7 @@ def __init__(self, address: str): self.proxy = None self.admin_nodes = [] + self.ignore_portnums = frozenset() self.interface = None self.init_complete = False @@ -55,6 +62,7 @@ def __init__(self, address: str): self.command_logger = None self.user_prefs_persistence = None self.storage_apis = [] + self.ws_client = None self.pending_traces = {} self.last_report_zero = False @@ -112,20 +120,34 @@ def disconnect(self): except OSError as ex: logging.warning(f"Failed to close connection. Continuing anyway: {ex}") + def on_traceroute_command(self, target_node_id: int): + """Handle traceroute command from WebSocket (e.g. 
from Meshflow API).""" + if on_traceroute_command: + on_traceroute_command(self, target_node_id) + else: + logging.warning("Traceroute handling via WebSocket is not available (import failed).") + def on_connection(self, interface, topic=pub.AUTO_TOPIC): self.my_nodenum = interface.localNode.nodeNum # in dec - self.my_id = f"!{hex(self.my_nodenum)[2:]}" + self.my_id = f"!{self.my_nodenum:08x}" self.init_complete = True - logging.info('Connected to Meshtastic node') + logging.info(f'Connected to Meshtastic node as {self.my_id}') self.print_nodes() # Send an immediate node count report upon connection # We use a timer to delay slightly to ensure everything settles - threading.Timer(10.0, self.report_node_count).start() + if get_env_bool('ENABLE_FEATURE_NODE_TOTALS', True): + threading.Timer(10.0, self.report_node_count).start() + + if self.ws_client: + self.ws_client.start() def on_receive_text(self, packet: MeshPacket, interface): """Callback function triggered when a text message is received.""" + from_id = packet.get('fromId') + text = packet.get('decoded', {}).get('text', '') + logging.info(f"on_receive_text: Incoming text from {from_id}: {text}") to_id = packet['toId'] @@ -140,17 +162,23 @@ def handle_private_message(self, packet: MeshPacket): from_id = packet['fromId'] sender = self.node_db.get_by_id(from_id) - logging.info(f"Received private message: '{message}' from {sender.long_name if sender else from_id}") + logging.info(f"✉️ [PRIVATE MSG] '{message}' from {sender.long_name if sender else from_id}") words = message.split() command_name = words[0] command_instance = CommandFactory.create_command(command_name, self) if command_instance: self.command_logger.log_command(from_id, command_instance, message) - try: - command_instance.handle_packet(packet) - except Exception as e: - logging.error(f"Error handling message: {e}") + + def run_command(): + try: + logging.info(f"🤖 [BOT CMD] Running private command {command_name} in thread for {from_id}") + 
command_instance.handle_packet(packet) + logging.info(f"✅ [BOT CMD] Finished private command {command_name} for {from_id}") + except Exception as e: + logging.error(f"❌ [BOT CMD] Error handling private command {command_name}: {e}", exc_info=True) + + threading.Thread(target=run_command, daemon=True).start() else: self.command_logger.log_unknown_request(from_id, message) @@ -174,26 +202,29 @@ def handle_public_message(self, packet: MeshPacket): sender_name = sender.long_name if sender else from_id channel_name = self.get_channel_name(packet) - logging.info(f"Received group message on channel '{channel_name}' from {sender_name}: {message}") + logging.info(f"📢 [GROUP MSG] Channel '{channel_name}' from {sender_name}: {message}") # Allow certain commands in public channels words = message.split() if words: command_name = words[0].lower() if command_name in ["!tr", "!ping", "!hello", "!nodes", "!status", "!whoami"]: - from src.helpers import get_env_bool env_var_name = f"ENABLE_COMMAND_{command_name.lstrip('!').upper()}" if get_env_bool(env_var_name, True): - logging.info(f"Received public {command_name} from {sender_name}") - from src.commands.factory import CommandFactory + logging.info(f"🤖 [BOT CMD] Received public {command_name} from {sender_name}") command_instance = CommandFactory.create_command(command_name, self) if command_instance: - try: - # Commands by default reply via DM (reply_in_dm). - command_instance.handle_packet(packet) - return # Stop processing responders - except Exception as e: - logging.error(f"Error handling public command {command_name}: {e}") + def run_command(): + try: + logging.info(f"🤖 [BOT CMD] Running public command {command_name} in thread for {from_id}") + # Commands by default reply via DM (reply_in_dm). 
+ command_instance.handle_packet(packet) + logging.info(f"✅ [BOT CMD] Finished public command {command_name} for {from_id}") + except Exception as e: + logging.error(f"❌ [BOT CMD] Error handling public command {command_name}: {e}", exc_info=True) + + threading.Thread(target=run_command, daemon=True).start() + return # Stop processing responders responder = ResponderFactory.match_responder(message, self) if responder: @@ -202,130 +233,154 @@ def handle_public_message(self, packet: MeshPacket): if outcome: logging.info( - f"Handled message from {sender.long_name if sender else from_id} with responder {responder.__class__.__name__}: {message}") + f"🤖 [RESPONDER] Handled message from {sender.long_name if sender else from_id} with responder {responder.__class__.__name__}: {message}") self.command_logger.log_responder_handled(from_id, responder, message) + except (KeyError, ValueError) as e: + logging.error(f"Packet format error handling message: {e}", exc_info=True) except Exception as e: - logging.error(f"Error handling message: {e}") + logging.error(f"Error handling message: {e}", exc_info=True) def on_traceroute(self, packet, route): """Callback for when a traceroute response is received.""" - target_id = packet.get('fromId') - - if target_id not in self.pending_traces: - logging.debug(f"Received traceroute from {target_id} but no pending request found.") - return - - requester_id = self.pending_traces.pop(target_id) + logging.info(f"on_traceroute: Received signal from {packet.get('fromId') if isinstance(packet, dict) else 'obj'}") - # Format the OUTBOUND route - route_ids = route.route - hops = [] - for node_id_int in route_ids: - # Convert int to !hex string - node_id_str = f"!{node_id_int:08x}" - node = self.node_db.get_by_id(node_id_str) - if node: - hops.append(f"{node.short_name}") - else: - hops.append(f"{node_id_str}") + def process_traceroute(): + try: + target_id = packet.get('fromId') + if target_id not in self.pending_traces: + return + + requesters = 
self.pending_traces.pop(target_id) + if not isinstance(requesters, list): + requesters = [requesters] + + if route is None: + for ctx in requesters: + r_id = ctx[0] if isinstance(ctx, tuple) else ctx + msg = f"Traceroute response received from {target_id}, but no route data was provided." + self.interface.sendText(msg, destinationId=r_id, wantAck=True) + return + + def get_route_hops(r, key='route'): + if isinstance(r, dict): + return r.get(key, []) + return getattr(r, key, []) + + # Format compact routes + target_node = self.node_db.get_by_id(target_id) + t_name = target_node.short_name if target_node else target_id[-4:] + + my_node = self.node_db.get_by_id(self.my_id) + m_name = my_node.short_name if my_node else self.my_id[-4:] + + # Outbound + route_ids = get_route_hops(route, 'route') + hops_to = [] + for nid in route_ids: + n = self.node_db.get_by_id(f"!{nid:08x}") + hops_to.append(n.short_name if n else f"{nid:08x}"[-4:]) + route_to_str = ">".join(hops_to) + (">" if hops_to else "") + t_name + + # Inbound + route_back_ids = get_route_hops(route, 'route_back') + hops_fr = [] + for nid in route_back_ids: + n = self.node_db.get_by_id(f"!{nid:08x}") + hops_fr.append(n.short_name if n else f"{nid:08x}"[-4:]) + route_fr_str = ">".join(hops_fr) + (">" if hops_fr else "") + m_name + + # Consolidate into a single message + combined_response = f"!tr {t_name}:\nTO({len(route_ids)}h): {route_to_str}\nFR({len(route_back_ids)}h): {route_fr_str}" + + # Longer wait for radio to settle + time.sleep(8) + + for ctx in requesters: + r_id, is_pub, to_id, c_idx = ctx if isinstance(ctx, tuple) else (ctx, False, ctx, 0) + dest_id = to_id if is_pub else r_id + self.interface.sendText(combined_response, destinationId=dest_id, channelIndex=c_idx, wantAck=True) + time.sleep(2) + except Exception as e: + logging.error(f"Error in on_traceroute thread: {e}", exc_info=True) - route_str = " -> ".join(hops) if hops else "Direct (or unknown)" - - response_out = f"Trace TO {target_id} 
({len(hops)} hops):\n{route_str}" - logging.info(f"Sending traceroute OUT result to {requester_id}: {response_out}") - self.interface.sendText(response_out, destinationId=requester_id) - - # Format the INBOUND route (if available) - if hasattr(route, 'route_back') and route.route_back: - hops_back = [] - for node_id_int in route.route_back: - node_id_str = f"!{node_id_int:08x}" - node = self.node_db.get_by_id(node_id_str) - if node: - hops_back.append(f"{node.short_name}") - else: - hops_back.append(f"{node_id_str}") - back_str = " -> ".join(hops_back) - - response_in = f"Trace FROM {target_id} ({len(hops_back)} hops):\n{back_str}" - logging.info(f"Sending traceroute IN result to {requester_id}: {response_in}") - # Small delay to ensure order - time.sleep(1) - self.interface.sendText(response_in, destinationId=requester_id) + threading.Thread(target=process_traceroute, daemon=True).start() def on_receive(self, packet: MeshPacket, interface): - if packet.get('fromId') == '!69828b98': - logging.debug(f"Received ANY packet from mte4: {packet}") - # dump the packet to disk (if enabled) dump_packet(packet) - for storage_api in self.storage_apis: - try: - storage_api.store_raw_packet(packet) - except HTTPError as ex: - logging.warning(f"Error storing packet: {ex.response.text}") - pass - except Exception as ex: - logging.warning(f"Error storing packet in API: {ex}") - pass + portnum = packet.get("decoded", {}).get("portnum", "unknown") + # Ensure we check against both the string name and the integer ID if available + portnum_key = str(portnum).upper() + + has_decoded = 'decoded' in packet or 'decrypted' in packet + is_ignored = False + if self.ignore_portnums: + if portnum_key in self.ignore_portnums: + is_ignored = True + elif isinstance(portnum, int) and str(portnum) in self.ignore_portnums: + is_ignored = True + + if is_ignored: + logging.info(f"Skipping API submission for packet with portnum {portnum} (in IGNORE_PORTNUMS)") + elif not has_decoded: + pass # Skip API 
submission for packets with no decoded data + else: + for storage_api in self.storage_apis: + try: + storage_api.store_raw_packet(packet) + except HTTPError as ex: + logging.warning(f"Error storing packet: {ex.response.text}") + except Exception as ex: + logging.warning(f"Error storing packet in API: {ex}") sender = packet['fromId'] node = self.node_db.get_by_id(sender) if not node: - # logging.warning(f"Received packet from unknown sender {sender}") return if node: portnum = packet['decoded']['portnum'] if 'decoded' in packet else 'unknown' if sender == self.my_id and portnum == 'TELEMETRY_APP': - # Ignore telemetry packets sent by self pass else: - # Increment packets_today for this node self.node_info.node_packet_received(sender, portnum) - if sender == self.my_id: - recipient_id = packet['toId'] - recipient = self.node_db.get_by_id(recipient_id) - portnum = packet['decoded']['portnum'] - - logging.debug( - f"Received packet from self: {recipient.long_name if recipient else recipient_id} (port {portnum})") - def on_node_updated(self, node, interface): if interface.localNode and self.my_nodenum is None: self.my_nodenum = interface.localNode.nodeNum - self.my_id = f"!{hex(self.my_nodenum)[2:]}" + self.my_id = f"!{self.my_nodenum:08x}" - # Check if the node is a new user if node['user'] is not None: mesh_node = MeshNode.from_dict(node) last_heard_int = node.get('lastHeard', 0) - last_heard = datetime.fromtimestamp(last_heard_int, tz=timezone.utc) - self.node_db.store_node(mesh_node) - self.node_info.update_last_heard(mesh_node.user.id, last_heard) - - for storage_api in self.storage_apis: - try: - storage_api.store_node(mesh_node) - except HTTPError as ex: - logging.warning(f"Error storing node: {ex.response.text}") - pass - except Exception as ex: - logging.warning(f"Error storing node: {ex}") - pass - - if self.init_complete: - last_heard_str = pretty_print_last_heard(last_heard) + + if last_heard_int > 0: + last_heard = datetime.fromtimestamp(last_heard_int, 
tz=timezone.utc) + existing_last_heard = self.node_info.get_last_heard(mesh_node.user.id) + if not existing_last_heard or last_heard > existing_last_heard: + self.node_info.update_last_heard(mesh_node.user.id, last_heard) + + existing_user = self.node_db.get_by_id(mesh_node.user.id) + is_new = existing_user is None + + if is_new or existing_user != mesh_node.user: + self.node_db.store_node(mesh_node) + for storage_api in self.storage_apis: + try: + storage_api.store_node(mesh_node) + except Exception as ex: + logging.warning(f"Error storing node: {ex}") + + if self.init_complete and is_new: + current_last_heard = self.node_info.get_last_heard(mesh_node.user.id) + last_heard_str = pretty_print_last_heard(current_last_heard) if current_last_heard else "unknown" logging.info(f"New user: {mesh_node.user.long_name} (last heard {last_heard_str})") def print_nodes(self): - # filter nodes where last heard is more than 2 hours ago online_nodes = self.node_info.get_online_nodes() offline_nodes = self.node_info.get_offline_nodes() - # print all nodes, sorted by last heard descending logging.info(f"Online nodes: ({len(online_nodes)})") sorted_nodes = sorted(online_nodes, key=lambda x: online_nodes[x], reverse=True) for node_id in sorted_nodes: @@ -339,12 +394,13 @@ def print_nodes(self): logging.info(f"- Plus {len(offline_nodes)} offline nodes") - def report_node_count(self, destination=None, channel_index=2): - """Report the current node count to a specific channel or destination.""" + def report_node_count(self, destination=None, channel_index=None): if not self.init_complete or not self.interface: - logging.warning("Skipping node count report: interface not ready.") return + if channel_index is None: + channel_index = get_env_int('CHANNEL_FOR_NODE_TOTAL_BROADCAST', 2) + online_nodes = self.node_info.get_online_nodes() count = len(online_nodes) @@ -360,24 +416,17 @@ def report_node_count(self, destination=None, channel_index=2): if destination: 
self.interface.sendText(message, destinationId=destination, wantAck=True) else: - # Default to Channel 2 (GregPrivate) self.interface.sendText(message, channelIndex=channel_index, wantAck=True) except Exception as e: - logging.error(f"Failed to report node count: {e}") + logging.error(f"Error reporting node count: {e}") def check_for_zero_nodes(self): - """Checks if the node count is zero and alerts immediately if it transitioned to zero.""" if not self.init_complete or not self.interface: return - online_nodes = self.node_info.get_online_nodes() - count = len(online_nodes) - - if count == 0 and not self.last_report_zero: - logging.warning("Immediate alert: Node count dropped to zero!") + if len(online_nodes) == 0 and not self.last_report_zero: self.report_node_count() - elif count > 0: - # Reset flag so we can alert again if it drops to zero later + elif len(online_nodes) > 0: self.last_report_zero = False def get_global_context(self): @@ -389,8 +438,10 @@ def get_global_context(self): def start_scheduler(self): schedule.every().day.at("00:00").do(self.node_info.reset_packets_today) - schedule.every(3).hours.do(self.report_node_count) - schedule.every(1).minutes.do(self.check_for_zero_nodes) + if get_env_bool('ENABLE_FEATURE_NODE_TOTALS', True): + report_frequency = get_env_int('FREQUENCY_OF_NODE_REPORTS', 3) + schedule.every(report_frequency).hours.do(self.report_node_count) + schedule.every(1).minutes.do(self.check_for_zero_nodes) while True: schedule.run_pending() try: diff --git a/src/commands/help.py b/src/commands/help.py index 42a0991..0f44568 100644 --- a/src/commands/help.py +++ b/src/commands/help.py @@ -22,6 +22,8 @@ def __init__(self, bot: MeshtasticBot): self.sub_commands['prefs'] = self.handle_prefs if get_env_bool('ENABLE_COMMAND_STATUS', True): self.sub_commands['status'] = self.handle_status + if get_env_bool('ENABLE_COMMAND_ADMIN', True): + self.sub_commands['admin'] = self.handle_admin # if get_env_bool('ENABLE_COMMAND_ENROLL', True): # 
self.sub_commands['enroll'] = self.handle_enroll # if get_env_bool('ENABLE_COMMAND_LEAVE', True): @@ -92,5 +94,9 @@ def handle_status(self, packet: MeshPacket, args: list[str]) -> None: response = "!status: show current bot and proxy health status" self.reply(packet, response) + def handle_admin(self, packet: MeshPacket, args: list[str]) -> None: + response = "!admin: admin commands (restricted)" + self.reply(packet, response) + def get_command_for_logging(self, message: str) -> (str, list[str] | None, str | None): return self._gcfl_base_command_and_args(message) diff --git a/src/commands/nodes.py b/src/commands/nodes.py index 4dd3508..a688f6a 100644 --- a/src/commands/nodes.py +++ b/src/commands/nodes.py @@ -1,3 +1,4 @@ +from datetime import datetime, timezone from meshtastic.protobuf.mesh_pb2 import MeshPacket from src.bot import MeshtasticBot @@ -25,8 +26,8 @@ def handle_base_command(self, packet: MeshPacket, args: list[str]) -> None: online_nodes = self.bot.node_info.get_online_nodes() offline_nodes = self.bot.node_info.get_offline_nodes() - # get nodes sorted by last_head - sorted_nodes = sorted(nodes, key=lambda n: self.bot.node_info.get_last_heard(n.id), reverse=True) + # get nodes sorted by last_head, handling None values (sort them to the bottom) + sorted_nodes = sorted(nodes, key=lambda n: self.bot.node_info.get_last_heard(n.id) or datetime.min.replace(tzinfo=timezone.utc), reverse=True) response = f"{len(online_nodes)} nodes online, {len(offline_nodes)} offline." 
# Add up to 10 nodes with the most packets received today diff --git a/src/commands/status.py b/src/commands/status.py index bcbeb21..644c39a 100644 --- a/src/commands/status.py +++ b/src/commands/status.py @@ -22,7 +22,7 @@ def handle_packet(self, packet): status = self.bot.proxy.get_status() if isinstance(status, dict): state = "Online" if status['connected'] else "Reconnecting" - proxy_info = f"{state}, {status['clients']} clients, last radio data {status['silence_secs']}s ago" + proxy_info = f"{state}, {status['clients']} clients, {status['cached_packets']} pkts cached, last radio {status['silence_secs']}s ago" else: proxy_info = status diff --git a/src/commands/tr.py b/src/commands/tr.py index b7312db..c1b3623 100644 --- a/src/commands/tr.py +++ b/src/commands/tr.py @@ -1,4 +1,6 @@ import logging +import threading +import time from meshtastic.protobuf.mesh_pb2 import MeshPacket from src.commands.command import AbstractCommand @@ -9,33 +11,105 @@ def __init__(self, bot): super().__init__(bot, 'tr') def handle_packet(self, packet: MeshPacket) -> None: - hop_start = packet.get('hopStart', 0) - hop_limit = packet.get('hopLimit', 0) - hops_away = hop_start - hop_limit + message = packet['decoded']['text'] + words = message.split() - snr = packet.get('rxSnr', 0.0) + is_public = packet.get('toId') == '^all' or 'channel' in packet - sender_id = packet['fromId'] - sender = self.bot.node_db.get_by_id(sender_id) - sender_name = sender.long_name if sender else sender_id + def send_reply(msg): + # Always reply in DM + self.reply_in_dm(packet, msg, want_ack=True) - if hops_away == 0: - response = f"{sender_name} you are Zero Hops from me. No traceroute required!" 
- self.reply_in_dm(packet, response) + # Add a reaction (thumbs up for public to acknowledge without spamming, hourglass for DM) + reaction_emoji = "👍" if is_public else "⌛" + reaction_dest = packet.get('toId') if is_public else packet.get('fromId') + logging.info(f"Adding reaction {reaction_emoji} for packet {packet.get('id')} to {reaction_dest}") + self.bot.interface.sendReaction(reaction_emoji, messageId=packet['id'], destinationId=reaction_dest) + + requester_id = packet.get('fromId') + requester = self.bot.node_db.get_by_id(requester_id) + requester_name = requester.long_name if requester else requester_id + + target_node = None + if len(words) > 1: + target_short = words[1] + target_node = self.bot.get_node_by_short_name(target_short) + if not target_node: + send_reply(f"Could not find node with short name '{target_short}'") + return + target_id = target_node.id + target_long_name = target_node.long_name + else: + target_id = requester_id + target_long_name = requester_name + + if target_id == self.bot.my_id: + send_reply("I am already here! No traceroute required.") return - response = f"{sender_name} you are {hops_away} hops away (Signal: {snr} dB). Starting full traceroute..." - self.reply_in_dm(packet, response) + # If tracing back to requester, we can show hops_away/SNR from the incoming packet + if target_id == requester_id: + hop_start = packet.get('hopStart', 0) + hop_limit = packet.get('hopLimit', 0) + hops_away = hop_start - hop_limit + snr = packet.get('rxSnr', 0.0) + + # We can log this, but no need to send it explicitly over the radio to save airtime + logging.info(f"Detected {hops_away} hops for {target_id}. 
SNR: {snr}dB.") + else: + # Tracing to a different node + logging.info(f"Starting traceroute to {target_long_name} ({target_id}) for you...") + + # Store for the callback + if target_id not in self.bot.pending_traces: + self.bot.pending_traces[target_id] = [] - # Initiate actual traceroute - self.bot.pending_traces[sender_id] = sender_id + # Store context: force is_public=False so bot.py always replies via DM + to_id = packet.get('toId') + channel_index = packet.get('channel', 0) + context = (requester_id, False, to_id, channel_index) + + if context not in self.bot.pending_traces[target_id]: + self.bot.pending_traces[target_id].append(context) + + # Start a timeout timer (120 seconds) + def check_timeout(): + time.sleep(120) + if target_id in self.bot.pending_traces: + # Find and remove this specific context from the pending list + self.bot.pending_traces[target_id] = [c for c in self.bot.pending_traces[target_id] if c[0] != requester_id] + # If no more requesters for this target, clean up the key + if not self.bot.pending_traces[target_id]: + del self.bot.pending_traces[target_id] + + logging.info(f"Traceroute to {target_id} (requested by {requester_id}) timed out.") + timeout_msg = f"Traceroute to {target_long_name} ({target_id}) timed out (no response from mesh)." 
+ + # Send the timeout message in a separate thread to avoid blocking the timer/interface + def send_timeout(): + self.message_in_dm(requester_id, timeout_msg, want_ack=True) + + threading.Thread(target=send_timeout, daemon=True).start() + + threading.Thread(target=check_timeout, daemon=True).start() + try: - logging.info(f"Initiating traceroute to {sender_id}") + # Let the reaction settle before firing the trace + time.sleep(2) + logging.info(f"Initiating traceroute to {target_id} requested by {requester_id}") # hopLimit=7 is standard max - self.bot.interface.sendTraceRoute(sender_id, hopLimit=7) + p = self.bot.interface.sendTraceRoute(target_id, hopLimit=7) + if p: + logging.info(f"Sent traceroute packet to {target_id}. Packet ID: {p.id}") + else: + logging.warning(f"sendTraceRoute returned None for {target_id}") except Exception as e: - logging.error(f"Failed to send traceroute to {sender_id}: {e}") - self.reply_in_dm(packet, f"Error starting traceroute: {e}") + logging.error(f"Failed to send traceroute to {target_id}: {e}") + if target_id in self.bot.pending_traces and requester_id in self.bot.pending_traces[target_id]: + self.bot.pending_traces[target_id].remove(requester_id) + if not self.bot.pending_traces[target_id]: + del self.bot.pending_traces[target_id] + send_reply(f"Error starting traceroute: {e}") def get_command_for_logging(self, message: str) -> (str, list[str] | None, str | None): - return self._gcfl_just_base_command(message) + return self._gcfl_base_command_and_args(message) diff --git a/src/data_classes.py b/src/data_classes.py index 9ed28b3..4171c7e 100644 --- a/src/data_classes.py +++ b/src/data_classes.py @@ -19,6 +19,16 @@ def __init__(self, self.hw_model = hw_model self.public_key = public_key + def __eq__(self, other): + if not isinstance(other, MeshNode.User): + return False + return (self.id == other.id and + self.long_name == other.long_name and + self.short_name == other.short_name and + self.macaddr == other.macaddr and + 
self.hw_model == other.hw_model and + self.public_key == other.public_key) + id: str long_name: str short_name: str diff --git a/src/helpers.py b/src/helpers.py index 0c6fc77..fd5935f 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -11,7 +11,20 @@ def get_env_bool(name: str, default: bool = True) -> bool: return value.lower() in ('true', '1', 't', 'y', 'yes') -def pretty_print_last_heard(last_heard_timestamp: int | datetime) -> str: +def get_env_int(name: str, default: int) -> int: + value = os.getenv(name) + if value is None: + return default + try: + return int(value) + except (ValueError, TypeError): + return default + + +def pretty_print_last_heard(last_heard_timestamp: int | datetime | None) -> str: + if not last_heard_timestamp: + return "never" + if not isinstance(last_heard_timestamp, datetime): last_heard = datetime.fromtimestamp(last_heard_timestamp, timezone.utc) else: diff --git a/src/main.py b/src/main.py index 9a121d6..bfb6877 100644 --- a/src/main.py +++ b/src/main.py @@ -23,6 +23,7 @@ from src.api.StorageAPI import StorageAPIWrapper from src.bot import MeshtasticBot from src.helpers import get_env_bool +from src.ws_client import MeshflowWSClient from src.persistence.commands_logger import SqliteCommandLogger from src.persistence.node_info import InMemoryNodeInfoStore from src.persistence.node_db import SqliteNodeDB @@ -36,6 +37,8 @@ ADMIN_NODES = [node.strip() for node in admin_nodes_raw.split(',') if node.strip()] ENABLE_TCP_PROXY = get_env_bool("ENABLE_TCP_PROXY", True) +PROXY_HANDSHAKE_CACHE_SIZE = int(os.getenv("PROXY_HANDSHAKE_CACHE_SIZE", 100)) +PROXY_ROLLING_CACHE_SIZE = int(os.getenv("PROXY_ROLLING_CACHE_SIZE", 100)) DATA_DIR = os.getenv("DATA_DIR", "data") STORAGE_API_ROOT = os.getenv("STORAGE_API_ROOT") @@ -44,6 +47,13 @@ STORAGE_API_2_ROOT = os.getenv("STORAGE_API_2_ROOT") STORAGE_API_2_TOKEN = os.getenv("STORAGE_API_2_TOKEN", None) STORAGE_API_2_VERSION = int(os.getenv("STORAGE_API_2_VERSION", 1)) +MESHFLOW_WS_URL = 
os.getenv("MESHFLOW_WS_URL") # e.g. ws://localhost:8000; derived from storage API if unset + +# Comma-separated portnums to skip when submitting to API (e.g. 345,ROUTING_APP) +_ignore_portnums_raw = os.getenv("IGNORE_PORTNUMS", "") +IGNORE_PORTNUMS = frozenset( + p.strip().upper() for p in _ignore_portnums_raw.split(",") if p.strip() +) def main(): @@ -60,15 +70,24 @@ def main(): logging.info(f"--- Configuration ---") logging.info(f"MESHTASTIC_IP: {MESHTASTIC_IP}") logging.info(f"ENABLE_TCP_PROXY: {ENABLE_TCP_PROXY}") + logging.info(f"PROXY_HANDSHAKE_CACHE_SIZE: {PROXY_HANDSHAKE_CACHE_SIZE}") + logging.info(f"PROXY_ROLLING_CACHE_SIZE: {PROXY_ROLLING_CACHE_SIZE}") + logging.info(f"ENABLE_FEATURE_NODE_TOTALS: {get_env_bool('ENABLE_FEATURE_NODE_TOTALS', True)}") + logging.info(f"FREQUENCY_OF_NODE_REPORTS: {os.getenv('FREQUENCY_OF_NODE_REPORTS', '3')} hours") + logging.info(f"CHANNEL_FOR_NODE_TOTAL_BROADCAST: {os.getenv('CHANNEL_FOR_NODE_TOTAL_BROADCAST', '2')}") logging.info(f"ENABLE_COMMAND_PING: {get_env_bool('ENABLE_COMMAND_PING', True)}") logging.info(f"ENABLE_COMMAND_TR: {get_env_bool('ENABLE_COMMAND_TR', True)}") + logging.info(f"IGNORE_PORTNUMS: {list(IGNORE_PORTNUMS)}") + logging.info(f"STORAGE_API_ROOT: {STORAGE_API_ROOT}") + if STORAGE_API_2_ROOT: + logging.info(f"STORAGE_API_2_ROOT: {STORAGE_API_2_ROOT}") logging.info(f"---------------------") proxy = None if ENABLE_TCP_PROXY: # Start the TCP Proxy # It listens on 0.0.0.0:4403 and forwards to MESHTASTIC_IP:4403 - proxy = TcpProxy(target_host=MESHTASTIC_IP, target_port=4403, listen_host='0.0.0.0', listen_port=4403) + proxy = TcpProxy(target_host=MESHTASTIC_IP, target_port=4403, listen_host='0.0.0.0', listen_port=4403, handshake_cache_size=PROXY_HANDSHAKE_CACHE_SIZE, rolling_cache_size=PROXY_ROLLING_CACHE_SIZE) proxy.start() # Give the proxy a moment to bind to the port before the bot tries to connect @@ -79,6 +98,7 @@ def main(): connection_address = 'localhost' if ENABLE_TCP_PROXY else MESHTASTIC_IP bot = 
MeshtasticBot(connection_address) bot.proxy = proxy + bot.ignore_portnums = IGNORE_PORTNUMS bot.admin_nodes = ADMIN_NODES bot.user_prefs_persistence = SqliteUserPrefsPersistence(str(user_prefs_file)) bot.command_logger = SqliteCommandLogger(str(command_log_file)) @@ -90,6 +110,24 @@ def main(): if STORAGE_API_2_ROOT: bot.storage_apis.append(StorageAPIWrapper(bot, STORAGE_API_2_ROOT, STORAGE_API_2_TOKEN, STORAGE_API_2_VERSION, failed_packets_dir)) + # WebSocket client for receiving commands (e.g. traceroute) + ws_url = MESHFLOW_WS_URL + ws_token = None + if not ws_url: + base = STORAGE_API_ROOT + if base: + ws_url = base \ + .replace("http://", "ws://") \ + .replace("https://", "wss://") + if STORAGE_API_ROOT and STORAGE_API_TOKEN: + ws_token = STORAGE_API_TOKEN + if ws_url and ws_token: + bot.ws_client = MeshflowWSClient( + ws_url=ws_url, + api_key=ws_token, + on_traceroute=bot.on_traceroute_command, + ) + try: node_info.load_from_file(str(node_info_file)) bot.connect() @@ -98,6 +136,8 @@ def main(): except Exception as e: logging.error(f"Error: {e}") finally: + if bot.ws_client: + bot.ws_client.stop() bot.disconnect() node_info.persist_to_file(str(node_info_file)) diff --git a/src/persistence/__init__.py b/src/persistence/__init__.py index a7d2976..2797e70 100644 --- a/src/persistence/__init__.py +++ b/src/persistence/__init__.py @@ -1,20 +1,33 @@ -import abc -import logging -from pathlib import Path - - -class BaseSqlitePersistenceStore(abc.ABC): - db_path: Path - - def __init__(self, db_path: str): - self.db_path = Path(db_path) - self._initialize_db() - if self.db_path.is_relative_to(Path.cwd()): - path_string = self.db_path.relative_to(Path.cwd()) - else: - path_string = self.db_path - logging.info(f"Connected to {self.__class__.__name__} DB at {path_string}") - - @abc.abstractmethod - def _initialize_db(self): - pass +import abc +import logging +import sqlite3 +import threading +from contextlib import contextmanager +from pathlib import Path + + +class 
BaseSqlitePersistenceStore(abc.ABC): + db_path: Path + + def __init__(self, db_path: str): + self.db_path = Path(db_path) + self._lock = threading.RLock() + self._initialize_db() + if self.db_path.is_relative_to(Path.cwd()): + path_string = self.db_path.relative_to(Path.cwd()) + else: + path_string = self.db_path + logging.info(f"Connected to {self.__class__.__name__} DB at {path_string}") + + @contextmanager + def _get_connection(self): + """Returns a thread-safe sqlite3 connection and ensures it is closed.""" + conn = sqlite3.connect(self.db_path, check_same_thread=False) + try: + yield conn + finally: + conn.close() + + @abc.abstractmethod + def _initialize_db(self): + pass diff --git a/src/persistence/commands_logger.py b/src/persistence/commands_logger.py index ca57d89..09edc5e 100644 --- a/src/persistence/commands_logger.py +++ b/src/persistence/commands_logger.py @@ -37,7 +37,7 @@ def get_responder_history(self, since: datetime, sender_id: str = None) -> pd.Da class SqliteCommandLogger(AbstractCommandLogger, BaseSqlitePersistenceStore): def _initialize_db(self): - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS command_log ( @@ -70,7 +70,7 @@ def log_command(self, sender_id: str, command_instance, message: str) -> None: base_cmd, subcommands, args = command_instance.get_command_for_logging(message) subcommands_str = ' '.join(subcommands) if subcommands else None - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO command_log (sender_id, base_command, sub_commands, args, timestamp, handler_class) @@ -80,7 +80,7 @@ def log_command(self, sender_id: str, command_instance, message: str) -> None: conn.commit() def log_responder_handled(self, sender_id: str, responder_instance, message_text: str) -> None: - with sqlite3.connect(self.db_path) as conn: + 
with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO responder_log (sender_id, message, timestamp, responder_class) @@ -89,7 +89,7 @@ def log_responder_handled(self, sender_id: str, responder_instance, message_text conn.commit() def log_unknown_request(self, sender_id: str, message: str) -> None: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO unknown_requests (sender_id, message, timestamp) @@ -98,7 +98,7 @@ def log_unknown_request(self, sender_id: str, message: str) -> None: conn.commit() def get_command_history(self, since: datetime, sender_id: str = None) -> pd.DataFrame: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() if sender_id: cursor.execute(''' @@ -114,7 +114,7 @@ def get_command_history(self, since: datetime, sender_id: str = None) -> pd.Data return pd.DataFrame(rows, columns=['sender_id', 'base_command', 'timestamp']) def get_unknown_command_history(self, since: datetime, sender_id: str = None) -> pd.DataFrame: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() if sender_id: cursor.execute(''' @@ -130,7 +130,7 @@ def get_unknown_command_history(self, since: datetime, sender_id: str = None) -> return pd.DataFrame(rows, columns=['sender_id', 'message', 'timestamp']) def get_responder_history(self, since: datetime, sender_id: str = None) -> pd.DataFrame: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() if sender_id: cursor.execute(''' diff --git a/src/persistence/node_db.py b/src/persistence/node_db.py index 955cb23..e6d3726 100644 --- a/src/persistence/node_db.py +++ b/src/persistence/node_db.py @@ -117,7 +117,7 @@ def get_device_metrics_log(self, node_id: str, start: datetime, end: datetime) - 
class SqliteNodeDB(BaseSqlitePersistenceStore, AbstractNodeDB): def _initialize_db(self): - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS nodes ( @@ -156,7 +156,7 @@ def _initialize_db(self): conn.commit() def store_user(self, node_user: MeshNode.User): - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO nodes (id, short_name, long_name, macaddr, hw_model, public_key) @@ -166,7 +166,7 @@ def store_user(self, node_user: MeshNode.User): conn.commit() def store_position(self, node_id: str, position: MeshNode.Position): - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO positions (node_id, logged_time, reported_time, latitude, longitude, altitude, location_source) @@ -176,7 +176,7 @@ def store_position(self, node_id: str, position: MeshNode.Position): conn.commit() def store_device_metrics(self, node_id: str, device_metrics: MeshNode.DeviceMetrics): - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO device_metrics (node_id, logged_time, battery_level, voltage, channel_utilization, air_util_tx, uptime_seconds) @@ -186,7 +186,7 @@ def store_device_metrics(self, node_id: str, device_metrics: MeshNode.DeviceMetr conn.commit() def get_by_id(self, node_id: str) -> MeshNode.User | None: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute('SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes WHERE id = ?', (node_id,)) @@ -197,7 +197,7 @@ def get_by_id(self, node_id: str) -> MeshNode.User | None: return None def get_by_short_name(self, short_name: str) -> 
MeshNode.User | None: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute( 'SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes WHERE short_name = ? COLLATE NOCASE', @@ -209,7 +209,7 @@ def get_by_short_name(self, short_name: str) -> MeshNode.User | None: return None def list_nodes(self) -> list[MeshNode.User]: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute('SELECT id, short_name, long_name, macaddr, hw_model, public_key FROM nodes') rows = cursor.fetchall() @@ -217,7 +217,7 @@ def list_nodes(self) -> list[MeshNode.User]: hw_model=row[4], public_key=row[5]) for row in rows] def get_last_position(self, node_id: str) -> MeshNode.Position | None: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' SELECT logged_time, reported_time, latitude, longitude, altitude, location_source @@ -234,7 +234,7 @@ def get_last_position(self, node_id: str) -> MeshNode.Position | None: def get_position_log(self, node_id: str, start: datetime, end: datetime) -> list[ MeshNode.Position]: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' SELECT logged_time, reported_time, latitude, longitude, altitude, location_source @@ -247,7 +247,7 @@ def get_position_log(self, node_id: str, start: datetime, end: datetime) -> list altitude=row[4], location_source=row[5]) for row in rows] def get_last_device_metrics(self, node_id: str) -> MeshNode.DeviceMetrics | None: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' SELECT logged_time, battery_level, voltage, channel_utilization, air_util_tx, uptime_seconds @@ -264,7 +264,7 @@ def 
get_last_device_metrics(self, node_id: str) -> MeshNode.DeviceMetrics | None def get_device_metrics_log(self, node_id: str, start: datetime, end: datetime) -> list[ MeshNode.DeviceMetrics]: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' SELECT logged_time, battery_level, voltage, channel_utilization, air_util_tx, uptime_seconds diff --git a/src/persistence/node_info.py b/src/persistence/node_info.py index 960c7ce..9a4e2c0 100644 --- a/src/persistence/node_info.py +++ b/src/persistence/node_info.py @@ -100,11 +100,11 @@ def reset_packets_today(self) -> None: def get_online_nodes(self) -> dict[str, datetime]: return {node_id: last_heard for node_id, last_heard in self.nodes_last_heard.items() - if last_heard > datetime.now(timezone.utc) - timedelta(seconds=self.online_threshold_sec)} + if last_heard and last_heard > datetime.now(timezone.utc) - timedelta(seconds=self.online_threshold_sec)} def get_offline_nodes(self) -> dict[str, datetime]: return {node_id: last_heard for node_id, last_heard in self.nodes_last_heard.items() - if last_heard <= datetime.now(timezone.utc) - timedelta(seconds=self.online_threshold_sec)} + if not last_heard or last_heard <= datetime.now(timezone.utc) - timedelta(seconds=self.online_threshold_sec)} def get_all_nodes(self) -> dict[str, datetime]: return self.nodes_last_heard @@ -115,7 +115,7 @@ def load_from_file(self, node_info_file: str) -> None: with open(node_info_file, 'r') as file: data = json.load(file) - self.nodes_last_heard = {k: datetime.fromisoformat(v) for k, v in data['nodes_last_heard'].items()} + self.nodes_last_heard = {k: (datetime.fromisoformat(v) if v else None) for k, v in data['nodes_last_heard'].items()} self.node_packets_today = data['node_packets_today'] self.node_packets_today_breakdown = data['node_packets_today_breakdown'] diff --git a/src/persistence/user_prefs.py b/src/persistence/user_prefs.py index 5888fbd..af3bc70 
100644 --- a/src/persistence/user_prefs.py +++ b/src/persistence/user_prefs.py @@ -51,7 +51,7 @@ def persist_user_prefs(self, user_id: str, user_prefs: UserPrefs): class SqliteUserPrefsPersistence(AbstractUserPrefsPersistence, BaseSqlitePersistenceStore): def _initialize_db(self): - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS user_prefs ( @@ -66,7 +66,7 @@ def _initialize_db(self): conn.commit() def get_user_prefs(self, user_id: str) -> UserPrefs: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: # Fetch the data cursor = conn.cursor() cursor.execute(''' @@ -91,7 +91,7 @@ def get_user_prefs(self, user_id: str) -> UserPrefs: return user_prefs def persist_user_prefs(self, user_id: str, user_prefs: UserPrefs) -> UserPrefs: - with sqlite3.connect(self.db_path) as conn: + with self._lock, self._get_connection() as conn: cursor = conn.cursor() for key, preference in user_prefs.__dict__.items(): if key == 'user_id': diff --git a/src/tcp_interface.py b/src/tcp_interface.py index c0db8ed..ab87eeb 100644 --- a/src/tcp_interface.py +++ b/src/tcp_interface.py @@ -30,7 +30,7 @@ def sendReaction( packet.decoded.portnum = portNum packet.decoded.payload = emoji_bytes packet.decoded.reply_id = messageId - packet.decoded.emoji = True + packet.decoded.emoji = ord(emoji) if isinstance(emoji, str) else 1 self._sendPacket(packet, destinationId, wantAck=wantAck, @@ -62,20 +62,42 @@ def __init__(self, *args, # Store packets in a queue and resend them after reconnecting # This will involve exposing the queue, and reloading the queue in bot.py since we create a new interface object - def onResponseTraceRoute(self, packet, routeDiscovery): + def onResponseTraceRoute(self, packet): """ Callback for when a traceroute response is received. 
""" - super().onResponseTraceRoute(packet, routeDiscovery) - pub.sendMessage("meshtastic.traceroute", packet=packet, route=routeDiscovery) + try: + route_discovery = None + if isinstance(packet, dict): + decoded = packet.get('decoded', {}) + # It might be in 'routing', 'routing_app', or 'traceroute' + route_discovery = decoded.get('routing') or decoded.get('routing_app') or decoded.get('traceroute') + + if not route_discovery and 'payload' in decoded: + logging.debug(f"onResponseTraceRoute: Route not found in decoded, full packet: {packet}") + elif hasattr(packet, 'decoded'): + route_discovery = getattr(packet.decoded, 'routing', + getattr(packet.decoded, 'routing_app', + getattr(packet.decoded, 'traceroute', None))) + + logging.info(f"onResponseTraceRoute: Received traceroute response. Route data present: {route_discovery is not None}") + logging.info(f"DEBUG: Traceroute packet keys: {packet.keys() if isinstance(packet, dict) else 'not a dict'}") + + # Always call super to allow library internal processing (printing to stdout etc) + super().onResponseTraceRoute(packet) + + # Notify bot logic + pub.sendMessage("meshtastic.traceroute", packet=packet, route=route_discovery) + except Exception as e: + logging.error(f"Error in onResponseTraceRoute: {e}", exc_info=True) def sendHeartbeat(self): try: super().sendHeartbeat() except (OSError, BrokenPipeError) as e: logging.error(f"Heartbeat failed: {e}") - # TODO: Decide if we want to handle the error on this thread - # self._reconnect_with_backoff() + # Shutdown and notify the error handler to trigger a clean restart from the main thread. + # This avoids nested reconnection attempts on the heartbeat thread. 
self._shutdown_and_call_error_handler() def _sendPacket( @@ -87,7 +109,8 @@ def _sendPacket( pkiEncrypted: Optional[bool] = False, publicKey: Optional[bytes] = None, ): - logging.debug(f"Sending packet to {destinationId} (Payload: {meshPacket.decoded.payload})") + port_val = meshPacket.decoded.portnum + logging.info(f"_sendPacket: Attempting to send Port {port_val} to {destinationId} (wantAck={wantAck})") try: super()._sendPacket( meshPacket=meshPacket, @@ -97,11 +120,15 @@ def _sendPacket( pkiEncrypted=pkiEncrypted, publicKey=publicKey ) + logging.info(f"_sendPacket: Successfully handed Port {port_val} to {destinationId} to meshtastic library") except (OSError, BrokenPipeError) as e: - logging.error(f"sendPacket failed: {e}") + logging.error(f"_sendPacket failed (connection error): {e}") self.packet_queue.put((meshPacket, destinationId, wantAck, hopLimit, pkiEncrypted, publicKey)) - # self._reconnect_with_backoff() self._shutdown_and_call_error_handler(e) + except Exception as e: + logging.error(f"_sendPacket failed (unexpected error): {e}", exc_info=True) + # We still queue it just in case it's recoverable + self.packet_queue.put((meshPacket, destinationId, wantAck, hopLimit, pkiEncrypted, publicKey)) def _shutdown_and_call_error_handler(self, conn_error: Optional[Exception] = None): try: diff --git a/src/tcp_proxy.py b/src/tcp_proxy.py index cb779b6..8a6e9ee 100644 --- a/src/tcp_proxy.py +++ b/src/tcp_proxy.py @@ -1,270 +1,270 @@ -import socket -import select -import threading -import logging -import time - -class TcpProxy: - def __init__(self, target_host, target_port=4403, listen_host='0.0.0.0', listen_port=4403): - self.target_host = target_host - self.target_port = int(target_port) - self.listen_host = listen_host - self.listen_port = int(listen_port) - self.server_socket = None - self.target_socket = None - self.clients = [] - self.running = False - self.init_buffer = b'' - self.init_buffer_done = False - self.buffer_time = 5.0 # seconds to buffer startup 
data (increased for safety) - - def start(self): - self.running = True - self.thread = threading.Thread(target=self._run) - self.thread.daemon = True - self.thread.start() - - def stop(self): - self.running = False - if self.server_socket: - try: - self.server_socket.close() - except: - pass - if self.target_socket: - try: - self.target_socket.close() - except: - pass - - def get_status(self): - if not self.running: - return "Proxy: Offline" - - silence = time.time() - self.last_target_activity if hasattr(self, 'last_target_activity') else 0 - return { - "connected": self.target_socket is not None and self.target_socket.fileno() != -1, - "clients": len(self.clients), - "silence_secs": int(silence) - } - - def _run(self): - logging.info(f"Starting TCP Proxy on {self.listen_host}:{self.listen_port} -> {self.target_host}:{self.target_port}") - - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - self.server_socket.bind((self.listen_host, self.listen_port)) - except Exception as e: - logging.error(f"Failed to bind proxy port {self.listen_port}: {e}") - self.running = False - return - - self.server_socket.listen(5) - - # Connect to target - backoff = 1 - while self.running: - try: - self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.target_socket.connect((self.target_host, self.target_port)) - logging.info(f"Proxy connected to target device at {self.target_host}:{self.target_port}") - break - except Exception as e: - logging.error(f"Failed to connect to target ({self.target_host}): {e}. 
Retrying in {backoff}s...") - time.sleep(backoff) - backoff = min(backoff * 2, 60) - - if not self.running: - return - - start_time = time.time() - last_target_activity = time.time() - watchdog_timeout = 300.0 # Reconnect if no data from target for 5 minutes - last_heartbeat_log = time.time() - - while self.running: - try: - # Filter out closed sockets from inputs - # We rebuild the list of inputs every time to ensure we are using the current target_socket - # (which might have changed after a reconnect) - inputs = [self.server_socket, self.target_socket] - current_inputs = [s for s in inputs + self.clients if s and s.fileno() != -1] - readable, _, _ = select.select(current_inputs, [], [], 1.0) - except Exception as e: - logging.error(f"Select error: {e}") - # Clean up closed sockets from our list - self.clients = [c for c in self.clients if c.fileno() != -1] - continue - - current_time = time.time() - - # Heartbeat Logging & Watchdog Check - if current_time - last_heartbeat_log > 60.0: - silence_duration = current_time - last_target_activity - logging.info(f"Proxy Heartbeat: Connected. Last data from radio {silence_duration:.1f}s ago. Clients: {len(self.clients)}") - last_heartbeat_log = current_time - - # Watchdog: Force reconnect if silence is too long - if current_time - last_target_activity > watchdog_timeout: - logging.warning(f"Watchdog: No data from radio for {watchdog_timeout}s. Forcing reconnect...") - try: - self.target_socket.close() - except: - pass - - # Reconnect logic - reconnected = False - backoff = 1 - while self.running and not reconnected: - try: - self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.target_socket.connect((self.target_host, self.target_port)) - logging.info("Watchdog: Reconnected to target successfully.") - last_target_activity = time.time() # Reset timer - reconnected = True - except Exception as ex: - logging.error(f"Watchdog reconnect failed: {ex}. 
Retrying in {backoff}s...") - time.sleep(backoff) - backoff = min(backoff * 2, 10) - - # Check for init buffer timeout - if not self.init_buffer_done and (current_time - start_time > self.buffer_time): - self.init_buffer_done = True - if self.init_buffer: - logging.info(f"Init buffer capture finished. Size: {len(self.init_buffer)} bytes") - - for sock in readable: - if sock is self.server_socket: - try: - client_socket, addr = self.server_socket.accept() - logging.info(f"New proxy connection from {addr}") - self.clients.append(client_socket) - # Replay init buffer - if self.init_buffer: - try: - # Send in chunks to avoid overwhelming the client's startup sequence - chunk_size = 1024 - for i in range(0, len(self.init_buffer), chunk_size): - chunk = self.init_buffer[i:i+chunk_size] - client_socket.sendall(chunk) - time.sleep(0.05) # 50ms delay between chunks - logging.info(f"Sent {len(self.init_buffer)} bytes of cached init data to {addr}") - except Exception as e: - logging.error(f"Error sending init buffer to client: {e}") - except Exception as e: - logging.error(f"Error accepting connection: {e}") - - elif sock is self.target_socket: - last_target_activity = time.time() # Update activity timestamp - try: - data = self.target_socket.recv(16384) - if not data: - logging.warning("Target closed connection. 
Restarting proxy connection...") - # Close the target socket - self.target_socket.close() - - # Attempt to reconnect loop - reconnected = False - backoff = 1 - while self.running and not reconnected: - try: - self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.target_socket.connect((self.target_host, self.target_port)) - logging.info("Reconnected to target.") - reconnected = True - # We don't reset inputs because target_socket is updated - except: - time.sleep(backoff) - backoff = min(backoff * 2, 30) - - if not reconnected: - self.running = False # Give up - break # Break the inner loop to refresh select() with new socket - - if not self.init_buffer_done: - self.init_buffer += data - - # Broadcast to all clients - for client in self.clients[:]: - try: - client.sendall(data) - except: - if client in self.clients: - self.clients.remove(client) - try: - client.close() - except: - pass - except Exception as e: - logging.error(f"Error reading from target: {e}") - # We should probably attempt reconnect here too, but for simplicity let's break - # and let the user restart if it's a hard fail. - # Or better, treating it as a disconnect: - self.target_socket.close() - # Simple reconnect attempt (blocking) - ideally this would be async but - # blocking here for a few seconds is better than crashing - try: - time.sleep(5) - self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.target_socket.connect((self.target_host, self.target_port)) - logging.info("Reconnected to target after error.") - except: - logging.error("Failed to reconnect immediately.") - - else: - # Data from a client - try: - data = sock.recv(16384) - if not data: - if sock in self.clients: - self.clients.remove(sock) - sock.close() - else: - # Forward to target - try: - self.target_socket.sendall(data) - except Exception as e: - logging.error(f"Error sending to target: {e}. 
Attempting to reconnect...") - # Force a reconnection attempt - try: - self.target_socket.close() - except: - pass - - # Reconnect logic - reconnected = False - backoff = 1 - while self.running and not reconnected: - try: - self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.target_socket.connect((self.target_host, self.target_port)) - logging.info("Reconnected to target successfully.") - # Resend the data that failed - self.target_socket.sendall(data) - reconnected = True - except Exception as ex: - logging.error(f"Reconnect failed: {ex}. Retrying in {backoff}s...") - time.sleep(backoff) - backoff = min(backoff * 2, 10) - - if not reconnected: - self.running = False - except: - if sock in self.clients: - self.clients.remove(sock) - try: - sock.close() - except: - pass - - # Cleanup - if self.server_socket: - try: self.server_socket.close() - except: pass - if self.target_socket: - try: self.target_socket.close() - except: pass - for c in self.clients: - try: c.close() - except: pass +import asyncio +import logging +import time +from collections import deque +import threading + +class TcpProxy: + def __init__(self, target_host, target_port=4403, listen_host='0.0.0.0', listen_port=4403, handshake_cache_size=100, rolling_cache_size=100): + self.target_host = target_host + self.target_port = int(target_port) + self.listen_host = listen_host + self.listen_port = int(listen_port) + + self.server = None + self.target_reader = None + self.target_writer = None + + self.clients = set() + + self.running = False + self.loop = None + self.thread = None + + self.handshake_packets = [] + self.handshake_max_count = handshake_cache_size + self.rolling_packets = deque(maxlen=rolling_cache_size) + + self.last_target_activity = time.time() + self.reconnecting = False + + def start(self): + self.running = True + self.thread = threading.Thread(target=self._run_loop, daemon=True) + self.thread.start() + + def stop(self): + self.running = False + if self.loop: + 
self.loop.call_soon_threadsafe(self._stop_loop) + + def _stop_loop(self): + if self.server: + self.server.close() + for writer in self.clients: + try: writer.close() + except: pass + if self.target_writer: + try: self.target_writer.close() + except: pass + + def get_status(self): + if not self.running: + return "Proxy: Offline" + + silence = time.time() - self.last_target_activity + client_count = len(self.clients) + cached_count = len(self.handshake_packets) + len(self.rolling_packets) + + state = "Reconnecting" if self.reconnecting else ("Online" if self.target_writer else "Offline") + + return { + "state": state, + "connected": self.target_writer is not None and not self.reconnecting, + "clients": client_count, + "silence_secs": int(silence), + "cached_packets": cached_count + } + + def _run_loop(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.loop.run_until_complete(self._async_run()) + + async def _async_run(self): + logging.info(f"Starting TCP Proxy on {self.listen_host}:{self.listen_port} -> {self.target_host}:{self.target_port}") + + try: + self.server = await asyncio.start_server( + self._handle_client, self.listen_host, self.listen_port) + except Exception as e: + logging.error(f"Failed to bind proxy port {self.listen_port}: {e}") + self.running = False + return + + asyncio.create_task(self._target_connection_manager()) + asyncio.create_task(self._watchdog()) + + try: + async with self.server: + while self.running: + await asyncio.sleep(1) + except asyncio.CancelledError: + pass + finally: + self._stop_loop() + + async def _watchdog(self): + last_heartbeat_log = time.time() + while self.running: + current_time = time.time() + if self.target_writer and not self.reconnecting: + if current_time - self.last_target_activity > 300.0: + logging.warning(f"Watchdog: No data from radio for 300s. 
Forcing reconnect...") + try: self.target_writer.close() + except: pass + self.target_reader = None + self.target_writer = None + + if current_time - last_heartbeat_log > 60.0: + client_count = len(self.clients) + status = "Connected" if self.target_writer and not self.reconnecting else "RECONNECTING" + silence = current_time - self.last_target_activity + logging.info(f"Proxy Heartbeat: {status}. Last radio data {silence:.1f}s ago. Clients: {client_count}") + last_heartbeat_log = current_time + + await asyncio.sleep(5) + + async def _target_connection_manager(self): + backoff_time = 5.0 + max_backoff_time = 60.0 + backoff_rate = 2.0 + + while self.running: + if self.target_writer is None or self.target_reader is None: + self.reconnecting = True + self._disconnect_all_clients() + self.handshake_packets.clear() + self.rolling_packets.clear() + + try: + logging.info(f"Proxy attempting to connect to target device at {self.target_host}:{self.target_port}...") + reader, writer = await asyncio.wait_for( + asyncio.open_connection(self.target_host, self.target_port), + timeout=5.0 + ) + self.target_reader = reader + self.target_writer = writer + self.last_target_activity = time.time() + self.reconnecting = False + backoff_time = 5.0 # Reset backoff on success + logging.info(f"Proxy successfully connected to target device at {self.target_host}:{self.target_port}") + asyncio.create_task(self._read_from_target()) + except (asyncio.TimeoutError, ConnectionError, OSError) as e: + logging.error(f"Failed to connect to target ({self.target_host}): {e}. 
Retrying in {backoff_time:.1f}s...") + await asyncio.sleep(backoff_time) + backoff_time = min(backoff_time * backoff_rate, max_backoff_time) + except Exception as e: + logging.error(f"Unexpected error in target connection manager: {e}", exc_info=True) + await asyncio.sleep(backoff_time) + backoff_time = min(backoff_time * backoff_rate, max_backoff_time) + else: + await asyncio.sleep(1) + + def _disconnect_all_clients(self): + for writer in list(self.clients): + try: writer.close() + except: pass + self.clients.clear() + logging.info("Disconnected all proxy clients to force re-sync.") + + async def _read_from_target(self): + reader = self.target_reader + writer = self.target_writer + + in_buffer = b'' + while self.running and self.target_reader == reader: + try: + data = await reader.read(16384) + if not data: + logging.warning("Radio closed connection. Triggering re-sync...") + break + self.last_target_activity = time.time() + + in_buffer += data + + while len(in_buffer) >= 4: + if in_buffer[0:2] != b'\x94\xc3': + idx = in_buffer.find(b'\x94\xc3') + if idx == -1: + in_buffer = b'' + break + in_buffer = in_buffer[idx:] + continue + + length = (in_buffer[2] << 8) | in_buffer[3] + total_len = length + 4 + + if len(in_buffer) < total_len: + break + + packet = in_buffer[:total_len] + in_buffer = in_buffer[total_len:] + + if len(self.handshake_packets) < self.handshake_max_count: + self.handshake_packets.append(packet) + self.rolling_packets.append(packet) + + for client_writer in list(self.clients): + try: + client_writer.write(packet) + await client_writer.drain() + except Exception as e: + logging.debug(f"Failed to forward packet to client: {e}") + self._remove_client(client_writer) + except Exception as e: + logging.error(f"Error reading from radio: {e}") + break + + if self.target_writer == writer: + try: writer.close() + except: pass + self.target_writer = None + self.target_reader = None + + async def _handle_client(self, reader, writer): + addr = 
writer.get_extra_info('peername') + logging.info(f"+++ PROXY: New connection accepted from {addr}") + self.clients.add(writer) + + h_snapshot = list(self.handshake_packets) + r_snapshot = list(self.rolling_packets) + + if addr[0] not in ('127.0.0.1', 'localhost'): + try: + await asyncio.sleep(2.0) + for p in h_snapshot: + writer.write(p) + await writer.drain() + await asyncio.sleep(0.05) + for p in r_snapshot: + writer.write(p) + await writer.drain() + await asyncio.sleep(0.01) + logging.info(f"Replayed {len(h_snapshot) + len(r_snapshot)} packets to {addr}") + except Exception as e: + self._remove_client(writer) + return + + while self.running: + try: + data = await reader.read(16384) + if not data: + break + if self.target_writer and not self.reconnecting: + try: + self.target_writer.write(data) + await self.target_writer.drain() + except Exception as e: + logging.error(f"Error sending to radio: {e}") + try: self.target_writer.close() + except: pass + self.target_writer = None + except Exception as e: + logging.debug(f"Error receiving from client: {e}") + break + + self._remove_client(writer) + + def _remove_client(self, writer): + addr = None + try: + addr = writer.get_extra_info('peername') + logging.info(f"--- PROXY: Removing client {addr}") + except: + logging.info("--- PROXY: Removing unknown client") + + if writer in self.clients: + self.clients.remove(writer) + try: writer.close() + except: pass diff --git a/src/traceroute.py b/src/traceroute.py new file mode 100644 index 0000000..631ee69 --- /dev/null +++ b/src/traceroute.py @@ -0,0 +1,94 @@ +""" +Traceroute command handling: send traceroute requests and upload TRACEROUTE_APP responses. +""" + +import logging +import os +import threading +import time +from typing import TYPE_CHECKING + +from meshtastic.protobuf import mesh_pb2, portnums_pb2 + +if TYPE_CHECKING: + from src.bot import MeshtasticBot + +logger = logging.getLogger(__name__) + +# Firmware enforces ~30s minimum between traceroutes. 
We rate-limit client-side to avoid +# sending requests the radio will reject (no ROUTING_APP packet). +TR_MIN_INTERVAL_SEC = int(os.getenv("TR_MIN_INTERVAL_SEC", "30")) +_last_tr_time: float = 0 +_tr_lock = threading.Lock() + +TR_HOPS_LIMIT = int(os.getenv("TR_HOPS_LIMIT", '5')) +if TR_HOPS_LIMIT < 3: + logger.warning(f"TR_HOPS_LIMIT is less than 3, traceroutes are likely to fail. Capping at 3.") + TR_HOPS_LIMIT = 3 +elif TR_HOPS_LIMIT < 5: + logger.warning(f"TR_HOPS_LIMIT is less than 5, traceroutes are likely to fail") + +if TR_HOPS_LIMIT > 7: + logger.warning(f"TR_HOPS_LIMIT is greater than the Meshtastic limit of 7. Capping at 7.") + TR_HOPS_LIMIT = 7 + +def on_traceroute_command(bot: "MeshtasticBot", target_node_id: int, channel_index: int = 0): + """ + Send a traceroute request to the target node. + + Args: + bot: The MeshtasticBot instance + target_node_id: Target node ID (integer, e.g. 1623194643) + channel_index: Channel index (default 0) + """ + global _last_tr_time + + if not bot.interface or not bot.init_complete: + logger.warning("Traceroute: bot not connected, skipping") + return + + with _tr_lock: + now = time.monotonic() + elapsed = now - _last_tr_time + if elapsed < TR_MIN_INTERVAL_SEC: + logger.info( + f"Traceroute: rate limited (target={target_node_id}, " + f"{TR_MIN_INTERVAL_SEC - int(elapsed)}s remaining)" + ) + return + _last_tr_time = now + + try: + # Use sendData directly instead of sendTraceRoute: sendTraceRoute blocks until response + # (or timeout ~2min), causing a backlog when responses are slow/lost. TR responses + # arrive via meshtastic.receive and are handled by bot.on_receive. 
+ r = mesh_pb2.RouteDiscovery() + bot.interface.sendData( + r, + destinationId=target_node_id, + portNum=portnums_pb2.PortNum.TRACEROUTE_APP, + wantResponse=True, + channelIndex=channel_index, + hopLimit=TR_HOPS_LIMIT, + ) + logger.info(f"Traceroute: sent to target={target_node_id}") + except Exception as e: + logger.error(f"Traceroute: failed to send to {target_node_id}: {e}") + + +def setup_traceroute_handler(bot: "MeshtasticBot"): + """ + Subscribe to TRACEROUTE_APP packets and upload them to storage APIs. + + Call this once when the bot is initialized. TRACEROUTE_APP packets + received via meshtastic.receive are already passed to storage_apis in + bot.on_receive, so no extra subscription is needed for upload. + + This function exists for any traceroute-specific setup (e.g. filtering + or logging). The main packet flow is: receive -> on_receive -> storage_apis. + """ + # TRACEROUTE_APP packets are handled by bot.on_receive which forwards + # all packets to storage_apis. No additional subscription needed. + # We could add a dedicated handler here if we needed traceroute-specific + # logic (e.g. only upload TR packets, or different handling). + pass diff --git a/src/ws_client.py b/src/ws_client.py new file mode 100644 index 0000000..8641dce --- /dev/null +++ b/src/ws_client.py @@ -0,0 +1,177 @@ +""" +WebSocket client for receiving commands from the Meshflow API. + +Connects to ws/nodes/?api_key= and invokes callbacks when commands +(e.g. traceroute) are received. +""" + +import asyncio +import json +import logging +from typing import Callable, Optional + +logger = logging.getLogger(__name__) + + +class MeshflowWSClient: + """ + WebSocket client that connects to the Meshflow API node command endpoint. + + Runs in a background thread and invokes callbacks for received commands. + Reconnects with exponential backoff on disconnect. 
class MeshflowWSClient:
    """
    WebSocket client that connects to the Meshflow API node command endpoint.

    Runs in a background thread that owns its own asyncio event loop and
    invokes callbacks for received commands. After a lost connection it
    reconnects with exponential backoff; the delay is reset to its initial
    value as soon as a connection succeeds.
    """

    # Reconnect delay policy, in seconds: start short, grow geometrically, cap.
    INITIAL_BACKOFF = 1.0
    BACKOFF_FACTOR = 1.5
    MAX_BACKOFF = 300.0

    def __init__(
        self,
        ws_url: str,
        api_key: str,
        on_traceroute: Callable[[int], None],
        on_connect: Optional[Callable[[], None]] = None,
        on_disconnect: Optional[Callable[[], None]] = None,
    ):
        """
        Args:
            ws_url: Base WebSocket URL (e.g. ws://localhost:8000)
            api_key: NodeAPIKey for authentication
            on_traceroute: Callback(target_node_id: int) when traceroute command received
            on_connect: Optional callback when connected
            on_disconnect: Optional callback when disconnected
        """
        self.ws_url = ws_url.rstrip("/")
        self.api_key = api_key
        self.on_traceroute = on_traceroute
        self.on_connect = on_connect
        self.on_disconnect = on_disconnect

        self._running = False
        # Task executing _run(); recorded so stop() can cancel it promptly.
        self._task: Optional[asyncio.Task] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread = None
        # Delay before the next reconnect attempt; reset on successful connect.
        self._backoff = self.INITIAL_BACKOFF
        # Strong references to in-flight traceroute tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._pending = set()

    def _get_ws_endpoint(self) -> str:
        """Return the full endpoint URL with the API key safely URL-encoded."""
        from urllib.parse import quote

        encoded_key = quote(self.api_key, safe="")
        return f"{self.ws_url}/ws/nodes/?api_key={encoded_key}"

    def _reset_backoff(self) -> None:
        """Restore the reconnect delay to its initial (short) value."""
        self._backoff = self.INITIAL_BACKOFF

    def _next_backoff(self) -> float:
        """Return the delay to wait now and grow the stored delay (capped)."""
        delay = self._backoff
        self._backoff = min(delay * self.BACKOFF_FACTOR, self.MAX_BACKOFF)
        return delay

    def start(self):
        """Start the WebSocket client in a background thread."""
        if self._running:
            return
        self._running = True
        import threading

        def run():
            loop = asyncio.new_event_loop()
            self._loop = loop
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self._run())
            finally:
                loop.close()

        self._thread = threading.Thread(target=run, name="MeshflowWSClient", daemon=True)
        self._thread.start()
        logger.info("MeshflowWSClient: started")

    def stop(self):
        """Stop the WebSocket client and cancel the run loop if it is active."""
        self._running = False
        loop, task = self._loop, self._task
        if loop is None or task is None or loop.is_closed():
            return
        try:
            loop.call_soon_threadsafe(task.cancel)
        except RuntimeError:
            # The loop was closed between the check and the call; the run
            # loop has already finished, so there is nothing left to cancel.
            pass

    def _notify_disconnect(self) -> None:
        """Invoke on_disconnect (if any), logging rather than raising on error."""
        if not self.on_disconnect:
            return
        try:
            self.on_disconnect()
        except Exception as e:
            logger.warning(f"MeshflowWSClient: on_disconnect error: {e}")

    async def _run(self):
        """Main loop: connect, receive, and reconnect with growing backoff."""
        # Record the running task so stop() (called from another thread) can
        # cancel it instead of waiting for the next receive timeout.
        self._task = asyncio.current_task()
        try:
            while self._running:
                try:
                    await self._connect_and_receive()
                except ImportError as e:
                    # A missing dependency will not fix itself; do not retry.
                    logger.error(f"MeshflowWSClient: {e}")
                    self._running = False
                    break
                except Exception as e:
                    delay = self._next_backoff()
                    logger.warning(
                        f"MeshflowWSClient: connection lost ({type(e).__name__}: {e}). "
                        f"Reconnecting in {delay:.0f}s..."
                    )
                    self._notify_disconnect()
                else:
                    delay = self._next_backoff()

                if not self._running:
                    break

                await asyncio.sleep(delay)
        except asyncio.CancelledError:
            # Raised by stop(); covers both the receive wait and the backoff sleep.
            logger.info("MeshflowWSClient: stopped")
        finally:
            self._task = None

        logger.info("MeshflowWSClient: run loop ended")

    async def _connect_and_receive(self):
        """
        Connect to the WebSocket and dispatch messages until disconnect.

        Raises:
            ImportError: If the ``websockets`` package is not installed.
            websockets.exceptions.ConnectionClosed: When the server closes
                the connection (re-raised after logging the close code).
        """
        try:
            import websockets
            from websockets.exceptions import ConnectionClosed
        except ImportError as e:
            raise ImportError("websockets package required. Install with: pip install websockets") from e

        endpoint = self._get_ws_endpoint()
        # Django Channels AllowedHostsOriginValidator requires Origin header.
        # Derive from ws_url (e.g. ws://localhost:8000 -> http://localhost:8000)
        origin = self.ws_url.replace("ws://", "http://").replace("wss://", "https://")
        async with websockets.connect(
            endpoint,
            origin=origin,
            close_timeout=5,
            ping_interval=20,
            ping_timeout=10,
        ) as ws:
            self._reset_backoff()  # Next reconnect starts with a short delay
            logger.info("MeshflowWSClient: connected")
            if self.on_connect:
                try:
                    self.on_connect()
                except Exception as e:
                    logger.warning(f"MeshflowWSClient: on_connect error: {e}")

            while self._running:
                try:
                    msg = await asyncio.wait_for(ws.recv(), timeout=60.0)
                except asyncio.TimeoutError:
                    # No traffic; loop around to re-check the running flag.
                    continue
                except ConnectionClosed as e:
                    code = getattr(getattr(e, "rcvd", None), "code", None)
                    logger.info(f"MeshflowWSClient: connection closed by server (code={code})")
                    raise

                self._handle_message(msg)

    def _handle_message(self, msg) -> None:
        """Parse one raw message and route it; malformed input is logged and dropped."""
        try:
            data = json.loads(msg)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable bytes payloads.
            logger.warning(f"MeshflowWSClient: invalid JSON: {msg[:100]}")
            return

        if not isinstance(data, dict):
            # Valid JSON but not a command object; ignoring it keeps the
            # connection alive instead of failing on data.get().
            logger.warning(f"MeshflowWSClient: ignored non-object message: {msg[:100]}")
            return

        cmd_type = data.get("type")
        if cmd_type != "traceroute":
            logger.debug(f"MeshflowWSClient: ignored command type: {cmd_type}")
            return

        target = data.get("target")
        if target is None:
            logger.warning("MeshflowWSClient: traceroute command missing target")
            return

        try:
            target_id = int(target)
        except (TypeError, ValueError):
            logger.warning(f"MeshflowWSClient: invalid traceroute target: {target}")
            return

        logger.info(f"MeshflowWSClient: received traceroute command, target={target_id}")
        self._spawn_traceroute(target_id)

    def _spawn_traceroute(self, target_id: int) -> None:
        """Run on_traceroute in a worker thread so receiving is never blocked."""
        # Run in thread so a blocking/long-running TR doesn't block receiving
        # further commands (e.g. multiple TRs in quick succession, or a TR
        # that never returns).
        task = asyncio.create_task(asyncio.to_thread(self.on_traceroute, target_id))
        self._pending.add(task)
        task.add_done_callback(self._traceroute_done)

    def _traceroute_done(self, task) -> None:
        """Release a finished traceroute task and log its failure, if any."""
        self._pending.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.warning(f"MeshflowWSClient: traceroute task failed: {exc}")
class TestTracerouteCommand(CommandTestCase):
    """Behavioural tests for the ``!tr`` (traceroute) command handler."""

    command: TracerouteCommand

    def setUp(self):
        super().setUp()
        self.command = TracerouteCommand(bot=self.bot)
        # handle_packet calls sendTraceRoute on the interface, so stub it out.
        self.bot.interface.sendTraceRoute = MagicMock()

    def _assert_dm_sent(self, text, recipient_id):
        """Assert that ``text`` was sent as an acknowledged DM to ``recipient_id``."""
        # NOTE(review): other tests in this change expect hopLimit=5 on
        # sendText calls; confirm whether these assertions should include it.
        self.mock_interface.sendText.assert_any_call(
            text, destinationId=recipient_id, wantAck=True
        )

    def test_handle_packet_basic(self):
        """A bare ``!tr`` one hop away starts a traceroute back to the sender."""
        origin = self.test_nodes[1]
        sender_id = origin.user.id
        packet = build_test_text_packet('!tr', sender_id, self.bot.my_id)
        # hopStart - hopLimit = 1 hop travelled.
        packet['hopStart'] = 3
        packet['hopLimit'] = 2
        # Pin the SNR so the expected message is deterministic.
        packet['rxSnr'] = 5.5

        self.command.handle_packet(packet)

        # The sender is told the hop count and that a full trace is starting.
        self._assert_dm_sent(
            f"{self.test_nodes[1].user.long_name} you are 1 hops away (Signal: 5.5 dB). Starting full traceroute...",
            sender_id,
        )

        # The traceroute targets the sender and is tracked as pending for them.
        self.bot.interface.sendTraceRoute.assert_called_once_with(sender_id, hopLimit=7)
        self.assertEqual(self.bot.pending_traces[sender_id], [sender_id])

    def test_handle_packet_zero_hops(self):
        """A direct (zero-hop) sender gets a message and no traceroute."""
        sender_id = self.test_nodes[1].user.id
        packet = build_test_text_packet('!tr', sender_id, self.bot.my_id)
        # Equal hopStart and hopLimit means the packet was not relayed.
        packet['hopStart'] = 3
        packet['hopLimit'] = 3

        self.command.handle_packet(packet)

        self._assert_dm_sent(
            f"{self.test_nodes[1].user.long_name} you are Zero Hops from me. No traceroute required!",
            sender_id,
        )
        self.bot.interface.sendTraceRoute.assert_not_called()

    def test_handle_packet_to_specific_node(self):
        """``!tr <short_name>`` traces to that node on behalf of the requester."""
        requester_id = self.test_nodes[1].user.id
        target_node = self.test_nodes[2]
        short_name = target_node.user.short_name

        packet = build_test_text_packet(f'!tr {short_name}', requester_id, self.bot.my_id)
        self.command.handle_packet(packet)

        self._assert_dm_sent(
            f"Starting traceroute to {target_node.user.long_name} ({target_node.user.id}) for you...",
            requester_id,
        )
        self.bot.interface.sendTraceRoute.assert_called_once_with(target_node.user.id, hopLimit=7)
        self.assertEqual(self.bot.pending_traces[target_node.user.id], [requester_id])

    def test_handle_packet_unknown_shortname(self):
        """An unknown short name yields an error DM and no traceroute."""
        requester_id = self.test_nodes[1].user.id
        packet = build_test_text_packet('!tr NONEXIST', requester_id, self.bot.my_id)

        self.command.handle_packet(packet)

        self._assert_dm_sent("Could not find node with short name 'NONEXIST'", requester_id)
        self.bot.interface.sendTraceRoute.assert_not_called()

    def test_handle_packet_to_self(self):
        """Asking the bot to trace to itself is answered without a traceroute."""
        requester_id = self.test_nodes[1].user.id
        # Force the short-name lookup to resolve to the bot's own node id so
        # the "target is me" branch is exercised regardless of fixture names.
        bot_node = MagicMock(id=self.bot.my_id, long_name="Bot")
        self.bot.get_node_by_short_name = MagicMock(return_value=bot_node)

        packet = build_test_text_packet('!tr BOT', requester_id, self.bot.my_id)
        self.command.handle_packet(packet)

        self._assert_dm_sent("I am already here! No traceroute required.", requester_id)
class TestTcpProxy(unittest.TestCase):
    """Unit tests for TcpProxy status reporting, client removal and server binding."""

    def setUp(self):
        self.proxy = TcpProxy("127.0.0.1", 4403, "127.0.0.1", 4404)

    def test_status_fields(self):
        """Status reflects offline by default and online once running with a target."""
        # NOTE(review): the online assertions below index the status like a
        # dict; if the offline status is also a dict, assertIn checks for a
        # *key* named "Offline" rather than the state value. Confirm intent.
        offline_status = self.proxy.get_status()
        self.assertIn("Offline", offline_status)

        # Simulate a running proxy with an established upstream connection.
        self.proxy.running = True
        self.proxy.target_writer = MagicMock()
        online_status = self.proxy.get_status()
        self.assertEqual(online_status["state"], "Online")
        self.assertEqual(online_status["clients"], 0)

    def test_remove_client(self):
        """Removing a client drops it from the set and closes its writer."""
        client_writer = MagicMock()
        client_writer.get_extra_info.return_value = ("127.0.0.1", 12345)
        self.proxy.clients.add(client_writer)

        self.proxy._remove_client(client_writer)

        self.assertEqual(len(self.proxy.clients), 0)
        client_writer.close.assert_called_once()

    @patch('asyncio.start_server', new_callable=AsyncMock)
    def test_async_run_binds_server(self, mock_start_server):
        """_async_run binds the listening server exactly once."""

        async def halt_shortly():
            # Clear the running flag soon after start so _async_run returns
            # instead of hanging in its watchdog / connection manager.
            await asyncio.sleep(0.1)
            self.proxy.running = False

        async def scenario():
            self.proxy.running = True
            asyncio.create_task(halt_shortly())
            await self.proxy._async_run()
            mock_start_server.assert_called_once()

        asyncio.run(scenario())