Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions wish/cpp/src/plain_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ bool PlainClient::Init() {
handler_->SetOnMessage(on_message_);
}

if (on_close_) {
handler_->SetOnClose(on_close_);
}

handler_->Start();

return true;
Expand All @@ -73,6 +77,13 @@ void PlainClient::SetOnMessage(MessageCallback cb) {
}
}

void PlainClient::SetOnClose(CloseCallback cb) {
on_close_ = cb;
if (handler_) {
handler_->SetOnClose(on_close_);
}
}

void PlainClient::Run() {
std::cout << "Client running..." << std::endl;

Expand Down
3 changes: 3 additions & 0 deletions wish/cpp/src/plain_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ class PlainClient {
public:
using OpenCallback = std::function<void(WishHandler*)>;
using MessageCallback = std::function<void(uint8_t, const std::string&)>;
using CloseCallback = std::function<void()>;

PlainClient(const std::string& host, int port);
~PlainClient();

bool Init();
void SetOnOpen(OpenCallback cb);
void SetOnMessage(MessageCallback cb);
void SetOnClose(CloseCallback cb);
void Run();
void Stop();

Expand All @@ -35,6 +37,7 @@ class PlainClient {

OpenCallback on_open_;
MessageCallback on_message_;
CloseCallback on_close_;
};

#endif // WISH_CPP_SRC_PLAIN_CLIENT_H_
11 changes: 11 additions & 0 deletions wish/cpp/src/tls_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ bool TlsClient::Init() {
handler_->SetOnMessage(on_message_);
}

if (on_close_) {
handler_->SetOnClose(on_close_);
}

handler_->Start();

return true;
Expand All @@ -99,6 +103,13 @@ void TlsClient::SetOnMessage(MessageCallback cb) {
}
}

void TlsClient::SetOnClose(CloseCallback cb) {
on_close_ = cb;
if (handler_) {
handler_->SetOnClose(on_close_);
}
}

void TlsClient::Run() {
std::cout << "Client running..." << std::endl;

Expand Down
3 changes: 3 additions & 0 deletions wish/cpp/src/tls_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class TlsClient {
public:
using OpenCallback = std::function<void(WishHandler*)>;
using MessageCallback = std::function<void(uint8_t, const std::string&)>;
using CloseCallback = std::function<void()>;

TlsClient(const std::string& ca_file, const std::string& cert_file,
const std::string& key_file, const std::string& host, int port);
Expand All @@ -22,6 +23,7 @@ class TlsClient {
bool Init();
void SetOnOpen(OpenCallback cb);
void SetOnMessage(MessageCallback cb);
void SetOnClose(CloseCallback cb);
void Run();
void Stop();

Expand All @@ -42,6 +44,7 @@ class TlsClient {

OpenCallback on_open_;
MessageCallback on_message_;
CloseCallback on_close_;
};

#endif // WISH_CPP_SRC_TLS_CLIENT_H_
5 changes: 5 additions & 0 deletions wish/cpp/src/wish_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ void WishHandler::SetOnMessage(MessageCallback cb) { on_message_ = cb; }

void WishHandler::SetOnOpen(OpenCallback cb) { on_open_ = cb; }

void WishHandler::SetOnClose(CloseCallback cb) { on_close_ = cb; }

ssize_t WishHandler::RecvCallback(wslay_event_context* ctx, uint8_t* buf,
size_t len, int flags, void* user_data) {
WishHandler* handler = static_cast<WishHandler*>(user_data);
Expand Down Expand Up @@ -114,6 +116,9 @@ void WishHandler::EventCallback(struct bufferevent* bev, short events,
// Connection closed
std::cout << "Connection closed." << std::endl;
WishHandler* handler = static_cast<WishHandler*>(ctx);
// Notify before self-deletion so Python-side handles can be invalidated
// while the pointer is still valid.
if (handler->on_close_) handler->on_close_();
delete handler;
}
}
Expand Down
13 changes: 9 additions & 4 deletions wish/cpp/src/wish_handler.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#ifndef WISH_CPP_SRC_WISH_HANDLER_H_
#define WISH_CPP_SRC_WISH_HANDLER_H_

#include <event2/buffer.h>
#include <event2/bufferevent.h>

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include <event2/buffer.h>
#include <event2/bufferevent.h>

// wslay forward decl
extern "C" {
struct wslay_event_context;
Expand All @@ -30,6 +30,7 @@ class WishHandler {
using MessageCallback =
std::function<void(uint8_t opcode, const std::string&)>;
using OpenCallback = std::function<void()>;
using CloseCallback = std::function<void()>;

// Constructor takes an already created bufferevent
WishHandler(struct bufferevent* bev, bool is_server);
Expand All @@ -46,15 +47,19 @@ class WishHandler {

void SetOnMessage(MessageCallback cb);
void SetOnOpen(OpenCallback cb);
void SetOnClose(CloseCallback cb);

private:
struct bufferevent* bev_;
bool is_server_;
struct wslay_event_context* ctx_;
MessageCallback on_message_;
OpenCallback on_open_;
CloseCallback on_close_;

enum State { HANDSHAKE, OPEN, CLOSED };
enum State { HANDSHAKE,
OPEN,
CLOSED };
State state_;

// wslay callbacks
Expand Down
86 changes: 78 additions & 8 deletions wish/python/src/wish_ext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,42 @@
#include <nanobind/stl/function.h>
#include <nanobind/stl/string.h>

#include <memory>
#include <mutex>

#include "plain_client.h"
#include "tls_client.h"
#include "wish_handler.h"

namespace nb = nanobind;

// ---------------------------------------------------------------------------
// WishHandlerRef: a shared, nullable handle to WishHandler.
//
// The raw WishHandler* lives only as long as the connection is open.
// WishHandler::EventCallback fires on_close_ BEFORE self-deleting; our
// on_close hook nullifies ptr under the mutex so any concurrent call from
// the Python thread via send_text/send_binary sees nullptr and raises
// RuntimeError rather than dereferencing freed memory.
// ---------------------------------------------------------------------------

struct WishHandlerRef {
std::mutex mu;
WishHandler* ptr = nullptr;

int send_text(const std::string& msg) {
std::lock_guard<std::mutex> lock(mu);
if (!ptr) throw std::runtime_error("Connection is closed");
return ptr->SendText(msg);
}

int send_binary(const std::string& msg) {
std::lock_guard<std::mutex> lock(mu);
if (!ptr) throw std::runtime_error("Connection is closed");
return ptr->SendBinary(msg);
}
};

// ---------------------------------------------------------------------------
// Wrapper structs
//
Expand All @@ -24,16 +54,18 @@ struct TlsClientPy {
TlsClient client;
nb::object on_open_cb;
nb::object on_message_cb;
std::shared_ptr<WishHandlerRef> handler_ref;

TlsClientPy(const std::string& ca, const std::string& cert,
const std::string& key, const std::string& host, int port)
: client(ca, cert, key, host, port) {}
: client(ca, cert, key, host, port) {};
};

struct PlainClientPy {
PlainClient client;
nb::object on_open_cb;
nb::object on_message_cb;
std::shared_ptr<WishHandlerRef> handler_ref;

PlainClientPy(const std::string& host, int port)
: client(host, port) {}
Expand All @@ -55,7 +87,14 @@ static int tls_clear(PyObject* self) {
// Clear the C++ callbacks first so the lambda (which captures &*w) is
// dropped before we invalidate on_open_cb / on_message_cb.
w->client.SetOnOpen({});
w->client.SetOnClose({});
w->client.SetOnMessage({});
// Invalidate the safe handle so Python code can no longer call through it.
if (w->handler_ref) {
std::lock_guard<std::mutex> lock(w->handler_ref->mu);
w->handler_ref->ptr = nullptr;
}
w->handler_ref.reset();
w->on_open_cb = nb::object();
w->on_message_cb = nb::object();
return 0;
Expand All @@ -75,7 +114,14 @@ static int plain_traverse(PyObject* self, visitproc visit, void* arg) {
static int plain_clear(PyObject* self) {
PlainClientPy* w = nb::inst_ptr<PlainClientPy>(nb::handle(self));
w->client.SetOnOpen({});
w->client.SetOnClose({});
w->client.SetOnMessage({});
// Invalidate the safe handle so Python code can no longer call through it.
if (w->handler_ref) {
std::lock_guard<std::mutex> lock(w->handler_ref->mu);
w->handler_ref->ptr = nullptr;
}
w->handler_ref.reset();
w->on_open_cb = nb::object();
w->on_message_cb = nb::object();
return 0;
Expand All @@ -91,9 +137,9 @@ NB_MODULE(wish_ext, m) {
evthread_use_pthreads();
#endif

nb::class_<WishHandler>(m, "WishHandler")
.def("send_text", &WishHandler::SendText)
.def("send_binary", &WishHandler::SendBinary);
nb::class_<WishHandlerRef, std::shared_ptr<WishHandlerRef>>(m, "WishHandler")
.def("send_text", &WishHandlerRef::send_text)
.def("send_binary", &WishHandlerRef::send_binary);

// ---- TlsClient --------------------------------------------------------
static PyType_Slot tls_slots[] = {
Expand All @@ -110,13 +156,26 @@ NB_MODULE(wish_ext, m) {
})
.def("set_on_open", [](TlsClientPy& self, nb::object cb) {
self.on_open_cb = cb;
// Create a fresh WishHandlerRef for this connection attempt.
auto ref = std::make_shared<WishHandlerRef>();
self.handler_ref = ref;
// Wire the close notification: nullify ptr before WishHandler is
// deleted so Python cannot reach freed memory.
self.client.SetOnClose([ref]() {
std::lock_guard<std::mutex> lock(ref->mu);
ref->ptr = nullptr;
});
// Capture self by pointer; lifetime is safe because the lambda
// lives inside self.client and is always cleared before self
// is destroyed (either by tp_clear or ~TlsClient).
self.client.SetOnOpen([&self](WishHandler* handler) {
self.client.SetOnOpen([&self, ref](WishHandler* handler) {
{
std::lock_guard<std::mutex> lock(ref->mu);
ref->ptr = handler;
}
nb::gil_scoped_acquire acquire;
try {
self.on_open_cb(nb::cast(handler, nb::rv_policy::reference));
self.on_open_cb(ref);
} catch (nb::python_error& e) {
e.restore();
PyErr_WriteUnraisable(nullptr);
Expand Down Expand Up @@ -152,10 +211,21 @@ NB_MODULE(wish_ext, m) {
})
.def("set_on_open", [](PlainClientPy& self, nb::object cb) {
self.on_open_cb = cb;
self.client.SetOnOpen([&self](WishHandler* handler) {
// Create a fresh WishHandlerRef for this connection attempt.
auto ref = std::make_shared<WishHandlerRef>();
self.handler_ref = ref;
self.client.SetOnClose([ref]() {
std::lock_guard<std::mutex> lock(ref->mu);
ref->ptr = nullptr;
});
self.client.SetOnOpen([&self, ref](WishHandler* handler) {
{
std::lock_guard<std::mutex> lock(ref->mu);
ref->ptr = handler;
}
nb::gil_scoped_acquire acquire;
try {
self.on_open_cb(nb::cast(handler, nb::rv_policy::reference));
self.on_open_cb(ref);
} catch (nb::python_error& e) {
e.restore();
PyErr_WriteUnraisable(nullptr);
Expand Down
Loading