上一篇 下一篇 分享链接 返回 返回顶部

AI接口一上线就被打爆?这套高并发架构和源码可以直接参考

发布人:慈云数据-客服中心 发布时间:21小时前 阅读量:3

AI编程 高并发解决方案|附源码

在 AI 应用快速落地的过程中,很多团队会遇到一个非常现实的问题:模型能力验证成功了,但一上线就扛不住并发

例如:

  • 用户同时发起大量对话请求;
  • 多人并发调用大模型 API;
  • AI 绘图、向量检索、RAG 问答任务耗时较长;
  • 后端线程被阻塞,请求排队严重;
  • 第三方大模型接口有 QPS / TPM / RPM 限制;
  • 推理服务 GPU 资源有限,无法无限扩容。

这类问题本质上不是“AI 模型不会回答”,而是AI 系统工程能力不足。AI 编程不仅仅是写 Prompt、调接口,更重要的是构建一个稳定、可扩展、可观测、可限流、可降级的高并发架构。

本文将围绕 AI 编程中的高并发问题,系统讲解常见瓶颈、架构设计思路,并给出一套可运行的 FastAPI 异步高并发示例源码。


一、AI 应用为什么容易出现高并发问题?

传统 Web 应用中,一个接口可能只需要查询数据库、做简单计算,然后快速返回。但 AI 应用通常有以下特点。

1. 单次请求耗时长

一次普通的数据库查询可能几十毫秒完成,而一次大模型调用可能需要:

  • 1 秒;
  • 3 秒;
  • 10 秒;
  • 甚至更久。

如果是 AI 绘图、视频生成、代码生成、复杂 Agent 任务,耗时会更长。

请求耗时越长,系统中同时堆积的请求就越多。

假设一个接口平均响应时间是 5 秒,如果每秒进来 100 个请求,那么系统中同时存在的请求大约就是:

并发请求数 = QPS × 平均耗时
并发请求数 = 100 × 5 = 500

也就是说,即使你的 QPS 只有 100,系统也可能要同时维护 500 个执行中的请求。


2. AI 请求经常依赖外部服务

很多 AI 应用会调用:

  • OpenAI;
  • Claude;
  • Gemini;
  • 通义千问;
  • 智谱;
  • DeepSeek;
  • 私有化推理服务;
  • 向量数据库;
  • 搜索引擎;
  • 对象存储;
  • Redis;
  • MySQL / PostgreSQL。

只要其中任何一个服务变慢,整个链路就会变慢。

高并发系统最怕的不是某一个服务慢,而是慢请求扩散。当大量请求阻塞在慢服务上,会导致线程池耗尽、连接池打满、内存上涨,最终整个服务不可用。


3. 大模型接口通常有限流

第三方 AI 服务一般会限制:

  • 每分钟请求数 RPM;
  • 每分钟 Token 数 TPM;
  • 每秒请求数 QPS;
  • 并发连接数;
  • 单个 API Key 额度。

如果后端没有做限流和排队,流量一上来就会出现:

429 Too Many Requests
Rate limit exceeded
Quota exceeded

这时用户体验会非常差。


4. AI 任务有明显的资源瓶颈

如果你自己部署模型,那么瓶颈可能来自:

  • GPU 显存;
  • GPU 推理吞吐;
  • CPU 编解码;
  • 网络带宽;
  • 向量检索延迟;
  • 模型上下文长度;
  • 批处理能力。

尤其是大模型推理,GPU 不是普通业务服务器那样可以随便开线程。并发过高反而可能让每个请求都变慢。


二、高并发 AI 系统的核心设计原则

要解决 AI 高并发问题,不能只靠“多开几台机器”。更合理的方式是从架构层面设计。

常用原则包括:

  1. 异步化:避免线程阻塞,提高连接处理能力;
  2. 限流:保护后端和第三方接口;
  3. 排队:把瞬时高峰转化为平滑消费;
  4. 缓存:减少重复调用模型;
  5. 熔断降级:外部服务异常时快速失败;
  6. 批处理:提升 GPU 或模型接口吞吐;
  7. 任务拆分:长任务异步执行;
  8. 连接池管理:避免连接资源耗尽;
  9. 可观测性:监控延迟、错误率、队列长度;
  10. 弹性扩容:根据负载自动扩展服务。

三、推荐的 AI 高并发架构

一个比较常见的 AI 高并发架构如下:

用户
 ↓
Nginx / API Gateway
 ↓
业务服务 FastAPI / Spring Boot / Go
 ↓
限流层 Redis Rate Limit
 ↓
缓存层 Redis
 ↓
任务队列 RabbitMQ / Kafka / Redis Stream / Celery
 ↓
AI Worker 集群
 ↓
大模型 API / 私有化模型 / 向量数据库
 ↓
结果缓存 / 数据库存储

其中:

  • API 网关负责统一入口、鉴权、基础限流;
  • 业务服务负责参数校验、会话管理、任务创建;
  • Redis 负责限流、缓存、任务状态存储;
  • 消息队列负责削峰填谷;
  • Worker 负责真正调用模型;
  • 数据库负责持久化业务数据;
  • 监控系统负责追踪系统状态。

四、同步阻塞写法的问题

很多初学者会这样写 AI 接口:

from fastapi import FastAPI
import requests

app = FastAPI()

@app.post("/chat")
def chat(prompt: str):
    response = requests.post(
        "https://api.example.com/v1/chat",
        json={"prompt": prompt},
        timeout=30
    )
    return response.json()

这段代码看起来很简单,但在高并发下存在明显问题。

问题一:requests 是同步阻塞的

当请求大模型接口时,当前线程会一直等待响应。在等待期间,这个线程无法处理其他请求。

如果线程池数量是 100,而模型接口平均响应 5 秒,那么当并发超过 100 时,后续请求就只能排队。


问题二:没有限流

如果瞬间来了 1000 个请求,这段代码会直接把 1000 个请求打到下游模型服务。

结果可能是:

  • 第三方 API 返回 429;
  • 本地连接池耗尽;
  • 系统响应时间暴涨;
  • 服务雪崩。

问题三:没有缓存

如果多个用户问了相同或高度相似的问题,系统仍然会重复调用模型,浪费成本和资源。


问题四:没有超时、重试和熔断策略

生产环境中,下游服务一定会出现:

  • 网络抖动;
  • 偶发超时;
  • 返回异常;
  • 服务不可用。

没有超时和熔断机制,会导致请求无限堆积。


五、解决方案一:使用异步接口提升并发能力

在 Python 生态中,可以使用:

  • FastAPI;
  • Uvicorn;
  • aiohttp;
  • httpx.AsyncClient;
  • asyncio。

异步并不是让单个请求更快,而是让服务在等待 I/O 的时候可以处理其他请求。

示例:

import httpx
from fastapi import FastAPI

app = FastAPI()

@app.post("/chat")
async def chat(prompt: str):
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            "https://api.example.com/v1/chat",
            json={"prompt": prompt}
        )
    return response.json()

不过,这只是第一步。真实生产中还需要连接池、限流、缓存、并发控制。


六、解决方案二:使用 Semaphore 控制并发

如果你的模型接口最多只能承受 50 个并发,就不能让 500 个请求同时打过去。

可以使用 asyncio.Semaphore 控制同时执行的大模型请求数量。

import asyncio
import httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

MODEL_CONCURRENCY_LIMIT = 50
model_semaphore = asyncio.Semaphore(MODEL_CONCURRENCY_LIMIT)

class ChatRequest(BaseModel):
    prompt: str

async def call_model(prompt: str):
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            "https://api.example.com/v1/chat",
            json={"prompt": prompt}
        )
        response.raise_for_status()
        return response.json()

@app.post("/chat")
async def chat(req: ChatRequest):
    try:
        async with model_semaphore:
            result = await call_model(req.prompt)
        return result
    except httpx.HTTPError as e:
        raise HTTPException(status_code=502, detail=str(e))

这样即使外部来了大量请求,真正同时调用模型的请求也不会超过 50 个。

但要注意:Semaphore 只能控制单进程内的并发。如果你部署了多个进程或多台机器,就需要使用 Redis、网关或集中式限流方案。


七、解决方案三:Redis 分布式限流

对于多实例部署场景,本地 Semaphore 不够,需要分布式限流。

常见算法有:

  • 固定窗口限流;
  • 滑动窗口限流;
  • 令牌桶;
  • 漏桶。

下面给出一个基于 Redis 的固定窗口限流示例。

Redis 限流源码

import time
import redis.asyncio as redis

redis_client = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True
)

async def redis_rate_limit(key: str, limit: int, window: int) -> bool:
    """
    Redis 固定窗口限流

    :param key: 限流 Key,例如 user:123 或 api:chat
    :param limit: 窗口期内最大请求数
    :param window: 窗口大小,单位秒
    :return: True 表示允许请求,False 表示拒绝请求
    """
    current_window = int(time.time() // window)
    redis_key = f"rate_limit:{key}:{current_window}"

    count = await redis_client.incr(redis_key)

    if count == 1:
        await redis_client.expire(redis_key, window + 1)

    return count <= limit

接口中使用:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    user_id: str
    prompt: str

@app.post("/chat")
async def chat(req: ChatRequest):
    allowed = await redis_rate_limit(
        key=f"user:{req.user_id}",
        limit=20,
        window=60
    )

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="请求过于频繁,请稍后再试"
        )

    return {"message": "通过限流,继续处理 AI 请求"}

这个方案适合做用户级限流,比如每个用户每分钟最多请求 20 次。


八、解决方案四:缓存重复问题,降低模型调用成本

AI 调用成本通常比普通接口高得多,因此缓存非常重要。

可以缓存:

  • 完全相同的 Prompt;
  • 相同用户的历史上下文摘要;
  • RAG 检索结果;
  • 向量搜索结果;
  • 工具调用结果;
  • 模型最终回答。

简单的缓存方案可以使用 Redis。

Redis 缓存源码

import hashlib
import json
import redis.asyncio as redis

cache_redis = redis.Redis(
    host="localhost",
    port=6379,
    db=1,
    decode_responses=True
)

def build_cache_key(prompt: str) -> str:
    text = prompt.strip().lower()
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"ai_cache:{digest}"

async def get_cache(prompt: str):
    key = build_cache_key(prompt)
    value = await cache_redis.get(key)
    if value:
        return json.loads(value)
    return None

async def set_cache(prompt: str, result: dict, ttl: int = 3600):
    key = build_cache_key(prompt)
    await cache_redis.setex(
        key,
        ttl,
        json.dumps(result, ensure_ascii=False)
    )

业务接口中使用:

@app.post("/chat-with-cache")
async def chat_with_cache(req: ChatRequest):
    cached = await get_cache(req.prompt)
    if cached:
        return {
            "from_cache": True,
            "data": cached
        }

    result = await call_model(req.prompt)

    await set_cache(req.prompt, result, ttl=3600)

    return {
        "from_cache": False,
        "data": result
    }

缓存需要注意:

  1. 不适合缓存所有个性化强的回答;
  2. 涉及隐私数据时要谨慎;
  3. Prompt 中包含用户身份、时间、权限信息时,缓存 Key 要包含上下文;
  4. 可以设置较短 TTL,避免长期错误缓存。

九、解决方案五:长任务异步化,使用队列削峰填谷

对于 AI 绘图、视频生成、长文本分析、代码审查、文档总结等耗时任务,不建议使用同步 HTTP 请求一直等待结果。

更推荐:

  1. 用户提交任务;
  2. 服务立即返回 task_id
  3. 后台 Worker 异步处理;
  4. 用户轮询任务状态,或通过 WebSocket / SSE 推送结果。

架构流程

POST /tasks
  ↓
创建任务,写入 Redis
  ↓
加入队列
  ↓
立即返回 task_id

Worker
  ↓
从队列取任务
  ↓
调用 AI 模型
  ↓
保存结果

GET /tasks/{task_id}
  ↓
查询任务状态和结果

下面给出一个基于 Redis Queue 的简化实现。


十、完整示例源码:FastAPI + Redis + 异步 Worker

1. 安装依赖

pip install fastapi uvicorn httpx redis pydantic

启动 Redis:

docker run -d --name redis-ai -p 6379:6379 redis:7

2. 项目结构

ai-high-concurrency-demo/
├── main.py
├── worker.py
├── model_client.py
├── redis_client.py
└── requirements.txt

3. requirements.txt

fastapi==0.115.0
uvicorn==0.30.6
httpx==0.27.2
redis==5.0.8
pydantic==2.8.2

4. redis_client.py

import redis.asyncio as redis

redis_client = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True
)

5. model_client.py

这里为了方便演示,用 asyncio.sleep 模拟大模型耗时调用。实际项目中可以替换成 OpenAI、DeepSeek、通义千问或私有模型接口。

import asyncio
import random

async def call_ai_model(prompt: str) -> dict:
    """
    模拟调用 AI 大模型。
    真实项目中可替换为 httpx.AsyncClient 调用模型 API。
    """
    await asyncio.sleep(random.uniform(1.0, 3.0))

    return {
        "answer": f"AI 回复:你输入的问题是:{prompt}",
        "tokens": random.randint(100, 500)
    }

如果你要调用真实 HTTP 模型接口,可以改成:

import httpx

async def call_ai_model(prompt: str) -> dict:
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(
            "https://api.example.com/v1/chat/completions",
            headers={
                "Authorization": "Bearer YOUR_API_KEY"
            },
            json={
                "model": "your-model",
                "messages": [
                    {"role": "user", "content": prompt}
                ]
            }
        )
        resp.raise_for_status()
        return resp.json()

6. main.py

import uuid
import time
import json
import hashlib
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

from redis_client import redis_client

app = FastAPI(title="AI 高并发解决方案 Demo")

TASK_QUEUE_KEY = "ai_task_queue"
TASK_RESULT_KEY_PREFIX = "ai_task:"
CACHE_KEY_PREFIX = "ai_cache:"
RATE_LIMIT_PREFIX = "rate_limit:"


class ChatTaskRequest(BaseModel):
    user_id: str = Field(..., description="用户 ID")
    prompt: str = Field(..., min_length=1, max_length=5000)


def build_cache_key(prompt: str) -> str:
    digest = hashlib.sha256(
        prompt.strip().lower().encode("utf-8")
    ).hexdigest()
    return f"{CACHE_KEY_PREFIX}{digest}"


async def redis_rate_limit(key: str, limit: int, window: int) -> bool:
    current_window = int(time.time() // window)
    redis_key = f"{RATE_LIMIT_PREFIX}{key}:{current_window}"

    count = await redis_client.incr(redis_key)

    if count == 1:
        await redis_client.expire(redis_key, window + 1)

    return count <= limit


@app.post("/tasks")
async def create_task(req: ChatTaskRequest):
    """
    创建 AI 任务:
    1. 用户限流;
    2. 查询缓存;
    3. 创建任务;
    4. 推入 Redis 队列;
    5. 返回 task_id。
    """

    allowed = await redis_rate_limit(
        key=f"user:{req.user_id}",
        limit=30,
        window=60
    )

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="请求过于频繁,请稍后再试"
        )

    cache_key = build_cache_key(req.prompt)
    cached = await redis_client.get(cache_key)

    if cached:
        task_id = str(uuid.uuid4())
        task_key = f"{TASK_RESULT_KEY_PREFIX}{task_id}"

        await redis_client.setex(
            task_key,
            3600,
            json.dumps({
                "status": "success",
                "from_cache": True,
                "result": json.loads(cached)
            }, ensure_ascii=False)
        )

        return {
            "task_id": task_id,
            "status": "success",
            "from_cache": True
        }

    task_id = str(uuid.uuid4())
    task_key = f"{TASK_RESULT_KEY_PREFIX}{task_id}"

    task_data = {
        "task_id": task_id,
        "user_id": req.user_id,
        "prompt": req.prompt,
        "created_at": int(time.time())
    }

    await redis_client.setex(
        task_key,
        3600,
        json.dumps({
            "status": "pending",
            "from_cache": False,
            "result": None
        }, ensure_ascii=False)
    )

    await redis_client.lpush(
        TASK_QUEUE_KEY,
        json.dumps(task_data, ensure_ascii=False)
    )

    return {
        "task_id": task_id,
        "status": "pending",
        "from_cache": False
    }


@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    task_key = f"{TASK_RESULT_KEY_PREFIX}{task_id}"
    data = await redis_client.get(task_key)

    if not data:
        raise HTTPException(
            status_code=404,
            detail="任务不存在或已过期"
        )

    return json.loads(data)


@app.get("/health")
async def health():
    return {"status": "ok"}

7. worker.py

import asyncio
import json
import hashlib
import time

from redis_client import redis_client
from model_client import call_ai_model

TASK_QUEUE_KEY = "ai_task_queue"
TASK_RESULT_KEY_PREFIX = "ai_task:"
CACHE_KEY_PREFIX = "ai_cache:"

WORKER_CONCURRENCY = 10
semaphore = asyncio.Semaphore(WORKER_CONCURRENCY)


def build_cache_key(prompt: str) -> str:
    digest = hashlib.sha256(
        prompt.strip().lower().encode("utf-8")
    ).hexdigest()
    return f"{CACHE_KEY_PREFIX}{digest}"


async def handle_task(task_data: dict):
    async with semaphore:
        task_id = task_data["task_id"]
        prompt = task_data["prompt"]
        task_key = f"{TASK_RESULT_KEY_PREFIX}{task_id}"

        try:
            await redis_client.setex(
                task_key,
                3600,
                json.dumps({
                    "status": "running",
                    "from_cache": False,
                    "result": None
                }, ensure_ascii=False)
            )

            start = time.time()

            result = await call_ai_model(prompt)

            cost_ms = int((time.time() - start) * 1000)

            final_result = {
                "status": "success",
                "from_cache": False,
                "cost_ms": cost_ms,
                "result": result
            }

            await redis_client.setex(
                task_key,
                3600,
                json.dumps(final_result, ensure_ascii=False)
            )

            cache_key = build_cache_key(prompt)
            await redis_client.setex(
                cache_key,
                3600,
                json.dumps(result, ensure_ascii=False)
            )

        except Exception as e:
            await redis_client.setex(
                task_key,
                3600,
                json.dumps({
                    "status": "failed",
                    "from_cache": False,
                    "error": str(e),
                    "result": None
                }, ensure_ascii=False)
            )


async def worker_loop():
    print("AI Worker started...")

    while True:
        item = await redis_client.brpop(TASK_QUEUE_KEY, timeout=5)

        if not item:
            continue

        _, raw_task = item
        task_data = json.loads(raw_task)

        asyncio.create_task(handle_task(task_data))


if __name__ == "__main__":
    asyncio.run(worker_loop())

8. 启动服务

启动 API 服务:

uvicorn main:app --host 0.0.0.0 --port 8000 --workers 2

启动 Worker:

python worker.py

如果需要更强处理能力,可以启动多个 Worker 进程:

python worker.py
python worker.py
python worker.py

或者使用 Supervisor、Docker Compose、Kubernetes 进行管理。


9. 测试接口

创建任务:

curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "u1001",
    "prompt": "请解释一下什么是高并发架构"
  }'

返回示例:

{
  "task_id": "4f2a8fd1-9e8c-48b2-a6d6-58c6e9e0f123",
  "status": "pending",
  "from_cache": false
}

查询任务:

curl http://localhost:8000/tasks/4f2a8fd1-9e8c-48b2-a6d6-58c6e9e0f123

可能返回:

{
  "status": "success",
  "from_cache": false,
  "cost_ms": 1852,
  "result": {
    "answer": "AI 回复:你输入的问题是:请解释一下什么是高并发架构",
    "tokens": 238
  }
}

十一、进一步优化:流式响应

对于聊天类 AI 应用,用户并不一定要等完整结果生成完才看到内容。可以使用流式响应,让模型边生成边返回。

常见方案:

  • Server-Sent Events,简称 SSE;
  • WebSocket;
  • HTTP chunked response。

流式输出的优点是:

  1. 首字响应时间更短;
  2. 用户体验更好;
  3. 长文本生成不会让用户一直等待空白页面;
  4. 适合 AI 对话、代码生成、文章生成等场景。

FastAPI SSE 简化示例:

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def fake_stream(prompt: str):
    words = ["这是", "一个", "AI", "流式", "响应", "示例"]
    for word in words:
        yield f"data: {word}\n\n"
        await asyncio.sleep(0.5)

@app.get("/stream")
async def stream(prompt: str):
    return StreamingResponse(
        fake_stream(prompt),
        media_type="text/event-stream"
    )

流式响应不是为了提升总吞吐,而是为了优化用户感知体验。在高并发场景中,它仍然需要配合限流、连接数控制和超时管理。


十二、进一步优化:超时、重试和熔断

AI 服务调用必须设置超时。

错误示例:

response = await client.post(url, json=data)

正确示例:

timeout = httpx.Timeout(
    connect=5.0,
    read=30.0,
    write=5.0,
    pool=5.0
)

async with httpx.AsyncClient(timeout=timeout) as client:
    response = await client.post(url, json=data)

对于偶发错误,可以做有限重试,但不能无限重试。

import asyncio
import httpx

async def call_with_retry(prompt: str, retries: int = 3):
    for i in range(retries):
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                resp = await client.post(
                    "https://api.example.com/v1/chat",
                    json={"prompt": prompt}
                )
                resp.raise_for_status()
                return resp.json()
        except Exception:
            if i == retries - 1:
                raise
            await asyncio.sleep(0.5 * (i + 1))

重试要注意:

  • 只对可重试错误重试;
  • 429 不一定适合立即重试;
  • 超时重试可能会放大流量;
  • 重试次数不能太多;
  • 建议使用指数退避。

熔断机制则用于在下游服务持续异常时快速失败,避免请求继续堆积。


十三、进一步优化:批处理提升推理吞吐

如果是自部署模型,批处理非常重要。

例如 GPU 同时处理一个请求和同时处理多个请求,在某些场景下吞吐差异很大。可以把短时间内到达的请求合并成一个 batch,再统一送入模型。

简单流程:

请求进入队列
 ↓
等待 10ms 到 50ms
 ↓
收集最多 N 个请求
 ↓
组成 batch
 ↓
模型统一推理
 ↓
拆分结果返回给各个请求

批处理适合:

  • embedding 向量生成;
  • 文本分类;
  • rerank;
  • 语义匹配;
  • OCR;
  • 部分模型推理场景。

但对于流式对话模型,批处理实现会更复杂,需要结合推理框架能力,例如:

  • vLLM;
  • TensorRT-LLM;
  • TGI;
  • Ray Serve;
  • Triton Inference Server。

十四、进一步优化:分层限流策略

高并发 AI 系统中,限流不能只做一层。推荐至少三层限流。

1. 网关层限流

例如 Nginx、Kong、APISIX、Spring Cloud Gateway。

负责:

  • IP 限流;
  • 全局 QPS 限流;
  • 黑名单;
  • 基础防刷。

2. 用户层限流

根据用户等级控制额度:

免费用户:每分钟 10 次
普通用户:每分钟 60 次
高级用户:每分钟 300 次
企业用户:独立配额

3. 模型层限流

不同模型成本不同,限流策略也不同:

小模型:高 QPS,低成本
大模型:低 QPS,高成本
绘图模型:低并发,长耗时
Embedding 模型:适合批处理

这样可以避免一个用户、一个模型或一个接口拖垮整个系统。


十五、AI 高并发系统的监控指标

没有监控,就无法判断系统是否真的稳定。

建议重点监控以下指标:

1. 请求指标

  • QPS;
  • 平均响应时间;
  • P95 / P99 延迟;
  • HTTP 4xx / 5xx;
  • 超时率;
  • 429 限流次数。

2. 队列指标

  • 队列长度;
  • 任务等待时间;
  • Worker 消费速度;
  • 任务失败率;
  • 重试次数。

3. 模型指标

  • 模型调用耗时;
  • Token 消耗;
  • 每分钟 Token 数;
  • 模型错误率;
  • 首 Token 延迟;
  • 生成速度 tokens/s。

4. 资源指标

  • CPU;
  • 内存;
  • 网络;
  • Redis 连接数;
  • 数据库连接数;
  • GPU 利用率;
  • GPU 显存使用率。

5. 业务指标

  • 活跃用户数;
  • 人均调用次数;
  • 缓存命中率;
  • 单次调用成本;
  • 每日 Token 成本;
  • 付费转化率。

十六、常见高并发故障与解决办法

故障一:接口响应越来越慢

可能原因:

  • 下游模型慢;
  • 队列堆积;
  • 数据库连接池不足;
  • Redis 慢查询;
  • 线程池耗尽;
  • 请求没有超时。

解决办法:

  • 增加超时;
  • 增加 Worker;
  • 排查慢请求;
  • 增加缓存;
  • 对长任务异步化;
  • 限制最大并发。

故障二:频繁出现 429

可能原因:

  • 第三方模型限流;
  • 用户流量超过配额;
  • 没有本地排队;
  • 多实例没有统一限流。

解决办法:

  • Redis 分布式限流;
  • 请求排队;
  • 多 API Key 池化;
  • 不同用户分配不同配额;
  • 对 429 做延迟重试。

故障三:Redis 队列越来越长

可能原因:

  • Worker 数量不足;
  • 模型调用太慢;
  • 任务生成速度大于消费速度;
  • 某些任务卡住;
  • 下游模型故障。

解决办法:

  • 扩容 Worker;
  • 增加模型副本;
  • 设置任务超时;
  • 增加失败重试队列;
  • 加入熔断降级;
  • 限制任务提交速度。

故障四:成本突然暴涨

可能原因:

  • 被刷接口;
  • 缓存命中率低;
  • Prompt 太长;
  • 重试次数过多;
  • 用户滥用;
  • 日志中重复触发任务。

解决办法:

  • 用户鉴权;
  • 限流;
  • Token 预算控制;
  • Prompt 压缩;
  • 缓存;
  • 计费系统;
  • 异常流量告警。

十七、生产环境落地建议

如果要把本文示例用于生产环境,建议进一步完善:

  1. 鉴权认证
    使用 JWT、OAuth2、API Key 等机制,避免匿名滥用。

  2. 任务持久化
    Redis 适合缓存和队列,但重要任务建议落 MySQL、PostgreSQL 或 MongoDB。

  3. 死信队列
    任务多次失败后进入死信队列,方便人工排查。

  4. 幂等设计
    客户端重试时避免重复创建任务,可使用请求 ID。

  5. 更成熟的队列系统
    高可靠场景建议使用 RabbitMQ、Kafka、RocketMQ、Celery、Dramatiq。

  6. 灰度发布
    新模型、新 Prompt、新 Agent 流程要灰度上线。

  7. 模型路由
    根据任务复杂度选择不同模型,降低成本。

  8. 降级策略
    大模型不可用时,可以切换小模型、缓存答案或提示稍后再试。

  9. 多租户隔离
    企业客户之间需要资源隔离,避免互相影响。

  10. 全链路追踪
    使用 Trace ID 贯穿网关、业务服务、队列、Worker、模型调用。


十八、总结

AI 编程中的高并发问题,本质上是一个系统工程问题。只会调用大模型 API 并不等于能做好 AI 应用。当用户规模增长后,真正决定系统稳定性的往往是:

  • 是否异步化;
  • 是否有限流;
  • 是否有缓存;
  • 是否有队列;
  • 是否能削峰填谷;
  • 是否能熔断降级;
  • 是否有监控告警;
  • 是否能控制成本。

本文给出的方案可以概括为:

短请求:异步接口 + 限流 + 缓存
长任务:任务队列 + Worker + 状态查询
模型调用:超时 + 重试 + 熔断 + 并发控制
系统稳定性:监控 + 告警 + 扩容 + 降级

如果只是 Demo,一个同步接口就够了;但如果要支撑真实用户,就必须从第一天开始考虑高并发架构。

AI 应用的竞争,不只是模型能力的竞争,也是工程稳定性、成本控制和用户体验的竞争。谁能让 AI 服务在高并发下依然稳定、快速、可控,谁就更有机会把 AI 产品真正做成可持续的业务。

目录结构
全文