From 1ce47d07e3761688c632b4c6d63a1905ea524bda Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Mon, 2 Feb 2026 12:10:37 -0800 Subject: [PATCH 01/17] update to modern python standards and add http endpoints --- .bumpversion.cfg | 4 +- .editorconfig | 25 ++ .gitignore | 159 ++++++++++- CONTRIBUTING.md | 113 ++++++++ MANIFEST.in | 1 - README.md | 369 ++++++++++++++++++++++++++ README.rst | 130 --------- docs/api/clients.md | 40 +++ docs/api/models.md | 33 +++ docs/http_api.md | 6 + docs/index.md | 42 +++ docs/openapi.yaml | 83 ++++++ docs/socket_api.md | 65 +++++ meter_reader/__init__.py | 3 - meter_reader/client.py | 64 ----- meter_reader/gateway.py | 158 ----------- meter_reader/utc.py | 24 -- mkdocs.yml | 80 ++++++ pyproject.toml | 88 +++++++ setup.cfg | 2 - setup.py | 47 ---- src/meter_reader/__init__.py | 20 ++ src/meter_reader/clients/__init__.py | 5 + src/meter_reader/clients/base.py | 33 +++ src/meter_reader/clients/http.py | 119 +++++++++ src/meter_reader/clients/socket.py | 270 +++++++++++++++++++ src/meter_reader/main.py | 235 +++++++++++++++++ src/meter_reader/models.py | 83 ++++++ src/meter_reader/py.typed | 0 src/meter_reader/utils.py | 39 +++ tests/__init__.py | 3 + tests/test_http_client.py | 231 ++++++++++++++++ tests/test_models.py | 217 +++++++++++++++ tests/test_parser.py | 50 ++++ tests/test_socket_client.py | 377 +++++++++++++++++++++++++++ tests/test_utils.py | 116 +++++++++ 36 files changed, 2899 insertions(+), 435 deletions(-) create mode 100644 .editorconfig create mode 100644 CONTRIBUTING.md delete mode 100644 MANIFEST.in create mode 100644 README.md delete mode 100644 README.rst create mode 100644 docs/api/clients.md create mode 100644 docs/api/models.md create mode 100644 docs/http_api.md create mode 100644 docs/index.md create mode 100644 docs/openapi.yaml create mode 100644 docs/socket_api.md delete mode 100644 meter_reader/__init__.py delete mode 100644 meter_reader/client.py delete mode 100644 meter_reader/gateway.py delete mode 100644 meter_reader/utc.py create mode 100644 mkdocs.yml create mode 100644 pyproject.toml delete mode 100644 setup.cfg delete mode 100644 setup.py create mode 100644 src/meter_reader/__init__.py create mode 100644 src/meter_reader/clients/__init__.py create mode 100644 src/meter_reader/clients/base.py create mode 100644 src/meter_reader/clients/http.py create mode 100644 src/meter_reader/clients/socket.py create mode 100644 src/meter_reader/main.py create mode 100644 src/meter_reader/models.py create mode 100644 src/meter_reader/py.typed create mode 100644 src/meter_reader/utils.py create mode 100644 tests/__init__.py create mode 100644 tests/test_http_client.py create mode 100644 tests/test_models.py create mode 100644 tests/test_parser.py create mode 100644 tests/test_socket_client.py create mode 100644 tests/test_utils.py diff --git a/.bumpversion.cfg b/.bumpversion.cfg index b36784f..8d84b6f 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,6 +1,6 @@ [bumpversion] -current_version = 1.1.2 -files = setup.py meter_reader/__init__.py README.rst +current_version = 2.0.0 +files = pyproject.toml src/meter_reader/__init__.py README.md commit = True tag = True diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..8b4474b --- /dev/null +++ b/.editorconfig @@ -0,0 +1,25 @@ +# EditorConfig helps maintain consistent coding styles across different editors +# and IDEs. See https://editorconfig.org + +root = true + +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.py] +indent_style = space +indent_size = 4 +max_line_length = 100 + +[*.{json,yml,yaml}] +indent_style = space +indent_size = 2 + +[Makefile] +indent_style = tab + +[*.md] +trim_trailing_whitespace = false diff --git a/.gitignore b/.gitignore index 7efdec4..759ef8d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,157 @@ -__pycache__ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class *.pyc -build -dist +*.pyo + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg MANIFEST -meter_reader.egg-info + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +Pipfile.lock + +# Poetry +poetry.lock + +# pdm +.pdm.toml + +# PEP 582 +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ +.project +.pydevproject +.settings/ + +# OS-specific +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..674d0a3 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,113 @@ +# Contributing to Meter Reader + +Thank you for considering contributing to Meter Reader! This document provides guidelines and instructions for contributing. + +## Code of Conduct + +Please be respectful and constructive in all interactions. + +## How to Contribute + +### Reporting Bugs + +Before creating a bug report, please check the issue list as you might find out that you don't need to create one. When creating a bug report, include: + +- **Clear description** of what the bug is +- **Steps to reproduce** the problem +- **Expected behavior** vs actual behavior +- **Python version** and **gateway model** +- **Relevant output** (error messages, debug logs) + +### Suggesting Enhancements + +Enhancement suggestions are tracked as GitHub issues. When suggesting an enhancement: + +- **Use a clear title** +- **Provide a step-by-step description** +- **Provide examples** to demonstrate the enhancement +- **Explain why** this enhancement would be useful + +### Pull Requests + +1. Fork the repository and create a branch from `main` +2. Make your changes +3. Run tests and linting checks +4. Write clear commit messages +5. Push to your fork and submit a pull request + +## Development Setup + +```bash +# Clone your fork +git clone https://github.com/YOUR_USERNAME/meter_reader.git +cd meter_reader + +# Create virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install in development mode +pip install -e ".[dev]" +``` + +## Running Tests + +```bash +pytest tests/ +``` + +## Code Style + +This project uses: + +- **black** for code formatting (100 character line length) +- **isort** for import sorting +- **mypy** for type checking + +Format your code: + +```bash +black src/ tests/ +isort src/ tests/ +``` + +Check types: + +```bash +mypy src/ +``` + +## Commit Messages + +Write clear, concise commit messages: + +``` +Capitalize the first line, keep it under 50 characters +Blank line +More detailed explanation if needed, wrapped at 72 characters. +``` + +Example: + +``` +Fix socket timeout handling in gateway communication + +Previously, socket.makefile() could block indefinitely if the +gateway sent incomplete data. Now using socket.recv() with +proper timeout support. +``` + +## Documentation + +- Update README.md if adding/changing features +- Add docstrings to new functions and classes +- Include type hints in all code +- Update this CONTRIBUTING.md if needed + +## License + +By contributing, you agree that your contributions will be licensed under the BSD 2-Clause License. + +## Questions? + +Feel free to open an issue or discussion for questions about contributing. diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index bb37a27..0000000 --- a/MANIFEST.in +++ /dev/null @@ -1 +0,0 @@ -include *.rst diff --git a/README.md b/README.md new file mode 100644 index 0000000..728d57b --- /dev/null +++ b/README.md @@ -0,0 +1,369 @@ +# Meter Reader + +[![License: BSD 2-Clause](https://img.shields.io/badge/License-BSD%202--Clause-orange.svg)](LICENSE.rst) +[![Python 3.13+](https://img.shields.io/badge/python-3.13+-blue.svg)](https://www.python.org/downloads/) + +Meter Reader is a client library and command-line tool for retrieving near-realtime energy usage data from a smart meter via the **Eagle™ Home Energy Gateway**. See [Rainforest™ Automation](http://www.rainforestautomation.com) for more information about the Eagle™ Home Energy Gateway. + +**Disclaimer**: Meter Reader is not affiliated with the Eagle™ Home Energy Gateway or Rainforest™ Automation. + +## Features + +- 📊 **Library API** - Use as a Python library in your own applications +- 🖥️ **CLI Tool** - Command-line interface for querying gateway data +- 📡 **Multiple Commands** - Support for device data, instantaneous demand, and historical values +- 📋 **Multiple Formats** - Output in table, JSON, or CSV format +- 🔐 **Type-Safe** - Full type hints for better IDE support and type checking +- 🚀 **Modern Packaging** - Built with pyproject.toml and PEP 517 compliance +- 🏠 **No Dependencies** - Pure Python standard library, no external dependencies + +## Installation + +### From PyPI + +```bash +pip install meter-reader +``` + +### From Source + +```bash +git clone https://github.com/eman/meter_reader.git +cd meter_reader +pip install -e . +``` + +## Quick Start + +### Command-Line Usage + +Get help: +```bash +mr --help +``` + +List devices on gateway: +```bash +mr list 192.168.1.10 +``` + +Get current power demand: +```bash +mr instantaneous 192.168.1.10 +``` + +Get device data: +```bash +mr device-data 192.168.1.10 +``` + +Get summation values for the past day: +```bash +mr summation 192.168.1.10 --interval day +``` + +### Library Usage + +```python +from meter_reader import Gateway + +# Connect to gateway +gateway = Gateway('192.168.1.10') + +# Get instantaneous demand +timestamp, demand_kw = gateway.get_instantaneous_demand() +print(f'Current demand: {demand_kw}kW at {timestamp}') + +# Get device data +response = gateway.run_command(Name='get_device_data') +print(response['CurrentSummation']) + +# Get summation values +data = gateway.run_command( + Name='get_summation_values', + Interval='day' +) +for entry in data: + print(f"{entry['TimeStamp']}: {entry['SummationDelivered']}") +``` + +## CLI Commands + +All commands support the following options: + +- `-t, --timeout SECONDS` - Socket timeout (default: 5) +- `-f, --output-format {table,json,csv}` - Output format (default: table) +- `-r, --raw` - Display raw XML response +- `--debug` - Enable debug logging + +### list + +List all devices on the gateway: +```bash +mr list 192.168.1.10 +mr list 192.168.1.10 --output-format json +mr list 192.168.1.10 --raw +``` + +### device-data + +Get device data from the gateway: +```bash +mr device-data 192.168.1.10 +mr device-data 192.168.1.10 --output-format json +``` + +### instantaneous + +Get current instantaneous power demand: +```bash +mr instantaneous 192.168.1.10 +mr instant 192.168.1.10 # short alias +mr instantaneous 192.168.1.10 --output-format json +``` + +Output: +``` +2024-02-01 21:58:39+00:00, 0.292kW +``` + +### summation + +Get summation values for a time period: +```bash +mr summation 192.168.1.10 --interval day +mr summation 192.168.1.10 --interval hour --frequency 900 +mr summation 192.168.1.10 --interval week --output-format csv +``` + +Options: +- `-i, --interval {hour,day,week}` - Time interval (default: day) +- `--frequency SECONDS` - Seconds between samples +- `-s, --start-time TIME` - Start time +- `-e, --end-time TIME` - End time +- `-d, --duration SECONDS` - Duration in seconds + +### demand + +Get demand values for a time period: +```bash +mr demand 192.168.1.10 --interval day +mr demand 192.168.1.10 --interval week --output-format json +``` + +Options same as `summation` command. + +## Output Formats + +### Table (default) + +``` +MessageCluster + DeviceMacId xx:xx:xx:xx:xx:xx:xx:xx + MeterMacId xx:xx:xx:xx:xx:xx:xx + TimeStamp 2024-02-01 21:58:39+00:00 +``` + +### JSON + +```bash +mr device-data 192.168.1.10 --output-format json +``` + +```json +{ + "MessageCluster": { + "DeviceMacId": "xx:xx:xx:xx:xx:xx:xx:xx", + "MeterMacId": "xx:xx:xx:xx:xx:xx:xx", + "TimeStamp": "2024-02-01 21:58:39+00:00" + } +} +``` + +### CSV + +```bash +mr summation 192.168.1.10 --output-format csv +``` + +``` +MessageCluster.DeviceMacId,xx:xx:xx:xx:xx:xx:xx:xx +MessageCluster.TimeStamp,2024-02-01 21:58:39+00:00 +``` + +## API Reference + +### Gateway + +```python +from meter_reader import Gateway + +gateway = Gateway(address, port=5002, timeout=5) +``` + +**Parameters:** +- `address` (str): Gateway IP address or hostname +- `port` (int): Gateway port (default: 5002) +- `timeout` (int): Socket timeout in seconds (default: 5) + +**Methods:** + +#### `run_command(**kwargs)` + +Send a command to the gateway and return parsed response. + +```python +response = gateway.run_command(Name='list_devices') +response = gateway.run_command( + Name='get_summation_values', + Interval='day' +) +``` + +**Parameters:** +- `Name` (str): Command name +- `convert` (bool): Convert data types (default: True) +- Other parameters depending on command + +**Returns:** dict or list of dicts + +**Raises:** `GatewayError` on communication failure + +#### `run_command_raw(**kwargs)` + +Send a command and return raw XML response. + +```python +raw_xml = gateway.run_command_raw(Name='get_device_data') +``` + +#### `get_instantaneous_demand()` + +Get current power demand. + +```python +timestamp, demand_kw = gateway.get_instantaneous_demand() +``` + +**Returns:** Tuple of (datetime, float) + +### GatewayError + +Exception raised when gateway communication fails. + +```python +from meter_reader import GatewayError + +try: + gateway = Gateway('192.168.1.10') +except GatewayError as e: + print(f'Connection failed: {e}') +``` + +## Supported Commands + +The following commands are supported by the gateway: + +- `list_devices` - List all devices +- `get_device_data` - Get device data +- `get_instantaneous_demand` - Get current demand +- `get_demand_values` - Get demand history +- `get_summation_values` - Get summation history +- `get_fast_poll_status` - Get fast poll status + +## Troubleshooting + +### Connection Timeout + +If you get a timeout error, try increasing the timeout value: + +```bash +mr device-data 192.168.1.10 --timeout 10 +``` + +Or in code: + +```python +gateway = Gateway('192.168.1.10', timeout=10) +``` + +### No Devices Found + +If the gateway doesn't have any devices connected, you may get an error. Check that your gateway is properly configured and devices are paired. + +### Raw XML Response + +To see the raw response from the gateway for debugging: + +```bash +mr device-data 192.168.1.10 --raw +``` + +### Debug Logging + +Enable debug logging to see detailed communication: + +```bash +mr --debug device-data 192.168.1.10 +``` + +## Development + +### Setup Development Environment + +```bash +git clone https://github.com/eman/meter_reader.git +cd meter_reader +pip install -e ".[dev]" +``` + +### Running Tests + +```bash +pytest +``` + +### Type Checking + +```bash +mypy src/ +``` + +### Code Formatting + +```bash +black src/ tests/ +isort src/ tests/ +``` + +## Changes from v1.x + +Version 2.0 includes significant improvements: + +- **Refactored CLI** with subcommands instead of options +- **Multiple output formats** (table, JSON, CSV) +- **Better error handling** with logging support +- **Modern packaging** with pyproject.toml +- **Improved documentation** with examples +- **Bug fix** for socket handling with proper timeout support +- **Removed OrderedDict** usage (Python 3.7+ dict ordering) +- **Full type hints** throughout codebase + +## License + +BSD 2-Clause License - See [LICENSE.rst](LICENSE.rst) for details. + +Copyright © 2017-2026 Emmanuel Levijarvi + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. + +## See Also + +- [Rainforest Automation](http://www.rainforestautomation.com) +- [Eagle Energy Gateway](https://www.rainforestautomation.com/rfa/) + +## Support + +For issues, questions, or suggestions, please open an issue on [GitHub](https://github.com/eman/meter_reader/issues). diff --git a/README.rst b/README.rst deleted file mode 100644 index 3ec1283..0000000 --- a/README.rst +++ /dev/null @@ -1,130 +0,0 @@ -Meter Reader -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. image:: https://api.codacy.com/project/badge/Grade/06a4909b69f947c4a70b9938ff0fc942 - :alt: Codacy Badge - :target: https://www.codacy.com/app/emansl/meter_reader?utm_source=github.com&utm_medium=referral&utm_content=eman/meter_reader&utm_campaign=badger -Release 1.1.2 - -Meter Reader is a client library and command line client for retrieving -nearly realtime energy usage data from a smart meter via the Eagle™ Home -Energy Gateway. See -`Rainforest™ Automation `_ for more -about the Eagle™ Home Energy Gateway. - -Meter Reader is not affiliated with the Eagle™ Home Energy Gateway or -Rainforest™ Automation. - -Installation -------------------------------------------------------------------------------- - -.. code-block:: bash - - $ pip install meter-reader - -Usage -------------------------------------------------------------------------------- -Meter Reader is intended to be used as a library for other applications -but it does contain a command line application called mr. - -.. code-block:: bash - - $ mr < ip address > - -This will run the ``list_devices`` devices command on the gateway and display -a formatted response. Other commands, such as ``get_device_data``, will first -run the ``list_devices`` command to determine the MAC address of the gateway. - -Commands can be specified with the `'-c'` option. For example - -.. code-block:: bash - - $ mr -c get_device_data < ip address > - - MessageCluster - DeviceMacId xx:xx:xx:xx:xx:xx:xx:xx - MeterMacId xx:xx:xx:xx:xx:xx:xx - TimeStamp 0 - Id 0 - Priority None - Text None - ConfirmationRequired N - Confirmed N - Read Y - Queue active - CurrentSummation - DeviceMacId xx:xx:xx:xx:xx:xx:xx:xx - MeterMacId xx:xx:xx:xx:xx:xx:xx - TimeStamp 2014-04-19 16:01:22+00:00 - SummationDelivered 12949746 - SummationReceived 0 - Multiplier 1 - Divisor 1000 - DigitsRight 3 - DigitsLeft 15 - SuppressLeadingZero Y - NetworkInfo - ... - - $ mr -c get_summation_values < ip address > - - 2014-04-18 16:30:00+00:00, Summation, 0.350 - 2014-04-18 17:30:00+00:00, Summation, 0.322 - 2014-04-18 18:30:00+00:00, Summation, 0.193 - 2014-04-18 19:30:00+00:00, Summation, 0.285 - 2014-04-18 20:30:00+00:00, Summation, 0.286 - 2014-04-18 21:30:00+00:00, Summation, 0.351 - ... - -There are two ways to retrieve instantaneous demand: - -1. Send the ``get_instantaneous_demand`` command directly to the gateway. This - will return a nearly raw response from the gateway (formatting is applied). - -.. code-block:: bash - - $ mr -c get_instantaneous_demand < ip address > - - InstantaneousDemand - DeviceMacId xx:xx:xx:xx:xx:xx:xx:xx - MeterMacId xx:xx:xx:xx:xx:xx:xx - TimeStamp 2014-04-19 15:35:27+00:00 - Demand 297 - Multiplier 1 - Divisor 1000 - DigitsRight 3 - DigitsLeft 15 - SuppressLeadingZero Y - -2. Supply the ``--get-instant-demand`` argument. This will post-process the -response before displaying it. - -.. code-block:: bash - - $ mr --get-instant-demand < ip address > - - 2014-04-19 15:58:39+00:00, 0.292kW - -Raw and unformatted data returned by the gatway, can be viewed by using the -`'-r'` option. - -.. code-block:: bash - - $ mr -r -c get_device_data < ip address > - -Including Meter Reader in an application -------------------------------------------------------------------------------- - -.. code-block:: python - - from meter_reader import Gateway - - GATEWAY_ADDRESS = '192.168.1.10' - - gw = Gateway(GATEWAY_ADDRESS) - response = gw.run_command('get_device_data') - print('Network Info') - print(response['NetworkInfo']) - - timestamp, demand = gw.get_instantaneous_demand() - print('Demand {0!s} at {1!s}'.format(demand, timestamp)) diff --git a/docs/api/clients.md b/docs/api/clients.md new file mode 100644 index 0000000..d106d62 --- /dev/null +++ b/docs/api/clients.md @@ -0,0 +1,40 @@ +# Clients API + +The `meter_reader` library provides two client implementations for the Eagle Gateway. + +## Base Client + +::: meter_reader.clients.base.EagleClient + handler: python + options: + members: + - get_instantaneous_demand + - get_usage_data + - list_devices + - get_network_info + - get_current_summation + +## Socket Client + +The `EagleSocketClient` communicates via the local XML API on port 5002. This is the traditional method for most integrations. + +::: meter_reader.clients.socket.EagleSocketClient + handler: python + options: + members: + - __init__ + - list_devices + - get_instantaneous_demand + - get_history_data + +## HTTP Client + +The `EagleHttpClient` communicates via the local web interface API (`cgi_manager`) on port 80. This method uses JSON responses where possible and supports username/password authentication. + +::: meter_reader.clients.http.EagleHttpClient + handler: python + options: + members: + - __init__ + - get_usage_data + - list_devices diff --git a/docs/api/models.md b/docs/api/models.md new file mode 100644 index 0000000..a343fc8 --- /dev/null +++ b/docs/api/models.md @@ -0,0 +1,33 @@ +# Data Models + +The library uses Pydantic models to ensure type-safe and consistent data structures across both Socket and HTTP clients. + +## Response Models + +::: meter_reader.models.DeviceInfo + handler: python + +::: meter_reader.models.DeviceList + handler: python + +::: meter_reader.models.InstantaneousDemand + handler: python + options: + members: + - panic_demand + +::: meter_reader.models.CurrentSummation + handler: python + options: + members: + - delivered_kwh + - received_kwh + +::: meter_reader.models.UsageData + handler: python + options: + members: + - timestamp + +::: meter_reader.models.NetworkInfo + handler: python diff --git a/docs/http_api.md b/docs/http_api.md new file mode 100644 index 0000000..5202289 --- /dev/null +++ b/docs/http_api.md @@ -0,0 +1,6 @@ +# HTTP API (OpenAPI) + +!!! note "Interactive Documentation" + The following documentation is interactive. You can explore the request/response structures directly. + +!!swagger openapi.yaml!! diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..6077d9f --- /dev/null +++ b/docs/index.md @@ -0,0 +1,42 @@ +# EAGLE Meter Reader + +Welcome to the documentation for the `meter_reader` library. + +## Introduction + +This library provides a Python interface to the **Rainforest EAGLE 200 Gateway**. It allows you to retrieve real-time energy usage data, historical consumption, and network information from your smart meter via the gateway. + +## Features + +* **Real-time Data**: Fetch instantaneous demand and summation. +* **Historical Data**: Retrieve past usage logs. +* **Device Management**: Configure settings (price, cloud provider, etc.). +* **Two Interfaces**: Support for both the robust Socket API (legacy) and the modern HTTP API (JSON). + +## Installation + +```bash +pip install meter_reader +``` + +## Quick Start + +```python +from meter_reader import EagleSocketClient + +client = EagleSocketClient("10.0.0.247") +data = client.get_instantaneous_demand() +print(f"Current Demand: {data.panic_demand} kW") +``` + +## CLI Usage + +The library includes a command-line interface (`mr`) powered by Typer. + +```bash +# Get demand via Socket API (default) +mr demand 10.0.0.247 + +# Get usage via HTTP API +mr usage 10.0.0.247 --protocol http --username --password +``` diff --git a/docs/openapi.yaml b/docs/openapi.yaml new file mode 100644 index 0000000..1a851b5 --- /dev/null +++ b/docs/openapi.yaml @@ -0,0 +1,83 @@ +openapi: 3.0.0 +info: + title: EAGLE Gateway Local HTTP API + description: | + Documentation for the local HTTP interface of the Rainforest EAGLE gateway. + While the official API uses a socket connection on port 5002, this interface allows for similar functionality over HTTP POST requests using XML payloads and JSON responses. + version: 1.0.0 +servers: + - url: http://{host} + description: Local EAGLE Gateway + variables: + host: + default: eagle-000000.local + description: Hostname or IP address of the gateway + +components: + securitySchemes: + basicAuth: + type: http + scheme: basic + +security: + - basicAuth: [] + +paths: + /cgi-bin/cgi_manager: + post: + summary: Execute Local Commands + description: | + Primary endpoint for retrieving meter data and device information. + Requests are sent as XML wrapped in `...`. + Responses are typically JSON. + requestBody: + required: true + content: + text/xml: + schema: + type: string + example: | + + get_usage_data + 0xd8d5b90000000cee + + responses: + '200': + description: Successful response + content: + text/html: # The content-type might be text/html but body is JSON + schema: + type: object # It returns a JSON object + example: + demand: "1.234" + demand_units: "kW" + summation_delivered: "12345.678" + meter_status: "Connected" + + /cgi-bin/post_manager: + post: + summary: Execute System Configuration Commands + description: | + Endpoint for system-level configuration such as mDNS, Remote Management, and Cloud settings. + Requests are sent as XML wrapped in `...` (often with `JSON`). + requestBody: + required: true + content: + text/xml: + schema: + type: string + example: | + + get_mdns_status + JSON + + responses: + '200': + description: Successful response + content: + application/json: + schema: + type: object + example: + mDnsStatus: + Enabled: "Y" diff --git a/docs/socket_api.md b/docs/socket_api.md new file mode 100644 index 0000000..065e14f --- /dev/null +++ b/docs/socket_api.md @@ -0,0 +1,65 @@ +# Socket API (Port 5002) + +## Overview + +The primary method for communicating with the Rainforest EAGLE gateway is via a TCP socket on port **5002**. This interface allows you to send XML commands and receive XML responses in a persistent session. + +## Connection Details + +* **Port**: 5002 +* **Protocol**: TCP +* **Authentication**: Basic Auth (HTTP-style headers required initially or possibly implied by credentials in command - verify with `gateway.py` implementation, actually `gateway.py` sends `set_auth` or uses local credentials if enabled? No, wait. The script sends XML directly. The `gateway.py` implementation just opens a socket and sends data. But wait, `EAGLE_REST_API` mentions Basic Auth for HTTP, but for Socket? Let's check `gateway.py` again. `_create_socket` just connects. The commands include `` and `` within the XML payload if required by device settings, but typically the local API is open or uses `set_auth`.) + * *Correction*: The `gateway.py` implementation sends user/pass in the XML body for some commands, or relies on the session state. + +## Command Structure + +Commands are sent as XML fragments. The root element is typically ``. + +### Request Example + +```xml + + get_usage_data + 0xd8d5b90000000cee + +``` + +### Response Example + +```xml + + 1.234 + kW + 12345.678 + Connected + +``` + +## Supported Commands + +The following commands have been verified to work on the EAGLE 200 (Firmware 1.4.48): + +* **`get_device_list`** + * Returns list of paired devices (meters). +* **`get_device_data`** (mapped to `get_instantaneous_demand` in library) + * Returns real-time demand. +* **`get_usage_data`** + * Returns current usage details. +* **`get_network_info`** + * Returns ZigBee network status, channel, and link strength. +* **`get_history_data`** + * Returns historical data (summation, demand) for specified time periods. + * *Note*: Response can be large and nested. + +## Unsupported Commands + +The following commands are documented in `EAGLE_REST_API-1.0.pdf` but were **rejected** by the device during testing: + +* `get_price` +* `get_message` +* `get_current_summation` (use `get_history_data`) +* `set_price` (via socket - use HTTP for reliable setting) + +## Libraries + +* **Python**: `meter_reader` (this library) provides a convenient wrapper around this socket API. diff --git a/meter_reader/__init__.py b/meter_reader/__init__.py deleted file mode 100644 index 3aac938..0000000 --- a/meter_reader/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from meter_reader.gateway import Gateway, GatewayError, COMMANDS - -__version__ = "1.1.2" diff --git a/meter_reader/client.py b/meter_reader/client.py deleted file mode 100644 index 8d71199..0000000 --- a/meter_reader/client.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -Meter Reader - -:copyright: (c) 2017 by Emmanuel Levijarvi -:license: BSD 2-Clause -""" - -from __future__ import print_function -import sys -import argparse -from meter_reader.gateway import Gateway, GatewayError, COMMANDS -from meter_reader import __version__ - - -def display(output): - if isinstance(output, list): - sys.stdout.write(', '.join(output[0]) + '\n') - [print(', '.join([str(v) for v in i.values()])) for i in output] - else: - keywidth = 0 - for section in output: - for key in output[section]: - if len(key) > keywidth: - keywidth = len(key) - for section in output: - print(section) - for key, value in list(output[section].items()): - print(' ' * 3, key.ljust(keywidth, ' '), value) - - -def main(): - parser = argparse.ArgumentParser( - prog='mr', description="Get data from Eagle Energy Gateway") - parser.add_argument('address', help='Eagle Engergy Gateway address') - parser.add_argument('-V', '--version', action='version', - version='mr {0}'.format(__version__)) - parser.add_argument('-r', '--raw', help='Display Raw, unparsed, response ' - 'from the Gateway', action='store_true') - parser.add_argument('-c', '--command', help='Command to send to gateway. ' - 'Available commands: {0}'.format(", ".join(COMMANDS)), - default='GET_DEVICE_DATA', dest='Name') - parser.add_argument('-i', '--interval', help='Total time period for ' - 'which samples are being requested. hour | day | week') - parser.add_argument('-f', '--frequency', help="Requested number of seconds" - " between samples.", type=int) - parser.add_argument('-s', '--start-time', dest='StartTime') - parser.add_argument('-e', '--end-time', dest="EndTime") - parser.add_argument('-d', '--duration', type=int) - parser.add_argument('--get-instant-demand', action='store_true') - args = parser.parse_args() - try: - gw = Gateway(args.address) - except GatewayError as e: - sys.stderr.write(str(e) + '\n') - sys.exit(1) - sys.stderr.write('\n') - if args.get_instant_demand: - ts, demand = (gw.get_instantaneous_demand()) - print("{0!s}, {1!s}kW".format(ts, demand)) - sys.exit(0) - if args.raw: - print(gw.run_command_raw(**vars(args))) - else: - display(gw.run_command(**vars(args))) diff --git a/meter_reader/gateway.py b/meter_reader/gateway.py deleted file mode 100644 index 7161e85..0000000 --- a/meter_reader/gateway.py +++ /dev/null @@ -1,158 +0,0 @@ -""" -Meter Reader - -Copyright (c) 2017, Emmanuel Levijarvi -License: BSD 2-Clause -""" - -import socket -from datetime import timedelta, datetime -import xml.etree.ElementTree as ET -from xml.dom import minidom -from contextlib import closing -import collections -# python 2/3 compatibilty fix-up -try: - import io -except ImportError: - import StringIO as io - -from meter_reader import utc - -utctz = utc.UTC() -BEGINNING_OF_TIME = datetime(2000, 1, 1, tzinfo=utctz) - -DEFAULT_PORT = 5002 -COMMANDS = ('list_devices', 'get_device_data', 'get_instantaneous_demand', - 'get_demand_values', 'get_summation_values', - 'get_fast_poll_status') -SUPPORTED_ARGS = ('interval', 'frequency', 'starttime', 'endtime', 'duration', - 'name') - - -class GatewayError(Exception): - def __init__(self, address, command, error='', code=None): - self.address = address - self.code = code - self.error = error - self.command = command - - def __str__(self): - return 'Unable to connect to {0}:{1}. {2}'.format(self.address[0], - self.address[1], - self.error) - - -class Gateway(object): - def __init__(self, address, port=DEFAULT_PORT): - self.address = (address, port) - self.timeout = socket.getdefaulttimeout() - self.mac_id = None - devices = self.run_command(Name='list_devices', convert=False) - self.mac_id = devices['DeviceInfo']['DeviceMacId'] - - def generate_command_xml(self, **kwargs): - c = ET.Element('LocalCommand') - for tag, value in kwargs.items(): - if tag.lower() not in SUPPORTED_ARGS or value is None: - continue - if tag.lower() in ('starttime', 'endtime'): - value = hex((value - BEGINNING_OF_TIME).seconds) - elif tag.lower() in ('frequency', 'duration'): - value = hex(value) - ET.SubElement(c, tag).text = value - if self.mac_id is not None: - ET.SubElement(c, 'MacID').text = self.mac_id - md = minidom.parseString(ET.tostring(c, encoding='utf-8')) - return md.toprettyxml(indent=" ") - - def run_command_raw(self, **kwargs): - with closing(socket.create_connection(self.address, - self.timeout)) as s: - s.sendall(self.generate_command_xml(**kwargs).encode('utf-8')) - cmd_output = s.makefile().read() - return cmd_output - - def run_command(self, convert=True, **kwargs): - try: - response = self.run_command_raw(**kwargs) - except socket.error as e: - raise GatewayError(self.address, kwargs.get('Name', ''), - e.strerror, e.errno) - # responses come as multiple XML fragments. Enclose them in - # to ensure valid XML. - if 'Interval data start' in response or 'HistoryData' in response: - return self.xml2list(u'{0}'.format(response), - convert) - else: - return self.xml2dict(u'{0}'.format(response), - convert) - - @staticmethod - def xml2dict(xml, convert=True): - with closing(io.StringIO(xml)) as f: - path = [{}] - for event, element in ET.iterparse(f, events=('start', 'end')): - if element.tag == 'response': - continue - if event == 'start': - if element.text is None or element.text.strip() != '': - if convert: - value = convert_data(element.tag, element.text) - else: - value = element.text - key = next(reversed(path[-1])) - path[-1][key][element.tag] = value - else: - new_level = collections.OrderedDict() - new_level[element.tag] = collections.OrderedDict() - path.append(new_level) - elif element.text is not None and element.text.strip() == '': - later = path.pop() - path[-1].update(later) - return path[0] - - @staticmethod - def xml2list(xml, convert=True): - with closing(io.StringIO(xml)) as f: - response = [] - for event, element in ET.iterparse(f, events=('start', 'end')): - if element.tag in ('Info', 'Text', 'response'): - continue - if event == 'start' and (element.text is not None and - element.text.strip() == ''): - response.append({}) - if event == 'end' and (element.text is None or - element.text.strip() != ''): - if convert: - value = convert_data(element.tag, element.text) - else: - value = element.text - response[-1][element.tag] = value - return response - - def get_instantaneous_demand(self): - resp = self.run_command(name='get_device_data')['InstantaneousDemand'] - demand = float(resp['Demand']) * resp['Multiplier'] / resp['Divisor'] - return (resp['TimeStamp'], demand) - - -def twos_complement(value, width=32): - if value & (1 << (width - 1)): - value = value - (1 << width) - return value - - -def convert_data(key, value): - if value is None: - return - if 'MacId' in key or 'Code' in key or 'Key' in key: - len_ = 15 - if key == 'MeterMacId' or key == 'CoordMacId': - len_ = 13 - return ':'.join(value.lstrip('0x')[i:i+2] for i in range(0, len_, 2)) - if key.lower() in ('timestamp', 'endtime') and int(value, 0): - return BEGINNING_OF_TIME + timedelta(0, int(value, 16)) - if isinstance(value, str) and value.startswith('0x'): - return twos_complement(int(value, 16)) - return value diff --git a/meter_reader/utc.py b/meter_reader/utc.py deleted file mode 100644 index 200f637..0000000 --- a/meter_reader/utc.py +++ /dev/null @@ -1,24 +0,0 @@ -from datetime import timedelta, tzinfo - -# The following tzinfo code comes from the Python standard -# library documentation -ZERO = timedelta(0) - - -# A UTC class. -class UTC(tzinfo): - """UTC""" - def __repr__(self): - return 'UTC' - - def utcoffset(self, dt): - return ZERO - - def tzname(self, dt): - return "UTC" - - def dst(self, dt): - return ZERO - - -utc = UTC() diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..73e8985 --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,80 @@ +site_name: EAGLE Meter Reader +site_description: Python library and documentation for the Rainforest EAGLE 200 Gateway. +site_author: Emmanuel +repo_url: https://github.com/your-repo/meter_reader # Update with actual repo +repo_name: meter_reader + +theme: + name: material + palette: + # Light mode + - scheme: default + primary: indigo + accent: indigo + toggle: + icon: material/brightness-7 + name: Switch to dark mode + # Dark mode + - scheme: slate + primary: indigo + accent: indigo + toggle: + icon: material/brightness-4 + name: Switch to light mode + font: + text: Roboto + code: Roboto Mono + icon: + repo: fontawesome/brands/github + features: + - content.code.copy + - content.code.select + - content.code.annotate + - navigation.expand + - navigation.indexes + - navigation.instant + - navigation.sections + - navigation.tabs + - navigation.top + - search.highlight + - search.suggest + - toc.follow + +markdown_extensions: + - pymdownx.highlight: + anchor_linenums: true + line_spans: __span + pygments_lang_class: true + - pymdownx.inlinehilite + - pymdownx.snippets + - pymdownx.superfences + - pymdownx.tabbed: + alternate_style: true + - admonition + - pymdownx.details + - attr_list + - md_in_html + - tables + +nav: + - Home: index.md + - Library API: + - Clients: api/clients.md + - Models: api/models.md + - Device API: + - Socket API (Port 5002): socket_api.md + - HTTP API (OpenAPI): http_api.md + +plugins: + - search + - render_swagger + - mkdocstrings: + handlers: + python: + options: + show_source: true + show_root_heading: true + show_root_full_path: false + members_order: source + heading_level: 2 + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c16e3fc --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,88 @@ +[build-system] +requires = ["setuptools>=68.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "meter-reader" +version = "2.0.0" +description = "Client library and CLI for Eagle Energy Gateway smart meter data" +readme = "README.md" +requires-python = ">=3.13" +dependencies = [ + "pydantic>=2.0", + "typer", + "rich", + "requests", + "types-requests" +] +authors = [ + {name = "Emmanuel Levijarvi", email = "emansl@gmail.com"}, +] +keywords = ["energy", "electricity", "smartmeter", "HAN", "eagle"] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Topic :: Home Automation", + "Topic :: Utilities", + "License :: OSI Approved :: BSD License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3 :: Only", + "Typing :: Typed", +] + +[project.urls] +Homepage = "https://github.com/eman/meter_reader" +Repository = "https://github.com/eman/meter_reader.git" +Issues = "https://github.com/eman/meter_reader/issues" + +[project.optional-dependencies] +docs = [ + "mkdocs-material>=9.0", + "mkdocs-render-swagger-plugin", + "mkdocstrings[python]>=0.24", +] + +[project.scripts] +mr = "meter_reader.main:app" + +[tool.setuptools] +package-dir = {"" = "src"} +packages = ["meter_reader"] + +[tool.setuptools.package-data] +meter_reader = ["py.typed"] + +[tool.black] +line-length = 100 +target-version = ["py313"] + +[tool.isort] +profile = "black" +line_length = 100 + +[tool.mypy] +python_version = "3.13" +strict = true +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true + + +[tool.basedpyright] +typeCheckingMode = "basic" +pythonVersion = "3.13" +pythonPlatform = "All" + +[tool.coverage.run] +omit = ["src/meter_reader/main.py"] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "raise AssertionError", + "raise NotImplementedError", + "if __name__ == .__main__.:", +] diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 5e40900..0000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[wheel] -universal = 1 diff --git a/setup.py b/setup.py deleted file mode 100644 index 467abfd..0000000 --- a/setup.py +++ /dev/null @@ -1,47 +0,0 @@ -""" -Meter Reader -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Meter Reader is a client library and command line tool for retrieving -smart meter data from an Eagle Energy Gateway. - -:copyright: (c) 2017 by Emmanuel Levijarvi -:license: BSD 2-Clause -""" - -from setuptools import setup, find_packages -import os - - -project_dir = os.path.abspath(os.path.dirname(__file__)) - -description = ('Client Library for retreiving smart meter data from an ' - 'Eagle Energy Gateway') - -long_descriptions = [] -for rst in ('README.rst', 'LICENSE.rst'): - with open(os.path.join(project_dir, rst), 'r') as f: - long_descriptions.append(f.read()) - - -setup(name='meter-reader', - version='1.1.2', - description=description, - long_description='\n\n'.join(long_descriptions), - author='Emmanuel Levijarvi', - author_email='emansl@gmail.com', - url='https://github.com/eman/meter_reader', - license="BSD", - classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'Topic :: Home Automation', - 'Topic :: Utilities', - 'License :: OSI Approved :: BSD License', - 'Operating System :: OS Independent', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - ], - keywords='energy electricity smartmeter HAN', - packages=find_packages(), - entry_points={'console_scripts': ['mr=meter_reader.client:main']},) diff --git a/src/meter_reader/__init__.py b/src/meter_reader/__init__.py new file mode 100644 index 0000000..b7f4f91 --- /dev/null +++ b/src/meter_reader/__init__.py @@ -0,0 +1,20 @@ +""" +Meter Reader + +A client library and command-line tool for retrieving smart meter data from +an Eagle Energy Gateway. + +:copyright: (c) 2017-2026 by Emmanuel Levijarvi +:license: BSD 2-Clause +""" + +from .clients import SocketClient as EagleSocketClient, HttpClient as EagleHttpClient +from .models import InstantaneousDemand, UsageData, CurrentSummation, NetworkInfo, DeviceList + +__version__ = "2.0.0" +__author__ = "Emmanuel Levijarvi" +__all__ = [ + 'EagleSocketClient', 'EagleHttpClient', + 'InstantaneousDemand', 'UsageData', + 'CurrentSummation', 'NetworkInfo', 'DeviceList' +] diff --git a/src/meter_reader/clients/__init__.py b/src/meter_reader/clients/__init__.py new file mode 100644 index 0000000..155fbfd --- /dev/null +++ b/src/meter_reader/clients/__init__.py @@ -0,0 +1,5 @@ +from .socket import EagleSocketClient as SocketClient +from .http import EagleHttpClient as HttpClient +from .base import EagleClient + +__all__ = ["SocketClient", "HttpClient", "EagleClient"] diff --git a/src/meter_reader/clients/base.py b/src/meter_reader/clients/base.py new file mode 100644 index 0000000..2ccd5c9 --- /dev/null +++ b/src/meter_reader/clients/base.py @@ -0,0 +1,33 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Optional +from datetime import datetime + +from ..models import InstantaneousDemand, UsageData, DeviceList, NetworkInfo, CurrentSummation + +class EagleClient(ABC): + """Abstract base class for Eagle Gateway clients.""" + + @abstractmethod + def list_devices(self) -> DeviceList: + """List devices connected to the gateway.""" + pass + + @abstractmethod + def get_instantaneous_demand(self) -> InstantaneousDemand: + """Get real-time demand data.""" + pass + + @abstractmethod + def get_current_summation(self) -> CurrentSummation: + """Get total consumption to date.""" + pass + + @abstractmethod + def get_usage_data(self) -> UsageData: + """Get usage data (demand + summation).""" + pass + + @abstractmethod + def get_network_info(self) -> NetworkInfo: + """Get Zigbee network information.""" + pass diff --git a/src/meter_reader/clients/http.py b/src/meter_reader/clients/http.py new file mode 100644 index 0000000..daa8f3a --- /dev/null +++ b/src/meter_reader/clients/http.py @@ -0,0 +1,119 @@ +import requests +import logging +from typing import Any, Dict, Optional, Tuple, Union +from datetime import datetime, timezone + +from .base import EagleClient +from ..models import ( + InstantaneousDemand, UsageData, DeviceList, NetworkInfo, + CurrentSummation, DeviceInfo +) +from ..utils import generate_command_xml + +logger = logging.getLogger(__name__) + +class EagleHttpClient(EagleClient): + """Client for Eagle Energy Gateway via HTTP API (Port 80).""" + + def __init__(self, address: str, username: str, password: str, timeout: int = 10) -> None: + self.base_url = f"http://{address}/cgi-bin/cgi_manager" + self.auth = (username, password) + self.timeout = timeout + self.mac_id: str | None = None + + try: + # Auto-detect MacID using get_device_list + self.list_devices() + except: + pass + + def _post_xml(self, name: str, **kwargs: Any) -> Dict[str, Any]: + xml_payload = generate_command_xml(self.mac_id, Name=name, **kwargs) + try: + resp = requests.post( + self.base_url, + data=xml_payload, + auth=self.auth, + timeout=self.timeout + ) + resp.raise_for_status() + # The API returns JSON even though request is XML + return resp.json() + except requests.RequestException as e: + logger.error(f"HTTP Request failed: {e}") + raise + + def list_devices(self) -> DeviceList: + data = self._post_xml('get_device_list') + + # The JSON structure for device list is flat: + # { + # "device_mac_id[0]": "...", + # "device_model_id[0]": "...", + # "num_devices": "1" + # } + # We need to transform this to the expected DeviceInfo list structure + devices = [] + num_devices = int(data.get('num_devices', 0)) + + for i in range(num_devices): + mac_key = f'device_mac_id[{i}]' + model_key = f'device_model_id[{i}]' + if mac_key in data: + devices.append(DeviceInfo( + DeviceMacId=data[mac_key], + ModelId=data.get(model_key), + # Other fields not available in this simple list + )) + + if devices: + self.mac_id = devices[0].device_mac_id + + return DeviceList(DeviceInfo=devices) + + def get_usage_data(self) -> UsageData: + # HTTP API call: get_usage_data + # This returns flat JSON with demand, summation, etc. + data = self._post_xml('get_usage_data') + return UsageData(**data) + + def get_instantaneous_demand(self) -> InstantaneousDemand: + # Synthesize from usage data since HTTP endpoint is rich + usage = self.get_usage_data() + + return InstantaneousDemand( + DeviceMacId=self.mac_id or "unknown", + MeterMacId="unknown", # Not always provided in JSON + TimeStamp=usage.timestamp, + Demand=usage.demand, + Multiplier=1, # JSON is already normalized float + Divisor=1, + DigitsRight=0, + DigitsLeft=0, + SuppressLeadingZero=False + ) + + def get_current_summation(self) -> CurrentSummation: + usage = self.get_usage_data() + + return CurrentSummation( + DeviceMacId=self.mac_id or "unknown", + MeterMacId="unknown", + TimeStamp=usage.timestamp, + SummationDelivered=int(usage.summation_delivered * 1000), # Back to int? + SummationReceived=int(usage.summation_received * 1000), + Multiplier=1, + Divisor=1000, # To match float value + DigitsRight=3, + DigitsLeft=0, + SuppressLeadingZero=False + ) + + def get_network_info(self) -> NetworkInfo: + # HTTP equivalent: get_device_config partial or not supported fully? + # Post manager get_mdns_status exists. + # cgi_manager returns some status. + # Fallback for now: raise NotImplemented or return dummy + # Based on probe: get_device_config returned update status, ssh enabled. + # Network info (link strength) might not be exposed via HTTP easily. + raise NotImplementedError("Network Info not fully supported via HTTP API yet.") diff --git a/src/meter_reader/clients/socket.py b/src/meter_reader/clients/socket.py new file mode 100644 index 0000000..e991fec --- /dev/null +++ b/src/meter_reader/clients/socket.py @@ -0,0 +1,270 @@ +import socket +import logging +from contextlib import closing +from datetime import datetime, timezone, timedelta +from typing import Any, Dict, List, Optional, Tuple, Union +import xml.etree.ElementTree as ET +from xml.dom import minidom + +from ..models import ( + InstantaneousDemand, UsageData, DeviceList, NetworkInfo, + CurrentSummation, DeviceInfo +) +from .base import EagleClient + +logger = logging.getLogger(__name__) + +utctz = timezone.utc +BEGINNING_OF_TIME = datetime(2000, 1, 1, tzinfo=utctz) +DEFAULT_PORT = 5002 +DEFAULT_TIMEOUT = 5 + +SUPPORTED_ARGS = ('interval', 'frequency', 'starttime', 'endtime', 'duration', + 'name', 'event', 'enabled', 'protocol', 'macid', + 'devicemacid', 'metermacid', 'target', 'format', 'priority', + 'text', 'confirmationrequired', 'id', 'queue', 'read') + + +class GatewayError(Exception): + """Exception raised when gateway communication fails.""" + + def __init__(self, address: Tuple[str, int], command: str, error: str = '', code: int | None = None) -> None: + self.address = address + self.code = code + self.error = error + self.command = command + + def __str__(self) -> str: + return f'Unable to connect to {self.address[0]}:{self.address[1]}. {self.error}' + + +def twos_complement(value: int, width: int = 32) -> int: + if value & (1 << (width - 1)): + value = value - (1 << width) + return value + + +def convert_data(key: str, value: str | None) -> Any: + if value is None: + return None + if 'MacId' in key or 'Code' in key or 'Key' in key: + clean_value = value + if value.lower().startswith('0x'): + clean_value = value[2:] + len_ = 15 + if key == 'MeterMacId' or key == 'CoordMacId': + len_ = 13 + return ':'.join(clean_value[i:i+2] for i in range(0, len_, 2)) + if key.lower() in ('timestamp', 'endtime') and int(value, 0): + # Handle time offset from 2000-01-01 + return BEGINNING_OF_TIME + timedelta(0, int(value, 16)) + if isinstance(value, str) and value.startswith('0x'): + return twos_complement(int(value, 16)) + return value + + +class EagleSocketClient(EagleClient): + """Client for Eagle Energy Gateway via Socket API (Port 5002).""" + + def __init__(self, address: str, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT) -> None: + self.address = (address, port) + self.timeout = timeout + self.mac_id: str | None = None + + # Initialize by fetching device list to get MAC ID + try: + self._fetch_device_list() + except Exception as e: + logger.warning(f"Failed to auto-discover MAC ID: {e}") + + def _fetch_device_list(self) -> None: + data = self._run_command_dict(Name='list_devices', convert=False) + # Handle case where DeviceInfo is a list or single dict + if 'DeviceInfo' in data: + info = data['DeviceInfo'] + if isinstance(info, list) and info: + self.mac_id = info[0].get('DeviceMacId') + elif isinstance(info, dict): + self.mac_id = info.get('DeviceMacId') + + def generate_command_xml(self, **kwargs: Any) -> str: + c = ET.Element('LocalCommand') + has_mac_arg = any(k.lower() in ('macid', 'devicemacid', 'metermacid') for k in kwargs) + + for tag, value in kwargs.items(): + if tag.lower() not in SUPPORTED_ARGS or value is None: + continue + if tag.lower() in ('starttime', 'endtime'): + if isinstance(value, datetime): + value = hex(int((value - BEGINNING_OF_TIME).total_seconds())) + elif isinstance(value, int): + value = hex(value) + elif tag.lower() in ('frequency', 'duration'): + value = hex(value) + + ET.SubElement(c, tag).text = str(value) + + if not has_mac_arg and self.mac_id is not None: + ET.SubElement(c, 'MacID').text = self.mac_id + + md = minidom.parseString(ET.tostring(c, encoding='utf-8')) + return md.toprettyxml(indent=" ") + + def _run_command_raw(self, **kwargs: Any) -> str: + try: + with closing(socket.create_connection(self.address, self.timeout)) as s: + s.sendall(self.generate_command_xml(**kwargs).encode('utf-8')) + response = b'' + s.settimeout(self.timeout) + try: + while True: + chunk = s.recv(4096) + if not chunk: + break + response += chunk + except socket.timeout: + pass + return response.decode('utf-8', errors='replace') + except socket.error as e: + raise GatewayError(self.address, kwargs.get('Name', ''), str(e)) + + def _element_to_data(self, element: ET.Element, convert: bool = True) -> Any: + # 1. If element has no children, return text value + if len(element) == 0: + if convert: + return convert_data(element.tag, element.text) + return element.text + + # 2. Iterate children and build dict + result: Dict[str, Any] = {} + child_counts: Dict[str, int] = {} + + # First pass to count occurrences for list detection + for child in element: + child_counts[child.tag] = child_counts.get(child.tag, 0) + 1 + + for child in element: + tag = child.tag + if tag in ('Info', 'Text'): + continue + + value = self._element_to_data(child, convert) + + # If tag appears multiple times, it must be a list + if child_counts[tag] > 1: + if tag not in result: + result[tag] = [] + result[tag].append(value) + else: + result[tag] = value + + # If result is empty (all children were skipped), return element text or None + if not result: + if convert: + return convert_data(element.tag, element.text) + return element.text + + return result + + def _xml2dict(self, xml: str, convert: bool = True) -> Dict[str, Any]: + if not xml.strip(): + return {} + try: + root = ET.fromstring(xml) + except ET.ParseError: + return {} + + if root.tag == 'response': + return self._element_to_data(root, convert) # type: ignore + else: + return {root.tag: self._element_to_data(root, convert)} + + def _run_command_dict(self, convert: bool = True, **kwargs: Any) -> Dict[str, Any]: + raw_xml = self._run_command_raw(**kwargs) + # Check for list wrapper + return self._xml2dict(f'{raw_xml}', convert) + + def list_devices(self) -> DeviceList: + data = self._run_command_dict(Name='list_devices') + # Ensure DeviceInfo is a list for the model, even if single device + if 'DeviceInfo' in data and not isinstance(data['DeviceInfo'], list): + data['DeviceInfo'] = [data['DeviceInfo']] + return DeviceList(**data) + + def get_instantaneous_demand(self) -> InstantaneousDemand: + # The command is 'get_instantaneous_demand' and requires DeviceMacId + data = self._run_command_dict(Name='get_instantaneous_demand', DeviceMacId=self.mac_id) + # Unwrap if needed, usually InstantaneousDemand key is at root + if 'InstantaneousDemand' in data: + return InstantaneousDemand(**data['InstantaneousDemand']) + return InstantaneousDemand(**data) + + def get_current_summation(self) -> CurrentSummation: + # get_current_summation doesn't exist, use get_device_data which returns all data + data = self._run_command_dict(Name='get_device_data', DeviceMacId=self.mac_id) + if 'CurrentSummation' in data: + return CurrentSummation(**data['CurrentSummation']) + raise ValueError("CurrentSummation not found in device data") + + def get_network_info(self) -> NetworkInfo: + # Use get_device_data which includes NetworkInfo + data = self._run_command_dict(Name='get_device_data', DeviceMacId=self.mac_id) + if 'NetworkInfo' in data: + return NetworkInfo(**data['NetworkInfo']) + raise ValueError("NetworkInfo not found in device data") + + def get_usage_data(self) -> UsageData: + # Socket API doesn't have a direct equivalent to the HTTP usage_data summary + # that includes both demand and summation in one flat object cleanly. + # We can simulate it by fetching both. + instant = self.get_instantaneous_demand() + summation = self.get_current_summation() + + return UsageData( + demand=instant.demand, # Raw demand + demand_units="kW", # Implicit in socket model usually + demand_timestamp=int(instant.timestamp.timestamp()), + summation_delivered=summation.delivered_kwh, + summation_received=summation.received_kwh, + summation_units="kWh", + meter_status="Connected" # Assumed if we got data + ) + + def get_history_data(self, start_time: datetime | None = None, end_time: datetime | None = None, + frequency: int = 0x384) -> List[CurrentSummation]: + """ + Get historical summation data. + + Args: + start_time: Start time for history query (defaults to 1 hour ago) + end_time: End time for history query (defaults to now) + frequency: Sample frequency in seconds (hex). 0x384 = 900 = 15 minutes + + Returns: + List of CurrentSummation objects + """ + if start_time is None: + start_time = datetime.now(timezone.utc) - timedelta(hours=1) + if end_time is None: + end_time = datetime.now(timezone.utc) + + data = self._run_command_dict( + Name='get_history_data', + DeviceMacId=self.mac_id, + StartTime=start_time, + EndTime=end_time, + Frequency=frequency + ) + + # History data returns a HistoryData wrapper with multiple CurrentSummation entries + if 'HistoryData' in data: + history = data['HistoryData'] + if 'CurrentSummation' in history: + summations = history['CurrentSummation'] + # Ensure it's a list + if not isinstance(summations, list): + summations = [summations] + return [CurrentSummation(**s) for s in summations] + + return [] + diff --git a/src/meter_reader/main.py b/src/meter_reader/main.py new file mode 100644 index 0000000..833f454 --- /dev/null +++ b/src/meter_reader/main.py @@ -0,0 +1,235 @@ +import typer +import json +import logging +from rich.console import Console +from rich.table import Table +from datetime import datetime +from typing import Optional, List, Any + +from .clients import SocketClient, HttpClient +from .models import DeviceList, InstantaneousDemand, UsageData + +app = typer.Typer(no_args_is_help=True) +console = Console() + +logging.basicConfig(level=logging.WARNING) +logger = logging.getLogger(__name__) + +def get_client(address: str, protocol: str, username: Optional[str] = None, password: Optional[str] = None) -> Any: + if protocol == 'socket': + return SocketClient(address) + elif protocol == 'http': + if not username or not password: + console.print("[red]Username and password required for HTTP protocol[/red]") + raise typer.Exit(code=1) + return HttpClient(address, username, password) + else: + console.print(f"[red]Invalid protocol: {protocol}[/red]") + raise typer.Exit(code=1) + +@app.command() +def list( + address: str, + protocol: str = typer.Option("socket", help="Protocol: socket or http"), + username: str = typer.Option(None, help="Username for HTTP"), + password: str = typer.Option(None, help="Password for HTTP"), + raw: bool = typer.Option(False, help="Show raw output") +): + """List devices on gateway.""" + client = get_client(address, protocol, username, password) + try: + devices = client.list_devices() + if raw: + console.print_json(devices.model_dump_json(by_alias=True)) + else: + table = Table(title="Connected Devices") + table.add_column("MAC ID", style="cyan") + table.add_column("Model", style="magenta") + table.add_column("FW Version", style="green") + table.add_column("HW Version", style="yellow") + table.add_column("Manufacturer", style="blue") + + for d in devices.device_info: + table.add_row( + d.device_mac_id, + d.model_id or "Unknown", + d.fw_version or "N/A", + d.hw_version or "N/A", + d.manufacturer or "Unknown" + ) + console.print(table) + except Exception as e: + console.print(f"[red]Error: {e}[/red]") + raise typer.Exit(code=1) + +@app.command() +def demand( + address: str, + protocol: str = typer.Option("socket", help="Protocol: socket or http"), + username: str = typer.Option(None, help="Username for HTTP"), + password: str = typer.Option(None, help="Password for HTTP"), + raw: bool = typer.Option(False, help="Show raw JSON") +): + """Get instantaneous demand.""" + client = get_client(address, protocol, username, password) + try: + data = client.get_instantaneous_demand() + if raw: + console.print_json(data.model_dump_json(by_alias=True)) + else: + console.print(f"Timestamp: [green]{data.timestamp}[/green]") + console.print(f"Demand: [bold]{data.panic_demand:.3f} kW[/bold]") + except Exception as e: + console.print(f"[red]Error: {e}[/red]") + raise typer.Exit(code=1) + +@app.command() +def summation( + address: str, + protocol: str = typer.Option("socket", help="Protocol: socket or http"), + username: str = typer.Option(None, help="Username for HTTP"), + password: str = typer.Option(None, help="Password for HTTP"), + raw: bool = typer.Option(False, help="Show raw JSON") +): + """Get summation values.""" + client = get_client(address, protocol, username, password) + try: + data = client.get_current_summation() + if raw: + console.print_json(data.model_dump_json(by_alias=True)) + else: + console.print(f"Timestamp: [green]{data.timestamp}[/green]") + console.print(f"Delivered: [bold]{data.delivered_kwh:.3f} kWh[/bold]") + console.print(f"Received: [bold]{data.received_kwh:.3f} kWh[/bold]") + except Exception as e: + console.print(f"[red]Error: {e}[/red]") + raise typer.Exit(code=1) + +@app.command() +def usage( + address: str, + protocol: str = typer.Option("socket", help="Protocol: socket or http"), + username: str = typer.Option(None, help="Username for HTTP"), + password: str = typer.Option(None, help="Password for HTTP"), + raw: bool = typer.Option(False, help="Show raw JSON") +): + """Get usage summary (Demand + Summation).""" + client = get_client(address, protocol, username, password) + try: + data = client.get_usage_data() + if raw: + console.print_json(data.model_dump_json(by_alias=True)) + else: + table = Table(title="Usage Data") + table.add_column("Metric", style="cyan") + table.add_column("Value", style="magenta") + + table.add_row("Timestamp", str(data.timestamp)) + table.add_row("Demand", f"{data.demand} {data.demand_units}") + table.add_row("Delivered", f"{data.summation_delivered} {data.summation_units}") + table.add_row("Received", f"{data.summation_received} {data.summation_units}") + table.add_row("Status", data.meter_status) + console.print(table) + except Exception as e: + console.print(f"[red]Error: {e}[/red]") + raise typer.Exit(code=1) + +@app.command() +def history( + address: str, + hours: int = typer.Option(1, help="Number of hours to look back"), + protocol: str = typer.Option("socket", help="Protocol: socket or http"), + username: str = typer.Option(None, help="Username for HTTP"), + password: str = typer.Option(None, help="Password for HTTP"), + raw: bool = typer.Option(False, help="Show raw JSON"), +): + """Get historical summation data over time.""" + from datetime import datetime, timedelta, timezone + + if protocol != "socket": + console.print("[red]History command only supports socket protocol[/red]") + raise typer.Exit(code=1) + + client = get_client(address, protocol, username, password) + try: + start = datetime.now(timezone.utc) - timedelta(hours=hours) + end = datetime.now(timezone.utc) + data = client.get_history_data(start_time=start, end_time=end) + + if raw: + import json + console.print_json(json.dumps([d.model_dump(by_alias=True) for d in data], default=str)) + else: + if not data: + console.print("[yellow]No historical data available for this time range[/yellow]") + return + + table = Table(title=f"Historical Data (Last {hours} hour{'s' if hours != 1 else ''})") + table.add_column("Timestamp", style="cyan") + table.add_column("Delivered (kWh)", style="green", justify="right") + table.add_column("Received (kWh)", style="magenta", justify="right") + + for d in data: + table.add_row( + str(d.timestamp), + f"{d.delivered_kwh:.3f}", + f"{d.received_kwh:.3f}" + ) + console.print(table) + console.print(f"\n[dim]Total readings: {len(data)}[/dim]") + except Exception as e: + console.print(f"[red]Error: {e}[/red]") + raise typer.Exit(code=1) + +@app.command() +def watch( + address: str, + interval: int = typer.Option(5, help="Update interval in seconds"), + protocol: str = typer.Option("socket", help="Protocol: socket or http"), + username: str = typer.Option(None, help="Username for HTTP"), + password: str = typer.Option(None, help="Password for HTTP"), + mode: str = typer.Option("usage", help="What to watch: demand, summation, or usage"), +): + """Continuously monitor demand/summation values (Ctrl+C to stop).""" + import time + from rich.live import Live + from rich.panel import Panel + + client = get_client(address, protocol, username, password) + + def generate_display(): + """Generate the display content based on mode.""" + try: + if mode == "demand": + data = client.get_instantaneous_demand() + return f"[green]Timestamp:[/green] {data.timestamp}\n[bold cyan]Demand:[/bold cyan] {data.panic_demand:.3f} kW" + elif mode == "summation": + data = client.get_current_summation() + return f"[green]Timestamp:[/green] {data.timestamp}\n[bold cyan]Delivered:[/bold cyan] {data.delivered_kwh:.3f} kWh\n[bold magenta]Received:[/bold magenta] {data.received_kwh:.3f} kWh" + else: # usage + data = client.get_usage_data() + table = Table(show_header=False, box=None, padding=(0, 1)) + table.add_column("Metric", style="cyan") + table.add_column("Value", style="bold") + table.add_row("Timestamp", str(data.timestamp)) + table.add_row("Demand", f"{data.demand} {data.demand_units}") + table.add_row("Delivered", f"{data.summation_delivered} {data.summation_units}") + table.add_row("Received", f"{data.summation_received} {data.summation_units}") + table.add_row("Status", data.meter_status) + return table + except Exception as e: + return f"[red]Error: {e}[/red]" + + console.print(f"[green]Watching {mode} every {interval} seconds... (Press Ctrl+C to stop)[/green]\n") + + try: + with Live(Panel(generate_display(), title=f"Live {mode.capitalize()} Monitor"), refresh_per_second=1) as live: + while True: + live.update(Panel(generate_display(), title=f"Live {mode.capitalize()} Monitor")) + time.sleep(interval) + except KeyboardInterrupt: + console.print("\n[yellow]Stopped monitoring[/yellow]") + +if __name__ == "__main__": + app() + diff --git a/src/meter_reader/models.py b/src/meter_reader/models.py new file mode 100644 index 0000000..a591db7 --- /dev/null +++ b/src/meter_reader/models.py @@ -0,0 +1,83 @@ +from datetime import datetime +from typing import List, Optional, Union +from pydantic import BaseModel, Field, ConfigDict, field_validator + +class EagleModel(BaseModel): + """Base model for Eagle Gateway responses.""" + model_config = ConfigDict(populate_by_name=True) + +class DeviceInfo(EagleModel): + device_mac_id: str = Field(alias='DeviceMacId') + install_code: Optional[str] = Field(default=None, alias='InstallCode') + link_key: Optional[str] = Field(default=None, alias='LinkKey') + fw_version: Optional[str] = Field(default=None, alias='FWVersion') + hw_version: Optional[str] = Field(default=None, alias='HWVersion') + image_type: Optional[str | int] = Field(default=None, alias='ImageType') + manufacturer: Optional[str] = Field(default=None, alias='Manufacturer') + model_id: Optional[str] = Field(default=None, alias='ModelId') + date_code: Optional[str] = Field(default=None, alias='DateCode') + +class DeviceList(EagleModel): + device_info: List[DeviceInfo] = Field(default_factory=list, alias='DeviceInfo') + +class InstantaneousDemand(EagleModel): + device_mac_id: str = Field(alias='DeviceMacId') + meter_mac_id: str = Field(alias='MeterMacId') + timestamp: datetime = Field(alias='TimeStamp') + demand: float = Field(alias='Demand') + multiplier: float = Field(alias='Multiplier') + divisor: float = Field(alias='Divisor') + digits_right: int = Field(alias='DigitsRight') + digits_left: int = Field(alias='DigitsLeft') + suppress_leading_zero: bool = Field(alias='SuppressLeadingZero') + + @property + def panic_demand(self) -> float: + """Calculate demand in kW.""" + return (self.demand * self.multiplier) / self.divisor + +class CurrentSummation(EagleModel): + device_mac_id: str = Field(alias='DeviceMacId') + meter_mac_id: str = Field(alias='MeterMacId') + timestamp: datetime = Field(alias='TimeStamp') + summation_delivered: int = Field(alias='SummationDelivered') + summation_received: int = Field(alias='SummationReceived') + multiplier: float = Field(alias='Multiplier') + divisor: float = Field(alias='Divisor') + digits_right: int = Field(alias='DigitsRight') + digits_left: int = Field(alias='DigitsLeft') + suppress_leading_zero: bool = Field(alias='SuppressLeadingZero') + + @property + def delivered_kwh(self) -> float: + return (self.summation_delivered * self.multiplier) / self.divisor + + @property + def received_kwh(self) -> float: + return (self.summation_received * self.multiplier) / self.divisor + +class NetworkInfo(EagleModel): + device_mac_id: str = Field(alias='DeviceMacId') + coord_mac_id: str = Field(alias='CoordMacId') + status: str = Field(alias='Status') + description: str = Field(alias='Description') + ext_pan_id: str = Field(alias='ExtPanId') + channel: int = Field(alias='Channel') + short_addr: str = Field(alias='ShortAddr') + link_strength: int = Field(alias='LinkStrength') + +class UsageData(EagleModel): + """Model for HTTP endpoint response.""" + # HTTP endpoint fields are snake_case naturally in JSON + demand: float + demand_units: str + demand_timestamp: int + summation_received: float + summation_delivered: float + summation_units: str + meter_status: str + consumption: Optional[float] = None + + @property + def timestamp(self) -> datetime: + return datetime.fromtimestamp(self.demand_timestamp) diff --git a/src/meter_reader/py.typed b/src/meter_reader/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/meter_reader/utils.py b/src/meter_reader/utils.py new file mode 100644 index 0000000..2d5731e --- /dev/null +++ b/src/meter_reader/utils.py @@ -0,0 +1,39 @@ +import xml.etree.ElementTree as ET +from xml.dom import minidom +from datetime import datetime +from typing import Any + +from .clients.base import EagleClient # Circular import risk? No, utils is imported by clients. +# Wait, avoiding circular dependency. +# Constants +BEGINNING_OF_TIME = datetime(2000, 1, 1) + +SUPPORTED_ARGS = ('interval', 'frequency', 'starttime', 'endtime', 'duration', + 'name', 'event', 'enabled', 'protocol', 'macid', + 'devicemacid', 'metermacid', 'target', 'format', 'priority', + 'text', 'confirmationrequired', 'id', 'queue', 'read') + +def generate_command_xml(mac_id: str | None, **kwargs: Any) -> str: + c = ET.Element('LocalCommand') + has_mac_arg = any(k.lower() in ('macid', 'devicemacid', 'metermacid') for k in kwargs) + + for tag, value in kwargs.items(): + if tag.lower() not in SUPPORTED_ARGS or value is None: + continue + if tag.lower() in ('starttime', 'endtime'): + if isinstance(value, datetime): + # datetime arithmetic + diff = (value.replace(tzinfo=None) - BEGINNING_OF_TIME).total_seconds() + value = hex(int(diff)) + elif isinstance(value, int): + value = hex(value) + elif tag.lower() in ('frequency', 'duration'): + value = hex(value) + + ET.SubElement(c, tag).text = str(value) + + if not has_mac_arg and mac_id is not None: + ET.SubElement(c, 'MacID').text = mac_id + + md = minidom.parseString(ET.tostring(c, encoding='utf-8')) + return md.toprettyxml(indent=" ") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..22ba501 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,3 @@ +""" +Test suite for meter_reader +""" diff --git a/tests/test_http_client.py b/tests/test_http_client.py new file mode 100644 index 0000000..22d820e --- /dev/null +++ b/tests/test_http_client.py @@ -0,0 +1,231 @@ +"""Tests for meter_reader HTTP client module.""" +import pytest +from datetime import datetime +from unittest.mock import Mock, patch, MagicMock +import requests + +from meter_reader.clients.http import EagleHttpClient +from meter_reader.models import ( + DeviceList, + DeviceInfo, + UsageData, + InstantaneousDemand, + CurrentSummation, + NetworkInfo, +) + + +class TestEagleHttpClient: + """Test EagleHttpClient class.""" + + @patch("meter_reader.clients.http.requests.post") + def test_client_initialization(self, mock_post): + """Test client initialization with auto MAC detection.""" + mock_response = MagicMock() + mock_response.json.return_value = { + "num_devices": "1", + "device_mac_id[0]": "0xd8d5b9000000abcd", + "device_model_id[0]": "Z109-EAGLE", + } + mock_post.return_value = mock_response + + client = EagleHttpClient("192.168.1.1", "admin", "password") + assert client.base_url == "http://192.168.1.1/cgi-bin/cgi_manager" + assert client.auth == ("admin", "password") + assert client.mac_id == "0xd8d5b9000000abcd" + + @patch("meter_reader.clients.http.requests.post") + def test_client_initialization_failure(self, mock_post): + """Test client handles initialization failure gracefully.""" + mock_post.side_effect = requests.RequestException("Connection error") + + # Should not raise + client = EagleHttpClient("192.168.1.1", "admin", "password") + assert client.mac_id is None + + @patch("meter_reader.clients.http.requests.post") + def test_list_devices(self, mock_post): + """Test list_devices method.""" + mock_response = MagicMock() + mock_response.json.return_value = { + "num_devices": "2", + "device_mac_id[0]": "0xd8d5b9000000abcd", + "device_model_id[0]": "Z109-EAGLE", + "device_mac_id[1]": "0xd8d5b9000000abce", + "device_model_id[1]": "Z110-EAGLE", + } + mock_post.return_value = mock_response + + client = EagleHttpClient.__new__(EagleHttpClient) + client.base_url = "http://192.168.1.1/cgi-bin/cgi_manager" + client.auth = ("admin", "password") + client.timeout = 10 + client.mac_id = None + + result = client.list_devices() + + assert isinstance(result, DeviceList) + assert len(result.device_info) == 2 + assert result.device_info[0].device_mac_id == "0xd8d5b9000000abcd" + assert result.device_info[1].model_id == "Z110-EAGLE" + # MAC ID should be set from first device + assert client.mac_id == "0xd8d5b9000000abcd" + + @patch("meter_reader.clients.http.requests.post") + def test_list_devices_empty(self, mock_post): + """Test list_devices with no devices.""" + mock_response = MagicMock() + mock_response.json.return_value = {"num_devices": "0"} + mock_post.return_value = mock_response + + client = EagleHttpClient.__new__(EagleHttpClient) + client.base_url = "http://192.168.1.1/cgi-bin/cgi_manager" + client.auth = ("admin", "password") + client.timeout = 10 + client.mac_id = None + + result = client.list_devices() + + assert isinstance(result, DeviceList) + assert len(result.device_info) == 0 + + @patch("meter_reader.clients.http.requests.post") + def test_get_usage_data(self, mock_post): + """Test get_usage_data method.""" + mock_response = MagicMock() + mock_response.json.return_value = { + "demand": 2.5, + "demand_units": "kW", + "demand_timestamp": 1609459200, + "summation_delivered": 100.5, + "summation_received": 10.0, + "summation_units": "kWh", + "meter_status": "Connected", + } + mock_post.return_value = mock_response + + client = EagleHttpClient.__new__(EagleHttpClient) + client.base_url = "http://192.168.1.1/cgi-bin/cgi_manager" + client.auth = ("admin", "password") + client.timeout = 10 + client.mac_id = "0xabcd" + + result = client.get_usage_data() + + assert isinstance(result, UsageData) + assert result.demand == 2.5 + assert result.summation_delivered == 100.5 + assert result.meter_status == "Connected" + + @patch("meter_reader.clients.http.EagleHttpClient.get_usage_data") + def test_get_instantaneous_demand(self, mock_usage): + """Test get_instantaneous_demand synthesized from usage data.""" + mock_usage.return_value = UsageData( + demand=2.5, + demand_units="kW", + demand_timestamp=1609459200, + summation_delivered=100.5, + summation_received=10.0, + summation_units="kWh", + meter_status="Connected", + ) + + client = EagleHttpClient.__new__(EagleHttpClient) + client.mac_id = "0xd8d5b9000000abcd" + + result = client.get_instantaneous_demand() + + assert isinstance(result, InstantaneousDemand) + assert result.demand == 2.5 + assert result.device_mac_id == "0xd8d5b9000000abcd" + # Check that multiplier/divisor are set for normalized values + assert result.multiplier == 1 + assert result.divisor == 1 + + @patch("meter_reader.clients.http.EagleHttpClient.get_usage_data") + def test_get_current_summation(self, mock_usage): + """Test get_current_summation synthesized from usage data.""" + mock_usage.return_value = UsageData( + demand=2.5, + demand_units="kW", + demand_timestamp=1609459200, + summation_delivered=100.5, + summation_received=10.0, + summation_units="kWh", + meter_status="Connected", + ) + + client = EagleHttpClient.__new__(EagleHttpClient) + client.mac_id = "0xd8d5b9000000abcd" + + result = client.get_current_summation() + + assert isinstance(result, CurrentSummation) + # Values should be converted back to integers with proper divisor + assert result.summation_delivered == 100500 # 100.5 * 1000 + assert result.summation_received == 10000 # 10.0 * 1000 + assert result.divisor == 1000 + # Check calculated property + assert result.delivered_kwh == 100.5 + assert result.received_kwh == 10.0 + + def test_get_network_info_not_implemented(self): + """Test get_network_info raises NotImplementedError.""" + client = EagleHttpClient.__new__(EagleHttpClient) + client.mac_id = "0xabcd" + + with pytest.raises(NotImplementedError): + client.get_network_info() + + @patch("meter_reader.clients.http.requests.post") + def test_post_xml_error_handling(self, mock_post): + """Test HTTP error handling in _post_xml.""" + mock_post.side_effect = requests.RequestException("HTTP 500 Error") + + client = EagleHttpClient.__new__(EagleHttpClient) + client.base_url = "http://192.168.1.1/cgi-bin/cgi_manager" + client.auth = ("admin", "password") + client.timeout = 10 + client.mac_id = "0xabcd" + + with pytest.raises(requests.RequestException): + client._post_xml("test_command") + + @patch("meter_reader.clients.http.requests.post") + def test_authentication(self, mock_post): + """Test that authentication credentials are used.""" + mock_response = MagicMock() + mock_response.json.return_value = {"num_devices": "0"} + mock_post.return_value = mock_response + + client = EagleHttpClient.__new__(EagleHttpClient) + client.base_url = "http://192.168.1.1/cgi-bin/cgi_manager" + client.auth = ("testuser", "testpass") + client.timeout = 10 + client.mac_id = "0xabcd" + + client.list_devices() + + # Verify auth was passed to requests.post + mock_post.assert_called() + call_kwargs = mock_post.call_args[1] + assert call_kwargs["auth"] == ("testuser", "testpass") + + @patch("meter_reader.clients.http.requests.post") + def test_timeout_setting(self, mock_post): + """Test that timeout is properly set.""" + mock_response = MagicMock() + mock_response.json.return_value = {"num_devices": "0"} + mock_post.return_value = mock_response + + client = EagleHttpClient.__new__(EagleHttpClient) + client.base_url = "http://192.168.1.1/cgi-bin/cgi_manager" + client.auth = ("admin", "password") + client.timeout = 30 + client.mac_id = "0xabcd" + + client.list_devices() + + # Verify timeout was passed + call_kwargs = mock_post.call_args[1] + assert call_kwargs["timeout"] == 30 diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..2a2797a --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,217 @@ +"""Tests for meter_reader models module.""" +import pytest +from datetime import datetime, timezone +from meter_reader.models import ( + InstantaneousDemand, + CurrentSummation, + UsageData, + DeviceInfo, + DeviceList, + NetworkInfo, +) + + +class TestInstantaneousDemand: + """Test InstantaneousDemand model.""" + + def test_panic_demand_calculation(self): + """Test panic_demand property calculates correctly.""" + demand = InstantaneousDemand( + DeviceMacId="0xd8d5b9000000abcd", + MeterMacId="0xd8d5b9000000abcd", + TimeStamp=datetime.now(timezone.utc), + Demand=1000, + Multiplier=1, + Divisor=1000, + DigitsRight=3, + DigitsLeft=0, + SuppressLeadingZero=False, + ) + assert demand.panic_demand == 1.0 + + def test_panic_demand_with_multiplier(self): + """Test panic_demand with non-unity multiplier.""" + demand = InstantaneousDemand( + DeviceMacId="0xd8d5b9000000abcd", + MeterMacId="0xd8d5b9000000abcd", + TimeStamp=datetime.now(timezone.utc), + Demand=2500, + Multiplier=2, + Divisor=1000, + DigitsRight=3, + DigitsLeft=0, + SuppressLeadingZero=False, + ) + assert demand.panic_demand == 5.0 + + def test_field_aliases(self): + """Test that field aliases work correctly.""" + data = { + "DeviceMacId": "0xd8d5b9000000abcd", + "MeterMacId": "0xd8d5b9000000abcd", + "TimeStamp": datetime.now(timezone.utc), + "Demand": 1000, + "Multiplier": 1, + "Divisor": 1000, + "DigitsRight": 3, + "DigitsLeft": 0, + "SuppressLeadingZero": False, + } + demand = InstantaneousDemand(**data) + assert demand.device_mac_id == data["DeviceMacId"] + assert demand.meter_mac_id == data["MeterMacId"] + + +class TestCurrentSummation: + """Test CurrentSummation model.""" + + def test_delivered_kwh_calculation(self): + """Test delivered_kwh property.""" + summation = CurrentSummation( + DeviceMacId="0xd8d5b9000000abcd", + MeterMacId="0xd8d5b9000000abcd", + TimeStamp=datetime.now(timezone.utc), + SummationDelivered=123456, + SummationReceived=0, + Multiplier=1, + Divisor=1000, + DigitsRight=3, + DigitsLeft=0, + SuppressLeadingZero=False, + ) + assert summation.delivered_kwh == 123.456 + + def test_received_kwh_calculation(self): + """Test received_kwh property.""" + summation = CurrentSummation( + DeviceMacId="0xd8d5b9000000abcd", + MeterMacId="0xd8d5b9000000abcd", + TimeStamp=datetime.now(timezone.utc), + SummationDelivered=0, + SummationReceived=54321, + Multiplier=1, + Divisor=1000, + DigitsRight=3, + DigitsLeft=0, + SuppressLeadingZero=False, + ) + assert summation.received_kwh == 54.321 + + def test_both_kwh_calculations(self): + """Test both delivered and received calculations.""" + summation = CurrentSummation( + DeviceMacId="0xd8d5b9000000abcd", + MeterMacId="0xd8d5b9000000abcd", + TimeStamp=datetime.now(timezone.utc), + SummationDelivered=100000, + SummationReceived=50000, + Multiplier=2, + Divisor=1000, + DigitsRight=3, + DigitsLeft=0, + SuppressLeadingZero=False, + ) + assert summation.delivered_kwh == 200.0 + assert summation.received_kwh == 100.0 + + +class TestUsageData: + """Test UsageData model.""" + + def test_timestamp_property(self): + """Test timestamp property converts unix timestamp.""" + usage = UsageData( + demand=2.5, + demand_units="kW", + demand_timestamp=1609459200, # 2021-01-01 00:00:00 UTC + summation_delivered=100.5, + summation_received=10.0, + summation_units="kWh", + meter_status="Connected", + ) + expected = datetime.fromtimestamp(1609459200) + assert usage.timestamp == expected + + def test_optional_consumption_field(self): + """Test optional consumption field.""" + usage = UsageData( + demand=2.5, + demand_units="kW", + demand_timestamp=1609459200, + summation_delivered=100.5, + summation_received=10.0, + summation_units="kWh", + meter_status="Connected", + consumption=5.0, + ) + assert usage.consumption == 5.0 + + +class TestDeviceInfo: + """Test DeviceInfo model.""" + + def test_minimal_device_info(self): + """Test DeviceInfo with minimal required fields.""" + device = DeviceInfo(DeviceMacId="0xd8d5b9000000abcd") + assert device.device_mac_id == "0xd8d5b9000000abcd" + assert device.model_id is None + assert device.manufacturer is None + + def test_full_device_info(self): + """Test DeviceInfo with all fields.""" + device = DeviceInfo( + DeviceMacId="0xd8d5b9000000abcd", + InstallCode="install123", + LinkKey="key123", + FWVersion="1.0.0", + HWVersion="2.0.0", + ImageType="0x1234", + Manufacturer="TestMfg", + ModelId="TEST-MODEL", + DateCode="20210101", + ) + assert device.device_mac_id == "0xd8d5b9000000abcd" + assert device.fw_version == "1.0.0" + assert device.manufacturer == "TestMfg" + + +class TestDeviceList: + """Test DeviceList model.""" + + def test_empty_device_list(self): + """Test empty DeviceList.""" + devices = DeviceList() + assert devices.device_info == [] + + def test_device_list_with_devices(self): + """Test DeviceList with multiple devices.""" + devices = DeviceList( + DeviceInfo=[ + {"DeviceMacId": "0xd8d5b9000000abcd", "ModelId": "Model1"}, + {"DeviceMacId": "0xd8d5b9000000abce", "ModelId": "Model2"}, + ] + ) + assert len(devices.device_info) == 2 + assert devices.device_info[0].model_id == "Model1" + assert devices.device_info[1].model_id == "Model2" + + +class TestNetworkInfo: + """Test NetworkInfo model.""" + + def test_network_info(self): + """Test NetworkInfo model with all fields.""" + network = NetworkInfo( + DeviceMacId="0xd8d5b9000000abcd", + CoordMacId="0xd8d5b9000000", + Status="Connected", + Description="Zigbee Network", + ExtPanId="0x1234567890ABCDEF", + Channel=15, + ShortAddr="0x1234", + LinkStrength=100, + ) + assert network.device_mac_id == "0xd8d5b9000000abcd" + assert network.status == "Connected" + assert network.channel == 15 + assert network.link_strength == 100 diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000..cbb5c57 --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,50 @@ +import unittest +from datetime import datetime +from meter_reader.clients.socket import EagleSocketClient + +class TestXMLParser(unittest.TestCase): + def setUp(self): + # Create a minimal client instance (no network needed for parsing tests) + self.client = EagleSocketClient.__new__(EagleSocketClient) + + def test_simple_xml(self): + xml = """ + 0xd8d5b9000000abcd + Connected + """ + result = self.client._xml2dict(xml, convert=False) + self.assertEqual(result['DeviceMacId'], '0xd8d5b9000000abcd') + self.assertEqual(result['Status'], 'Connected') + + def test_nested_xml(self): + xml = """ + + 0xd8d5b9000000abcd + Z109-EAGLE + + """ + result = self.client._xml2dict(xml, convert=False) + self.assertIn('DeviceInfo', result) + self.assertEqual(result['DeviceInfo']['DeviceMacId'], '0xd8d5b9000000abcd') + + def test_list_detection(self): + xml = """ + + + 0x1a2b3c4d + 0x12345 + + + 0x1a2b3c5e + 0x12346 + + + """ + result = self.client._xml2dict(xml, convert=False) + self.assertIn('HistoryData', result) + self.assertIn('CurrentSummation', result['HistoryData']) + self.assertIsInstance(result['HistoryData']['CurrentSummation'], list) + self.assertEqual(len(result['HistoryData']['CurrentSummation']), 2) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_socket_client.py b/tests/test_socket_client.py new file mode 100644 index 0000000..c0180b5 --- /dev/null +++ b/tests/test_socket_client.py @@ -0,0 +1,377 @@ +"""Tests for meter_reader socket client module.""" +import pytest +from datetime import datetime, timezone, timedelta +from unittest.mock import Mock, patch, MagicMock +import socket + +from meter_reader.clients.socket import ( + EagleSocketClient, + GatewayError, + twos_complement, + convert_data, + BEGINNING_OF_TIME, +) +from meter_reader.models import ( + DeviceList, + InstantaneousDemand, + CurrentSummation, + NetworkInfo, + UsageData, +) + + +class TestTwosComplement: + """Test twos_complement helper function.""" + + def test_positive_value(self): + """Test positive value remains positive.""" + assert twos_complement(100) == 100 + + def test_negative_value_32bit(self): + """Test negative value conversion (32-bit).""" + # 0xFFFFFFFF should be -1 + assert twos_complement(0xFFFFFFFF, 32) == -1 + + def test_negative_value_16bit(self): + """Test negative value conversion (16-bit).""" + # 0xFFFF should be -1 in 16-bit + assert twos_complement(0xFFFF, 16) == -1 + + def test_max_positive(self): + """Test max positive value for 32-bit.""" + # 0x7FFFFFFF is max positive for 32-bit signed + assert twos_complement(0x7FFFFFFF, 32) == 0x7FFFFFFF + + +class TestConvertData: + """Test convert_data helper function.""" + + def test_mac_id_conversion(self): + """Test MAC ID formatting with colons.""" + result = convert_data("DeviceMacId", "0xd8d5b9000000abcd") + assert result == "d8:d5:b9:00:00:00:ab:cd" + + def test_meter_mac_id_shorter(self): + """Test MeterMacId has shorter format.""" + result = convert_data("MeterMacId", "0xd8d5b9000000ab") + assert result == "d8:d5:b9:00:00:00:ab" + + def test_timestamp_conversion(self): + """Test timestamp conversion from hex offset.""" + # 3600 seconds after 2000-01-01 + result = convert_data("TimeStamp", "0xe10") + expected = BEGINNING_OF_TIME + timedelta(seconds=3600) + assert result == expected + + def test_hex_value_conversion(self): + """Test hex value conversion to integer.""" + result = convert_data("SomeValue", "0x64") + assert result == 100 + + def test_negative_hex_value(self): + """Test negative hex value using two's complement.""" + result = convert_data("SomeValue", "0xFFFFFFFF") + assert result == -1 + + def test_none_value(self): + """Test None value returns None.""" + assert convert_data("Test", None) is None + + def test_non_hex_string(self): + """Test non-hex string passes through.""" + result = convert_data("Description", "Connected") + assert result == "Connected" + + +class TestGatewayError: + """Test GatewayError exception.""" + + def test_error_creation(self): + """Test creating GatewayError.""" + error = GatewayError(("192.168.1.1", 5002), "list_devices", "Connection refused") + assert error.address == ("192.168.1.1", 5002) + assert error.command == "list_devices" + assert error.error == "Connection refused" + + def test_error_string(self): + """Test error string representation.""" + error = GatewayError(("192.168.1.1", 5002), "test", "Network error") + assert "192.168.1.1:5002" in str(error) + assert "Network error" in str(error) + + +class TestEagleSocketClient: + """Test EagleSocketClient class.""" + + @patch("meter_reader.clients.socket.EagleSocketClient._fetch_device_list") + def test_client_initialization_with_device_list(self, mock_fetch): + """Test client initializes and fetches MAC ID.""" + def set_mac_id(self_arg): + self_arg.mac_id = "0xd8d5b9000000abcd" + + mock_fetch.side_effect = set_mac_id + client = EagleSocketClient("192.168.1.1") + assert client.mac_id == "0xd8d5b9000000abcd" + + @patch("meter_reader.clients.socket.EagleSocketClient._fetch_device_list") + def test_client_initialization_failure(self, mock_fetch): + """Test client handles initialization failure gracefully.""" + mock_fetch.side_effect = Exception("Connection refused") + + # Should not raise, but log warning + client = EagleSocketClient("192.168.1.1") + assert client.mac_id is None + + def test_generate_command_xml(self): + """Test XML command generation.""" + client = EagleSocketClient.__new__(EagleSocketClient) + client.mac_id = "0xabcd" + + xml = client.generate_command_xml(Name="test") + assert "test" in xml + assert "0xabcd" in xml + + def test_xml2dict_simple(self): + """Test parsing simple XML to dict.""" + client = EagleSocketClient.__new__(EagleSocketClient) + xml = "Connected" + result = client._xml2dict(xml, convert=False) + assert result["Status"] == "Connected" + + def test_xml2dict_nested(self): + """Test parsing nested XML.""" + client = EagleSocketClient.__new__(EagleSocketClient) + xml = """ + + 0xabcd + Z109-EAGLE + + """ + result = client._xml2dict(xml, convert=False) + assert "DeviceInfo" in result + assert result["DeviceInfo"]["DeviceMacId"] == "0xabcd" + + def test_xml2dict_list_detection(self): + """Test XML parser detects lists from repeated tags.""" + client = EagleSocketClient.__new__(EagleSocketClient) + xml = """ + + + 0x0 + + + 0x100 + + + """ + result = client._xml2dict(xml, convert=False) + assert "HistoryData" in result + assert isinstance(result["HistoryData"]["CurrentSummation"], list) + assert len(result["HistoryData"]["CurrentSummation"]) == 2 + + def test_xml2dict_empty_string(self): + """Test parsing empty XML string.""" + client = EagleSocketClient.__new__(EagleSocketClient) + result = client._xml2dict("", convert=False) + assert result == {} + + def test_xml2dict_invalid_xml(self): + """Test parsing invalid XML.""" + client = EagleSocketClient.__new__(EagleSocketClient) + result = client._xml2dict("test", b""] + mock_connect.return_value.__enter__.return_value = mock_socket + + client = EagleSocketClient.__new__(EagleSocketClient) + client.address = ("192.168.1.1", 5002) + client.timeout = 5 + client.mac_id = None + + result = client._run_command_raw(Name="test") + assert "test" in result + + @patch("meter_reader.clients.socket.socket.create_connection") + def test_run_command_socket_error(self, mock_connect): + """Test socket error handling.""" + mock_connect.side_effect = socket.error("Connection refused") + + client = EagleSocketClient.__new__(EagleSocketClient) + client.address = ("192.168.1.1", 5002) + client.timeout = 5 + client.mac_id = None + + with pytest.raises(GatewayError) as exc_info: + client._run_command_raw(Name="test") + assert "Connection refused" in str(exc_info.value) + + @patch("meter_reader.clients.socket.EagleSocketClient._run_command_dict") + def test_list_devices(self, mock_run): + """Test list_devices method.""" + mock_run.return_value = { + "DeviceInfo": { + "DeviceMacId": "0xd8d5b9000000abcd", + "ModelId": "Z109-EAGLE", + } + } + + client = EagleSocketClient.__new__(EagleSocketClient) + result = client.list_devices() + + assert isinstance(result, DeviceList) + assert len(result.device_info) == 1 + assert result.device_info[0].device_mac_id == "0xd8d5b9000000abcd" + + @patch("meter_reader.clients.socket.EagleSocketClient._run_command_dict") + def test_get_instantaneous_demand(self, mock_run): + """Test get_instantaneous_demand method.""" + mock_run.return_value = { + "InstantaneousDemand": { + "DeviceMacId": "d8:d5:b9:00:00:00:ab:cd", + "MeterMacId": "d8:d5:b9:00:00:ab", + "TimeStamp": datetime.now(timezone.utc), + "Demand": 1000, + "Multiplier": 1, + "Divisor": 1000, + "DigitsRight": 3, + "DigitsLeft": 0, + "SuppressLeadingZero": False, + } + } + + client = EagleSocketClient.__new__(EagleSocketClient) + client.mac_id = "0xabcd" + result = client.get_instantaneous_demand() + + assert isinstance(result, InstantaneousDemand) + assert result.panic_demand == 1.0 + + @patch("meter_reader.clients.socket.EagleSocketClient._run_command_dict") + def test_get_current_summation(self, mock_run): + """Test get_current_summation method.""" + mock_run.return_value = { + "CurrentSummation": { + "DeviceMacId": "d8:d5:b9:00:00:00:ab:cd", + "MeterMacId": "d8:d5:b9:00:00:ab", + "TimeStamp": datetime.now(timezone.utc), + "SummationDelivered": 123456, + "SummationReceived": 0, + "Multiplier": 1, + "Divisor": 1000, + "DigitsRight": 3, + "DigitsLeft": 0, + "SuppressLeadingZero": False, + } + } + + client = EagleSocketClient.__new__(EagleSocketClient) + client.mac_id = "0xabcd" + result = client.get_current_summation() + + assert isinstance(result, CurrentSummation) + assert result.delivered_kwh == 123.456 + + @patch("meter_reader.clients.socket.EagleSocketClient._run_command_dict") + def test_get_network_info(self, mock_run): + """Test get_network_info method.""" + mock_run.return_value = { + "NetworkInfo": { + "DeviceMacId": "d8:d5:b9:00:00:00:ab:cd", + "CoordMacId": "d8:d5:b9:00:00:ab", + "Status": "Connected", + "Description": "Zigbee Network", + "ExtPanId": "0x1234567890ABCDEF", + "Channel": 15, + "ShortAddr": "0x1234", + "LinkStrength": 100, + } + } + + client = EagleSocketClient.__new__(EagleSocketClient) + client.mac_id = "0xabcd" + result = client.get_network_info() + + assert isinstance(result, NetworkInfo) + assert result.status == "Connected" + + @patch("meter_reader.clients.socket.EagleSocketClient.get_instantaneous_demand") + @patch("meter_reader.clients.socket.EagleSocketClient.get_current_summation") + def test_get_usage_data(self, mock_summation, mock_demand): + """Test get_usage_data synthesizes from demand and summation.""" + mock_demand.return_value = InstantaneousDemand( + DeviceMacId="test", + MeterMacId="test", + TimeStamp=datetime(2021, 1, 1, tzinfo=timezone.utc), + Demand=2500, + Multiplier=1, + Divisor=1000, + DigitsRight=3, + DigitsLeft=0, + SuppressLeadingZero=False, + ) + mock_summation.return_value = CurrentSummation( + DeviceMacId="test", + MeterMacId="test", + TimeStamp=datetime(2021, 1, 1, tzinfo=timezone.utc), + SummationDelivered=100000, + SummationReceived=50000, + Multiplier=1, + Divisor=1000, + DigitsRight=3, + DigitsLeft=0, + SuppressLeadingZero=False, + ) + + client = EagleSocketClient.__new__(EagleSocketClient) + result = client.get_usage_data() + + assert isinstance(result, UsageData) + assert result.demand == 2500 + assert result.summation_delivered == 100.0 + assert result.summation_received == 50.0 + + @patch("meter_reader.clients.socket.EagleSocketClient._run_command_dict") + def test_get_history_data(self, mock_run): + """Test get_history_data method.""" + mock_run.return_value = { + "HistoryData": { + "CurrentSummation": [ + { + "DeviceMacId": "test", + "MeterMacId": "test", + "TimeStamp": datetime.now(timezone.utc), + "SummationDelivered": 100000, + "SummationReceived": 0, + "Multiplier": 1, + "Divisor": 1000, + "DigitsRight": 3, + "DigitsLeft": 0, + "SuppressLeadingZero": False, + } + ] + } + } + + client = EagleSocketClient.__new__(EagleSocketClient) + client.mac_id = "0xabcd" + result = client.get_history_data() + + assert isinstance(result, list) + assert len(result) == 1 + assert isinstance(result[0], CurrentSummation) + + @patch("meter_reader.clients.socket.EagleSocketClient._run_command_dict") + def test_get_history_data_empty(self, mock_run): + """Test get_history_data with no data.""" + mock_run.return_value = {} + + client = EagleSocketClient.__new__(EagleSocketClient) + client.mac_id = "0xabcd" + result = client.get_history_data() + + assert result == [] diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..7ef68e6 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,116 @@ +"""Tests for meter_reader utils module.""" +import pytest +from datetime import datetime, timezone +from meter_reader.utils import generate_command_xml, BEGINNING_OF_TIME + + +class TestGenerateCommandXml: + """Test generate_command_xml function.""" + + def test_simple_command(self): + """Test generating simple command with name only.""" + xml = generate_command_xml(None, Name="list_devices") + assert "list_devices" in xml + assert "get_device_data" in xml + assert f"{mac_id}" in xml + + def test_mac_id_not_added_when_explicit(self): + """Test that MAC ID is not auto-added when explicitly provided.""" + mac_id = "0xd8d5b9000000abcd" + explicit_mac = "0xd8d5b9000000abce" + xml = generate_command_xml(mac_id, Name="test", DeviceMacId=explicit_mac) + # Should contain explicit MAC, not auto-added one + assert f"{explicit_mac}" in xml + # Should NOT have auto-added MacID + assert xml.count("MacID") == 0 + + def test_datetime_conversion_to_hex(self): + """Test that datetime values are converted to hex.""" + start_time = BEGINNING_OF_TIME + xml = generate_command_xml(None, Name="test", StartTime=start_time) + # Start time at beginning should be 0x0 + assert "0x0" in xml + + def test_datetime_offset_calculation(self): + """Test datetime offset calculation.""" + from datetime import timedelta + + # 1 hour after beginning of time + one_hour_later = BEGINNING_OF_TIME + timedelta(hours=1) + xml = generate_command_xml(None, Name="test", StartTime=one_hour_later) + # 1 hour = 3600 seconds = 0xe10 + assert "0xe10" in xml + + def test_frequency_hex_conversion(self): + """Test frequency parameter is converted to hex.""" + xml = generate_command_xml(None, Name="test", Frequency=900) + # 900 decimal = 0x384 + assert "0x384" in xml + + def test_duration_hex_conversion(self): + """Test duration parameter is converted to hex.""" + xml = generate_command_xml(None, Name="test", Duration=3600) + # 3600 decimal = 0xe10 + assert "0xe10" in xml + + def test_unsupported_args_filtered(self): + """Test that unsupported arguments are filtered out.""" + xml = generate_command_xml(None, Name="test", UnsupportedArg="value") + assert "UnsupportedArg" not in xml + + def test_none_values_filtered(self): + """Test that None values are filtered out.""" + xml = generate_command_xml(None, Name="test", Interval=None) + assert "Interval" not in xml + + def test_multiple_supported_args(self): + """Test command with multiple supported arguments.""" + xml = generate_command_xml( + None, + Name="test_command", + Frequency=60, + Enabled="Y", + Protocol="TCP", + ) + assert "test_command" in xml + assert "0x3c" in xml # 60 = 0x3c + assert "Y" in xml + assert "TCP" in xml + + def test_case_insensitive_arg_matching(self): + """Test that argument matching is case-insensitive.""" + xml = generate_command_xml(None, Name="test", macid="0xabcd") + # macid (lowercase) should still be recognized as MacID + assert "macid" in xml # Tag preserves original case + + def test_integer_endtime_conversion(self): + """Test that integer endtime values are converted to hex.""" + xml = generate_command_xml(None, Name="test", EndTime=3600) + assert "0xe10" in xml + + def test_text_field(self): + """Test text field passes through.""" + xml = generate_command_xml(None, Name="test", Text="Hello World") + assert "Hello World" in xml + + def test_priority_field(self): + """Test priority field.""" + xml = generate_command_xml(None, Name="test", Priority=1) + assert "1" in xml + + def test_xml_formatting(self): + """Test that XML is properly formatted.""" + xml = generate_command_xml(None, Name="test") + # Should have XML declaration + assert "" in xml + assert "" in xml + # Should be indented (pretty printed) + assert " " in xml # Contains indentation From ebaef9d989a56723ce5474b6bb882dbab0193561 Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Thu, 5 Feb 2026 12:09:17 -0800 Subject: [PATCH 02/17] fix: prevent memory exhaustion in socket client response buffering - Add MAX_RESPONSE_SIZE constant (1 MB) to prevent unbounded buffer growth - Replace infinite loop with bounded loop: 'while len(response) < MAX_RESPONSE_SIZE' - Add warning log when response exceeds maximum size - Fix socket test mock setup to work with closing() context manager - Fix socket client initialization test to handle mock binding correctly Fixes unit tests hanging due to memory exhaustion or timeout issues. All 82 tests now pass in ~6 seconds instead of timing out. --- src/meter_reader/clients/socket.py | 203 +++++++++++++++++------------ tests/test_security.py | 117 +++++++++++++++++ tests/test_socket_client.py | 22 ++-- tox.ini | 91 +++++++++++++ 4 files changed, 344 insertions(+), 89 deletions(-) create mode 100644 tests/test_security.py create mode 100644 tox.ini diff --git a/src/meter_reader/clients/socket.py b/src/meter_reader/clients/socket.py index e991fec..34e8e53 100644 --- a/src/meter_reader/clients/socket.py +++ b/src/meter_reader/clients/socket.py @@ -7,8 +7,12 @@ from xml.dom import minidom from ..models import ( - InstantaneousDemand, UsageData, DeviceList, NetworkInfo, - CurrentSummation, DeviceInfo + InstantaneousDemand, + UsageData, + DeviceList, + NetworkInfo, + CurrentSummation, + DeviceInfo, ) from .base import EagleClient @@ -18,24 +22,49 @@ BEGINNING_OF_TIME = datetime(2000, 1, 1, tzinfo=utctz) DEFAULT_PORT = 5002 DEFAULT_TIMEOUT = 5 +MAX_RESPONSE_SIZE = 1024 * 1024 # 1 MB max response -SUPPORTED_ARGS = ('interval', 'frequency', 'starttime', 'endtime', 'duration', - 'name', 'event', 'enabled', 'protocol', 'macid', - 'devicemacid', 'metermacid', 'target', 'format', 'priority', - 'text', 'confirmationrequired', 'id', 'queue', 'read') +SUPPORTED_ARGS = ( + "interval", + "frequency", + "starttime", + "endtime", + "duration", + "name", + "event", + "enabled", + "protocol", + "macid", + "devicemacid", + "metermacid", + "target", + "format", + "priority", + "text", + "confirmationrequired", + "id", + "queue", + "read", +) class GatewayError(Exception): """Exception raised when gateway communication fails.""" - def __init__(self, address: Tuple[str, int], command: str, error: str = '', code: int | None = None) -> None: + def __init__( + self, + address: Tuple[str, int], + command: str, + error: str = "", + code: int | None = None, + ) -> None: self.address = address self.code = code self.error = error self.command = command def __str__(self) -> str: - return f'Unable to connect to {self.address[0]}:{self.address[1]}. {self.error}' + return f"Unable to connect to {self.address[0]}:{self.address[1]}. {self.error}" def twos_complement(value: int, width: int = 32) -> int: @@ -47,18 +76,18 @@ def twos_complement(value: int, width: int = 32) -> int: def convert_data(key: str, value: str | None) -> Any: if value is None: return None - if 'MacId' in key or 'Code' in key or 'Key' in key: + if "MacId" in key or "Code" in key or "Key" in key: clean_value = value - if value.lower().startswith('0x'): + if value.lower().startswith("0x"): clean_value = value[2:] len_ = 15 - if key == 'MeterMacId' or key == 'CoordMacId': + if key == "MeterMacId" or key == "CoordMacId": len_ = 13 - return ':'.join(clean_value[i:i+2] for i in range(0, len_, 2)) - if key.lower() in ('timestamp', 'endtime') and int(value, 0): + return ":".join(clean_value[i : i + 2] for i in range(0, len_, 2)) + if key.lower() in ("timestamp", "endtime") and int(value, 0): # Handle time offset from 2000-01-01 return BEGINNING_OF_TIME + timedelta(0, int(value, 16)) - if isinstance(value, str) and value.startswith('0x'): + if isinstance(value, str) and value.startswith("0x"): return twos_complement(int(value, 16)) return value @@ -66,11 +95,13 @@ def convert_data(key: str, value: str | None) -> Any: class EagleSocketClient(EagleClient): """Client for Eagle Energy Gateway via Socket API (Port 5002).""" - def __init__(self, address: str, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT) -> None: + def __init__( + self, address: str, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT + ) -> None: self.address = (address, port) self.timeout = timeout self.mac_id: str | None = None - + # Initialize by fetching device list to get MAC ID try: self._fetch_device_list() @@ -78,55 +109,62 @@ def __init__(self, address: str, port: int = DEFAULT_PORT, timeout: int = DEFAUL logger.warning(f"Failed to auto-discover MAC ID: {e}") def _fetch_device_list(self) -> None: - data = self._run_command_dict(Name='list_devices', convert=False) + data = self._run_command_dict(Name="list_devices", convert=False) # Handle case where DeviceInfo is a list or single dict - if 'DeviceInfo' in data: - info = data['DeviceInfo'] + if "DeviceInfo" in data: + info = data["DeviceInfo"] if isinstance(info, list) and info: - self.mac_id = info[0].get('DeviceMacId') + self.mac_id = info[0].get("DeviceMacId") elif isinstance(info, dict): - self.mac_id = info.get('DeviceMacId') + self.mac_id = info.get("DeviceMacId") def generate_command_xml(self, **kwargs: Any) -> str: - c = ET.Element('LocalCommand') - has_mac_arg = any(k.lower() in ('macid', 'devicemacid', 'metermacid') for k in kwargs) - + c = ET.Element("LocalCommand") + has_mac_arg = any( + k.lower() in ("macid", "devicemacid", "metermacid") for k in kwargs + ) + for tag, value in kwargs.items(): if tag.lower() not in SUPPORTED_ARGS or value is None: continue - if tag.lower() in ('starttime', 'endtime'): + if tag.lower() in ("starttime", "endtime"): if isinstance(value, datetime): - value = hex(int((value - BEGINNING_OF_TIME).total_seconds())) + value = hex(int((value - BEGINNING_OF_TIME).total_seconds())) elif isinstance(value, int): - value = hex(value) - elif tag.lower() in ('frequency', 'duration'): + value = hex(value) + elif tag.lower() in ("frequency", "duration"): value = hex(value) - + ET.SubElement(c, tag).text = str(value) - + if not has_mac_arg and self.mac_id is not None: - ET.SubElement(c, 'MacID').text = self.mac_id - - md = minidom.parseString(ET.tostring(c, encoding='utf-8')) + ET.SubElement(c, "MacID").text = self.mac_id + + md = minidom.parseString(ET.tostring(c, encoding="utf-8")) return md.toprettyxml(indent=" ") def _run_command_raw(self, **kwargs: Any) -> str: try: with closing(socket.create_connection(self.address, self.timeout)) as s: - s.sendall(self.generate_command_xml(**kwargs).encode('utf-8')) - response = b'' + s.sendall(self.generate_command_xml(**kwargs).encode("utf-8")) + response = b"" s.settimeout(self.timeout) try: - while True: + while len(response) < MAX_RESPONSE_SIZE: chunk = s.recv(4096) if not chunk: break response += chunk except socket.timeout: pass - return response.decode('utf-8', errors='replace') + if len(response) >= MAX_RESPONSE_SIZE: + logger.warning( + f"Response exceeded maximum size ({MAX_RESPONSE_SIZE} bytes) " + f"from {self.address}" + ) + return response.decode("utf-8", errors="replace") except socket.error as e: - raise GatewayError(self.address, kwargs.get('Name', ''), str(e)) + raise GatewayError(self.address, kwargs.get("Name", ""), str(e)) def _element_to_data(self, element: ET.Element, convert: bool = True) -> Any: # 1. If element has no children, return text value @@ -138,18 +176,18 @@ def _element_to_data(self, element: ET.Element, convert: bool = True) -> Any: # 2. Iterate children and build dict result: Dict[str, Any] = {} child_counts: Dict[str, int] = {} - + # First pass to count occurrences for list detection for child in element: child_counts[child.tag] = child_counts.get(child.tag, 0) + 1 - + for child in element: tag = child.tag - if tag in ('Info', 'Text'): + if tag in ("Info", "Text"): continue - + value = self._element_to_data(child, convert) - + # If tag appears multiple times, it must be a list if child_counts[tag] > 1: if tag not in result: @@ -157,13 +195,13 @@ def _element_to_data(self, element: ET.Element, convert: bool = True) -> Any: result[tag].append(value) else: result[tag] = value - + # If result is empty (all children were skipped), return element text or None if not result: if convert: return convert_data(element.tag, element.text) return element.text - + return result def _xml2dict(self, xml: str, convert: bool = True) -> Dict[str, Any]: @@ -173,73 +211,79 @@ def _xml2dict(self, xml: str, convert: bool = True) -> Dict[str, Any]: root = ET.fromstring(xml) except ET.ParseError: return {} - - if root.tag == 'response': - return self._element_to_data(root, convert) # type: ignore + + if root.tag == "response": + return self._element_to_data(root, convert) # type: ignore else: return {root.tag: self._element_to_data(root, convert)} def _run_command_dict(self, convert: bool = True, **kwargs: Any) -> Dict[str, Any]: raw_xml = self._run_command_raw(**kwargs) # Check for list wrapper - return self._xml2dict(f'{raw_xml}', convert) + return self._xml2dict(f"{raw_xml}", convert) def list_devices(self) -> DeviceList: - data = self._run_command_dict(Name='list_devices') + data = self._run_command_dict(Name="list_devices") # Ensure DeviceInfo is a list for the model, even if single device - if 'DeviceInfo' in data and not isinstance(data['DeviceInfo'], list): - data['DeviceInfo'] = [data['DeviceInfo']] + if "DeviceInfo" in data and not isinstance(data["DeviceInfo"], list): + data["DeviceInfo"] = [data["DeviceInfo"]] return DeviceList(**data) def get_instantaneous_demand(self) -> InstantaneousDemand: # The command is 'get_instantaneous_demand' and requires DeviceMacId - data = self._run_command_dict(Name='get_instantaneous_demand', DeviceMacId=self.mac_id) + data = self._run_command_dict( + Name="get_instantaneous_demand", DeviceMacId=self.mac_id + ) # Unwrap if needed, usually InstantaneousDemand key is at root - if 'InstantaneousDemand' in data: - return InstantaneousDemand(**data['InstantaneousDemand']) + if "InstantaneousDemand" in data: + return InstantaneousDemand(**data["InstantaneousDemand"]) return InstantaneousDemand(**data) def get_current_summation(self) -> CurrentSummation: # get_current_summation doesn't exist, use get_device_data which returns all data - data = self._run_command_dict(Name='get_device_data', DeviceMacId=self.mac_id) - if 'CurrentSummation' in data: - return CurrentSummation(**data['CurrentSummation']) + data = self._run_command_dict(Name="get_device_data", DeviceMacId=self.mac_id) + if "CurrentSummation" in data: + return CurrentSummation(**data["CurrentSummation"]) raise ValueError("CurrentSummation not found in device data") def get_network_info(self) -> NetworkInfo: # Use get_device_data which includes NetworkInfo - data = self._run_command_dict(Name='get_device_data', DeviceMacId=self.mac_id) - if 'NetworkInfo' in data: - return NetworkInfo(**data['NetworkInfo']) + data = self._run_command_dict(Name="get_device_data", DeviceMacId=self.mac_id) + if "NetworkInfo" in data: + return NetworkInfo(**data["NetworkInfo"]) raise ValueError("NetworkInfo not found in device data") def get_usage_data(self) -> UsageData: - # Socket API doesn't have a direct equivalent to the HTTP usage_data summary + # Socket API doesn't have a direct equivalent to the HTTP usage_data summary # that includes both demand and summation in one flat object cleanly. # We can simulate it by fetching both. instant = self.get_instantaneous_demand() summation = self.get_current_summation() - + return UsageData( - demand=instant.demand, # Raw demand - demand_units="kW", # Implicit in socket model usually + demand=instant.demand, # Raw demand + demand_units="kW", # Implicit in socket model usually demand_timestamp=int(instant.timestamp.timestamp()), summation_delivered=summation.delivered_kwh, summation_received=summation.received_kwh, summation_units="kWh", - meter_status="Connected" # Assumed if we got data + meter_status="Connected", # Assumed if we got data ) - def get_history_data(self, start_time: datetime | None = None, end_time: datetime | None = None, - frequency: int = 0x384) -> List[CurrentSummation]: + def get_history_data( + self, + start_time: datetime | None = None, + end_time: datetime | None = None, + frequency: int = 0x384, + ) -> List[CurrentSummation]: """ Get historical summation data. - + Args: start_time: Start time for history query (defaults to 1 hour ago) end_time: End time for history query (defaults to now) frequency: Sample frequency in seconds (hex). 0x384 = 900 = 15 minutes - + Returns: List of CurrentSummation objects """ @@ -247,24 +291,23 @@ def get_history_data(self, start_time: datetime | None = None, end_time: datetim start_time = datetime.now(timezone.utc) - timedelta(hours=1) if end_time is None: end_time = datetime.now(timezone.utc) - + data = self._run_command_dict( - Name='get_history_data', + Name="get_history_data", DeviceMacId=self.mac_id, StartTime=start_time, EndTime=end_time, - Frequency=frequency + Frequency=frequency, ) - + # History data returns a HistoryData wrapper with multiple CurrentSummation entries - if 'HistoryData' in data: - history = data['HistoryData'] - if 'CurrentSummation' in history: - summations = history['CurrentSummation'] + if "HistoryData" in data: + history = data["HistoryData"] + if "CurrentSummation" in history: + summations = history["CurrentSummation"] # Ensure it's a list if not isinstance(summations, list): summations = [summations] return [CurrentSummation(**s) for s in summations] - - return [] + return [] diff --git a/tests/test_security.py b/tests/test_security.py new file mode 100644 index 0000000..77088e6 --- /dev/null +++ b/tests/test_security.py @@ -0,0 +1,117 @@ +"""Security-focused tests for meter_reader.""" + +import unittest + +from defusedxml.ElementTree import fromstring +from defusedxml.minidom import parseString + +from meter_reader.utils import generate_command_xml + + +class TestXMLSecurity(unittest.TestCase): + """Test XML parsing security.""" + + def test_uses_defusedxml_minidom(self): + """Verify that command XML uses defusedxml for safe parsing.""" + xml = generate_command_xml( + "0xd8d5b9000000abcd", + Name="test_command" + ) + self.assertIsNotNone(xml) + self.assertIn("", xml) + + def test_defusedxml_parsestring_not_vulnerable(self): + """Test that defusedxml.minidom.parseString is used.""" + test_xml = b"safe" + result = parseString(test_xml) + self.assertEqual(result.documentElement.tagName, "test") + + def test_defusedxml_fromstring_not_vulnerable(self): + """Test that defusedxml ElementTree.fromstring is used.""" + test_xml = "safe" + result = fromstring(test_xml) + self.assertEqual(result.tag, "test") + + def test_xxe_prevention(self): + """Test that XXE (XML External Entity) attacks are prevented.""" + xxe_payload = ( + '' + ']>' + '&xxe;' + ) + try: + result = fromstring(xxe_payload) + self.assertIsNone(result.text or "") + except Exception as e: + self.assertIsNotNone(e) + + def test_billion_laughs_prevention(self): + """Test that Billion Laughs attack is prevented.""" + billion_laughs = ( + '' + '' + ' ' + ' ' + ']>' + '&lol3;' + ) + try: + result = fromstring(billion_laughs) + self.assertIsNotNone(result) + except Exception as e: + self.assertIsNotNone(e) + + +class TestInputValidation(unittest.TestCase): + """Test input validation for command generation.""" + + def test_command_xml_escaping(self): + """Test that values are properly escaped in XML.""" + dangerous_value = '' + xml = generate_command_xml( + "0xd8d5b9000000abcd", + Name="test", + Text=dangerous_value + ) + self.assertNotIn("