#mcp #context #protocols #model #ai #http-server #http-client

bin+lib mcprotocol-rs

A Rust implementation of the Model Context Protocol (MCP)

6 releases

new 0.1.5 Mar 14, 2025
0.1.4 Mar 13, 2025

#5 in #mcp

Download history 177/week @ 2025-03-06

177 downloads per month

MIT license

79KB
1K SLoC

mcprotocol-rs

Crates.io GitHub Documentation License:MIT

⚠️ 开发状态: 本项目目前处于积极开发中,API 可能会发生变化。

⚠️ Development Status: This project is under active development and the API may change.

mcprotocol-rs 是 Model Context Protocol (MCP) 的 Rust 实现。它提供了一个完整的框架来实现 MCP 客户端和服务器。

mcprotocol-rs is a Rust implementation of the Model Context Protocol (MCP). It provides a complete framework for implementing MCP clients and servers.

特性 | Features

  • 完整实现 MCP 2024-11-05 规范

  • 支持多种传输层

    • HTTP/SSE 传输
      • 基于 axum 的高性能服务器实现
      • 支持 SSE 实时消息推送
      • 内置认证支持
    • 标准输入/输出传输
      • 符合 MCP 规范的子进程管理
      • 支持服务器日志捕获
      • 自动处理进程生命周期
  • 异步 API 设计

    • 基于 tokio 的异步运行时
    • 完整的 Stream 支持
    • 非阻塞 I/O 操作
  • 完整的类型安全

  • 内置错误处理

  • 可扩展的架构

    • 模块化的传输层设计
    • 支持自定义传输实现
    • 工厂模式创建实例
  • Complete implementation of MCP 2024-11-05 specification

  • Multiple transport layer support

    • HTTP/SSE transport
      • High-performance server implementation based on axum
      • SSE real-time message push support
      • Built-in authentication support
    • Standard input/output transport
      • MCP-compliant subprocess management
      • Server log capture support
      • Automatic process lifecycle handling
  • Asynchronous API design

    • Based on tokio runtime
    • Complete Stream support
    • Non-blocking I/O operations
  • Complete type safety

  • Built-in error handling

  • Extensible architecture

    • Modular transport layer design
    • Custom transport implementation support
    • Factory pattern for instance creation

安装 | Installation

将以下内容添加到你的 Cargo.toml: Add this to your Cargo.toml:

[dependencies]
mcprotocol-rs = "0.1.2"

快速开始 | Quick Start

HTTP 服务器示例 | HTTP Server Example

use mcprotocol_rs::{
    transport::{
        ServerTransportFactory,
        TransportConfig,
        TransportType,
    },
    Result,
};

#[tokio::main]
async fn main() -> Result<()> {
    // 配置 HTTP 服务器
    let config = TransportConfig {
        transport_type: TransportType::Http {
            base_url: "127.0.0.1:3000".to_string(),
            auth_token: Some("your-auth-token".to_string()),
        },
        parameters: None,
    };

    // 使用工厂创建服务器
    let factory = ServerTransportFactory;
    let mut server = factory.create(config)?;
    
    // 初始化并启动服务器
    server.initialize().await?;

    // 保持服务器运行
    tokio::signal::ctrl_c().await?;
    Ok(())
}

Stdio 传输示例 | Stdio Transport Example

首先创建服务器程序 examples/stdio_server.rs: First create the server program examples/stdio_server.rs:

use mcprotocol_rs::message;
use mcprotocol_rs::{
    protocol::{Message, Response},
    transport::{ServerTransportFactory, TransportConfig, TransportType},
    Result,
};
use serde_json::json;
use std::collections::HashSet;

#[tokio::main]
async fn main() -> Result<()> {
    // 跟踪会话中使用的请求 ID
    // Track request IDs used in the session
    let mut session_ids = HashSet::new();

    // 配置 Stdio 服务器
    // Configure Stdio server
    let config = TransportConfig {
        transport_type: TransportType::Stdio {
            server_path: None,
            server_args: None,
        },
        parameters: None,
    };

    // 创建服务器实例
    // Create server instance
    let factory = ServerTransportFactory;
    let mut server = factory.create(config)?;

    // 初始化服务器
    // Initialize server
    server.initialize().await?;
    eprintln!("Server initialized and ready to receive messages...");

    // 持续接收和处理消息
    // Continuously receive and process messages
    loop {
        match server.receive().await {
            Ok(message) => {
                eprintln!("Received message: {:?}", message);

                // 根据消息类型处理
                // Process messages based on type
                match message {
                    Message::Request(request) => {
                        // 验证请求 ID 的唯一性
                        // Validate request ID uniqueness
                        if !request.validate_id_uniqueness(&mut session_ids) {
                            let error = Message::Response(Response::error(
                                message::ResponseError {
                                    code: message::error_codes::INVALID_REQUEST,
                                    message: "Request ID has already been used".to_string(),
                                    data: None,
                                },
                                request.id,
                            ));
                            if let Err(e) = server.send(error).await {
                                eprintln!("Error sending error response: {}", e);
                                break;
                            }
                            continue;
                        }

                        match request.method.as_str() {
                            "prompts/execute" => {
                                // 创建响应消息
                                // Create response message
                                let response = Message::Response(Response::success(
                                    json!({
                                        "content": "Hello from server!",
                                        "role": "assistant"
                                    }),
                                    request.id,
                                ));

                                // 发送响应
                                // Send response
                                if let Err(e) = server.send(response).await {
                                    eprintln!("Error sending response: {}", e);
                                    break;
                                }
                            }
                            _ => {
                                eprintln!("Unknown method: {}", request.method);
                                let error = Message::Response(Response::error(
                                    message::ResponseError {
                                        code: message::error_codes::METHOD_NOT_FOUND,
                                        message: "Method not found".to_string(),
                                        data: None,
                                    },
                                    request.id,
                                ));
                                if let Err(e) = server.send(error).await {
                                    eprintln!("Error sending error response: {}", e);
                                    break;
                                }
                            }
                        }
                    }
                    _ => {
                        eprintln!("Unexpected message type");
                    }
                }
            }
            Err(e) => {
                eprintln!("Error receiving message: {}", e);
                break;
            }
        }
    }

    // 关闭服务器
    // Close server
    server.close().await?;
    Ok(())
}

然后创建客户端程序 examples/stdio_client.rs: Then create the client program examples/stdio_client.rs:

use mcprotocol_rs::{
    protocol::{Message, Method, Request, RequestId},
    transport::{ClientTransportFactory, TransportConfig, TransportType},
    Result,
};
use serde_json::json;
use std::{collections::HashSet, env, time::Duration};
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<()> {
    // 跟踪会话中使用的请求 ID
    // Track request IDs used in the session
    let mut session_ids = HashSet::new();

    // 获取服务器程序路径
    // Get server program path
    let server_path = env::current_dir()?.join("target/debug/examples/stdio_server");

    // 配置 Stdio 客户端
    // Configure Stdio client
    let config = TransportConfig {
        transport_type: TransportType::Stdio {
            server_path: Some(server_path.to_str().unwrap().to_string()),
            server_args: None,
        },
        parameters: None,
    };

    // 创建客户端实例
    // Create client instance
    let factory = ClientTransportFactory;
    let mut client = factory.create(config)?;

    // 初始化客户端
    // Initialize client
    client.initialize().await?;
    eprintln!("Client initialized and connected to server...");

    // 等待服务器初始化完成
    // Wait for server initialization to complete
    sleep(Duration::from_millis(100)).await;

    // 创建请求
    // Create request
    let request_id = RequestId::Number(1);
    let request = Request::new(
        Method::ExecutePrompt,
        Some(json!({
            "content": "Hello from client!",
            "role": "user"
        })),
        request_id,
    );

    // 验证请求 ID 的唯一性
    // Validate request ID uniqueness
    if !request.validate_id_uniqueness(&mut session_ids) {
        eprintln!("Request ID has already been used in this session");
        return Ok(());
    }

    // 发送消息
    // Send message
    eprintln!("Sending message to server...");
    client.send(Message::Request(request)).await?;

    // 接收服务器响应
    // Receive server response
    match client.receive().await {
        Ok(response) => {
            eprintln!("Received response: {:?}", response);
            match response {
                Message::Response(resp) => {
                    if let Some(result) = resp.result {
                        eprintln!("Server response result: {}", result);
                    }
                    if let Some(error) = resp.error {
                        eprintln!(
                            "Server response error: {} (code: {})",
                            error.message, error.code
                        );
                    }
                }
                _ => eprintln!("Unexpected response type"),
            }
        }
        Err(e) => {
            eprintln!("Error receiving response: {}", e);
        }
    }

    // 关闭客户端
    // Close client
    client.close().await?;
    Ok(())
}

运行示例 | Running the example:

# 1. 首先编译服务器程序 | First, build the server
cargo build --example stdio_server

# 2. 然后运行客户端程序 | Then run the client
cargo run --example stdio_client

客户端会自动启动服务器进程并通过标准输入/输出进行通信。 The client will automatically start the server process and communicate through stdin/stdout.

自定义传输实现 | Custom Transport Implementation

你可以通过实现 Transport trait 来创建自己的传输层:

You can create your own transport layer by implementing the Transport trait:

use mcprotocol_rs::{
    transport::Transport,
    protocol::Message,
    Result,
};
use async_trait::async_trait;

#[derive(Clone)]
struct MyTransport {
    // 你的传输层字段
    // Your transport fields
}

#[async_trait]
impl Transport for MyTransport {
    async fn initialize(&mut self) -> Result<()> {
        // 实现初始化逻辑
        // Implement initialization logic
    }

    async fn send(&self, message: Message) -> Result<()> {
        // 实现发送逻辑
        // Implement send logic
    }

    async fn receive(&self) -> Result<Message> {
        // 实现接收逻辑
        // Implement receive logic
    }

    async fn close(&mut self) -> Result<()> {
        // 实现关闭逻辑
        // Implement close logic
    }
}

生命周期示例 | Lifecycle Example

首先创建服务器程序 examples/lifecycle_server.rs: First create the server program examples/lifecycle_server.rs:

use mcprotocol_rs::{
    error_codes,
    protocol::ServerCapabilities,
    transport::{ServerTransportFactory, TransportConfig, TransportType},
    ImplementationInfo, Message, Response, ResponseError, Result, PROTOCOL_VERSION,
};
use serde_json::json;
use std::collections::HashSet;
use tokio;

#[tokio::main]
async fn main() -> Result<()> {
    // 跟踪会话中使用的请求 ID
    // Track request IDs used in the session
    let mut session_ids = HashSet::new();

    // 配置 Stdio 服务器
    // Configure Stdio server
    let config = TransportConfig {
        transport_type: TransportType::Stdio {
            server_path: None,
            server_args: None,
        },
        parameters: None,
    };

    // 创建服务器实例
    // Create server instance
    let factory = ServerTransportFactory;
    let mut server = factory.create(config)?;
    let mut initialized = false;

    // 启动服务器
    // Start server
    eprintln!("Server starting...");
    server.initialize().await?;

    // 处理消息循环
    // Message handling loop
    loop {
        match server.receive().await {
            Ok(message) => {
                match message {
                    Message::Request(request) => {
                        // 验证请求 ID 的唯一性
                        // Validate request ID uniqueness
                        if !request.validate_id_uniqueness(&mut session_ids) {
                            let error = ResponseError {
                                code: error_codes::INVALID_REQUEST,
                                message: "Request ID has already been used".to_string(),
                                data: None,
                            };
                            let response = Response::error(error, request.id);
                            server.send(Message::Response(response)).await?;
                            continue;
                        }

                        match request.method.as_str() {
                            "initialize" => {
                                // 处理初始化请求
                                // Handle initialize request
                                // ... initialization logic ...
                            }
                            "shutdown" => {
                                if !initialized {
                                    // 如果未初始化,发送错误
                                    // If not initialized, send error
                                    let error = ResponseError {
                                        code: error_codes::SERVER_NOT_INITIALIZED,
                                        message: "Server not initialized".to_string(),
                                        data: None,
                                    };
                                    let response = Response::error(error, request.id);
                                    server.send(Message::Response(response)).await?;
                                    continue;
                                }
                                // ... shutdown logic ...
                            }
                            _ => {
                                // ... handle other requests ...
                            }
                        }
                    }
                    Message::Notification(notification) => {
                        match notification.method.as_str() {
                            "initialized" => {
                                eprintln!("Server initialized");
                                initialized = true;
                            }
                            "exit" => {
                                eprintln!("Received exit notification");
                                break;
                            }
                            _ => {}
                        }
                    }
                    _ => {}
                }
            }
            Err(e) => {
                eprintln!("Error receiving message: {}", e);
                break;
            }
        }
    }

    server.close().await?;
    Ok(())
}

然后创建客户端程序 examples/lifecycle_client.rs: Then create the client program examples/lifecycle_client.rs:

use mcprotocol_rs::{
    transport::{ClientTransportFactory, TransportConfig, TransportType},
    ClientCapabilities, ImplementationInfo, Message, Method, Notification, Request, RequestId,
    Result, PROTOCOL_VERSION,
};
use serde_json::json;
use std::{collections::HashSet, env};
use tokio;

#[tokio::main]
async fn main() -> Result<()> {
    // 跟踪会话中使用的请求 ID
    // Track request IDs used in the session
    let mut session_ids = HashSet::new();

    // 获取服务器程序路径
    // Get server program path
    let server_path = env::current_dir()?.join("target/debug/examples/lifecycle_server");

    // 配置 Stdio 客户端
    // Configure Stdio client
    let config = TransportConfig {
        transport_type: TransportType::Stdio {
            server_path: Some(server_path.to_str().unwrap().to_string()),
            server_args: None,
        },
        parameters: None,
    };

    // 创建客户端实例
    // Create client instance
    let factory = ClientTransportFactory;
    let mut client = factory.create(config)?;

    // 初始化客户端
    // Initialize client
    client.initialize().await?;

    // 发送初始化请求
    // Send initialize request
    let init_request = Request::new(
        Method::Initialize,
        Some(json!({
            "protocolVersion": PROTOCOL_VERSION,
            "capabilities": ClientCapabilities {
                roots: None,
                sampling: None,
                experimental: None,
            },
            "clientInfo": ImplementationInfo {
                name: "Example Client".to_string(),
                version: "1.0.0".to_string(),
            }
        })),
        RequestId::Number(1),
    );

    // 验证请求 ID 的唯一性
    // Validate request ID uniqueness
    if !init_request.validate_id_uniqueness(&mut session_ids) {
        eprintln!("Request ID has already been used in this session");
        return Ok(());
    }

    client.send(Message::Request(init_request)).await?;

    // 等待初始化响应并处理
    // Wait for and handle initialization response
    // ... handle response ...

    // 发送关闭请求
    // Send shutdown request
    let shutdown_request = Request::new(Method::Shutdown, None, RequestId::Number(2));
    
    // 验证请求 ID 的唯一性
    // Validate request ID uniqueness
    if !shutdown_request.validate_id_uniqueness(&mut session_ids) {
        eprintln!("Request ID has already been used in this session");
        return Ok(());
    }

    client.send(Message::Request(shutdown_request)).await?;

    // 发送退出通知
    // Send exit notification
    let exit_notification = Notification::new(Method::Exit, None);
    client.send(Message::Notification(exit_notification)).await?;

    client.close().await?;
    Ok(())
}

运行生命周期示例 | Running the lifecycle example:

# 1. 首先编译服务器程序 | First, build the server
cargo build --example lifecycle_server

# 2. 然后运行客户端程序 | Then run the client
cargo run --example lifecycle_client

这个示例展示了完整的 MCP 生命周期,包括: This example demonstrates the complete MCP lifecycle, including:

  • 初始化阶段(initialize 请求和 initialized 通知)

  • 会话 ID 跟踪和验证

  • 版本协商

  • 优雅关闭(shutdown 请求和 exit 通知)

  • Initialization phase (initialize request and initialized notification)

  • Session ID tracking and validation

  • Version negotiation

  • Graceful shutdown (shutdown request and exit notification)

项目结构 | Project Structure

src/
├── client/           # 客户端实现 | Client implementation
├── server/           # 服务器实现 | Server implementation
├── protocol/         # MCP 协议实现 | MCP protocol implementation
└── transport/        # 传输层实现 | Transport layer implementation
    ├── http/         # HTTP/SSE 传输 | HTTP/SSE transport
    │   ├── client.rs # HTTP 客户端 | HTTP client
    │   └── server.rs # HTTP 服务器 | HTTP server
    └── stdio/        # 标准输入/输出传输 | Stdio transport
        ├── client.rs # Stdio 客户端 | Stdio client
        └── server.rs # Stdio 服务器 | Stdio server

贡献 | Contributing

欢迎提交 Pull Requests!对于重大更改,请先开 issue 讨论您想要更改的内容。

Pull Requests are welcome! For major changes, please open an issue first to discuss what you would like to change.

许可证 | License

本项目采用 MIT 许可证 - 详见 LICENSE 文件

This project is licensed under the MIT License - see the LICENSE file for details

Dependencies

~9–20MB
~274K SLoC