AI接口一上线就被打爆?这套高并发架构和源码可以直接参考
AI编程 高并发解决方案|附源码
在 AI 应用快速落地的过程中,很多团队会遇到一个非常现实的问题:模型能力验证成功了,但一上线就扛不住并发。
例如:
- 用户同时发起大量对话请求;
- 多人并发调用大模型 API;
- AI 绘图、向量检索、RAG 问答任务耗时较长;
- 后端线程被阻塞,请求排队严重;
- 第三方大模型接口有 QPS / TPM / RPM 限制;
- 推理服务 GPU 资源有限,无法无限扩容。
这类问题本质上不是“AI 模型不会回答”,而是AI 系统工程能力不足。AI 编程不仅仅是写 Prompt、调接口,更重要的是构建一个稳定、可扩展、可观测、可限流、可降级的高并发架构。
本文将围绕 AI 编程中的高并发问题,系统讲解常见瓶颈、架构设计思路,并给出一套可运行的 FastAPI 异步高并发示例源码。
一、AI 应用为什么容易出现高并发问题?
传统 Web 应用中,一个接口可能只需要查询数据库、做简单计算,然后快速返回。但 AI 应用通常有以下特点。
1. 单次请求耗时长
一次普通的数据库查询可能几十毫秒完成,而一次大模型调用可能需要:
- 1 秒;
- 3 秒;
- 10 秒;
- 甚至更久。
如果是 AI 绘图、视频生成、代码生成、复杂 Agent 任务,耗时会更长。
请求耗时越长,系统中同时堆积的请求就越多。
假设一个接口平均响应时间是 5 秒,如果每秒进来 100 个请求,那么系统中同时存在的请求大约就是:
并发请求数 = QPS × 平均耗时
并发请求数 = 100 × 5 = 500
也就是说,即使你的 QPS 只有 100,系统也可能要同时维护 500 个执行中的请求。
2. AI 请求经常依赖外部服务
很多 AI 应用会调用:
- OpenAI;
- Claude;
- Gemini;
- 通义千问;
- 智谱;
- DeepSeek;
- 私有化推理服务;
- 向量数据库;
- 搜索引擎;
- 对象存储;
- Redis;
- MySQL / PostgreSQL。
只要其中任何一个服务变慢,整个链路就会变慢。
高并发系统最怕的不是某一个服务慢,而是慢请求扩散。当大量请求阻塞在慢服务上,会导致线程池耗尽、连接池打满、内存上涨,最终整个服务不可用。
3. 大模型接口通常有限流
第三方 AI 服务一般会限制:
- 每分钟请求数 RPM;
- 每分钟 Token 数 TPM;
- 每秒请求数 QPS;
- 并发连接数;
- 单个 API Key 额度。
如果后端没有做限流和排队,流量一上来就会出现:
429 Too Many Requests
Rate limit exceeded
Quota exceeded
这时用户体验会非常差。
4. AI 任务有明显的资源瓶颈
如果你自己部署模型,那么瓶颈可能来自:
- GPU 显存;
- GPU 推理吞吐;
- CPU 编解码;
- 网络带宽;
- 向量检索延迟;
- 模型上下文长度;
- 批处理能力。
尤其是大模型推理,GPU 不是普通业务服务器那样可以随便开线程。并发过高反而可能让每个请求都变慢。
二、高并发 AI 系统的核心设计原则
要解决 AI 高并发问题,不能只靠“多开几台机器”。更合理的方式是从架构层面设计。
常用原则包括:
- 异步化:避免线程阻塞,提高连接处理能力;
- 限流:保护后端和第三方接口;
- 排队:把瞬时高峰转化为平滑消费;
- 缓存:减少重复调用模型;
- 熔断降级:外部服务异常时快速失败;
- 批处理:提升 GPU 或模型接口吞吐;
- 任务拆分:长任务异步执行;
- 连接池管理:避免连接资源耗尽;
- 可观测性:监控延迟、错误率、队列长度;
- 弹性扩容:根据负载自动扩展服务。
三、推荐的 AI 高并发架构
一个比较常见的 AI 高并发架构如下:
用户
↓
Nginx / API Gateway
↓
业务服务 FastAPI / Spring Boot / Go
↓
限流层 Redis Rate Limit
↓
缓存层 Redis
↓
任务队列 RabbitMQ / Kafka / Redis Stream / Celery
↓
AI Worker 集群
↓
大模型 API / 私有化模型 / 向量数据库
↓
结果缓存 / 数据库存储
其中:
- API 网关负责统一入口、鉴权、基础限流;
- 业务服务负责参数校验、会话管理、任务创建;
- Redis 负责限流、缓存、任务状态存储;
- 消息队列负责削峰填谷;
- Worker 负责真正调用模型;
- 数据库负责持久化业务数据;
- 监控系统负责追踪系统状态。
四、同步阻塞写法的问题
很多初学者会这样写 AI 接口:
from fastapi import FastAPI
import requests
app = FastAPI()
@app.post("/chat")
def chat(prompt: str):
response = requests.post(
"https://api.example.com/v1/chat",
json={"prompt": prompt},
timeout=30
)
return response.json()
这段代码看起来很简单,但在高并发下存在明显问题。
问题一:requests 是同步阻塞的
当请求大模型接口时,当前线程会一直等待响应。在等待期间,这个线程无法处理其他请求。
如果线程池数量是 100,而模型接口平均响应 5 秒,那么当并发超过 100 时,后续请求就只能排队。
问题二:没有限流
如果瞬间来了 1000 个请求,这段代码会直接把 1000 个请求打到下游模型服务。
结果可能是:
- 第三方 API 返回 429;
- 本地连接池耗尽;
- 系统响应时间暴涨;
- 服务雪崩。
问题三:没有缓存
如果多个用户问了相同或高度相似的问题,系统仍然会重复调用模型,浪费成本和资源。
问题四:没有超时、重试和熔断策略
生产环境中,下游服务一定会出现:
- 网络抖动;
- 偶发超时;
- 返回异常;
- 服务不可用。
没有超时和熔断机制,会导致请求无限堆积。
五、解决方案一:使用异步接口提升并发能力
在 Python 生态中,可以使用:
- FastAPI;
- Uvicorn;
- aiohttp;
- httpx.AsyncClient;
- asyncio。
异步并不是让单个请求更快,而是让服务在等待 I/O 的时候可以处理其他请求。
示例:
import httpx
from fastapi import FastAPI
app = FastAPI()
@app.post("/chat")
async def chat(prompt: str):
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
"https://api.example.com/v1/chat",
json={"prompt": prompt}
)
return response.json()
不过,这只是第一步。真实生产中还需要连接池、限流、缓存、并发控制。
六、解决方案二:使用 Semaphore 控制并发
如果你的模型接口最多只能承受 50 个并发,就不能让 500 个请求同时打过去。
可以使用 asyncio.Semaphore 控制同时执行的大模型请求数量。
import asyncio
import httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
MODEL_CONCURRENCY_LIMIT = 50
model_semaphore = asyncio.Semaphore(MODEL_CONCURRENCY_LIMIT)
class ChatRequest(BaseModel):
prompt: str
async def call_model(prompt: str):
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
"https://api.example.com/v1/chat",
json={"prompt": prompt}
)
response.raise_for_status()
return response.json()
@app.post("/chat")
async def chat(req: ChatRequest):
try:
async with model_semaphore:
result = await call_model(req.prompt)
return result
except httpx.HTTPError as e:
raise HTTPException(status_code=502, detail=str(e))
这样即使外部来了大量请求,真正同时调用模型的请求也不会超过 50 个。
但要注意:Semaphore 只能控制单进程内的并发。如果你部署了多个进程或多台机器,就需要使用 Redis、网关或集中式限流方案。
七、解决方案三:Redis 分布式限流
对于多实例部署场景,本地 Semaphore 不够,需要分布式限流。
常见算法有:
- 固定窗口限流;
- 滑动窗口限流;
- 令牌桶;
- 漏桶。
下面给出一个基于 Redis 的固定窗口限流示例。
Redis 限流源码
import time
import redis.asyncio as redis
redis_client = redis.Redis(
host="localhost",
port=6379,
db=0,
decode_responses=True
)
async def redis_rate_limit(key: str, limit: int, window: int) -> bool:
"""
Redis 固定窗口限流
:param key: 限流 Key,例如 user:123 或 api:chat
:param limit: 窗口期内最大请求数
:param window: 窗口大小,单位秒
:return: True 表示允许请求,False 表示拒绝请求
"""
current_window = int(time.time() // window)
redis_key = f"rate_limit:{key}:{current_window}"
count = await redis_client.incr(redis_key)
if count == 1:
await redis_client.expire(redis_key, window + 1)
return count <= limit
接口中使用:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class ChatRequest(BaseModel):
user_id: str
prompt: str
@app.post("/chat")
async def chat(req: ChatRequest):
allowed = await redis_rate_limit(
key=f"user:{req.user_id}",
limit=20,
window=60
)
if not allowed:
raise HTTPException(
status_code=429,
detail="请求过于频繁,请稍后再试"
)
return {"message": "通过限流,继续处理 AI 请求"}
这个方案适合做用户级限流,比如每个用户每分钟最多请求 20 次。
八、解决方案四:缓存重复问题,降低模型调用成本
AI 调用成本通常比普通接口高得多,因此缓存非常重要。
可以缓存:
- 完全相同的 Prompt;
- 相同用户的历史上下文摘要;
- RAG 检索结果;
- 向量搜索结果;
- 工具调用结果;
- 模型最终回答。
简单的缓存方案可以使用 Redis。
Redis 缓存源码
import hashlib
import json
import redis.asyncio as redis
cache_redis = redis.Redis(
host="localhost",
port=6379,
db=1,
decode_responses=True
)
def build_cache_key(prompt: str) -> str:
text = prompt.strip().lower()
digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
return f"ai_cache:{digest}"
async def get_cache(prompt: str):
key = build_cache_key(prompt)
value = await cache_redis.get(key)
if value:
return json.loads(value)
return None
async def set_cache(prompt: str, result: dict, ttl: int = 3600):
key = build_cache_key(prompt)
await cache_redis.setex(
key,
ttl,
json.dumps(result, ensure_ascii=False)
)
业务接口中使用:
@app.post("/chat-with-cache")
async def chat_with_cache(req: ChatRequest):
cached = await get_cache(req.prompt)
if cached:
return {
"from_cache": True,
"data": cached
}
result = await call_model(req.prompt)
await set_cache(req.prompt, result, ttl=3600)
return {
"from_cache": False,
"data": result
}
缓存需要注意:
- 不适合缓存所有个性化强的回答;
- 涉及隐私数据时要谨慎;
- Prompt 中包含用户身份、时间、权限信息时,缓存 Key 要包含上下文;
- 可以设置较短 TTL,避免长期错误缓存。
九、解决方案五:长任务异步化,使用队列削峰填谷
对于 AI 绘图、视频生成、长文本分析、代码审查、文档总结等耗时任务,不建议使用同步 HTTP 请求一直等待结果。
更推荐:
- 用户提交任务;
- 服务立即返回
task_id; - 后台 Worker 异步处理;
- 用户轮询任务状态,或通过 WebSocket / SSE 推送结果。
架构流程
POST /tasks
↓
创建任务,写入 Redis
↓
加入队列
↓
立即返回 task_id
Worker
↓
从队列取任务
↓
调用 AI 模型
↓
保存结果
GET /tasks/{task_id}
↓
查询任务状态和结果
下面给出一个基于 Redis Queue 的简化实现。
十、完整示例源码:FastAPI + Redis + 异步 Worker
1. 安装依赖
pip install fastapi uvicorn httpx redis pydantic
启动 Redis:
docker run -d --name redis-ai -p 6379:6379 redis:7
2. 项目结构
ai-high-concurrency-demo/
├── main.py
├── worker.py
├── model_client.py
├── redis_client.py
└── requirements.txt
3. requirements.txt
fastapi==0.115.0
uvicorn==0.30.6
httpx==0.27.2
redis==5.0.8
pydantic==2.8.2
4. redis_client.py
import redis.asyncio as redis
redis_client = redis.Redis(
host="localhost",
port=6379,
db=0,
decode_responses=True
)
5. model_client.py
这里为了方便演示,用 asyncio.sleep 模拟大模型耗时调用。实际项目中可以替换成 OpenAI、DeepSeek、通义千问或私有模型接口。
import asyncio
import random
async def call_ai_model(prompt: str) -> dict:
"""
模拟调用 AI 大模型。
真实项目中可替换为 httpx.AsyncClient 调用模型 API。
"""
await asyncio.sleep(random.uniform(1.0, 3.0))
return {
"answer": f"AI 回复:你输入的问题是:{prompt}",
"tokens": random.randint(100, 500)
}
如果你要调用真实 HTTP 模型接口,可以改成:
import httpx
async def call_ai_model(prompt: str) -> dict:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
"https://api.example.com/v1/chat/completions",
headers={
"Authorization": "Bearer YOUR_API_KEY"
},
json={
"model": "your-model",
"messages": [
{"role": "user", "content": prompt}
]
}
)
resp.raise_for_status()
return resp.json()
6. main.py
import uuid
import time
import json
import hashlib
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from redis_client import redis_client
app = FastAPI(title="AI 高并发解决方案 Demo")
TASK_QUEUE_KEY = "ai_task_queue"
TASK_RESULT_KEY_PREFIX = "ai_task:"
CACHE_KEY_PREFIX = "ai_cache:"
RATE_LIMIT_PREFIX = "rate_limit:"
class ChatTaskRequest(BaseModel):
user_id: str = Field(..., description="用户 ID")
prompt: str = Field(..., min_length=1, max_length=5000)
def build_cache_key(prompt: str) -> str:
digest = hashlib.sha256(
prompt.strip().lower().encode("utf-8")
).hexdigest()
return f"{CACHE_KEY_PREFIX}{digest}"
async def redis_rate_limit(key: str, limit: int, window: int) -> bool:
current_window = int(time.time() // window)
redis_key = f"{RATE_LIMIT_PREFIX}{key}:{current_window}"
count = await redis_client.incr(redis_key)
if count == 1:
await redis_client.expire(redis_key, window + 1)
return count <= limit
@app.post("/tasks")
async def create_task(req: ChatTaskRequest):
"""
创建 AI 任务:
1. 用户限流;
2. 查询缓存;
3. 创建任务;
4. 推入 Redis 队列;
5. 返回 task_id。
"""
allowed = await redis_rate_limit(
key=f"user:{req.user_id}",
limit=30,
window=60
)
if not allowed:
raise HTTPException(
status_code=429,
detail="请求过于频繁,请稍后再试"
)
cache_key = build_cache_key(req.prompt)
cached = await redis_client.get(cache_key)
if cached:
task_id = str(uuid.uuid4())
task_key = f"{TASK_RESULT_KEY_PREFIX}{task_id}"
await redis_client.setex(
task_key,
3600,
json.dumps({
"status": "success",
"from_cache": True,
"result": json.loads(cached)
}, ensure_ascii=False)
)
return {
"task_id": task_id,
"status": "success",
"from_cache": True
}
task_id = str(uuid.uuid4())
task_key = f"{TASK_RESULT_KEY_PREFIX}{task_id}"
task_data = {
"task_id": task_id,
"user_id": req.user_id,
"prompt": req.prompt,
"created_at": int(time.time())
}
await redis_client.setex(
task_key,
3600,
json.dumps({
"status": "pending",
"from_cache": False,
"result": None
}, ensure_ascii=False)
)
await redis_client.lpush(
TASK_QUEUE_KEY,
json.dumps(task_data, ensure_ascii=False)
)
return {
"task_id": task_id,
"status": "pending",
"from_cache": False
}
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
task_key = f"{TASK_RESULT_KEY_PREFIX}{task_id}"
data = await redis_client.get(task_key)
if not data:
raise HTTPException(
status_code=404,
detail="任务不存在或已过期"
)
return json.loads(data)
@app.get("/health")
async def health():
return {"status": "ok"}
7. worker.py
import asyncio
import json
import hashlib
import time
from redis_client import redis_client
from model_client import call_ai_model
TASK_QUEUE_KEY = "ai_task_queue"
TASK_RESULT_KEY_PREFIX = "ai_task:"
CACHE_KEY_PREFIX = "ai_cache:"
WORKER_CONCURRENCY = 10
semaphore = asyncio.Semaphore(WORKER_CONCURRENCY)
def build_cache_key(prompt: str) -> str:
digest = hashlib.sha256(
prompt.strip().lower().encode("utf-8")
).hexdigest()
return f"{CACHE_KEY_PREFIX}{digest}"
async def handle_task(task_data: dict):
async with semaphore:
task_id = task_data["task_id"]
prompt = task_data["prompt"]
task_key = f"{TASK_RESULT_KEY_PREFIX}{task_id}"
try:
await redis_client.setex(
task_key,
3600,
json.dumps({
"status": "running",
"from_cache": False,
"result": None
}, ensure_ascii=False)
)
start = time.time()
result = await call_ai_model(prompt)
cost_ms = int((time.time() - start) * 1000)
final_result = {
"status": "success",
"from_cache": False,
"cost_ms": cost_ms,
"result": result
}
await redis_client.setex(
task_key,
3600,
json.dumps(final_result, ensure_ascii=False)
)
cache_key = build_cache_key(prompt)
await redis_client.setex(
cache_key,
3600,
json.dumps(result, ensure_ascii=False)
)
except Exception as e:
await redis_client.setex(
task_key,
3600,
json.dumps({
"status": "failed",
"from_cache": False,
"error": str(e),
"result": None
}, ensure_ascii=False)
)
async def worker_loop():
print("AI Worker started...")
while True:
item = await redis_client.brpop(TASK_QUEUE_KEY, timeout=5)
if not item:
continue
_, raw_task = item
task_data = json.loads(raw_task)
asyncio.create_task(handle_task(task_data))
if __name__ == "__main__":
asyncio.run(worker_loop())
8. 启动服务
启动 API 服务:
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 2
启动 Worker:
python worker.py
如果需要更强处理能力,可以启动多个 Worker 进程:
python worker.py
python worker.py
python worker.py
或者使用 Supervisor、Docker Compose、Kubernetes 进行管理。
9. 测试接口
创建任务:
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{
"user_id": "u1001",
"prompt": "请解释一下什么是高并发架构"
}'
返回示例:
{
"task_id": "4f2a8fd1-9e8c-48b2-a6d6-58c6e9e0f123",
"status": "pending",
"from_cache": false
}
查询任务:
curl http://localhost:8000/tasks/4f2a8fd1-9e8c-48b2-a6d6-58c6e9e0f123
可能返回:
{
"status": "success",
"from_cache": false,
"cost_ms": 1852,
"result": {
"answer": "AI 回复:你输入的问题是:请解释一下什么是高并发架构",
"tokens": 238
}
}
十一、进一步优化:流式响应
对于聊天类 AI 应用,用户并不一定要等完整结果生成完才看到内容。可以使用流式响应,让模型边生成边返回。
常见方案:
- Server-Sent Events,简称 SSE;
- WebSocket;
- HTTP chunked response。
流式输出的优点是:
- 首字响应时间更短;
- 用户体验更好;
- 长文本生成不会让用户一直等待空白页面;
- 适合 AI 对话、代码生成、文章生成等场景。
FastAPI SSE 简化示例:
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def fake_stream(prompt: str):
words = ["这是", "一个", "AI", "流式", "响应", "示例"]
for word in words:
yield f"data: {word}\n\n"
await asyncio.sleep(0.5)
@app.get("/stream")
async def stream(prompt: str):
return StreamingResponse(
fake_stream(prompt),
media_type="text/event-stream"
)
流式响应不是为了提升总吞吐,而是为了优化用户感知体验。在高并发场景中,它仍然需要配合限流、连接数控制和超时管理。
十二、进一步优化:超时、重试和熔断
AI 服务调用必须设置超时。
错误示例:
response = await client.post(url, json=data)
正确示例:
timeout = httpx.Timeout(
connect=5.0,
read=30.0,
write=5.0,
pool=5.0
)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, json=data)
对于偶发错误,可以做有限重试,但不能无限重试。
import asyncio
import httpx
async def call_with_retry(prompt: str, retries: int = 3):
for i in range(retries):
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
"https://api.example.com/v1/chat",
json={"prompt": prompt}
)
resp.raise_for_status()
return resp.json()
except Exception:
if i == retries - 1:
raise
await asyncio.sleep(0.5 * (i + 1))
重试要注意:
- 只对可重试错误重试;
- 429 不一定适合立即重试;
- 超时重试可能会放大流量;
- 重试次数不能太多;
- 建议使用指数退避。
熔断机制则用于在下游服务持续异常时快速失败,避免请求继续堆积。
十三、进一步优化:批处理提升推理吞吐
如果是自部署模型,批处理非常重要。
例如 GPU 同时处理一个请求和同时处理多个请求,在某些场景下吞吐差异很大。可以把短时间内到达的请求合并成一个 batch,再统一送入模型。
简单流程:
请求进入队列
↓
等待 10ms 到 50ms
↓
收集最多 N 个请求
↓
组成 batch
↓
模型统一推理
↓
拆分结果返回给各个请求
批处理适合:
- embedding 向量生成;
- 文本分类;
- rerank;
- 语义匹配;
- OCR;
- 部分模型推理场景。
但对于流式对话模型,批处理实现会更复杂,需要结合推理框架能力,例如:
- vLLM;
- TensorRT-LLM;
- TGI;
- Ray Serve;
- Triton Inference Server。
十四、进一步优化:分层限流策略
高并发 AI 系统中,限流不能只做一层。推荐至少三层限流。
1. 网关层限流
例如 Nginx、Kong、APISIX、Spring Cloud Gateway。
负责:
- IP 限流;
- 全局 QPS 限流;
- 黑名单;
- 基础防刷。
2. 用户层限流
根据用户等级控制额度:
免费用户:每分钟 10 次
普通用户:每分钟 60 次
高级用户:每分钟 300 次
企业用户:独立配额
3. 模型层限流
不同模型成本不同,限流策略也不同:
小模型:高 QPS,低成本
大模型:低 QPS,高成本
绘图模型:低并发,长耗时
Embedding 模型:适合批处理
这样可以避免一个用户、一个模型或一个接口拖垮整个系统。
十五、AI 高并发系统的监控指标
没有监控,就无法判断系统是否真的稳定。
建议重点监控以下指标:
1. 请求指标
- QPS;
- 平均响应时间;
- P95 / P99 延迟;
- HTTP 4xx / 5xx;
- 超时率;
- 429 限流次数。
2. 队列指标
- 队列长度;
- 任务等待时间;
- Worker 消费速度;
- 任务失败率;
- 重试次数。
3. 模型指标
- 模型调用耗时;
- Token 消耗;
- 每分钟 Token 数;
- 模型错误率;
- 首 Token 延迟;
- 生成速度 tokens/s。
4. 资源指标
- CPU;
- 内存;
- 网络;
- Redis 连接数;
- 数据库连接数;
- GPU 利用率;
- GPU 显存使用率。
5. 业务指标
- 活跃用户数;
- 人均调用次数;
- 缓存命中率;
- 单次调用成本;
- 每日 Token 成本;
- 付费转化率。
十六、常见高并发故障与解决办法
故障一:接口响应越来越慢
可能原因:
- 下游模型慢;
- 队列堆积;
- 数据库连接池不足;
- Redis 慢查询;
- 线程池耗尽;
- 请求没有超时。
解决办法:
- 增加超时;
- 增加 Worker;
- 排查慢请求;
- 增加缓存;
- 对长任务异步化;
- 限制最大并发。
故障二:频繁出现 429
可能原因:
- 第三方模型限流;
- 用户流量超过配额;
- 没有本地排队;
- 多实例没有统一限流。
解决办法:
- Redis 分布式限流;
- 请求排队;
- 多 API Key 池化;
- 不同用户分配不同配额;
- 对 429 做延迟重试。
故障三:Redis 队列越来越长
可能原因:
- Worker 数量不足;
- 模型调用太慢;
- 任务生成速度大于消费速度;
- 某些任务卡住;
- 下游模型故障。
解决办法:
- 扩容 Worker;
- 增加模型副本;
- 设置任务超时;
- 增加失败重试队列;
- 加入熔断降级;
- 限制任务提交速度。
故障四:成本突然暴涨
可能原因:
- 被刷接口;
- 缓存命中率低;
- Prompt 太长;
- 重试次数过多;
- 用户滥用;
- 日志中重复触发任务。
解决办法:
- 用户鉴权;
- 限流;
- Token 预算控制;
- Prompt 压缩;
- 缓存;
- 计费系统;
- 异常流量告警。
十七、生产环境落地建议
如果要把本文示例用于生产环境,建议进一步完善:
-
鉴权认证
使用 JWT、OAuth2、API Key 等机制,避免匿名滥用。 -
任务持久化
Redis 适合缓存和队列,但重要任务建议落 MySQL、PostgreSQL 或 MongoDB。 -
死信队列
任务多次失败后进入死信队列,方便人工排查。 -
幂等设计
客户端重试时避免重复创建任务,可使用请求 ID。 -
更成熟的队列系统
高可靠场景建议使用 RabbitMQ、Kafka、RocketMQ、Celery、Dramatiq。 -
灰度发布
新模型、新 Prompt、新 Agent 流程要灰度上线。 -
模型路由
根据任务复杂度选择不同模型,降低成本。 -
降级策略
大模型不可用时,可以切换小模型、缓存答案或提示稍后再试。 -
多租户隔离
企业客户之间需要资源隔离,避免互相影响。 -
全链路追踪
使用 Trace ID 贯穿网关、业务服务、队列、Worker、模型调用。
十八、总结
AI 编程中的高并发问题,本质上是一个系统工程问题。只会调用大模型 API 并不等于能做好 AI 应用。当用户规模增长后,真正决定系统稳定性的往往是:
- 是否异步化;
- 是否有限流;
- 是否有缓存;
- 是否有队列;
- 是否能削峰填谷;
- 是否能熔断降级;
- 是否有监控告警;
- 是否能控制成本。
本文给出的方案可以概括为:
短请求:异步接口 + 限流 + 缓存
长任务:任务队列 + Worker + 状态查询
模型调用:超时 + 重试 + 熔断 + 并发控制
系统稳定性:监控 + 告警 + 扩容 + 降级
如果只是 Demo,一个同步接口就够了;但如果要支撑真实用户,就必须从第一天开始考虑高并发架构。
AI 应用的竞争,不只是模型能力的竞争,也是工程稳定性、成本控制和用户体验的竞争。谁能让 AI 服务在高并发下依然稳定、快速、可控,谁就更有机会把 AI 产品真正做成可持续的业务。