把定时任务搬到 Cloudflare:一套生产可用的自动化工作流实践
Cloudflare 工作流自动化教程|生产环境实测
在过去很长一段时间里,很多团队在做自动化任务时,常见方案是:服务器定时任务、GitHub Actions、Jenkins、Airflow、n8n、自建 Node.js 脚本,或者直接在业务系统里写一堆异步逻辑。
这些方案当然都能用,但一旦进入生产环境,就会遇到几个很现实的问题:
- 服务器需要维护;
- 定时任务失败后不好恢复;
- 异步任务链路过长,排查困难;
- 跨系统调用容易超时;
- 大批量任务需要限流和重试;
- 日志、告警、状态追踪成本较高;
- 海外业务对网络延迟和稳定性要求更高。
如果你的业务已经在使用 Cloudflare,例如 Workers、Pages、R2、D1、KV、Queues、Cron Triggers,那么用 Cloudflare 来做工作流自动化会是一个非常自然的选择。
本文将以生产环境实测角度,介绍如何使用 Cloudflare 体系搭建一套稳定的工作流自动化方案,适合用于:
- 定时同步数据;
- 批量处理订单;
- 自动生成报表;
- 内容抓取与清洗;
- Webhook 分发;
- AI 内容处理任务;
- 文件上传后的异步处理;
- 失败自动重试;
- 跨服务自动化编排。
说明:本文重点讲的是基于 Cloudflare Workers、Queues、Cron Triggers、R2、D1、KV 等能力组合实现工作流自动化。不同账号、套餐、区域以及 Cloudflare 产品更新情况可能有所差异,生产环境请以官方文档和实际控制台为准。
一、为什么选择 Cloudflare 做工作流自动化?
Cloudflare 给人的第一印象通常是 CDN、DNS、防护、WAF,但近几年 Cloudflare 的开发者平台已经越来越完整。尤其是 Cloudflare Workers,可以理解为运行在边缘网络上的 Serverless 运行环境。
和传统服务器相比,它有几个明显优势。
1. 不需要维护服务器
传统方式中,你可能需要购买 VPS、配置 Node.js / Python 环境、安装 PM2、配置 Nginx、写 crontab、做日志轮转、处理磁盘空间、监控进程存活。
而 Cloudflare Workers 是 Serverless 模式,你只需要关注代码本身,不需要关心服务器运维。
对于中小型团队来说,这一点非常重要。因为很多自动化任务本质上不是核心业务,却会消耗大量维护精力。
2. 全球边缘节点,网络质量好
如果你的自动化任务涉及海外 API,比如 Stripe、GitHub、OpenAI、Telegram、Shopify、Notion、Slack 等,Cloudflare Workers 的网络环境通常比普通国内服务器或廉价 VPS 更稳定。
生产环境里,网络质量直接影响任务成功率。尤其是 Webhook、支付回调、第三方接口同步场景,低延迟和稳定连接非常关键。
3. 原生支持定时任务
Cloudflare Workers 支持 Cron Triggers,可以定时执行 Worker,例如:
- 每分钟执行一次;
- 每小时执行一次;
- 每天凌晨执行;
- 每周固定时间执行。
这对于自动化非常实用。例如:
- 每天同步订单;
- 每小时生成统计数据;
- 每 10 分钟检查失败任务;
- 每天清理过期缓存;
- 定时调用接口拉取数据。
4. 可以结合队列做异步任务
Cloudflare Queues 可以用于消息队列场景。简单来说,你可以把任务先投递到队列,然后由消费者异步处理。
这在生产环境非常重要,因为很多任务不应该在 HTTP 请求中直接执行。
例如用户上传文件后,你不应该让用户等待文件压缩、识别、转码、AI 分析全部完成,而是应该:
- 用户上传文件;
- Worker 记录任务;
- 投递消息到 Queue;
- 后台异步处理;
- 处理完成后更新状态;
- 用户再查询结果。
5. 可以搭配 R2、D1、KV 使用
Cloudflare 提供了不少存储能力:
| 产品 | 适合场景 |
|---|---|
| KV | 简单键值缓存、配置、临时状态 |
| D1 | SQLite 数据库,适合结构化数据 |
| R2 | 对象存储,适合文件、图片、日志、报表 |
| Durable Objects | 强一致状态、连接管理、协同控制 |
| Queues | 异步任务队列 |
| Workers Analytics Engine | 分析数据写入与查询 |
使用这些能力可以构建一套完整的自动化系统。
二、生产环境中的典型工作流架构
我们先看一个实际可落地的工作流架构。
假设我们要实现一个“每日自动生成业务报表并发送通知”的任务,流程如下:
- 每天凌晨 2 点触发定时任务;
- 从业务 API 拉取订单数据;
- 清洗并统计数据;
- 将统计结果写入 D1;
- 将报表文件写入 R2;
- 发送通知到 Slack / 企业微信 / Telegram;
- 如果某一步失败,则自动重试;
- 失败超过次数后记录日志并告警。
对应到 Cloudflare,可以这样设计:
Cron Trigger
↓
Cloudflare Worker
↓
Cloudflare Queue
↓
Queue Consumer Worker
↓
D1 / R2 / KV
↓
Webhook 通知
为什么不让 Cron Trigger 直接完成所有任务?
因为生产环境中,任务可能会比较耗时。如果直接在定时触发器里完成全部逻辑,可能会遇到超时、重试困难、无法拆分任务等问题。
更合理的方式是:
- Cron Trigger 只负责生成任务;
- Queue 负责承载任务;
- Consumer 负责实际处理;
- D1 / KV 负责记录任务状态;
- R2 负责保存文件;
- Webhook 负责通知结果。
这种模式更加稳定,也更容易扩展。
三、准备工作
在开始之前,你需要准备以下内容:
- 一个 Cloudflare 账号;
- 已安装 Node.js;
- 已安装 Wrangler CLI;
- 一个 Cloudflare Worker 项目;
- 根据需要创建 D1、R2、Queue 等资源。
安装 Wrangler:
npm install -g wrangler
登录 Cloudflare:
wrangler login
创建 Worker 项目:
npm create cloudflare@latest cloudflare-workflow-demo
进入项目:
cd cloudflare-workflow-demo
本地启动:
npm run dev
部署:
npm run deploy
或者使用:
wrangler deploy
四、配置 Cron 定时任务
在 wrangler.toml 中配置定时任务:
name = "cloudflare-workflow-demo"
main = "src/index.ts"
compatibility_date = "2024-09-01"
[triggers]
crons = ["0 2 * * *"]
上面表示每天凌晨 2 点执行一次。
如果你想每 10 分钟执行一次:
[triggers]
crons = ["*/10 * * * *"]
Worker 中处理定时任务:
export default {
async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
console.log("Cron triggered:", event.cron);
const task = {
type: "daily_report",
createdAt: new Date().toISOString(),
retry: 0
};
await env.REPORT_QUEUE.send(task);
},
async fetch(request: Request, env: Env, ctx: ExecutionContext) {
return new Response("Workflow service is running");
}
};
这里有一个关键点:定时任务触发后,不直接做复杂处理,而是把任务发送到队列。
五、配置 Cloudflare Queues
创建队列:
wrangler queues create report-queue
在 wrangler.toml 中绑定队列:
[[queues.producers]]
queue = "report-queue"
binding = "REPORT_QUEUE"
[[queues.consumers]]
queue = "report-queue"
max_batch_size = 5
max_batch_timeout = 30
队列消费者代码:
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
for (const message of batch.messages) {
try {
const task = message.body;
if (task.type === "daily_report") {
await handleDailyReport(task, env);
}
message.ack();
} catch (error) {
console.error("Task failed:", error);
message.retry();
}
}
}
};
async function handleDailyReport(task: any, env: Env) {
const data = await fetchOrderData(env);
const report = buildReport(data);
await saveReportToR2(report, env);
await saveReportRecordToD1(report, env);
await sendNotification(report, env);
}
这里使用了 message.ack() 和 message.retry()。
在生产环境中,务必明确任务成功和失败的处理逻辑,不要让任务“静默失败”。
六、使用 D1 记录任务状态
自动化工作流最常见的问题之一是:任务到底执行到哪一步了?
如果没有状态记录,出现问题时只能翻日志,非常痛苦。
建议为每个任务建立状态表。
创建 D1 数据库:
wrangler d1 create workflow-db
创建表结构:
CREATE TABLE workflow_tasks (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
status TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
payload TEXT,
result TEXT,
error_message TEXT,
created_at TEXT,
updated_at TEXT
);
绑定 D1:
[[d1_databases]]
binding = "DB"
database_name = "workflow-db"
database_id = "你的 database_id"
写入任务状态:
async function createTaskRecord(env: Env, task: any) {
const id = crypto.randomUUID();
const now = new Date().toISOString();
await env.DB.prepare(`
INSERT INTO workflow_tasks
(id, type, status, retry_count, payload, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`).bind(
id,
task.type,
"pending",
0,
JSON.stringify(task),
now,
now
).run();
return id;
}
更新任务状态:
async function updateTaskStatus(env: Env, id: string, status: string, result?: any, error?: any) {
await env.DB.prepare(`
UPDATE workflow_tasks
SET status = ?, result = ?, error_message = ?, updated_at = ?
WHERE id = ?
`).bind(
status,
result ? JSON.stringify(result) : null,
error ? String(error) : null,
new Date().toISOString(),
id
).run();
}
推荐使用以下状态:
| 状态 | 含义 |
|---|---|
| pending | 已创建,等待执行 |
| running | 正在执行 |
| success | 执行成功 |
| failed | 执行失败 |
| retrying | 等待重试 |
| cancelled | 已取消 |
生产环境中,任务状态非常重要。它不仅用于排查问题,也可以作为后台管理页面的数据来源。
七、使用 R2 保存报表和文件
如果你的工作流会生成文件,例如:
- CSV 报表;
- JSON 数据快照;
- PDF 文件;
- 图片处理结果;
- AI 分析结果;
- 日志归档文件;
那么建议使用 R2 存储。
创建 R2 Bucket:
wrangler r2 bucket create workflow-reports
绑定 R2:
[[r2_buckets]]
binding = "REPORT_BUCKET"
bucket_name = "workflow-reports"
保存报表:
async function saveReportToR2(report: any, env: Env) {
const date = new Date().toISOString().slice(0, 10);
const key = `reports/${date}/daily-report.json`;
await env.REPORT_BUCKET.put(
key,
JSON.stringify(report, null, 2),
{
httpMetadata: {
contentType: "application/json"
}
}
);
return key;
}
如果是 CSV:
function toCSV(rows: any[]) {
if (!rows.length) return "";
const headers = Object.keys(rows[0]);
const csv = [
headers.join(","),
...rows.map(row => headers.map(h => JSON.stringify(row[h] ?? "")).join(","))
].join("\n");
return csv;
}
上传 CSV:
await env.REPORT_BUCKET.put("reports/daily.csv", csv, {
httpMetadata: {
contentType: "text/csv"
}
});
八、失败重试与幂等设计
生产环境中,失败是常态,不是例外。
常见失败包括:
- 第三方 API 超时;
- 网络波动;
- 数据库写入失败;
- Webhook 发送失败;
- 任务执行超时;
- 队列重复投递;
- 请求被限流;
- 外部系统返回 500。
因此,工作流必须考虑两个核心问题:
- 如何重试?
- 如何避免重复执行造成脏数据?
1. 重试机制
Cloudflare Queues 本身支持失败重试,但你仍然应该在业务层做重试次数控制。
例如:
async function processTask(message: any, env: Env) {
const task = message.body;
try {
await updateTaskStatus(env, task.id, "running");
await handleDailyReport(task, env);
await updateTaskStatus(env, task.id, "success");
} catch (error) {
const retryCount = task.retryCount || 0;
if (retryCount >= 3) {
await updateTaskStatus(env, task.id, "failed", null, error);
throw error;
}
await updateTaskStatus(env, task.id, "retrying", null, error);
await env.REPORT_QUEUE.send({
...task,
retryCount: retryCount + 1
});
}
}
2. 幂等设计
所谓幂等,就是同一个任务执行多次,结果仍然一致,不会产生重复数据。
例如生成日报时,可以使用固定任务 ID:
const date = new Date().toISOString().slice(0, 10);
const taskId = `daily_report_${date}`;
插入数据库时使用 INSERT OR IGNORE:
INSERT OR IGNORE INTO workflow_tasks
(id, type, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?);
这样即使定时任务被重复触发,也不会重复创建任务。
对于报表文件,也可以使用固定路径:
reports/2025-01-01/daily-report.json
这样重复执行只会覆盖同一个文件,而不会生成一堆重复文件。
九、生产环境实测经验
下面是实际在生产环境使用 Cloudflare 自动化工作流时,总结出的经验。
1. 不要把所有逻辑写在一个 Worker 里
一开始很多人会把定时任务、HTTP 接口、队列消费、数据处理、通知发送全部写在同一个文件里。短期看很方便,长期维护会非常痛苦。
建议按模块拆分:
src/
index.ts
handlers/
cron.ts
queue.ts
http.ts
services/
report.ts
notification.ts
storage.ts
database.ts
utils/
retry.ts
logger.ts
这样的结构更适合生产维护。
2. 外部 API 一定要设置超时
不要直接裸写:
await fetch(url)
建议封装超时:
async function fetchWithTimeout(url: string, options: RequestInit = {}, timeout = 10000) {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeout);
try {
return await fetch(url, {
...options,
signal: controller.signal
});
} finally {
clearTimeout(timer);
}
}
否则某些外部接口异常时,会拖慢整个任务。
3. 日志要包含 taskId
生产排查问题时,最重要的是能快速定位某个任务的完整链路。
建议所有日志都带上 taskId:
console.log(`[task:${task.id}] start processing`);
console.log(`[task:${task.id}] fetch orders success`);
console.log(`[task:${task.id}] report saved`);
这样在 Cloudflare Dashboard 里查看日志时更容易过滤。
4. Webhook 通知要降级处理
很多自动化任务最终会发送通知,例如 Slack、Telegram、企业微信、飞书。
但是通知失败不一定代表主任务失败。
例如报表已经生成成功,只是通知没发出去,这种情况可以单独记录通知失败,而不要把整个任务标记为失败。
建议区分主流程和附加流程:
try {
await sendNotification(report, env);
} catch (error) {
console.error("Notification failed:", error);
}
当然,如果通知本身就是关键业务流程,那就另当别论。
5. 注意 API 限流
很多第三方 API 都有限流,比如每分钟 60 次、每秒 10 次等。
如果队列消费速度太快,可能会导致大量请求被拒绝。
解决方法:
- 降低
max_batch_size; - 增加任务间隔;
- 对请求做分批;
- 在业务层实现简单限速;
- 失败后指数退避重试。
6. 避免大任务一次性执行
如果你要处理 10 万条数据,不建议一个任务全部处理完。
更好的方式是拆分成多个小任务。
例如:
sync_orders_2025_01_01_page_1
sync_orders_2025_01_01_page_2
sync_orders_2025_01_01_page_3
每个任务只处理一页数据。
这样即使某一页失败,也可以单独重试,而不是整个任务重来。
十、一个完整的工作流示例
下面给出一个简化版完整示例。
export default {
async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
const date = new Date().toISOString().slice(0, 10);
const taskId = `daily_report_${date}`;
await env.DB.prepare(`
INSERT OR IGNORE INTO workflow_tasks
(id, type, status, retry_count, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
`).bind(
taskId,
"daily_report",
"pending",
0,
new Date().toISOString(),
new Date().toISOString()
).run();
await env.REPORT_QUEUE.send({
id: taskId,
type: "daily_report",
date,
retryCount: 0
});
console.log(`[task:${taskId}] queued`);
},
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
for (const message of batch.messages) {
const task = message.body;
try {
await env.DB.prepare(`
UPDATE workflow_tasks
SET status = ?, updated_at = ?
WHERE id = ?
`).bind("running", new Date().toISOString(), task.id).run();
const orders = await fetchOrders(task.date, env);
const report = buildDailyReport(orders);
const key = `reports/${task.date}/daily-report.json`;
await env.REPORT_BUCKET.put(key, JSON.stringify(report, null, 2), {
httpMetadata: {
contentType: "application/json"
}
});
await env.DB.prepare(`
UPDATE workflow_tasks
SET status = ?, result = ?, updated_at = ?
WHERE id = ?
`).bind(
"success",
JSON.stringify({ r2Key: key, total: report.total }),
new Date().toISOString(),
task.id
).run();
try {
await sendTelegramMessage(`日报生成成功:${task.date},订单数:${report.total}`, env);
} catch (noticeError) {
console.error(`[task:${task.id}] notice failed`, noticeError);
}
message.ack();
} catch (error) {
console.error(`[task:${task.id}] failed`, error);
const retryCount = task.retryCount || 0;
if (retryCount >= 3) {
await env.DB.prepare(`
UPDATE workflow_tasks
SET status = ?, error_message = ?, updated_at = ?
WHERE id = ?
`).bind(
"failed",
String(error),
new Date().toISOString(),
task.id
).run();
message.ack();
} else {
await env.DB.prepare(`
UPDATE workflow_tasks
SET status = ?, retry_count = ?, error_message = ?, updated_at = ?
WHERE id = ?
`).bind(
"retrying",
retryCount + 1,
String(error),
new Date().toISOString(),
task.id
).run();
await env.REPORT_QUEUE.send({
...task,
retryCount: retryCount + 1
});
message.ack();
}
}
}
},
async fetch(request: Request, env: Env) {
const url = new URL(request.url);
if (url.pathname === "/health") {
return Response.json({ status: "ok" });
}
return new Response("Not Found", { status: 404 });
}
};
async function fetchOrders(date: string, env: Env) {
const res = await fetch(`${env.API_BASE_URL}/orders?date=${date}`, {
headers: {
Authorization: `Bearer ${env.API_TOKEN}`
}
});
if (!res.ok) {
throw new Error(`Fetch orders failed: ${res.status}`);
}
return await res.json();
}
function buildDailyReport(orders: any[]) {
return {
total: orders.length,
amount: orders.reduce((sum, item) => sum + Number(item.amount || 0), 0),
generatedAt: new Date().toISOString()
};
}
async function sendTelegramMessage(text: string, env: Env) {
const url = `https://api.telegram.org/bot${env.TELEGRAM_BOT_TOKEN}/sendMessage`;
const res = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
chat_id: env.TELEGRAM_CHAT_ID,
text
})
});
if (!res.ok) {
throw new Error(`Telegram notice failed: ${res.status}`);
}
}
这个示例虽然简化,但已经包含生产环境中最重要的几个点:
- 定时触发;
- 队列异步处理;
- D1 记录状态;
- R2 保存结果;
- 通知发送;
- 失败重试;
- 幂等任务 ID;
- 健康检查接口。
十一、上线前检查清单
生产环境上线前,建议逐项检查:
- [ ] Cron 表达式是否正确;
- [ ] 时区是否符合预期;
- [ ] 队列是否绑定成功;
- [ ] D1 表结构是否已初始化;
- [ ] R2 Bucket 是否存在;
- [ ] 环境变量是否配置完整;
- [ ] API Token 是否使用 Secret 管理;
- [ ] 外部 API 是否设置超时;
- [ ] 任务是否具备幂等性;
- [ ] 重试次数是否有限制;
- [ ] 失败后是否有告警;
- [ ] 日志是否包含 taskId;
- [ ] 是否有
/health健康检查接口; - [ ] 是否有手动触发任务的入口;
- [ ] 是否区分测试环境和生产环境。
配置 Secret:
wrangler secret put API_TOKEN
wrangler secret put TELEGRAM_BOT_TOKEN
wrangler secret put TELEGRAM_CHAT_ID
不要把密钥直接写进代码或 wrangler.toml。
十二、适合 Cloudflare 自动化的场景
根据实测经验,Cloudflare 非常适合以下自动化任务:
1. Webhook 中转与分发
比如 GitHub、Stripe、Shopify、Paddle、Lemon Squeezy 的 Webhook,可以先进入 Worker,再分发到内部系统。
这样可以实现:
- 签名校验;
- 请求过滤;
- 重试;
- 日志记录;
- 安全隔离;
- 多目标转发。
2. 定时报表
每天生成销售日报、访问日报、订单日报,然后保存到 R2 并发送通知。
3. 文件异步处理
用户上传文件后,通过队列异步处理,例如:
- 图片压缩;
- JSON 清洗;
- CSV 导入;
- 日志分析;
- AI 摘要;
- 文档解析。
4. 轻量级数据同步
比如每小时从第三方系统拉取数据,写入 D1 或转发到业务服务器。
5. 自动告警
定时检测接口状态、库存数量、余额、证书过期时间、订单异常等,一旦触发条件就发送通知。
十三、不适合的场景
Cloudflare 虽然强大,但并不是所有任务都适合。
以下场景需要谨慎:
-
超长时间运行任务
如果任务需要连续运行几十分钟甚至几小时,Workers 并不是最佳选择。 -
重 CPU 计算任务
例如视频转码、大模型本地推理、大规模压缩计算,更适合专用服务器或云计算平台。 -
复杂 DAG 工作流
如果你需要非常复杂的任务依赖、可视化编排、人工审批、任务回滚,Airflow、Temporal、Prefect 等专业工作流系统可能更合适。 -
强事务数据库场景
D1 适合轻量结构化数据,但如果你的业务要求高并发复杂事务,仍然需要评估传统数据库。
十四、总结
Cloudflare 工作流自动化的核心思路不是“把所有事情都塞进 Worker”,而是充分利用 Cloudflare 平台能力进行拆分:
- 用 Cron Triggers 做定时触发;
- 用 Workers 做业务入口和调度;
- 用 Queues 做异步解耦;
- 用 D1 记录任务状态;
- 用 R2 保存文件结果;
- 用 KV 存储配置和轻量缓存;
- 用 Webhook 做通知和系统联动。
从生产环境实测来看,这套方案非常适合中小型自动化系统,尤其适合需要快速上线、低维护成本、全球网络稳定、与第三方 API 高频交互的业务。
但要想稳定运行,必须重视以下几点:
- 任务要拆小;
- 状态要可追踪;
- 失败要可重试;
- 重复执行要幂等;
- 日志要可定位;
- 密钥要安全管理;
- 关键任务要有告警。
如果你正在维护一堆服务器脚本、crontab 或 GitHub Actions 自动化任务,那么可以尝试把其中一部分迁移到 Cloudflare Workers 体系。
对于很多轻量到中等规模的自动化需求,Cloudflare 已经足够稳定、灵活,并且维护成本非常低。