上一篇 下一篇 分享链接 返回 返回顶部

FastGPT 并发扛不住?从限流、队列到模型网关的实战改造方案

发布人:慈云数据-客服中心 发布时间:11小时前 阅读量:11

FastGPT 高并发解决方案|附源码

在企业级 AI 应用落地过程中,FastGPT 这类知识库问答、智能客服、工作流编排平台,往往会很快遇到一个现实问题:单用户体验很好,但一旦并发上来,响应变慢、排队严重、模型接口超时、数据库压力飙升,甚至整个服务不可用

尤其是在以下场景中,高并发问题会被迅速放大:

  • 企业内部几百人同时使用知识库问答;
  • 客服系统接入微信公众号、官网、企微、飞书等多个入口;
  • 营销活动期间大量用户同时向 AI 助手提问;
  • 工作流中同时调用向量检索、数据库查询、插件接口和大模型;
  • 多租户 SaaS 平台中,不同客户共享同一套 FastGPT 服务。

本文将围绕 FastGPT 高并发解决方案 展开,结合系统架构、限流、队列、缓存、模型网关、数据库优化、异步任务等方面,给出一套可落地的实践方案,并附上部分核心源码示例,帮助你构建更稳定、更可扩展的 FastGPT 服务。


一、FastGPT 高并发的核心瓶颈

在讨论解决方案之前,需要先明确 FastGPT 在高并发下容易出现瓶颈的位置。通常来说,一个完整的 FastGPT 请求链路大致如下:

用户请求
  ↓
FastGPT API 服务
  ↓
权限校验 / 应用配置读取
  ↓
知识库检索 / 向量数据库查询
  ↓
Prompt 组装
  ↓
调用大模型接口
  ↓
流式返回 / 非流式返回
  ↓
记录对话日志 / Token 消耗 / 统计数据

这条链路中,每一个环节都可能成为瓶颈。

1. 大模型接口是最主要瓶颈

无论你使用 OpenAI、Azure OpenAI、Claude、通义千问、智谱、DeepSeek,还是本地部署的大模型,大模型推理本身都属于高延迟、高成本资源。

常见问题包括:

  • 模型接口有 RPM、TPM、QPS 限制;
  • 单次请求耗时较长,通常几秒到几十秒;
  • 流式响应会长期占用连接;
  • 高并发下容易触发 429、超时、连接中断;
  • 多个用户同时请求会造成模型服务排队。

因此,FastGPT 高并发优化的关键不是盲目增加应用服务实例,而是要围绕 模型调用能力 做统一调度。


2. 数据库连接数和慢查询

FastGPT 通常依赖 MongoDB、PostgreSQL、Redis、向量数据库等组件。如果并发请求全部直接打到数据库,容易出现:

  • 数据库连接数耗尽;
  • 慢查询增多;
  • 索引未命中导致 CPU 飙升;
  • 聊天记录写入压力过大;
  • 知识库检索耗时变长;
  • 多租户数据隔离查询变复杂。

数据库不是不能抗并发,而是不能让所有请求都无控制地直接压上去。


3. 向量检索性能不足

FastGPT 的核心能力之一是知识库问答。用户提问后,系统通常会执行:

  1. 查询向量化;
  2. 向量数据库相似度检索;
  3. 元数据过滤;
  4. TopK 结果重排;
  5. 拼接上下文。

在高并发下,向量检索可能成为另一个瓶颈,尤其是知识库规模较大、TopK 设置过高、过滤条件复杂、同时使用重排模型时。


4. Node.js 服务连接被长期占用

FastGPT 服务端常见技术栈包含 Node.js。Node.js 适合 I/O 密集型任务,但在高并发流式输出场景中,也会遇到以下问题:

  • HTTP 长连接数量过多;
  • 单实例内存增长;
  • 未关闭的流连接导致资源泄漏;
  • SSE 响应被客户端长时间占用;
  • 上游模型慢,下游连接也被拖慢。

因此,需要对请求生命周期、连接数量和超时策略进行严格控制。


二、高并发架构设计思路

要让 FastGPT 支撑高并发,不能只靠“加服务器”。更合理的做法是将系统拆成几个关键层次:

客户端
  ↓
Nginx / API Gateway
  ↓
FastGPT Web/API 服务集群
  ↓
限流与鉴权层
  ↓
任务队列 / 模型网关
  ↓
Redis 缓存
  ↓
MongoDB / PostgreSQL / 向量数据库
  ↓
LLM Provider / 本地模型服务

高并发优化的目标不是让所有请求同时执行,而是做到:

  • 入口可控:防止瞬时流量打垮服务;
  • 资源隔离:不同租户、不同应用、不同模型分配独立额度;
  • 异步削峰:非实时任务进入队列;
  • 缓存复用:减少重复查询和重复模型调用;
  • 服务水平扩展:API 层、Worker 层、模型网关都能横向扩容;
  • 失败可恢复:超时、重试、降级、熔断机制完善。

三、方案一:入口限流,保护系统不被打穿

高并发系统第一步一定是限流。没有限流的系统,本质上是在赌流量不会突然变大。

FastGPT 可以从以下几个维度限流:

  • 按 IP 限流;
  • 按用户限流;
  • 按团队 / 租户限流;
  • 按应用限流;
  • 按模型限流;
  • 按接口路径限流;
  • 按 Token 消耗限流。

例如,一个普通用户每分钟最多发起 20 次对话请求,企业客户每分钟最多 500 次请求,管理员账号则配置更高额度。

Redis 滑动窗口限流源码

下面是一个基于 Redis 的滑动窗口限流示例,适合放在 FastGPT API 入口层。

import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL!);

interface RateLimitOptions {
  key: string;
  limit: number;
  windowSeconds: number;
}

export async function checkRateLimit(options: RateLimitOptions) {
  const { key, limit, windowSeconds } = options;
  const now = Date.now();
  const windowStart = now - windowSeconds * 1000;
  const redisKey = `rate_limit:${key}`;

  const pipeline = redis.pipeline();

  pipeline.zremrangebyscore(redisKey, 0, windowStart);
  pipeline.zadd(redisKey, now, `${now}-${Math.random()}`);
  pipeline.zcard(redisKey);
  pipeline.expire(redisKey, windowSeconds + 5);

  const results = await pipeline.exec();
  const current = results?.[2]?.[1] as number;

  if (current > limit) {
    return {
      allowed: false,
      current,
      limit
    };
  }

  return {
    allowed: true,
    current,
    limit
  };
}

在接口中使用:

export async function chatHandler(req, res) {
  const userId = req.user.id;
  const appId = req.body.appId;

  const result = await checkRateLimit({
    key: `chat:${userId}:${appId}`,
    limit: 30,
    windowSeconds: 60
  });

  if (!result.allowed) {
    return res.status(429).json({
      message: '请求过于频繁,请稍后再试'
    });
  }

  // 继续执行 FastGPT 对话逻辑
}

限流的意义并不是拒绝用户,而是让系统在流量异常时仍然保持可用。相比所有人都卡死,部分请求被限流是更可控的策略。


四、方案二:引入模型网关,统一调度 LLM 请求

FastGPT 高并发下最容易被打爆的是模型接口。建议不要让业务代码直接调用多个模型厂商,而是引入 LLM Gateway 模型网关

模型网关负责:

  • 统一管理多个模型供应商;
  • 实现 API Key 池;
  • 按权重分配请求;
  • 失败自动切换;
  • 根据模型限额做队列排队;
  • 统计每个模型的延迟、错误率、Token 消耗;
  • 对 429、503、超时等错误做重试和熔断。

模型网关调用示例

type ModelProvider = 'openai' | 'azure' | 'deepseek' | 'qwen';

interface ChatRequest {
  model: string;
  messages: Array<{
    role: 'system' | 'user' | 'assistant';
    content: string;
  }>;
  stream?: boolean;
}

interface ProviderConfig {
  provider: ModelProvider;
  baseUrl: string;
  apiKey: string;
  weight: number;
  enabled: boolean;
}

const providers: ProviderConfig[] = [
  {
    provider: 'openai',
    baseUrl: process.env.OPENAI_BASE_URL!,
    apiKey: process.env.OPENAI_API_KEY!,
    weight: 5,
    enabled: true
  },
  {
    provider: 'deepseek',
    baseUrl: process.env.DEEPSEEK_BASE_URL!,
    apiKey: process.env.DEEPSEEK_API_KEY!,
    weight: 3,
    enabled: true
  }
];

function pickProvider() {
  const available = providers.filter((item) => item.enabled);
  const totalWeight = available.reduce((sum, item) => sum + item.weight, 0);
  let random = Math.random() * totalWeight;

  for (const provider of available) {
    random -= provider.weight;
    if (random <= 0) return provider;
  }

  return available[0];
}

export async function callLLM(request: ChatRequest) {
  const provider = pickProvider();

  const response = await fetch(`${provider.baseUrl}/chat/completions`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${provider.apiKey}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify(request)
  });

  if (!response.ok) {
    throw new Error(`LLM request failed: ${response.status}`);
  }

  return response;
}

在生产环境中,模型网关还应增加:

  • 最大并发数控制;
  • 单 Key 请求配额;
  • 请求排队;
  • 失败重试;
  • 熔断恢复;
  • 供应商健康检查;
  • Token 成本统计。

五、方案三:队列削峰,避免瞬时流量压垮系统

并不是所有 FastGPT 请求都必须同步执行。例如:

  • 批量导入知识库;
  • 文档切分;
  • 向量化;
  • 批量总结;
  • 离线问答生成;
  • 对话日志分析;
  • 用户行为统计;
  • 长任务工作流。

这些任务应该进入队列,由 Worker 异步消费,而不是在 API 请求中直接执行。

常见队列方案包括:

  • BullMQ + Redis;
  • RabbitMQ;
  • Kafka;
  • NATS;
  • 云厂商消息队列。

对于 FastGPT 场景,BullMQ 已经可以满足大多数中小规模并发需求。

BullMQ 队列源码

import { Queue, Worker, JobsOptions } from 'bullmq';
import Redis from 'ioredis';

const connection = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null
});

export const embeddingQueue = new Queue('embedding', {
  connection
});

export async function addEmbeddingJob(data: {
  datasetId: string;
  documentId: string;
  chunks: string[];
}) {
  const options: JobsOptions = {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 3000
    },
    removeOnComplete: true,
    removeOnFail: 1000
  };

  return embeddingQueue.add('create-embedding', data, options);
}

new Worker(
  'embedding',
  async (job) => {
    const { datasetId, documentId, chunks } = job.data;

    for (const chunk of chunks) {
      // 1. 调用 embedding 模型
      // 2. 写入向量数据库
      // 3. 更新文档处理进度
      console.log('processing chunk', {
        datasetId,
        documentId,
        size: chunk.length
      });
    }
  },
  {
    connection,
    concurrency: 5
  }
);

这里的关键参数是 concurrency。它决定 Worker 同时处理多少个任务。并发不是越高越好,而是要根据模型接口、数据库写入能力和机器资源来调优。


六、方案四:缓存热点数据,减少重复计算

FastGPT 中有大量数据并不需要每次请求都查数据库,例如:

  • 应用配置;
  • 模型配置;
  • 团队权限;
  • 用户基础信息;
  • 知识库元信息;
  • Prompt 模板;
  • 插件配置;
  • 常见问题答案。

将这些数据缓存到 Redis,可以显著降低数据库压力。

应用配置缓存示例

import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL!);

export async function getAppConfig(appId: string) {
  const cacheKey = `app_config:${appId}`;
  const cached = await redis.get(cacheKey);

  if (cached) {
    return JSON.parse(cached);
  }

  const appConfig = await db.app.findUnique({
    where: { id: appId }
  });

  if (!appConfig) {
    throw new Error('应用不存在');
  }

  await redis.set(cacheKey, JSON.stringify(appConfig), 'EX', 60);

  return appConfig;
}

缓存时间不宜过长。对于应用配置这类数据,通常 30 秒到 5 分钟即可。既能减轻数据库压力,又不会造成配置更新后长期不生效。

如果配置变更较频繁,可以在更新配置时主动删除缓存:

export async function updateAppConfig(appId: string, data: unknown) {
  const result = await db.app.update({
    where: { id: appId },
    data
  });

  await redis.del(`app_config:${appId}`);

  return result;
}

七、方案五:优化向量检索与知识库召回

知识库问答的并发能力,很大程度取决于向量检索性能。优化方向包括:

1. 控制 TopK

很多系统默认 TopK 设置过大,例如一次召回 20 条、50 条甚至更多。实际上,大多数问答场景中,TopK 设置为 3 到 8 已经足够。

TopK 太大会导致:

  • 向量数据库查询变慢;
  • 上下文拼接过长;
  • Prompt Token 增加;
  • 模型响应变慢;
  • 成本上升。

2. 增加元数据过滤索引

如果知识库按团队、应用、文档、权限过滤,需要确保过滤字段有索引。例如:

teamId
datasetId
collectionId
documentId
status

否则向量检索之后再做大量过滤,会严重影响性能。

3. 避免每次都重排

Rerank 模型可以提升答案质量,但也会增加延迟和成本。建议采用策略化重排:

  • 普通问题不重排;
  • TopK 较大时才重排;
  • 高级应用开启重排;
  • 免费用户关闭重排;
  • 命中高置信度结果时跳过重排。

4. 对相同问题做短期缓存

对于企业知识库,用户经常会问类似问题,例如“报销流程是什么”“如何申请 VPN”“年假怎么计算”。可以对规范化后的问题做短期缓存。

import crypto from 'crypto';

function normalizeQuestion(question: string) {
  return question.trim().replace(/\s+/g, '').toLowerCase();
}

function hashQuestion(question: string) {
  return crypto
    .createHash('sha256')
    .update(normalizeQuestion(question))
    .digest('hex');
}

export async function getCachedAnswer(appId: string, question: string) {
  const hash = hashQuestion(question);
  const key = `answer_cache:${appId}:${hash}`;
  const cached = await redis.get(key);

  return cached ? JSON.parse(cached) : null;
}

export async function setCachedAnswer(appId: string, question: string, answer: unknown) {
  const hash = hashQuestion(question);
  const key = `answer_cache:${appId}:${hash}`;

  await redis.set(key, JSON.stringify(answer), 'EX', 300);
}

需要注意:如果答案依赖用户权限、实时数据或个性化上下文,就不适合直接缓存完整答案。可以只缓存检索结果或 Prompt 片段。


八、方案六:数据库连接池与索引优化

数据库优化是高并发系统的基础工程。建议重点检查以下内容:

MongoDB 优化建议

如果 FastGPT 使用 MongoDB,需要关注:

  • 是否为高频查询字段建立索引;
  • 查询是否包含租户隔离字段;
  • 是否存在无分页的大集合查询;
  • 聊天日志是否无限增长;
  • 是否开启慢查询分析;
  • 连接池是否合理配置;
  • 是否需要按时间归档历史数据。

常见索引示例:

db.chats.createIndex({ userId: 1, updateTime: -1 });
db.chats.createIndex({ teamId: 1, appId: 1, updateTime: -1 });
db.datasets.createIndex({ teamId: 1, updateTime: -1 });
db.apikeys.createIndex({ key: 1 }, { unique: true });

PostgreSQL 优化建议

如果使用 PostgreSQL 存储业务数据或向量数据,需要关注:

  • EXPLAIN ANALYZE 分析慢 SQL;
  • 为过滤字段建立组合索引;
  • 避免在 WHERE 条件中对索引字段使用函数;
  • 控制连接池大小;
  • 使用只读副本分担读查询;
  • 定期 VACUUM 和 ANALYZE;
  • 大表按时间或租户分区。

九、方案七:流式响应也要控制并发

很多人以为 SSE 流式响应只是“边生成边返回”,不会消耗太多资源。实际上,流式响应会长期占用:

  • 一个客户端连接;
  • 一个服务端请求上下文;
  • 一个上游模型连接;
  • 一部分内存;
  • 日志记录与 Token 统计资源。

因此,FastGPT 高并发场景下必须控制流式连接数量。

可以增加一个全局连接计数器:

let activeStreams = 0;
const MAX_ACTIVE_STREAMS = Number(process.env.MAX_ACTIVE_STREAMS || 500);

export function acquireStream() {
  if (activeStreams >= MAX_ACTIVE_STREAMS) {
    return false;
  }

  activeStreams += 1;
  return true;
}

export function releaseStream() {
  activeStreams = Math.max(0, activeStreams - 1);
}

在 SSE 接口中使用:

export async function streamChatHandler(req, res) {
  if (!acquireStream()) {
    return res.status(503).json({
      message: '当前请求较多,请稍后重试'
    });
  }

  try {
    req.on('close', () => {
      releaseStream();
    });

    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');

    // 调用模型并逐步写入响应
  } catch (error) {
    releaseStream();
    throw error;
  }
}

生产环境建议使用 Redis 维护分布式连接数,而不是单机变量。否则多实例部署时,每个实例只能看到自己的连接数量。


十、方案八:超时、重试、熔断和降级

高并发系统不能假设所有外部服务都稳定。大模型接口、向量数据库、插件 API、第三方业务系统都有可能失败。

建议为所有外部调用设置:

  • 连接超时;
  • 请求超时;
  • 最大重试次数;
  • 指数退避;
  • 熔断策略;
  • 降级响应。

fetch 超时示例

export async function fetchWithTimeout(
  url: string,
  options: RequestInit,
  timeoutMs = 30000
) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);

  try {
    const response = await fetch(url, {
      ...options,
      signal: controller.signal
    });

    return response;
  } finally {
    clearTimeout(timer);
  }
}

对于大模型调用,不建议无限重试。因为模型请求成本高,而且重试可能进一步放大拥塞。一般建议:

  • 网络错误可以重试 1 到 2 次;
  • 429 不立即重试,应进入队列或切换供应商;
  • 401、403 不重试;
  • 400 参数错误不重试;
  • 5xx 可短暂退避后重试。

十一、推荐部署架构

一个较为稳妥的 FastGPT 高并发部署架构如下:

Nginx / SLB
  ↓
FastGPT API x N
  ↓
Redis Cluster
  ↓
MongoDB Replica Set
  ↓
Vector DB Cluster
  ↓
LLM Gateway x N
  ↓
OpenAI / Azure / DeepSeek / Qwen / Local LLM

同时配合:

  • API 服务无状态化;
  • Redis 存储限流、缓存、队列状态;
  • MongoDB 使用副本集;
  • 向量数据库独立部署;
  • 模型网关横向扩容;
  • Worker 单独部署;
  • 日志系统独立收集;
  • Prometheus + Grafana 监控;
  • Loki / ELK 做日志检索。

十二、监控指标必须完善

没有监控,就无法判断系统到底卡在哪里。建议至少监控以下指标:

API 层

  • QPS;
  • P50 / P95 / P99 响应时间;
  • 错误率;
  • 429 数量;
  • 5xx 数量;
  • 当前活跃连接数;
  • SSE 连接数。

模型层

  • 每个模型请求数;
  • 每个模型平均延迟;
  • 429 次数;
  • 超时次数;
  • Token 消耗;
  • 单 Key 使用量;
  • 模型供应商错误率。

数据库层

  • 连接数;
  • 慢查询;
  • CPU / 内存;
  • 读写 QPS;
  • 索引命中率;
  • 磁盘 I/O。

队列层

  • 等待任务数;
  • 处理中任务数;
  • 失败任务数;
  • 平均等待时间;
  • Worker 并发数;
  • 重试次数。

十三、完整落地步骤建议

如果你正在改造现有 FastGPT 系统,可以按以下顺序推进:

  1. 先加入口限流
    防止系统被瞬时流量打穿,这是最优先的保护措施。

  2. 接入 Redis 缓存
    缓存应用配置、用户信息、权限信息和热点问答,降低数据库压力。

  3. 拆分异步任务
    文档处理、向量化、统计分析等任务进入队列,由 Worker 消费。

  4. 建设模型网关
    统一管理模型供应商、API Key、限额、重试、熔断和降级。

  5. 优化数据库索引
    根据真实慢查询补充索引,而不是凭感觉乱加索引。

  6. 控制流式连接数
    对 SSE 请求设置最大并发、超时和异常关闭处理。

  7. 完善监控告警
    用数据判断瓶颈,持续优化系统容量。


十四、总结

FastGPT 高并发优化不是单点技术问题,而是一套完整的系统工程。真正稳定的方案,不是简单地增加服务器,而是围绕请求链路进行分层治理。

核心思路可以概括为:

  • 入口限流:先保护系统;
  • 缓存热点:减少重复查询;
  • 队列削峰:让长任务异步执行;
  • 模型网关:统一调度昂贵的大模型资源;
  • 数据库优化:用索引、连接池和归档降低压力;
  • 流式控制:避免长连接耗尽服务资源;
  • 熔断降级:外部服务异常时保证核心功能可用;
  • 监控驱动:用真实指标指导扩容和优化。

对于大多数企业来说,FastGPT 的并发瓶颈并不一定出现在 FastGPT 本身,而是出现在模型接口、数据库、向量检索和无控制的长连接上。只要把这些关键环节治理好,FastGPT 完全可以支撑从几十人内部试用,到上千、上万用户规模的生产级 AI 应用。

如果你正在搭建企业知识库、AI 客服、智能助手或多租户 AI 平台,建议从限流、缓存、队列和模型网关四个方向优先改造。它们投入成本相对可控,但对系统稳定性和并发能力的提升最明显。

目录结构
全文