From 56915fa3327a24a2f1e60f44d262b42348e30a56 Mon Sep 17 00:00:00 2001 From: Jelle van der Waa Date: Sat, 18 Oct 2025 16:44:32 +0200 Subject: [PATCH 1/3] varlink: introduce a server_factory fixture and switch to pytest Introduce a re-usable server_factory fixture to spin up and teardown a varlink test server. Allowing most of the tests to be ported over to pytest assertions. --- varlink/tests/conftest.py | 32 ++++++ varlink/tests/test_basic_network.py | 137 ++++++++++++++------------ varlink/tests/test_orgexamplemore.py | 101 +++++++++---------- varlink/tests/test_server_encoding.py | 65 +++++------- 4 files changed, 181 insertions(+), 154 deletions(-) create mode 100644 varlink/tests/conftest.py diff --git a/varlink/tests/conftest.py b/varlink/tests/conftest.py new file mode 100644 index 0000000..2806052 --- /dev/null +++ b/varlink/tests/conftest.py @@ -0,0 +1,32 @@ +import threading + +import pytest + +import varlink + + +@pytest.fixture +def server_factory(): + servers = [] + + def _create_server(address: str, request_handler: varlink.RequestHandler): + server = varlink.ThreadingServer(address, request_handler) + ready_event = threading.Event() + + def run_server() -> None: + ready_event.set() + server.serve_forever() + + server_thread = threading.Thread(target=run_server) + server_thread.start() + ready_event.wait() + servers.append((server, server_thread)) + + return server + + yield _create_server + + for srv, srv_thread in servers: + srv.shutdown() + srv.server_close() + srv_thread.join() diff --git a/varlink/tests/test_basic_network.py b/varlink/tests/test_basic_network.py index 611c80e..28d3634 100755 --- a/varlink/tests/test_basic_network.py +++ b/varlink/tests/test_basic_network.py @@ -1,8 +1,9 @@ import os import socket +import sys import threading -import unittest -from sys import platform + +import pytest import varlink @@ -19,63 +20,75 @@ class ServiceRequestHandler(varlink.RequestHandler): service = service -class TestService(unittest.TestCase): - def do_run(self, address): - server = varlink.ThreadingServer(address, ServiceRequestHandler) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.daemon = True - server_thread.start() - - try: - with varlink.Client(address) as client, client.open("org.varlink.service") as _connection: - info = _connection.GetInfo() - - self.assertEqual(len(info["interfaces"]), 1) - self.assertEqual(info["interfaces"][0], "org.varlink.service") - self.assertEqual(info, service.GetInfo()) - - desc = _connection.GetInterfaceDescription(info["interfaces"][0]) - self.assertEqual(desc, service.GetInterfaceDescription("org.varlink.service")) - - _connection.close() - - finally: - server.shutdown() - server.server_close() - - def test_tcp(self) -> None: - self.do_run("tcp:127.0.0.1:23450") - - def test_anon_unix(self) -> None: - if platform.startswith("linux"): - self.do_run(f"unix:@org.varlink.service_anon_test{os.getpid()}{threading.current_thread().name}") - - def test_unix(self) -> None: - if hasattr(socket, "AF_UNIX"): - self.do_run(f"unix:org.varlink.service_anon_test_{os.getpid()}{threading.current_thread().name}") - - def test_wrong_url(self) -> None: - self.assertRaises( - ConnectionError, self.do_run, f"uenix:org.varlink.service_wrong_url_test_{os.getpid()}" - ) - - def test_reuse_open(self) -> None: - address = "tcp:127.0.0.1:23450" - server = varlink.ThreadingServer(address, ServiceRequestHandler) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.daemon = True - server_thread.start() - - try: - with varlink.Client(address) as client: - connection = client.open_connection() - re_use = client.open("org.varlink.service", False, connection) - - info = re_use.GetInfo() - self.assertEqual(len(info["interfaces"]), 1) - self.assertEqual(info["interfaces"][0], "org.varlink.service") - self.assertEqual(info, service.GetInfo()) - connection.close() - finally: - server.shutdown() - server.server_close() +def test_tcp(server_factory) -> None: + address = "tcp:127.0.0.1:23450" + server_factory(address, ServiceRequestHandler) + + with varlink.Client(address) as client, client.open("org.varlink.service") as connection: + info = connection.GetInfo() + + assert len(info["interfaces"]) == 1 + assert info["interfaces"][0] == "org.varlink.service" + assert info == service.GetInfo() + + desc = connection.GetInterfaceDescription(info["interfaces"][0]) + assert desc == service.GetInterfaceDescription("org.varlink.service") + + connection.close() + + +@pytest.mark.skipif(not sys.platform.startswith("linux"), reason="Only runs on Linux") +def test_anon_unix(server_factory) -> None: + address = f"unix:@org.varlink.service_anon_test{os.getpid()}{threading.current_thread().name}" + server_factory(address, ServiceRequestHandler) + + with varlink.Client(address) as client, client.open("org.varlink.service") as connection: + info = connection.GetInfo() + + assert len(info["interfaces"]) == 1 + assert info["interfaces"][0] == "org.varlink.service" + assert info == service.GetInfo() + + desc = connection.GetInterfaceDescription(info["interfaces"][0]) + assert desc == service.GetInterfaceDescription("org.varlink.service") + + connection.close() + + +@pytest.mark.skipif(not hasattr(socket, "AF_UNIX"), reason="No UNIX socket support") +def test_unix(server_factory) -> None: + address = f"unix:org.varlink.service_anon_test_{os.getpid()}{threading.current_thread().name}" + server_factory(address, ServiceRequestHandler) + + with varlink.Client(address) as client, client.open("org.varlink.service") as connection: + info = connection.GetInfo() + + assert len(info["interfaces"]) == 1 + assert info["interfaces"][0] == "org.varlink.service" + assert info == service.GetInfo() + + desc = connection.GetInterfaceDescription(info["interfaces"][0]) + assert desc == service.GetInterfaceDescription("org.varlink.service") + + connection.close() + + +def test_reuse_open(server_factory) -> None: + address = "tcp:127.0.0.1:23450" + server_factory(address, ServiceRequestHandler) + + with varlink.Client(address) as client: + connection = client.open_connection() + re_use = client.open("org.varlink.service", False, connection) + info = re_use.GetInfo() + + assert len(info["interfaces"]) == 1 + assert info["interfaces"][0] == "org.varlink.service" + assert info == service.GetInfo() + connection.close() + + +def test_wrong_url(server_factory) -> None: + address = f"uenix:org.varlink.service_wrong_url_test_{os.getpid()}" + with pytest.raises(ConnectionError): + server_factory(address, ServiceRequestHandler) diff --git a/varlink/tests/test_orgexamplemore.py b/varlink/tests/test_orgexamplemore.py index 561e231..9b7cb48 100755 --- a/varlink/tests/test_orgexamplemore.py +++ b/varlink/tests/test_orgexamplemore.py @@ -22,11 +22,11 @@ import socket import sys import textwrap -import threading import time -import unittest from sys import platform +import pytest + import varlink ######## CLIENT ############# @@ -213,57 +213,52 @@ def epilog(): ######## UNITTEST ############# -class TestService(unittest.TestCase): - def test_service(self) -> None: - address = "tcp:127.0.0.1:23451" - Example.sleep_duration = 0.1 +def test_service(server_factory) -> None: + address = "tcp:127.0.0.1:23451" + Example.sleep_duration = 0.1 + server_factory(address, ServiceRequestHandler) - server = varlink.ThreadingServer(address, ServiceRequestHandler) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.daemon = True - server_thread.start() - try: - client = varlink.Client.new_with_address(address) + client = varlink.Client.new_with_address(address) - run_client(client) + run_client(client) - with ( - client.open("org.example.more", namespaced=True) as con1, - client.open("org.example.more", namespaced=True) as con2, - ): - self.assertEqual(con1.Ping("Test").pong, "Test") - - it = con1.TestMore(10, _more=True) - - m = next(it) - self.assertTrue(hasattr(m.state, "start")) - self.assertFalse(hasattr(m.state, "end")) - self.assertFalse(hasattr(m.state, "progress")) - self.assertIsNotNone(m.state.start) - - for i in range(0, 110, 10): - m = next(it) - self.assertTrue(hasattr(m.state, "progress")) - self.assertFalse(hasattr(m.state, "start")) - self.assertFalse(hasattr(m.state, "end")) - self.assertIsNotNone(m.state.progress) - self.assertEqual(i, m.state.progress) - - if i > 50: - ret = con2.Ping("Test") - self.assertEqual("Test", ret.pong) - - m = next(it) - self.assertTrue(hasattr(m.state, "end")) - self.assertFalse(hasattr(m.state, "start")) - self.assertFalse(hasattr(m.state, "progress")) - self.assertIsNotNone(m.state.end) - - self.assertRaises(StopIteration, next, it) - - con1.StopServing(_oneway=True) - time.sleep(0.5) - self.assertRaises(ConnectionError, con1.Ping, "Test") - finally: - server.shutdown() - server.server_close() + with ( + client.open("org.example.more", namespaced=True) as con1, + client.open("org.example.more", namespaced=True) as con2, + ): + assert con1.Ping("Test").pong == "Test" + + it = con1.TestMore(10, _more=True) + + m = next(it) + assert hasattr(m.state, "start") + assert not hasattr(m.state, "end") + assert not hasattr(m.state, "progress") + assert m.state.start is not None + + for i in range(0, 110, 10): + m = next(it) + assert hasattr(m.state, "progress") + assert not hasattr(m.state, "start") + assert not hasattr(m.state, "end") + assert m.state.progress is not None + assert i == m.state.progress + + if i > 50: + ret = con2.Ping("Test") + assert ret.pong == "Test" + + m = next(it) + assert hasattr(m.state, "end") + assert not hasattr(m.state, "start") + assert not hasattr(m.state, "progress") + assert m.state.end is not None + + with pytest.raises(StopIteration): + next(it) + + con1.StopServing(_oneway=True) + time.sleep(0.5) + + with pytest.raises(ConnectionError): + con1.Ping("Test") diff --git a/varlink/tests/test_server_encoding.py b/varlink/tests/test_server_encoding.py index f6a2a87..bb8bd0a 100644 --- a/varlink/tests/test_server_encoding.py +++ b/varlink/tests/test_server_encoding.py @@ -6,10 +6,7 @@ import dataclasses import os -import threading -import time import typing -import unittest import varlink import varlink.error @@ -84,41 +81,31 @@ def GetOrder(self, num): return GetOrderResult(order=order) -class TestService(unittest.TestCase): +def test_ping(server_factory): address = "tcp:127.0.0.1:23451" + server_factory(address, ServiceRequestHandler) + client = varlink.Client.new_with_address(address) + with client.open("org.example.encoding") as conn: + response = conn.Ping("Foo") + assert response["pong"] == "Foo" - @classmethod - def setUpClass(cls): - cls._server = varlink.ThreadingServer(cls.address, ServiceRequestHandler) - cls._server_thread = threading.Thread(target=cls._server.serve_forever) - cls._server_thread.start() - time.sleep(0.1) - - @classmethod - def tearDownClass(cls): - cls._server.shutdown() - cls._server.server_close() - cls._server_thread.join() - - def test_ping(self): - client = varlink.Client.new_with_address(self.address) - with client.open("org.example.encoding") as conn: - response = conn.Ping("Foo") - self.assertEqual(response["pong"], "Foo") - - def test_get_order(self): - client = varlink.Client.new_with_address(self.address) - with client.open("org.example.encoding") as conn: - response = conn.GetOrder(4638547) - # response will be a dict represenation of GetOrderResult - self.assertIn("order", response) - order = response["order"] - self.assertEqual(order.get("order_num"), 4638547) - self.assertEqual(order.get("customer"), "Joe's Discount Store") - self.assertEqual(len(order.get("shipments", [])), 2) - shipment1 = order["shipments"][0] - self.assertEqual(shipment1.get("name"), "Furniture") - self.assertIsNotNone(shipment1.get("weight")) - shipment2 = order["shipments"][1] - self.assertEqual(shipment2.get("name"), "Electronics") - self.assertIsNone(shipment2.get("weight")) + +def test_get_order(server_factory): + address = "tcp:127.0.0.1:23451" + server_factory(address, ServiceRequestHandler) + + client = varlink.Client.new_with_address(address) + with client.open("org.example.encoding") as conn: + response = conn.GetOrder(4638547) + # response will be a dict represenation of GetOrderResult + assert "order" in response + order = response["order"] + assert order.get("order_num") == 4638547 + assert order.get("customer") == "Joe's Discount Store" + assert len(order.get("shipments", [])) == 2 + shipment1 = order["shipments"][0] + assert shipment1.get("name") == "Furniture" + assert shipment1.get("weight") is not None + shipment2 = order["shipments"][1] + assert shipment2.get("name") == "Electronics" + assert shipment2.get("weight") is None From f299cf8b06c9bbfa33c307e0e2506f9717328ed4 Mon Sep 17 00:00:00 2001 From: Jelle van der Waa Date: Sat, 18 Oct 2025 17:10:29 +0200 Subject: [PATCH 2/3] varlink: parameterize address tests The three tests use the same test logic so combine them into one test with pytest parametrize's decorator. --- varlink/tests/test_basic_network.py | 56 ++++++++++------------------- 1 file changed, 19 insertions(+), 37 deletions(-) diff --git a/varlink/tests/test_basic_network.py b/varlink/tests/test_basic_network.py index 28d3634..1bbd596 100755 --- a/varlink/tests/test_basic_network.py +++ b/varlink/tests/test_basic_network.py @@ -20,44 +20,26 @@ class ServiceRequestHandler(varlink.RequestHandler): service = service -def test_tcp(server_factory) -> None: - address = "tcp:127.0.0.1:23450" - server_factory(address, ServiceRequestHandler) - - with varlink.Client(address) as client, client.open("org.varlink.service") as connection: - info = connection.GetInfo() - - assert len(info["interfaces"]) == 1 - assert info["interfaces"][0] == "org.varlink.service" - assert info == service.GetInfo() - - desc = connection.GetInterfaceDescription(info["interfaces"][0]) - assert desc == service.GetInterfaceDescription("org.varlink.service") - - connection.close() - - -@pytest.mark.skipif(not sys.platform.startswith("linux"), reason="Only runs on Linux") -def test_anon_unix(server_factory) -> None: - address = f"unix:@org.varlink.service_anon_test{os.getpid()}{threading.current_thread().name}" - server_factory(address, ServiceRequestHandler) - - with varlink.Client(address) as client, client.open("org.varlink.service") as connection: - info = connection.GetInfo() - - assert len(info["interfaces"]) == 1 - assert info["interfaces"][0] == "org.varlink.service" - assert info == service.GetInfo() - - desc = connection.GetInterfaceDescription(info["interfaces"][0]) - assert desc == service.GetInterfaceDescription("org.varlink.service") - - connection.close() - +test_addresses = [ + ("tcp:127.0.0.1:23450", False, ""), + ( + f"unix:@org.varlink.service_anon_test{os.getpid()}{threading.current_thread().name}", + not sys.platform.startswith("linux"), + "Only runs on Linux", + ), + ( + f"unix:org.varlink.service_anon_test_{os.getpid()}{threading.current_thread().name}", + not hasattr(socket, "AF_UNIX"), + "No UNIX socket support", + ), +] + + +@pytest.mark.parametrize("address,skip,skip_reason", test_addresses) +def test_address(server_factory, address, skip, skip_reason) -> None: + if skip: + pytest.skip(skip_reason) -@pytest.mark.skipif(not hasattr(socket, "AF_UNIX"), reason="No UNIX socket support") -def test_unix(server_factory) -> None: - address = f"unix:org.varlink.service_anon_test_{os.getpid()}{threading.current_thread().name}" server_factory(address, ServiceRequestHandler) with varlink.Client(address) as client, client.open("org.varlink.service") as connection: From 510fb82aa6142c4319dfe3c1a8189c217a08c476 Mon Sep 17 00:00:00 2001 From: Jelle van der Waa Date: Sat, 18 Oct 2025 17:43:30 +0200 Subject: [PATCH 3/3] varlink: rework `server_factory` fixture to take a `service` Construct `ServiceRequestHandler` once and re-use it in the fixture. --- varlink/tests/conftest.py | 8 ++++++-- varlink/tests/test_basic_network.py | 10 +++------- varlink/tests/test_orgexamplemore.py | 8 ++------ varlink/tests/test_server_encoding.py | 8 ++------ 4 files changed, 13 insertions(+), 21 deletions(-) diff --git a/varlink/tests/conftest.py b/varlink/tests/conftest.py index 2806052..2503807 100644 --- a/varlink/tests/conftest.py +++ b/varlink/tests/conftest.py @@ -5,12 +5,16 @@ import varlink +class ServiceRequestHandler(varlink.RequestHandler): ... + + @pytest.fixture def server_factory(): servers = [] - def _create_server(address: str, request_handler: varlink.RequestHandler): - server = varlink.ThreadingServer(address, request_handler) + def _create_server(address: str, service: varlink.Service): + ServiceRequestHandler.service = service + server = varlink.ThreadingServer(address, ServiceRequestHandler) ready_event = threading.Event() def run_server() -> None: diff --git a/varlink/tests/test_basic_network.py b/varlink/tests/test_basic_network.py index 1bbd596..b4d4f31 100755 --- a/varlink/tests/test_basic_network.py +++ b/varlink/tests/test_basic_network.py @@ -16,10 +16,6 @@ ) -class ServiceRequestHandler(varlink.RequestHandler): - service = service - - test_addresses = [ ("tcp:127.0.0.1:23450", False, ""), ( @@ -40,7 +36,7 @@ def test_address(server_factory, address, skip, skip_reason) -> None: if skip: pytest.skip(skip_reason) - server_factory(address, ServiceRequestHandler) + server_factory(address, service) with varlink.Client(address) as client, client.open("org.varlink.service") as connection: info = connection.GetInfo() @@ -57,7 +53,7 @@ def test_address(server_factory, address, skip, skip_reason) -> None: def test_reuse_open(server_factory) -> None: address = "tcp:127.0.0.1:23450" - server_factory(address, ServiceRequestHandler) + server_factory(address, service) with varlink.Client(address) as client: connection = client.open_connection() @@ -73,4 +69,4 @@ def test_reuse_open(server_factory) -> None: def test_wrong_url(server_factory) -> None: address = f"uenix:org.varlink.service_wrong_url_test_{os.getpid()}" with pytest.raises(ConnectionError): - server_factory(address, ServiceRequestHandler) + server_factory(address, service) diff --git a/varlink/tests/test_orgexamplemore.py b/varlink/tests/test_orgexamplemore.py index 9b7cb48..77323bc 100755 --- a/varlink/tests/test_orgexamplemore.py +++ b/varlink/tests/test_orgexamplemore.py @@ -75,10 +75,6 @@ def run_client(client): ) -class ServiceRequestHandler(varlink.RequestHandler): - service = service - - class ActionFailed(varlink.VarlinkError): def __init__(self, reason): varlink.VarlinkError.__init__( @@ -138,7 +134,7 @@ def TestObject(self, object): def run_server(address): - with varlink.ThreadingServer(address, ServiceRequestHandler) as server: + with varlink.ThreadingServer(address, service) as server: print("Listening on", server.server_address) try: server.serve_forever() @@ -216,7 +212,7 @@ def epilog(): def test_service(server_factory) -> None: address = "tcp:127.0.0.1:23451" Example.sleep_duration = 0.1 - server_factory(address, ServiceRequestHandler) + server_factory(address, service) client = varlink.Client.new_with_address(address) diff --git a/varlink/tests/test_server_encoding.py b/varlink/tests/test_server_encoding.py index bb8bd0a..21b3a59 100644 --- a/varlink/tests/test_server_encoding.py +++ b/varlink/tests/test_server_encoding.py @@ -49,10 +49,6 @@ def default(self, obj): ) -class ServiceRequestHandler(varlink.RequestHandler): - service = service - - @service.interface("org.example.encoding") class EncodingExample: sleep_duration = 1 @@ -83,7 +79,7 @@ def GetOrder(self, num): def test_ping(server_factory): address = "tcp:127.0.0.1:23451" - server_factory(address, ServiceRequestHandler) + server_factory(address, service) client = varlink.Client.new_with_address(address) with client.open("org.example.encoding") as conn: response = conn.Ping("Foo") @@ -92,7 +88,7 @@ def test_ping(server_factory): def test_get_order(server_factory): address = "tcp:127.0.0.1:23451" - server_factory(address, ServiceRequestHandler) + server_factory(address, service) client = varlink.Client.new_with_address(address) with client.open("org.example.encoding") as conn: