From 22a10436f86931f64f07da81425cdb712b984454 Mon Sep 17 00:00:00 2001 From: Takeshi Yoshino <4511440+tyoshino@users.noreply.github.com> Date: Thu, 23 Apr 2026 02:40:35 +0000 Subject: [PATCH] fix(wish/python): stop the event loop before GC --- wish/python/src/wish_ext.cc | 66 ++++++++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 4 deletions(-) diff --git a/wish/python/src/wish_ext.cc b/wish/python/src/wish_ext.cc index 3233862..82fd4c4 100644 --- a/wish/python/src/wish_ext.cc +++ b/wish/python/src/wish_ext.cc @@ -3,6 +3,8 @@ #include #include +#include +#include #include #include @@ -56,6 +58,12 @@ struct TlsClientPy { nb::object on_message_cb; std::shared_ptr handler_ref; + // Tracks whether Run() is currently executing. Used by tls_clear to + // wait for the event loop to exit before it clears the callbacks. + std::atomic running{false}; + std::mutex stopped_mu; + std::condition_variable stopped_cv; + TlsClientPy(const std::string& ca, const std::string& cert, const std::string& key, const std::string& host, int port) : client(ca, cert, key, host, port) {}; @@ -67,6 +75,10 @@ struct PlainClientPy { nb::object on_message_cb; std::shared_ptr handler_ref; + std::atomic running{false}; + std::mutex stopped_mu; + std::condition_variable stopped_cv; + PlainClientPy(const std::string& host, int port) : client(host, port) {} }; @@ -84,8 +96,29 @@ static int tls_traverse(PyObject* self, visitproc visit, void* arg) { static int tls_clear(PyObject* self) { TlsClientPy* w = nb::inst_ptr(nb::handle(self)); - // Clear the C++ callbacks first so the lambda (which captures &*w) is - // dropped before we invalidate on_open_cb / on_message_cb. + + // 1. Ask the event loop to stop. + // + // event_base_loopexit is thread-safe. + w->client.Stop(); + + // 2. Release the GIL and wait for Run() to return. + // + // Without this step there is a data race: + // - Event loop thread reads on_open_ / on_message_ and then blocks + // waiting to acquire the GIL (nb::gil_scoped_acquire). + // - GC thread (holding the GIL) writes those same std::function + // objects via SetOnOpen({}) etc. → UB. + // By releasing the GIL here we let any in-flight callback finish, after + // which event_base_dispatch returns and Run() signals stopped_cv. + { + PyThreadState* ts = PyEval_SaveThread(); // release GIL + std::unique_lock lk(w->stopped_mu); + w->stopped_cv.wait(lk, [w] { return !w->running.load(std::memory_order_acquire); }); + PyEval_RestoreThread(ts); // reacquire GIL + } + + // 3. Event loop has exited; mutations are now single-threaded and safe. w->client.SetOnOpen({}); w->client.SetOnClose({}); w->client.SetOnMessage({}); @@ -113,6 +146,17 @@ static int plain_traverse(PyObject* self, visitproc visit, void* arg) { static int plain_clear(PyObject* self) { PlainClientPy* w = nb::inst_ptr(nb::handle(self)); + + // Same Stop-then-wait pattern as tls_clear. See comments there. + w->client.Stop(); + + { + PyThreadState* ts = PyEval_SaveThread(); + std::unique_lock lk(w->stopped_mu); + w->stopped_cv.wait(lk, [w] { return !w->running.load(std::memory_order_acquire); }); + PyEval_RestoreThread(ts); + } + w->client.SetOnOpen({}); w->client.SetOnClose({}); w->client.SetOnMessage({}); @@ -194,7 +238,14 @@ NB_MODULE(wish_ext, m) { } }); }) - .def("run", [](TlsClientPy& self) { self.client.Run(); }, nb::call_guard()) + .def("run", [](TlsClientPy& self) { + self.running.store(true, std::memory_order_release); + self.client.Run(); // blocks in event_base_dispatch with GIL released + { + std::lock_guard lk(self.stopped_mu); + self.running.store(false, std::memory_order_release); + } + self.stopped_cv.notify_all(); }, nb::call_guard()) .def("stop", [](TlsClientPy& self) { self.client.Stop(); }); // ---- PlainClient ------------------------------------------------------ @@ -244,6 +295,13 @@ NB_MODULE(wish_ext, m) { } }); }) - .def("run", [](PlainClientPy& self) { self.client.Run(); }, nb::call_guard()) + .def("run", [](PlainClientPy& self) { + self.running.store(true, std::memory_order_release); + self.client.Run(); + { + std::lock_guard lk(self.stopped_mu); + self.running.store(false, std::memory_order_release); + } + self.stopped_cv.notify_all(); }, nb::call_guard()) .def("stop", [](PlainClientPy& self) { self.client.Stop(); }); }