把流量挡在源站之外:Cloudflare Workers 高并发网关实战(含源码)
Cloudflare 高并发解决方案|附源码
在现代互联网业务中,高并发几乎是所有线上系统绕不开的问题。无论是电商秒杀、活动报名、短链接跳转、接口网关、图片分发,还是 SaaS 平台的 API 服务,只要流量在短时间内集中爆发,传统单机或普通云服务器架构就很容易出现 CPU 飙升、数据库连接耗尽、响应延迟升高、服务雪崩等问题。
Cloudflare 作为全球边缘网络平台,提供了包括 CDN、Workers、KV、R2、D1、Durable Objects、Queues、WAF、Rate Limiting 等一系列能力。合理组合这些产品,可以构建一套具备高并发处理能力、低延迟、自动扩展、抗攻击能力强的服务架构。
本文将围绕一个典型场景展开:使用 Cloudflare Workers 构建高并发 API 网关与缓存系统,并给出可直接参考的源码示例。
一、为什么选择 Cloudflare 处理高并发?
传统高并发架构通常需要考虑以下问题:
- 服务器是否能承载大量请求;
- 数据库是否会被瞬间打爆;
- 静态资源是否需要 CDN;
- 是否需要负载均衡;
- 是否需要防御恶意请求;
- 是否需要削峰填谷;
- 是否需要全球用户低延迟访问;
- 是否需要弹性扩容和自动伸缩。
如果采用传统云服务器方案,通常需要自己搭建:
- Nginx;
- SLB / ELB;
- Redis;
- MySQL 主从;
- 消息队列;
- CDN;
- WAF;
- 限流组件;
- 日志系统;
- 自动扩缩容系统。
这套系统当然可以实现,但运维复杂度和成本都不低。
Cloudflare 的优势在于,它将大量能力前置到了全球边缘节点:
- 请求先到 Cloudflare 边缘节点;
- 静态资源可直接在边缘缓存;
- Worker 代码可在边缘执行;
- 恶意流量可在到达源站前被拦截;
- 热点数据可通过 Cache API 或 KV 缓存;
- 写入任务可通过 Queues 异步处理;
- 强一致状态可用 Durable Objects 管理。
简单来说,Cloudflare 可以让我们把很多高并发压力消化在源站之前。
二、整体架构设计
本文推荐的高并发架构如下:
用户请求
│
▼
Cloudflare DNS
│
▼
Cloudflare WAF / DDoS 防护 / Bot Fight Mode
│
▼
Cloudflare Worker
│
├── 静态资源:Cache API / CDN
│
├── 热点数据:KV / Cache API
│
├── 强一致计数:Durable Objects
│
├── 异步任务:Queues
│
├── 文件资源:R2
│
└── 数据查询:D1 / 外部数据库
│
▼
源站服务 / 数据库
核心思路
高并发系统最重要的原则并不是“让数据库更强”,而是:
能不访问源站就不访问源站,能不访问数据库就不访问数据库,能异步处理就不要同步处理。
因此我们可以将请求分为几类:
| 请求类型 | 处理方式 |
|---|---|
| 静态资源 | Cloudflare CDN / Cache API |
| 热点接口 | Worker + Cache API |
| 配置数据 | KV |
| 写入日志 | Queues 异步入库 |
| 秒杀库存 | Durable Objects 串行化处理 |
| 用户上传文件 | R2 |
| 管理后台查询 | D1 或源站数据库 |
三、适用场景
这套方案非常适合以下场景:
- 高并发 API 网关;
- 电商活动页;
- 秒杀抢购;
- 短链接跳转;
- 图片、视频、文件分发;
- SaaS 多租户接口;
- 全球化业务访问加速;
- 边缘鉴权;
- 防刷、防爬虫;
- 轻量后端服务。
如果你的业务主要是读多写少,Cloudflare 架构会非常有优势。如果是复杂事务型系统,仍然建议核心交易链路放在传统数据库中,Cloudflare 负责入口、缓存、限流和削峰。
四、高并发解决方案关键点
1. 使用 Cache API 缓存热点接口
对于访问量很高但数据变化不频繁的接口,例如:
- 首页配置;
- 商品列表;
- 文章详情;
- 活动规则;
- 公告信息;
- 排行榜快照;
都可以通过 Cloudflare Worker 的 Cache API 缓存在边缘节点。
用户请求到达 Worker 后,先查缓存:
- 如果命中缓存,直接返回;
- 如果未命中缓存,再请求源站;
- 请求源站成功后,将结果写入缓存。
这样可以显著减少源站压力。
2. 使用 KV 存储全局配置
Cloudflare KV 适合存储读多写少的数据,例如:
- 站点配置;
- API 开关;
- 活动状态;
- 黑名单;
- 白名单;
- 路由规则;
- 页面配置;
- 短链接映射。
KV 的优点是读取速度快、全球分布,缺点是写入后不是强一致,通常会有短暂传播延迟。因此不适合存储库存、余额等强一致数据。
3. 使用 Durable Objects 管理强一致状态
高并发系统中,很多问题不是读,而是写。例如秒杀库存扣减、计数器递增、抽奖次数限制等。
如果多个请求同时修改同一份状态,很容易产生并发问题。Durable Objects 的特点是:
- 同一个对象实例内请求串行处理;
- 适合管理某个资源的强一致状态;
- 可用于库存、计数器、限流器、房间状态等。
例如秒杀活动中,一个商品对应一个 Durable Object。所有扣库存请求都进入同一个对象,由该对象逐个处理,从而避免超卖。
4. 使用 Queues 进行削峰填谷
高并发写入最容易击穿数据库。例如:
- 访问日志;
- 操作日志;
- 下单后通知;
- 邮件发送;
- 短信发送;
- 积分发放;
- 数据同步。
这些任务没有必要在用户请求中同步完成,可以先写入 Cloudflare Queues,然后由消费者异步处理。
这样做的好处:
- 降低接口响应时间;
- 防止数据库瞬间被打爆;
- 支持失败重试;
- 支持批量消费;
- 提高系统稳定性。
5. 使用 WAF 和 Rate Limiting 防御恶意流量
高并发不一定全部来自正常用户,也可能来自:
- 爬虫;
- 刷接口;
- CC 攻击;
- 暴力破解;
- 恶意脚本;
- 扫描器。
Cloudflare 在入口层提供了多种防护能力:
- DDoS 防护;
- WAF 规则;
- Bot Fight Mode;
- Turnstile 人机验证;
- Rate Limiting;
- IP Reputation;
- Geo Blocking。
在实际项目中,建议将安全策略前置,让无效请求尽量不要进入 Worker,更不要进入源站。
五、项目结构
下面给出一个基于 Cloudflare Workers 的高并发 API 网关示例。
项目结构如下:
cloudflare-high-concurrency-demo
├── package.json
├── wrangler.toml
└── src
├── index.js
└── durable-object.js
该示例包含:
- API 缓存;
- KV 配置读取;
- 简单限流;
- Durable Objects 库存扣减;
- Queues 异步日志;
- 源站回源;
- 安全响应头。
六、wrangler.toml 配置
name = "cloudflare-high-concurrency-demo"
main = "src/index.js"
compatibility_date = "2024-09-01"
workers_dev = true
[[kv_namespaces]]
binding = "CONFIG_KV"
id = "your_kv_namespace_id"
[[queues.producers]]
binding = "LOG_QUEUE"
queue = "access-log-queue"
[[queues.consumers]]
queue = "access-log-queue"
max_batch_size = 20
max_batch_timeout = 5
[[durable_objects.bindings]]
name = "STOCK_DO"
class_name = "StockDurableObject"
[[migrations]]
tag = "v1"
new_classes = ["StockDurableObject"]
[vars]
ORIGIN_API = "https://api.example.com"
CACHE_TTL = "60"
RATE_LIMIT = "100"
说明:
CONFIG_KV:用于存储配置;LOG_QUEUE:用于异步写入日志;STOCK_DO:用于秒杀库存扣减;ORIGIN_API:源站地址;CACHE_TTL:接口缓存时间;RATE_LIMIT:单 IP 简单限流阈值。
七、核心源码:Worker 入口
文件:src/index.js
import { StockDurableObject } from "./durable-object.js";
export { StockDurableObject };
export default {
async fetch(request, env, ctx) {
const startTime = Date.now();
const url = new URL(request.url);
try {
// 1. 基础安全检查
const securityResult = await securityCheck(request, env);
if (!securityResult.pass) {
return jsonResponse({
code: 403,
message: securityResult.message
}, 403);
}
// 2. 路由分发
let response;
if (url.pathname === "/api/health") {
response = jsonResponse({
code: 0,
message: "ok",
timestamp: Date.now()
});
} else if (url.pathname === "/api/config") {
response = await handleConfig(request, env);
} else if (url.pathname === "/api/products") {
response = await handleCacheProxy(request, env, ctx);
} else if (url.pathname === "/api/seckill") {
response = await handleSeckill(request, env);
} else {
response = await handleOriginProxy(request, env);
}
// 3. 异步日志,不阻塞用户响应
ctx.waitUntil(writeAccessLog(request, env, response, startTime));
// 4. 添加安全响应头
return addSecurityHeaders(response);
} catch (error) {
console.error("Worker Error:", error);
ctx.waitUntil(writeErrorLog(request, env, error, startTime));
return jsonResponse({
code: 500,
message: "Internal Server Error"
}, 500);
}
},
async queue(batch, env) {
for (const message of batch.messages) {
try {
const log = message.body;
// 实际项目中可以写入 D1、R2、外部日志系统或数据库
console.log("consume log:", JSON.stringify(log));
message.ack();
} catch (error) {
console.error("queue consume error:", error);
message.retry();
}
}
}
};
八、安全检查与限流
下面是一个简单的边缘安全检查示例。生产环境中可以结合 Cloudflare WAF、Turnstile、Rate Limiting 等能力实现更完整的策略。
async function securityCheck(request, env) {
const url = new URL(request.url);
const ip = request.headers.get("CF-Connecting-IP") || "unknown";
const userAgent = request.headers.get("User-Agent") || "";
// 拦截空 UA 的异常请求
if (!userAgent || userAgent.length < 5) {
return {
pass: false,
message: "Invalid User-Agent"
};
}
// 禁止访问敏感路径
const forbiddenPaths = [
"/.env",
"/wp-admin",
"/phpmyadmin",
"/server-status"
];
if (forbiddenPaths.some(path => url.pathname.startsWith(path))) {
return {
pass: false,
message: "Forbidden Path"
};
}
// 从 KV 读取黑名单
const blacklist = await env.CONFIG_KV.get("ip_blacklist", "json");
if (Array.isArray(blacklist) && blacklist.includes(ip)) {
return {
pass: false,
message: "IP Blocked"
};
}
return {
pass: true
};
}
这里使用 KV 存储 IP 黑名单,例如:
[
"1.1.1.1",
"2.2.2.2"
]
需要注意的是,KV 不适合做毫秒级强一致限流。如果需要严格限流,应结合 Durable Objects 或 Cloudflare 原生 Rate Limiting。
九、配置接口:KV 读取示例
async function handleConfig(request, env) {
const siteConfig = await env.CONFIG_KV.get("site_config", "json");
return jsonResponse({
code: 0,
data: siteConfig || {
siteName: "Cloudflare 高并发示例站点",
maintenance: false,
version: "1.0.0"
}
});
}
KV 中可以保存如下配置:
{
"siteName": "高并发活动平台",
"maintenance": false,
"version": "1.0.0",
"features": {
"seckill": true,
"ranking": true
}
}
这样我们无需重新部署 Worker,就可以通过修改 KV 配置动态控制业务。
十、热点接口缓存代理
对于 /api/products 这类高频访问接口,可以使用 Cache API 缓存。
async function handleCacheProxy(request, env, ctx) {
const cache = caches.default;
const url = new URL(request.url);
// 只缓存 GET 请求
if (request.method !== "GET") {
return handleOriginProxy(request, env);
}
const cacheKey = new Request(url.toString(), request);
const cachedResponse = await cache.match(cacheKey);
if (cachedResponse) {
const response = new Response(cachedResponse.body, cachedResponse);
response.headers.set("X-Cache", "HIT");
return response;
}
const originUrl = env.ORIGIN_API + url.pathname + url.search;
const originResponse = await fetch(originUrl, {
method: request.method,
headers: buildOriginHeaders(request)
});
if (!originResponse.ok) {
return originResponse;
}
const ttl = Number(env.CACHE_TTL || 60);
const responseBody = await originResponse.text();
const response = new Response(responseBody, {
status: originResponse.status,
headers: {
"Content-Type": originResponse.headers.get("Content-Type") || "application/json",
"Cache-Control": `public, max-age=${ttl}`
}
});
response.headers.set("X-Cache", "MISS");
ctx.waitUntil(cache.put(cacheKey, response.clone()));
return response;
}
这段代码的逻辑是:
- GET 请求才走缓存;
- 先查询 Cloudflare 边缘缓存;
- 命中则直接返回;
- 未命中则请求源站;
- 成功后异步写入缓存;
- 下次请求直接从边缘节点返回。
这样可以把大量热点读取请求挡在边缘层。
十一、普通请求回源代理
如果不是热点接口,可以直接代理到源站。
async function handleOriginProxy(request, env) {
const url = new URL(request.url);
const originUrl = env.ORIGIN_API + url.pathname + url.search;
const originRequest = new Request(originUrl, {
method: request.method,
headers: buildOriginHeaders(request),
body: request.method === "GET" || request.method === "HEAD"
? undefined
: request.body
});
return fetch(originRequest);
}
function buildOriginHeaders(request) {
const headers = new Headers(request.headers);
headers.set("X-Forwarded-By", "Cloudflare-Worker");
headers.set("X-Real-IP", request.headers.get("CF-Connecting-IP") || "");
// 避免把客户端 Host 传到源站导致路由异常
headers.delete("Host");
return headers;
}
这里 Worker 相当于一个边缘 API 网关。源站可以隐藏在 Cloudflare 后面,减少暴露风险。
十二、秒杀库存:Durable Objects 源码
文件:src/durable-object.js
export class StockDurableObject {
constructor(state, env) {
this.state = state;
this.env = env;
}
async fetch(request) {
const url = new URL(request.url);
if (url.pathname === "/init") {
return this.initStock(request);
}
if (url.pathname === "/deduct") {
return this.deductStock(request);
}
if (url.pathname === "/status") {
return this.getStatus();
}
return new Response("Not Found", { status: 404 });
}
async initStock(request) {
const body = await request.json();
const stock = Number(body.stock || 0);
if (stock < 0) {
return jsonResponse({
code: 400,
message: "Invalid stock"
}, 400);
}
await this.state.storage.put("stock", stock);
return jsonResponse({
code: 0,
message: "Stock initialized",
stock
});
}
async deductStock(request) {
const body = await request.json();
const userId = body.userId;
const quantity = Number(body.quantity || 1);
if (!userId) {
return jsonResponse({
code: 400,
message: "Missing userId"
}, 400);
}
if (quantity <= 0) {
return jsonResponse({
code: 400,
message: "Invalid quantity"
}, 400);
}
// 防止同一用户重复抢购
const userKey = `user:${userId}`;
const hasBought = await this.state.storage.get(userKey);
if (hasBought) {
return jsonResponse({
code: 409,
message: "User already purchased"
}, 409);
}
let stock = await this.state.storage.get("stock");
stock = Number(stock || 0);
if (stock < quantity) {
return jsonResponse({
code: sold_out_code(),
message: "Sold out",
stock
}, 200);
}
stock -= quantity;
await this.state.storage.put("stock", stock);
await this.state.storage.put(userKey, {
quantity,
time: Date.now()
});
return jsonResponse({
code: 0,
message: "Success",
stock
});
}
async getStatus() {
const stock = await this.state.storage.get("stock");
return jsonResponse({
code: 0,
stock: Number(stock || 0)
});
}
}
function sold_out_code() {
return 1001;
}
function jsonResponse(data, status = 200) {
return new Response(JSON.stringify(data), {
status,
headers: {
"Content-Type": "application/json;charset=UTF-8"
}
});
}
Durable Objects 的关键点是:同一个对象实例会串行处理请求。因此对于同一个商品库存,只要所有扣减请求都进入同一个 Durable Object,就可以避免并发超卖。
十三、秒杀接口调用 Durable Objects
在 src/index.js 中增加:
async function handleSeckill(request, env) {
if (request.method !== "POST") {
return jsonResponse({
code: 405,
message: "Method Not Allowed"
}, 405);
}
const body = await request.json();
const productId = body.productId;
const userId = body.userId;
const quantity = Number(body.quantity || 1);
if (!productId || !userId) {
return jsonResponse({
code: 400,
message: "Missing productId or userId"
}, 400);
}
// 一个商品对应一个 Durable Object
const objectId = env.STOCK_DO.idFromName(`product:${productId}`);
const object = env.STOCK_DO.get(objectId);
const doRequest = new Request("https://stock.durable-object/deduct", {
method: "POST",
body: JSON.stringify({
userId,
quantity
}),
headers: {
"Content-Type": "application/json"
}
});
return object.fetch(doRequest);
}
初始化库存可以通过临时接口、后台接口或 Wrangler 调用完成。例如:
async function initProductStock(env, productId, stock) {
const objectId = env.STOCK_DO.idFromName(`product:${productId}`);
const object = env.STOCK_DO.get(objectId);
const request = new Request("https://stock.durable-object/init", {
method: "POST",
body: JSON.stringify({ stock }),
headers: {
"Content-Type": "application/json"
}
});
return object.fetch(request);
}
十四、异步日志写入 Queues
高并发场景下,日志写入非常容易拖慢主请求。如果每个请求都同步写数据库,数据库压力会非常大。
建议使用 Queues 异步处理。
async function writeAccessLog(request, env, response, startTime) {
const url = new URL(request.url);
const log = {
type: "access",
path: url.pathname,
query: url.search,
method: request.method,
status: response.status,
ip: request.headers.get("CF-Connecting-IP") || "",
country: request.cf?.country || "",
colo: request.cf?.colo || "",
userAgent: request.headers.get("User-Agent") || "",
cost: Date.now() - startTime,
time: new Date().toISOString()
};
await env.LOG_QUEUE.send(log);
}
async function writeErrorLog(request, env, error, startTime) {
const url = new URL(request.url);
const log = {
type: "error",
path: url.pathname,
method: request.method,
message: error.message,
stack: error.stack,
cost: Date.now() - startTime,
time: new Date().toISOString()
};
await env.LOG_QUEUE.send(log);
}
在 Worker 主逻辑中通过:
ctx.waitUntil(writeAccessLog(request, env, response, startTime));
可以让日志写入不阻塞用户响应。
十五、统一 JSON 响应与安全响应头
function jsonResponse(data, status = 200) {
return new Response(JSON.stringify(data), {
status,
headers: {
"Content-Type": "application/json;charset=UTF-8"
}
});
}
function addSecurityHeaders(response) {
const newResponse = new Response(response.body, response);
newResponse.headers.set("X-Content-Type-Options", "nosniff");
newResponse.headers.set("X-Frame-Options", "DENY");
newResponse.headers.set("X-XSS-Protection", "1; mode=block");
newResponse.headers.set("Referrer-Policy", "strict-origin-when-cross-origin");
newResponse.headers.set("Server", "Cloudflare Workers");
return newResponse;
}
安全响应头虽然不能解决所有问题,但可以减少常见的浏览器安全风险。
十六、完整 src/index.js 参考代码
import { StockDurableObject } from "./durable-object.js";
export { StockDurableObject };
export default {
async fetch(request, env, ctx) {
const startTime = Date.now();
const url = new URL(request.url);
try {
const securityResult = await securityCheck(request, env);
if (!securityResult.pass) {
return jsonResponse({
code: 403,
message: securityResult.message
}, 403);
}
let response;
if (url.pathname === "/api/health") {
response = jsonResponse({
code: 0,
message: "ok",
timestamp: Date.now()
});
} else if (url.pathname === "/api/config") {
response = await handleConfig(request, env);
} else if (url.pathname === "/api/products") {
response = await handleCacheProxy(request, env, ctx);
} else if (url.pathname === "/api/seckill") {
response = await handleSeckill(request, env);
} else {
response = await handleOriginProxy(request, env);
}
ctx.waitUntil(writeAccessLog(request, env, response, startTime));
return addSecurityHeaders(response);
} catch (error) {
console.error("Worker Error:", error);
ctx.waitUntil(writeErrorLog(request, env, error, startTime));
return jsonResponse({
code: 500,
message: "Internal Server Error"
}, 500);
}
},
async queue(batch, env) {
for (const message of batch.messages) {
try {
console.log("consume log:", JSON.stringify(message.body));
message.ack();
} catch (error) {
console.error("queue consume error:", error);
message.retry();
}
}
}
};
async function securityCheck(request, env) {
const url = new URL(request.url);
const ip = request.headers.get("CF-Connecting-IP") || "unknown";
const userAgent = request.headers.get("User-Agent") || "";
if (!userAgent || userAgent.length < 5) {
return {
pass: false,
message: "Invalid User-Agent"
};
}
const forbiddenPaths = [
"/.env",
"/wp-admin",
"/phpmyadmin",
"/server-status"
];
if (forbiddenPaths.some(path => url.pathname.startsWith(path))) {
return {
pass: false,
message: "Forbidden Path"
};
}
const blacklist = await env.CONFIG_KV.get("ip_blacklist", "json");
if (Array.isArray(blacklist) && blacklist.includes(ip)) {
return {
pass: false,
message: "IP Blocked"
};
}
return {
pass: true
};
}
async function handleConfig(request, env) {
const siteConfig = await env.CONFIG_KV.get("site_config", "json");
return jsonResponse({
code: 0,
data: siteConfig || {
siteName: "Cloudflare 高并发示例站点",
maintenance: false,
version: "1.0.0"
}
});
}
async function handleCacheProxy(request, env, ctx) {
const cache = caches.default;
const url = new URL(request.url);
if (request.method !== "GET") {
return handleOriginProxy(request, env);
}
const cacheKey = new Request(url.toString(), request);
const cachedResponse = await cache.match(cacheKey);
if (cachedResponse) {
const response = new Response(cachedResponse.body, cachedResponse);
response.headers.set("X-Cache", "HIT");
return response;
}
const originUrl = env.ORIGIN_API + url.pathname + url.search;
const originResponse = await fetch(originUrl, {
method: request.method,
headers: buildOriginHeaders(request)
});
if (!originResponse.ok) {
return originResponse;
}
const ttl = Number(env.CACHE_TTL || 60);
const responseBody = await originResponse.text();
const response = new Response(responseBody, {
status: originResponse.status,
headers: {
"Content-Type": originResponse.headers.get("Content-Type") || "application/json",
"Cache-Control": `public, max-age=${ttl}`
}
});
response.headers.set("X-Cache", "MISS");
ctx.waitUntil(cache.put(cacheKey, response.clone()));
return response;
}
async function handleOriginProxy(request, env) {
const url = new URL(request.url);
const originUrl = env.ORIGIN_API + url.pathname + url.search;
const originRequest = new Request(originUrl, {
method: request.method,
headers: buildOriginHeaders(request),
body: request.method === "GET" || request.method === "HEAD"
? undefined
: request.body
});
return fetch(originRequest);
}
async function handleSeckill(request, env) {
if (request.method !== "POST") {
return jsonResponse({
code: 405,
message: "Method Not Allowed"
}, 405);
}
const body = await request.json();
const productId = body.productId;
const userId = body.userId;
const quantity = Number(body.quantity || 1);
if (!productId || !userId) {
return jsonResponse({
code: 400,
message: "Missing productId or userId"
}, 400);
}
const objectId = env.STOCK_DO.idFromName(`product:${productId}`);
const object = env.STOCK_DO.get(objectId);
const doRequest = new Request("https://stock.durable-object/deduct", {
method: "POST",
body: JSON.stringify({
userId,
quantity
}),
headers: {
"Content-Type": "application/json"
}
});
return object.fetch(doRequest);
}
function buildOriginHeaders(request) {
const headers = new Headers(request.headers);
headers.set("X-Forwarded-By", "Cloudflare-Worker");
headers.set("X-Real-IP", request.headers.get("CF-Connecting-IP") || "");
headers.delete("Host");
return headers;
}
async function writeAccessLog(request, env, response, startTime) {
const url = new URL(request.url);
const log = {
type: "access",
path: url.pathname,
query: url.search,
method: request.method,
status: response.status,
ip: request.headers.get("CF-Connecting-IP") || "",
country: request.cf?.country || "",
colo: request.cf?.colo || "",
userAgent: request.headers.get("User-Agent") || "",
cost: Date.now() - startTime,
time: new Date().toISOString()
};
await env.LOG_QUEUE.send(log);
}
async function writeErrorLog(request, env, error, startTime) {
const url = new URL(request.url);
const log = {
type: "error",
path: url.pathname,
method: request.method,
message: error.message,
stack: error.stack,
cost: Date.now() - startTime,
time: new Date().toISOString()
};
await env.LOG_QUEUE.send(log);
}
function jsonResponse(data, status = 200) {
return new Response(JSON.stringify(data), {
status,
headers: {
"Content-Type": "application/json;charset=UTF-8"
}
});
}
function addSecurityHeaders(response) {
const newResponse = new Response(response.body, response);
newResponse.headers.set("X-Content-Type-Options", "nosniff");
newResponse.headers.set("X-Frame-Options", "DENY");
newResponse.headers.set("X-XSS-Protection", "1; mode=block");
newResponse.headers.set("Referrer-Policy", "strict-origin-when-cross-origin");
newResponse.headers.set("Server", "Cloudflare Workers");
return newResponse;
}
十七、package.json
{
"name": "cloudflare-high-concurrency-demo",
"version": "1.0.0",
"description": "Cloudflare Workers high concurrency solution demo",
"main": "src/index.js",
"scripts": {
"dev": "wrangler dev",
"deploy": "wrangler deploy",
"tail": "wrangler tail"
},
"dependencies": {},
"devDependencies": {
"wrangler": "^3.78.0"
}
}
十八、部署步骤
1. 安装 Wrangler
npm install -g wrangler
2. 登录 Cloudflare
wrangler login
3. 创建 KV
wrangler kv namespace create CONFIG_KV
将返回的 id 填入 wrangler.toml。
4. 创建 Queue
wrangler queues create access-log-queue
5. 部署 Worker
npm install
npm run deploy
6. 查看日志
npm run tail
十九、性能优化建议
1. 缓存优先
高并发系统最有效的优化永远是缓存。建议对读接口进行分类:
- 秒级变化:缓存 5 到 30 秒;
- 分钟级变化:缓存 1 到 10 分钟;
- 小时级变化:缓存 1 到 24 小时;
- 几乎不变:长期缓存。
即使只缓存 10 秒,也能在突发流量中大幅降低源站压力。
2. 限流前置
不要等请求进入数据库后才判断是否非法。限流应该尽量前置:
- Cloudflare WAF;
- Cloudflare Rate Limiting;
- Worker 边缘判断;
- Durable Objects 精确限流;
- 源站兜底限流。
3. 读写分离
高并发系统中,读请求与写请求要分开处理。
读请求:
- CDN;
- Cache API;
- KV;
- R2。
写请求:
- Durable Objects;
- Queues;
- D1;
- 源站数据库。
不要让所有请求都直接进入数据库。
4. 异步化
用户不关心的操作尽量异步化,例如:
- 发送邮件;
- 写访问日志;
- 统计 PV;
- 同步第三方系统;
- 更新排行榜;
- 消息通知。
这些操作都可以放入队列中处理。
5. 熔断与降级
当源站不可用时,Worker 可以返回缓存数据或降级页面。例如:
async function handleWithFallback(request, env, ctx) {
try {
return await handleOriginProxy(request, env);
} catch (error) {
const fallback = await env.CONFIG_KV.get("fallback_message");
return jsonResponse({
code: 503,
message: fallback || "服务繁忙,请稍后再试"
}, 503);
}
}
在大促、活动、突发热点期间,降级策略非常重要。系统不一定要在任何情况下都返回完整功能,但一定要避免整体崩溃。
二十、常见问题
1. KV 能不能做库存扣减?
不建议。KV 是最终一致的,适合读多写少的数据。库存扣减、余额变更、订单状态等强一致业务不适合使用 KV。
2. Durable Objects 能不能承载无限并发?
不能。Durable Objects 适合处理某个热点资源的串行状态,但如果所有请求都打到同一个对象,也会形成单点热点。实际设计中应按商品、活动、用户、房间等维度拆分对象。
3. Worker 是否可以完全替代后端?
对于轻量业务可以,例如短链接、配置接口、静态站点 API、边缘鉴权等。但对于复杂业务,如复杂事务、财务系统、复杂报表,仍建议保留后端服务。
4. Cache API 和 KV 有什么区别?
Cache API 更适合缓存 HTTP 响应,常用于接口和页面缓存;KV 更适合键值配置和小数据读取。Cache API 通常与请求 URL 绑定,KV 则更像一个全局键值数据库。
5. 高并发下如何避免缓存击穿?
可以采用:
- 较短 TTL;
- 热点数据预热;
- stale-while-revalidate;
- Worker 中加请求合并;
- 源站限流;
- 降级兜底。
二十一、总结
Cloudflare 高并发解决方案的核心不是单点性能,而是利用全球边缘网络将压力分散处理。
一套比较完整的方案可以概括为:
- CDN / Cache API 承担静态资源和热点接口缓存;
- KV 存储读多写少的配置和映射数据;
- Durable Objects 处理强一致状态,例如库存和计数器;
- Queues 处理异步任务,实现削峰填谷;
- WAF / Rate Limiting 在入口层拦截恶意流量;
- Worker 作为边缘 API 网关,完成鉴权、路由、缓存、代理和降级;
- 源站数据库 只处理真正需要强事务和复杂计算的请求。
通过这套架构,系统可以在面对突发流量时保持更好的稳定性和响应速度,同时降低源站成本和运维复杂度。
如果你的业务正在面临高并发压力,尤其是读多写少、全球访问、活动突发流量等场景,Cloudflare Workers 生态是一套非常值得尝试的解决方案。