6 releases
new 0.1.5 | Mar 14, 2025 |
---|---|
0.1.4 | Mar 13, 2025 |
#5 in #mcp
177 downloads per month
79KB
1K
SLoC
mcprotocol-rs
⚠️ 开发状态: 本项目目前处于积极开发中,API 可能会发生变化。
⚠️ Development Status: This project is under active development and the API may change.
mcprotocol-rs
是 Model Context Protocol (MCP) 的 Rust 实现。它提供了一个完整的框架来实现 MCP 客户端和服务器。
mcprotocol-rs
is a Rust implementation of the Model Context Protocol (MCP). It provides a complete framework for implementing MCP clients and servers.
特性 | Features
-
完整实现 MCP 2024-11-05 规范
-
支持多种传输层
- HTTP/SSE 传输
- 基于 axum 的高性能服务器实现
- 支持 SSE 实时消息推送
- 内置认证支持
- 标准输入/输出传输
- 符合 MCP 规范的子进程管理
- 支持服务器日志捕获
- 自动处理进程生命周期
- HTTP/SSE 传输
-
异步 API 设计
- 基于 tokio 的异步运行时
- 完整的 Stream 支持
- 非阻塞 I/O 操作
-
完整的类型安全
-
内置错误处理
-
可扩展的架构
- 模块化的传输层设计
- 支持自定义传输实现
- 工厂模式创建实例
-
Complete implementation of MCP 2024-11-05 specification
-
Multiple transport layer support
- HTTP/SSE transport
- High-performance server implementation based on axum
- SSE real-time message push support
- Built-in authentication support
- Standard input/output transport
- MCP-compliant subprocess management
- Server log capture support
- Automatic process lifecycle handling
- HTTP/SSE transport
-
Asynchronous API design
- Based on tokio runtime
- Complete Stream support
- Non-blocking I/O operations
-
Complete type safety
-
Built-in error handling
-
Extensible architecture
- Modular transport layer design
- Custom transport implementation support
- Factory pattern for instance creation
安装 | Installation
将以下内容添加到你的 Cargo.toml
:
Add this to your Cargo.toml
:
[dependencies]
mcprotocol-rs = "0.1.2"
快速开始 | Quick Start
HTTP 服务器示例 | HTTP Server Example
use mcprotocol_rs::{
transport::{
ServerTransportFactory,
TransportConfig,
TransportType,
},
Result,
};
#[tokio::main]
async fn main() -> Result<()> {
// 配置 HTTP 服务器
let config = TransportConfig {
transport_type: TransportType::Http {
base_url: "127.0.0.1:3000".to_string(),
auth_token: Some("your-auth-token".to_string()),
},
parameters: None,
};
// 使用工厂创建服务器
let factory = ServerTransportFactory;
let mut server = factory.create(config)?;
// 初始化并启动服务器
server.initialize().await?;
// 保持服务器运行
tokio::signal::ctrl_c().await?;
Ok(())
}
Stdio 传输示例 | Stdio Transport Example
首先创建服务器程序 examples/stdio_server.rs
:
First create the server program examples/stdio_server.rs
:
use mcprotocol_rs::message;
use mcprotocol_rs::{
protocol::{Message, Response},
transport::{ServerTransportFactory, TransportConfig, TransportType},
Result,
};
use serde_json::json;
use std::collections::HashSet;
#[tokio::main]
async fn main() -> Result<()> {
// 跟踪会话中使用的请求 ID
// Track request IDs used in the session
let mut session_ids = HashSet::new();
// 配置 Stdio 服务器
// Configure Stdio server
let config = TransportConfig {
transport_type: TransportType::Stdio {
server_path: None,
server_args: None,
},
parameters: None,
};
// 创建服务器实例
// Create server instance
let factory = ServerTransportFactory;
let mut server = factory.create(config)?;
// 初始化服务器
// Initialize server
server.initialize().await?;
eprintln!("Server initialized and ready to receive messages...");
// 持续接收和处理消息
// Continuously receive and process messages
loop {
match server.receive().await {
Ok(message) => {
eprintln!("Received message: {:?}", message);
// 根据消息类型处理
// Process messages based on type
match message {
Message::Request(request) => {
// 验证请求 ID 的唯一性
// Validate request ID uniqueness
if !request.validate_id_uniqueness(&mut session_ids) {
let error = Message::Response(Response::error(
message::ResponseError {
code: message::error_codes::INVALID_REQUEST,
message: "Request ID has already been used".to_string(),
data: None,
},
request.id,
));
if let Err(e) = server.send(error).await {
eprintln!("Error sending error response: {}", e);
break;
}
continue;
}
match request.method.as_str() {
"prompts/execute" => {
// 创建响应消息
// Create response message
let response = Message::Response(Response::success(
json!({
"content": "Hello from server!",
"role": "assistant"
}),
request.id,
));
// 发送响应
// Send response
if let Err(e) = server.send(response).await {
eprintln!("Error sending response: {}", e);
break;
}
}
_ => {
eprintln!("Unknown method: {}", request.method);
let error = Message::Response(Response::error(
message::ResponseError {
code: message::error_codes::METHOD_NOT_FOUND,
message: "Method not found".to_string(),
data: None,
},
request.id,
));
if let Err(e) = server.send(error).await {
eprintln!("Error sending error response: {}", e);
break;
}
}
}
}
_ => {
eprintln!("Unexpected message type");
}
}
}
Err(e) => {
eprintln!("Error receiving message: {}", e);
break;
}
}
}
// 关闭服务器
// Close server
server.close().await?;
Ok(())
}
然后创建客户端程序 examples/stdio_client.rs
:
Then create the client program examples/stdio_client.rs
:
use mcprotocol_rs::{
protocol::{Message, Method, Request, RequestId},
transport::{ClientTransportFactory, TransportConfig, TransportType},
Result,
};
use serde_json::json;
use std::{collections::HashSet, env, time::Duration};
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<()> {
// 跟踪会话中使用的请求 ID
// Track request IDs used in the session
let mut session_ids = HashSet::new();
// 获取服务器程序路径
// Get server program path
let server_path = env::current_dir()?.join("target/debug/examples/stdio_server");
// 配置 Stdio 客户端
// Configure Stdio client
let config = TransportConfig {
transport_type: TransportType::Stdio {
server_path: Some(server_path.to_str().unwrap().to_string()),
server_args: None,
},
parameters: None,
};
// 创建客户端实例
// Create client instance
let factory = ClientTransportFactory;
let mut client = factory.create(config)?;
// 初始化客户端
// Initialize client
client.initialize().await?;
eprintln!("Client initialized and connected to server...");
// 等待服务器初始化完成
// Wait for server initialization to complete
sleep(Duration::from_millis(100)).await;
// 创建请求
// Create request
let request_id = RequestId::Number(1);
let request = Request::new(
Method::ExecutePrompt,
Some(json!({
"content": "Hello from client!",
"role": "user"
})),
request_id,
);
// 验证请求 ID 的唯一性
// Validate request ID uniqueness
if !request.validate_id_uniqueness(&mut session_ids) {
eprintln!("Request ID has already been used in this session");
return Ok(());
}
// 发送消息
// Send message
eprintln!("Sending message to server...");
client.send(Message::Request(request)).await?;
// 接收服务器响应
// Receive server response
match client.receive().await {
Ok(response) => {
eprintln!("Received response: {:?}", response);
match response {
Message::Response(resp) => {
if let Some(result) = resp.result {
eprintln!("Server response result: {}", result);
}
if let Some(error) = resp.error {
eprintln!(
"Server response error: {} (code: {})",
error.message, error.code
);
}
}
_ => eprintln!("Unexpected response type"),
}
}
Err(e) => {
eprintln!("Error receiving response: {}", e);
}
}
// 关闭客户端
// Close client
client.close().await?;
Ok(())
}
运行示例 | Running the example:
# 1. 首先编译服务器程序 | First, build the server
cargo build --example stdio_server
# 2. 然后运行客户端程序 | Then run the client
cargo run --example stdio_client
客户端会自动启动服务器进程并通过标准输入/输出进行通信。 The client will automatically start the server process and communicate through stdin/stdout.
自定义传输实现 | Custom Transport Implementation
你可以通过实现 Transport
trait 来创建自己的传输层:
You can create your own transport layer by implementing the Transport
trait:
use mcprotocol_rs::{
transport::Transport,
protocol::Message,
Result,
};
use async_trait::async_trait;
#[derive(Clone)]
struct MyTransport {
// 你的传输层字段
// Your transport fields
}
#[async_trait]
impl Transport for MyTransport {
async fn initialize(&mut self) -> Result<()> {
// 实现初始化逻辑
// Implement initialization logic
}
async fn send(&self, message: Message) -> Result<()> {
// 实现发送逻辑
// Implement send logic
}
async fn receive(&self) -> Result<Message> {
// 实现接收逻辑
// Implement receive logic
}
async fn close(&mut self) -> Result<()> {
// 实现关闭逻辑
// Implement close logic
}
}
生命周期示例 | Lifecycle Example
首先创建服务器程序 examples/lifecycle_server.rs
:
First create the server program examples/lifecycle_server.rs
:
use mcprotocol_rs::{
error_codes,
protocol::ServerCapabilities,
transport::{ServerTransportFactory, TransportConfig, TransportType},
ImplementationInfo, Message, Response, ResponseError, Result, PROTOCOL_VERSION,
};
use serde_json::json;
use std::collections::HashSet;
use tokio;
#[tokio::main]
async fn main() -> Result<()> {
// 跟踪会话中使用的请求 ID
// Track request IDs used in the session
let mut session_ids = HashSet::new();
// 配置 Stdio 服务器
// Configure Stdio server
let config = TransportConfig {
transport_type: TransportType::Stdio {
server_path: None,
server_args: None,
},
parameters: None,
};
// 创建服务器实例
// Create server instance
let factory = ServerTransportFactory;
let mut server = factory.create(config)?;
let mut initialized = false;
// 启动服务器
// Start server
eprintln!("Server starting...");
server.initialize().await?;
// 处理消息循环
// Message handling loop
loop {
match server.receive().await {
Ok(message) => {
match message {
Message::Request(request) => {
// 验证请求 ID 的唯一性
// Validate request ID uniqueness
if !request.validate_id_uniqueness(&mut session_ids) {
let error = ResponseError {
code: error_codes::INVALID_REQUEST,
message: "Request ID has already been used".to_string(),
data: None,
};
let response = Response::error(error, request.id);
server.send(Message::Response(response)).await?;
continue;
}
match request.method.as_str() {
"initialize" => {
// 处理初始化请求
// Handle initialize request
// ... initialization logic ...
}
"shutdown" => {
if !initialized {
// 如果未初始化,发送错误
// If not initialized, send error
let error = ResponseError {
code: error_codes::SERVER_NOT_INITIALIZED,
message: "Server not initialized".to_string(),
data: None,
};
let response = Response::error(error, request.id);
server.send(Message::Response(response)).await?;
continue;
}
// ... shutdown logic ...
}
_ => {
// ... handle other requests ...
}
}
}
Message::Notification(notification) => {
match notification.method.as_str() {
"initialized" => {
eprintln!("Server initialized");
initialized = true;
}
"exit" => {
eprintln!("Received exit notification");
break;
}
_ => {}
}
}
_ => {}
}
}
Err(e) => {
eprintln!("Error receiving message: {}", e);
break;
}
}
}
server.close().await?;
Ok(())
}
然后创建客户端程序 examples/lifecycle_client.rs
:
Then create the client program examples/lifecycle_client.rs
:
use mcprotocol_rs::{
transport::{ClientTransportFactory, TransportConfig, TransportType},
ClientCapabilities, ImplementationInfo, Message, Method, Notification, Request, RequestId,
Result, PROTOCOL_VERSION,
};
use serde_json::json;
use std::{collections::HashSet, env};
use tokio;
#[tokio::main]
async fn main() -> Result<()> {
// 跟踪会话中使用的请求 ID
// Track request IDs used in the session
let mut session_ids = HashSet::new();
// 获取服务器程序路径
// Get server program path
let server_path = env::current_dir()?.join("target/debug/examples/lifecycle_server");
// 配置 Stdio 客户端
// Configure Stdio client
let config = TransportConfig {
transport_type: TransportType::Stdio {
server_path: Some(server_path.to_str().unwrap().to_string()),
server_args: None,
},
parameters: None,
};
// 创建客户端实例
// Create client instance
let factory = ClientTransportFactory;
let mut client = factory.create(config)?;
// 初始化客户端
// Initialize client
client.initialize().await?;
// 发送初始化请求
// Send initialize request
let init_request = Request::new(
Method::Initialize,
Some(json!({
"protocolVersion": PROTOCOL_VERSION,
"capabilities": ClientCapabilities {
roots: None,
sampling: None,
experimental: None,
},
"clientInfo": ImplementationInfo {
name: "Example Client".to_string(),
version: "1.0.0".to_string(),
}
})),
RequestId::Number(1),
);
// 验证请求 ID 的唯一性
// Validate request ID uniqueness
if !init_request.validate_id_uniqueness(&mut session_ids) {
eprintln!("Request ID has already been used in this session");
return Ok(());
}
client.send(Message::Request(init_request)).await?;
// 等待初始化响应并处理
// Wait for and handle initialization response
// ... handle response ...
// 发送关闭请求
// Send shutdown request
let shutdown_request = Request::new(Method::Shutdown, None, RequestId::Number(2));
// 验证请求 ID 的唯一性
// Validate request ID uniqueness
if !shutdown_request.validate_id_uniqueness(&mut session_ids) {
eprintln!("Request ID has already been used in this session");
return Ok(());
}
client.send(Message::Request(shutdown_request)).await?;
// 发送退出通知
// Send exit notification
let exit_notification = Notification::new(Method::Exit, None);
client.send(Message::Notification(exit_notification)).await?;
client.close().await?;
Ok(())
}
运行生命周期示例 | Running the lifecycle example:
# 1. 首先编译服务器程序 | First, build the server
cargo build --example lifecycle_server
# 2. 然后运行客户端程序 | Then run the client
cargo run --example lifecycle_client
这个示例展示了完整的 MCP 生命周期,包括: This example demonstrates the complete MCP lifecycle, including:
-
初始化阶段(initialize 请求和 initialized 通知)
-
会话 ID 跟踪和验证
-
版本协商
-
优雅关闭(shutdown 请求和 exit 通知)
-
Initialization phase (initialize request and initialized notification)
-
Session ID tracking and validation
-
Version negotiation
-
Graceful shutdown (shutdown request and exit notification)
项目结构 | Project Structure
src/
├── client/ # 客户端实现 | Client implementation
├── server/ # 服务器实现 | Server implementation
├── protocol/ # MCP 协议实现 | MCP protocol implementation
└── transport/ # 传输层实现 | Transport layer implementation
├── http/ # HTTP/SSE 传输 | HTTP/SSE transport
│ ├── client.rs # HTTP 客户端 | HTTP client
│ └── server.rs # HTTP 服务器 | HTTP server
└── stdio/ # 标准输入/输出传输 | Stdio transport
├── client.rs # Stdio 客户端 | Stdio client
└── server.rs # Stdio 服务器 | Stdio server
贡献 | Contributing
欢迎提交 Pull Requests!对于重大更改,请先开 issue 讨论您想要更改的内容。
Pull Requests are welcome! For major changes, please open an issue first to discuss what you would like to change.
许可证 | License
本项目采用 MIT 许可证 - 详见 LICENSE 文件
This project is licensed under the MIT License - see the LICENSE file for details
Dependencies
~9–20MB
~274K SLoC