diff --git a/host_sim.py b/host_sim.py index cd49d52..1cd5b4e 100644 --- a/host_sim.py +++ b/host_sim.py @@ -1,10 +1,14 @@ +import logging import asyncio from sim_server import SimServer from not_tcp.host import StreamProxy +log = logging.getLogger(__name__) + + class HostSimulator(SimServer, StreamProxy): # Multiple inheritance is not a *crime*, it's just an abuse of the rules. # Tax avoidance is not tax evasion! @@ -12,7 +16,6 @@ class HostSimulator(SimServer, StreamProxy): async def run_server(port): - import sys import ntcp_http dut = ntcp_http.NtcpHttpServer() @@ -20,7 +23,7 @@ async def run_server(port): server = await asyncio.start_server( client_connected_cb=srv.client_connected, host="localhost", port=port) - sys.stderr.write(f"listening on port {port}\n") + log.info(f"listening on port {port}\n") await server.serve_forever() diff --git a/not_tcp/host.py b/not_tcp/host.py index e049535..2c47667 100644 --- a/not_tcp/host.py +++ b/not_tcp/host.py @@ -1,10 +1,12 @@ +from asyncio import StreamReader, StreamWriter from dataclasses import dataclass -import struct from enum import IntFlag from typing import Optional -from asyncio import StreamReader, StreamWriter import asyncio -import sys +import logging +import struct + +log = logging.getLogger(__name__) class Flag(IntFlag): @@ -104,6 +106,7 @@ def from_bytes(cls, buf: bytes) -> (Optional["Packet"], bytes): # Use as superclass; subclass to simulator or real class StreamProxy: lock = asyncio.Lock() + request_number = 0 def send(self, b: bytes()): # Must be implemented by subclass @@ -115,14 +118,19 @@ def recv(self) -> bytes: def client_connected( self, reader: StreamReader, writer: StreamWriter): - asyncio.create_task(self.client_loop(reader, writer)) + r = self.request_number = self.request_number + 1 + log.info(f'client {r} connected') + asyncio.create_task(self.client_loop(r, reader, writer)) - async def client_loop(self, reader: StreamReader, writer: StreamWriter): + async def client_loop(self, number: int, + reader: StreamReader, writer: StreamWriter): async with self.lock, asyncio.TaskGroup() as tg: - tg.create_task(self.run_inbound(reader)) - tg.create_task(self.run_outbound(writer)) + log.info(f'starting client {number} handler') + tg.create_task(self.run_inbound(number, reader)) + tg.create_task(self.run_outbound(number, writer)) + log.info(f'completed client {number} handlers') - async def run_inbound(self, reader: StreamReader): + async def run_inbound(self, number: int, reader: StreamReader): p1 = Packet(flags=Flag.START, stream_id=1, body=bytes()) self.send(p1.to_bytes()) want_bytes = 256 @@ -145,15 +153,23 @@ async def run_inbound(self, reader: StreamReader): # Input is done, in theory p3 = Packet(flags=Flag.END, stream_id=1, body=bytes()) self.send(p3.to_bytes()) + log.info(f"client {number} closed inbound connection") - async def run_outbound(self, writer: StreamWriter): + async def run_outbound(self, number: int, writer: StreamWriter): + olog = log.getChild("outbound") + total_bytes = 0 buffer = bytes() packet_count = 0 while True: - rcvd = self.recv() # Has its own timeout, but isn't async. So: - await asyncio.sleep(0) - buffer += rcvd - (p, rem) = Packet.from_bytes(buffer) + if len(buffer) > 0: + olog.debug( + f"buffer contains bytes: " + f"{total_bytes}:{total_bytes+len(buffer)}\n") + + buffer_len = len(buffer) + (p, buffer) = Packet.from_bytes(buffer) + consumed = (buffer_len - len(buffer)) + total_bytes += consumed if p is None: continue buffer = rem diff --git a/not_tcp/host_test.py b/not_tcp/host_test.py index 0181529..dd9333d 100644 --- a/not_tcp/host_test.py +++ b/not_tcp/host_test.py @@ -1,17 +1,17 @@ -import sys -import pytest import asyncio -from ntcp_http import NtcpHttpServer +import pytest from amaranth import Module from amaranth.lib.wiring import Component, In, Out from amaranth.lib import stream from host_sim import HostSimulator +from http_server import capitalizer from not_tcp.host import Packet, Flag -from sim_server import SimServer from not_tcp.not_tcp import StreamStop -from http_server import capitalizer +from ntcp_http import NtcpHttpServer +from sim_server import SimServer + pytest_plugins = ('pytest_asyncio',) @@ -65,12 +65,10 @@ def DISABLED_test_capitalize_server(): received_bytes = bytes() received_body = bytes() packets = [] - import sys for i in range(100): received_bytes += srv.recv() (packet, remainder) = Packet.from_bytes(received_bytes) if packet is not None: - sys.stderr.write(f"{packet}\n") received_bytes = remainder packets += [packet] received_body += packet.body @@ -88,7 +86,6 @@ def DISABLED_test_capitalize_server(): received_bytes += srv.recv() (packet, remainder) = Packet.from_bytes(received_bytes) if packet is not None: - sys.stderr.write(f"{packet}\n") received_bytes = remainder packets += [packet] received_body += packet.body diff --git a/ntcp_http_test.py b/ntcp_http_test.py index b0a9c7d..aff060c 100644 --- a/ntcp_http_test.py +++ b/ntcp_http_test.py @@ -52,13 +52,11 @@ def test_sim(): received_bytes = bytes() packets = [] - import sys # We shouldn't have more than 100 packets for this test. for i in range(100): received_bytes += srv.recv() (packet, remainder) = Packet.from_bytes(received_bytes) if packet is not None: - sys.stderr.write(f"packet: {packet}\n") received_bytes = remainder packets += [packet] if packet.end: diff --git a/sim_server.py b/sim_server.py index 7393cd8..9bba207 100644 --- a/sim_server.py +++ b/sim_server.py @@ -1,5 +1,5 @@ import queue -import sys +import logging import traceback from threading import Thread @@ -7,6 +7,8 @@ from stream_fixtures import StreamSender, StreamCollector +log = logging.getLogger(__name__) + class SimServer: """ @@ -100,10 +102,10 @@ def _run_sim(self, sim): def runnable(): try: # Uncomment this line, and indent the next, to get debug info. - # with sim.write_vcd("testout.vcd"): - sim.run() + with sim.write_vcd("testout.vcd"): + sim.run() except Exception as e: - sys.stderr.write(f"error in Amaranth simulation: {e}\n") + log.error("error in Amaranth simulation: ", e) # Try to force shutdown: self._sender.die = True raise e @@ -111,14 +113,15 @@ def runnable(): return runnable def __exit__(self, exe_type, exe_val, exe_traceback, **kwargs): + if exe_traceback is not None: + traceback.print_tb(exe_traceback) + assert self._sim_thread is not None # Shutting down the data input should shut down the simulator; # the data input is driving the tick. # self._data_in.shutdown() # .shutdown() is not available on python3.11, # so we have to use a flag. - if exe_traceback is not None: - traceback.print_tb(exe_traceback) self._sender.die = True self._sim_thread.join() diff --git a/stream_fixtures.py b/stream_fixtures.py index a908f09..9f0573f 100644 --- a/stream_fixtures.py +++ b/stream_fixtures.py @@ -1,12 +1,14 @@ """ Test fixtures for sending and receiving in streams. """ -import sys -import time import random import queue +import logging from typing import Iterable + +log = logging.getLogger(__name__) + __all__ = ["StreamCollector", "StreamSender"] @@ -99,15 +101,10 @@ async def collector(ctx): try: q.put(batch, block=False) except queue.Full: - sys.stderr.write( - f"queue full, saving {len(batch)} bytes " - "for later\n" - ) countup = 0 continue except Exception as e: - sys.stderr.write( - f"error in sending data from sim: {e}\n") + log.error("error in sending data from sim: ", e) return batch = bytes() countup = 0 @@ -150,7 +147,6 @@ class StreamSender: # Flag bit, to kill the send_queue_active thread die: bool = False - def __init__(self, stream, random_delay=False, @@ -199,10 +195,11 @@ async def sender(ctx): except queue.Empty: data = bytes() except queue.ShutDown: - sys.stderr.write("queue is shut down\n") + log.info("write-to-sim queue is shut down") return except Exception as e: - sys.stderr.write(f"unexpected exception: {e}\n") + log.error( + "write-to-sim queue unexpected exception: ", e) raise e if isinstance(data, str):