2026 用 Cloudflare 搭建自动化工作流:从 Worker、队列到数据库与定时任务
Cloudflare 工作流自动化教程|2026 最新版
适用对象:开发者、运维工程师、SaaS 创业团队、站长、自动化爱好者
关键词:Cloudflare Workers、Cloudflare Workflows、Queues、R2、D1、KV、Cron Triggers、API 自动化、Serverless
一、为什么要用 Cloudflare 做工作流自动化?
过去几年,Cloudflare 已经不再只是一个 CDN、DNS 或 WAF 服务商,而是逐渐发展成一个完整的边缘计算与开发平台。通过 Cloudflare Workers、Queues、R2、D1、KV、Durable Objects、Workflows、Cron Triggers 等产品,开发者可以在不维护传统服务器的情况下,搭建稳定、高性能、可扩展的自动化系统。
所谓“工作流自动化”,可以理解为:
将一系列原本需要人工执行的步骤,通过程序、触发器、队列、数据库和第三方 API 自动串联起来,让系统自动完成任务。
例如:
- 网站有新用户注册后,自动写入数据库、发送欢迎邮件、同步到 CRM;
- 每天定时抓取数据,清洗后保存到 R2 或 D1;
- 用户上传图片后,自动压缩、转存、生成访问链接;
- 电商订单支付成功后,自动通知仓库、发送短信、生成发票;
- API 请求量过高时,自动加入队列,异步处理,避免系统崩溃;
- 监控网站状态,异常时自动发送告警到 Telegram、Slack 或企业微信。
相比传统服务器方案,Cloudflare 的优势主要有:
- 无需维护服务器:不用配置 Nginx、Linux、防火墙、负载均衡等基础设施。
- 全球边缘节点执行:请求可以在离用户最近的位置处理,延迟更低。
- 按量计费:适合个人项目、创业项目和中小型业务。
- 生态完整:Workers 负责逻辑,Queues 负责异步任务,R2 负责对象存储,D1 负责关系型数据库,KV 负责高速键值存储。
- 天然适合自动化:支持定时任务、Webhook、API 调用、事件驱动和队列消费。
本文将以 2026 年的 Cloudflare 开发体系为基础,系统讲解如何搭建一套完整的工作流自动化方案。
二、Cloudflare 自动化核心组件介绍
在开始实战之前,我们先了解几个常用组件。
1. Cloudflare Workers
Workers 是 Cloudflare 的无服务器运行环境,可以理解为运行在 Cloudflare 边缘网络上的 JavaScript / TypeScript 函数。
它常用于:
- 接收 Webhook;
- 处理 API 请求;
- 调用第三方接口;
- 写入数据库;
- 访问对象存储;
- 执行权限校验;
- 作为自动化工作流的入口。
一个最简单的 Worker 示例:
export default {
async fetch(request, env, ctx) {
return new Response("Hello Cloudflare Workflow!");
}
}
2. Cron Triggers
Cron Triggers 用来执行定时任务。例如每天凌晨同步数据、每小时检测网站状态、每 5 分钟处理报表等。
配置示例:
[triggers]
crons = ["0 0 * * *"]
表示每天 UTC 时间 0 点执行一次。
3. Cloudflare Queues
Queues 是消息队列服务,非常适合处理异步任务。
例如用户提交订单后,不必在 HTTP 请求中立即完成所有任务,而是先把任务放入队列:
- 发送邮件;
- 生成发票;
- 通知仓库;
- 同步 CRM;
- 记录日志。
这样可以提高接口响应速度,也能避免第三方 API 暂时不可用时导致主流程失败。
4. Cloudflare R2
R2 是 Cloudflare 的对象存储服务,类似 AWS S3,但没有出站流量费用,非常适合存放:
- 图片;
- 视频;
- 日志文件;
- 备份文件;
- 报表文件;
- 用户上传内容。
5. Cloudflare D1
D1 是 Cloudflare 提供的 Serverless SQLite 数据库,适合存储结构化数据,例如:
- 用户信息;
- 订单记录;
- 任务状态;
- 自动化执行日志;
- API 调用记录。
6. Cloudflare KV
KV 是键值存储,适合读多写少、需要快速访问的场景,例如:
- 配置项;
- 用户 Token;
- 缓存结果;
- 临时状态;
- 自动化开关。
7. Cloudflare Workflows
Cloudflare Workflows 是专门用于编排长时间运行、多步骤任务的服务。相比普通 Worker,它更适合需要状态管理、重试、暂停、恢复的自动化流程。
例如:
- 接收用户上传文件;
- 写入任务记录;
- 调用 AI 接口分析文件;
- 等待处理结果;
- 保存结果到数据库;
- 发送通知;
- 标记任务完成。
这类流程如果全部写在一个 Worker 请求里,很容易超时或难以维护,而 Workflows 更适合作为编排层。
三、准备工作:创建 Cloudflare 自动化项目
1. 安装 Node.js
建议使用 Node.js LTS 版本。可以通过以下命令检查:
node -v
npm -v
2. 安装 Wrangler
Wrangler 是 Cloudflare Workers 的官方命令行工具。
npm install -g wrangler
登录 Cloudflare:
wrangler login
登录后会打开浏览器授权,授权成功后即可在本地管理 Workers、D1、R2、Queues 等资源。
3. 创建 Workers 项目
npm create cloudflare@latest cloudflare-workflow-demo
进入项目目录:
cd cloudflare-workflow-demo
本地运行:
npm run dev
如果看到本地服务正常启动,说明项目初始化成功。
四、实战案例:搭建一个“订单自动处理工作流”
下面我们设计一个常见的自动化场景:
用户提交订单后,系统自动记录订单、放入队列、异步发送通知、生成日志,并支持定时检查未完成订单。
整体流程如下:
用户提交订单
↓
Worker 接收请求
↓
写入 D1 数据库
↓
发送消息到 Queue
↓
Queue Consumer 异步处理订单
↓
调用第三方通知接口
↓
更新订单状态
↓
保存日志到 R2 或 D1
五、创建 D1 数据库
1. 创建数据库
wrangler d1 create workflow_demo_db
执行后会返回数据库信息,例如:
[[d1_databases]]
binding = "DB"
database_name = "workflow_demo_db"
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
把它添加到 wrangler.toml 中。
2. 创建数据表
新建 schema.sql:
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id TEXT NOT NULL UNIQUE,
user_email TEXT NOT NULL,
amount INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS workflow_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id TEXT NOT NULL,
message TEXT NOT NULL,
created_at TEXT NOT NULL
);
执行迁移:
wrangler d1 execute workflow_demo_db --file=./schema.sql
六、创建 Queue 消息队列
1. 创建队列
wrangler queues create order-processing-queue
2. 配置 wrangler.toml
[[queues.producers]]
binding = "ORDER_QUEUE"
queue = "order-processing-queue"
[[queues.consumers]]
queue = "order-processing-queue"
这个配置表示:
- Worker 可以通过
env.ORDER_QUEUE向队列发送消息; - 当前 Worker 同时也是这个队列的消费者。
七、编写订单提交接口
在 src/index.js 中编写 Worker 入口。
export default {
async fetch(request, env, ctx) {
const url = new URL(request.url);
if (request.method === "POST" && url.pathname === "/orders") {
return await createOrder(request, env);
}
return new Response("Not Found", { status: 404 });
},
async queue(batch, env, ctx) {
for (const message of batch.messages) {
await processOrder(message.body, env);
message.ack();
}
}
};
async function createOrder(request, env) {
try {
const body = await request.json();
if (!body.order_id || !body.user_email || !body.amount) {
return Response.json(
{ error: "order_id、user_email 和 amount 为必填字段" },
{ status: 400 }
);
}
const now = new Date().toISOString();
await env.DB.prepare(`
INSERT INTO orders (order_id, user_email, amount, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
`).bind(
body.order_id,
body.user_email,
body.amount,
"pending",
now,
now
).run();
await env.ORDER_QUEUE.send({
order_id: body.order_id,
user_email: body.user_email,
amount: body.amount
});
await env.DB.prepare(`
INSERT INTO workflow_logs (order_id, message, created_at)
VALUES (?, ?, ?)
`).bind(
body.order_id,
"订单已创建,并加入处理队列",
now
).run();
return Response.json({
success: true,
message: "订单已创建,正在异步处理",
order_id: body.order_id
});
} catch (error) {
return Response.json(
{ error: error.message },
{ status: 500 }
);
}
}
async function processOrder(order, env) {
const now = new Date().toISOString();
try {
await env.DB.prepare(`
INSERT INTO workflow_logs (order_id, message, created_at)
VALUES (?, ?, ?)
`).bind(
order.order_id,
"开始处理订单",
now
).run();
// 模拟调用第三方通知接口
// 实际项目中可以替换为邮件、短信、Slack、企业微信等 API
await fakeNotify(order);
await env.DB.prepare(`
UPDATE orders
SET status = ?, updated_at = ?
WHERE order_id = ?
`).bind(
"completed",
new Date().toISOString(),
order.order_id
).run();
await env.DB.prepare(`
INSERT INTO workflow_logs (order_id, message, created_at)
VALUES (?, ?, ?)
`).bind(
order.order_id,
"订单处理完成",
new Date().toISOString()
).run();
} catch (error) {
await env.DB.prepare(`
UPDATE orders
SET status = ?, updated_at = ?
WHERE order_id = ?
`).bind(
"failed",
new Date().toISOString(),
order.order_id
).run();
await env.DB.prepare(`
INSERT INTO workflow_logs (order_id, message, created_at)
VALUES (?, ?, ?)
`).bind(
order.order_id,
`订单处理失败:${error.message}`,
new Date().toISOString()
).run();
throw error;
}
}
async function fakeNotify(order) {
console.log(`通知用户 ${order.user_email},订单 ${order.order_id} 已处理`);
}
八、测试订单自动化流程
本地开发模式下运行:
npm run dev
发送测试请求:
curl -X POST http://localhost:8787/orders \
-H "Content-Type: application/json" \
-d '{
"order_id": "ORD-2026-0001",
"user_email": "user@example.com",
"amount": 199
}'
如果返回:
{
"success": true,
"message": "订单已创建,正在异步处理",
"order_id": "ORD-2026-0001"
}
说明接口已经成功接收订单并把任务放入队列。
可以查询数据库:
wrangler d1 execute workflow_demo_db --command="SELECT * FROM orders;"
查看日志:
wrangler d1 execute workflow_demo_db --command="SELECT * FROM workflow_logs;"
九、添加定时任务:自动检查异常订单
很多自动化工作流都需要定时补偿机制。例如订单长时间处于 pending 状态,可能说明队列消费失败、第三方接口异常或数据状态未正确更新。
我们可以使用 Cron Triggers 定时扫描异常订单。
1. 配置 Cron
在 wrangler.toml 中添加:
[triggers]
crons = ["*/10 * * * *"]
表示每 10 分钟执行一次。
2. 添加 scheduled 处理函数
修改 Worker:
export default {
async fetch(request, env, ctx) {
// HTTP 入口
},
async queue(batch, env, ctx) {
// 队列入口
},
async scheduled(event, env, ctx) {
ctx.waitUntil(checkPendingOrders(env));
}
};
async function checkPendingOrders(env) {
const result = await env.DB.prepare(`
SELECT order_id, user_email, created_at
FROM orders
WHERE status = 'pending'
LIMIT 50
`).all();
for (const order of result.results) {
await env.DB.prepare(`
INSERT INTO workflow_logs (order_id, message, created_at)
VALUES (?, ?, ?)
`).bind(
order.order_id,
"定时任务检测到订单仍处于 pending 状态",
new Date().toISOString()
).run();
await env.ORDER_QUEUE.send({
order_id: order.order_id,
user_email: order.user_email
});
}
}
这样,即使某些任务因为异常没有被正常处理,定时任务也可以重新加入队列,形成自动补偿。
十、集成第三方通知:以 Telegram 为例
自动化工作流中最常见的需求之一是发送通知。下面以 Telegram Bot 为例。
1. 保存密钥
不要把 Token 写死在代码中。使用 Wrangler Secret:
wrangler secret put TELEGRAM_BOT_TOKEN
wrangler secret put TELEGRAM_CHAT_ID
2. 编写通知函数
async function sendTelegramMessage(env, text) {
const url = `https://api.telegram.org/bot${env.TELEGRAM_BOT_TOKEN}/sendMessage`;
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
chat_id: env.TELEGRAM_CHAT_ID,
text
})
});
if (!response.ok) {
throw new Error(`Telegram 通知失败:${response.status}`);
}
return await response.json();
}
在订单处理成功后调用:
await sendTelegramMessage(
env,
`订单 ${order.order_id} 已处理完成,用户:${order.user_email}`
);
这样,每当订单处理完成,你就可以在 Telegram 中收到通知。
十一、使用 R2 保存报表或日志文件
如果你的自动化系统需要保存大量日志、报表、导出文件,建议使用 R2,而不是全部写入 D1。
1. 创建 R2 Bucket
wrangler r2 bucket create workflow-logs
2. 配置 wrangler.toml
[[r2_buckets]]
binding = "LOG_BUCKET"
bucket_name = "workflow-logs"
3. 写入日志文件
async function saveLogToR2(env, orderId, content) {
const key = `orders/${orderId}/${Date.now()}.json`;
await env.LOG_BUCKET.put(
key,
JSON.stringify(content, null, 2),
{
httpMetadata: {
contentType: "application/json"
}
}
);
return key;
}
适合保存:
- 订单处理详情;
- 第三方 API 响应;
- 批量导入结果;
- 日报、周报、月报;
- 大体积调试日志。
十二、使用 KV 管理自动化开关
有时你可能希望临时关闭某个自动化流程,例如第三方接口维护、系统升级、活动结束等。
可以使用 KV 存储配置。
1. 创建 KV Namespace
wrangler kv namespace create WORKFLOW_CONFIG
将返回的配置添加到 wrangler.toml:
[[kv_namespaces]]
binding = "WORKFLOW_CONFIG"
id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
2. 设置开关
wrangler kv key put --binding=WORKFLOW_CONFIG "order_workflow_enabled" "true"
3. 在代码中读取
async function isWorkflowEnabled(env) {
const value = await env.WORKFLOW_CONFIG.get("order_workflow_enabled");
return value !== "false";
}
在创建订单时判断:
const enabled = await isWorkflowEnabled(env);
if (!enabled) {
return Response.json(
{ error: "订单自动化流程暂时关闭" },
{ status: 503 }
);
}
这种方式非常适合做:
- 功能开关;
- 灰度发布;
- 临时禁用流程;
- API 限流配置;
- 多环境配置管理。
十三、使用 Cloudflare Workflows 编排复杂任务
当自动化流程变得越来越复杂时,仅靠 Worker + Queue 可能会出现几个问题:
- 任务步骤太多,代码难以维护;
- 某些步骤需要等待较长时间;
- 需要在失败后从指定步骤重试;
- 需要记录每一步状态;
- 任务可能跨越多个 API、多个系统;
- 需要人工审批或延迟执行。
这时可以考虑使用 Cloudflare Workflows。
一个典型工作流可以设计为:
step 1:验证订单
step 2:扣减库存
step 3:调用支付确认接口
step 4:生成发票
step 5:发送邮件
step 6:同步 CRM
step 7:完成任务
Workflows 的价值在于:它不是简单地“执行一个函数”,而是帮助你管理多步骤状态,让自动化流程更可靠。
在复杂业务中,推荐采用以下架构:
Worker:负责接收请求和鉴权
D1:记录业务数据和状态
Queue:处理高并发异步任务
Workflow:编排长流程任务
R2:保存文件和日志
KV:保存配置和开关
Cron:执行定时检查和补偿
十四、安全最佳实践
自动化系统一旦暴露 API,就必须重视安全。
1. 校验请求来源
对于 Webhook 或第三方回调,必须验证签名。例如:
function verifyToken(request, env) {
const token = request.headers.get("X-Webhook-Token");
return token === env.WEBHOOK_SECRET;
}
2. 使用 Secret 管理敏感信息
不要在代码中写入:
- API Token;
- 数据库密码;
- Telegram Bot Token;
- Slack Webhook;
- 支付密钥。
统一使用:
wrangler secret put SECRET_NAME
3. 设置速率限制
可以结合 Cloudflare WAF、Rate Limiting 或代码级限流,避免接口被恶意刷爆。
4. 保留执行日志
每个自动化流程都应记录:
- 任务 ID;
- 执行步骤;
- 执行时间;
- 成功或失败状态;
- 错误信息;
- 第三方 API 返回摘要。
这样便于排查问题和审计。
5. 避免重复处理
队列和 Webhook 都可能出现重复投递,因此业务代码要具备幂等性。例如订单号唯一、任务状态判断、重复请求直接返回已有结果。
const existing = await env.DB.prepare(`
SELECT order_id FROM orders WHERE order_id = ?
`).bind(orderId).first();
if (existing) {
return Response.json({
success: true,
message: "订单已存在,无需重复创建"
});
}
十五、部署到 Cloudflare
完成开发后,执行:
wrangler deploy
部署成功后会得到一个线上地址,例如:
https://cloudflare-workflow-demo.yourname.workers.dev
测试线上接口:
curl -X POST https://cloudflare-workflow-demo.yourname.workers.dev/orders \
-H "Content-Type: application/json" \
-d '{
"order_id": "ORD-2026-0002",
"user_email": "test@example.com",
"amount": 299
}'
十六、常见问题与解决方案
1. Worker 请求超时怎么办?
如果任务执行时间较长,不要在 HTTP 请求中同步完成全部逻辑。推荐做法是:
- HTTP 请求只负责接收和校验;
- 将任务写入 Queue;
- 后台异步消费;
- 使用 D1 保存任务状态。
2. 第三方 API 调用失败怎么办?
建议使用:
- Queue 自动重试;
- D1 记录失败状态;
- Cron 定时补偿;
- Workflows 管理步骤级重试;
- 通知系统发送告警。
3. D1 适合存储大量日志吗?
D1 适合结构化业务数据,但如果日志量很大,建议把详细日志写入 R2,D1 只保存索引和状态。
4. KV 可以当数据库使用吗?
不建议。KV 更适合配置、缓存、简单键值数据。对于订单、用户、任务状态等结构化数据,建议使用 D1。
5. 如何避免队列重复消费?
业务层必须做幂等处理。例如:
- 使用唯一任务 ID;
- 处理前检查状态;
- 已完成任务直接跳过;
- 数据库使用唯一索引;
- 日志中记录处理结果。
十七、推荐自动化架构模板
如果你正在设计 Cloudflare 自动化系统,可以参考下面这个模板:
外部系统 / 用户请求
↓
Cloudflare Worker
↓
鉴权 / 参数校验 / 幂等判断
↓
D1 写入任务记录
↓
Queue 异步任务处理
↓
Workflows 编排复杂步骤
↓
调用第三方 API / AI 接口 / 支付接口
↓
R2 保存文件或日志
↓
D1 更新最终状态
↓
Telegram / Slack / 邮件通知
↓
Cron 定时检查和补偿
这个架构适合大多数自动化场景,包括:
- 订单处理系统;
- AI 内容生成流程;
- 网站监控告警;
- 数据采集与清洗;
- 自动报表生成;
- 用户注册自动化;
- 文件上传处理;
- 跨平台内容同步;
- SaaS 后台任务系统。
十八、总结
Cloudflare 在 2026 年已经具备了非常完整的 Serverless 自动化能力。对于大多数中小型项目来说,你完全可以不购买传统服务器,而是通过 Cloudflare Workers、Queues、D1、R2、KV、Cron Triggers 和 Workflows 搭建一套稳定、低成本、可扩展的自动化系统。
本文重点介绍了:
- Cloudflare 自动化的核心组件;
- 如何创建 Workers 项目;
- 如何使用 D1 保存订单和日志;
- 如何使用 Queue 异步处理任务;
- 如何使用 Cron 定时补偿;
- 如何集成 Telegram 通知;
- 如何使用 R2 保存报表和大日志;
- 如何使用 KV 管理自动化开关;
- 如何利用 Workflows 编排复杂流程;
- 自动化系统的安全和幂等最佳实践。
如果你只是做简单自动化,推荐从 Worker + Cron + D1 开始;如果任务量较大,加入 Queues;如果涉及文件和日志,加入 R2;如果流程复杂、步骤较多、需要状态恢复,则进一步引入 Cloudflare Workflows。
最终的核心原则是:
同步请求只做轻量工作,复杂任务交给队列和工作流;状态写入数据库,文件写入对象存储;失败可重试,流程可追踪,系统可恢复。
按照这个思路,你就可以用 Cloudflare 构建出一套真正适合生产环境的工作流自动化平台。