上一篇 下一篇 分享链接 返回 返回顶部

Claude 扛不住高并发?这套网关方案把限流、队列、缓存和熔断一次讲透

发布人:慈云数据-客服中心 发布时间:6小时前 阅读量:3

Claude 高并发解决方案|附源码

在企业级 AI 应用中,Claude 这类大模型通常不是“能不能调用”的问题,而是“如何在高并发场景下稳定、可控、低成本地调用”的问题。

很多团队在接入 Claude API 后,前期测试非常顺利:单用户、低频请求、简单 Prompt 都能正常返回。但一旦上线到真实业务环境,例如客服机器人、知识库问答、代码助手、AI 写作平台、智能工单分析系统,就会很快遇到以下问题:

  • 并发请求突然升高,接口响应变慢;
  • Claude API 出现限流、超时、重试失败;
  • 单个请求生成时间过长,占用连接资源;
  • 多用户同时请求时,排队机制混乱;
  • Token 消耗不可控,成本迅速上升;
  • 服务端没有熔断、降级、缓存机制;
  • 前端等待时间过长,用户体验差;
  • 多个 API Key 或多个模型之间缺少统一调度。

因此,想要让 Claude 在生产环境中稳定运行,必须设计一套完整的高并发解决方案。本文将从架构设计、核心策略、队列模型、限流方案、缓存机制、重试机制、流式响应、源码实现等方面进行系统讲解,并提供一套可直接参考的 Node.js 服务端源码。


一、Claude 高并发场景的核心问题

Claude API 本质上是一个外部大模型服务。和普通数据库、Redis、HTTP 接口不同,它有几个明显特点:

1. 单次请求耗时较长

普通 HTTP 接口可能几十毫秒到几百毫秒返回,而 Claude 的一次完整生成可能需要几秒甚至几十秒。如果并发请求非常多,服务端线程、连接、内存都会被大量占用。

2. Token 成本不可忽略

Claude 的计费一般与输入 Token 和输出 Token 相关。高并发场景下,如果不控制 Prompt 长度、不限制最大输出、不做缓存,成本会快速上升。

3. API 有速率限制

Claude API 通常存在请求频率、Token 速率、并发数等限制。不同套餐、不同模型、不同区域可能有不同额度。如果业务请求超过限制,就会出现 429、超时、失败等问题。

4. 请求不可简单无限重试

大模型请求的重试成本较高,因为每次重试都可能再次消耗时间和资源。盲目重试不仅不能解决问题,反而会进一步放大故障。

5. 高并发下需要排队与削峰

当用户请求超过模型服务处理能力时,必须通过消息队列、任务队列、令牌桶、并发池等方式进行削峰填谷,不能把所有请求直接打到 Claude API。


二、整体架构设计

推荐的 Claude 高并发架构如下:

用户请求
   ↓
API 网关 / Web 服务
   ↓
参数校验 / 鉴权 / Prompt 清洗
   ↓
缓存查询
   ↓
限流器
   ↓
任务队列
   ↓
并发控制器
   ↓
Claude API 调用层
   ↓
重试 / 熔断 / 降级
   ↓
结果缓存
   ↓
返回用户

在这套架构中,关键模块包括:

  1. 限流模块:防止瞬时流量击穿服务;
  2. 队列模块:控制请求进入模型层的速度;
  3. 并发池模块:限制同时调用 Claude API 的数量;
  4. 缓存模块:减少重复请求;
  5. 重试模块:处理临时性失败;
  6. 熔断模块:避免故障扩大;
  7. 流式响应模块:改善用户等待体验;
  8. 日志与监控模块:分析成本、性能和错误。

三、高并发解决方案核心策略

1. 使用任务队列削峰

高并发请求不能全部直接访问 Claude API,而应该先进入任务队列。队列可以让系统按照自身处理能力逐步消费任务。

常见选择:

  • BullMQ + Redis;
  • RabbitMQ;
  • Kafka;
  • 自研内存队列;
  • 云厂商队列服务。

如果是中小型项目,可以使用 BullMQ。如果是大型企业系统,可以考虑 Kafka 或 RabbitMQ。

2. 控制最大并发数

即使有队列,也不能让消费者无限并发调用 Claude API。必须设置最大并发数,例如:

同时最多处理 20 个 Claude 请求

这样可以避免:

  • API 触发限流;
  • 服务器连接耗尽;
  • 内存暴涨;
  • 请求大量超时。

3. 使用令牌桶限流

令牌桶适合控制单位时间内的请求量。例如:

每分钟最多 300 个请求
每秒最多 10 个请求

当请求超过限制时,可以选择:

  • 直接返回“当前繁忙”;
  • 进入队列等待;
  • 降级到更便宜模型;
  • 返回缓存结果。

4. Prompt 压缩与输入控制

Claude 的输入越长,成本和延迟越高。在高并发场景下,必须限制输入长度。例如:

  • 限制用户问题最大字符数;
  • 对历史对话进行摘要;
  • 知识库检索只取 Top N 片段;
  • 删除无关上下文;
  • 对重复 Prompt 做缓存。

5. 输出 Token 限制

很多接口慢并不是因为模型响应慢,而是输出内容太长。因此要设置合理的 max_tokens

例如:

max_tokens: 1024

对不同业务场景设置不同上限:

场景 建议最大输出 Token
意图识别 100 - 300
客服问答 500 - 1000
总结摘要 800 - 1500
长文生成 2000 - 4000
代码生成 2000+

6. 流式响应提升体验

在用户等待 Claude 返回时,如果使用普通阻塞响应,用户可能会觉得系统“卡住了”。流式响应可以让用户逐字看到结果,体验会明显更好。

适合流式响应的场景:

  • 聊天机器人;
  • AI 写作;
  • 代码助手;
  • 总结生成;
  • 长文本问答。

7. 缓存重复请求

对于相同问题、相同上下文、相同参数,可以使用缓存避免重复调用 Claude。

缓存 Key 可以由以下内容生成:

hash(model + prompt + temperature + max_tokens)

缓存位置可以是:

  • Redis;
  • 本地 LRU Cache;
  • 数据库;
  • CDN 边缘缓存。

注意:如果请求中包含用户隐私或强个性化内容,需要谨慎缓存。

8. 重试机制

Claude API 调用失败时,可以进行有限次数重试。

建议只对以下错误重试:

  • 网络超时;
  • 502;
  • 503;
  • 504;
  • 临时连接失败;
  • 部分 429 限流错误。

不建议对以下错误重试:

  • API Key 错误;
  • 参数错误;
  • 余额不足;
  • 模型不存在;
  • Prompt 违规;
  • 请求体格式错误。

重试策略建议使用指数退避:

第 1 次失败:等待 500ms
第 2 次失败:等待 1000ms
第 3 次失败:等待 2000ms

9. 熔断与降级

当 Claude API 持续失败时,系统应自动进入熔断状态,暂停继续请求,避免故障扩大。

降级方式包括:

  • 返回固定兜底话术;
  • 返回缓存结果;
  • 切换到轻量模型;
  • 提示用户稍后重试;
  • 只做关键词匹配,不调用大模型。

10. 多模型调度

不同任务没必要都使用最强模型。可以按任务复杂度路由:

任务类型 推荐策略
简单分类 使用轻量模型
普通问答 使用中等模型
复杂推理 使用高性能模型
长文生成 使用大上下文模型
批量处理 使用低成本模型

这样可以同时降低成本和提升吞吐。


四、源码实现:Node.js Claude 高并发网关

下面提供一套简化但完整的实现方案,包含:

  • Express HTTP 服务;
  • Claude API 调用;
  • 并发控制;
  • Redis 缓存;
  • 请求队列;
  • 限流;
  • 重试;
  • 熔断;
  • 日志记录。

说明:以下代码用于演示生产架构思路。实际生产环境需要根据自己的业务增加鉴权、审计、监控、数据脱敏等能力。


五、项目结构

claude-concurrency-gateway/
├── package.json
├── .env
├── src/
│   ├── app.js
│   ├── claudeClient.js
│   ├── queue.js
│   ├── limiter.js
│   ├── cache.js
│   ├── circuitBreaker.js
│   └── utils.js

六、package.json

{
  "name": "claude-concurrency-gateway",
  "version": "1.0.0",
  "description": "Claude high concurrency gateway demo",
  "main": "src/app.js",
  "type": "module",
  "scripts": {
    "start": "node src/app.js"
  },
  "dependencies": {
    "@anthropic-ai/sdk": "^0.32.1",
    "express": "^4.18.3",
    "ioredis": "^5.4.1",
    "p-queue": "^8.0.1",
    "express-rate-limit": "^7.1.5",
    "dotenv": "^16.4.5",
    "crypto-js": "^4.2.0"
  }
}

七、环境变量 .env

PORT=3000
ANTHROPIC_API_KEY=your_claude_api_key
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
MAX_CONCURRENCY=10
CACHE_TTL=3600

八、Redis 缓存模块 cache.js

import Redis from "ioredis";

const redis = new Redis({
  host: process.env.REDIS_HOST || "127.0.0.1",
  port: Number(process.env.REDIS_PORT || 6379)
});

export async function getCache(key) {
  const value = await redis.get(key);
  if (!value) return null;

  try {
    return JSON.parse(value);
  } catch {
    return value;
  }
}

export async function setCache(key, value, ttl = 3600) {
  await redis.set(key, JSON.stringify(value), "EX", ttl);
}

九、工具函数 utils.js

import CryptoJS from "crypto-js";

export function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

export function hashRequest(payload) {
  return CryptoJS.SHA256(JSON.stringify(payload)).toString();
}

export function safeString(input, maxLength = 8000) {
  if (!input) return "";
  const text = String(input);
  return text.length > maxLength ? text.slice(0, maxLength) : text;
}

十、熔断器 circuitBreaker.js

export class CircuitBreaker {
  constructor(options = {}) {
    this.failureCount = 0;
    this.successCount = 0;
    this.state = "CLOSED";

    this.failureThreshold = options.failureThreshold || 5;
    this.recoveryTime = options.recoveryTime || 30000;
    this.lastFailureTime = 0;
  }

  canRequest() {
    if (this.state === "CLOSED") return true;

    if (this.state === "OPEN") {
      const now = Date.now();
      if (now - this.lastFailureTime > this.recoveryTime) {
        this.state = "HALF_OPEN";
        return true;
      }
      return false;
    }

    if (this.state === "HALF_OPEN") {
      return true;
    }

    return false;
  }

  success() {
    this.failureCount = 0;

    if (this.state === "HALF_OPEN") {
      this.state = "CLOSED";
    }
  }

  failure() {
    this.failureCount += 1;
    this.lastFailureTime = Date.now();

    if (this.failureCount >= this.failureThreshold) {
      this.state = "OPEN";
    }
  }

  getState() {
    return this.state;
  }
}

export const claudeBreaker = new CircuitBreaker({
  failureThreshold: 5,
  recoveryTime: 30000
});

十一、限流模块 limiter.js

import rateLimit from "express-rate-limit";

export const apiLimiter = rateLimit({
  windowMs: 60 * 1000,
  max: 120,
  standardHeaders: true,
  legacyHeaders: false,
  message: {
    code: 429,
    message: "请求过于频繁,请稍后再试"
  }
});

这里表示单个 IP 每分钟最多请求 120 次。实际生产中可以按照用户 ID、租户 ID、套餐等级设置不同限流规则。


十二、队列模块 queue.js

import PQueue from "p-queue";

const concurrency = Number(process.env.MAX_CONCURRENCY || 10);

export const claudeQueue = new PQueue({
  concurrency,
  interval: 1000,
  intervalCap: concurrency
});

export function getQueueStatus() {
  return {
    size: claudeQueue.size,
    pending: claudeQueue.pending,
    concurrency
  };
}

这个队列有两个作用:

  1. 控制同时执行的 Claude 请求数量;
  2. 控制单位时间内进入 Claude API 的请求数量。

如果服务部署多实例,建议改成 BullMQ + Redis,这样不同实例之间可以共享任务队列。


十三、Claude 调用模块 claudeClient.js

import Anthropic from "@anthropic-ai/sdk";
import { sleep } from "./utils.js";
import { claudeBreaker } from "./circuitBreaker.js";

const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY
});

function shouldRetry(error) {
  const status = error?.status;

  if ([429, 500, 502, 503, 504].includes(status)) {
    return true;
  }

  if (error?.message?.includes("timeout")) {
    return true;
  }

  return false;
}

export async function callClaudeWithRetry({
  model = "claude-3-5-sonnet-latest",
  prompt,
  maxTokens = 1024,
  temperature = 0.7
}) {
  if (!claudeBreaker.canRequest()) {
    throw new Error("Claude 服务暂时不可用,熔断器已开启");
  }

  const maxRetries = 3;
  let lastError = null;

  for (let i = 0; i <= maxRetries; i++) {
    try {
      const response = await anthropic.messages.create({
        model,
        max_tokens: maxTokens,
        temperature,
        messages: [
          {
            role: "user",
            content: prompt
          }
        ]
      });

      claudeBreaker.success();

      const text = response.content
        .map(item => item.type === "text" ? item.text : "")
        .join("");

      return {
        text,
        usage: response.usage,
        model: response.model,
        id: response.id
      };
    } catch (error) {
      lastError = error;
      claudeBreaker.failure();

      if (!shouldRetry(error) || i === maxRetries) {
        break;
      }

      const delay = 500 * Math.pow(2, i);
      await sleep(delay);
    }
  }

  throw lastError;
}

十四、主服务 app.js

import express from "express";
import dotenv from "dotenv";
import { apiLimiter } from "./limiter.js";
import { claudeQueue, getQueueStatus } from "./queue.js";
import { getCache, setCache } from "./cache.js";
import { hashRequest, safeString } from "./utils.js";
import { callClaudeWithRetry } from "./claudeClient.js";
import { claudeBreaker } from "./circuitBreaker.js";

dotenv.config();

const app = express();

app.use(express.json({ limit: "1mb" }));
app.use(apiLimiter);

app.get("/health", (req, res) => {
  res.json({
    status: "ok",
    queue: getQueueStatus(),
    breaker: claudeBreaker.getState()
  });
});

app.post("/v1/chat", async (req, res) => {
  const startTime = Date.now();

  try {
    const {
      prompt,
      model = "claude-3-5-sonnet-latest",
      maxTokens = 1024,
      temperature = 0.7,
      useCache = true
    } = req.body;

    if (!prompt) {
      return res.status(400).json({
        code: 400,
        message: "prompt 不能为空"
      });
    }

    const safePrompt = safeString(prompt, 8000);

    const requestPayload = {
      model,
      prompt: safePrompt,
      maxTokens,
      temperature
    };

    const cacheKey = `claude:${hashRequest(requestPayload)}`;

    if (useCache) {
      const cached = await getCache(cacheKey);
      if (cached) {
        return res.json({
          code: 0,
          source: "cache",
          data: cached,
          costMs: Date.now() - startTime
        });
      }
    }

    const result = await claudeQueue.add(async () => {
      return await callClaudeWithRetry({
        model,
        prompt: safePrompt,
        maxTokens,
        temperature
      });
    });

    if (useCache) {
      await setCache(
        cacheKey,
        result,
        Number(process.env.CACHE_TTL || 3600)
      );
    }

    res.json({
      code: 0,
      source: "claude",
      data: result,
      costMs: Date.now() - startTime
    });
  } catch (error) {
    console.error("Claude request error:", error);

    res.status(500).json({
      code: 500,
      message: error.message || "Claude 请求失败",
      queue: getQueueStatus(),
      breaker: claudeBreaker.getState()
    });
  }
});

const port = Number(process.env.PORT || 3000);

app.listen(port, () => {
  console.log(`Claude gateway running on port ${port}`);
});

十五、启动服务

安装依赖:

npm install

启动 Redis:

redis-server

启动服务:

npm start

测试请求:

curl -X POST http://localhost:3000/v1/chat \
  -H "Content-Type: application/json" \
  -d '{
    "prompt": "请用中文总结一下高并发系统的核心设计原则",
    "maxTokens": 800,
    "temperature": 0.7
  }'

查看健康状态:

curl http://localhost:3000/health

返回示例:

{
  "status": "ok",
  "queue": {
    "size": 0,
    "pending": 2,
    "concurrency": 10
  },
  "breaker": "CLOSED"
}

十六、流式响应优化方案

如果业务是聊天机器人,建议支持 Claude 流式响应。流式响应可以显著提升用户体验,因为用户不需要等待完整内容生成后才看到结果。

示例代码:

app.post("/v1/chat/stream", async (req, res) => {
  try {
    const { prompt, model = "claude-3-5-sonnet-latest" } = req.body;

    res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");

    await claudeQueue.add(async () => {
      const stream = await anthropic.messages.stream({
        model,
        max_tokens: 1024,
        messages: [
          {
            role: "user",
            content: prompt
          }
        ]
      });

      for await (const event of stream) {
        if (event.type === "content_block_delta") {
          const text = event.delta?.text || "";
          res.write(`data: ${JSON.stringify({ text })}\n\n`);
        }
      }

      res.write(`data: ${JSON.stringify({ done: true })}\n\n`);
      res.end();
    });
  } catch (error) {
    res.write(`data: ${JSON.stringify({
      error: error.message || "stream error"
    })}\n\n`);
    res.end();
  }
});

不过需要注意,流式响应会占用更长连接时间。因此高并发场景下必须配合:

  • Nginx 长连接配置;
  • 服务端超时配置;
  • 客户端断连检测;
  • 最大并发限制;
  • SSE 或 WebSocket 管理。

十七、生产环境优化建议

1. 使用 BullMQ 替代内存队列

上面的 p-queue 适合单实例服务。如果部署多个 Node.js 实例,内存队列无法共享。生产环境推荐:

BullMQ + Redis

这样可以实现:

  • 多实例共享队列;
  • 任务失败重试;
  • 延迟任务;
  • 优先级队列;
  • 任务持久化;
  • 任务监控。

2. 按用户等级设置优先级

例如:

用户等级 队列优先级
企业用户 1
付费用户 3
免费用户 5
匿名用户 10

高优先级用户可以更快获得响应,避免所有请求一视同仁导致核心客户体验下降。

3. 增加 Token 预算系统

建议为每个用户、租户或 API Key 设置 Token 预算。

例如:

用户 A 每天最多使用 100 万 Token
租户 B 每月最多使用 5000 万 Token
免费用户每次最多输出 500 Token

这样可以避免异常用户或攻击请求导致成本失控。

4. 增加请求去重

当多个用户在短时间内发出相同请求时,可以只让第一个请求调用 Claude,其他请求等待同一个结果。

这种机制叫:

Request Coalescing

特别适合热点问题,例如:

  • “今天有什么活动?”
  • “这个产品怎么退款?”
  • “帮我总结这份公告。”

5. 使用语义缓存

普通缓存只能命中完全相同的 Prompt,而语义缓存可以命中意思相近的问题。例如:

如何申请退款?
退款流程是什么?
我想退货怎么办?

这些问题语义相近,可以返回相同或相似答案。实现方式通常是:

  1. 对问题生成 Embedding;
  2. 存入向量数据库;
  3. 查询相似问题;
  4. 相似度超过阈值则直接返回缓存答案。

6. 增加监控指标

建议重点监控:

  • QPS;
  • 平均响应时间;
  • P95 / P99 延迟;
  • Claude API 错误率;
  • 429 限流次数;
  • Token 消耗量;
  • 缓存命中率;
  • 队列长度;
  • 熔断器状态;
  • 用户维度成本。

只有具备完整监控,才能判断系统瓶颈在哪里。


十八、常见问题与解决方法

问题一:Claude 经常返回 429 怎么办?

说明请求量或 Token 使用超过限制。解决方法:

  1. 降低并发数;
  2. 增加队列等待;
  3. 减少 max_tokens;
  4. 压缩 Prompt;
  5. 做缓存;
  6. 联系服务商提升额度;
  7. 按官方规则合理使用,不要绕过限制。

问题二:响应时间太慢怎么办?

可以从以下方向优化:

  • 开启流式响应;
  • 降低输出 Token;
  • 使用更快模型;
  • 减少上下文长度;
  • 增加缓存;
  • 将复杂任务拆成异步任务;
  • 使用队列返回任务 ID,前端轮询结果。

问题三:成本太高怎么办?

优先检查:

  • 是否重复请求过多;
  • Prompt 是否过长;
  • 输出是否无限制;
  • 是否所有任务都用了高性能模型;
  • 是否缺少缓存;
  • 是否缺少用户额度控制。

问题四:多实例部署时队列不准确怎么办?

内存队列只适合单实例。多实例必须使用 Redis、RabbitMQ 或 Kafka 等外部队列系统。


十九、推荐生产级架构

如果是正式商业系统,推荐如下架构:

Nginx / API Gateway
      ↓
鉴权服务
      ↓
限流服务
      ↓
业务 API 服务
      ↓
Redis 缓存
      ↓
BullMQ 队列
      ↓
Worker 集群
      ↓
Claude API
      ↓
日志 / 监控 / 成本统计

其中 Worker 集群可以独立扩容。例如:

  • API 服务负责接收请求;
  • Worker 服务负责调用 Claude;
  • Redis 负责队列和缓存;
  • Prometheus 负责指标监控;
  • Grafana 负责可视化;
  • PostgreSQL 负责记录请求日志和账单数据。

二十、总结

Claude 高并发解决方案的核心不是“多开几个接口”这么简单,而是要围绕模型调用特点设计完整的工程体系。

关键原则可以总结为:

  1. 不要直接把所有请求打到 Claude API
  2. 必须使用队列削峰
  3. 必须限制最大并发数
  4. 必须控制 Prompt 长度和输出 Token
  5. 必须做缓存,尤其是热点问题缓存
  6. 必须有重试,但不能无限重试
  7. 必须有熔断与降级机制
  8. 必须监控 Token 成本和错误率
  9. 聊天场景优先使用流式响应
  10. 生产环境建议拆分 API 服务和 Worker 服务

本文提供的源码虽然是简化版本,但已经包含高并发 Claude 网关的核心模块:限流、缓存、队列、并发控制、重试、熔断和健康检查。基于这套代码继续扩展,就可以逐步演进为适合企业生产环境的大模型调用平台。

目录结构
全文