上一篇 下一篇 分享链接 返回 返回顶部

把流量挡在源站之外:Cloudflare Workers 高并发网关实战(含源码)

发布人:慈云数据-客服中心 发布时间:23小时前 阅读量:4

Cloudflare 高并发解决方案|附源码

在现代互联网业务中,高并发几乎是所有线上系统绕不开的问题。无论是电商秒杀、活动报名、短链接跳转、接口网关、图片分发,还是 SaaS 平台的 API 服务,只要流量在短时间内集中爆发,传统单机或普通云服务器架构就很容易出现 CPU 飙升、数据库连接耗尽、响应延迟升高、服务雪崩等问题。

Cloudflare 作为全球边缘网络平台,提供了包括 CDN、Workers、KV、R2、D1、Durable Objects、Queues、WAF、Rate Limiting 等一系列能力。合理组合这些产品,可以构建一套具备高并发处理能力、低延迟、自动扩展、抗攻击能力强的服务架构。

本文将围绕一个典型场景展开:使用 Cloudflare Workers 构建高并发 API 网关与缓存系统,并给出可直接参考的源码示例。


一、为什么选择 Cloudflare 处理高并发?

传统高并发架构通常需要考虑以下问题:

  1. 服务器是否能承载大量请求;
  2. 数据库是否会被瞬间打爆;
  3. 静态资源是否需要 CDN;
  4. 是否需要负载均衡;
  5. 是否需要防御恶意请求;
  6. 是否需要削峰填谷;
  7. 是否需要全球用户低延迟访问;
  8. 是否需要弹性扩容和自动伸缩。

如果采用传统云服务器方案,通常需要自己搭建:

  • Nginx;
  • SLB / ELB;
  • Redis;
  • MySQL 主从;
  • 消息队列;
  • CDN;
  • WAF;
  • 限流组件;
  • 日志系统;
  • 自动扩缩容系统。

这套系统当然可以实现,但运维复杂度和成本都不低。

Cloudflare 的优势在于,它将大量能力前置到了全球边缘节点:

  • 请求先到 Cloudflare 边缘节点;
  • 静态资源可直接在边缘缓存;
  • Worker 代码可在边缘执行;
  • 恶意流量可在到达源站前被拦截;
  • 热点数据可通过 Cache API 或 KV 缓存;
  • 写入任务可通过 Queues 异步处理;
  • 强一致状态可用 Durable Objects 管理。

简单来说,Cloudflare 可以让我们把很多高并发压力消化在源站之前。


二、整体架构设计

本文推荐的高并发架构如下:

用户请求
   │
   ▼
Cloudflare DNS
   │
   ▼
Cloudflare WAF / DDoS 防护 / Bot Fight Mode
   │
   ▼
Cloudflare Worker
   │
   ├── 静态资源:Cache API / CDN
   │
   ├── 热点数据:KV / Cache API
   │
   ├── 强一致计数:Durable Objects
   │
   ├── 异步任务:Queues
   │
   ├── 文件资源:R2
   │
   └── 数据查询:D1 / 外部数据库
   │
   ▼
源站服务 / 数据库

核心思路

高并发系统最重要的原则并不是“让数据库更强”,而是:

能不访问源站就不访问源站,能不访问数据库就不访问数据库,能异步处理就不要同步处理。

因此我们可以将请求分为几类:

请求类型 处理方式
静态资源 Cloudflare CDN / Cache API
热点接口 Worker + Cache API
配置数据 KV
写入日志 Queues 异步入库
秒杀库存 Durable Objects 串行化处理
用户上传文件 R2
管理后台查询 D1 或源站数据库

三、适用场景

这套方案非常适合以下场景:

  1. 高并发 API 网关;
  2. 电商活动页;
  3. 秒杀抢购;
  4. 短链接跳转;
  5. 图片、视频、文件分发;
  6. SaaS 多租户接口;
  7. 全球化业务访问加速;
  8. 边缘鉴权;
  9. 防刷、防爬虫;
  10. 轻量后端服务。

如果你的业务主要是读多写少,Cloudflare 架构会非常有优势。如果是复杂事务型系统,仍然建议核心交易链路放在传统数据库中,Cloudflare 负责入口、缓存、限流和削峰。


四、高并发解决方案关键点

1. 使用 Cache API 缓存热点接口

对于访问量很高但数据变化不频繁的接口,例如:

  • 首页配置;
  • 商品列表;
  • 文章详情;
  • 活动规则;
  • 公告信息;
  • 排行榜快照;

都可以通过 Cloudflare Worker 的 Cache API 缓存在边缘节点。

用户请求到达 Worker 后,先查缓存:

  • 如果命中缓存,直接返回;
  • 如果未命中缓存,再请求源站;
  • 请求源站成功后,将结果写入缓存。

这样可以显著减少源站压力。


2. 使用 KV 存储全局配置

Cloudflare KV 适合存储读多写少的数据,例如:

  • 站点配置;
  • API 开关;
  • 活动状态;
  • 黑名单;
  • 白名单;
  • 路由规则;
  • 页面配置;
  • 短链接映射。

KV 的优点是读取速度快、全球分布,缺点是写入后不是强一致,通常会有短暂传播延迟。因此不适合存储库存、余额等强一致数据。


3. 使用 Durable Objects 管理强一致状态

高并发系统中,很多问题不是读,而是写。例如秒杀库存扣减、计数器递增、抽奖次数限制等。

如果多个请求同时修改同一份状态,很容易产生并发问题。Durable Objects 的特点是:

  • 同一个对象实例内请求串行处理;
  • 适合管理某个资源的强一致状态;
  • 可用于库存、计数器、限流器、房间状态等。

例如秒杀活动中,一个商品对应一个 Durable Object。所有扣库存请求都进入同一个对象,由该对象逐个处理,从而避免超卖。


4. 使用 Queues 进行削峰填谷

高并发写入最容易击穿数据库。例如:

  • 访问日志;
  • 操作日志;
  • 下单后通知;
  • 邮件发送;
  • 短信发送;
  • 积分发放;
  • 数据同步。

这些任务没有必要在用户请求中同步完成,可以先写入 Cloudflare Queues,然后由消费者异步处理。

这样做的好处:

  • 降低接口响应时间;
  • 防止数据库瞬间被打爆;
  • 支持失败重试;
  • 支持批量消费;
  • 提高系统稳定性。

5. 使用 WAF 和 Rate Limiting 防御恶意流量

高并发不一定全部来自正常用户,也可能来自:

  • 爬虫;
  • 刷接口;
  • CC 攻击;
  • 暴力破解;
  • 恶意脚本;
  • 扫描器。

Cloudflare 在入口层提供了多种防护能力:

  • DDoS 防护;
  • WAF 规则;
  • Bot Fight Mode;
  • Turnstile 人机验证;
  • Rate Limiting;
  • IP Reputation;
  • Geo Blocking。

在实际项目中,建议将安全策略前置,让无效请求尽量不要进入 Worker,更不要进入源站。


五、项目结构

下面给出一个基于 Cloudflare Workers 的高并发 API 网关示例。

项目结构如下:

cloudflare-high-concurrency-demo
├── package.json
├── wrangler.toml
└── src
    ├── index.js
    └── durable-object.js

该示例包含:

  • API 缓存;
  • KV 配置读取;
  • 简单限流;
  • Durable Objects 库存扣减;
  • Queues 异步日志;
  • 源站回源;
  • 安全响应头。

六、wrangler.toml 配置

name = "cloudflare-high-concurrency-demo"
main = "src/index.js"
compatibility_date = "2024-09-01"

workers_dev = true

[[kv_namespaces]]
binding = "CONFIG_KV"
id = "your_kv_namespace_id"

[[queues.producers]]
binding = "LOG_QUEUE"
queue = "access-log-queue"

[[queues.consumers]]
queue = "access-log-queue"
max_batch_size = 20
max_batch_timeout = 5

[[durable_objects.bindings]]
name = "STOCK_DO"
class_name = "StockDurableObject"

[[migrations]]
tag = "v1"
new_classes = ["StockDurableObject"]

[vars]
ORIGIN_API = "https://api.example.com"
CACHE_TTL = "60"
RATE_LIMIT = "100"

说明:

  • CONFIG_KV:用于存储配置;
  • LOG_QUEUE:用于异步写入日志;
  • STOCK_DO:用于秒杀库存扣减;
  • ORIGIN_API:源站地址;
  • CACHE_TTL:接口缓存时间;
  • RATE_LIMIT:单 IP 简单限流阈值。

七、核心源码:Worker 入口

文件:src/index.js

import { StockDurableObject } from "./durable-object.js";

export { StockDurableObject };

export default {
  async fetch(request, env, ctx) {
    const startTime = Date.now();
    const url = new URL(request.url);

    try {
      // 1. 基础安全检查
      const securityResult = await securityCheck(request, env);
      if (!securityResult.pass) {
        return jsonResponse({
          code: 403,
          message: securityResult.message
        }, 403);
      }

      // 2. 路由分发
      let response;

      if (url.pathname === "/api/health") {
        response = jsonResponse({
          code: 0,
          message: "ok",
          timestamp: Date.now()
        });
      } else if (url.pathname === "/api/config") {
        response = await handleConfig(request, env);
      } else if (url.pathname === "/api/products") {
        response = await handleCacheProxy(request, env, ctx);
      } else if (url.pathname === "/api/seckill") {
        response = await handleSeckill(request, env);
      } else {
        response = await handleOriginProxy(request, env);
      }

      // 3. 异步日志,不阻塞用户响应
      ctx.waitUntil(writeAccessLog(request, env, response, startTime));

      // 4. 添加安全响应头
      return addSecurityHeaders(response);

    } catch (error) {
      console.error("Worker Error:", error);

      ctx.waitUntil(writeErrorLog(request, env, error, startTime));

      return jsonResponse({
        code: 500,
        message: "Internal Server Error"
      }, 500);
    }
  },

  async queue(batch, env) {
    for (const message of batch.messages) {
      try {
        const log = message.body;

        // 实际项目中可以写入 D1、R2、外部日志系统或数据库
        console.log("consume log:", JSON.stringify(log));

        message.ack();
      } catch (error) {
        console.error("queue consume error:", error);
        message.retry();
      }
    }
  }
};

八、安全检查与限流

下面是一个简单的边缘安全检查示例。生产环境中可以结合 Cloudflare WAF、Turnstile、Rate Limiting 等能力实现更完整的策略。

async function securityCheck(request, env) {
  const url = new URL(request.url);
  const ip = request.headers.get("CF-Connecting-IP") || "unknown";
  const userAgent = request.headers.get("User-Agent") || "";

  // 拦截空 UA 的异常请求
  if (!userAgent || userAgent.length < 5) {
    return {
      pass: false,
      message: "Invalid User-Agent"
    };
  }

  // 禁止访问敏感路径
  const forbiddenPaths = [
    "/.env",
    "/wp-admin",
    "/phpmyadmin",
    "/server-status"
  ];

  if (forbiddenPaths.some(path => url.pathname.startsWith(path))) {
    return {
      pass: false,
      message: "Forbidden Path"
    };
  }

  // 从 KV 读取黑名单
  const blacklist = await env.CONFIG_KV.get("ip_blacklist", "json");
  if (Array.isArray(blacklist) && blacklist.includes(ip)) {
    return {
      pass: false,
      message: "IP Blocked"
    };
  }

  return {
    pass: true
  };
}

这里使用 KV 存储 IP 黑名单,例如:

[
  "1.1.1.1",
  "2.2.2.2"
]

需要注意的是,KV 不适合做毫秒级强一致限流。如果需要严格限流,应结合 Durable Objects 或 Cloudflare 原生 Rate Limiting。


九、配置接口:KV 读取示例

async function handleConfig(request, env) {
  const siteConfig = await env.CONFIG_KV.get("site_config", "json");

  return jsonResponse({
    code: 0,
    data: siteConfig || {
      siteName: "Cloudflare 高并发示例站点",
      maintenance: false,
      version: "1.0.0"
    }
  });
}

KV 中可以保存如下配置:

{
  "siteName": "高并发活动平台",
  "maintenance": false,
  "version": "1.0.0",
  "features": {
    "seckill": true,
    "ranking": true
  }
}

这样我们无需重新部署 Worker,就可以通过修改 KV 配置动态控制业务。


十、热点接口缓存代理

对于 /api/products 这类高频访问接口,可以使用 Cache API 缓存。

async function handleCacheProxy(request, env, ctx) {
  const cache = caches.default;
  const url = new URL(request.url);

  // 只缓存 GET 请求
  if (request.method !== "GET") {
    return handleOriginProxy(request, env);
  }

  const cacheKey = new Request(url.toString(), request);
  const cachedResponse = await cache.match(cacheKey);

  if (cachedResponse) {
    const response = new Response(cachedResponse.body, cachedResponse);
    response.headers.set("X-Cache", "HIT");
    return response;
  }

  const originUrl = env.ORIGIN_API + url.pathname + url.search;

  const originResponse = await fetch(originUrl, {
    method: request.method,
    headers: buildOriginHeaders(request)
  });

  if (!originResponse.ok) {
    return originResponse;
  }

  const ttl = Number(env.CACHE_TTL || 60);

  const responseBody = await originResponse.text();

  const response = new Response(responseBody, {
    status: originResponse.status,
    headers: {
      "Content-Type": originResponse.headers.get("Content-Type") || "application/json",
      "Cache-Control": `public, max-age=${ttl}`
    }
  });

  response.headers.set("X-Cache", "MISS");

  ctx.waitUntil(cache.put(cacheKey, response.clone()));

  return response;
}

这段代码的逻辑是:

  1. GET 请求才走缓存;
  2. 先查询 Cloudflare 边缘缓存;
  3. 命中则直接返回;
  4. 未命中则请求源站;
  5. 成功后异步写入缓存;
  6. 下次请求直接从边缘节点返回。

这样可以把大量热点读取请求挡在边缘层。


十一、普通请求回源代理

如果不是热点接口,可以直接代理到源站。

async function handleOriginProxy(request, env) {
  const url = new URL(request.url);
  const originUrl = env.ORIGIN_API + url.pathname + url.search;

  const originRequest = new Request(originUrl, {
    method: request.method,
    headers: buildOriginHeaders(request),
    body: request.method === "GET" || request.method === "HEAD"
      ? undefined
      : request.body
  });

  return fetch(originRequest);
}

function buildOriginHeaders(request) {
  const headers = new Headers(request.headers);

  headers.set("X-Forwarded-By", "Cloudflare-Worker");
  headers.set("X-Real-IP", request.headers.get("CF-Connecting-IP") || "");

  // 避免把客户端 Host 传到源站导致路由异常
  headers.delete("Host");

  return headers;
}

这里 Worker 相当于一个边缘 API 网关。源站可以隐藏在 Cloudflare 后面,减少暴露风险。


十二、秒杀库存:Durable Objects 源码

文件:src/durable-object.js

export class StockDurableObject {
  constructor(state, env) {
    this.state = state;
    this.env = env;
  }

  async fetch(request) {
    const url = new URL(request.url);

    if (url.pathname === "/init") {
      return this.initStock(request);
    }

    if (url.pathname === "/deduct") {
      return this.deductStock(request);
    }

    if (url.pathname === "/status") {
      return this.getStatus();
    }

    return new Response("Not Found", { status: 404 });
  }

  async initStock(request) {
    const body = await request.json();
    const stock = Number(body.stock || 0);

    if (stock < 0) {
      return jsonResponse({
        code: 400,
        message: "Invalid stock"
      }, 400);
    }

    await this.state.storage.put("stock", stock);

    return jsonResponse({
      code: 0,
      message: "Stock initialized",
      stock
    });
  }

  async deductStock(request) {
    const body = await request.json();
    const userId = body.userId;
    const quantity = Number(body.quantity || 1);

    if (!userId) {
      return jsonResponse({
        code: 400,
        message: "Missing userId"
      }, 400);
    }

    if (quantity <= 0) {
      return jsonResponse({
        code: 400,
        message: "Invalid quantity"
      }, 400);
    }

    // 防止同一用户重复抢购
    const userKey = `user:${userId}`;
    const hasBought = await this.state.storage.get(userKey);

    if (hasBought) {
      return jsonResponse({
        code: 409,
        message: "User already purchased"
      }, 409);
    }

    let stock = await this.state.storage.get("stock");
    stock = Number(stock || 0);

    if (stock < quantity) {
      return jsonResponse({
        code:  sold_out_code(),
        message: "Sold out",
        stock
      }, 200);
    }

    stock -= quantity;

    await this.state.storage.put("stock", stock);
    await this.state.storage.put(userKey, {
      quantity,
      time: Date.now()
    });

    return jsonResponse({
      code: 0,
      message: "Success",
      stock
    });
  }

  async getStatus() {
    const stock = await this.state.storage.get("stock");

    return jsonResponse({
      code: 0,
      stock: Number(stock || 0)
    });
  }
}

function sold_out_code() {
  return 1001;
}

function jsonResponse(data, status = 200) {
  return new Response(JSON.stringify(data), {
    status,
    headers: {
      "Content-Type": "application/json;charset=UTF-8"
    }
  });
}

Durable Objects 的关键点是:同一个对象实例会串行处理请求。因此对于同一个商品库存,只要所有扣减请求都进入同一个 Durable Object,就可以避免并发超卖。


十三、秒杀接口调用 Durable Objects

src/index.js 中增加:

async function handleSeckill(request, env) {
  if (request.method !== "POST") {
    return jsonResponse({
      code: 405,
      message: "Method Not Allowed"
    }, 405);
  }

  const body = await request.json();

  const productId = body.productId;
  const userId = body.userId;
  const quantity = Number(body.quantity || 1);

  if (!productId || !userId) {
    return jsonResponse({
      code: 400,
      message: "Missing productId or userId"
    }, 400);
  }

  // 一个商品对应一个 Durable Object
  const objectId = env.STOCK_DO.idFromName(`product:${productId}`);
  const object = env.STOCK_DO.get(objectId);

  const doRequest = new Request("https://stock.durable-object/deduct", {
    method: "POST",
    body: JSON.stringify({
      userId,
      quantity
    }),
    headers: {
      "Content-Type": "application/json"
    }
  });

  return object.fetch(doRequest);
}

初始化库存可以通过临时接口、后台接口或 Wrangler 调用完成。例如:

async function initProductStock(env, productId, stock) {
  const objectId = env.STOCK_DO.idFromName(`product:${productId}`);
  const object = env.STOCK_DO.get(objectId);

  const request = new Request("https://stock.durable-object/init", {
    method: "POST",
    body: JSON.stringify({ stock }),
    headers: {
      "Content-Type": "application/json"
    }
  });

  return object.fetch(request);
}

十四、异步日志写入 Queues

高并发场景下,日志写入非常容易拖慢主请求。如果每个请求都同步写数据库,数据库压力会非常大。

建议使用 Queues 异步处理。

async function writeAccessLog(request, env, response, startTime) {
  const url = new URL(request.url);

  const log = {
    type: "access",
    path: url.pathname,
    query: url.search,
    method: request.method,
    status: response.status,
    ip: request.headers.get("CF-Connecting-IP") || "",
    country: request.cf?.country || "",
    colo: request.cf?.colo || "",
    userAgent: request.headers.get("User-Agent") || "",
    cost: Date.now() - startTime,
    time: new Date().toISOString()
  };

  await env.LOG_QUEUE.send(log);
}

async function writeErrorLog(request, env, error, startTime) {
  const url = new URL(request.url);

  const log = {
    type: "error",
    path: url.pathname,
    method: request.method,
    message: error.message,
    stack: error.stack,
    cost: Date.now() - startTime,
    time: new Date().toISOString()
  };

  await env.LOG_QUEUE.send(log);
}

在 Worker 主逻辑中通过:

ctx.waitUntil(writeAccessLog(request, env, response, startTime));

可以让日志写入不阻塞用户响应。


十五、统一 JSON 响应与安全响应头

function jsonResponse(data, status = 200) {
  return new Response(JSON.stringify(data), {
    status,
    headers: {
      "Content-Type": "application/json;charset=UTF-8"
    }
  });
}

function addSecurityHeaders(response) {
  const newResponse = new Response(response.body, response);

  newResponse.headers.set("X-Content-Type-Options", "nosniff");
  newResponse.headers.set("X-Frame-Options", "DENY");
  newResponse.headers.set("X-XSS-Protection", "1; mode=block");
  newResponse.headers.set("Referrer-Policy", "strict-origin-when-cross-origin");
  newResponse.headers.set("Server", "Cloudflare Workers");

  return newResponse;
}

安全响应头虽然不能解决所有问题,但可以减少常见的浏览器安全风险。


十六、完整 src/index.js 参考代码

import { StockDurableObject } from "./durable-object.js";

export { StockDurableObject };

export default {
  async fetch(request, env, ctx) {
    const startTime = Date.now();
    const url = new URL(request.url);

    try {
      const securityResult = await securityCheck(request, env);
      if (!securityResult.pass) {
        return jsonResponse({
          code: 403,
          message: securityResult.message
        }, 403);
      }

      let response;

      if (url.pathname === "/api/health") {
        response = jsonResponse({
          code: 0,
          message: "ok",
          timestamp: Date.now()
        });
      } else if (url.pathname === "/api/config") {
        response = await handleConfig(request, env);
      } else if (url.pathname === "/api/products") {
        response = await handleCacheProxy(request, env, ctx);
      } else if (url.pathname === "/api/seckill") {
        response = await handleSeckill(request, env);
      } else {
        response = await handleOriginProxy(request, env);
      }

      ctx.waitUntil(writeAccessLog(request, env, response, startTime));

      return addSecurityHeaders(response);
    } catch (error) {
      console.error("Worker Error:", error);

      ctx.waitUntil(writeErrorLog(request, env, error, startTime));

      return jsonResponse({
        code: 500,
        message: "Internal Server Error"
      }, 500);
    }
  },

  async queue(batch, env) {
    for (const message of batch.messages) {
      try {
        console.log("consume log:", JSON.stringify(message.body));
        message.ack();
      } catch (error) {
        console.error("queue consume error:", error);
        message.retry();
      }
    }
  }
};

async function securityCheck(request, env) {
  const url = new URL(request.url);
  const ip = request.headers.get("CF-Connecting-IP") || "unknown";
  const userAgent = request.headers.get("User-Agent") || "";

  if (!userAgent || userAgent.length < 5) {
    return {
      pass: false,
      message: "Invalid User-Agent"
    };
  }

  const forbiddenPaths = [
    "/.env",
    "/wp-admin",
    "/phpmyadmin",
    "/server-status"
  ];

  if (forbiddenPaths.some(path => url.pathname.startsWith(path))) {
    return {
      pass: false,
      message: "Forbidden Path"
    };
  }

  const blacklist = await env.CONFIG_KV.get("ip_blacklist", "json");

  if (Array.isArray(blacklist) && blacklist.includes(ip)) {
    return {
      pass: false,
      message: "IP Blocked"
    };
  }

  return {
    pass: true
  };
}

async function handleConfig(request, env) {
  const siteConfig = await env.CONFIG_KV.get("site_config", "json");

  return jsonResponse({
    code: 0,
    data: siteConfig || {
      siteName: "Cloudflare 高并发示例站点",
      maintenance: false,
      version: "1.0.0"
    }
  });
}

async function handleCacheProxy(request, env, ctx) {
  const cache = caches.default;
  const url = new URL(request.url);

  if (request.method !== "GET") {
    return handleOriginProxy(request, env);
  }

  const cacheKey = new Request(url.toString(), request);
  const cachedResponse = await cache.match(cacheKey);

  if (cachedResponse) {
    const response = new Response(cachedResponse.body, cachedResponse);
    response.headers.set("X-Cache", "HIT");
    return response;
  }

  const originUrl = env.ORIGIN_API + url.pathname + url.search;

  const originResponse = await fetch(originUrl, {
    method: request.method,
    headers: buildOriginHeaders(request)
  });

  if (!originResponse.ok) {
    return originResponse;
  }

  const ttl = Number(env.CACHE_TTL || 60);
  const responseBody = await originResponse.text();

  const response = new Response(responseBody, {
    status: originResponse.status,
    headers: {
      "Content-Type": originResponse.headers.get("Content-Type") || "application/json",
      "Cache-Control": `public, max-age=${ttl}`
    }
  });

  response.headers.set("X-Cache", "MISS");

  ctx.waitUntil(cache.put(cacheKey, response.clone()));

  return response;
}

async function handleOriginProxy(request, env) {
  const url = new URL(request.url);
  const originUrl = env.ORIGIN_API + url.pathname + url.search;

  const originRequest = new Request(originUrl, {
    method: request.method,
    headers: buildOriginHeaders(request),
    body: request.method === "GET" || request.method === "HEAD"
      ? undefined
      : request.body
  });

  return fetch(originRequest);
}

async function handleSeckill(request, env) {
  if (request.method !== "POST") {
    return jsonResponse({
      code: 405,
      message: "Method Not Allowed"
    }, 405);
  }

  const body = await request.json();

  const productId = body.productId;
  const userId = body.userId;
  const quantity = Number(body.quantity || 1);

  if (!productId || !userId) {
    return jsonResponse({
      code: 400,
      message: "Missing productId or userId"
    }, 400);
  }

  const objectId = env.STOCK_DO.idFromName(`product:${productId}`);
  const object = env.STOCK_DO.get(objectId);

  const doRequest = new Request("https://stock.durable-object/deduct", {
    method: "POST",
    body: JSON.stringify({
      userId,
      quantity
    }),
    headers: {
      "Content-Type": "application/json"
    }
  });

  return object.fetch(doRequest);
}

function buildOriginHeaders(request) {
  const headers = new Headers(request.headers);

  headers.set("X-Forwarded-By", "Cloudflare-Worker");
  headers.set("X-Real-IP", request.headers.get("CF-Connecting-IP") || "");
  headers.delete("Host");

  return headers;
}

async function writeAccessLog(request, env, response, startTime) {
  const url = new URL(request.url);

  const log = {
    type: "access",
    path: url.pathname,
    query: url.search,
    method: request.method,
    status: response.status,
    ip: request.headers.get("CF-Connecting-IP") || "",
    country: request.cf?.country || "",
    colo: request.cf?.colo || "",
    userAgent: request.headers.get("User-Agent") || "",
    cost: Date.now() - startTime,
    time: new Date().toISOString()
  };

  await env.LOG_QUEUE.send(log);
}

async function writeErrorLog(request, env, error, startTime) {
  const url = new URL(request.url);

  const log = {
    type: "error",
    path: url.pathname,
    method: request.method,
    message: error.message,
    stack: error.stack,
    cost: Date.now() - startTime,
    time: new Date().toISOString()
  };

  await env.LOG_QUEUE.send(log);
}

function jsonResponse(data, status = 200) {
  return new Response(JSON.stringify(data), {
    status,
    headers: {
      "Content-Type": "application/json;charset=UTF-8"
    }
  });
}

function addSecurityHeaders(response) {
  const newResponse = new Response(response.body, response);

  newResponse.headers.set("X-Content-Type-Options", "nosniff");
  newResponse.headers.set("X-Frame-Options", "DENY");
  newResponse.headers.set("X-XSS-Protection", "1; mode=block");
  newResponse.headers.set("Referrer-Policy", "strict-origin-when-cross-origin");
  newResponse.headers.set("Server", "Cloudflare Workers");

  return newResponse;
}

十七、package.json

{
  "name": "cloudflare-high-concurrency-demo",
  "version": "1.0.0",
  "description": "Cloudflare Workers high concurrency solution demo",
  "main": "src/index.js",
  "scripts": {
    "dev": "wrangler dev",
    "deploy": "wrangler deploy",
    "tail": "wrangler tail"
  },
  "dependencies": {},
  "devDependencies": {
    "wrangler": "^3.78.0"
  }
}

十八、部署步骤

1. 安装 Wrangler

npm install -g wrangler

2. 登录 Cloudflare

wrangler login

3. 创建 KV

wrangler kv namespace create CONFIG_KV

将返回的 id 填入 wrangler.toml

4. 创建 Queue

wrangler queues create access-log-queue

5. 部署 Worker

npm install
npm run deploy

6. 查看日志

npm run tail

十九、性能优化建议

1. 缓存优先

高并发系统最有效的优化永远是缓存。建议对读接口进行分类:

  • 秒级变化:缓存 5 到 30 秒;
  • 分钟级变化:缓存 1 到 10 分钟;
  • 小时级变化:缓存 1 到 24 小时;
  • 几乎不变:长期缓存。

即使只缓存 10 秒,也能在突发流量中大幅降低源站压力。


2. 限流前置

不要等请求进入数据库后才判断是否非法。限流应该尽量前置:

  1. Cloudflare WAF;
  2. Cloudflare Rate Limiting;
  3. Worker 边缘判断;
  4. Durable Objects 精确限流;
  5. 源站兜底限流。

3. 读写分离

高并发系统中,读请求与写请求要分开处理。

读请求:

  • CDN;
  • Cache API;
  • KV;
  • R2。

写请求:

  • Durable Objects;
  • Queues;
  • D1;
  • 源站数据库。

不要让所有请求都直接进入数据库。


4. 异步化

用户不关心的操作尽量异步化,例如:

  • 发送邮件;
  • 写访问日志;
  • 统计 PV;
  • 同步第三方系统;
  • 更新排行榜;
  • 消息通知。

这些操作都可以放入队列中处理。


5. 熔断与降级

当源站不可用时,Worker 可以返回缓存数据或降级页面。例如:

async function handleWithFallback(request, env, ctx) {
  try {
    return await handleOriginProxy(request, env);
  } catch (error) {
    const fallback = await env.CONFIG_KV.get("fallback_message");

    return jsonResponse({
      code: 503,
      message: fallback || "服务繁忙,请稍后再试"
    }, 503);
  }
}

在大促、活动、突发热点期间,降级策略非常重要。系统不一定要在任何情况下都返回完整功能,但一定要避免整体崩溃。


二十、常见问题

1. KV 能不能做库存扣减?

不建议。KV 是最终一致的,适合读多写少的数据。库存扣减、余额变更、订单状态等强一致业务不适合使用 KV。

2. Durable Objects 能不能承载无限并发?

不能。Durable Objects 适合处理某个热点资源的串行状态,但如果所有请求都打到同一个对象,也会形成单点热点。实际设计中应按商品、活动、用户、房间等维度拆分对象。

3. Worker 是否可以完全替代后端?

对于轻量业务可以,例如短链接、配置接口、静态站点 API、边缘鉴权等。但对于复杂业务,如复杂事务、财务系统、复杂报表,仍建议保留后端服务。

4. Cache API 和 KV 有什么区别?

Cache API 更适合缓存 HTTP 响应,常用于接口和页面缓存;KV 更适合键值配置和小数据读取。Cache API 通常与请求 URL 绑定,KV 则更像一个全局键值数据库。

5. 高并发下如何避免缓存击穿?

可以采用:

  • 较短 TTL;
  • 热点数据预热;
  • stale-while-revalidate;
  • Worker 中加请求合并;
  • 源站限流;
  • 降级兜底。

二十一、总结

Cloudflare 高并发解决方案的核心不是单点性能,而是利用全球边缘网络将压力分散处理。

一套比较完整的方案可以概括为:

  1. CDN / Cache API 承担静态资源和热点接口缓存;
  2. KV 存储读多写少的配置和映射数据;
  3. Durable Objects 处理强一致状态,例如库存和计数器;
  4. Queues 处理异步任务,实现削峰填谷;
  5. WAF / Rate Limiting 在入口层拦截恶意流量;
  6. Worker 作为边缘 API 网关,完成鉴权、路由、缓存、代理和降级;
  7. 源站数据库 只处理真正需要强事务和复杂计算的请求。

通过这套架构,系统可以在面对突发流量时保持更好的稳定性和响应速度,同时降低源站成本和运维复杂度。

如果你的业务正在面临高并发压力,尤其是读多写少、全球访问、活动突发流量等场景,Cloudflare Workers 生态是一套非常值得尝试的解决方案。

目录结构
全文