Skip to content

feat: add sse transport support #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,639 changes: 2,411 additions & 228 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ members = [
"examples/simple-mcp-client-core",
"examples/hello-world-mcp-server",
"examples/hello-world-mcp-server-core",
"examples/hello-world-server-sse",
"examples/hello-world-server-core-sse",
"examples/simple-mcp-client-sse",
"examples/simple-mcp-client-core-sse",
]

[workspace.dependencies]
Expand All @@ -26,13 +30,20 @@ async-trait = { version = "0.1" }
strum = { version = "0.27", features = ["derive"] }
thiserror = { version = "2.0" }
tokio-stream = { version = "0.1" }
uuid = { version = "1" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = [
"env-filter",
"std",
"fmt",
] }

axum = "0.8"
rustls = "0.23"
tokio-rustls = "0.26"
axum-server = { version = "0.7" }
reqwest = "0.12"
bytes = "1.10.1"

# [workspace.dependencies.windows]

Expand Down
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ args = ["fmt", "--all", "--", "--check"]

[tasks.clippy]
command = "cargo"
args = ["clippy"]
args = ["clippy", "--workspace", "--all-targets", "--all-features"]

[tasks.test]
install_crate = "nextest"
Expand Down
169 changes: 141 additions & 28 deletions README.md

Large diffs are not rendered by default.

Binary file added assets/examples/hello-world-server-sse.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/examples/simple-mcp-client-sse.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
31 changes: 27 additions & 4 deletions crates/rust-mcp-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,35 @@ async-trait = { workspace = true }
futures = { workspace = true }
thiserror = { workspace = true }

axum = { workspace = true, optional = true }
uuid = { workspace = true, features = ["v4"], optional = true }
tokio-stream = { workspace = true, optional = true }
axum-server = { version = "0.7", features = [], optional = true }
tracing.workspace = true

# rustls = { workspace = true, optional = true }
hyper = { version = "1.6.0" }

[dev-dependencies]
tracing-subscriber = { workspace = true, features = [
"env-filter",
"std",
"fmt",
] }

[features]
default = ["client", "server", "macros"] # All features enabled by default
server = [] # Server feature
client = [] # Client feature
default = [
"client",
"server",
"macros",
"hyper-server",
"ssl",
] # All features enabled by default
server = [] # Server feature
client = [] # Client feature
hyper-server = ["axum", "axum-server", "uuid", "tokio-stream"]
ssl = ["axum-server/tls-rustls"]
macros = ["rust-mcp-macros"]


[lints]
workspace = true
172 changes: 142 additions & 30 deletions crates/rust-mcp-sdk/README.md

Large diffs are not rendered by default.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions crates/rust-mcp-sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use rust_mcp_schema::RpcError;
use rust_mcp_transport::error::TransportError;
use thiserror::Error;

#[cfg(feature = "hyper-server")]
use crate::hyper_servers::error::TransportServerError;

pub type SdkResult<T> = core::result::Result<T, McpSdkError>;

#[derive(Debug, Error)]
Expand All @@ -18,6 +21,9 @@ pub enum McpSdkError {
AnyError(Box<(dyn std::error::Error + Send + Sync)>),
#[error("{0}")]
SdkError(#[from] rust_mcp_schema::schema_utils::SdkError),
#[cfg(feature = "hyper-server")]
#[error("{0}")]
TransportServerError(#[from] TransportServerError),
}

#[deprecated(since = "0.2.0", note = "Use `McpSdkError` instead.")]
Expand Down
10 changes: 10 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
mod app_state;
pub mod error;
pub mod hyper_server;
pub mod hyper_server_core;
mod routes;
mod server;
mod session_store;

pub use server::*;
pub use session_store::*;
22 changes: 22 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers/app_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::{sync::Arc, time::Duration};

use rust_mcp_schema::InitializeResult;
use rust_mcp_transport::TransportOptions;

use crate::mcp_traits::mcp_handler::McpServerHandler;

use super::{session_store::SessionStore, IdGenerator};

/// Application state struct for the Hyper server
///
/// Holds shared, thread-safe references to session storage, ID generator,
/// server details, handler, ping interval, and transport options.
#[derive(Clone)]
pub struct AppState {
pub session_store: Arc<dyn SessionStore>,
pub id_generator: Arc<dyn IdGenerator>,
pub server_details: Arc<InitializeResult>,
pub handler: Arc<dyn McpServerHandler>,
pub ping_interval: Duration,
pub transport_options: Arc<TransportOptions>,
}
33 changes: 33 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::net::AddrParseError;

use axum::{http::StatusCode, response::IntoResponse};
use thiserror::Error;

pub type TransportServerResult<T> = core::result::Result<T, TransportServerError>;

#[derive(Debug, Error, Clone)]
pub enum TransportServerError {
#[error("'sessionId' query string is missing!")]
SessionIdMissing,
#[error("No session found for the given ID: {0}.")]
SessionIdInvalid(String),
#[error("Stream IO Error: {0}.")]
StreamIoError(String),
#[error("{0}")]
AddrParseError(#[from] AddrParseError),
#[error("Server start error: {0}")]
ServerStartError(String),
#[error("Invalid options: {0}")]
InvalidServerOptions(String),
#[error("{0}")]
SslCertError(String),
}

impl IntoResponse for TransportServerError {
//consume self and returns a Response
fn into_response(self) -> axum::response::Response {
let mut response = StatusCode::INTERNAL_SERVER_ERROR.into_response();
response.extensions_mut().insert(self);
response
}
}
29 changes: 29 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers/hyper_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::sync::Arc;

use rust_mcp_schema::InitializeResult;

use crate::mcp_server::{server_runtime::ServerRuntimeInternalHandler, ServerHandler};

use super::{HyperServer, HyperServerOptions};

/// Creates a new HyperServer instance with the provided handler and options
/// The handler must implement ServerHandler.
///
/// # Arguments
/// * `server_details` - Initialization result from the MCP schema
/// * `handler` - Implementation of the ServerHandlerCore trait
/// * `server_options` - Configuration options for the HyperServer
///
/// # Returns
/// * `HyperServer` - A configured HyperServer instance ready to start
pub fn create_server(
server_details: InitializeResult,
handler: impl ServerHandler,
server_options: HyperServerOptions,
) -> HyperServer {
HyperServer::new(
server_details,
Arc::new(ServerRuntimeInternalHandler::new(Box::new(handler))),
server_options,
)
}
26 changes: 26 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers/hyper_server_core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use super::{HyperServer, HyperServerOptions};
use crate::mcp_server::{server_runtime_core::RuntimeCoreInternalHandler, ServerHandlerCore};
use rust_mcp_schema::InitializeResult;
use std::sync::Arc;

/// Creates a new HyperServer instance with the provided handler and options
/// The handler must implement ServerHandlerCore.
///
/// # Arguments
/// * `server_details` - Initialization result from the MCP schema
/// * `handler` - Implementation of the ServerHandlerCore trait
/// * `server_options` - Configuration options for the HyperServer
///
/// # Returns
/// * `HyperServer` - A configured HyperServer instance ready to start
pub fn create_server(
server_details: InitializeResult,
handler: impl ServerHandlerCore,
server_options: HyperServerOptions,
) -> HyperServer {
HyperServer::new(
server_details,
Arc::new(RuntimeCoreInternalHandler::new(Box::new(handler))),
server_options,
)
}
29 changes: 29 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
pub mod fallback_routes;
pub mod messages_routes;
pub mod sse_routes;

use super::{app_state::AppState, HyperServerOptions};
use axum::Router;
use std::sync::Arc;

/// Constructs the Axum router with all application routes
///
/// Combines routes for Server-Sent Events, message handling, and fallback routes,
/// attaching the shared application state to the router.
///
/// # Arguments
/// * `state` - Shared application state wrapped in an Arc
/// * `server_options` - Reference to the HyperServer configuration options
///
/// # Returns
/// * `Router` - An Axum router configured with all application routes and state
pub fn app_routes(state: Arc<AppState>, server_options: &HyperServerOptions) -> Router {
Router::new()
.merge(sse_routes::routes(
state.clone(),
server_options.sse_endpoint(),
))
.merge(messages_routes::routes(state.clone()))
.with_state(state)
.merge(fallback_routes::routes())
}
15 changes: 15 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers/routes/fallback_routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use axum::{
http::{StatusCode, Uri},
Router,
};

pub fn routes() -> Router {
Router::new().fallback(not_found)
}

pub async fn not_found(uri: Uri) -> (StatusCode, String) {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Server Error!\r\n uri: {}", uri),
)
}
50 changes: 50 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers/routes/messages_routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::hyper_servers::{
app_state::AppState,
error::{TransportServerError, TransportServerResult},
};
use axum::{
extract::{Query, State},
response::IntoResponse,
routing::post,
Router,
};
use std::{collections::HashMap, sync::Arc};
use tokio::io::AsyncWriteExt;

const SSE_MESSAGES_PATH: &str = "/messages";

pub fn routes(_state: Arc<AppState>) -> Router<Arc<AppState>> {
Router::new().route(SSE_MESSAGES_PATH, post(handle_messages))
}

pub async fn handle_messages(
State(state): State<Arc<AppState>>,
Query(params): Query<HashMap<String, String>>,
message: String,
) -> TransportServerResult<impl IntoResponse> {
let session_id = params
.get("sessionId")
.ok_or(TransportServerError::SessionIdMissing)?;

let transmit =
state
.session_store
.get(session_id)
.await
.ok_or(TransportServerError::SessionIdInvalid(
session_id.to_string(),
))?;
let mut transmit = transmit.lock().await;

transmit
.write_all(format!("{message}\n").as_bytes())
.await
.map_err(|err| TransportServerError::StreamIoError(err.to_string()))?;

transmit
.flush()
.await
.map_err(|err| TransportServerError::StreamIoError(err.to_string()))?;

Ok(axum::http::StatusCode::OK)
}
Loading