一、概述

MCP服务端当前支持两种与客户端的数据通信方式:标准输入输出(stdio)  和 基于Http的服务器推送事件(http sse)

1.1 标准输入输出(stdio)

原理:  标准输入输出是一种用于本地通信的传输方式。在这种模式下,MCP 客户端会将服务器程序作为子进程启动,双方通过约定的标准输入和标准输出(可能是通过共享文件等方法)进行数据交换。具体而言,客户端通过标准输入发送请求,服务器通过标准输出返回响应。。

适用场景:  标准输入输出方式适用于客户端和服务器在同一台机器上运行的场景(本地自行编写服务端或将别人编写的服务端代码pull到本地执行),确保了高效、低延迟的通信。这种直接的数据传输方式减少了网络延迟和传输开销,适合需要快速响应的本地应用。

1.2 基于Http的服务器推送事件(http sse)

原理:  客户端和服务端通过 HTTP 协议进行通信,利用 SSE 实现服务端向客户端的实时数据推送,服务端定义了/see与/messages接口用于推送与接收数据。这里要注意SSE协议和WebSocket协议的区别,SSE协议是单向的,客户端和服务端建立连接后,只能由服务端向客户端进行消息推送。而WebSocket协议客户端和服务端建立连接后,客户端可以通过send向服务端发送数据,并通过onmessage事件接收服务端传过来的数据。

适用场景:  适用于客户端和服务端位于不同物理位置的场景,尤其是对于分布式或远程部署的场景,基于 HTTP 和 SSE 的传输方式更合适。


二、MCP开发应用

MCP Server应用平台

主要有以下这些:

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

 
这里面有很多已经开发好的mcp应用,可以拿来直接使用即可。
Cherry Studio客户端推荐的mcp平台为:
 
 
但是大家可以发现一个问题,就是MCP Server应用平台,发布的应用,80%以上,运行方式都是stdio,这种方式适合本地开发。
但是想在dify里面使用,一般都是sse方式,因为适合远程调用。即使是不同命名空间,不同的工作流依然可以调用,只要网络能联通,就没问题。
而且sse方式,只需要运行一次,就可以让成千上万个dify应用调用,还是很方便的。
 

MCP应用开发

以mysql为例子,进入网站,搜索mysql,找到Mysql_mcp_server_pro,链接如下:

github地址如下:

 
这个应用只支持mysql 5.6,由于我的mysql版本是8.0,需要修改对应的源码才行。

mysql表数据

新建表score

CREATE TABLE `score` (  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',  `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '用户名',  `chinese` int DEFAULT '0' COMMENT '语文',  `mathematics` int DEFAULT '0' COMMENT '数学',  `english` int DEFAULT '0' COMMENT '英语',  `total` int DEFAULT '0' COMMENT '总分',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='成绩表';

 

插入数据

INSERT INTO `score` VALUES (1, '张三', 95, 97, 99, 291);INSERT INTO `score` VALUES (2, '李四', 73, 85, 95, 253);INSERT INTO `score` VALUES (3, '王二', 88, 83, 96, 267);INSERT INTO `score` VALUES (4, '周五', 92, 88, 93, 273);INSERT INTO `score` VALUES (5, '马六', 85, 91, 97, 273);

 

python代码

新建空目录Mysql_mcp_server_pro,新建2个文件.evn,server.py
.env文件内容如下:
# MySQL数据库配置MYSQL_HOST=192.168.20.128MYSQL_PORT=3306MYSQL_USER=rootMYSQL_PASSWORD=abcd@1234MYSQL_DATABASE=test

这个是mysql连接信息

 

server.py

import osimport uvicornfrom mcp.server.sse import SseServerTransportfrom mysql.connector import connect, Errorfrom mcp.server import Serverfrom mcp.types import Tool, TextContentfrom starlette.applications import Starlettefrom starlette.routing import Route, Mountfrom dotenv import load_dotenvdef get_db_config():    """从环境变量获取数据库配置信息    返回:        dict: 包含数据库连接所需的配置信息        - host: 数据库主机地址        - port: 数据库端口        - user: 数据库用户名        - password: 数据库密码        - database: 数据库名称    异常:        ValueError: 当必需的配置信息缺失时抛出    """    # 加载.env文件    load_dotenv()    config = {        "host": os.getenv("MYSQL_HOST", "localhost"),        "port": int(os.getenv("MYSQL_PORT", "3306")),        "user": os.getenv("MYSQL_USER"),        "password": os.getenv("MYSQL_PASSWORD"),        "database": os.getenv("MYSQL_DATABASE"),    }    print(config)    if not all([config["user"], config["password"], config["database"]]):        raise ValueError("缺少必需的数据库配置")    return configdef execute_sql(query: str) -> list[TextContent]:    """执行SQL查询语句    参数:        query (str): 要执行的SQL语句,支持多条语句以分号分隔    返回:        list[TextContent]: 包含查询结果的TextContent列表        - 对于SELECT查询:返回CSV格式的结果,包含列名和数据        - 对于SHOW TABLES:返回数据库中的所有表名        - 对于其他查询:返回执行状态和影响行数        - 多条语句的结果以"---"分隔    异常:        Error: 当数据库连接或查询执行失败时抛出    """    config = get_db_config()    try:        with connect(**config) as conn:            with conn.cursor() as cursor:                statements = [stmt.strip() for stmt in query.split(";") if stmt.strip()]                results = []                for statement in statements:                    try:                        cursor.execute(statement)                        # 检查语句是否返回了结果集 (SELECT, SHOW, EXPLAIN, etc.)                        if cursor.description:                            columns = [desc[0] for desc in cursor.description]                            rows = cursor.fetchall()                            # 将每一行的数据转换为字符串,特殊处理None值                            formatted_rows = []                            for row in rows:                                formatted_row = [                                    "NULL" if value is None else str(value)                                    for value in row                                ]                                formatted_rows.append(",".join(formatted_row))                            # 将列名和数据合并为CSV格式                            results.append(                                "n".join([",".join(columns)] + formatted_rows)                            )                        # 如果语句没有返回结果集 (INSERT, UPDATE, DELETE, etc.)                        else:                            conn.commit()  # 只有在非查询语句时才提交                            results.append(f"查询执行成功。影响行数: {cursor.rowcount}")                    except Error as stmt_error:                        # 单条语句执行出错时,记录错误并继续执行                        results.append(                            f"执行语句 '{statement}' 出错: {str(stmt_error)}"                        )                        # 可以在这里选择是否继续执行后续语句,目前是继续                return [TextContent(type="text", text="n---n".join(results))]    except Error as e:        print(f"执行SQL '{query}' 时出错: {e}")        return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]def get_table_name(text: str) -> list[TextContent]:    """根据表的中文注释搜索数据库中的表名    参数:        text (str): 要搜索的表中文注释关键词    返回:        list[TextContent]: 包含查询结果的TextContent列表        - 返回匹配的表名、数据库名和表注释信息        - 结果以CSV格式返回,包含列名和数据    """    config = get_db_config()    sql = "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_COMMENT "    sql += f"FROM information_schema.TABLES WHERE TABLE_SCHEMA = '{config['database']}' AND TABLE_COMMENT LIKE '%{text}%';"    return execute_sql(sql)def get_table_desc(text: str) -> list[TextContent]:    """获取指定表的字段结构信息    参数:        text (str): 要查询的表名,多个表名以逗号分隔    返回:        list[TextContent]: 包含查询结果的TextContent列表        - 返回表的字段名、字段注释等信息        - 结果按表名和字段顺序排序        - 结果以CSV格式返回,包含列名和数据    """    config = get_db_config()    # 将输入的表名按逗号分割成列表    table_names = [name.strip() for name in text.split(",")]    # 构建IN条件    table_condition = "','".join(table_names)    sql = "SELECT TABLE_NAME, COLUMN_NAME, COLUMN_COMMENT "    sql += (        f"FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '{config['database']}' "    )    sql += f"AND TABLE_NAME IN ('{table_condition}') ORDER BY TABLE_NAME, ORDINAL_POSITION;"    return execute_sql(sql)def get_lock_tables() -> list[TextContent]:    sql = """SELECT    p2.`HOST` AS 被阻塞方host,    p2.`USER` AS 被阻塞方用户,    r.trx_id AS 被阻塞方事务id,    r.trx_mysql_thread_id AS 被阻塞方线程号,    TIMESTAMPDIFF(SECOND, r.trx_wait_started, CURRENT_TIMESTAMP) AS 等待时间,    r.trx_query AS 被阻塞的查询,    l.OBJECT_NAME AS 阻塞方锁住的表,    m.LOCK_MODE AS 被阻塞方的锁模式,    m.LOCK_TYPE AS '被阻塞方的锁类型(表锁还是行锁)',    m.INDEX_NAME AS 被阻塞方锁住的索引,    m.OBJECT_SCHEMA AS 被阻塞方锁对象的数据库名,    m.OBJECT_NAME AS 被阻塞方锁对象的表名,    m.LOCK_DATA AS 被阻塞方事务锁定记录的主键值,    p.`HOST` AS 阻塞方主机,    p.`USER` AS 阻塞方用户,    b.trx_id AS 阻塞方事务id,    b.trx_mysql_thread_id AS 阻塞方线程号,    b.trx_query AS 阻塞方查询,    l.LOCK_MODE AS 阻塞方的锁模式,    l.LOCK_TYPE AS '阻塞方的锁类型(表锁还是行锁)',    l.INDEX_NAME AS 阻塞方锁住的索引,    l.OBJECT_SCHEMA AS 阻塞方锁对象的数据库名,    l.OBJECT_NAME AS 阻塞方锁对象的表名,    l.LOCK_DATA AS 阻塞方事务锁定记录的主键值,    IF(p.COMMAND = 'Sleep', CONCAT(p.TIME, ' 秒'), 0) AS 阻塞方事务空闲的时间    FROM performance_schema.data_lock_waits w    INNER JOIN performance_schema.data_locks l ON w.BLOCKING_ENGINE_LOCK_ID = l.ENGINE_LOCK_ID    INNER JOIN performance_schema.data_locks m ON w.REQUESTING_ENGINE_LOCK_ID = m.ENGINE_LOCK_ID    INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID    INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID    INNER JOIN information_schema.PROCESSLIST p ON p.ID = b.trx_mysql_thread_id    INNER JOIN information_schema.PROCESSLIST p2 ON p2.ID = r.trx_mysql_thread_id    ORDER BY 等待时间 DESC;"""    return execute_sql(sql)# 初始化服务器app = Server("operateMysql")@app.list_tools()async def list_tools() -> list[Tool]:    """列出可用的MySQL工具    返回:        list[Tool]: 工具列表,当前仅包含execute_sql工具    """    return [        Tool(            name="execute_sql",            description="在MySQL8.0数据库上执行SQL",            inputSchema={                "type": "object",                "properties": {                    "query": {"type": "string", "description": "要执行的SQL语句"}                },                "required": ["query"],            },        ),        Tool(            name="get_table_name",            description="根据表中文名搜索数据库中对应的表名",            inputSchema={                "type": "object",                "properties": {                    "text": {"type": "string", "description": "要搜索的表中文名"}                },                "required": ["text"],            },        ),        Tool(            name="get_table_desc",            description="根据表名搜索数据库中对应的表结构,支持多表查询",            inputSchema={                "type": "object",                "properties": {                    "text": {"type": "string", "description": "要搜索的表名"}                },                "required": ["text"],            },        ),        Tool(            name="get_lock_tables",            description="获取当前mysql服务器InnoDB 的行级锁",            inputSchema={"type": "object", "properties": {}},        ),    ]@app.call_tool()async def call_tool(name: str, arguments: dict) -> list[TextContent]:    if name == "execute_sql":        query = arguments.get("query")        if not query:            raise ValueError("缺少查询语句")        return execute_sql(query)    elif name == "get_table_name":        text = arguments.get("text")        if not text:            raise ValueError("缺少表信息")        return get_table_name(text)    elif name == "get_table_desc":        text = arguments.get("text")        if not text:            raise ValueError("缺少表信息")        return get_table_desc(text)    elif name == "get_lock_tables":        return get_lock_tables()    raise ValueError(f"未知的工具: {name}")sse = SseServerTransport("/messages/")# Handler for SSE connectionsasync def handle_sse(request):    async with sse.connect_sse(        request.scope, request.receive, request._send    ) as streams:        await app.run(streams[0], streams[1], app.create_initialization_options())# Create Starlette app with routesstarlette_app = Starlette(    debug=True,    routes=[        Route("/sse", endpoint=handle_sse),        Mount("/messages/", app=sse.handle_post_message),    ],)if __name__ == "__main__":    uvicorn.run(starlette_app, host="0.0.0.0", port=9000)

这里面,主要提供了4个工具方法,分别是:

execute_sqlget_table_nameget_table_descget_lock_tables

 

安装python依赖

pip install mcppip install mysql-connector-pythonpip install uvicornpip install python-dotenvpip install starlette

 

启动应用

python server.py

输出:

INFO:     Started server process [23756]INFO:     Waiting for application startup.INFO:     Application startup complete.INFO:     Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)INFO:     127.0.0.1:60896 - "GET /sse HTTP/1.1" 200 OK

 

访问页面:http://127.***0.0.1:9000/sse

效果如下:

 

三、测试MCP应用

客户端添加MCP

以Cherry Studio客户端为例子,注意:必须是Cherry Studio最新版本,才有MCP设置。
 

 添加MCP服务器

输入名称:mysql_mcp_server_pro

类型:sse

URL:http://127.***0.0.1:9000/sse

 

点击保存

 

保存成功后,就可以看到工具列表了,只有4个

测试MCP应用

 返回主页,点击新建助手,选择模型

 在输入框,找到MCP服务器

 开启MCP

 

先来看mysql的数据表score,内容如下:

 

 查询成绩表score的内容

 查询成绩表的表名是什么

 

查询成绩表的表结构

 

获取当前mysql的锁

 

总结一下,AI模型调用MCP,还是很方便的。

有些时候AI模型做不到的,你可以自己写一个MCP应用。比如上面提到的查询mysql表数据,还有很多呢。

比如查询内部CRM系统,gitlab信息,内部业务系统,处理特定格式excel文件等等,都可以的。

 

本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:[email protected]