AI Agent 扛不住并发?这套部署方案和命令可以直接抄
AI Agent 高并发解决方案|附完整命令
随着大模型应用从 Demo 阶段进入生产阶段,越来越多企业开始构建自己的 AI Agent 系统。与普通聊天机器人不同,AI Agent 往往需要具备多轮推理、工具调用、数据库检索、任务规划、代码执行、外部 API 访问等能力。
这也带来了一个非常现实的问题:当大量用户同时访问 AI Agent 时,系统如何保证稳定、低延迟、可扩展?
本文将围绕 AI Agent 高并发架构设计展开,结合生产实践,给出一套较完整的解决方案,并附上可直接参考的部署命令,适合用于企业级 AI Agent 后端、智能客服、知识库问答、自动化办公助手、数据分析 Agent 等场景。
一、AI Agent 为什么更容易遇到高并发瓶颈?
普通 Web 应用通常是:
接收请求 → 查询数据库 → 返回结果
而 AI Agent 的典型流程可能是:
接收用户请求 → 构造上下文 → 调用大模型 → 判断是否需要调用工具 → 调用数据库/API/向量库 → 再次调用大模型 → 生成最终答案
也就是说,AI Agent 的一次请求背后,可能包含多次 I/O 操作和多次模型调用。
常见瓶颈主要有以下几类。
二、AI Agent 高并发常见瓶颈
1. 大模型接口延迟高
无论是调用 OpenAI、Claude、通义千问、智谱、DeepSeek,还是自建 vLLM / Ollama / TGI,大模型推理都比普通 API 慢得多。
一个普通 HTTP 接口可能几十毫秒完成,而一次大模型调用可能需要:
- 500ms
- 2s
- 10s
- 甚至更久
如果 Agent 需要多轮推理,延迟会继续放大。
2. Token 成本与上下文过长
很多 Agent 会将历史对话、知识库检索结果、工具描述、系统提示词全部塞进 Prompt。
这会导致:
- 请求 Token 变长
- 模型响应变慢
- 成本升高
- 并发能力下降
- 上下文窗口被浪费
3. 工具调用链路复杂
Agent 常见工具包括:
- 数据库查询
- 搜索引擎
- 浏览器访问
- 文件解析
- Python 代码执行
- 企业内部系统接口
- 向量数据库检索
- 消息队列任务
每增加一个工具,就增加一个潜在的延迟点和失败点。
4. Python 单进程能力有限
很多 AI Agent 项目使用 Python 开发,例如:
- FastAPI
- LangChain
- LlamaIndex
- AutoGen
- CrewAI
- Semantic Kernel
Python 本身非常适合 AI 应用开发,但如果部署方式不合理,单进程很容易成为瓶颈。
例如只用下面这种命令启动:
python main.py
在生产环境中基本是不够的。
5. 同步阻塞导致吞吐下降
如果所有调用都使用同步方式,例如:
requests.post(...)
那么当大量请求进入时,请求线程会大量阻塞在网络 I/O 上,系统吞吐会急剧下降。
对于 AI Agent 来说,大量操作本质上都是 I/O 密集型任务,因此异步架构非常重要。
三、AI Agent 高并发总体架构
一个较为推荐的高并发 AI Agent 架构如下:
用户
|
CDN / SLB / Nginx
|
FastAPI / Node.js API Gateway
|
任务队列 Redis / RabbitMQ / Kafka
|
Agent Worker 集群
|
LLM 服务 / 向量库 / 数据库 / 工具服务
|
结果回传 / WebSocket / SSE
核心思想是:
前端请求快速进入系统,耗时任务异步处理,Agent Worker 横向扩展,大模型调用统一限流和熔断。
四、推荐技术选型
下面是一套比较通用、容易落地的技术栈。
| 模块 | 推荐方案 |
|---|---|
| API 服务 | FastAPI |
| 异步网络请求 | httpx / aiohttp |
| Web 服务进程 | Gunicorn + Uvicorn Worker |
| 反向代理 | Nginx |
| 缓存 | Redis |
| 任务队列 | Celery / RQ / Dramatiq |
| 向量数据库 | Milvus / Qdrant / Weaviate / pgvector |
| 数据库 | PostgreSQL / MySQL |
| 模型服务 | OpenAI API / DeepSeek API / vLLM / Ollama |
| 容器化 | Docker |
| 编排 | Docker Compose / Kubernetes |
| 监控 | Prometheus + Grafana |
| 日志 | Loki / ELK |
五、高并发解决方案核心思路
1. API 层保持轻量
API 层不应该承担复杂 Agent 推理任务,而是负责:
- 鉴权
- 参数校验
- 创建任务
- 返回 task_id
- 查询任务状态
- 推送结果
错误示例:
@app.post("/chat")
def chat(req: ChatRequest):
result = run_agent(req.message)
return {"answer": result}
这种写法会让 HTTP 请求长时间阻塞,不适合高并发。
推荐方式:
@app.post("/chat")
async def chat(req: ChatRequest):
task = celery_app.send_task("agent.tasks.run_agent", args=[req.message])
return {
"task_id": task.id,
"status": "processing"
}
这样 API 层可以快速返回,真正耗时的 Agent 推理由 Worker 完成。
2. 使用异步 I/O
大模型 API、向量数据库、外部搜索接口、企业系统接口都属于网络 I/O。
推荐使用 async / await:
import httpx
async def call_llm(prompt: str):
async with httpx.AsyncClient(timeout=60) as client:
response = await client.post(
"https://api.example.com/v1/chat/completions",
json={
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": prompt}
]
},
headers={
"Authorization": "Bearer YOUR_API_KEY"
}
)
return response.json()
相比同步阻塞请求,异步方式可以在等待远程接口响应时释放执行权,让服务器处理更多请求。
3. 使用消息队列削峰填谷
高并发系统最怕流量瞬间打满服务。
例如某个智能客服 Agent 在上午 9 点突然涌入大量用户,如果直接把请求全部打到模型接口,很容易出现:
- 模型 API 限流
- 请求超时
- 数据库连接耗尽
- 服务雪崩
消息队列可以将请求暂存起来,让 Worker 按照系统承载能力逐步消费。
常见方案:
- Redis + Celery:简单易用
- RabbitMQ + Celery:可靠性更强
- Kafka:适合超大规模日志流和事件流
4. Worker 横向扩容
Agent Worker 是真正执行任务的服务,可以根据并发量横向扩容。
例如:
celery -A agent.tasks worker --loglevel=info --concurrency=8
如果一台机器不够,可以多台机器同时启动 Worker:
celery -A agent.tasks worker --loglevel=info --concurrency=8 --hostname=worker1@%h
celery -A agent.tasks worker --loglevel=info --concurrency=8 --hostname=worker2@%h
celery -A agent.tasks worker --loglevel=info --concurrency=8 --hostname=worker3@%h
如果使用 Docker Compose,也可以直接扩容:
docker compose up -d --scale agent-worker=5
5. 对 LLM 调用做限流
很多大模型平台都有 QPS、TPM、RPM 限制:
- QPS:每秒请求数
- RPM:每分钟请求数
- TPM:每分钟 Token 数
如果系统不做限流,瞬间并发可能直接触发模型接口限流,导致大量失败。
可以使用 Redis 实现分布式限流。
伪代码示例:
def allow_request(redis, key, limit, window):
current = redis.incr(key)
if current == 1:
redis.expire(key, window)
return current <= limit
例如限制某个模型每分钟最多调用 300 次:
if not allow_request(redis_client, "llm:deepseek:rpm", 300, 60):
raise Exception("LLM rate limit exceeded")
6. 对模型调用做重试、超时、熔断
大模型接口不是永远稳定的,生产环境一定要处理:
- 请求超时
- HTTP 429
- HTTP 500
- 连接失败
- 响应格式异常
推荐策略:
| 异常类型 | 处理方式 |
|---|---|
| 超时 | 重试 1~2 次 |
| 429 限流 | 指数退避 |
| 5xx | 切换备用模型 |
| 参数错误 | 直接失败 |
| 长时间不可用 | 熔断 |
示例:
import asyncio
import httpx
async def call_llm_with_retry(payload, max_retry=3):
for i in range(max_retry):
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
"https://api.example.com/v1/chat/completions",
json=payload
)
resp.raise_for_status()
return resp.json()
except Exception as e:
if i == max_retry - 1:
raise e
await asyncio.sleep(2 ** i)
7. 缓存常见问题结果
很多 Agent 场景中,用户问题具有明显重复性,例如:
- 公司报销流程是什么?
- 如何申请 VPN?
- 退款政策是什么?
- 某个产品的使用方法是什么?
这类问题可以做语义缓存。
缓存方式包括:
- 精确缓存:相同问题直接返回
- 语义缓存:向量相似度达到阈值则返回
- Prompt 片段缓存:缓存检索结果或工具结果
- LLM 响应缓存:缓存最终答案
Redis 精确缓存示例:
import hashlib
import json
def cache_key(question: str):
return "agent:answer:" + hashlib.md5(question.encode()).hexdigest()
def get_cached_answer(redis_client, question):
key = cache_key(question)
value = redis_client.get(key)
if value:
return json.loads(value)
return None
def set_cached_answer(redis_client, question, answer, ttl=3600):
key = cache_key(question)
redis_client.setex(key, ttl, json.dumps(answer, ensure_ascii=False))
缓存不仅可以提升响应速度,还能显著降低 Token 成本。
六、完整部署示例
下面给出一套基于以下组件的示例部署:
- FastAPI:API 服务
- Celery:异步任务
- Redis:消息队列和缓存
- Gunicorn + Uvicorn:生产级 Web 服务
- Nginx:反向代理
- Docker Compose:容器编排
1. 项目目录结构
ai-agent-demo/
├── app/
│ ├── main.py
│ ├── tasks.py
│ ├── agent.py
│ ├── config.py
│ └── requirements.txt
├── nginx/
│ └── nginx.conf
├── Dockerfile
└── docker-compose.yml
2. 创建项目目录
mkdir -p ai-agent-demo/app
mkdir -p ai-agent-demo/nginx
cd ai-agent-demo
3. 编写依赖文件
cat > app/requirements.txt <<'EOF'
fastapi==0.115.0
uvicorn==0.30.6
gunicorn==23.0.0
celery==5.4.0
redis==5.0.8
httpx==0.27.2
pydantic==2.8.2
python-dotenv==1.0.1
EOF
4. 编写配置文件
cat > app/config.py <<'EOF'
import os
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
LLM_API_KEY = os.getenv("LLM_API_KEY", "your-api-key")
LLM_API_URL = os.getenv("LLM_API_URL", "https://api.deepseek.com/chat/completions")
LLM_MODEL = os.getenv("LLM_MODEL", "deepseek-chat")
EOF
5. 编写 Agent 逻辑
cat > app/agent.py <<'EOF'
import httpx
from config import LLM_API_KEY, LLM_API_URL, LLM_MODEL
async def call_llm(message: str):
headers = {
"Authorization": f"Bearer {LLM_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": LLM_MODEL,
"messages": [
{
"role": "system",
"content": "你是一个企业级 AI Agent,回答要准确、简洁、结构化。"
},
{
"role": "user",
"content": message
}
],
"temperature": 0.3
}
async with httpx.AsyncClient(timeout=60) as client:
response = await client.post(
LLM_API_URL,
headers=headers,
json=payload
)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
EOF
6. 编写 Celery 任务
由于 Celery 默认任务函数是同步的,而我们的 LLM 调用是异步函数,因此可以在任务中使用 asyncio.run() 执行。
cat > app/tasks.py <<'EOF'
import asyncio
from celery import Celery
from config import REDIS_URL
from agent import call_llm
celery_app = Celery(
"agent_tasks",
broker=REDIS_URL,
backend=REDIS_URL
)
celery_app.conf.update(
task_track_started=True,
worker_prefetch_multiplier=1,
task_acks_late=True,
task_time_limit=120,
task_soft_time_limit=90,
)
@celery_app.task(name="agent.tasks.run_agent", bind=True)
def run_agent(self, message: str):
try:
result = asyncio.run(call_llm(message))
return {
"status": "success",
"answer": result
}
except Exception as e:
return {
"status": "failed",
"error": str(e)
}
EOF
7. 编写 FastAPI 服务
cat > app/main.py <<'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from celery.result import AsyncResult
from tasks import celery_app
app = FastAPI(title="AI Agent High Concurrency Demo")
class ChatRequest(BaseModel):
message: str
@app.get("/health")
async def health():
return {"status": "ok"}
@app.post("/chat")
async def chat(req: ChatRequest):
task = celery_app.send_task(
"agent.tasks.run_agent",
args=[req.message]
)
return {
"task_id": task.id,
"status": "processing"
}
@app.get("/result/{task_id}")
async def get_result(task_id: str):
result = AsyncResult(task_id, app=celery_app)
if result.state == "PENDING":
return {
"task_id": task_id,
"status": "pending"
}
if result.state == "STARTED":
return {
"task_id": task_id,
"status": "started"
}
if result.state == "SUCCESS":
return {
"task_id": task_id,
"status": "success",
"result": result.result
}
if result.state == "FAILURE":
return {
"task_id": task_id,
"status": "failed",
"error": str(result.result)
}
return {
"task_id": task_id,
"status": result.state
}
EOF
8. 编写 Dockerfile
cat > Dockerfile <<'EOF'
FROM python:3.11-slim
WORKDIR /app
COPY app/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY app /app
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
EOF
9. 编写 Nginx 配置
cat > nginx/nginx.conf <<'EOF'
events {}
http {
upstream api_backend {
server api:8000;
}
server {
listen 80;
client_max_body_size 20m;
location / {
proxy_pass http://api_backend;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_connect_timeout 10s;
proxy_send_timeout 120s;
proxy_read_timeout 120s;
}
}
}
EOF
10. 编写 Docker Compose
cat > docker-compose.yml <<'EOF'
services:
redis:
image: redis:7
container_name: agent-redis
command: redis-server --appendonly yes
ports:
- "6379:6379"
api:
build: .
container_name: agent-api
environment:
- REDIS_URL=redis://redis:6379/0
- LLM_API_KEY=${LLM_API_KEY}
- LLM_API_URL=${LLM_API_URL}
- LLM_MODEL=${LLM_MODEL}
command: >
gunicorn main:app
-k uvicorn.workers.UvicornWorker
-w 4
-b 0.0.0.0:8000
--timeout 120
depends_on:
- redis
ports:
- "8000:8000"
agent-worker:
build: .
environment:
- REDIS_URL=redis://redis:6379/0
- LLM_API_KEY=${LLM_API_KEY}
- LLM_API_URL=${LLM_API_URL}
- LLM_MODEL=${LLM_MODEL}
command: >
celery -A tasks.celery_app worker
--loglevel=info
--concurrency=8
--prefetch-multiplier=1
depends_on:
- redis
nginx:
image: nginx:1.27
container_name: agent-nginx
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- api
ports:
- "80:80"
EOF
11. 配置环境变量
如果使用 DeepSeek API,可以这样配置:
cat > .env <<'EOF'
LLM_API_KEY=你的真实API_KEY
LLM_API_URL=https://api.deepseek.com/chat/completions
LLM_MODEL=deepseek-chat
EOF
注意:生产环境不要把 .env 提交到 Git 仓库。
可以添加 .gitignore:
cat > .gitignore <<'EOF'
.env
__pycache__/
*.pyc
EOF
12. 启动服务
docker compose up -d --build
查看容器状态:
docker compose ps
查看日志:
docker compose logs -f api
查看 Worker 日志:
docker compose logs -f agent-worker
13. 测试接口
提交 Agent 请求:
curl -X POST http://localhost/chat \
-H "Content-Type: application/json" \
-d '{"message":"请解释一下 AI Agent 高并发架构应该如何设计"}'
返回示例:
{
"task_id": "8e5d0c85-7f1c-4a18-9a10-7de0f8c3e2d1",
"status": "processing"
}
查询任务结果:
curl http://localhost/result/8e5d0c85-7f1c-4a18-9a10-7de0f8c3e2d1
七、如何提升并发能力?
1. 增加 API 进程数
当前示例中 API 使用 4 个 Worker:
-w 4
可以根据 CPU 核数调整。
一般建议:
worker 数量 = CPU 核心数 × 2 + 1
例如 4 核机器:
gunicorn main:app -k uvicorn.workers.UvicornWorker -w 9 -b 0.0.0.0:8000
不过 AI Agent 的瓶颈通常不在 API 层,而在模型调用和工具调用,因此不要盲目增加 API Worker。
2. 增加 Agent Worker 数量
单容器扩容并发:
docker compose up -d --scale agent-worker=5
如果每个 Worker 容器并发为 8,则理论任务并发为:
5 × 8 = 40
如果模型服务或第三方 API 支持更高并发,可以继续增加。
3. 调整 Celery 并发参数
启动命令示例:
celery -A tasks.celery_app worker \
--loglevel=info \
--concurrency=16 \
--prefetch-multiplier=1
参数说明:
| 参数 | 说明 |
|---|---|
--concurrency |
单个 Worker 并发数 |
--prefetch-multiplier |
每个进程预取任务数量 |
task_acks_late=True |
任务执行完成后再确认 |
task_time_limit |
强制任务超时时间 |
task_soft_time_limit |
软超时时间 |
对于大模型任务,建议设置:
--prefetch-multiplier=1
避免某个 Worker 一次拿走太多任务,导致任务分配不均。
4. 使用流式响应优化体验
如果用户希望像 ChatGPT 一样逐字输出,可以使用:
- SSE
- WebSocket
- HTTP Streaming
流式响应并不能减少总推理时间,但可以明显改善用户体验。
SSE 返回示例:
from fastapi.responses import StreamingResponse
@app.get("/stream")
async def stream():
async def event_generator():
for i in range(10):
yield f"data: chunk-{i}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
Nginx 中需要关闭缓冲:
proxy_buffering off;
八、生产环境必须关注的稳定性问题
1. 超时控制
必须对所有外部调用设置 timeout。
错误示例:
requests.get(url)
推荐示例:
requests.get(url, timeout=10)
异步推荐:
httpx.AsyncClient(timeout=30)
否则一旦外部接口卡死,系统资源会被拖垮。
2. 数据库连接池
如果 Agent 需要查询数据库,必须使用连接池,并限制最大连接数。
PostgreSQL 示例:
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://user:password@postgres:5432/db",
pool_size=20,
max_overflow=10,
pool_timeout=30
)
3. 向量数据库检索优化
RAG Agent 常常依赖向量检索。
优化建议:
- 控制 top_k,不要一次返回太多文档
- 对文档分块大小进行优化
- 建立合适索引
- 对检索结果做 rerank
- 对相同 query 做缓存
- 不要把全部检索内容无脑塞进 Prompt
常见配置:
chunk_size: 500~1000 tokens
top_k: 3~8
similarity_threshold: 0.75
4. Prompt 压缩
Prompt 越长,延迟越高,成本越高。
优化方向:
- 系统提示词精简
- 工具描述压缩
- 历史对话摘要化
- 检索内容去重
- 对长文档先摘要再输入
- 对无关上下文过滤
例如,只保留最近 5 轮对话:
recent_messages = messages[-10:]
或者对历史对话做摘要:
将之前的对话总结为不超过 300 字,保留用户目标、关键约束和已确认信息。
5. 多模型降级
生产系统最好不要只依赖一个模型。
可以设计模型路由:
| 场景 | 模型 |
|---|---|
| 简单问答 | 小模型 |
| 复杂推理 | 大模型 |
| 代码生成 | 代码模型 |
| 高峰期 | 便宜快速模型 |
| 主模型失败 | 备用模型 |
示例策略:
try:
return await call_primary_model(prompt)
except Exception:
return await call_backup_model(prompt)
这样可以降低单点风险。
九、压测方案
高并发系统上线前必须压测。
可以使用 wrk、hey、ab、Locust 等工具。
1. 使用 hey 压测
安装:
go install github.com/rakyll/hey@latest
压测 /health 接口:
hey -n 10000 -c 200 http://localhost/health
压测 /chat 接口:
hey -n 1000 -c 100 \
-m POST \
-H "Content-Type: application/json" \
-d '{"message":"什么是AI Agent?"}' \
http://localhost/chat
2. 使用 wrk 压测
安装:
sudo apt-get update
sudo apt-get install -y wrk
压测健康检查接口:
wrk -t4 -c200 -d60s http://localhost/health
3. 使用 Locust 压测
安装:
pip install locust
创建压测文件:
cat > locustfile.py <<'EOF'
from locust import HttpUser, task, between
class AgentUser(HttpUser):
wait_time = between(1, 3)
@task
def chat(self):
self.client.post(
"/chat",
json={"message": "请介绍 AI Agent 的高并发优化方案"}
)
EOF
启动:
locust -f locustfile.py --host=http://localhost
然后打开:
http://localhost:8089
进行压测配置。
十、监控指标设计
AI Agent 的监控不能只看 CPU 和内存,还要关注模型调用和任务队列。
建议监控以下指标:
| 指标 | 含义 |
|---|---|
| API QPS | 接口请求量 |
| API P95/P99 延迟 | 用户体验 |
| Celery 队列长度 | 是否积压 |
| Worker 成功率 | 任务执行质量 |
| LLM 调用耗时 | 模型瓶颈 |
| LLM 错误率 | 稳定性 |
| Token 消耗 | 成本控制 |
| Redis 内存 | 队列和缓存压力 |
| 数据库连接数 | 是否连接耗尽 |
| 向量检索耗时 | RAG 性能 |
查看 Redis 队列情况:
docker exec -it agent-redis redis-cli
进入后执行:
KEYS *
LLEN celery
INFO memory
INFO clients
查看 Celery Worker 状态:
docker compose exec agent-worker celery -A tasks.celery_app inspect active
查看已注册任务:
docker compose exec agent-worker celery -A tasks.celery_app inspect registered
查看队列统计:
docker compose exec agent-worker celery -A tasks.celery_app inspect stats
十一、Kubernetes 扩容示例
如果系统进入更高规模,可以迁移到 Kubernetes。
Agent Worker Deployment 示例:
apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-worker
spec:
replicas: 5
selector:
matchLabels:
app: agent-worker
template:
metadata:
labels:
app: agent-worker
spec:
containers:
- name: agent-worker
image: your-registry/ai-agent:latest
command:
- celery
- -A
- tasks.celery_app
- worker
- --loglevel=info
- --concurrency=8
env:
- name: REDIS_URL
value: redis://redis:6379/0
- name: LLM_API_KEY
valueFrom:
secretKeyRef:
name: llm-secret
key: api-key
应用配置:
kubectl apply -f agent-worker.yaml
扩容:
kubectl scale deployment agent-worker --replicas=10
查看 Pod:
kubectl get pods
查看日志:
kubectl logs -f deployment/agent-worker
十二、推荐上线配置
以中小型企业 AI Agent 为例,初始配置可以参考:
| 组件 | 推荐配置 |
|---|---|
| API 服务 | 2~4 个 Gunicorn Worker |
| Agent Worker | 3~5 个容器 |
| 单 Worker 并发 | 4~8 |
| Redis | 2C4G 起步 |
| PostgreSQL | 4C8G 起步 |
| 向量库 | 视数据量而定 |
| Nginx 超时 | 120s |
| LLM 超时 | 30~60s |
| 任务超时 | 90~180s |
如果是大规模生产环境,建议进一步引入:
- Kubernetes HPA 自动扩容
- Redis Cluster
- 多区域部署
- 多模型路由
- 统一 API Gateway
- 链路追踪 OpenTelemetry
- 成本分析平台
十三、总结
AI Agent 的高并发问题,本质上不是简单地“多开几个进程”就能解决,而是需要从架构上拆分请求链路。
一套可落地的高并发方案应包含:
- API 层轻量化:快速接收请求,避免长时间阻塞;
- 任务队列异步化:用 Redis / RabbitMQ / Kafka 削峰填谷;
- Worker 横向扩容:根据并发量动态增加 Agent 执行节点;
- 模型调用限流:避免触发 LLM 平台限流和雪崩;
- 缓存与语义缓存:降低重复请求成本;
- 超时、重试、熔断:提升系统韧性;
- Prompt 压缩与上下文管理:减少 Token 消耗;
- 流式输出:优化用户感知体验;
- 监控与压测:用数据指导扩容和优化;
- 多模型降级:避免单一模型服务故障影响整体系统。
如果只是内部几十人使用,一个 FastAPI + Celery + Redis 的架构已经足够;如果面向公网数万用户,则需要进一步引入 Kubernetes、自动扩容、分布式限流、模型路由和完善的监控体系。
真正可靠的 AI Agent 系统,不只是“能回答问题”,更重要的是在高并发、高延迟、高成本和不稳定外部依赖下,依然能够稳定、可控、可扩展地运行。