别再硬怼 Claude API:一套能抗高并发的队列限流方案(含源码)
Claude 高并发解决方案|附源码
在实际业务中,很多团队接入 Claude 这类大模型 API 后,很快会遇到一个共同问题:单次调用效果不错,但一旦并发量上来,就容易出现超时、限流、排队阻塞、成本失控、响应不稳定等问题。
例如:
- 客服系统同时有几百个用户咨询;
- 内容平台批量生成文章、标题、摘要;
- 企业知识库同时处理多个员工问答请求;
- 数据分析系统需要批量调用 Claude 做文本理解;
- Agent 系统需要多轮调用模型、工具和外部接口。
如果只是简单地在业务代码里直接请求 Claude API:
client.messages.create(...)
在低并发场景下问题不大,但在高并发场景下会迅速暴露出以下问题:
- 请求过多触发限流
- 网络抖动导致失败率升高
- 接口响应时间不稳定
- 任务堆积导致系统雪崩
- 没有统一队列和重试机制
- 无法有效控制成本
- 业务线程被模型调用阻塞
- 无法观测每个请求的状态
因此,想要稳定支撑 Claude 高并发调用,不能只依赖“多开线程”或“简单异步请求”,而是需要设计一套完整的高并发调用架构。
本文将从架构设计、核心思路、限流策略、异步队列、重试机制、降级方案、监控指标以及完整源码几个方面,介绍一种实用的 Claude 高并发解决方案。
一、Claude 高并发调用的核心挑战
在讨论解决方案之前,先明确问题本质。
Claude API 本质上是一个外部服务。调用外部服务时,系统稳定性受以下因素影响:
- 外部 API 的限流策略;
- 网络链路稳定性;
- 单次请求 token 数量;
- 模型响应耗时;
- 本地服务并发控制;
- 业务峰值流量;
- 失败重试策略;
- 队列堆积情况。
很多开发者的误区是:
高并发 = 多线程 / 多协程。
实际上,对于大模型 API 调用而言,高并发不只是“同时发更多请求”,而是要在“吞吐量、稳定性、成本、延迟”之间做平衡。
如果没有限制地发起大量请求,结果往往不是系统性能提升,而是:
- API 返回
429 Too Many Requests; - 请求大量超时;
- 重试风暴;
- CPU、内存、连接池被打满;
- 业务请求长时间无响应;
- 用户体验急剧下降。
因此,Claude 高并发方案的核心不是“无限并发”,而是:
在可控范围内最大化吞吐,同时保证系统稳定和请求成功率。
二、推荐整体架构
一个较为稳健的 Claude 高并发架构可以分为以下几层:
用户请求
↓
业务 API 服务
↓
任务队列 / 异步调度层
↓
并发控制器 / 限流器
↓
Claude 调用客户端
↓
结果缓存 / 状态存储
↓
返回结果 / 回调通知
核心模块包括:
| 模块 | 作用 |
|---|---|
| API 接入层 | 接收业务请求,做参数校验 |
| 任务队列 | 将请求异步化,削峰填谷 |
| 并发控制器 | 控制同时调用 Claude 的请求数量 |
| 速率限制器 | 控制单位时间内的请求数 |
| 重试模块 | 对临时失败进行自动重试 |
| 超时控制 | 防止请求无限等待 |
| 缓存模块 | 对重复请求直接返回结果 |
| 监控模块 | 统计成功率、延迟、失败原因 |
| 降级模块 | 高峰期保护系统稳定 |
三、几种常见方案对比
1. 直接同步调用
最简单的方式是业务服务直接调用 Claude:
result = call_claude(prompt)
return result
优点:
- 实现简单;
- 适合低并发;
- 调试方便。
缺点:
- 请求阻塞;
- 高并发下容易超时;
- 无法削峰;
- 难以统一限流;
- 用户体验不稳定。
适用场景:
- 内部工具;
- 日请求量较小;
- 对延迟要求不高的后台脚本。
2. 多线程并发调用
通过线程池并发调用 Claude:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=20) as executor:
executor.map(call_claude, prompts)
优点:
- 简单提升吞吐;
- 适合批处理;
- 迁移成本低。
缺点:
- 线程开销较大;
- 缺乏细粒度限流;
- 容易触发 API 限制;
- 不适合海量请求。
3. asyncio 异步调用
使用 Python 的 asyncio 进行异步并发:
await asyncio.gather(*tasks)
优点:
- 并发性能好;
- 资源占用低;
- 适合 I/O 密集型任务;
- 易于控制超时和并发数。
缺点:
- 代码复杂度略高;
- 需要合理设计限流和重试;
- 对开发者异步编程能力有要求。
4. 队列 + Worker 模式
将请求放入队列,由 Worker 消费并调用 Claude。
优点:
- 可削峰填谷;
- 易于横向扩展;
- 适合生产环境;
- 可以支持任务状态查询;
- 失败任务可重试;
- 可以和 Redis、RabbitMQ、Kafka 等集成。
缺点:
- 架构复杂度增加;
- 实时性略有下降;
- 需要维护任务状态。
四、本文方案设计
本文采用一种适合中小型生产系统的方案:
FastAPI + asyncio.Queue + Semaphore 并发控制 + Token Bucket 限流 + 重试机制 + 任务状态存储
该方案不依赖复杂中间件,适合快速落地。如果业务规模继续增长,可以将内存队列替换为 Redis、RabbitMQ 或 Kafka。
整体流程如下:
1. 用户提交任务
2. 服务生成 task_id
3. 任务进入异步队列
4. 后台 Worker 从队列取任务
5. 通过信号量控制最大并发
6. 通过限流器控制请求速率
7. 调用 Claude API
8. 成功则保存结果
9. 失败则重试
10. 用户根据 task_id 查询结果
五、关键设计点
1. 队列削峰
当瞬时请求量过大时,如果所有请求直接打到 Claude API,很容易触发限流。
队列的作用是把瞬时流量变成平滑流量。
例如一分钟内来了 1000 个请求,但系统只允许每秒处理 5 个 Claude 调用,那么多余请求会在队列中等待,而不是同时打爆下游服务。
2. 并发控制
并发控制用于限制同时进行中的 Claude 请求数量。
例如:
semaphore = asyncio.Semaphore(10)
表示最多同时有 10 个请求正在调用 Claude。
这个值不能盲目设置,通常需要根据:
- Claude API 的限额;
- 服务器网络能力;
- 平均响应时间;
- 平均 token 数;
- 业务可接受延迟;
综合评估。
3. 速率限制
并发控制解决的是“同时有多少请求在执行”,而速率限制解决的是“单位时间允许多少请求发出去”。
例如:
- 最大并发:10;
- 每秒最多请求:5。
二者并不相同。
如果模型响应很快,并发数不高也可能在短时间内发出大量请求,因此需要速率限制器。
4. 重试机制
Claude API 调用失败时,并不是所有错误都应该重试。
适合重试的情况包括:
- 网络超时;
- 连接错误;
- 429 限流;
- 500、502、503、504 等服务端错误。
不适合重试的情况包括:
- 参数错误;
- API Key 错误;
- 请求内容违规;
- 上下文过长;
- 账户余额不足。
重试时建议使用指数退避:
第 1 次失败:等待 1 秒
第 2 次失败:等待 2 秒
第 3 次失败:等待 4 秒
第 4 次失败:等待 8 秒
并加入随机抖动,避免大量请求同时重试形成“重试风暴”。
5. 超时控制
大模型调用时间不可控,必须设置超时。
如果不设置超时,一个请求可能长时间占用连接和 Worker,导致后续任务排队。
建议设置:
- 单次 Claude 调用超时:30~120 秒;
- 队列任务最大等待时间:根据业务决定;
- 客户端 HTTP 超时:略大于服务端处理时间。
6. 缓存机制
很多业务中,用户会重复提交相同问题,或者不同用户查询相同内容。
可以对请求内容做 Hash:
cache_key = sha256(prompt + model + params)
如果缓存中已有结果,则直接返回,减少 Claude 调用成本。
适合缓存的场景:
- 摘要生成;
- 标题生成;
- 文档分类;
- FAQ 问答;
- 固定知识库查询。
不适合缓存的场景:
- 强个性化对话;
- 实时数据分析;
- 需要上下文连续性的多轮对话。
六、完整源码示例
下面给出一个可运行的示例代码,使用 FastAPI 构建 Claude 高并发任务服务。
说明:示例中使用 Anthropic 官方 SDK。实际使用前请安装依赖并配置环境变量。
1. 安装依赖
pip install fastapi uvicorn anthropic httpx pydantic
2. 配置环境变量
export ANTHROPIC_API_KEY="your_api_key_here"
3. 完整代码:main.py
import os
import time
import uuid
import random
import asyncio
import hashlib
from typing import Dict, Optional, Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from anthropic import AsyncAnthropic
# =========================
# 基础配置
# =========================
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
if not ANTHROPIC_API_KEY:
raise RuntimeError("请先设置环境变量 ANTHROPIC_API_KEY")
MODEL_NAME = "claude-3-5-sonnet-20241022"
MAX_CONCURRENCY = 10 # 最大并发 Claude 调用数
MAX_REQUESTS_PER_SECOND = 5 # 每秒最多请求数
MAX_RETRIES = 3 # 最大重试次数
CALL_TIMEOUT = 90 # 单次调用超时时间
QUEUE_MAX_SIZE = 1000 # 队列最大长度
WORKER_COUNT = 5 # Worker 数量
# =========================
# FastAPI 初始化
# =========================
app = FastAPI(title="Claude 高并发调用服务")
client = AsyncAnthropic(api_key=ANTHROPIC_API_KEY)
task_queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAX_SIZE)
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
# =========================
# 内存任务状态存储
# 生产环境建议替换为 Redis / MySQL
# =========================
tasks: Dict[str, Dict[str, Any]] = {}
cache: Dict[str, Dict[str, Any]] = {}
# =========================
# 请求模型
# =========================
class ClaudeRequest(BaseModel):
prompt: str = Field(..., min_length=1, description="用户输入内容")
max_tokens: int = Field(1024, ge=1, le=4096)
temperature: float = Field(0.7, ge=0, le=1)
class TaskResponse(BaseModel):
task_id: str
status: str
# =========================
# 简单 Token Bucket 限流器
# =========================
class TokenBucketRateLimiter:
def __init__(self, rate: int, capacity: int):
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.updated_at = time.monotonic()
self.lock = asyncio.Lock()
async def acquire(self):
while True:
async with self.lock:
now = time.monotonic()
elapsed = now - self.updated_at
# 按时间补充令牌
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.updated_at = now
if self.tokens >= 1:
self.tokens -= 1
return
# 计算需要等待多久
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
rate_limiter = TokenBucketRateLimiter(
rate=MAX_REQUESTS_PER_SECOND,
capacity=MAX_REQUESTS_PER_SECOND
)
# =========================
# 工具函数
# =========================
def build_cache_key(req: ClaudeRequest) -> str:
raw = f"{MODEL_NAME}:{req.prompt}:{req.max_tokens}:{req.temperature}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def now_ms() -> int:
return int(time.time() * 1000)
def is_retryable_error(error: Exception) -> bool:
message = str(error).lower()
retry_keywords = [
"timeout",
"rate limit",
"429",
"500",
"502",
"503",
"504",
"connection",
"temporarily",
]
return any(keyword in message for keyword in retry_keywords)
# =========================
# Claude 调用函数
# =========================
async def call_claude(req: ClaudeRequest) -> str:
response = await client.messages.create(
model=MODEL_NAME,
max_tokens=req.max_tokens,
temperature=req.temperature,
messages=[
{
"role": "user",
"content": req.prompt
}
],
)
# Claude 返回 content 列表,这里取文本内容
texts = []
for block in response.content:
if getattr(block, "type", None) == "text":
texts.append(block.text)
return "\n".join(texts)
async def call_claude_with_retry(req: ClaudeRequest) -> str:
last_error: Optional[Exception] = None
for attempt in range(MAX_RETRIES + 1):
try:
await rate_limiter.acquire()
async with semaphore:
return await asyncio.wait_for(
call_claude(req),
timeout=CALL_TIMEOUT
)
except Exception as e:
last_error = e
if not is_retryable_error(e):
raise e
if attempt >= MAX_RETRIES:
break
# 指数退避 + 随机抖动
backoff = min(2 ** attempt, 8)
jitter = random.uniform(0, 0.5)
sleep_time = backoff + jitter
await asyncio.sleep(sleep_time)
raise last_error if last_error else RuntimeError("Claude 调用失败")
# =========================
# Worker 消费任务
# =========================
async def worker(worker_id: int):
while True:
task_id, req, cache_key = await task_queue.get()
try:
tasks[task_id]["status"] = "running"
tasks[task_id]["started_at"] = now_ms()
result = await call_claude_with_retry(req)
tasks[task_id]["status"] = "success"
tasks[task_id]["result"] = result
tasks[task_id]["finished_at"] = now_ms()
cache[cache_key] = {
"result": result,
"created_at": now_ms()
}
except Exception as e:
tasks[task_id]["status"] = "failed"
tasks[task_id]["error"] = str(e)
tasks[task_id]["finished_at"] = now_ms()
finally:
task_queue.task_done()
# =========================
# 生命周期事件
# =========================
@app.on_event("startup")
async def startup_event():
for i in range(WORKER_COUNT):
asyncio.create_task(worker(i))
# =========================
# API:提交任务
# =========================
@app.post("/tasks", response_model=TaskResponse)
async def create_task(req: ClaudeRequest):
cache_key = build_cache_key(req)
# 命中缓存,直接创建成功任务
if cache_key in cache:
task_id = str(uuid.uuid4())
tasks[task_id] = {
"task_id": task_id,
"status": "success",
"result": cache[cache_key]["result"],
"from_cache": True,
"created_at": now_ms(),
"finished_at": now_ms()
}
return TaskResponse(task_id=task_id, status="success")
# 队列满则拒绝,保护系统
if task_queue.full():
raise HTTPException(
status_code=429,
detail="系统繁忙,请稍后重试"
)
task_id = str(uuid.uuid4())
tasks[task_id] = {
"task_id": task_id,
"status": "pending",
"prompt": req.prompt,
"from_cache": False,
"created_at": now_ms(),
"started_at": None,
"finished_at": None,
"result": None,
"error": None
}
await task_queue.put((task_id, req, cache_key))
return TaskResponse(task_id=task_id, status="pending")
# =========================
# API:查询任务
# =========================
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
task = tasks.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="任务不存在")
return task
# =========================
# API:健康检查
# =========================
@app.get("/health")
async def health():
return {
"status": "ok",
"queue_size": task_queue.qsize(),
"max_queue_size": QUEUE_MAX_SIZE,
"max_concurrency": MAX_CONCURRENCY,
"requests_per_second": MAX_REQUESTS_PER_SECOND,
"worker_count": WORKER_COUNT
}
七、启动服务
保存为 main.py 后执行:
uvicorn main:app --host 0.0.0.0 --port 8000
提交任务:
curl -X POST "http://localhost:8000/tasks" \
-H "Content-Type: application/json" \
-d '{
"prompt": "请用中文解释什么是高并发架构",
"max_tokens": 1024,
"temperature": 0.7
}'
返回示例:
{
"task_id": "3e0d12c7-f489-4e58-b6c7-2d4c420c48ff",
"status": "pending"
}
查询任务:
curl "http://localhost:8000/tasks/3e0d12c7-f489-4e58-b6c7-2d4c420c48ff"
成功返回:
{
"task_id": "3e0d12c7-f489-4e58-b6c7-2d4c420c48ff",
"status": "success",
"result": "高并发架构是指系统能够同时处理大量请求的架构设计……",
"from_cache": false,
"created_at": 1730000000000,
"started_at": 1730000001000,
"finished_at": 1730000008000,
"error": null
}
八、生产环境优化建议
上面的代码适合演示和中小规模服务。如果要在生产环境中使用,建议继续做以下优化。
1. 使用 Redis 替代内存队列
当前代码中的任务状态和缓存都保存在内存中,服务重启后会丢失。
生产环境建议使用:
- Redis List / Stream 存储任务队列;
- Redis Hash 存储任务状态;
- Redis String 存储缓存结果;
- Redis Sorted Set 做任务超时扫描。
这样可以支持多实例部署和任务恢复。
2. 使用 Celery / Dramatiq / RQ
如果业务中已有成熟的异步任务系统,可以直接使用:
- Celery;
- Dramatiq;
- RQ;
- Arq。
这些框架提供了更成熟的:
- 任务分发;
- 失败重试;
- 任务持久化;
- 定时任务;
- Worker 横向扩容;
- 任务监控。
3. 增加 Token 级别限流
Claude API 通常不仅限制请求数,也可能限制 token 使用量。
因此,仅限制 QPS 还不够,还应根据请求估算 token 数量。
可以增加:
每分钟最大输入 token
每分钟最大输出 token
每分钟最大总 token
当 token 使用量达到阈值时,让任务等待或拒绝。
4. 流式输出优化体验
如果业务是聊天场景,用户通常不希望一直等待完整结果。
可以使用 Claude 的流式输出能力,将内容边生成边返回。
流式输出适合:
- 聊天机器人;
- 在线写作助手;
- 代码生成;
- 长文本总结;
- Agent 执行过程展示。
但需要注意,流式输出会增加连接保持时间,对网关、负载均衡、前端连接管理都有要求。
5. 请求分级处理
不是所有请求都具有相同优先级。
例如:
- 付费用户优先级高;
- 后台批处理优先级低;
- 实时聊天优先级高;
- 非实时摘要任务优先级低。
可以使用优先级队列:
high_priority_queue
normal_priority_queue
low_priority_queue
Worker 优先消费高优先级任务,在系统繁忙时延迟处理低优先级任务。
6. 增加熔断机制
如果 Claude API 在短时间内持续失败,系统应主动熔断,避免继续发起大量无效请求。
熔断状态可以分为:
| 状态 | 含义 |
|---|---|
| Closed | 正常调用 |
| Open | 暂停调用 |
| Half-Open | 少量探测请求 |
当失败率超过阈值时进入 Open 状态;等待一段时间后进入 Half-Open;如果探测成功则恢复 Closed,否则继续熔断。
7. 增加降级策略
高峰期或 Claude API 不可用时,可以进行业务降级。
常见降级方式包括:
- 返回缓存结果;
- 使用更便宜或更快的模型;
- 返回简化答案;
- 将任务改为异步处理;
- 提示用户稍后查看;
- 对非核心功能暂时关闭。
降级不是为了“降低质量”,而是为了在异常情况下保证核心服务可用。
九、关键参数如何设置
很多人会问:MAX_CONCURRENCY 和 MAX_REQUESTS_PER_SECOND 应该设置多少?
没有固定答案,需要根据实际压测结果决定。
可以从较保守的参数开始:
MAX_CONCURRENCY = 5
MAX_REQUESTS_PER_SECOND = 2
WORKER_COUNT = 3
观察以下指标:
- 平均响应时间;
- P95 响应时间;
- P99 响应时间;
- 失败率;
- 429 频率;
- 队列长度;
- CPU 和内存;
- 每分钟 token 消耗;
- 单任务平均等待时间。
如果系统稳定,再逐步提高并发和 QPS。
建议不要一次性把参数调得很大,而是采用阶梯式压测:
并发 5 → 并发 10 → 并发 20 → 并发 50
每个阶段至少观察几分钟到几十分钟。
十、监控指标设计
生产系统中,监控非常重要。建议至少记录以下指标:
1. 请求指标
- 总请求数;
- 成功请求数;
- 失败请求数;
- 失败率;
- 平均响应时间;
- P95 延迟;
- P99 延迟。
2. 队列指标
- 当前队列长度;
- 队列最大长度;
- 任务平均等待时间;
- 任务最长等待时间;
- 入队速度;
- 出队速度。
3. Claude 调用指标
- API 调用次数;
- API 失败次数;
- 429 次数;
- 超时次数;
- 平均 token 消耗;
- 每分钟 token 消耗;
- 单次调用平均成本。
4. 系统指标
- CPU 使用率;
- 内存使用率;
- 网络连接数;
- Worker 数量;
- 事件循环延迟;
- Redis 连接数。
十一、常见问题
1. 为什么已经用了 asyncio,还是会限流?
因为 asyncio 只能提高本地并发能力,不能突破 Claude API 的服务端限制。
你可以同时发出 1000 个请求,但如果 API 限额只允许每分钟 100 个请求,其余请求仍然会失败。
2. 为什么要同时使用 Semaphore 和 Rate Limiter?
因为二者解决的问题不同:
- Semaphore 控制同时运行的请求数量;
- Rate Limiter 控制单位时间发出的请求数量。
例如一个请求平均耗时 10 秒,并发数为 10,那么理论 QPS 约为 1。
但如果请求平均耗时 0.2 秒,并发数为 10,那么理论 QPS 可达 50。
因此仅靠并发控制无法稳定限制 QPS。
3. 队列越大越好吗?
不是。
队列太小会导致请求容易被拒绝;队列太大则可能造成用户等待过久,甚至积压大量过期任务。
建议根据业务可接受等待时间设置队列长度。
例如:
系统每秒处理 5 个任务
用户最多可接受等待 60 秒
则队列长度建议不超过 300
4. 是否应该无限重试?
不应该。
无限重试会导致任务堆积和成本失控。通常重试 2~3 次即可。
对于非实时任务,可以将失败任务放入死信队列,后续人工或定时补偿处理。
十二、总结
Claude 高并发调用的关键,不是简单地把请求同时发出去,而是要构建一套可控、可观测、可恢复的调用体系。
一个稳定的 Claude 高并发方案至少应包含:
- 异步任务队列;
- 并发控制;
- QPS 限流;
- 超时控制;
- 自动重试;
- 缓存机制;
- 队列保护;
- 状态查询;
- 监控告警;
- 降级熔断。
本文给出的 FastAPI + asyncio 方案结构简单、易于理解,适合快速落地 Claude 高并发服务。对于更大规模的生产系统,可以进一步引入 Redis、Kafka、Celery、Prometheus、Grafana 等组件,实现多实例部署、任务持久化和全链路监控。
最终目标不是“跑满并发”,而是让系统在高峰期依然稳定,让用户请求有序处理,让成本和失败率都处于可控范围内。