diff --git a/serialx/platforms/serial_posix.py b/serialx/platforms/serial_posix.py index 6931942..95ba2a8 100644 --- a/serialx/platforms/serial_posix.py +++ b/serialx/platforms/serial_posix.py @@ -2,6 +2,7 @@ from __future__ import annotations +import array import asyncio import errno import fcntl @@ -320,8 +321,7 @@ def _get_modem_pins(self) -> ModemPins: """Get current modem control bits.""" assert self._fileno is not None - # A `bytearray` is critical here: `bytes` will not be mutated - buffer = bytearray((0x00000000).to_bytes(4, "little")) + buffer = array.array("i", [0x00000000]) try: fcntl.ioctl(self._fileno, termios.TIOCMGET, buffer) @@ -330,7 +330,7 @@ def _get_modem_pins(self) -> ModemPins: LOGGER.debug("Device is not a serial port, cannot get modem pins") return ModemPins() - n = int.from_bytes(buffer, "little") + n = buffer[0] return ModemPins( **{ name: PinState.HIGH if n & bit else PinState.LOW @@ -353,7 +353,7 @@ def _set_modem_pins(self, modem_pins: ModemPins) -> None: if all_pins_set: value = modem_pins_as_int(modem_pins) LOGGER.debug("Setting all with TIOCMSET: 0x%08X", value) - fcntl.ioctl(self._fileno, termios.TIOCMSET, value.to_bytes(4, "little")) + fcntl.ioctl(self._fileno, termios.TIOCMSET, array.array("i", [value])) else: to_set = modem_pins_mask_of_value(modem_pins, PinState.HIGH) to_clear = modem_pins_mask_of_value(modem_pins, PinState.LOW) @@ -361,13 +361,13 @@ def _set_modem_pins(self, modem_pins: ModemPins) -> None: if to_set: LOGGER.debug("Setting TIOCMBIS: 0x%08X", to_set) fcntl.ioctl( - self._fileno, termios.TIOCMBIS, to_set.to_bytes(4, "little") + self._fileno, termios.TIOCMBIS, array.array("i", [to_set]) ) if to_clear: LOGGER.debug("TIOCMBIC: 0x%08X", to_clear) fcntl.ioctl( - self._fileno, termios.TIOCMBIC, to_clear.to_bytes(4, "little") + self._fileno, termios.TIOCMBIC, array.array("i", [to_clear]) ) except OSError as exc: if exc.errno == errno.ENOTTY: @@ -461,20 +461,20 @@ def _write(self, data: Buffer, *, timeout: float | None) -> int: def num_unread_bytes(self) -> int: """Return the number of bytes waiting to be read.""" assert self._fileno is not None - buffer = bytearray((0x00000000).to_bytes(4, "little")) + buffer = array.array("i", [0x00000000]) fcntl.ioctl(self._fileno, termios.FIONREAD, buffer) - return int.from_bytes(buffer, "little") + return buffer[0] def num_unwritten_bytes(self) -> int: """Return the number of bytes waiting to be written.""" assert self._fileno is not None - buffer = bytearray((0x00000000).to_bytes(4, "little")) + buffer = array.array("i", [0x00000000]) fcntl.ioctl(self._fileno, termios.TIOCOUTQ, buffer) - return int.from_bytes(buffer, "little") + return buffer[0] def _reset_read_buffer(self) -> None: """Reset the read buffer.""" diff --git a/tests/test_serial_posix.py b/tests/test_serial_posix.py new file mode 100644 index 0000000..4f6df57 --- /dev/null +++ b/tests/test_serial_posix.py @@ -0,0 +1,117 @@ +"""POSIX serial port tests.""" + +import sys + +import pytest + +if sys.platform in ("win32", "emscripten"): + pytest.skip("POSIX-only tests", allow_module_level=True) + +import array +import errno +import termios +from typing import Any +from unittest.mock import patch + +from serialx import ModemPins, PinState +from serialx.platforms.serial_posix import PosixSerial + + +def test_num_unread_bytes_uses_native_int_ioctl_buffer() -> None: + """FIONREAD writes a native C int, not a little-endian byte string.""" + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert request == termios.FIONREAD + assert isinstance(arg, array.array) + assert arg.typecode == "i" + arg[0] = 123 + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + assert serial.num_unread_bytes() == 123 + + +def test_num_unwritten_bytes_uses_native_int_ioctl_buffer() -> None: + """TIOCOUTQ writes a native C int, not a little-endian byte string.""" + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert request == termios.TIOCOUTQ + assert isinstance(arg, array.array) + assert arg.typecode == "i" + arg[0] = 456 + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + assert serial.num_unwritten_bytes() == 456 + + +def test_get_modem_pins_uses_native_int_ioctl_buffer() -> None: + """TIOCMGET writes a native C int, not a little-endian byte string.""" + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert request == termios.TIOCMGET + assert isinstance(arg, array.array) + assert arg.typecode == "i" + arg[0] = termios.TIOCM_DTR + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + pins = serial.get_modem_pins() + + assert pins.dtr is PinState.HIGH + assert pins.rts is PinState.LOW + + +def test_set_modem_pins_uses_native_int_ioctl_buffer() -> None: + """TIOCMSET reads a native C int, not a little-endian byte string.""" + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert request == termios.TIOCMSET + assert isinstance(arg, array.array) + assert arg.typecode == "i" + assert arg[0] & termios.TIOCM_DTR + assert arg[0] & termios.TIOCM_RTS + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + serial.set_modem_pins( + ModemPins( + le=PinState.LOW, + dtr=PinState.HIGH, + rts=PinState.HIGH, + st=PinState.LOW, + sr=PinState.LOW, + cts=PinState.LOW, + car=PinState.LOW, + rng=PinState.LOW, + dsr=PinState.LOW, + ) + ) + + +def test_partial_set_modem_pins_uses_native_int_ioctl_buffer() -> None: + """TIOCMBIS/TIOCMBIC read native C ints, not little-endian byte strings.""" + + seen_requests = [] + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert isinstance(arg, array.array) + assert arg.typecode == "i" + seen_requests.append(request) + if request == termios.TIOCMBIS: + assert arg[0] == termios.TIOCM_DTR + elif request == termios.TIOCMBIC: + assert arg[0] == termios.TIOCM_RTS + else: + raise OSError(errno.ENOTTY, "unexpected ioctl") + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + serial.set_modem_pins(dtr=True, rts=False) + + assert seen_requests == [termios.TIOCMBIS, termios.TIOCMBIC]