上一篇 下一篇 分享链接 返回 返回顶部

AI 工具扛不住并发?这套队列 + Worker 架构直接解决痛点

发布人:慈云数据-客服中心 发布时间:23小时前 阅读量:5

AI工具 高并发解决方案|附源码

在 AI 应用快速落地的过程中,很多团队都会遇到同一个问题:模型能力已经具备,但系统扛不住高并发

尤其是接入大语言模型、文生图、语音识别、向量检索等 AI 能力后,单次请求耗时往往比普通业务接口更长。如果不做架构设计,用户量稍微上来,就容易出现:

  • 接口响应变慢;
  • 请求大量超时;
  • 服务 CPU、内存飙升;
  • 模型 API 被限流;
  • 队列堆积不可控;
  • 成本快速上涨;
  • 用户体验严重下降。

本文将围绕 AI 工具类应用的高并发解决方案 展开,结合常见业务场景,介绍一套较为通用的架构设计思路,并附上可运行的简化源码,帮助你快速搭建一个支持高并发请求处理的 AI 服务框架。


一、AI 工具为什么更容易遇到高并发问题?

传统业务接口通常是数据库查询、缓存读取、简单计算,接口耗时可能在几十毫秒到几百毫秒之间。

但 AI 工具不同。

例如:

AI 场景 单次请求耗时
AI 写作 3 秒 - 30 秒
AI 总结文档 5 秒 - 60 秒
AI 文生图 10 秒 - 120 秒
AI 数字人生成 数分钟
AI 语音转文字 几秒到几十秒
向量知识库问答 1 秒 - 10 秒

当请求耗时变长后,即使 QPS 不算特别高,系统并发数也会迅速增加。

举个例子:

如果一个接口平均响应时间是 10 秒,每秒有 100 个请求进来,那么系统同时需要处理的请求数量大约是:

并发数 = QPS × 平均耗时
并发数 = 100 × 10 = 1000

这意味着系统需要同时维护 1000 个未完成任务。

如果这些请求全部直接打到模型服务或第三方 AI API,很容易出现:

  • 上游服务被打爆;
  • API 触发限流;
  • 本地连接数耗尽;
  • 请求堆积导致雪崩;
  • 用户端长时间等待。

所以,AI 工具高并发的核心不是单纯“加机器”,而是要设计一套完整的 削峰、限流、异步、缓存、降级、监控 体系。


二、高并发 AI 系统的核心设计目标

一个可靠的 AI 高并发系统,通常需要满足以下目标:

1. 接口不能被慢任务拖死

AI 请求可能很慢,如果所有请求都同步等待模型返回,那么 Web 服务线程或协程会被大量占用,最终导致整个服务无法响应。

正确做法是:

  • 短任务可以同步返回;
  • 长任务改为异步任务;
  • 客户端通过任务 ID 查询状态;
  • 或使用 WebSocket / SSE 推送结果。

2. 请求量过大时必须可控

高并发并不可怕,可怕的是无限制接收请求。

系统需要具备:

  • 全局限流;
  • 用户级限流;
  • IP 级限流;
  • 模型级限流;
  • 队列长度限制;
  • 超时控制;
  • 熔断降级。

3. 模型调用要有并发保护

无论是本地模型,还是第三方大模型 API,都不应该被无限并发调用。

例如某个大模型 API 只允许每分钟 300 次请求,那么本地服务就必须在调用前做控制。

常见手段包括:

  • 令牌桶限流;
  • 信号量控制并发;
  • 请求队列;
  • 分布式锁;
  • Redis 计数器;
  • API Key 池调度。

4. 结果尽量复用

很多 AI 工具类应用具有明显的重复请求特征。

例如:

  • 同一篇文章摘要;
  • 同一个 prompt 生成文案;
  • 同一个 URL 解析;
  • 同一份文件向量化;
  • 同一个问题命中知识库。

这些请求可以通过缓存减少模型调用。

缓存可以分为:

  • 本地内存缓存;
  • Redis 缓存;
  • 文件缓存;
  • 向量检索缓存;
  • Prompt Hash 缓存。

5. 系统必须具备可观测性

AI 服务出问题时,排查难度通常高于普通接口。

你需要知道:

  • 当前请求数;
  • 队列长度;
  • 平均响应时间;
  • 模型调用耗时;
  • 错误率;
  • 超时率;
  • Token 消耗;
  • 用户调用次数;
  • 第三方 API 状态。

如果没有监控,一旦系统变慢,你很难判断到底是数据库慢、模型慢、网络慢,还是队列堆积。


三、推荐架构方案

下面是一套比较通用的 AI 工具高并发架构:

用户请求
   ↓
Nginx / API Gateway
   ↓
限流层 / 鉴权层
   ↓
Web 服务层 FastAPI / Spring Boot / Go
   ↓
任务队列 Redis / RabbitMQ / Kafka
   ↓
Worker 工作节点
   ↓
AI 模型服务 / 第三方 API
   ↓
结果缓存 Redis / MySQL / 对象存储
   ↓
用户查询结果 / SSE 推送

四、关键模块设计

1. 网关层限流

网关层是第一道防线。

常见方案:

  • Nginx limit_req
  • Kong / APISIX 限流插件;
  • Spring Cloud Gateway;
  • 云厂商 API 网关;
  • 自研 Redis 限流中间件。

例如 Nginx 可以做基础限流:

http {
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

    server {
        location /api/ {
            limit_req zone=api_limit burst=20 nodelay;
            proxy_pass http://backend;
        }
    }
}

这个配置表示:每个 IP 平均每秒允许 10 个请求,突发最多 20 个。


2. Web 层快速响应

对于耗时较长的 AI 任务,不建议 HTTP 请求一直阻塞等待。

推荐方式:

POST /tasks        创建任务,立即返回 task_id
GET  /tasks/{id}   查询任务状态和结果

创建任务接口只负责:

  • 参数校验;
  • 鉴权;
  • 限流;
  • 生成任务 ID;
  • 写入队列;
  • 返回任务 ID。

真正的 AI 调用由后台 Worker 执行。


3. 队列削峰

队列是高并发系统中非常重要的削峰组件。

它的作用包括:

  • 缓冲瞬时流量;
  • 控制处理速度;
  • 防止模型服务被打爆;
  • 提高系统稳定性;
  • 支持失败重试。

常用队列:

队列 适用场景
Redis List / Stream 中小型任务队列
RabbitMQ 可靠消息、延迟任务
Kafka 超高吞吐日志流
Celery Python 异步任务
BullMQ Node.js 任务队列

对于大部分 AI 工具类产品,初期使用 Redis 队列已经足够。


4. Worker 并发控制

Worker 是真正调用模型的地方。

这里一定要控制并发,否则队列里的任务会被瞬间拉出来全部执行,模型服务依然会被压垮。

常见控制方式:

semaphore = asyncio.Semaphore(10)

表示同一时刻最多只有 10 个模型调用在执行。

如果是多机器部署,则需要配合 Redis 做分布式限流。


5. 缓存结果

缓存可以显著降低成本。

例如:

cache_key = sha256(model + prompt + params)

只要用户输入相同,就可以直接返回之前生成的结果。

缓存策略建议:

  • 短文本生成:缓存 1 小时到 1 天;
  • 文档摘要:缓存 7 天或更久;
  • 文件解析:永久缓存;
  • 向量化结果:长期缓存;
  • 用户私有数据:注意隔离用户 ID。

6. 超时与重试

AI 模型调用不应无限等待。

推荐设置:

  • 单次模型调用超时:30 秒;
  • 总任务超时:60 秒;
  • 失败重试:1 - 3 次;
  • 重试间隔:指数退避;
  • 对不可恢复错误不重试。

例如:

第一次失败:1 秒后重试
第二次失败:3 秒后重试
第三次失败:10 秒后重试

7. 降级策略

当系统压力过高时,需要主动降级,而不是等系统崩溃。

可选降级策略:

  • 暂停低优先级任务;
  • 免费用户排队,付费用户优先;
  • 关闭复杂模型,切换轻量模型;
  • 降低最大输入长度;
  • 返回“系统繁忙,请稍后重试”;
  • 对重复请求直接返回缓存;
  • 限制单用户同时任务数。

五、完整简化源码:FastAPI + Redis + 异步 Worker

下面提供一个简化版本的高并发 AI 任务处理服务。

功能包括:

  • 创建 AI 任务;
  • Redis 队列削峰;
  • Worker 异步消费;
  • 并发数控制;
  • 任务状态查询;
  • 结果缓存;
  • 失败重试。

1. 安装依赖

pip install fastapi uvicorn redis httpx pydantic

需要本地启动 Redis:

docker run -d --name redis-ai -p 6379:6379 redis:7

2. 项目结构

ai-high-concurrency-demo/
├── main.py
├── worker.py
├── ai_client.py
├── config.py
└── requirements.txt

3. config.py

# config.py

REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 0

TASK_QUEUE_KEY = "ai:task:queue"
TASK_RESULT_PREFIX = "ai:task:result:"
TASK_STATUS_PREFIX = "ai:task:status:"
CACHE_PREFIX = "ai:cache:"

# Worker 本地最大并发数
MAX_WORKER_CONCURRENCY = 5

# 任务最大重试次数
MAX_RETRY = 3

# 结果缓存时间,单位:秒
CACHE_TTL = 3600

4. ai_client.py

这里用 asyncio.sleep 模拟真实 AI 模型调用。实际项目中,你可以替换为 OpenAI、Claude、通义千问、DeepSeek、本地模型等接口。

# ai_client.py

import asyncio
import random

async def call_ai_model(prompt: str) -> str:
    """
    模拟 AI 模型调用。
    真实场景中,可以在这里请求大模型 API。
    """
    await asyncio.sleep(random.randint(2, 5))

    if random.random() < 0.1:
        raise Exception("AI model temporary error")

    return f"AI生成结果:{prompt}"

5. main.py

# main.py

import json
import uuid
import hashlib
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis

from config import (
    REDIS_HOST,
    REDIS_PORT,
    REDIS_DB,
    TASK_QUEUE_KEY,
    TASK_RESULT_PREFIX,
    TASK_STATUS_PREFIX,
    CACHE_PREFIX,
    CACHE_TTL,
)

app = FastAPI(title="AI High Concurrency Demo")

r = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_DB,
    decode_responses=True
)


class CreateTaskRequest(BaseModel):
    user_id: str
    prompt: str


def build_cache_key(prompt: str) -> str:
    raw = prompt.strip()
    h = hashlib.sha256(raw.encode("utf-8")).hexdigest()
    return CACHE_PREFIX + h


@app.post("/tasks")
def create_task(req: CreateTaskRequest):
    """
    创建 AI 任务。
    如果缓存命中,直接返回结果。
    如果未命中,则写入任务队列。
    """
    if not req.prompt.strip():
        raise HTTPException(status_code=400, detail="prompt不能为空")

    cache_key = build_cache_key(req.prompt)
    cached_result = r.get(cache_key)

    if cached_result:
        task_id = str(uuid.uuid4())
        r.set(TASK_STATUS_PREFIX + task_id, "success", ex=3600)
        r.set(TASK_RESULT_PREFIX + task_id, cached_result, ex=3600)

        return {
            "task_id": task_id,
            "status": "success",
            "from_cache": True,
            "result": cached_result
        }

    task_id = str(uuid.uuid4())

    task = {
        "task_id": task_id,
        "user_id": req.user_id,
        "prompt": req.prompt,
        "retry": 0,
        "cache_key": cache_key
    }

    r.set(TASK_STATUS_PREFIX + task_id, "pending", ex=3600)

    # 写入队列
    r.lpush(TASK_QUEUE_KEY, json.dumps(task, ensure_ascii=False))

    return {
        "task_id": task_id,
        "status": "pending",
        "message": "任务已提交,请稍后查询结果"
    }


@app.get("/tasks/{task_id}")
def get_task(task_id: str):
    """
    查询任务状态。
    """
    status = r.get(TASK_STATUS_PREFIX + task_id)

    if not status:
        raise HTTPException(status_code=404, detail="任务不存在或已过期")

    result = r.get(TASK_RESULT_PREFIX + task_id)

    return {
        "task_id": task_id,
        "status": status,
        "result": result
    }


@app.get("/health")
def health():
    return {"status": "ok"}

6. worker.py

# worker.py

import json
import time
import asyncio
import redis

from config import (
    REDIS_HOST,
    REDIS_PORT,
    REDIS_DB,
    TASK_QUEUE_KEY,
    TASK_RESULT_PREFIX,
    TASK_STATUS_PREFIX,
    CACHE_TTL,
    MAX_WORKER_CONCURRENCY,
    MAX_RETRY,
)

from ai_client import call_ai_model

r = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_DB,
    decode_responses=True
)

semaphore = asyncio.Semaphore(MAX_WORKER_CONCURRENCY)


async def handle_task(task: dict):
    task_id = task["task_id"]
    prompt = task["prompt"]
    retry = task.get("retry", 0)
    cache_key = task.get("cache_key")

    async with semaphore:
        try:
            print(f"开始处理任务: {task_id}, retry={retry}")

            r.set(TASK_STATUS_PREFIX + task_id, "running", ex=3600)

            result = await asyncio.wait_for(
                call_ai_model(prompt),
                timeout=30
            )

            r.set(TASK_RESULT_PREFIX + task_id, result, ex=3600)
            r.set(TASK_STATUS_PREFIX + task_id, "success", ex=3600)

            if cache_key:
                r.set(cache_key, result, ex=CACHE_TTL)

            print(f"任务成功: {task_id}")

        except Exception as e:
            print(f"任务失败: {task_id}, error={e}")

            if retry < MAX_RETRY:
                task["retry"] = retry + 1
                r.set(TASK_STATUS_PREFIX + task_id, "retrying", ex=3600)

                # 简单退避
                await asyncio.sleep(2 ** retry)

                r.lpush(TASK_QUEUE_KEY, json.dumps(task, ensure_ascii=False))
            else:
                r.set(TASK_STATUS_PREFIX + task_id, "failed", ex=3600)
                r.set(
                    TASK_RESULT_PREFIX + task_id,
                    f"任务失败:{str(e)}",
                    ex=3600
                )


async def worker_loop():
    print("Worker started...")

    while True:
        try:
            item = r.brpop(TASK_QUEUE_KEY, timeout=5)

            if item is None:
                await asyncio.sleep(0.1)
                continue

            _, raw_task = item
            task = json.loads(raw_task)

            asyncio.create_task(handle_task(task))

        except Exception as e:
            print(f"Worker loop error: {e}")
            await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(worker_loop())

7. requirements.txt

fastapi
uvicorn
redis
httpx
pydantic

8. 启动服务

启动 API 服务:

uvicorn main:app --host 0.0.0.0 --port 8000

启动 Worker:

python worker.py

如果需要提升处理能力,可以启动多个 Worker:

python worker.py
python worker.py
python worker.py

或者使用进程管理工具:

supervisor
systemd
pm2
docker compose
kubernetes

9. 测试接口

创建任务:

curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"user_id":"u001","prompt":"请帮我写一篇关于AI高并发架构的文章"}'

返回示例:

{
  "task_id": "f6a18d25-5b28-4c75-bf71-b5d1b9bb5a9e",
  "status": "pending",
  "message": "任务已提交,请稍后查询结果"
}

查询结果:

curl http://localhost:8000/tasks/f6a18d25-5b28-4c75-bf71-b5d1b9bb5a9e

返回示例:

{
  "task_id": "f6a18d25-5b28-4c75-bf71-b5d1b9bb5a9e",
  "status": "success",
  "result": "AI生成结果:请帮我写一篇关于AI高并发架构的文章"
}

六、生产环境优化建议

上面的源码是一个简化示例,适合用于理解核心思路。如果要用于生产环境,还需要继续加强。


1. 增加用户级限流

例如:

  • 免费用户:每分钟 5 次;
  • 会员用户:每分钟 30 次;
  • 企业用户:每分钟 300 次。

可以用 Redis 实现滑动窗口限流:

def rate_limit(user_id: str, limit: int, window: int):
    key = f"rate:{user_id}"
    current = r.incr(key)

    if current == 1:
        r.expire(key, window)

    return current <= limit

在创建任务前调用:

if not rate_limit(req.user_id, 10, 60):
    raise HTTPException(status_code=429, detail="请求过于频繁")

2. 增加队列长度保护

如果队列长度已经很大,继续接收请求只会让用户等待更久。

可以设置最大队列长度:

queue_len = r.llen(TASK_QUEUE_KEY)

if queue_len > 10000:
    raise HTTPException(status_code=503, detail="系统繁忙,请稍后再试")

3. 增加任务优先级

对于商业化 AI 工具,通常需要区分用户等级。

例如:

VIP 队列
普通队列
免费队列

Worker 优先消费 VIP 队列:

先消费 ai:task:queue:vip
再消费 ai:task:queue:normal
最后消费 ai:task:queue:free

这样可以保证核心用户体验。


4. 使用 SSE 推送结果

如果产品希望实现类似 ChatGPT 的流式输出,可以使用:

  • Server-Sent Events;
  • WebSocket;
  • HTTP Chunked Streaming。

短文本对话建议使用 SSE,简单稳定,浏览器支持好。


5. API Key 池调度

如果接入第三方模型,经常会遇到单个 API Key 的限速问题。

可以维护一个 API Key 池:

key_1: 当前可用
key_2: 当前可用
key_3: 已限流,冷却中
key_4: 当前可用

每次调用模型前选择一个可用 Key,并记录:

  • 成功次数;
  • 失败次数;
  • 剩余额度;
  • 限流状态;
  • 冷却时间。

6. 数据库持久化

Redis 适合缓存和队列,但重要任务建议持久化到数据库。

例如 MySQL 表设计:

CREATE TABLE ai_task (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    task_id VARCHAR(64) NOT NULL UNIQUE,
    user_id VARCHAR(64) NOT NULL,
    prompt TEXT NOT NULL,
    status VARCHAR(32) NOT NULL,
    result MEDIUMTEXT,
    retry_count INT DEFAULT 0,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

这样即使 Redis 重启,也不会丢失关键任务数据。


7. 模型服务独立部署

如果你使用本地开源模型,比如:

  • Qwen;
  • DeepSeek;
  • Llama;
  • ChatGLM;
  • Stable Diffusion;
  • Whisper。

建议将模型服务独立部署,Web 服务不要和模型服务混在一起。

推荐结构:

API 服务集群
   ↓
任务队列
   ↓
推理服务集群
   ↓
GPU 节点

这样可以独立扩缩容,避免 Web 服务被 GPU 推理拖垮。


七、常见性能瓶颈与解决方案

瓶颈 表现 解决方案
模型调用慢 任务长期 running 增加 Worker、模型并发、使用更快模型
Redis 队列堆积 pending 很多 增加消费节点、限流、拆分队列
第三方 API 限流 大量 429 API Key 池、限速、退避重试
数据库压力大 查询慢 缓存、读写分离、索引优化
用户重复提交 队列膨胀 请求去重、结果缓存
大文件处理慢 超时 文件异步解析、分片处理
Token 成本高 账单上涨 Prompt 压缩、缓存、模型分级

八、部署建议

生产环境可以使用 Docker Compose 或 Kubernetes 部署。

推荐最小部署单元:

Nginx × 1
API 服务 × 2
Redis × 1
Worker × 3
MySQL × 1
监控服务 × 1

如果业务增长,可以逐步扩展:

API 服务横向扩容
Worker 横向扩容
Redis 主从或集群
模型服务独立扩容
任务队列拆分
GPU 节点扩容

对于 AI 应用,Worker 的扩容通常比 API 服务扩容更关键,因为真正耗时的部分在模型调用。


九、总结

AI 工具的高并发问题,本质上是 长耗时任务在高流量场景下的系统稳定性问题

一套可靠的方案通常包括:

  1. 网关限流:防止入口流量失控;
  2. 异步任务:避免 Web 请求长时间阻塞;
  3. 消息队列:削峰填谷,保护下游模型;
  4. Worker 并发控制:限制模型调用并发;
  5. 缓存复用:降低重复计算与 API 成本;
  6. 失败重试:提高任务成功率;
  7. 超时控制:防止任务无限挂起;
  8. 降级策略:系统高压时优雅失败;
  9. 监控告警:及时发现性能瓶颈;
  10. 分层扩容:API、队列、Worker、模型服务分别扩展。

如果只是做 Demo,可以同步调用模型;但如果要做真正面向用户的 AI 工具,建议从一开始就采用 异步任务 + 队列 + Worker + 缓存 + 限流 的架构。

这样不仅能提升系统并发能力,也能显著降低模型调用成本,让 AI 产品在用户增长后依然稳定可用。

目录结构
全文