diff --git a/Cargo.lock b/Cargo.lock index c7d89f1..1ddd98b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1209,6 +1209,7 @@ version = "0.1.1" dependencies = [ "anyhow", "clap", + "dashmap", "interprocess", "myownmesh-core", "myownmesh-signaling", @@ -1216,6 +1217,7 @@ dependencies = [ "parking_lot", "serde", "serde_json", + "tempfile", "tokio", "tracing", "tracing-subscriber", diff --git a/crates/myownmesh/Cargo.toml b/crates/myownmesh/Cargo.toml index b3343a5..f82271b 100644 --- a/crates/myownmesh/Cargo.toml +++ b/crates/myownmesh/Cargo.toml @@ -34,3 +34,8 @@ serde_json = { workspace = true } interprocess = { workspace = true } parking_lot = { workspace = true } +dashmap = { workspace = true } + +[dev-dependencies] +myownmesh-signaling = { workspace = true } +tempfile = { workspace = true } diff --git a/crates/myownmesh/src/control.rs b/crates/myownmesh/src/control.rs index 1fc9b98..73f73eb 100644 --- a/crates/myownmesh/src/control.rs +++ b/crates/myownmesh/src/control.rs @@ -156,6 +156,125 @@ pub enum Request { network: String, proposal_id: String, }, + + // ---- typed-channel + RPC IPC (post-EventsSubscribe) ---------- + // + // The variants below require the client to have first sent + // `EventsSubscribe` on the same connection — they install + // per-client state (handler claims, channel subscriptions, + // in-flight outbound stream forwarders) that the daemon + // routes back as `ServerOut` event frames. Sending one on a + // non-event-subscribed connection returns a `not subscribed` + // error so the client gets immediate feedback rather than a + // silent black hole. + /// Claim a method name on a network. Subsequent peer RPC + /// calls matching the method are forwarded to the client + /// identified by `client_id` as `RpcInbound` events on its + /// event socket. Last-claim-wins: a later register evicts + /// the previous owner with a `HandlerDisplaced` event. + /// `streaming = true` installs a streaming handler (chunks + /// via `RpcStreamChunk` + `RpcStreamEnd`); `false` is + /// single-shot (`RpcRespond`). + RpcRegister { + client_id: crate::ipc::ClientId, + network: String, + method: String, + streaming: bool, + }, + /// Release a method claim. No-op if not currently held by + /// this client. + RpcUnregister { + client_id: crate::ipc::ClientId, + network: String, + method: String, + }, + /// Resolve an in-flight inbound RPC (single-shot). Matches + /// by `request_id` regardless of which client originally + /// received the `RpcInbound`. Either `ok` or `error` should + /// be set; if both, `error` wins. + RpcRespond { + request_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + ok: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + error: Option, + }, + /// Push one chunk to an in-flight streaming inbound RPC. + RpcStreamChunk { + request_id: String, + payload: serde_json::Value, + }, + /// Close an in-flight streaming inbound RPC. After this the + /// request id is no longer routable; further chunks are + /// silently dropped. Optional `error` propagates to the + /// peer as the stream-end's failure reason. + RpcStreamEnd { + request_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + error: Option, + }, + /// Make an outbound single-shot RPC. Blocks the daemon's + /// command socket response on the peer's reply — same shape + /// as `Rpc::call`. + RpcCall { + network: String, + peer: String, + method: String, + payload: serde_json::Value, + }, + /// Make an outbound streaming RPC. Returns immediately with + /// the engine-assigned `request_id`; subsequent + /// `RpcCallStreamChunk` / `RpcCallStreamEnd` events deliver + /// the chunks on the client's event socket. The `client_id` + /// identifies which event socket receives the chunks. + RpcCallStream { + client_id: crate::ipc::ClientId, + network: String, + peer: String, + method: String, + payload: serde_json::Value, + }, + /// Subscribe to a typed channel by name. Inbound channel + /// frames are forwarded as `ChannelInbound` events on the + /// `client_id`'s event socket. Multiple clients can + /// subscribe to the same channel; each gets a copy of every + /// frame. + ChannelSubscribe { + client_id: crate::ipc::ClientId, + network: String, + channel: String, + }, + /// Release a channel subscription. No-op if not currently + /// subscribed. + ChannelUnsubscribe { + client_id: crate::ipc::ClientId, + network: String, + channel: String, + }, + /// Send one frame on a typed channel to a specific peer. + /// Doesn't require a subscription — sends and subscriptions + /// are independent. + ChannelSendTo { + network: String, + channel: String, + peer: String, + payload: serde_json::Value, + }, + /// Broadcast a frame on a typed channel to every active + /// peer. Returns the number of peers the send was + /// dispatched to. + ChannelSendAll { + network: String, + channel: String, + payload: serde_json::Value, + }, + /// Replace the network's advertised capabilities. Triggers + /// a `capabilities_update` broadcast to peers on the next + /// engine tick. + CapabilitiesSet { + network: String, + capabilities: myownmesh_core::protocol::CapabilityAdvert, + }, } #[derive(Debug, Serialize, Deserialize)] @@ -224,7 +343,11 @@ pub async fn serve( let listener = bind_listener(&target)?; info!(?target, "control socket listening"); - let state = Arc::new(ControlState { mesh, registry }); + let state = Arc::new(ControlState { + mesh, + registry, + clients: crate::ipc::ClientRegistry::new(), + }); loop { tokio::select! { @@ -280,6 +403,7 @@ fn bind_listener(target: &SocketTarget) -> Result { struct ControlState { mesh: MeshHandle, registry: Arc, + clients: crate::ipc::ClientRegistry, } async fn handle_client(stream: LocalSocketStream, state: Arc) -> Result<()> { @@ -296,17 +420,29 @@ async fn handle_client(stream: LocalSocketStream, state: Arc) -> R continue; } }; - // EventsSubscribe converts the connection into a one-way - // stream. Dispatch directly so we can write multiple lines - // without going through `Response`, and break out of the - // request loop when it returns (client disconnected). + // EventsSubscribe converts the connection into a server- + // push channel: the daemon writes mesh events plus any + // IPC-routed frames (RpcInbound, ChannelInbound, ...) + // until the client disconnects. Allocate a ClientId so + // subsequent RPC/channel-management requests on OTHER + // command sockets can target this connection. if matches!(request, Request::EventsSubscribe) { - // Initial ack so the client knows the subscription is - // live before the first real event arrives. - let ack = Response::ok(serde_json::json!({ "subscribed": true })); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let client = state.clients.register(tx); + let client_id = client.id; + // Ack carries the client_id so the caller knows what + // to pass back on subsequent `client_id`-bearing ops. + let ack = Response::ok(serde_json::json!({ + "subscribed": true, + "client_id": client_id.to_string(), + })); let line = serde_json::to_string(&ack)? + "\n"; writer.write_all(line.as_bytes()).await?; - run_events_stream(&state, &mut writer).await?; + let result = run_events_stream(&state, &mut writer, rx).await; + // Clean up the client's claims regardless of how + // the stream ended. + state.clients.unregister(client_id); + result?; break; } let resp = dispatch(&state, request).await; @@ -505,6 +641,256 @@ async fn dispatch(state: &Arc, req: Request) -> Response { }, None => Response::err(format!("unknown network: {network}")), }, + + // ---- RPC handler claims -------------------------------------- + Request::RpcRegister { + client_id, + network, + method, + streaming, + } => { + if state.clients.client(client_id).is_none() { + return Response::err(format!("unknown client_id: {client_id}")); + } + let Some(net) = state.registry.get(&network) else { + return Response::err(format!("unknown network: {network}")); + }; + let mode = if streaming { + crate::ipc::clients::HandlerMode::Stream + } else { + crate::ipc::clients::HandlerMode::Single + }; + let key = (network.clone(), method.clone()); + let prev = state.clients.claim_method(key.clone(), client_id, mode); + crate::ipc::bridge::install_handler_for_mode( + &net, + network.clone(), + method.clone(), + mode, + state.clients.clone(), + ); + if let Some(prev_owner) = prev { + crate::ipc::bridge::notify_displaced( + &state.clients, + prev_owner, + client_id, + network, + method, + ); + } + Response::ok(serde_json::json!({ "registered": true })) + } + + Request::RpcUnregister { + client_id, + network, + method, + } => { + let key = (network, method); + let released = state.clients.release_method(&key, client_id); + Response::ok(serde_json::json!({ "released": released })) + } + + // ---- inbound-RPC responses (from IPC handler back to daemon) + Request::RpcRespond { + request_id, + ok, + error, + } => { + let resolved = if let Some(err) = error { + state.clients.reject_inbound_single(&request_id, err) + } else { + state + .clients + .resolve_inbound_single(&request_id, ok.unwrap_or(serde_json::Value::Null)) + }; + if resolved { + Response::ok(serde_json::json!({ "resolved": true })) + } else { + Response::err(format!("no in-flight inbound RPC for '{request_id}'")) + } + } + + Request::RpcStreamChunk { + request_id, + payload, + } => { + let accepted = state + .clients + .push_inbound_stream_chunk(&request_id, payload) + .await; + if accepted { + Response::ok(serde_json::json!({ "delivered": true })) + } else { + Response::err(format!("no in-flight inbound stream for '{request_id}'")) + } + } + + Request::RpcStreamEnd { + request_id, + error: _, + } => { + // Note: webrtc-rs's `Rpc::serve_stream` derives the + // stream-end error from the inner future (Err → + // `RpcStreamEnd { error }` on the wire). At this + // layer dropping the sender is the only signal we + // have — the engine emits `error: None`. Surfacing + // an explicit error from the IPC client requires + // sending it as the final chunk before close. A + // follow-up extension can plumb the wire-level + // error if needed; for now the close is silent. + let closed = state.clients.close_inbound_stream(&request_id); + Response::ok(serde_json::json!({ "closed": closed })) + } + + // ---- outbound RPC -------------------------------------------- + Request::RpcCall { + network, + peer, + method, + payload, + } => { + let Some(net) = state.registry.get(&network) else { + return Response::err(format!("unknown network: {network}")); + }; + match net.rpc().call(&peer, &method, payload).await { + Ok(resp) => Response::ok(serde_json::json!({ "response": resp.body })), + Err(e) => Response::err(e.to_string()), + } + } + + Request::RpcCallStream { + client_id, + network, + peer, + method, + payload, + } => { + let Some(client) = state.clients.client(client_id) else { + return Response::err(format!("unknown client_id: {client_id}")); + }; + let Some(net) = state.registry.get(&network) else { + return Response::err(format!("unknown network: {network}")); + }; + // The lib's `call_stream` allocates a request_id + // internally but doesn't expose it; we mirror its + // shape and tag chunks on the wire with a fresh + // daemon-side id so the IPC client can correlate + // its in-flight calls. + let request_id = format!("ipc-stream-{}", state.clients.next_call_stream_id()); + let rx = match net.rpc().call_stream(&peer, &method, payload).await { + Ok(rx) => rx, + Err(e) => return Response::err(e.to_string()), + }; + let writer_tx = client.writer_tx.clone(); + let req_id_for_task = request_id.clone(); + tokio::spawn(async move { + let mut rx = rx; + while let Some(chunk) = rx.recv().await { + match chunk { + Ok(payload) => { + let _ = writer_tx.send(crate::ipc::ServerOut::RpcCallStreamChunk { + request_id: req_id_for_task.clone(), + payload, + }); + } + Err(err) => { + let _ = writer_tx.send(crate::ipc::ServerOut::RpcCallStreamEnd { + request_id: req_id_for_task.clone(), + error: Some(err), + }); + return; + } + } + } + let _ = writer_tx.send(crate::ipc::ServerOut::RpcCallStreamEnd { + request_id: req_id_for_task, + error: None, + }); + }); + Response::ok(serde_json::json!({ "request_id": request_id })) + } + + // ---- typed channels ------------------------------------------ + Request::ChannelSubscribe { + client_id, + network, + channel, + } => { + if state.clients.client(client_id).is_none() { + return Response::err(format!("unknown client_id: {client_id}")); + } + let Some(net) = state.registry.get(&network) else { + return Response::err(format!("unknown network: {network}")); + }; + let key = (network.clone(), channel.clone()); + let first = state.clients.subscribe_channel(key.clone(), client_id); + if first { + crate::ipc::bridge::spawn_channel_pump( + &net, + network, + channel, + state.clients.clone(), + ); + } + Response::ok(serde_json::json!({ "subscribed": true })) + } + + Request::ChannelUnsubscribe { + client_id, + network, + channel, + } => { + let key = (network, channel); + state.clients.unsubscribe_channel(&key, client_id); + // We don't actively tear the pump down — it exits + // on its next iteration when it sees an empty + // subscriber list. Keeps the unsubscribe synchronous + // and free of cross-task signaling. + Response::ok(serde_json::json!({ "unsubscribed": true })) + } + + Request::ChannelSendTo { + network, + channel, + peer, + payload, + } => { + let Some(net) = state.registry.get(&network) else { + return Response::err(format!("unknown network: {network}")); + }; + let chan = net.channel::(&channel); + match chan.send_to(&peer, &payload).await { + Ok(()) => Response::ok(serde_json::json!({ "sent": true })), + Err(e) => Response::err(e.to_string()), + } + } + + Request::ChannelSendAll { + network, + channel, + payload, + } => { + let Some(net) = state.registry.get(&network) else { + return Response::err(format!("unknown network: {network}")); + }; + let chan = net.channel::(&channel); + match chan.broadcast(&payload).await { + Ok(count) => Response::ok(serde_json::json!({ "dispatched_to": count })), + Err(e) => Response::err(e.to_string()), + } + } + + Request::CapabilitiesSet { + network, + capabilities, + } => { + let Some(net) = state.registry.get(&network) else { + return Response::err(format!("unknown network: {network}")); + }; + net.advertise(capabilities); + Response::ok(serde_json::json!({ "advertised": true })) + } } } @@ -650,45 +1036,72 @@ fn parse_topology(name: &str, hub: Option<&str>) -> std::result::Result(state: &Arc, writer: &mut W) -> Result<()> +/// Stream events to one connected subscriber. Drains two +/// sources concurrently: +/// +/// 1. The mesh-wide [`MeshHandle::events`] broadcast — peer / +/// phase / diag entries the engine emits. +/// 2. The per-client mpsc — `ServerOut` frames the IPC bridge +/// (RPC inbound, channel inbound, handler-displaced +/// notifications) pushes for this specific client. +/// +/// Returns when the writer breaks (client gone) or both source +/// streams close. Source 1 closes only on daemon shutdown; +/// source 2 closes when the client's `unregister` drops the +/// last sender, which the caller invokes after this function +/// returns. +async fn run_events_stream( + state: &Arc, + writer: &mut W, + mut client_rx: tokio::sync::mpsc::UnboundedReceiver, +) -> Result<()> where W: tokio::io::AsyncWrite + Unpin, { - let mut rx = state.mesh.events(); + let mut mesh_rx = state.mesh.events(); loop { - match rx.recv().await { - Ok(event) => { - // Each event is framed with kind=event so the - // subscriber can multiplex against other server - // pushes in the future. The `event` field carries - // the original `MeshEvent` JSON (peer / phase / - // diag, internally tagged). - let line = serde_json::to_string(&serde_json::json!({ - "kind": "event", - "event": event, - }))? + "\n"; + tokio::select! { + biased; + // Per-client frames first — drains IPC-routed + // RpcInbound / ChannelInbound / etc. + maybe_frame = client_rx.recv() => { + let Some(frame) = maybe_frame else { + // Sender dropped — only happens after the + // outer handle_client called `unregister`, + // which only fires after this returns. In + // practice this branch never fires while + // the connection is live; treat as benign + // shutdown. + return Ok(()); + }; + let line = serde_json::to_string(&frame)? + "\n"; if writer.write_all(line.as_bytes()).await.is_err() { - return Ok(()); // client gone + return Ok(()); } if writer.flush().await.is_err() { return Ok(()); } } - Err(broadcast::error::RecvError::Lagged(n)) => { - // Slow subscriber; surface the gap so the GUI can - // resync via a peers_list snapshot. - let line = serde_json::to_string(&serde_json::json!({ - "kind": "lagged", - "skipped": n, - }))? + "\n"; - if writer.write_all(line.as_bytes()).await.is_err() { - return Ok(()); + recv = mesh_rx.recv() => match recv { + Ok(event) => { + let frame = crate::ipc::ServerOut::Event { event }; + let line = serde_json::to_string(&frame)? + "\n"; + if writer.write_all(line.as_bytes()).await.is_err() { + return Ok(()); + } + if writer.flush().await.is_err() { + return Ok(()); + } } - } - Err(broadcast::error::RecvError::Closed) => return Ok(()), + Err(broadcast::error::RecvError::Lagged(n)) => { + let frame = crate::ipc::ServerOut::Lagged { skipped: n }; + let line = serde_json::to_string(&frame)? + "\n"; + if writer.write_all(line.as_bytes()).await.is_err() { + return Ok(()); + } + } + Err(broadcast::error::RecvError::Closed) => return Ok(()), + }, } } } diff --git a/crates/myownmesh/src/ipc/bridge.rs b/crates/myownmesh/src/ipc/bridge.rs new file mode 100644 index 0000000..42f7cc5 --- /dev/null +++ b/crates/myownmesh/src/ipc/bridge.rs @@ -0,0 +1,649 @@ +//! Engine ↔ IPC bridge: synthetic `Rpc::serve` / `serve_stream` +//! handlers that route inbound peer RPCs to whichever IPC +//! client currently holds the matching method claim, plus the +//! per-channel pump task that fans `Channel::subscribe()` +//! frames out to subscribed IPC clients. +//! +//! Lifetime model: +//! +//! - **Handlers** are installed lazily on first claim of a +//! `(network, method)` pair and left in place forever. After +//! the last claim is released the synthetic handler still +//! sits in the engine's `Rpc` dispatch table; if invoked +//! with no current owner it answers with a "no handler" +//! error to the peer rather than panicking. This avoids the +//! complexity of safely tearing handlers down across +//! re-claims and matches how the library-level `Rpc::serve` +//! semantics work (overwrite on re-register). +//! +//! - **Channel pumps** are scoped to subscribers: the first +//! subscribe spawns a forwarder task, the last unsubscribe +//! drops the receiver and the task exits on its next loop +//! iteration. Each task holds an +//! `mpsc::Receiver>`-shaped weak +//! reference so a swept-away registry doesn't keep tasks +//! alive. + +use myownmesh_core::JoinedNetwork; +use serde_json::Value; +use tokio::sync::mpsc; +use tracing::{debug, warn}; + +use super::clients::{ClientRegistry, HandlerMode, PendingInbound}; +use super::wire::ServerOut; + +/// Install (or re-install) a synthetic single-shot RPC handler +/// for `(network_id, method)` on this network's `Rpc` +/// dispatcher. The handler emits `RpcInbound` to whichever +/// client currently owns the claim and awaits an `RpcRespond` +/// to resolve. +pub fn install_single_handler( + network: &JoinedNetwork, + network_key: String, + method: String, + registry: ClientRegistry, +) { + let rpc = network.rpc(); + let key = (network_key.clone(), method.clone()); + rpc.serve(&method, move |call| { + let registry = registry.clone(); + let key = key.clone(); + async move { + let Some(owner_id) = registry.handler_owner(&key) else { + return Err(format!( + "no IPC client holds method '{}' on '{}'", + key.1, key.0 + )); + }; + let Some(client) = registry.client(owner_id) else { + return Err("handler owner client disconnected".into()); + }; + let (tx, rx) = tokio::sync::oneshot::channel(); + registry.put_pending_inbound(call.request_id.clone(), PendingInbound::Single(tx)); + client.send(ServerOut::RpcInbound { + network: key.0.clone(), + from: call.from.clone(), + request_id: call.request_id.clone(), + method: call.method.clone(), + payload: call.payload.clone(), + streaming: call.streaming, + }); + // Await the client's `RpcRespond`. If the client + // disconnects mid-flight, the registry's + // `unregister` path doesn't actively cancel inbound + // RPCs (deliberate — another client may still + // answer for this method on the next claim wave), + // so we lean on the peer's own RPC timeout to + // unwedge. If the oneshot resolves to a dropped + // sender (PendingInbound replaced), return a clear + // error so the peer sees a reasonable failure + // instead of hanging forever. + match rx.await { + Ok(Ok(payload)) => Ok(value_to_response(payload)), + Ok(Err(e)) => Err(e), + Err(_) => Err("IPC handler dropped without responding".into()), + } + } + }); +} + +/// Install (or re-install) a synthetic streaming RPC handler. +/// Mirrors [`install_single_handler`] but stashes an +/// `mpsc::Sender` in the pending table instead of a +/// `oneshot`; chunks land via `RpcStreamChunk` and the stream +/// closes on `RpcStreamEnd` (drop the sender, engine sees +/// `None`). +pub fn install_stream_handler( + network: &JoinedNetwork, + network_key: String, + method: String, + registry: ClientRegistry, +) { + let rpc = network.rpc(); + let key = (network_key.clone(), method.clone()); + rpc.serve_stream(&method, move |call| { + let registry = registry.clone(); + let key = key.clone(); + async move { + let Some(owner_id) = registry.handler_owner(&key) else { + return Err(format!( + "no IPC client holds streaming method '{}' on '{}'", + key.1, key.0 + )); + }; + let Some(client) = registry.client(owner_id) else { + return Err("handler owner client disconnected".into()); + }; + // 32-slot buffer matches the rough back-pressure + // shape used by the engine's outgoing peer queues; + // streaming responses that exceed it block the IPC + // client until the engine drains, which is the + // right back-pressure direction. The send side is + // stashed in `pending_inbound`; chunks land via + // `RpcStreamChunk`. Dropping the sender (via + // `RpcStreamEnd` removing the pending entry) closes + // the receiver and the engine ships + // `RpcStreamEndMessage` to the peer. + let (tx, rx) = mpsc::channel::(32); + registry.put_pending_inbound(call.request_id.clone(), PendingInbound::Stream(tx)); + client.send(ServerOut::RpcInbound { + network: key.0.clone(), + from: call.from.clone(), + request_id: call.request_id.clone(), + method: call.method.clone(), + payload: call.payload.clone(), + streaming: call.streaming, + }); + Ok(rx) + } + }); +} + +/// Spawn the per-channel fan-out task for an IPC subscription +/// on `(network_id, channel)`. Idempotent at the registry +/// level — the caller is expected to spawn this only when +/// `subscribe_channel(...)` returns true (the first +/// subscriber). On the last `unsubscribe_channel(...)` +/// returning true (no remaining subscribers), the task ends on +/// its next loop iteration when it sees an empty subscriber +/// list. +/// +/// The task lives by polling the channel's broadcast receiver. +/// If the network is torn down (`recv` returns `Closed`) or +/// the subscriber set becomes empty between frames, it exits. +pub fn spawn_channel_pump( + network: &JoinedNetwork, + network_key: String, + channel_name: String, + registry: ClientRegistry, +) { + let channel = network.channel::(&channel_name); + let mut sub = channel.subscribe(); + let key = (network_key.clone(), channel_name.clone()); + tokio::spawn(async move { + loop { + // Exit early if no subscribers remain. + let subscribers = registry.channel_subscribers(&key); + if subscribers.is_empty() { + debug!( + network = %key.0, + channel = %key.1, + "channel pump exiting (no subscribers)" + ); + break; + } + let Some(next) = sub.recv().await else { + debug!( + network = %key.0, + channel = %key.1, + "channel pump exiting (channel closed)" + ); + break; + }; + match next { + Ok(msg) => { + let frame = ServerOut::ChannelInbound { + network: key.0.clone(), + from: msg.from, + channel: key.1.clone(), + payload: msg.body, + }; + for client_id in subscribers { + if let Some(client) = registry.client(client_id) { + client.send(frame.clone()); + } + } + } + Err(e) => { + warn!( + network = %key.0, + channel = %key.1, + "channel deserialize error: {e}" + ); + } + } + } + }); +} + +/// `myownmesh-core`'s `Rpc::serve` wants an +/// `Ok(RpcResponse)` — wrap a raw `Value` so callers don't +/// reach across crate-private types. +pub fn value_to_response(v: Value) -> myownmesh_core::rpc::RpcResponse { + myownmesh_core::rpc::RpcResponse::from_value(v) +} + +/// Helper used by `dispatch` when an IPC client releases or +/// has been disconnected: notify the now-displaced client. +pub fn notify_displaced( + registry: &ClientRegistry, + prev_owner: super::clients::ClientId, + by: super::clients::ClientId, + network: String, + method: String, +) { + if let Some(client) = registry.client(prev_owner) { + client.send(ServerOut::HandlerDisplaced { + network, + method, + by: by.to_string(), + }); + } +} + +/// Public helper for the dispatch layer: install whichever +/// handler shape matches the requested mode. Idempotent — +/// re-claiming an existing method just replaces the synthetic +/// handler (and `Rpc::serve` itself does the same). +pub fn install_handler_for_mode( + network: &JoinedNetwork, + network_key: String, + method: String, + mode: HandlerMode, + registry: ClientRegistry, +) { + match mode { + HandlerMode::Single => install_single_handler(network, network_key, method, registry), + HandlerMode::Stream => install_stream_handler(network, network_key, method, registry), + } +} + +#[cfg(test)] +mod tests { + //! End-to-end engine-bridge tests. Two engines wired + //! through `LocalBroker`; one side simulates an IPC client + //! by holding the receiver end of a `ClientHandle` and + //! manually feeding `RpcRespond`s back through the + //! registry — same path the dispatch layer takes when a + //! real socket client posts `RpcRespond`. + + use crate::ipc::clients::{ClientRegistry, HandlerMode}; + use crate::ipc::wire::ServerOut; + use myownmesh_core::config::{NetworkConfig, SignalingConfig, TopologyMode}; + use myownmesh_core::engine::{attach_local, spawn_network}; + use myownmesh_core::events::{MeshEvent, PeerEvent}; + use myownmesh_core::identity::Identity; + use myownmesh_core::transport::Transport; + use myownmesh_signaling::local::LocalBroker; + use std::sync::Arc; + use std::time::Duration; + use tokio::time::Instant; + + fn fresh_network(id: &str, wire_id: &str) -> NetworkConfig { + NetworkConfig { + id: id.to_string(), + network_id: wire_id.to_string(), + label: id.to_string(), + kind: Default::default(), + topology: TopologyMode::FullMesh, + signaling: SignalingConfig::default(), + stun_servers: Vec::new(), + turn_servers: Vec::new(), + roster_path: None, + auto_approve: true, + } + } + + async fn wait_for_approval( + rx: &mut tokio::sync::broadcast::Receiver, + peer_id: &str, + ) { + let deadline = Instant::now() + Duration::from_secs(20); + loop { + if Instant::now() > deadline { + panic!("never saw PeerApproved for {peer_id}"); + } + let next = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await; + match next { + Ok(Ok(MeshEvent::Peer(PeerEvent::Approved { device_id, .. }))) + if device_id == peer_id => + { + return; + } + _ => continue, + } + } + } + + /// Build two engines + a Rpc dispatcher pair sharing one + /// LocalBroker. Returns `(alice_state, bob_state, alice_rpc, + /// bob_rpc, alice_id, bob_id)`. Driver join handles are + /// leaked — the tests don't depend on clean shutdown. + #[allow(clippy::type_complexity)] + async fn two_peer_rpc( + wire_id: &str, + ) -> ( + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, + ) { + let tmp = tempfile::tempdir().expect("tempdir"); + std::env::set_var("MYOWNMESH_HOME", tmp.path()); + std::mem::forget(tmp); // leak — test scope only + + let broker = LocalBroker::new(); + let transport = Transport::new().expect("transport"); + + let alice_id = Arc::new(Identity::ephemeral()); + let bob_id = Arc::new(Identity::ephemeral()); + + let alice_cfg = fresh_network("alice", wire_id); + let bob_cfg = fresh_network("bob", wire_id); + + let (alice_state, alice_driver) = + spawn_network(alice_cfg, alice_id.clone(), transport.clone()) + .await + .expect("alice engine"); + let (bob_state, bob_driver) = spawn_network(bob_cfg, bob_id.clone(), transport.clone()) + .await + .expect("bob engine"); + // Leak the driver handles — keeps them running for the + // life of the test process. + std::mem::forget(alice_driver); + std::mem::forget(bob_driver); + + let alice_rpc = Arc::new(myownmesh_core::rpc::Rpc::attach(&alice_state)); + let bob_rpc = Arc::new(myownmesh_core::rpc::Rpc::attach(&bob_state)); + + let mut alice_events = alice_state.events_tx.subscribe(); + let mut bob_events = bob_state.events_tx.subscribe(); + attach_local(&alice_state, &broker); + attach_local(&bob_state, &broker); + + wait_for_approval(&mut alice_events, bob_id.public_id()).await; + wait_for_approval(&mut bob_events, alice_id.public_id()).await; + + (alice_state, bob_state, alice_rpc, bob_rpc, alice_id, bob_id) + } + + /// Single-shot RPC routed via the IPC bridge. Alice's + /// network registers a synthetic handler bound to a + /// simulated IPC client; Bob calls the method; the + /// "client" receives `RpcInbound`, posts `RpcRespond` back + /// via the registry, and Bob's call resolves with the + /// returned payload. + #[tokio::test] + async fn single_shot_rpc_round_trip_through_bridge() { + let (alice_state, _bob_state, _alice_rpc, bob_rpc, alice_id, _bob_id) = + two_peer_rpc("ipc-bridge-single").await; + + // Simulate an IPC client on Alice's side. + let registry = ClientRegistry::new(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + let client = registry.register(tx); + let net_key = "alice".to_string(); + let method = "echo".to_string(); + let key = (net_key.clone(), method.clone()); + registry.claim_method(key.clone(), client.id, HandlerMode::Single); + + // The bridge needs a `JoinedNetwork` — but we have the + // state directly. The synthetic handler only needs to + // call `Rpc::serve` on the network's Rpc, which we can + // do via the lower-level `attach` path mirroring what + // `install_single_handler` does, but inlined here so + // we don't need a `JoinedNetwork` facade. + let registry_for_handler = registry.clone(); + let key_for_handler = key.clone(); + myownmesh_core::rpc::Rpc::attach(&alice_state).serve("echo", move |call| { + let registry = registry_for_handler.clone(); + let key = key_for_handler.clone(); + async move { + let owner = registry + .handler_owner(&key) + .ok_or_else(|| "no claim".to_string())?; + let client = registry + .client(owner) + .ok_or_else(|| "owner gone".to_string())?; + let (resp_tx, resp_rx) = tokio::sync::oneshot::channel(); + registry.put_pending_inbound( + call.request_id.clone(), + crate::ipc::clients::PendingInbound::Single(resp_tx), + ); + client.send(ServerOut::RpcInbound { + network: key.0.clone(), + from: call.from.clone(), + request_id: call.request_id.clone(), + method: call.method.clone(), + payload: call.payload.clone(), + streaming: call.streaming, + }); + match resp_rx.await { + Ok(Ok(p)) => Ok(myownmesh_core::rpc::RpcResponse::from_value(p)), + Ok(Err(e)) => Err(e), + Err(_) => Err("handler dropped".into()), + } + } + }); + + // Bob calls the method. + let alice_did = alice_id.public_id().to_string(); + let call_handle = tokio::spawn(async move { + bob_rpc + .call(&alice_did, "echo", serde_json::json!({"n": 7})) + .await + }); + + // Pull the RpcInbound off the simulated client mpsc. + let inbound = tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .expect("inbound timeout") + .expect("inbound recv"); + let (request_id, payload) = match inbound { + ServerOut::RpcInbound { + request_id, + payload, + method, + .. + } => { + assert_eq!(method, "echo"); + (request_id, payload) + } + other => panic!("expected RpcInbound, got {other:?}"), + }; + assert_eq!(payload, serde_json::json!({"n": 7})); + + // Respond via the registry (same path dispatch takes). + let resolved = + registry.resolve_inbound_single(&request_id, serde_json::json!({"n_squared": 49})); + assert!(resolved); + + let bob_response = tokio::time::timeout(Duration::from_secs(5), call_handle) + .await + .expect("call timeout") + .expect("join") + .expect("rpc ok"); + assert_eq!(bob_response.body, serde_json::json!({"n_squared": 49})); + } + + /// Streaming RPC: Alice's "client" pushes three chunks + /// via `push_inbound_stream_chunk` + closes via + /// `close_inbound_stream`; Bob's `call_stream` drains the + /// receiver and sees all three plus the end-of-stream. + #[tokio::test] + async fn streaming_rpc_round_trip_through_bridge() { + let (alice_state, _bob_state, _alice_rpc, bob_rpc, alice_id, _bob_id) = + two_peer_rpc("ipc-bridge-stream").await; + + let registry = ClientRegistry::new(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + let client = registry.register(tx); + let key = ("alice".to_string(), "stream_echo".to_string()); + registry.claim_method(key.clone(), client.id, HandlerMode::Stream); + + // Wire the synthetic stream handler. Identical to the + // single-shot test but uses `serve_stream` + the + // `PendingInbound::Stream` arm. + let registry_for_handler = registry.clone(); + let key_for_handler = key.clone(); + myownmesh_core::rpc::Rpc::attach(&alice_state).serve_stream("stream_echo", move |call| { + let registry = registry_for_handler.clone(); + let key = key_for_handler.clone(); + async move { + let owner = registry + .handler_owner(&key) + .ok_or_else(|| "no claim".to_string())?; + let client = registry + .client(owner) + .ok_or_else(|| "owner gone".to_string())?; + let (tx, rx) = tokio::sync::mpsc::channel::(32); + registry.put_pending_inbound( + call.request_id.clone(), + crate::ipc::clients::PendingInbound::Stream(tx), + ); + client.send(ServerOut::RpcInbound { + network: key.0.clone(), + from: call.from.clone(), + request_id: call.request_id.clone(), + method: call.method.clone(), + payload: call.payload.clone(), + streaming: call.streaming, + }); + Ok(rx) + } + }); + + let alice_did = alice_id.public_id().to_string(); + let bob_rpc_clone = bob_rpc.clone(); + let stream_handle = tokio::spawn(async move { + bob_rpc_clone + .call_stream(&alice_did, "stream_echo", serde_json::json!("start")) + .await + }); + + // Pull RpcInbound to get the request_id. + let inbound = tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .expect("inbound timeout") + .expect("inbound recv"); + let request_id = match inbound { + ServerOut::RpcInbound { request_id, .. } => request_id, + other => panic!("expected RpcInbound, got {other:?}"), + }; + + // Push three chunks then close. + for n in 1..=3 { + assert!( + registry + .push_inbound_stream_chunk(&request_id, serde_json::json!(n)) + .await, + "chunk {n} push" + ); + } + assert!(registry.close_inbound_stream(&request_id)); + + // Bob drains his receiver — three chunks then close. + let mut bob_rx = tokio::time::timeout(Duration::from_secs(5), stream_handle) + .await + .expect("stream timeout") + .expect("join") + .expect("call_stream ok"); + for n in 1..=3 { + let chunk = tokio::time::timeout(Duration::from_secs(5), bob_rx.recv()) + .await + .expect("chunk timeout") + .expect("chunk recv") + .expect("chunk ok"); + assert_eq!(chunk, serde_json::json!(n)); + } + // End-of-stream: receiver returns None. + let end = tokio::time::timeout(Duration::from_secs(5), bob_rx.recv()) + .await + .expect("end timeout"); + assert!(end.is_none(), "expected stream end, got {end:?}"); + } + + /// Channel pub/sub: subscribe Alice's "IPC client" to a + /// channel, Bob sends a frame on the same name, the + /// client receives a `ChannelInbound` event with the + /// correct payload and sender. + #[tokio::test] + async fn channel_inbound_round_trip_through_bridge() { + let (alice_state, bob_state, _alice_rpc, _bob_rpc, _alice_id, bob_id) = + two_peer_rpc("ipc-bridge-channel").await; + + let registry = ClientRegistry::new(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + let client = registry.register(tx); + let net_key = "alice".to_string(); + let chan_key = "catalog".to_string(); + let key = (net_key.clone(), chan_key.clone()); + + // Subscribe and spawn the pump. The pump only needs + // the engine state to build a `Channel` — + // bypass the JoinedNetwork facade here for the same + // reason the bridge module itself takes + // `&JoinedNetwork` in production. + let was_first = registry.subscribe_channel(key.clone(), client.id); + assert!(was_first); + + // Spawn a pump that mirrors `bridge::spawn_channel_pump` + // but uses the engine state directly. + let chan: myownmesh_core::Channel = + myownmesh_core::Channel::new(chan_key.clone(), alice_state.clone()); + let mut sub = chan.subscribe(); + let registry_for_pump = registry.clone(); + let key_for_pump = key.clone(); + tokio::spawn(async move { + loop { + let subscribers = registry_for_pump.channel_subscribers(&key_for_pump); + if subscribers.is_empty() { + break; + } + let Some(next) = sub.recv().await else { + break; + }; + let Ok(msg) = next else { + continue; + }; + let frame = ServerOut::ChannelInbound { + network: key_for_pump.0.clone(), + from: msg.from, + channel: key_for_pump.1.clone(), + payload: msg.body, + }; + for cid in subscribers { + if let Some(c) = registry_for_pump.client(cid) { + c.send(frame.clone()); + } + } + } + }); + + // Bob sends to Alice on the channel. + let bob_chan: myownmesh_core::Channel = + myownmesh_core::Channel::new(chan_key.clone(), bob_state.clone()); + bob_chan + .send_to( + _alice_id_arg(&alice_state), + &serde_json::json!({"hello": "from bob"}), + ) + .await + .expect("bob send"); + + let frame = tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .expect("inbound timeout") + .expect("inbound recv"); + match frame { + ServerOut::ChannelInbound { + network, + from, + channel, + payload, + } => { + assert_eq!(network, net_key); + assert_eq!(channel, chan_key); + assert_eq!(from, bob_id.public_id()); + assert_eq!(payload, serde_json::json!({"hello": "from bob"})); + } + other => panic!("expected ChannelInbound, got {other:?}"), + } + } + + fn _alice_id_arg(state: &Arc) -> &str { + state.identity.public_id() + } +} diff --git a/crates/myownmesh/src/ipc/clients.rs b/crates/myownmesh/src/ipc/clients.rs new file mode 100644 index 0000000..2e55310 --- /dev/null +++ b/crates/myownmesh/src/ipc/clients.rs @@ -0,0 +1,541 @@ +//! Per-connection state and the daemon-wide indices that route +//! inbound RPCs / channel messages to the right client. +//! +//! One `ClientHandle` per event-subscribed socket. The handle +//! owns the mpsc sender that pushes [`super::wire::ServerOut`] +//! lines back to that socket; the read side of the same socket +//! drives `RpcRespond` / `RpcStreamChunk` / `RpcStreamEnd` / +//! `RpcUnregister` / `ChannelUnsubscribe` back through +//! `dispatch`. +//! +//! The registry maintains four indices: +//! +//! - `clients` — every connected event-subscribed client, keyed +//! by `ClientId`. Dropped on disconnect. +//! - `handler_claims` — which client owns each method name on +//! each network. Last-claim-wins: a re-register evicts the +//! prior owner with a `HandlerDisplaced` event. +//! - `channel_subs` — set of subscribed clients per (network, +//! channel). Channel inbound events fan out to every member. +//! - `pending_inbound` — engine-side `oneshot::Sender` / +//! `mpsc::Sender` keyed by request id, awaiting an +//! `RpcRespond` (or stream chunks) from whichever client owns +//! the originating handler. The client identity isn't part of +//! the key — any client may resolve any in-flight id (this +//! keeps stream chunks decoupled from a single connection if +//! it bounces). + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use dashmap::{DashMap, DashSet}; +use parking_lot::Mutex; +use tokio::sync::{mpsc, oneshot}; + +use super::wire::ServerOut; + +/// Process-unique identifier for a connected client. +/// +/// Just a monotonic counter; the daemon never reuses ids, so a +/// stale reference in a forwarder task that races with +/// disconnect resolves to a `None` lookup instead of routing to +/// a different client. +/// +/// Wire form is the `Display` shape `c` — clients pass it +/// back verbatim on subsequent RPC/channel-management requests +/// to identify which event-subscribed connection a handler +/// claim belongs to. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ClientId(pub u64); + +impl std::fmt::Display for ClientId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "c{}", self.0) + } +} + +impl std::str::FromStr for ClientId { + type Err = String; + fn from_str(s: &str) -> Result { + let n_str = s + .strip_prefix('c') + .ok_or_else(|| format!("ClientId must start with 'c', got '{s}'"))?; + let n: u64 = n_str.parse().map_err(|e| format!("ClientId parse: {e}"))?; + Ok(ClientId(n)) + } +} + +impl serde::Serialize for ClientId { + fn serialize(&self, s: S) -> Result { + s.collect_str(self) + } +} + +impl<'de> serde::Deserialize<'de> for ClientId { + fn deserialize>(d: D) -> Result { + let s = String::deserialize(d)?; + s.parse().map_err(serde::de::Error::custom) + } +} + +/// Per-network handler-claim key. `network` is the +/// configuration id (matching the rest of the control surface). +pub type ClaimKey = (String, String); + +/// Engine-side awaiter for an in-flight inbound RPC. The +/// synthetic handler installed by [`super::bridge`] returns the +/// receive side to the engine; the daemon stores the sender +/// here so a later `RpcRespond` from the client resolves it. +pub enum PendingInbound { + /// Single-shot — resolved by exactly one `RpcRespond`. + Single(oneshot::Sender>), + /// Streaming — fed by `RpcStreamChunk`s and closed by + /// `RpcStreamEnd` (drop the sender; engine sees the + /// receiver yield `None`). + Stream(mpsc::Sender), +} + +/// State for a single connected event-subscribed client. +#[derive(Clone)] +pub struct ClientHandle { + pub id: ClientId, + /// Mpsc the read loop and bridge code push outbound frames + /// into; a writer task on the same connection drains it. + pub writer_tx: mpsc::UnboundedSender, + /// Method claims this client currently holds. Tracked for + /// O(1) cleanup on disconnect; the authoritative routing + /// table is on the registry. + pub method_claims: Arc>, + /// Channel subscriptions this client currently holds. + /// Same disconnect-cleanup rationale. + pub channel_subs: Arc>, +} + +impl ClientHandle { + pub fn send(&self, frame: ServerOut) { + // Best effort: a dropped writer means the connection is + // gone; the registry will clean up the handle shortly. + let _ = self.writer_tx.send(frame); + } +} + +/// Daemon-wide registry of connected clients + their +/// registrations. +#[derive(Clone, Default)] +pub struct ClientRegistry { + inner: Arc, +} + +#[derive(Default)] +struct RegistryInner { + next_id: AtomicU64, + next_call_stream_id: AtomicU64, + clients: DashMap>, + handler_claims: DashMap, + channel_subs: DashMap>>>, + pending_inbound: DashMap, + /// Streaming methods that have a synthetic handler + /// installed on the engine. `(network, method) → ()` — + /// the value side is unused; we only need set semantics. + /// Tracked so we can ask the bridge to forget the handler + /// on the last unclaim. + installed_handlers: DashMap, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum HandlerMode { + Single, + Stream, +} + +impl ClientRegistry { + pub fn new() -> Self { + Self::default() + } + + /// Allocate a fresh `ClientId` and register the client's + /// outbound writer. Returns the handle the read loop should + /// keep alongside its socket. + pub fn register(&self, writer_tx: mpsc::UnboundedSender) -> Arc { + let id = ClientId(self.inner.next_id.fetch_add(1, Ordering::Relaxed)); + let handle = Arc::new(ClientHandle { + id, + writer_tx, + method_claims: Arc::new(DashSet::new()), + channel_subs: Arc::new(DashSet::new()), + }); + self.inner.clients.insert(id, handle.clone()); + handle + } + + /// Drop a client on disconnect: remove its method claims + /// (notify peers that called them? not yet — we let the + /// in-flight ids time out at the peer), drop its channel + /// subscriptions, and any pending inbound RPCs that were + /// keyed against this client. + /// + /// Pending inbound RPCs are *not* keyed by client (any + /// client may answer any id), so we don't have to scan them + /// here — they'll be reaped naturally if no `RpcRespond` + /// ever lands and the peer side hits its own timeout. + pub fn unregister(&self, id: ClientId) -> Option> { + let (_, handle) = self.inner.clients.remove(&id)?; + // Drop method claims this client owned. Note: we don't + // tear down the synthetic handler on the engine — a + // future claimant might re-take the same method + // immediately and we'd save the install. The handler + // gracefully errors with `no claim` if invoked with no + // current owner. + for entry in handle.method_claims.iter() { + let key = entry.key().clone(); + // Only drop if we still own it (a displacing client + // might have already taken over). + self.inner + .handler_claims + .remove_if(&key, |_, owner| *owner == id); + } + // Drop channel subscriptions. The fan-out task running + // for this (network, channel) will notice the empty + // subscriber list and exit on its next iteration. + for entry in handle.channel_subs.iter() { + let key = entry.key().clone(); + if let Some(subs) = self.inner.channel_subs.get(&key) { + subs.lock().retain(|c| *c != id); + } + } + Some(handle) + } + + pub fn client(&self, id: ClientId) -> Option> { + self.inner.clients.get(&id).map(|e| e.value().clone()) + } + + /// Claim a method on a network. Returns the previously + /// claiming client if any (so the caller can notify them + /// with `HandlerDisplaced`). + pub fn claim_method( + &self, + key: ClaimKey, + new_owner: ClientId, + mode: HandlerMode, + ) -> Option { + // Update the per-client cache first so on-disconnect + // cleanup sees the new claim. + if let Some(client) = self.client(new_owner) { + client.method_claims.insert(key.clone()); + } + let prev = self.inner.handler_claims.insert(key.clone(), new_owner); + self.inner.installed_handlers.insert(key.clone(), mode); + if let Some(prev_owner) = prev { + if prev_owner != new_owner { + if let Some(prev_client) = self.client(prev_owner) { + prev_client.method_claims.remove(&key); + } + return Some(prev_owner); + } + } + None + } + + /// Release a method claim. Returns the prior owner if the + /// caller did own it. + pub fn release_method(&self, key: &ClaimKey, owner: ClientId) -> bool { + if let Some(client) = self.client(owner) { + client.method_claims.remove(key); + } + self.inner + .handler_claims + .remove_if(key, |_, current| *current == owner) + .is_some() + } + + pub fn handler_owner(&self, key: &ClaimKey) -> Option { + self.inner.handler_claims.get(key).map(|e| *e.value()) + } + + #[allow(dead_code)] + pub fn handler_mode(&self, key: &ClaimKey) -> Option { + self.inner.installed_handlers.get(key).map(|e| *e.value()) + } + + /// Returns `true` on the FIRST subscriber for this + /// (network, channel) — caller uses that signal to spawn a + /// new pump task. + pub fn subscribe_channel(&self, key: ClaimKey, client: ClientId) -> bool { + if let Some(c) = self.client(client) { + c.channel_subs.insert(key.clone()); + } + let entry = self + .inner + .channel_subs + .entry(key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(Vec::new()))); + let mut subs = entry.lock(); + let was_empty = subs.is_empty(); + if !subs.contains(&client) { + subs.push(client); + } + was_empty + } + + /// Release a subscription. Returns `true` if no clients + /// remain on this channel — caller uses that signal to tear + /// down the pump task. + pub fn unsubscribe_channel(&self, key: &ClaimKey, client: ClientId) -> bool { + if let Some(c) = self.client(client) { + c.channel_subs.remove(key); + } + let Some(subs) = self.inner.channel_subs.get(key) else { + return true; + }; + let mut subs = subs.lock(); + subs.retain(|c| *c != client); + subs.is_empty() + } + + /// Snapshot the current set of subscribers — used by the + /// channel pump task each iteration. + pub fn channel_subscribers(&self, key: &ClaimKey) -> Vec { + self.inner + .channel_subs + .get(key) + .map(|subs| subs.lock().clone()) + .unwrap_or_default() + } + + pub fn put_pending_inbound(&self, request_id: String, entry: PendingInbound) { + self.inner.pending_inbound.insert(request_id, entry); + } + + pub fn take_pending_inbound(&self, request_id: &str) -> Option { + self.inner + .pending_inbound + .remove(request_id) + .map(|(_, v)| v) + } + + /// Resolve a single-shot in-flight RPC with success. + /// Returns true if the id was pending and resolved. + pub fn resolve_inbound_single(&self, request_id: &str, payload: serde_json::Value) -> bool { + if let Some(PendingInbound::Single(tx)) = self.take_pending_inbound(request_id) { + let _ = tx.send(Ok(payload)); + return true; + } + false + } + + /// Resolve a single-shot in-flight RPC with failure. + pub fn reject_inbound_single(&self, request_id: &str, error: String) -> bool { + if let Some(PendingInbound::Single(tx)) = self.take_pending_inbound(request_id) { + let _ = tx.send(Err(error)); + return true; + } + false + } + + /// Push a streaming chunk to an in-flight stream handler. + /// Returns true if the id was pending and the chunk was + /// accepted (the engine receiver may be closed if the peer + /// already moved on). + pub async fn push_inbound_stream_chunk( + &self, + request_id: &str, + payload: serde_json::Value, + ) -> bool { + let tx = { + let entry = self.inner.pending_inbound.get(request_id); + match entry.as_deref() { + Some(PendingInbound::Stream(tx)) => tx.clone(), + _ => return false, + } + }; + tx.send(payload).await.is_ok() + } + + /// Close an in-flight stream handler. Drops the sender — + /// the engine sees the receiver yield `None` and ships + /// `RpcStreamEnd` to the peer. + pub fn close_inbound_stream(&self, request_id: &str) -> bool { + matches!( + self.take_pending_inbound(request_id), + Some(PendingInbound::Stream(_)) + ) + } + + /// Monotonic counter used to tag outbound stream calls. + /// The lib's `Rpc::call_stream` allocates its own request + /// id internally but doesn't expose it; the IPC layer + /// generates its own correlation id so clients can match + /// chunks back to their originating call. + pub fn next_call_stream_id(&self) -> u64 { + self.inner + .next_call_stream_id + .fetch_add(1, Ordering::Relaxed) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fresh_client( + registry: &ClientRegistry, + ) -> (Arc, mpsc::UnboundedReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + let handle = registry.register(tx); + (handle, rx) + } + + #[test] + fn client_id_roundtrips_through_string() { + let id = ClientId(42); + assert_eq!(id.to_string(), "c42"); + let parsed: ClientId = "c42".parse().expect("parse"); + assert_eq!(parsed, id); + assert!("not-an-id".parse::().is_err()); + assert!("c-99".parse::().is_err()); + } + + #[test] + fn ids_are_monotonic_and_unique() { + let reg = ClientRegistry::new(); + let (a, _ra) = fresh_client(®); + let (b, _rb) = fresh_client(®); + let (c, _rc) = fresh_client(®); + assert_eq!(a.id, ClientId(0)); + assert_eq!(b.id, ClientId(1)); + assert_eq!(c.id, ClientId(2)); + assert!(reg.client(a.id).is_some()); + assert!(reg.client(ClientId(99)).is_none()); + } + + #[test] + fn claim_method_takes_ownership_and_displaces_prior() { + let reg = ClientRegistry::new(); + let (a, _ra) = fresh_client(®); + let (b, _rb) = fresh_client(®); + let key = ("net".to_string(), "infer".to_string()); + + let prev = reg.claim_method(key.clone(), a.id, HandlerMode::Single); + assert!(prev.is_none()); + assert_eq!(reg.handler_owner(&key), Some(a.id)); + assert!(a.method_claims.contains(&key)); + + let prev = reg.claim_method(key.clone(), a.id, HandlerMode::Single); + assert!(prev.is_none()); + + let prev = reg.claim_method(key.clone(), b.id, HandlerMode::Stream); + assert_eq!(prev, Some(a.id)); + assert_eq!(reg.handler_owner(&key), Some(b.id)); + assert!(b.method_claims.contains(&key)); + assert!(!a.method_claims.contains(&key)); + } + + #[test] + fn release_method_only_succeeds_for_current_owner() { + let reg = ClientRegistry::new(); + let (a, _) = fresh_client(®); + let (b, _) = fresh_client(®); + let key = ("net".to_string(), "infer".to_string()); + + reg.claim_method(key.clone(), a.id, HandlerMode::Single); + assert!(!reg.release_method(&key, b.id)); + assert_eq!(reg.handler_owner(&key), Some(a.id)); + assert!(reg.release_method(&key, a.id)); + assert!(reg.handler_owner(&key).is_none()); + assert!(!a.method_claims.contains(&key)); + } + + #[test] + fn unregister_drops_claims_and_subscriptions() { + let reg = ClientRegistry::new(); + let (a, _) = fresh_client(®); + let method_key = ("net".to_string(), "infer".to_string()); + let channel_key = ("net".to_string(), "catalog".to_string()); + + reg.claim_method(method_key.clone(), a.id, HandlerMode::Single); + reg.subscribe_channel(channel_key.clone(), a.id); + + assert_eq!(reg.handler_owner(&method_key), Some(a.id)); + assert_eq!(reg.channel_subscribers(&channel_key), vec![a.id]); + + reg.unregister(a.id); + + assert!(reg.handler_owner(&method_key).is_none()); + assert!(reg.channel_subscribers(&channel_key).is_empty()); + assert!(reg.client(a.id).is_none()); + } + + #[test] + fn unregister_doesnt_collateral_drop_a_displacing_claim() { + let reg = ClientRegistry::new(); + let (a, _) = fresh_client(®); + let (b, _) = fresh_client(®); + let key = ("net".to_string(), "infer".to_string()); + + reg.claim_method(key.clone(), a.id, HandlerMode::Single); + reg.claim_method(key.clone(), b.id, HandlerMode::Single); + assert_eq!(reg.handler_owner(&key), Some(b.id)); + + reg.unregister(a.id); + assert_eq!(reg.handler_owner(&key), Some(b.id)); + } + + #[test] + fn channel_subscribe_first_subscriber_flag() { + let reg = ClientRegistry::new(); + let (a, _) = fresh_client(®); + let (b, _) = fresh_client(®); + let key = ("net".to_string(), "catalog".to_string()); + + assert!(reg.subscribe_channel(key.clone(), a.id), "first sub"); + assert!(!reg.subscribe_channel(key.clone(), b.id), "second sub"); + + assert!(!reg.unsubscribe_channel(&key, b.id)); + assert!(reg.unsubscribe_channel(&key, a.id)); + } + + #[tokio::test] + async fn resolve_single_inbound_sends_payload_back() { + let reg = ClientRegistry::new(); + let (tx, rx) = oneshot::channel(); + reg.put_pending_inbound("req-1".into(), PendingInbound::Single(tx)); + assert!(reg.resolve_inbound_single("req-1", serde_json::json!({"hi": true}))); + let got = rx.await.expect("oneshot").expect("ok"); + assert_eq!(got, serde_json::json!({"hi": true})); + + // Second resolve is a no-op. + assert!(!reg.resolve_inbound_single("req-1", serde_json::Value::Null)); + } + + #[tokio::test] + async fn reject_single_inbound_passes_error_through() { + let reg = ClientRegistry::new(); + let (tx, rx) = oneshot::channel(); + reg.put_pending_inbound("req-x".into(), PendingInbound::Single(tx)); + assert!(reg.reject_inbound_single("req-x", "nope".into())); + let got = rx.await.expect("oneshot"); + assert!(got.is_err()); + } + + #[tokio::test] + async fn stream_chunks_then_end_closes_receiver() { + let reg = ClientRegistry::new(); + let (tx, mut rx) = mpsc::channel::(4); + reg.put_pending_inbound("req-s".into(), PendingInbound::Stream(tx)); + + assert!( + reg.push_inbound_stream_chunk("req-s", serde_json::json!(1)) + .await + ); + assert!( + reg.push_inbound_stream_chunk("req-s", serde_json::json!(2)) + .await + ); + assert!(reg.close_inbound_stream("req-s")); + + assert_eq!(rx.recv().await, Some(serde_json::json!(1))); + assert_eq!(rx.recv().await, Some(serde_json::json!(2))); + assert_eq!(rx.recv().await, None); + } +} diff --git a/crates/myownmesh/src/ipc/mod.rs b/crates/myownmesh/src/ipc/mod.rs new file mode 100644 index 0000000..9f0d212 --- /dev/null +++ b/crates/myownmesh/src/ipc/mod.rs @@ -0,0 +1,45 @@ +//! Daemon-side IPC plumbing for typed channels and RPC. +//! +//! The existing `control.rs` request/response model covers +//! control-plane ops (network lifecycle, peers, roster, +//! governance, status). What it doesn't cover are the parts of +//! `myownmesh-core` that are inherently bidirectional and +//! stateful per client: +//! +//! - **RPC handler registration.** A client claims a method +//! name (`infer`, `transcribe`, ...) and the daemon installs a +//! synthetic `Rpc::serve` that routes inbound peer calls to +//! the claiming client's event socket as `RpcInbound` events. +//! The client posts `RpcRespond` / `RpcStreamChunk` / +//! `RpcStreamEnd` requests back over the same connection; +//! those resolve the engine-side `oneshot` / `mpsc` the +//! library handler returned. +//! +//! - **Typed channel subscriptions.** A client subscribes to a +//! channel name (`catalog/announce`, `permissions/snapshot`, +//! ...) and the daemon spins up a forwarder that drains the +//! network's `Channel::subscribe()` broadcast and emits +//! `ChannelInbound` events to every currently-subscribed +//! client. +//! +//! - **Capability advertisement updates.** Clients call +//! `CapabilitiesSet` to replace the network's advertised caps; +//! the daemon forwards to `JoinedNetwork::advertise(...)`, +//! which broadcasts a `capabilities_update` frame to peers. +//! +//! All of this is layered on top of the existing +//! request/response wire — see [`crate::control`] for the +//! existing variants. The new ops are additive; no breaking +//! change to clients that don't speak them. +//! +//! Per-client state lives in [`clients::ClientRegistry`]; the +//! engine-side glue (synthetic `Rpc::serve` handlers, channel +//! pump tasks) lives in [`bridge`]. + +pub mod bridge; +pub mod clients; +pub mod wire; + +#[allow(unused_imports)] +pub use clients::{ClientHandle, ClientId, ClientRegistry}; +pub use wire::ServerOut; diff --git a/crates/myownmesh/src/ipc/wire.rs b/crates/myownmesh/src/ipc/wire.rs new file mode 100644 index 0000000..87cb39c --- /dev/null +++ b/crates/myownmesh/src/ipc/wire.rs @@ -0,0 +1,83 @@ +//! Wire frames the daemon writes to a duplex (post- +//! `EventsSubscribe`) client connection. Every line is exactly +//! one of these variants, tagged via the `kind` field so a +//! client can dispatch on it without trying to guess from +//! shape. +//! +//! Backward compat: `Event` / `Lagged` were the only `kind`s +//! the original event stream emitted, and the existing +//! MyOwnMesh GUI client already ignores unknown kinds via its +//! `match _ => {}` default in +//! `gui/src-tauri/src/main.rs::run_event_pump`. New variants +//! land additively without breaking it. + +use serde::Serialize; +use serde_json::Value; + +use myownmesh_core::events::MeshEvent; + +/// Server → client wire frame on a duplex event socket. +/// +/// Pre-`EventsSubscribe`, the daemon emits the legacy +/// [`crate::control::Response`] shape (no `kind` tag) so the +/// existing one-shot request/response clients keep working. +/// After `EventsSubscribe`, every server-initiated line is a +/// `ServerOut` JSON object. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum ServerOut { + /// Live mesh event (peer state, phase, diag). + Event { event: MeshEvent }, + /// Subscriber was too slow; some events were dropped. + /// `skipped` is the number lost since the last successful + /// receive. + Lagged { skipped: u64 }, + /// Inbound RPC request arrived from a peer for a method + /// this client has claimed. The client must respond with + /// either a single `rpc_respond` (single-shot) or a + /// sequence of `rpc_stream_chunk` lines terminated by + /// `rpc_stream_end` (streaming). + RpcInbound { + network: String, + from: String, + request_id: String, + method: String, + payload: Value, + /// `true` if the peer asked for a streaming response. + /// Determined by the wire frame's `streaming` flag, not + /// by the local handler's mode — clients should + /// respect the peer's intent. + streaming: bool, + }, + /// Chunk of a streaming response to an outbound RPC call + /// the client made via `RpcCallStream`. Multiple chunks may + /// arrive before `RpcCallStreamEnd`. + RpcCallStreamChunk { request_id: String, payload: Value }, + /// End-of-stream marker for an outbound `RpcCallStream`. + /// `error` is set if the peer terminated the stream with + /// an error rather than a clean close. + RpcCallStreamEnd { + request_id: String, + error: Option, + }, + /// Inbound typed-channel message for a channel this client + /// has subscribed to. + ChannelInbound { + network: String, + from: String, + channel: String, + payload: Value, + }, + /// A more-recent client claimed a method this client had + /// previously registered. The displaced client should stop + /// expecting `RpcInbound` events for `method`; any + /// in-flight calls are left to resolve naturally (the + /// displaced client can still answer them). + HandlerDisplaced { + network: String, + method: String, + /// Best-effort short id of the displacing client; the + /// daemon does not surface socket addresses. + by: String, + }, +} diff --git a/crates/myownmesh/src/main.rs b/crates/myownmesh/src/main.rs index 4eca40d..0ebb033 100644 --- a/crates/myownmesh/src/main.rs +++ b/crates/myownmesh/src/main.rs @@ -12,6 +12,7 @@ use clap::{Parser, Subcommand}; mod cli; mod control; +mod ipc; mod registry; #[derive(Parser, Debug)]