AI Agent 扛不住并发?这套生产级架构和配置可以直接参考
AI Agent 高并发解决方案|附配置文件
随着大模型能力的快速提升,AI Agent 已经从“单轮问答工具”逐渐演进为能够调用工具、执行任务、访问数据库、编排工作流、进行多步骤推理的智能系统。无论是企业知识库助手、客服机器人、数据分析 Agent,还是自动化办公、代码生成、智能运维系统,在真实业务场景中都绕不开一个核心问题:高并发。
很多团队在 Demo 阶段运行良好,但一旦进入生产环境,面对大量用户同时访问,就会出现响应变慢、请求堆积、模型接口超时、工具调用阻塞、数据库连接耗尽、任务重复执行等问题。AI Agent 与传统 Web 服务不同,它不仅需要处理 HTTP 请求,还涉及模型推理、上下文管理、任务调度、工具调用、向量检索、缓存、异步队列等多个环节,因此其高并发方案也更复杂。
本文将系统讲解 AI Agent 高并发架构设计思路,并给出可直接参考的配置文件,包括 Nginx、Gunicorn/Uvicorn、Redis、Celery、Docker Compose 等内容,帮助你构建一个稳定、可扩展、可落地的 AI Agent 服务。
一、AI Agent 高并发的核心挑战
在讨论解决方案之前,我们需要先明确 AI Agent 在高并发场景下面临的主要瓶颈。
1. 大模型接口延迟高
传统 Web API 的响应时间可能在几十毫秒到几百毫秒之间,而大模型调用往往需要数秒甚至数十秒。尤其在复杂 Agent 场景下,一次用户请求可能需要多次调用 LLM:
- 第一次理解用户意图;
- 第二次规划任务步骤;
- 第三次调用工具后总结结果;
- 第四次生成最终答案。
如果每次模型调用平均耗时 3 秒,一个请求可能很容易超过 10 秒。高并发下,请求会迅速堆积,导致系统吞吐下降。
2. Agent 工具调用不可控
AI Agent 通常会调用多种工具,例如:
- 搜索引擎;
- 数据库查询;
- 文件读取;
- 代码执行;
- 第三方 API;
- 企业内部系统接口。
这些工具的响应时间并不稳定。如果某个工具阻塞,整个 Agent 链路都会变慢,甚至拖垮服务线程。
3. 上下文和会话状态管理复杂
AI Agent 通常需要记住用户历史对话、任务状态、工具调用结果等信息。如果所有状态都存储在应用内存中,不仅无法横向扩展,还可能在实例重启后丢失数据。
高并发情况下,如果状态管理不当,很容易出现:
- 会话串线;
- 上下文丢失;
- 重复消费任务;
- 多实例之间数据不一致。
4. 流式输出占用连接时间长
很多 AI 应用会采用 SSE 或 WebSocket 进行流式输出。流式输出虽然改善了用户体验,但会让 HTTP 连接保持更长时间。并发用户较多时,连接数会快速增长,对网关、应用服务和负载均衡都提出更高要求。
5. 数据库与向量库容易成为瓶颈
AI Agent 经常依赖数据库和向量数据库,例如 MySQL、PostgreSQL、MongoDB、Milvus、Qdrant、Weaviate、Elasticsearch 等。高并发检索时,如果没有合理设计连接池、索引、缓存和限流策略,数据库很容易成为性能瓶颈。
二、整体架构设计
一个适合生产环境的 AI Agent 高并发架构,通常不建议采用单体同步阻塞模式,而应采用“网关层 + 应用层 + 缓存层 + 队列层 + Worker 层 + 存储层”的分层架构。
推荐架构如下:
用户请求
|
v
Nginx / API Gateway
|
v
FastAPI / Flask / Node.js Agent API 服务
|
|---- Redis 缓存 / 会话状态 / 分布式锁
|
|---- PostgreSQL / MySQL 业务数据库
|
|---- Vector DB 向量数据库
|
v
消息队列 Celery / RabbitMQ / Redis Stream / Kafka
|
v
Agent Worker 集群
|
|---- LLM Provider:OpenAI / Azure OpenAI / Claude / Gemini / 本地模型
|
|---- Tool Service:搜索、数据库、代码执行、RPA、内部系统
|
v
结果回写 Redis / DB / WebSocket / SSE
该架构的核心思想是:前端请求尽量快速接入,复杂任务异步化,状态集中存储,计算任务水平扩展,模型调用统一限流。
三、高并发解决方案关键策略
1. 请求入口使用 Nginx 反向代理
Nginx 主要负责:
- 反向代理;
- 负载均衡;
- SSL 终止;
- 连接管理;
- 静态资源处理;
- 限流限速;
- 超时控制。
对于 AI Agent 场景,Nginx 需要特别注意长连接和流式输出配置。如果使用 SSE,需要关闭代理缓冲,否则用户可能无法实时看到模型输出。
Nginx 配置示例
worker_processes auto;
events {
worker_connections 4096;
use epoll;
multi_accept on;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
client_max_body_size 20m;
gzip on;
gzip_types text/plain application/json application/javascript text/css;
limit_req_zone $binary_remote_addr zone=agent_limit:10m rate=10r/s;
limit_conn_zone $binary_remote_addr zone=addr:10m;
upstream agent_backend {
least_conn;
server agent-api-1:8000 max_fails=3 fail_timeout=30s;
server agent-api-2:8000 max_fails=3 fail_timeout=30s;
keepalive 128;
}
server {
listen 80;
server_name agent.example.com;
location /api/ {
limit_req zone=agent_limit burst=30 nodelay;
limit_conn addr 50;
proxy_pass http://agent_backend;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_connect_timeout 10s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
proxy_buffering off;
proxy_request_buffering off;
}
location /health {
proxy_pass http://agent_backend;
}
}
}
这里需要注意:
proxy_buffering off:适合流式响应;proxy_read_timeout 300s:适合长时间模型推理;limit_req:防止恶意请求打爆后端;least_conn:更适合请求耗时不均的 AI 场景。
2. 应用服务采用异步框架
如果使用 Python,推荐 FastAPI + Uvicorn/Gunicorn。FastAPI 本身支持异步 IO,适合处理外部 API 调用、流式输出、数据库异步访问等场景。
但是要注意:异步框架不等于无限并发。如果 Agent 内部有 CPU 密集型任务、阻塞式 SDK、同步数据库访问,仍然会卡住事件循环。
推荐做法:
- API 层只负责接收请求、鉴权、参数校验、创建任务;
- 长耗时 Agent 执行逻辑交给 Worker;
- 使用 Redis 或数据库保存任务状态;
- 前端通过轮询、SSE 或 WebSocket 获取结果。
Gunicorn + Uvicorn 配置示例
创建 gunicorn_conf.py:
import multiprocessing
bind = "0.0.0.0:8000"
worker_class = "uvicorn.workers.UvicornWorker"
workers = multiprocessing.cpu_count() * 2 + 1
threads = 1
worker_connections = 2000
timeout = 300
graceful_timeout = 30
keepalive = 30
max_requests = 5000
max_requests_jitter = 500
loglevel = "info"
accesslog = "-"
errorlog = "-"
启动命令:
gunicorn app.main:app -c gunicorn_conf.py
配置说明:
workers:进程数,一般为 CPU 核数 * 2 + 1;timeout:AI 任务较长,需要适当调大;max_requests:防止内存泄漏导致进程长期膨胀;worker_connections:异步 Worker 可承载的连接数。
3. 长任务异步化:使用 Celery 或消息队列
AI Agent 最大的问题是一次请求耗时长。如果用户请求直接等待 Agent 完成,服务很容易被占满。因此推荐将任务异步化。
典型流程:
- 用户提交请求;
- API 服务创建任务 ID;
- 任务写入队列;
- Worker 消费任务并执行 Agent;
- 执行结果写入 Redis 或数据库;
- 用户通过任务 ID 查询结果,或通过 SSE/WebSocket 接收推送。
这种模式可以显著提升系统吞吐,并且方便控制 Worker 数量。
Celery 配置示例
创建 celery_app.py:
from celery import Celery
celery_app = Celery(
"agent_worker",
broker="redis://redis:6379/0",
backend="redis://redis:6379/1"
)
celery_app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
timezone="Asia/Shanghai",
enable_utc=False,
worker_concurrency=8,
worker_prefetch_multiplier=1,
task_acks_late=True,
task_reject_on_worker_lost=True,
task_time_limit=600,
task_soft_time_limit=540,
result_expires=3600,
broker_connection_retry_on_startup=True,
)
Agent 任务示例
from celery_app import celery_app
@celery_app.task(name="run_agent_task")
def run_agent_task(task_id: str, user_id: str, prompt: str):
"""
执行 AI Agent 任务
"""
# 1. 读取用户上下文
# 2. 调用 LLM 进行规划
# 3. 调用工具
# 4. 汇总结果
# 5. 写入 Redis 或数据库
return {
"task_id": task_id,
"status": "success",
"answer": "Agent 执行完成"
}
API 提交任务示例
import uuid
from fastapi import FastAPI
from pydantic import BaseModel
from celery_app import celery_app
app = FastAPI()
class AgentRequest(BaseModel):
user_id: str
prompt: str
@app.post("/api/agent/tasks")
async def create_agent_task(req: AgentRequest):
task_id = str(uuid.uuid4())
celery_app.send_task(
"run_agent_task",
args=[task_id, req.user_id, req.prompt]
)
return {
"task_id": task_id,
"status": "queued"
}
这种方式可以避免 API 服务被长时间阻塞。
4. 使用 Redis 管理会话、缓存和限流
Redis 在 AI Agent 高并发架构中非常重要,常见用途包括:
- 保存会话上下文;
- 保存任务状态;
- 缓存 LLM 响应;
- 缓存向量检索结果;
- 实现分布式锁;
- 实现接口限流;
- 保存流式输出片段。
Redis 配置示例
创建 redis.conf:
bind 0.0.0.0
port 6379
protected-mode no
databases 16
maxmemory 2gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec
tcp-keepalive 300
timeout 0
io-threads 4
io-threads-do-reads yes
配置说明:
maxmemory-policy allkeys-lru:适合缓存场景;appendonly yes:提高数据可靠性;io-threads:Redis 6 以后可开启 IO 多线程;- 如果 Redis 仅作为缓存,可关闭 AOF;如果保存任务状态,建议开启。
5. LLM 调用必须做限流和熔断
很多 AI Agent 高并发失败并不是应用服务扛不住,而是模型接口扛不住。无论使用 OpenAI、Azure OpenAI、Claude、Gemini,还是本地部署模型,都需要考虑:
- 每分钟请求数限制;
- 每分钟 Token 限制;
- 模型超时;
- 失败重试;
- 熔断降级;
- 多模型路由;
- 请求排队。
推荐策略
| 策略 | 说明 |
|---|---|
| 限流 | 控制每个用户、每个租户、每个模型的调用频率 |
| 熔断 | 当模型连续失败时,短时间停止请求 |
| 重试 | 对 429、500、网络超时进行指数退避重试 |
| 降级 | 高峰期切换到小模型或返回简化结果 |
| 缓存 | 对重复问题缓存答案或检索结果 |
| 队列 | 将超过并发上限的任务排队处理 |
Python 限流示例
import asyncio
from aiolimiter import AsyncLimiter
# 每分钟最多 60 次模型请求
llm_limiter = AsyncLimiter(60, 60)
async def call_llm(prompt: str):
async with llm_limiter:
# 调用模型接口
# response = await llm_client.chat(...)
return "llm response"
如果是多租户 SaaS 系统,建议按租户维度做限流:
tenant_id:model_name:rpm
tenant_id:model_name:tpm
user_id:daily_quota
这样可以避免单个大客户或异常用户占满全局资源。
6. 向量检索需要缓存和分片
知识库 Agent 往往依赖 RAG,也就是检索增强生成。高并发下,向量检索可能成为瓶颈。
优化建议:
- 对用户 Query 先做归一化;
- 对热门 Query 缓存检索结果;
- 控制 Top K 数量,避免一次检索过多;
- 使用 metadata filter 减少搜索空间;
- 大规模知识库按租户、业务线或语义空间分片;
- 为向量库单独配置连接池;
- 对文档召回结果做重排时控制并发。
例如,在 RAG 场景中,不建议每次都召回 50 条文档再交给大模型。可以先召回 10 条,再重排选取 3 到 5 条,既降低 Token 成本,也提升响应速度。
四、Docker Compose 一键部署示例
下面给出一个简化版 Docker Compose,用于部署 Nginx、Agent API、Worker、Redis。
创建 docker-compose.yml:
version: "3.9"
services:
nginx:
image: nginx:1.25
container_name: agent-nginx
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- agent-api-1
- agent-api-2
networks:
- agent-net
agent-api-1:
build: .
container_name: agent-api-1
command: gunicorn app.main:app -c gunicorn_conf.py
environment:
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_BACKEND_URL=redis://redis:6379/1
depends_on:
- redis
networks:
- agent-net
agent-api-2:
build: .
container_name: agent-api-2
command: gunicorn app.main:app -c gunicorn_conf.py
environment:
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_BACKEND_URL=redis://redis:6379/1
depends_on:
- redis
networks:
- agent-net
agent-worker:
build: .
container_name: agent-worker
command: celery -A celery_app.celery_app worker --loglevel=info --concurrency=8
environment:
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_BACKEND_URL=redis://redis:6379/1
depends_on:
- redis
networks:
- agent-net
redis:
image: redis:7
container_name: agent-redis
command: redis-server /etc/redis/redis.conf
volumes:
- ./redis.conf:/etc/redis/redis.conf:ro
- redis-data:/data
networks:
- agent-net
networks:
agent-net:
driver: bridge
volumes:
redis-data:
如果 Worker 压力较大,可以横向扩展:
docker compose up -d --scale agent-worker=5
这样可以同时启动多个 Worker 消费队列任务。
五、流式输出的高并发处理
对于 AI Agent 产品而言,流式输出几乎是标配。用户不希望等待 30 秒后一次性看到结果,而是希望模型边生成边返回。
常见方案有两种:
1. SSE
SSE,即 Server-Sent Events,适合服务端向客户端单向推送文本流。对于 ChatGPT 类产品非常合适。
优点:
- 实现简单;
- 基于 HTTP;
- 浏览器原生支持;
- 适合文本流式输出。
缺点:
- 主要是单向通信;
- 连接时间长;
- 对 Nginx 和后端连接数要求高。
2. WebSocket
WebSocket 适合双向实时通信,例如 Agent 需要频繁接收用户中断、确认、补充参数等操作。
优点:
- 双向通信;
- 实时性强;
- 适合复杂交互型 Agent。
缺点:
- 连接管理复杂;
- 需要心跳机制;
- 负载均衡需要考虑会话保持或集中状态管理。
推荐做法
如果只是文本流式输出,优先选择 SSE。如果需要用户和 Agent 持续交互,例如“确认执行”“暂停任务”“修改计划”,可以选择 WebSocket。
无论使用哪种方式,都要注意:
- 输出内容不要只存在应用内存;
- 中间结果可以写入 Redis Stream;
- 客户端断线后支持重新连接;
- 服务端要有连接超时和心跳机制;
- 避免一个用户长时间占用过多连接。
六、数据库连接池配置
AI Agent 通常需要读写任务表、消息表、用户表、知识库表等。高并发下,数据库连接池配置非常关键。
如果使用 SQLAlchemy,可以参考如下配置:
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://user:password@postgres:5432/agent",
pool_size=20,
max_overflow=40,
pool_timeout=30,
pool_recycle=1800,
echo=False
)
配置说明:
pool_size:常驻连接数;max_overflow:临时可扩展连接数;pool_timeout:获取连接超时时间;pool_recycle:定期回收连接,避免长连接失效。
同时,数据库表设计也要注意索引。例如任务表可以按如下字段建索引:
CREATE INDEX idx_agent_task_user_id ON agent_task(user_id);
CREATE INDEX idx_agent_task_status ON agent_task(status);
CREATE INDEX idx_agent_task_created_at ON agent_task(created_at);
如果任务量非常大,还可以按时间或租户进行分区。
七、Agent 执行链路优化
高并发不仅仅是“加机器”,更重要的是减少单个请求的耗时和资源消耗。
1. 减少不必要的 LLM 调用
很多 Agent 框架默认会进行多轮规划和反思,但生产环境中并不一定需要。可以根据任务类型选择不同策略:
- 简单问答:直接 RAG + 回答;
- 中等复杂任务:一次规划 + 工具调用 + 总结;
- 复杂任务:多步骤 Agent;
- 高峰期:关闭反思链路或改用轻量模型。
2. 工具调用并行化
如果多个工具之间没有依赖关系,可以并行执行。例如同时查询:
- 用户订单;
- 库存状态;
- 物流信息;
- 优惠券信息。
异步并发示例:
import asyncio
async def run_tools():
results = await asyncio.gather(
query_order(),
query_inventory(),
query_logistics(),
query_coupon(),
return_exceptions=True
)
return results
这样可以将多个工具的总耗时从“累加”变成“取最大值”。
3. 设置工具超时
任何外部工具都必须设置超时。否则一个第三方接口卡住,就可能拖垮整个 Agent。
import httpx
async def call_tool(url: str, payload: dict):
timeout = httpx.Timeout(10.0, connect=3.0)
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
return resp.json()
建议:
- 内部工具超时:3 到 10 秒;
- 外部搜索接口超时:5 到 15 秒;
- LLM 调用超时:30 到 120 秒;
- 整个 Agent 任务超时:根据业务设置,一般 1 到 10 分钟。
八、监控与告警
没有监控的高并发系统是不可靠的。AI Agent 服务至少需要监控以下指标:
1. API 层指标
- QPS;
- P95/P99 延迟;
- 错误率;
- 活跃连接数;
- 请求体大小;
- 限流命中次数。
2. 队列指标
- 队列长度;
- 任务等待时间;
- Worker 数量;
- 任务执行耗时;
- 任务失败率;
- 重试次数。
3. LLM 指标
- 每分钟请求数;
- 每分钟 Token 数;
- 平均首 Token 延迟;
- 平均完整响应耗时;
- 429 次数;
- 模型调用失败率;
- 单租户 Token 使用量。
4. 存储指标
- Redis 内存;
- Redis 命中率;
- 数据库连接数;
- 慢查询;
- 向量库检索耗时;
- 磁盘 IO。
推荐使用:
- Prometheus;
- Grafana;
- Loki;
- OpenTelemetry;
- Jaeger;
- ELK。
九、生产环境推荐参数
下面给出一组通用参考参数,具体还需要根据服务器配置和业务压测结果调整。
| 模块 | 推荐配置 |
|---|---|
| Nginx worker_connections | 4096 起步 |
| API 实例数 | 至少 2 个 |
| Gunicorn workers | CPU * 2 + 1 |
| Worker 并发数 | 4 到 16 |
| Redis maxmemory | 2GB 起步 |
| API 超时 | 60 到 300 秒 |
| Agent 任务超时 | 5 到 10 分钟 |
| LLM 重试次数 | 2 到 3 次 |
| 向量检索 Top K | 3 到 10 |
| SSE 心跳 | 15 到 30 秒 |
| 队列预取 | worker_prefetch_multiplier=1 |
特别注意:
如果任务执行时间差异很大,Celery 一定要设置:
worker_prefetch_multiplier = 1
task_acks_late = True
否则 Worker 可能提前拿走多个任务,导致短任务被长任务阻塞。
十、压测建议
上线前必须进行压测。AI Agent 压测不能只测 HTTP 接口,还要覆盖完整链路。
1. 压测维度
- 并发用户数;
- 平均请求耗时;
- 队列堆积情况;
- LLM 调用成功率;
- Redis 命中率;
- 数据库慢查询;
- Worker CPU 和内存;
- SSE 连接保持能力。
2. 压测工具
可以使用:
- Locust;
- k6;
- JMeter;
- wrk;
- Vegeta。
3. k6 压测示例
创建 loadtest.js:
import http from "k6/http";
import { sleep, check } from "k6";
export const options = {
vus: 200,
duration: "5m",
};
export default function () {
const payload = JSON.stringify({
user_id: `user-${__VU}`,
prompt: "请总结一下公司知识库中关于报销流程的内容"
});
const params = {
headers: {
"Content-Type": "application/json",
},
};
const res = http.post(
"http://localhost/api/agent/tasks",
payload,
params
);
check(res, {
"status is 200": (r) => r.status === 200,
"task created": (r) => r.json("task_id") !== undefined,
});
sleep(1);
}
执行:
k6 run loadtest.js
压测时要重点观察:请求是否快速进入队列、Worker 是否稳定消费、Redis 是否出现内存上涨、LLM 是否触发限流。
十一、常见故障与解决办法
1. 请求大量超时
可能原因:
- LLM 响应慢;
- Agent 同步执行;
- Worker 数量不足;
- Nginx 超时时间过短;
- 数据库连接池耗尽。
解决方法:
- 长任务异步化;
- 增加 Worker;
- 优化 Agent 步骤;
- 调整 Nginx 和 Gunicorn 超时;
- 检查慢查询。
2. Redis 内存持续上涨
可能原因:
- 任务结果没有过期时间;
- 会话上下文无限增长;
- 缓存 Key 没有 TTL;
- 流式输出片段未清理。
解决方法:
- 设置 TTL;
- 限制上下文轮数;
- 定期清理历史任务;
- 设置
maxmemory-policy。
3. LLM 频繁 429
可能原因:
- 超过模型服务商 RPM/TPM;
- 没有全局限流;
- Worker 并发过高;
- 重试策略过于激进。
解决方法:
- 增加限流器;
- 降低 Worker 并发;
- 按模型和租户分配额度;
- 使用指数退避;
- 增加备用模型。
4. Worker 任务堆积
可能原因:
- 请求量超过处理能力;
- 单任务耗时太长;
- 工具调用阻塞;
- Worker 数量不足。
解决方法:
- 扩容 Worker;
- 拆分任务队列;
- 对不同任务设置优先级;
- 优化工具调用;
- 高峰期降级。
十二、总结
AI Agent 的高并发解决方案,不能简单理解为“多开几个服务实例”。它需要从架构层面进行系统设计,核心原则包括:
- 入口层限流和负载均衡:通过 Nginx 或 API Gateway 控制流量;
- 应用层异步化:API 服务不直接执行长任务;
- 任务队列削峰填谷:使用 Celery、Kafka、RabbitMQ 等队列承接高峰流量;
- 状态集中管理:使用 Redis 和数据库保存任务状态、上下文和结果;
- 模型调用限流熔断:防止 LLM 接口成为系统崩溃点;
- 工具调用超时隔离:避免外部系统拖垮 Agent;
- 向量检索缓存优化:降低 RAG 链路延迟;
- 监控告警闭环:用数据驱动扩容和优化。
一个稳定的 AI Agent 生产系统,本质上是一个“分布式任务处理系统 + 大模型调用编排系统 + 高并发 Web 服务”。只有把模型、队列、缓存、数据库、网关、监控全部纳入整体设计,才能真正支撑大规模用户访问。
如果你的 AI Agent 还停留在单机同步调用阶段,建议优先完成三件事:引入队列、接入 Redis、增加 LLM 限流。这三步通常就能显著提升系统稳定性,并为后续横向扩展打下基础。