您的位置: 首页> 大数据> Django环境下使用wsgi启动MCP服务

Django环境下使用wsgi启动MCP服务

时间:2025-09-03 18:30:02 来源:互联网

Django环境下使用MCP

MCP(模型上下文协议)

Model Context Protocol(MCP)是一种由 Anthropic 开源的协议,旨在将大型语言模型直接连接至数据源,实现无缝集成。

function call 和 mcp

function call需要根据函数名称来判断,自行执行函数,而mcp可以直接call_tool来调用函数,最终再把结果返回给模型识别,本质上并没有什么区别,mcp更加规范便捷。

function call

if function_name == "get_current_weather":
    weather_result = get_current_weather(
       location=function_args["location"],
       unit=function_args.get("unit", "metric")
    )

mcp

        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # 如何是需要使用工具,就解析工具
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)

            # 执行工具
            result = await self.session.call_tool(tool_name, tool_args)

准备工作

由于mcp 依赖python3.10+,而python3.10+又依赖openssl 1.1.1+,所以老项目下去使用mcp就有一定的门槛。最简单的方式就是使用新版本的os,安装新版本的python,debian11的openssl就符合要求。

升级python如何解决依赖

首先备份一下当前环境的包

pip freeze > requirements.txt

python3.10+的新环境下安装,对安装失败的包我们去掉就行,我们还是可以通过virtualenv来管理虚拟环境

python3 -m venv DjangoEnv
ip3 install -r requirements.txt

如果使用k8s的话,基础镜像需要更新,可以参考以下Dockerfile

FROM python:3.12.11-bookworm

RUN mkdir -p /var/log/supervisord/ /app/logs
RUN ln -snf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo 'Asia/Shanghai' >/etc/timezone
RUN echo 'deb https://mirrors.huaweicloud.com/debian/ bookworm main contrib non-free' > /etc/apt/sources.list
RUN echo 'deb https://mirrors.huaweicloud.com/debian/ bookworm-updates main contrib non-free' >> /etc/apt/sources.list
RUN echo 'deb https://mirrors.huaweicloud.com/debian/ bookworm-backports main contrib non-free' >> /etc/apt/sources.list
RUN rm -rf /etc/apt/sources.list.d/debian.sources
RUN apt-get update && apt-get install -y supervisor vim iproute2 && rm -rf /var/lib/apt/lists/*
CMD cat

启动mcp client

我们使用wsgi来启动socket服务,project/routing.py

from django.urls import re_path
from chat.consumers import ChatConsumer

websocket_urlpatterns = [
    re_path(r"chat", ChatConsumer.as_asgi()),
]

mcp client:consumers.py,支持stdio_server和sse_server

import json
import logging
import os
from channels.generic.websocket import AsyncWebsocketConsumer
from django.http.request import QueryDict
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client
from openai import AsyncOpenAI
from jinja2 import Template
from django.conf import settings

CONFIG = settings.CONFIG
logger_webconsol = logging.getLogger("webconsol")


class MCPClient:
    def __init__(self):
        """
        初始化 MCP 客户端,连接 OpenAI 接口。
        """
        self.openai_api_key = CONFIG.get("chat", "OPENAI_API_KEY")
        self.base_url = CONFIG.get("chat", "BASE_URL")
        self.model_name = CONFIG.get("chat", "MODEL")
        # 初始化 OpenAI 异步客户端
        self.client = AsyncOpenAI(base_url=self.base_url, api_key=self.openai_api_key)

        # 工具映射:prefixed_name -> (session, tool)
        self.tool_mapping = {}
        self.available_tools = []
        self.ai_role = "你是一个智能助手,专门协助用户使用各种工具完成任务。你应该理解用户的需求,选择合适的工具,并解释你的操作过程。如果遇到不明确的情况,要主动询问用户以澄清需求。"

        # stdio_client
        self.exit_stacks = []
        # sse_client
        self.server_urls = []

        # 存储每个服务器的会话及其上下文:server_id -> (session, session_context, streams_context)
        self.sessions = {}
        self.messages = [{"role": "system", "content": self.ai_role}]

    def update_ai_role(
        self, additional_instructions: list = []
    ):
        with open("templates/AiRole.jinja2") as f:
            template = Template(f.read())
            self.ai_role = template.render(
                {
                    "available_tools": self.available_tools,
                    "additional_instructions": additional_instructions,
                }
            )

    def update_message(self, messages: list):
        """
        维护消息列表,保证第一条消息为system role定义
        """
        # 确保第一条消息始终是系统角色消息
        if not self.messages or self.messages[0]["role"] != "system":
            self.messages = [{"role": "system", "content": self.ai_role}]
        # 添加新消息
        self.messages.extend(messages)
        # 如果消息数量超过11(包括系统消息),则删除最旧的非系统消息
        if len(self.messages) > 11:
            self.messages = [self.messages[0]] + self.messages[-10:]

    async def connect_to_stdio_server(self, servers_path: list[str]):
        """连接到 MCP 服务器并列出可用工具"""
        for server_script_path in servers_path:
            try:
                if (
                    server_script_path.endswith(".py")
                    and "__init__.py" not in server_script_path
                ):
                    server_params = StdioServerParameters(
                        command="python3", args=[server_script_path], env=None
                    )
                elif server_script_path.endswith(".sh"):
                    server_params = StdioServerParameters(
                        command="sh", args=[server_script_path], env=None
                    )
                else:
                    logger_webconsol.warning(
                        f"不支持的服务器脚本:{server_script_path}"
                    )

                # 启动 MCP 服务器并建立通信
                exit_stack = AsyncExitStack()
                stdio_transport = await exit_stack.enter_async_context(
                    stdio_client(server_params)
                )
                stdio, write = stdio_transport
                stdioSession = await exit_stack.enter_async_context(
                    ClientSession(stdio, write)
                )
                await stdioSession.initialize()
                self.exit_stacks.append(exit_stack)

                # 列出 MCP 服务器上的工具
                response = await stdioSession.list_tools()
                for tool in response.tools:
                    self.tool_mapping[f"stdio_{tool.name}"] = (stdioSession, tool)
                    self.available_tools.append(
                        {
                            "type": "function",
                            "function": {
                                "name": f"stdio_{tool.name}",
                                "description": tool.description,
                                "parameters": tool.inputSchema,
                            },
                        }
                    )
                logger_webconsol.info(
                    f"已连接到 {server_script_path},工具列表:{[tool.name for tool in response.tools]}"
                )
            except Exception as e:
                logger_webconsol.exception(
                    f"连接到 {server_script_path} 时发生错误: {str(e)}"
                )

    async def connect_to_sse_server(self):
        """
        初始化与所有 SSE 服务器的连接,并获取可用工具列表。
        """
        if not self.server_urls:
            return
        for server in self.server_urls:
            try:
                server_name = server["server_name"]
                # 创建 SSE 客户端并进入上下文
                streams_context = sse_client(url=server["server_url"])
                streams = await streams_context.__aenter__()
                session_context = ClientSession(*streams)
                session = await session_context.__aenter__()
                await session.initialize()

                # 存储会话及其上下文
                self.sessions[server_name] = (session, session_context, streams_context)

                # 获取工具列表并建立映射
                response = await session.list_tools()
                for tool in response.tools:
                    prefixed_name = (
                        f"{server_name}_{tool.name}"  # 为工具名添加服务器前缀
                    )
                    self.tool_mapping[prefixed_name] = (session, tool)
                    self.available_tools.append(
                        {
                            "type": "function",
                            "function": {
                                "name": prefixed_name,
                                "description": tool.description,
                                "parameters": tool.inputSchema,
                            },
                        }
                    )
                logger_webconsol.info(
                    f"已连接到 {server_name},工具列表:{[tool.name for tool in response.tools]}"
                )
            except Exception as e:
                logger_webconsol.exception(f"连接到 {server} 时发生错误: {str(e)}")

    async def cleanup(self):
        """
        清理所有会话和连接资源,确保无资源泄漏。
        """
        if self.exit_stacks:
            for exit_stack in self.exit_stacks:
                await exit_stack.aclose()

        if self.sessions:
            for server_id, (
                session,
                session_context,
                streams_context,
            ) in self.sessions.items():
                await session_context.__aexit__(None, None, None)  # 退出会话上下文
                await streams_context.__aexit__(None, None, None)  # 退出 SSE 流上下文

    async def process_query(self, query: str) -> str:
        """
        处理用户的自然语言查询,通过工具调用完成任务并返回结果。

        :param query: 用户输入的查询字符串。
        :return: 处理后的回复文本。
        """
        self.update_message([{"role": "user", "content": query}])
        logger_webconsol.info(self.messages)

        # 处理工具调用
        while True:
            # 向模型发送初始请求
            response = await self.client.chat.completions.create(
                model=self.model_name,
                messages=self.messages,
                tools=self.available_tools,
            )
            message = response.choices[0].message
            logger_webconsol.info(message)
            if response.choices[0].finish_reason != "tool_calls":  # stop
                self.update_message(
                    [{"role": message.role, "content": message.content}]
                )
                return message.content

            # 遍历所调用的tools
            for tool_call in message.tool_calls:
                prefixed_name = tool_call.function.name
                if prefixed_name in self.tool_mapping:
                    session, original_tool = self.tool_mapping[prefixed_name]
                    tool_args = json.loads(tool_call.function.arguments)
                    try:
                        result = await session.call_tool(original_tool.name, tool_args)
                        logger_webconsol.info(result)
                    except Exception as e:
                        result = {
                            "content": f"调用工具 {original_tool.name} 出错:{str(e)}"
                        }

                    self.update_message(
                        [
                            {
                                "role": "assistant",
                                "tool_calls": [
                                    {
                                        "id": tool_call.id,
                                        "type": "function",
                                        "function": {
                                            "name": prefixed_name,
                                            "arguments": json.dumps(tool_args),
                                        },
                                    }
                                ],
                            },
                            {
                                "role": "tool",
                                "tool_call_id": tool_call.id,
                                "content": str(result.content),
                            },
                        ]
                    )
                else:
                    logger_webconsol.warning(f"工具 {prefixed_name} 未找到")


class ChatConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mcp_client = None
        self.aiactive = False

    async def connect(self):
        await self.accept()
        await self.init_mcp_client()

    async def init_mcp_client(self):
        try:
            stdio_servers = []
            current_dir = os.path.dirname(os.path.abspath(__file__))
            servers_dir = os.path.join(current_dir, "mcpservers")
            for file in os.listdir(servers_dir):
                if file.endswith(".py") and "__init__.py" not in file:
                    stdio_servers.append(os.path.join(servers_dir, file))
            logger_webconsol.info(f"stdio servers: {stdio_servers}")

            self.mcp_client = MCPClient()
            await self.mcp_client.connect_to_stdio_server(stdio_servers)
            await self.mcp_client.connect_to_sse_server()
            self.aiactive = True
        except Exception as e:
            self.mcp_client = None
            self.aiactive = False
            logger_webconsol.warning(f"Error connecting to MCP server: {e}")

    async def disconnect(self, message=None):
        try:
            await self.mcp_client.cleanup()
        except:
            pass
        await self.close()

    async def receive(self, text_data=None, bytes_data=None):
        message_dict = json.loads(text_data)
        query = message_dict.get("message", "")
        if self.aiactive:
            try:
                response = await self.mcp_client.process_query(query)
                await self.send(text_data=json.dumps({"message": response}))
            except Exception as e:
                await self.send(
                    text_data=json.dumps(
                        {"message": f"{query} Sorry, ai is not active.."}
                    )
                )
        else:
            await self.send(
                text_data=json.dumps({"message": f"{query} Sorry, ai is not active.."})
            )

启动mcp stdio_server

mcp server就是普通的异步函数,由于django的orm操作均为同步操作,我们需要使用sync_to_async转换。(stdio_server由client去主动拉起,而非自己手工启动)

from asgiref.sync import sync_to_async

@mcp.tool()
async def query_host_info():
    async_query = sync_to_async(sync_query_host_info) # 我们把具体方法写入sync_query_host_info
    return await async_query(
        hostname, assetsSn, product, location_place, server_status, q_type
    )
上一篇:手搓责任链框架 2:核心接口设计 下一篇:没有了

相关文章

相关应用

最近更新