AI 工具扛不住并发?这套队列 + Worker 架构直接解决痛点
AI工具 高并发解决方案|附源码
在 AI 应用快速落地的过程中,很多团队都会遇到同一个问题:模型能力已经具备,但系统扛不住高并发。
尤其是接入大语言模型、文生图、语音识别、向量检索等 AI 能力后,单次请求耗时往往比普通业务接口更长。如果不做架构设计,用户量稍微上来,就容易出现:
- 接口响应变慢;
- 请求大量超时;
- 服务 CPU、内存飙升;
- 模型 API 被限流;
- 队列堆积不可控;
- 成本快速上涨;
- 用户体验严重下降。
本文将围绕 AI 工具类应用的高并发解决方案 展开,结合常见业务场景,介绍一套较为通用的架构设计思路,并附上可运行的简化源码,帮助你快速搭建一个支持高并发请求处理的 AI 服务框架。
一、AI 工具为什么更容易遇到高并发问题?
传统业务接口通常是数据库查询、缓存读取、简单计算,接口耗时可能在几十毫秒到几百毫秒之间。
但 AI 工具不同。
例如:
| AI 场景 | 单次请求耗时 |
|---|---|
| AI 写作 | 3 秒 - 30 秒 |
| AI 总结文档 | 5 秒 - 60 秒 |
| AI 文生图 | 10 秒 - 120 秒 |
| AI 数字人生成 | 数分钟 |
| AI 语音转文字 | 几秒到几十秒 |
| 向量知识库问答 | 1 秒 - 10 秒 |
当请求耗时变长后,即使 QPS 不算特别高,系统并发数也会迅速增加。
举个例子:
如果一个接口平均响应时间是 10 秒,每秒有 100 个请求进来,那么系统同时需要处理的请求数量大约是:
并发数 = QPS × 平均耗时
并发数 = 100 × 10 = 1000
这意味着系统需要同时维护 1000 个未完成任务。
如果这些请求全部直接打到模型服务或第三方 AI API,很容易出现:
- 上游服务被打爆;
- API 触发限流;
- 本地连接数耗尽;
- 请求堆积导致雪崩;
- 用户端长时间等待。
所以,AI 工具高并发的核心不是单纯“加机器”,而是要设计一套完整的 削峰、限流、异步、缓存、降级、监控 体系。
二、高并发 AI 系统的核心设计目标
一个可靠的 AI 高并发系统,通常需要满足以下目标:
1. 接口不能被慢任务拖死
AI 请求可能很慢,如果所有请求都同步等待模型返回,那么 Web 服务线程或协程会被大量占用,最终导致整个服务无法响应。
正确做法是:
- 短任务可以同步返回;
- 长任务改为异步任务;
- 客户端通过任务 ID 查询状态;
- 或使用 WebSocket / SSE 推送结果。
2. 请求量过大时必须可控
高并发并不可怕,可怕的是无限制接收请求。
系统需要具备:
- 全局限流;
- 用户级限流;
- IP 级限流;
- 模型级限流;
- 队列长度限制;
- 超时控制;
- 熔断降级。
3. 模型调用要有并发保护
无论是本地模型,还是第三方大模型 API,都不应该被无限并发调用。
例如某个大模型 API 只允许每分钟 300 次请求,那么本地服务就必须在调用前做控制。
常见手段包括:
- 令牌桶限流;
- 信号量控制并发;
- 请求队列;
- 分布式锁;
- Redis 计数器;
- API Key 池调度。
4. 结果尽量复用
很多 AI 工具类应用具有明显的重复请求特征。
例如:
- 同一篇文章摘要;
- 同一个 prompt 生成文案;
- 同一个 URL 解析;
- 同一份文件向量化;
- 同一个问题命中知识库。
这些请求可以通过缓存减少模型调用。
缓存可以分为:
- 本地内存缓存;
- Redis 缓存;
- 文件缓存;
- 向量检索缓存;
- Prompt Hash 缓存。
5. 系统必须具备可观测性
AI 服务出问题时,排查难度通常高于普通接口。
你需要知道:
- 当前请求数;
- 队列长度;
- 平均响应时间;
- 模型调用耗时;
- 错误率;
- 超时率;
- Token 消耗;
- 用户调用次数;
- 第三方 API 状态。
如果没有监控,一旦系统变慢,你很难判断到底是数据库慢、模型慢、网络慢,还是队列堆积。
三、推荐架构方案
下面是一套比较通用的 AI 工具高并发架构:
用户请求
↓
Nginx / API Gateway
↓
限流层 / 鉴权层
↓
Web 服务层 FastAPI / Spring Boot / Go
↓
任务队列 Redis / RabbitMQ / Kafka
↓
Worker 工作节点
↓
AI 模型服务 / 第三方 API
↓
结果缓存 Redis / MySQL / 对象存储
↓
用户查询结果 / SSE 推送
四、关键模块设计
1. 网关层限流
网关层是第一道防线。
常见方案:
- Nginx
limit_req; - Kong / APISIX 限流插件;
- Spring Cloud Gateway;
- 云厂商 API 网关;
- 自研 Redis 限流中间件。
例如 Nginx 可以做基础限流:
http {
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
server {
location /api/ {
limit_req zone=api_limit burst=20 nodelay;
proxy_pass http://backend;
}
}
}
这个配置表示:每个 IP 平均每秒允许 10 个请求,突发最多 20 个。
2. Web 层快速响应
对于耗时较长的 AI 任务,不建议 HTTP 请求一直阻塞等待。
推荐方式:
POST /tasks 创建任务,立即返回 task_id
GET /tasks/{id} 查询任务状态和结果
创建任务接口只负责:
- 参数校验;
- 鉴权;
- 限流;
- 生成任务 ID;
- 写入队列;
- 返回任务 ID。
真正的 AI 调用由后台 Worker 执行。
3. 队列削峰
队列是高并发系统中非常重要的削峰组件。
它的作用包括:
- 缓冲瞬时流量;
- 控制处理速度;
- 防止模型服务被打爆;
- 提高系统稳定性;
- 支持失败重试。
常用队列:
| 队列 | 适用场景 |
|---|---|
| Redis List / Stream | 中小型任务队列 |
| RabbitMQ | 可靠消息、延迟任务 |
| Kafka | 超高吞吐日志流 |
| Celery | Python 异步任务 |
| BullMQ | Node.js 任务队列 |
对于大部分 AI 工具类产品,初期使用 Redis 队列已经足够。
4. Worker 并发控制
Worker 是真正调用模型的地方。
这里一定要控制并发,否则队列里的任务会被瞬间拉出来全部执行,模型服务依然会被压垮。
常见控制方式:
semaphore = asyncio.Semaphore(10)
表示同一时刻最多只有 10 个模型调用在执行。
如果是多机器部署,则需要配合 Redis 做分布式限流。
5. 缓存结果
缓存可以显著降低成本。
例如:
cache_key = sha256(model + prompt + params)
只要用户输入相同,就可以直接返回之前生成的结果。
缓存策略建议:
- 短文本生成:缓存 1 小时到 1 天;
- 文档摘要:缓存 7 天或更久;
- 文件解析:永久缓存;
- 向量化结果:长期缓存;
- 用户私有数据:注意隔离用户 ID。
6. 超时与重试
AI 模型调用不应无限等待。
推荐设置:
- 单次模型调用超时:30 秒;
- 总任务超时:60 秒;
- 失败重试:1 - 3 次;
- 重试间隔:指数退避;
- 对不可恢复错误不重试。
例如:
第一次失败:1 秒后重试
第二次失败:3 秒后重试
第三次失败:10 秒后重试
7. 降级策略
当系统压力过高时,需要主动降级,而不是等系统崩溃。
可选降级策略:
- 暂停低优先级任务;
- 免费用户排队,付费用户优先;
- 关闭复杂模型,切换轻量模型;
- 降低最大输入长度;
- 返回“系统繁忙,请稍后重试”;
- 对重复请求直接返回缓存;
- 限制单用户同时任务数。
五、完整简化源码:FastAPI + Redis + 异步 Worker
下面提供一个简化版本的高并发 AI 任务处理服务。
功能包括:
- 创建 AI 任务;
- Redis 队列削峰;
- Worker 异步消费;
- 并发数控制;
- 任务状态查询;
- 结果缓存;
- 失败重试。
1. 安装依赖
pip install fastapi uvicorn redis httpx pydantic
需要本地启动 Redis:
docker run -d --name redis-ai -p 6379:6379 redis:7
2. 项目结构
ai-high-concurrency-demo/
├── main.py
├── worker.py
├── ai_client.py
├── config.py
└── requirements.txt
3. config.py
# config.py
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 0
TASK_QUEUE_KEY = "ai:task:queue"
TASK_RESULT_PREFIX = "ai:task:result:"
TASK_STATUS_PREFIX = "ai:task:status:"
CACHE_PREFIX = "ai:cache:"
# Worker 本地最大并发数
MAX_WORKER_CONCURRENCY = 5
# 任务最大重试次数
MAX_RETRY = 3
# 结果缓存时间,单位:秒
CACHE_TTL = 3600
4. ai_client.py
这里用 asyncio.sleep 模拟真实 AI 模型调用。实际项目中,你可以替换为 OpenAI、Claude、通义千问、DeepSeek、本地模型等接口。
# ai_client.py
import asyncio
import random
async def call_ai_model(prompt: str) -> str:
"""
模拟 AI 模型调用。
真实场景中,可以在这里请求大模型 API。
"""
await asyncio.sleep(random.randint(2, 5))
if random.random() < 0.1:
raise Exception("AI model temporary error")
return f"AI生成结果:{prompt}"
5. main.py
# main.py
import json
import uuid
import hashlib
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis
from config import (
REDIS_HOST,
REDIS_PORT,
REDIS_DB,
TASK_QUEUE_KEY,
TASK_RESULT_PREFIX,
TASK_STATUS_PREFIX,
CACHE_PREFIX,
CACHE_TTL,
)
app = FastAPI(title="AI High Concurrency Demo")
r = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
decode_responses=True
)
class CreateTaskRequest(BaseModel):
user_id: str
prompt: str
def build_cache_key(prompt: str) -> str:
raw = prompt.strip()
h = hashlib.sha256(raw.encode("utf-8")).hexdigest()
return CACHE_PREFIX + h
@app.post("/tasks")
def create_task(req: CreateTaskRequest):
"""
创建 AI 任务。
如果缓存命中,直接返回结果。
如果未命中,则写入任务队列。
"""
if not req.prompt.strip():
raise HTTPException(status_code=400, detail="prompt不能为空")
cache_key = build_cache_key(req.prompt)
cached_result = r.get(cache_key)
if cached_result:
task_id = str(uuid.uuid4())
r.set(TASK_STATUS_PREFIX + task_id, "success", ex=3600)
r.set(TASK_RESULT_PREFIX + task_id, cached_result, ex=3600)
return {
"task_id": task_id,
"status": "success",
"from_cache": True,
"result": cached_result
}
task_id = str(uuid.uuid4())
task = {
"task_id": task_id,
"user_id": req.user_id,
"prompt": req.prompt,
"retry": 0,
"cache_key": cache_key
}
r.set(TASK_STATUS_PREFIX + task_id, "pending", ex=3600)
# 写入队列
r.lpush(TASK_QUEUE_KEY, json.dumps(task, ensure_ascii=False))
return {
"task_id": task_id,
"status": "pending",
"message": "任务已提交,请稍后查询结果"
}
@app.get("/tasks/{task_id}")
def get_task(task_id: str):
"""
查询任务状态。
"""
status = r.get(TASK_STATUS_PREFIX + task_id)
if not status:
raise HTTPException(status_code=404, detail="任务不存在或已过期")
result = r.get(TASK_RESULT_PREFIX + task_id)
return {
"task_id": task_id,
"status": status,
"result": result
}
@app.get("/health")
def health():
return {"status": "ok"}
6. worker.py
# worker.py
import json
import time
import asyncio
import redis
from config import (
REDIS_HOST,
REDIS_PORT,
REDIS_DB,
TASK_QUEUE_KEY,
TASK_RESULT_PREFIX,
TASK_STATUS_PREFIX,
CACHE_TTL,
MAX_WORKER_CONCURRENCY,
MAX_RETRY,
)
from ai_client import call_ai_model
r = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
decode_responses=True
)
semaphore = asyncio.Semaphore(MAX_WORKER_CONCURRENCY)
async def handle_task(task: dict):
task_id = task["task_id"]
prompt = task["prompt"]
retry = task.get("retry", 0)
cache_key = task.get("cache_key")
async with semaphore:
try:
print(f"开始处理任务: {task_id}, retry={retry}")
r.set(TASK_STATUS_PREFIX + task_id, "running", ex=3600)
result = await asyncio.wait_for(
call_ai_model(prompt),
timeout=30
)
r.set(TASK_RESULT_PREFIX + task_id, result, ex=3600)
r.set(TASK_STATUS_PREFIX + task_id, "success", ex=3600)
if cache_key:
r.set(cache_key, result, ex=CACHE_TTL)
print(f"任务成功: {task_id}")
except Exception as e:
print(f"任务失败: {task_id}, error={e}")
if retry < MAX_RETRY:
task["retry"] = retry + 1
r.set(TASK_STATUS_PREFIX + task_id, "retrying", ex=3600)
# 简单退避
await asyncio.sleep(2 ** retry)
r.lpush(TASK_QUEUE_KEY, json.dumps(task, ensure_ascii=False))
else:
r.set(TASK_STATUS_PREFIX + task_id, "failed", ex=3600)
r.set(
TASK_RESULT_PREFIX + task_id,
f"任务失败:{str(e)}",
ex=3600
)
async def worker_loop():
print("Worker started...")
while True:
try:
item = r.brpop(TASK_QUEUE_KEY, timeout=5)
if item is None:
await asyncio.sleep(0.1)
continue
_, raw_task = item
task = json.loads(raw_task)
asyncio.create_task(handle_task(task))
except Exception as e:
print(f"Worker loop error: {e}")
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(worker_loop())
7. requirements.txt
fastapi
uvicorn
redis
httpx
pydantic
8. 启动服务
启动 API 服务:
uvicorn main:app --host 0.0.0.0 --port 8000
启动 Worker:
python worker.py
如果需要提升处理能力,可以启动多个 Worker:
python worker.py
python worker.py
python worker.py
或者使用进程管理工具:
supervisor
systemd
pm2
docker compose
kubernetes
9. 测试接口
创建任务:
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"user_id":"u001","prompt":"请帮我写一篇关于AI高并发架构的文章"}'
返回示例:
{
"task_id": "f6a18d25-5b28-4c75-bf71-b5d1b9bb5a9e",
"status": "pending",
"message": "任务已提交,请稍后查询结果"
}
查询结果:
curl http://localhost:8000/tasks/f6a18d25-5b28-4c75-bf71-b5d1b9bb5a9e
返回示例:
{
"task_id": "f6a18d25-5b28-4c75-bf71-b5d1b9bb5a9e",
"status": "success",
"result": "AI生成结果:请帮我写一篇关于AI高并发架构的文章"
}
六、生产环境优化建议
上面的源码是一个简化示例,适合用于理解核心思路。如果要用于生产环境,还需要继续加强。
1. 增加用户级限流
例如:
- 免费用户:每分钟 5 次;
- 会员用户:每分钟 30 次;
- 企业用户:每分钟 300 次。
可以用 Redis 实现滑动窗口限流:
def rate_limit(user_id: str, limit: int, window: int):
key = f"rate:{user_id}"
current = r.incr(key)
if current == 1:
r.expire(key, window)
return current <= limit
在创建任务前调用:
if not rate_limit(req.user_id, 10, 60):
raise HTTPException(status_code=429, detail="请求过于频繁")
2. 增加队列长度保护
如果队列长度已经很大,继续接收请求只会让用户等待更久。
可以设置最大队列长度:
queue_len = r.llen(TASK_QUEUE_KEY)
if queue_len > 10000:
raise HTTPException(status_code=503, detail="系统繁忙,请稍后再试")
3. 增加任务优先级
对于商业化 AI 工具,通常需要区分用户等级。
例如:
VIP 队列
普通队列
免费队列
Worker 优先消费 VIP 队列:
先消费 ai:task:queue:vip
再消费 ai:task:queue:normal
最后消费 ai:task:queue:free
这样可以保证核心用户体验。
4. 使用 SSE 推送结果
如果产品希望实现类似 ChatGPT 的流式输出,可以使用:
- Server-Sent Events;
- WebSocket;
- HTTP Chunked Streaming。
短文本对话建议使用 SSE,简单稳定,浏览器支持好。
5. API Key 池调度
如果接入第三方模型,经常会遇到单个 API Key 的限速问题。
可以维护一个 API Key 池:
key_1: 当前可用
key_2: 当前可用
key_3: 已限流,冷却中
key_4: 当前可用
每次调用模型前选择一个可用 Key,并记录:
- 成功次数;
- 失败次数;
- 剩余额度;
- 限流状态;
- 冷却时间。
6. 数据库持久化
Redis 适合缓存和队列,但重要任务建议持久化到数据库。
例如 MySQL 表设计:
CREATE TABLE ai_task (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
task_id VARCHAR(64) NOT NULL UNIQUE,
user_id VARCHAR(64) NOT NULL,
prompt TEXT NOT NULL,
status VARCHAR(32) NOT NULL,
result MEDIUMTEXT,
retry_count INT DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
这样即使 Redis 重启,也不会丢失关键任务数据。
7. 模型服务独立部署
如果你使用本地开源模型,比如:
- Qwen;
- DeepSeek;
- Llama;
- ChatGLM;
- Stable Diffusion;
- Whisper。
建议将模型服务独立部署,Web 服务不要和模型服务混在一起。
推荐结构:
API 服务集群
↓
任务队列
↓
推理服务集群
↓
GPU 节点
这样可以独立扩缩容,避免 Web 服务被 GPU 推理拖垮。
七、常见性能瓶颈与解决方案
| 瓶颈 | 表现 | 解决方案 |
|---|---|---|
| 模型调用慢 | 任务长期 running | 增加 Worker、模型并发、使用更快模型 |
| Redis 队列堆积 | pending 很多 | 增加消费节点、限流、拆分队列 |
| 第三方 API 限流 | 大量 429 | API Key 池、限速、退避重试 |
| 数据库压力大 | 查询慢 | 缓存、读写分离、索引优化 |
| 用户重复提交 | 队列膨胀 | 请求去重、结果缓存 |
| 大文件处理慢 | 超时 | 文件异步解析、分片处理 |
| Token 成本高 | 账单上涨 | Prompt 压缩、缓存、模型分级 |
八、部署建议
生产环境可以使用 Docker Compose 或 Kubernetes 部署。
推荐最小部署单元:
Nginx × 1
API 服务 × 2
Redis × 1
Worker × 3
MySQL × 1
监控服务 × 1
如果业务增长,可以逐步扩展:
API 服务横向扩容
Worker 横向扩容
Redis 主从或集群
模型服务独立扩容
任务队列拆分
GPU 节点扩容
对于 AI 应用,Worker 的扩容通常比 API 服务扩容更关键,因为真正耗时的部分在模型调用。
九、总结
AI 工具的高并发问题,本质上是 长耗时任务在高流量场景下的系统稳定性问题。
一套可靠的方案通常包括:
- 网关限流:防止入口流量失控;
- 异步任务:避免 Web 请求长时间阻塞;
- 消息队列:削峰填谷,保护下游模型;
- Worker 并发控制:限制模型调用并发;
- 缓存复用:降低重复计算与 API 成本;
- 失败重试:提高任务成功率;
- 超时控制:防止任务无限挂起;
- 降级策略:系统高压时优雅失败;
- 监控告警:及时发现性能瓶颈;
- 分层扩容:API、队列、Worker、模型服务分别扩展。
如果只是做 Demo,可以同步调用模型;但如果要做真正面向用户的 AI 工具,建议从一开始就采用 异步任务 + 队列 + Worker + 缓存 + 限流 的架构。
这样不仅能提升系统并发能力,也能显著降低模型调用成本,让 AI 产品在用户增长后依然稳定可用。