FastGPT 并发扛不住?从限流、队列到模型网关的实战改造方案
FastGPT 高并发解决方案|附源码
在企业级 AI 应用落地过程中,FastGPT 这类知识库问答、智能客服、工作流编排平台,往往会很快遇到一个现实问题:单用户体验很好,但一旦并发上来,响应变慢、排队严重、模型接口超时、数据库压力飙升,甚至整个服务不可用。
尤其是在以下场景中,高并发问题会被迅速放大:
- 企业内部几百人同时使用知识库问答;
- 客服系统接入微信公众号、官网、企微、飞书等多个入口;
- 营销活动期间大量用户同时向 AI 助手提问;
- 工作流中同时调用向量检索、数据库查询、插件接口和大模型;
- 多租户 SaaS 平台中,不同客户共享同一套 FastGPT 服务。
本文将围绕 FastGPT 高并发解决方案 展开,结合系统架构、限流、队列、缓存、模型网关、数据库优化、异步任务等方面,给出一套可落地的实践方案,并附上部分核心源码示例,帮助你构建更稳定、更可扩展的 FastGPT 服务。
一、FastGPT 高并发的核心瓶颈
在讨论解决方案之前,需要先明确 FastGPT 在高并发下容易出现瓶颈的位置。通常来说,一个完整的 FastGPT 请求链路大致如下:
用户请求
↓
FastGPT API 服务
↓
权限校验 / 应用配置读取
↓
知识库检索 / 向量数据库查询
↓
Prompt 组装
↓
调用大模型接口
↓
流式返回 / 非流式返回
↓
记录对话日志 / Token 消耗 / 统计数据
这条链路中,每一个环节都可能成为瓶颈。
1. 大模型接口是最主要瓶颈
无论你使用 OpenAI、Azure OpenAI、Claude、通义千问、智谱、DeepSeek,还是本地部署的大模型,大模型推理本身都属于高延迟、高成本资源。
常见问题包括:
- 模型接口有 RPM、TPM、QPS 限制;
- 单次请求耗时较长,通常几秒到几十秒;
- 流式响应会长期占用连接;
- 高并发下容易触发 429、超时、连接中断;
- 多个用户同时请求会造成模型服务排队。
因此,FastGPT 高并发优化的关键不是盲目增加应用服务实例,而是要围绕 模型调用能力 做统一调度。
2. 数据库连接数和慢查询
FastGPT 通常依赖 MongoDB、PostgreSQL、Redis、向量数据库等组件。如果并发请求全部直接打到数据库,容易出现:
- 数据库连接数耗尽;
- 慢查询增多;
- 索引未命中导致 CPU 飙升;
- 聊天记录写入压力过大;
- 知识库检索耗时变长;
- 多租户数据隔离查询变复杂。
数据库不是不能抗并发,而是不能让所有请求都无控制地直接压上去。
3. 向量检索性能不足
FastGPT 的核心能力之一是知识库问答。用户提问后,系统通常会执行:
- 查询向量化;
- 向量数据库相似度检索;
- 元数据过滤;
- TopK 结果重排;
- 拼接上下文。
在高并发下,向量检索可能成为另一个瓶颈,尤其是知识库规模较大、TopK 设置过高、过滤条件复杂、同时使用重排模型时。
4. Node.js 服务连接被长期占用
FastGPT 服务端常见技术栈包含 Node.js。Node.js 适合 I/O 密集型任务,但在高并发流式输出场景中,也会遇到以下问题:
- HTTP 长连接数量过多;
- 单实例内存增长;
- 未关闭的流连接导致资源泄漏;
- SSE 响应被客户端长时间占用;
- 上游模型慢,下游连接也被拖慢。
因此,需要对请求生命周期、连接数量和超时策略进行严格控制。
二、高并发架构设计思路
要让 FastGPT 支撑高并发,不能只靠“加服务器”。更合理的做法是将系统拆成几个关键层次:
客户端
↓
Nginx / API Gateway
↓
FastGPT Web/API 服务集群
↓
限流与鉴权层
↓
任务队列 / 模型网关
↓
Redis 缓存
↓
MongoDB / PostgreSQL / 向量数据库
↓
LLM Provider / 本地模型服务
高并发优化的目标不是让所有请求同时执行,而是做到:
- 入口可控:防止瞬时流量打垮服务;
- 资源隔离:不同租户、不同应用、不同模型分配独立额度;
- 异步削峰:非实时任务进入队列;
- 缓存复用:减少重复查询和重复模型调用;
- 服务水平扩展:API 层、Worker 层、模型网关都能横向扩容;
- 失败可恢复:超时、重试、降级、熔断机制完善。
三、方案一:入口限流,保护系统不被打穿
高并发系统第一步一定是限流。没有限流的系统,本质上是在赌流量不会突然变大。
FastGPT 可以从以下几个维度限流:
- 按 IP 限流;
- 按用户限流;
- 按团队 / 租户限流;
- 按应用限流;
- 按模型限流;
- 按接口路径限流;
- 按 Token 消耗限流。
例如,一个普通用户每分钟最多发起 20 次对话请求,企业客户每分钟最多 500 次请求,管理员账号则配置更高额度。
Redis 滑动窗口限流源码
下面是一个基于 Redis 的滑动窗口限流示例,适合放在 FastGPT API 入口层。
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL!);
interface RateLimitOptions {
key: string;
limit: number;
windowSeconds: number;
}
export async function checkRateLimit(options: RateLimitOptions) {
const { key, limit, windowSeconds } = options;
const now = Date.now();
const windowStart = now - windowSeconds * 1000;
const redisKey = `rate_limit:${key}`;
const pipeline = redis.pipeline();
pipeline.zremrangebyscore(redisKey, 0, windowStart);
pipeline.zadd(redisKey, now, `${now}-${Math.random()}`);
pipeline.zcard(redisKey);
pipeline.expire(redisKey, windowSeconds + 5);
const results = await pipeline.exec();
const current = results?.[2]?.[1] as number;
if (current > limit) {
return {
allowed: false,
current,
limit
};
}
return {
allowed: true,
current,
limit
};
}
在接口中使用:
export async function chatHandler(req, res) {
const userId = req.user.id;
const appId = req.body.appId;
const result = await checkRateLimit({
key: `chat:${userId}:${appId}`,
limit: 30,
windowSeconds: 60
});
if (!result.allowed) {
return res.status(429).json({
message: '请求过于频繁,请稍后再试'
});
}
// 继续执行 FastGPT 对话逻辑
}
限流的意义并不是拒绝用户,而是让系统在流量异常时仍然保持可用。相比所有人都卡死,部分请求被限流是更可控的策略。
四、方案二:引入模型网关,统一调度 LLM 请求
FastGPT 高并发下最容易被打爆的是模型接口。建议不要让业务代码直接调用多个模型厂商,而是引入 LLM Gateway 模型网关。
模型网关负责:
- 统一管理多个模型供应商;
- 实现 API Key 池;
- 按权重分配请求;
- 失败自动切换;
- 根据模型限额做队列排队;
- 统计每个模型的延迟、错误率、Token 消耗;
- 对 429、503、超时等错误做重试和熔断。
模型网关调用示例
type ModelProvider = 'openai' | 'azure' | 'deepseek' | 'qwen';
interface ChatRequest {
model: string;
messages: Array<{
role: 'system' | 'user' | 'assistant';
content: string;
}>;
stream?: boolean;
}
interface ProviderConfig {
provider: ModelProvider;
baseUrl: string;
apiKey: string;
weight: number;
enabled: boolean;
}
const providers: ProviderConfig[] = [
{
provider: 'openai',
baseUrl: process.env.OPENAI_BASE_URL!,
apiKey: process.env.OPENAI_API_KEY!,
weight: 5,
enabled: true
},
{
provider: 'deepseek',
baseUrl: process.env.DEEPSEEK_BASE_URL!,
apiKey: process.env.DEEPSEEK_API_KEY!,
weight: 3,
enabled: true
}
];
function pickProvider() {
const available = providers.filter((item) => item.enabled);
const totalWeight = available.reduce((sum, item) => sum + item.weight, 0);
let random = Math.random() * totalWeight;
for (const provider of available) {
random -= provider.weight;
if (random <= 0) return provider;
}
return available[0];
}
export async function callLLM(request: ChatRequest) {
const provider = pickProvider();
const response = await fetch(`${provider.baseUrl}/chat/completions`, {
method: 'POST',
headers: {
Authorization: `Bearer ${provider.apiKey}`,
'Content-Type': 'application/json'
},
body: JSON.stringify(request)
});
if (!response.ok) {
throw new Error(`LLM request failed: ${response.status}`);
}
return response;
}
在生产环境中,模型网关还应增加:
- 最大并发数控制;
- 单 Key 请求配额;
- 请求排队;
- 失败重试;
- 熔断恢复;
- 供应商健康检查;
- Token 成本统计。
五、方案三:队列削峰,避免瞬时流量压垮系统
并不是所有 FastGPT 请求都必须同步执行。例如:
- 批量导入知识库;
- 文档切分;
- 向量化;
- 批量总结;
- 离线问答生成;
- 对话日志分析;
- 用户行为统计;
- 长任务工作流。
这些任务应该进入队列,由 Worker 异步消费,而不是在 API 请求中直接执行。
常见队列方案包括:
- BullMQ + Redis;
- RabbitMQ;
- Kafka;
- NATS;
- 云厂商消息队列。
对于 FastGPT 场景,BullMQ 已经可以满足大多数中小规模并发需求。
BullMQ 队列源码
import { Queue, Worker, JobsOptions } from 'bullmq';
import Redis from 'ioredis';
const connection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null
});
export const embeddingQueue = new Queue('embedding', {
connection
});
export async function addEmbeddingJob(data: {
datasetId: string;
documentId: string;
chunks: string[];
}) {
const options: JobsOptions = {
attempts: 3,
backoff: {
type: 'exponential',
delay: 3000
},
removeOnComplete: true,
removeOnFail: 1000
};
return embeddingQueue.add('create-embedding', data, options);
}
new Worker(
'embedding',
async (job) => {
const { datasetId, documentId, chunks } = job.data;
for (const chunk of chunks) {
// 1. 调用 embedding 模型
// 2. 写入向量数据库
// 3. 更新文档处理进度
console.log('processing chunk', {
datasetId,
documentId,
size: chunk.length
});
}
},
{
connection,
concurrency: 5
}
);
这里的关键参数是 concurrency。它决定 Worker 同时处理多少个任务。并发不是越高越好,而是要根据模型接口、数据库写入能力和机器资源来调优。
六、方案四:缓存热点数据,减少重复计算
FastGPT 中有大量数据并不需要每次请求都查数据库,例如:
- 应用配置;
- 模型配置;
- 团队权限;
- 用户基础信息;
- 知识库元信息;
- Prompt 模板;
- 插件配置;
- 常见问题答案。
将这些数据缓存到 Redis,可以显著降低数据库压力。
应用配置缓存示例
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL!);
export async function getAppConfig(appId: string) {
const cacheKey = `app_config:${appId}`;
const cached = await redis.get(cacheKey);
if (cached) {
return JSON.parse(cached);
}
const appConfig = await db.app.findUnique({
where: { id: appId }
});
if (!appConfig) {
throw new Error('应用不存在');
}
await redis.set(cacheKey, JSON.stringify(appConfig), 'EX', 60);
return appConfig;
}
缓存时间不宜过长。对于应用配置这类数据,通常 30 秒到 5 分钟即可。既能减轻数据库压力,又不会造成配置更新后长期不生效。
如果配置变更较频繁,可以在更新配置时主动删除缓存:
export async function updateAppConfig(appId: string, data: unknown) {
const result = await db.app.update({
where: { id: appId },
data
});
await redis.del(`app_config:${appId}`);
return result;
}
七、方案五:优化向量检索与知识库召回
知识库问答的并发能力,很大程度取决于向量检索性能。优化方向包括:
1. 控制 TopK
很多系统默认 TopK 设置过大,例如一次召回 20 条、50 条甚至更多。实际上,大多数问答场景中,TopK 设置为 3 到 8 已经足够。
TopK 太大会导致:
- 向量数据库查询变慢;
- 上下文拼接过长;
- Prompt Token 增加;
- 模型响应变慢;
- 成本上升。
2. 增加元数据过滤索引
如果知识库按团队、应用、文档、权限过滤,需要确保过滤字段有索引。例如:
teamId
datasetId
collectionId
documentId
status
否则向量检索之后再做大量过滤,会严重影响性能。
3. 避免每次都重排
Rerank 模型可以提升答案质量,但也会增加延迟和成本。建议采用策略化重排:
- 普通问题不重排;
- TopK 较大时才重排;
- 高级应用开启重排;
- 免费用户关闭重排;
- 命中高置信度结果时跳过重排。
4. 对相同问题做短期缓存
对于企业知识库,用户经常会问类似问题,例如“报销流程是什么”“如何申请 VPN”“年假怎么计算”。可以对规范化后的问题做短期缓存。
import crypto from 'crypto';
function normalizeQuestion(question: string) {
return question.trim().replace(/\s+/g, '').toLowerCase();
}
function hashQuestion(question: string) {
return crypto
.createHash('sha256')
.update(normalizeQuestion(question))
.digest('hex');
}
export async function getCachedAnswer(appId: string, question: string) {
const hash = hashQuestion(question);
const key = `answer_cache:${appId}:${hash}`;
const cached = await redis.get(key);
return cached ? JSON.parse(cached) : null;
}
export async function setCachedAnswer(appId: string, question: string, answer: unknown) {
const hash = hashQuestion(question);
const key = `answer_cache:${appId}:${hash}`;
await redis.set(key, JSON.stringify(answer), 'EX', 300);
}
需要注意:如果答案依赖用户权限、实时数据或个性化上下文,就不适合直接缓存完整答案。可以只缓存检索结果或 Prompt 片段。
八、方案六:数据库连接池与索引优化
数据库优化是高并发系统的基础工程。建议重点检查以下内容:
MongoDB 优化建议
如果 FastGPT 使用 MongoDB,需要关注:
- 是否为高频查询字段建立索引;
- 查询是否包含租户隔离字段;
- 是否存在无分页的大集合查询;
- 聊天日志是否无限增长;
- 是否开启慢查询分析;
- 连接池是否合理配置;
- 是否需要按时间归档历史数据。
常见索引示例:
db.chats.createIndex({ userId: 1, updateTime: -1 });
db.chats.createIndex({ teamId: 1, appId: 1, updateTime: -1 });
db.datasets.createIndex({ teamId: 1, updateTime: -1 });
db.apikeys.createIndex({ key: 1 }, { unique: true });
PostgreSQL 优化建议
如果使用 PostgreSQL 存储业务数据或向量数据,需要关注:
EXPLAIN ANALYZE分析慢 SQL;- 为过滤字段建立组合索引;
- 避免在 WHERE 条件中对索引字段使用函数;
- 控制连接池大小;
- 使用只读副本分担读查询;
- 定期 VACUUM 和 ANALYZE;
- 大表按时间或租户分区。
九、方案七:流式响应也要控制并发
很多人以为 SSE 流式响应只是“边生成边返回”,不会消耗太多资源。实际上,流式响应会长期占用:
- 一个客户端连接;
- 一个服务端请求上下文;
- 一个上游模型连接;
- 一部分内存;
- 日志记录与 Token 统计资源。
因此,FastGPT 高并发场景下必须控制流式连接数量。
可以增加一个全局连接计数器:
let activeStreams = 0;
const MAX_ACTIVE_STREAMS = Number(process.env.MAX_ACTIVE_STREAMS || 500);
export function acquireStream() {
if (activeStreams >= MAX_ACTIVE_STREAMS) {
return false;
}
activeStreams += 1;
return true;
}
export function releaseStream() {
activeStreams = Math.max(0, activeStreams - 1);
}
在 SSE 接口中使用:
export async function streamChatHandler(req, res) {
if (!acquireStream()) {
return res.status(503).json({
message: '当前请求较多,请稍后重试'
});
}
try {
req.on('close', () => {
releaseStream();
});
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
// 调用模型并逐步写入响应
} catch (error) {
releaseStream();
throw error;
}
}
生产环境建议使用 Redis 维护分布式连接数,而不是单机变量。否则多实例部署时,每个实例只能看到自己的连接数量。
十、方案八:超时、重试、熔断和降级
高并发系统不能假设所有外部服务都稳定。大模型接口、向量数据库、插件 API、第三方业务系统都有可能失败。
建议为所有外部调用设置:
- 连接超时;
- 请求超时;
- 最大重试次数;
- 指数退避;
- 熔断策略;
- 降级响应。
fetch 超时示例
export async function fetchWithTimeout(
url: string,
options: RequestInit,
timeoutMs = 30000
) {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
const response = await fetch(url, {
...options,
signal: controller.signal
});
return response;
} finally {
clearTimeout(timer);
}
}
对于大模型调用,不建议无限重试。因为模型请求成本高,而且重试可能进一步放大拥塞。一般建议:
- 网络错误可以重试 1 到 2 次;
- 429 不立即重试,应进入队列或切换供应商;
- 401、403 不重试;
- 400 参数错误不重试;
- 5xx 可短暂退避后重试。
十一、推荐部署架构
一个较为稳妥的 FastGPT 高并发部署架构如下:
Nginx / SLB
↓
FastGPT API x N
↓
Redis Cluster
↓
MongoDB Replica Set
↓
Vector DB Cluster
↓
LLM Gateway x N
↓
OpenAI / Azure / DeepSeek / Qwen / Local LLM
同时配合:
- API 服务无状态化;
- Redis 存储限流、缓存、队列状态;
- MongoDB 使用副本集;
- 向量数据库独立部署;
- 模型网关横向扩容;
- Worker 单独部署;
- 日志系统独立收集;
- Prometheus + Grafana 监控;
- Loki / ELK 做日志检索。
十二、监控指标必须完善
没有监控,就无法判断系统到底卡在哪里。建议至少监控以下指标:
API 层
- QPS;
- P50 / P95 / P99 响应时间;
- 错误率;
- 429 数量;
- 5xx 数量;
- 当前活跃连接数;
- SSE 连接数。
模型层
- 每个模型请求数;
- 每个模型平均延迟;
- 429 次数;
- 超时次数;
- Token 消耗;
- 单 Key 使用量;
- 模型供应商错误率。
数据库层
- 连接数;
- 慢查询;
- CPU / 内存;
- 读写 QPS;
- 索引命中率;
- 磁盘 I/O。
队列层
- 等待任务数;
- 处理中任务数;
- 失败任务数;
- 平均等待时间;
- Worker 并发数;
- 重试次数。
十三、完整落地步骤建议
如果你正在改造现有 FastGPT 系统,可以按以下顺序推进:
-
先加入口限流
防止系统被瞬时流量打穿,这是最优先的保护措施。 -
接入 Redis 缓存
缓存应用配置、用户信息、权限信息和热点问答,降低数据库压力。 -
拆分异步任务
文档处理、向量化、统计分析等任务进入队列,由 Worker 消费。 -
建设模型网关
统一管理模型供应商、API Key、限额、重试、熔断和降级。 -
优化数据库索引
根据真实慢查询补充索引,而不是凭感觉乱加索引。 -
控制流式连接数
对 SSE 请求设置最大并发、超时和异常关闭处理。 -
完善监控告警
用数据判断瓶颈,持续优化系统容量。
十四、总结
FastGPT 高并发优化不是单点技术问题,而是一套完整的系统工程。真正稳定的方案,不是简单地增加服务器,而是围绕请求链路进行分层治理。
核心思路可以概括为:
- 入口限流:先保护系统;
- 缓存热点:减少重复查询;
- 队列削峰:让长任务异步执行;
- 模型网关:统一调度昂贵的大模型资源;
- 数据库优化:用索引、连接池和归档降低压力;
- 流式控制:避免长连接耗尽服务资源;
- 熔断降级:外部服务异常时保证核心功能可用;
- 监控驱动:用真实指标指导扩容和优化。
对于大多数企业来说,FastGPT 的并发瓶颈并不一定出现在 FastGPT 本身,而是出现在模型接口、数据库、向量检索和无控制的长连接上。只要把这些关键环节治理好,FastGPT 完全可以支撑从几十人内部试用,到上千、上万用户规模的生产级 AI 应用。
如果你正在搭建企业知识库、AI 客服、智能助手或多租户 AI 平台,建议从限流、缓存、队列和模型网关四个方向优先改造。它们投入成本相对可控,但对系统稳定性和并发能力的提升最明显。