diff --git a/Cargo.lock b/Cargo.lock index 574e47c..b29a8d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,7 +98,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tower", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -236,6 +236,7 @@ dependencies = [ "serde_json", "tokio", "tokio-util", + "tower 0.4.13", "tracing", ] @@ -1444,7 +1445,7 @@ dependencies = [ "tokio-native-tls", "tokio-rustls", "tokio-util", - "tower", + "tower 0.5.2", "tower-http", "tower-service", "url", @@ -1471,8 +1472,8 @@ dependencies = [ [[package]] name = "rmcp" -version = "0.1.5" -source = "git+https://github.com/gpmcp/rust-sdk?rev=b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c#b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c" +version = "0.2.1" +source = "git+https://github.com/gpmcp/rust-sdk?rev=e615300feae62a01f90f40fc7711ab15c3f0a88c#e615300feae62a01f90f40fc7711ab15c3f0a88c" dependencies = [ "axum", "base64", @@ -1503,8 +1504,8 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "0.1.5" -source = "git+https://github.com/gpmcp/rust-sdk?rev=b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c#b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c" +version = "0.2.1" +source = "git+https://github.com/gpmcp/rust-sdk?rev=e615300feae62a01f90f40fc7711ab15c3f0a88c#e615300feae62a01f90f40fc7711ab15c3f0a88c" dependencies = [ "darling", "proc-macro2", @@ -2017,6 +2018,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -2046,7 +2058,7 @@ dependencies = [ "http-body", "iri-string", "pin-project-lite", - "tower", + "tower 0.5.2", "tower-layer", "tower-service", ] diff --git a/crates/gpmcp-layer-core/Cargo.toml b/crates/gpmcp-layer-core/Cargo.toml index 2c26c82..254c664 100644 --- a/crates/gpmcp-layer-core/Cargo.toml +++ b/crates/gpmcp-layer-core/Cargo.toml @@ -13,7 +13,7 @@ derive_builder = "0.20.2" tokio = { version = "1.45.1", features = ["time"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } -rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c", features = [ +rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "e615300feae62a01f90f40fc7711ab15c3f0a88c", features = [ "client", "transport-sse-client", "reqwest", diff --git a/crates/gpmcp-layer-core/src/config.rs b/crates/gpmcp-layer-core/src/config.rs index 1029fd4..8d9fe9f 100644 --- a/crates/gpmcp-layer-core/src/config.rs +++ b/crates/gpmcp-layer-core/src/config.rs @@ -128,6 +128,9 @@ pub enum Transport { Sse { url: String, }, + Http { + url: String, + }, } /// Main runner configuration diff --git a/crates/gpmcp-layer-core/src/runner/inner.rs b/crates/gpmcp-layer-core/src/runner/inner.rs index fa3ee17..30ae495 100644 --- a/crates/gpmcp-layer-core/src/runner/inner.rs +++ b/crates/gpmcp-layer-core/src/runner/inner.rs @@ -48,8 +48,11 @@ impl GpmcpRunnerInner { // Determine transport type and create appropriate managers // For SSE transport, start the server process first - if matches!(self.runner_config.transport, Transport::Sse { .. }) { - info!("Starting server process for SSE transport"); + if matches!( + self.runner_config.transport, + Transport::Sse { .. } | Transport::Http { .. } + ) { + info!("Starting server process"); let _handle = self .process_manager .start_server(self.out.clone(), self.err.clone()) diff --git a/crates/gpmcp-layer-core/src/runner/service_coordinator.rs b/crates/gpmcp-layer-core/src/runner/service_coordinator.rs index cec85af..1a0861c 100644 --- a/crates/gpmcp-layer-core/src/runner/service_coordinator.rs +++ b/crates/gpmcp-layer-core/src/runner/service_coordinator.rs @@ -35,6 +35,10 @@ impl ServiceCoordinator { .serve(sse_transport) .await .context("Failed to create SSE service")?, + super::transport_manager::TransportVariant::Http(http_transport) => client_info + .serve(http_transport) + .await + .context("Failed to create HTTP service")?, }; info!("Service coordinator created successfully"); diff --git a/crates/gpmcp-layer-core/src/runner/transport_manager.rs b/crates/gpmcp-layer-core/src/runner/transport_manager.rs index ecf9e7b..971df67 100644 --- a/crates/gpmcp-layer-core/src/runner/transport_manager.rs +++ b/crates/gpmcp-layer-core/src/runner/transport_manager.rs @@ -1,7 +1,8 @@ use crate::config::{RunnerConfig, Transport}; use anyhow::{Context, Result}; use backon::{BackoffBuilder, ExponentialBuilder, Retryable}; -use rmcp::transport::{SseClientTransport, TokioChildProcess}; +use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig; +use rmcp::transport::{SseClientTransport, StreamableHttpClientTransport, TokioChildProcess}; use std::time::Duration; use tokio::process::Command; use tracing::{info, warn}; @@ -14,6 +15,7 @@ pub struct TransportManager { pub enum TransportVariant { Stdio(TokioChildProcess), Sse(SseClientTransport), + Http(StreamableHttpClientTransport), } impl TransportManager { @@ -28,6 +30,10 @@ impl TransportManager { info!("Creating SSE transport for URL: {}", url); Self::create_sse_transport(url).await? } + Transport::Http { url } => { + warn!("Creating HTTP transport for URL: {}", url); + Self::create_http_transport(url).await? + } }; Ok(Self { transport }) @@ -73,7 +79,7 @@ impl TransportManager { // Poll the server to check if it's ready using list_tools request // Use shorter timeout for faster failure in test environments - Self::poll_server_readiness(&url_string, 100, 1000).await?; + Self::poll_server_readiness(&url_string, 100, 1000, Self::test_server_connectivity).await?; // Create SSE transport let transport = SseClientTransport::start(url_string) @@ -84,8 +90,17 @@ impl TransportManager { } /// Poll the server readiness by attempting to connect and call list_tools - async fn poll_server_readiness(url: &str, max_attempts: u32, interval_ms: u64) -> Result<()> { - info!(url=%url, max_attempts=?max_attempts, interval_ms=?interval_ms, "Polling server readiness"); + async fn poll_server_readiness( + url: &str, + max_attempts: u32, + interval_ms: u64, + f: F, + ) -> Result<()> + where + F: Fn(String) -> R + Send + Sync, + R: Future> + Send, + { + info!(url = % url, max_attempts =? max_attempts, interval_ms = ? interval_ms, "Polling server readiness"); let poll = ExponentialBuilder::new() .with_jitter() @@ -95,16 +110,16 @@ impl TransportManager { .with_max_delay(Duration::from_secs(1)) .build(); - (|| Self::test_server_connectivity(url)).retry(poll).await + (|| f(url.to_string())).retry(poll).await } /// Test server connectivity by creating a temporary connection and calling list_tools - async fn test_server_connectivity(url: &str) -> Result<()> { + async fn test_server_connectivity(url: String) -> Result<()> { use rmcp::ServiceExt; use rmcp::model::ClientInfo; // Create a temporary SSE transport for testing - let test_transport = SseClientTransport::start(url.to_string()) + let test_transport = SseClientTransport::start(url) .await .context("Failed to create test SSE transport")?; @@ -143,4 +158,56 @@ impl TransportManager { pub fn into_transport(self) -> TransportVariant { self.transport } + + async fn create_http_transport(url: impl ToString) -> Result { + let url_string = url.to_string(); + Self::poll_server_readiness(&url_string, 100, 1000, Self::test_server_connectivity_http) + .await?; + let transport = StreamableHttpClientTransport::with_client( + reqwest::Client::new(), + StreamableHttpClientTransportConfig::with_uri(url_string), + ); + + Ok(TransportVariant::Http(transport)) + } + async fn test_server_connectivity_http(url: String) -> Result<()> { + use rmcp::ServiceExt; + use rmcp::model::ClientInfo; + + // Create a temporary SSE transport for testing + let test_transport = StreamableHttpClientTransport::with_client( + reqwest::Client::new(), + StreamableHttpClientTransportConfig::with_uri(url), + ); + + // Create minimal client info for testing + let client_info = ClientInfo { + protocol_version: rmcp::model::ProtocolVersion::default(), + capabilities: rmcp::model::ClientCapabilities::default(), + client_info: rmcp::model::Implementation { + name: "gpmcp-readiness-test".to_string(), + version: "0.1.0".to_string(), + }, + }; + + // Try to establish service and call list_tools + let service = client_info + .serve(test_transport) + .await + .context("Failed to create test service")?; + + // Call list_tools to verify server is responding + let _result = service + .list_tools(Default::default()) + .await + .context("Server not responding to list_tools")?; + + // Cancel the test service + service + .cancel() + .await + .context("Failed to cancel test service")?; + + Ok(()) + } } diff --git a/crates/gpmcp-layer/Cargo.toml b/crates/gpmcp-layer/Cargo.toml index 13aa32c..9966d2e 100644 --- a/crates/gpmcp-layer/Cargo.toml +++ b/crates/gpmcp-layer/Cargo.toml @@ -7,7 +7,7 @@ edition = "2024" gpmcp-layer-core = { path = "../gpmcp-layer-core" } anyhow = "1.0" async-trait = "0.1" -rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c", features = [ +rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "e615300feae62a01f90f40fc7711ab15c3f0a88c", features = [ "client", "transport-sse-client", "reqwest", diff --git a/examples/test-mcp-server/Cargo.toml b/examples/test-mcp-server/Cargo.toml index 92f0200..f74eb45 100644 --- a/examples/test-mcp-server/Cargo.toml +++ b/examples/test-mcp-server/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] serde_json = "1.0.140" -rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c", features = [ +rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "e615300feae62a01f90f40fc7711ab15c3f0a88c", features = [ "client", "transport-sse-client", "reqwest", @@ -21,3 +21,4 @@ axum = "0.8.4" tracing = "0.1.41" anyhow = "1.0.98" tokio-util = "0.7.15" +tower = "0.4.13" diff --git a/examples/test-mcp-server/src/counter.rs b/examples/test-mcp-server/src/counter.rs index 6af288a..c2f1143 100644 --- a/examples/test-mcp-server/src/counter.rs +++ b/examples/test-mcp-server/src/counter.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use rmcp::{ - Error as McpError, RoleServer, ServerHandler, handler::server::router::tool::ToolRouter, + ErrorData as McpError, RoleServer, ServerHandler, handler::server::router::tool::ToolRouter, model::*, service::RequestContext, tool, tool_handler, tool_router, }; use serde_json::json;