Skip to content

项目概述

这是一个基于Node.js和Express框架开发的AI对话应用,通过OpenAI SDK调用阿里云的通义千问模型(qwen-plus),实现了完整的对话系统。当前该项目交互不是很好,主要是学习下AI前后端流式交互模式。

技术栈

  • 前端:Vue3 + vue-element-plus-x + hook-fetch
  • 后端:express + knex + mysql2 + openai

核心功能

1. AI对话功能

  • 实现了基于SSE(Server-Sent Events)的流式输出,提升用户体验
  • 每日对话次数限制(通过环境变量配置)
  • 自动保存对话记录到数据库

2. 会话管理

  • 创建新会话,生成唯一会话ID
  • 获取会话列表
  • 支持会话复用机制(空标题会话复用)

3. 消息管理

  • 获取指定会话的消息列表
  • 保存用户和AI的对话记录
  • 按时间顺序排序消息

核心代码(主要是后端代码)

javascript
require('dotenv').config();
const express = require('express');
const router = express.Router();
const knex = require('../db'); // 导入 Knex 实例
const { v4: uuidv4 } = require('uuid');
const { OpenAI } = require('openai');

// 创建OpenAI客户端(配置为使用DashScope的Qwen模型)
const openai = new OpenAI({
  apiKey: process.env.DASHSCOPE_API_KEY, //API密钥
  baseURL: "https://dashscope.aliyuncs.com/compatible-mode/v1",
});

/**
 * @description: 对话接口
 * @param {*} message 用户输入的消息内容
 * @param {*} sessionId 会话ID
 */
router.post("/api/chat", async (req, res) => {
  const { message, sessionId } = req.body;
  if (!message || !sessionId) {
    return res.status(200).json({ error: "缺少消息内容或会话ID" });
  }

  // 设置响应头,支持SSE流式输出
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  //保存用户对话记录到数据库
  await knex("t_messages").insert({
    message_id: uuidv4(),  //信息id
    conversation_id: sessionId, // 会话id
    role: "user", // 角色  user/ai
    content: message, // 用户输入的消息内容
    created_at: knex.fn.now(),
  });

  // 系统提示词,引导模型展示思考过程
  const systemPrompt = `你是一个AI助手,在回答用户问题之前,请先展示你的思考过程。
          
思考过程应包含以下步骤:
1. 思考步骤1:分析问题
2. 思考步骤2:确定解决方法
3. 思考步骤3:执行计划
4. 思考步骤4:验证结果

最后,用"最终答案:"引出你的答案。`;

  // openai 调用模型,获取流式响应
  const stream = await openai.chat.completions.create({
    model: "qwen-plus",
    messages: [
      {
        role: "system",
        content: systemPrompt,
      },
      { role: "user", content: message },
    ],
    temperature: 0.7,
    max_tokens: 1000,
    stream: true, // 启用流式输出
  });

  // 处理流式响应
  let fullResponse = ""; //记录完整响应内容
  let isResponseFinished = false; //标记响应是否已完成
  let isResponseClosed = false; //标记连接是否已关闭

  // 监听响应完成事件
  res.on("finish", () => {
    isResponseFinished = true;
    console.log("响应已正常完成");
  });
  // 监听前端连接关闭事件
  res.on("close", () => {
    isResponseClosed = true;
    console.log("连接已关闭");
    // // 只有当响应未正常完成时,才存储部分数据
    if (!isResponseFinished && fullResponse.length > 0) {
      // 异步存储部分数据,使用IIFE避免阻塞
      try {
        (async () => {
          await knex("t_messages").insert({
            message_id: uuidv4(),
            conversation_id: sessionId,
            role: "ai",
            content: fullResponse,
            created_at: knex.fn.now(),
          });
        })();
      } catch (dbError) {
        console.error("存储部分响应失败:", dbError);
      }
    }
  });

  // 处理流式响应,将每个chunk的内容发送给前端
  try {
    for await (const chunk of stream) {
      // 检查连接是否已关闭,如果已关闭则中断循环
      if (isResponseFinished || isResponseClosed) {
        break;
      }
      const content = chunk.choices[0]?.delta?.content || "";
      if (content) {
        fullResponse += content;
        // 发送SSE事件
        res.write(`data: ${JSON.stringify({ content, done: false })}\n\n`);
      }
    }
  } catch (error) {
    console.error("流式处理异常:", error);
  }

  // 解析思考过程和最终答案
  const thinkingMatch = fullResponse.match(
    /思考步骤\d+:(.*?)(?=思考步骤\d+|$)/gs,
  );
  const finalAnswerMatch = fullResponse.match(/最终答案:(.*)/s);

  const thinkingProcess = thinkingMatch
    ? thinkingMatch.map((step) => step.trim())
    : [];
  const finalAnswer = finalAnswerMatch
    ? finalAnswerMatch[1].trim()
    : fullResponse;

  // 发送完成事件,包含完整结构化数据,并修改done状态为true
  res.write(
    `data: ${JSON.stringify({
      content: fullResponse,
      done: true,
      reply: finalAnswer,
      thinkingProcess: thinkingProcess,
    })}\n\n`,
  );

  // 如果连接未关闭,说明流式响应正常完成
  if (!isResponseFinished && !isResponseClosed) {
    await knex("t_messages").insert({
      message_id: uuidv4(),
      conversation_id: sessionId,
      role: "ai",
      content: fullResponse,
      created_at: knex.fn.now(),
    });
  }

  res.end();
});

问题思考

1.systemPrompt 参数的作用?

我的理解是它是用来规范AI输出的,确保AI的回答符合我们的预期.

2.设置响应头的作用?

设置响应头的作用是告诉浏览器,服务器返回的是SSE流,浏览器需要按照SSE的格式解析.以及流式输出的作用是实时返回对话内容,提升用户体验.

JavaScript
//SSE事件流格式,告诉浏览器这是一个需要持续接收的事件流,而不是普通的一次性HTTP响应
res.setHeader("Content-Type", "text/event-stream");
// 告诉浏览器禁止缓存,保证是最新的数据内容
res.setHeader("Cache-Control", "no-cache");
//保持HTTP请求连接持久化,不是响应后立即关闭,维持SSE的基础
res.setHeader("Connection", "keep-alive");

3.响应时间的完成和关闭事件处理是为了什么?

finish事件: isResponseFinished 标记响应是否已完成,用户后续ai对话内容防止响应关闭后的数据的重复存储

close事件: isResponseClosed 标记连接是否已关闭,响应关闭后用来存储已经获取到的AI响应内容存储到数据库

后续优化

前端:

  • 增加会话名称功能,用户可以自定义会话名称,方便识别不同的对话.
  • 增加会话删除功能,用户可以在前端删除不需要的会话.

后端:

  • 目前是使用 OpenAI 插件进行AI交互,后续学习使用langchain,langgraph等框架,实现更复杂的对话功能.

设计:

  • 目前对话的是没有区分用户的,所以交互上可能会出现使用同个会话导致数据混乱的问题,后续再考虑增加用户认证功能,区分不同用户的对话记录.

上次更新于: