Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/gpmcp-layer-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ derive_builder = "0.20.2"
tokio = { version = "1.45.1", features = ["time"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c", features = [
rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "e615300feae62a01f90f40fc7711ab15c3f0a88c", features = [
"client",
"transport-sse-client",
"reqwest",
Expand Down
3 changes: 3 additions & 0 deletions crates/gpmcp-layer-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ pub enum Transport {
Sse {
url: String,
},
Http {
url: String,
},
}

/// Main runner configuration
Expand Down
7 changes: 5 additions & 2 deletions crates/gpmcp-layer-core/src/runner/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ impl<Manager: RunnerProcessManager> GpmcpRunnerInner<Uninitialized, Manager> {
// Determine transport type and create appropriate managers

// For SSE transport, start the server process first
if matches!(self.runner_config.transport, Transport::Sse { .. }) {
info!("Starting server process for SSE transport");
if matches!(
self.runner_config.transport,
Transport::Sse { .. } | Transport::Http { .. }
) {
info!("Starting server process");
let _handle = self
.process_manager
.start_server(self.out.clone(), self.err.clone())
Expand Down
4 changes: 4 additions & 0 deletions crates/gpmcp-layer-core/src/runner/service_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ impl ServiceCoordinator {
.serve(sse_transport)
.await
.context("Failed to create SSE service")?,
super::transport_manager::TransportVariant::Http(http_transport) => client_info
.serve(http_transport)
.await
.context("Failed to create HTTP service")?,
};

info!("Service coordinator created successfully");
Expand Down
81 changes: 74 additions & 7 deletions crates/gpmcp-layer-core/src/runner/transport_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::config::{RunnerConfig, Transport};
use anyhow::{Context, Result};
use backon::{BackoffBuilder, ExponentialBuilder, Retryable};
use rmcp::transport::{SseClientTransport, TokioChildProcess};
use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
use rmcp::transport::{SseClientTransport, StreamableHttpClientTransport, TokioChildProcess};
use std::time::Duration;
use tokio::process::Command;
use tracing::{info, warn};
Expand All @@ -14,6 +15,7 @@ pub struct TransportManager {
pub enum TransportVariant {
Stdio(TokioChildProcess),
Sse(SseClientTransport<reqwest::Client>),
Http(StreamableHttpClientTransport<reqwest::Client>),
}

impl TransportManager {
Expand All @@ -28,6 +30,10 @@ impl TransportManager {
info!("Creating SSE transport for URL: {}", url);
Self::create_sse_transport(url).await?
}
Transport::Http { url } => {
warn!("Creating HTTP transport for URL: {}", url);
Self::create_http_transport(url).await?
}
};

Ok(Self { transport })
Expand Down Expand Up @@ -73,7 +79,7 @@ impl TransportManager {

// Poll the server to check if it's ready using list_tools request
// Use shorter timeout for faster failure in test environments
Self::poll_server_readiness(&url_string, 100, 1000).await?;
Self::poll_server_readiness(&url_string, 100, 1000, Self::test_server_connectivity).await?;

// Create SSE transport
let transport = SseClientTransport::start(url_string)
Expand All @@ -84,8 +90,17 @@ impl TransportManager {
}

/// Poll the server readiness by attempting to connect and call list_tools
async fn poll_server_readiness(url: &str, max_attempts: u32, interval_ms: u64) -> Result<()> {
info!(url=%url, max_attempts=?max_attempts, interval_ms=?interval_ms, "Polling server readiness");
async fn poll_server_readiness<F, R>(
url: &str,
max_attempts: u32,
interval_ms: u64,
f: F,
) -> Result<()>
where
F: Fn(String) -> R + Send + Sync,
R: Future<Output = Result<()>> + Send,
{
info!(url = % url, max_attempts =? max_attempts, interval_ms = ? interval_ms, "Polling server readiness");

let poll = ExponentialBuilder::new()
.with_jitter()
Expand All @@ -95,16 +110,16 @@ impl TransportManager {
.with_max_delay(Duration::from_secs(1))
.build();

(|| Self::test_server_connectivity(url)).retry(poll).await
(|| f(url.to_string())).retry(poll).await
}

/// Test server connectivity by creating a temporary connection and calling list_tools
async fn test_server_connectivity(url: &str) -> Result<()> {
async fn test_server_connectivity(url: String) -> Result<()> {
use rmcp::ServiceExt;
use rmcp::model::ClientInfo;

// Create a temporary SSE transport for testing
let test_transport = SseClientTransport::start(url.to_string())
let test_transport = SseClientTransport::start(url)
.await
.context("Failed to create test SSE transport")?;

Expand Down Expand Up @@ -143,4 +158,56 @@ impl TransportManager {
pub fn into_transport(self) -> TransportVariant {
self.transport
}

async fn create_http_transport(url: impl ToString) -> Result<TransportVariant> {
let url_string = url.to_string();
Self::poll_server_readiness(&url_string, 100, 1000, Self::test_server_connectivity_http)
.await?;
let transport = StreamableHttpClientTransport::with_client(
reqwest::Client::new(),
StreamableHttpClientTransportConfig::with_uri(url_string),
);

Ok(TransportVariant::Http(transport))
}
async fn test_server_connectivity_http(url: String) -> Result<()> {
use rmcp::ServiceExt;
use rmcp::model::ClientInfo;

// Create a temporary SSE transport for testing
let test_transport = StreamableHttpClientTransport::with_client(
reqwest::Client::new(),
StreamableHttpClientTransportConfig::with_uri(url),
);

// Create minimal client info for testing
let client_info = ClientInfo {
protocol_version: rmcp::model::ProtocolVersion::default(),
capabilities: rmcp::model::ClientCapabilities::default(),
client_info: rmcp::model::Implementation {
name: "gpmcp-readiness-test".to_string(),
version: "0.1.0".to_string(),
},
};

// Try to establish service and call list_tools
let service = client_info
.serve(test_transport)
.await
.context("Failed to create test service")?;

// Call list_tools to verify server is responding
let _result = service
.list_tools(Default::default())
.await
.context("Server not responding to list_tools")?;

// Cancel the test service
service
.cancel()
.await
.context("Failed to cancel test service")?;

Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/gpmcp-layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2024"
gpmcp-layer-core = { path = "../gpmcp-layer-core" }
anyhow = "1.0"
async-trait = "0.1"
rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c", features = [
rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "e615300feae62a01f90f40fc7711ab15c3f0a88c", features = [
"client",
"transport-sse-client",
"reqwest",
Expand Down
3 changes: 2 additions & 1 deletion examples/test-mcp-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2024"

[dependencies]
serde_json = "1.0.140"
rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "b9d7d61ebd6e8385cbc4aa105d4e25774fc1a59c", features = [
rmcp = { git = "https://github.com/gpmcp/rust-sdk", rev = "e615300feae62a01f90f40fc7711ab15c3f0a88c", features = [
"client",
"transport-sse-client",
"reqwest",
Expand All @@ -21,3 +21,4 @@ axum = "0.8.4"
tracing = "0.1.41"
anyhow = "1.0.98"
tokio-util = "0.7.15"
tower = "0.4.13"
2 changes: 1 addition & 1 deletion examples/test-mcp-server/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::sync::Arc;

use rmcp::{
Error as McpError, RoleServer, ServerHandler, handler::server::router::tool::ToolRouter,
ErrorData as McpError, RoleServer, ServerHandler, handler::server::router::tool::ToolRouter,
model::*, service::RequestContext, tool, tool_handler, tool_router,
};
use serde_json::json;
Expand Down
Loading