用 Cloudflare Workers 搭一个定时巡检与告警工作流,完整源码实战
Cloudflare 工作流自动化教程|附源码
在现代 Web 项目中,很多任务并不适合直接放在用户请求链路里执行,例如:
- 定时抓取第三方接口数据;
- 用户注册后发送欢迎邮件;
- 订单支付后同步库存、发放权益;
- 定期生成报表;
- 监控接口状态并发送告警;
- 处理耗时较长、容易失败、需要重试的任务。
如果这些逻辑都写在普通 HTTP 接口中,一旦接口超时、第三方服务失败、服务器重启,就很容易导致任务中断或数据不一致。
Cloudflare 提供了一整套无服务器能力,例如 Workers、Cron Triggers、Queues、D1、KV、R2、Durable Objects 等。借助这些能力,我们可以在 Cloudflare 上搭建一套轻量、稳定、低成本的工作流自动化系统。
本文将以一个完整示例来讲解如何使用 Cloudflare Workers 实现工作流自动化,并附带完整源码。
一、本文要实现什么功能?
本文将实现一个「网站状态巡检与告警」自动化工作流。
整体流程如下:
- Cloudflare Cron 定时触发任务;
- Worker 读取需要监控的网站列表;
- 逐个请求目标网站;
- 记录检测结果;
- 如果网站异常,则发送告警通知;
- 提供 API 查询最近检测记录;
- 支持手动触发检测。
这个示例虽然简单,但它覆盖了工作流自动化中非常常见的几个核心能力:
- 定时任务;
- 异步执行;
- 状态记录;
- 异常处理;
- 重试机制;
- API 查询;
- 配置化管理。
最终我们会实现如下接口:
GET / 查看服务说明
GET /health 查看 Worker 健康状态
GET /records 查询最近检测记录
POST /run 手动触发巡检
二、技术架构说明
本教程使用以下 Cloudflare 产品:
| 产品 | 用途 |
|---|---|
| Cloudflare Workers | 运行服务端逻辑 |
| Cron Triggers | 定时触发工作流 |
| D1 Database | 存储网站配置和检测记录 |
| Wrangler | 本地开发和部署工具 |
架构图如下:
┌─────────────────────┐
│ Cloudflare Cron │
│ 定时触发 │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ Cloudflare Worker │
│ 执行自动化逻辑 │
└──────────┬──────────┘
│
├── 读取站点配置
│
├── 请求目标网站
│
├── 写入检测结果
│
└── 发送告警通知
│
▼
┌─────────────────────┐
│ Cloudflare D1 │
│ 存储配置和记录 │
└─────────────────────┘
三、准备工作
在开始之前,你需要准备:
- 一个 Cloudflare 账号;
- 已安装 Node.js,推荐 Node.js 18 或以上;
- 已安装 Wrangler;
- 一个可以用于测试的网站地址;
- 可选:一个告警 Webhook 地址,例如企业微信、钉钉、飞书或自建接口。
安装 Wrangler:
npm install -g wrangler
登录 Cloudflare:
wrangler login
验证是否登录成功:
wrangler whoami
四、创建项目
使用 Wrangler 创建一个 Worker 项目:
npm create cloudflare@latest cf-workflow-demo
如果提示选择模板,可以选择:
Worker only
TypeScript
进入项目目录:
cd cf-workflow-demo
项目结构大致如下:
cf-workflow-demo
├── package.json
├── src
│ └── index.ts
├── tsconfig.json
└── wrangler.toml
五、创建 D1 数据库
Cloudflare D1 是一个基于 SQLite 的无服务器数据库,非常适合保存轻量级任务数据。
执行命令创建数据库:
wrangler d1 create workflow_demo_db
命令执行后,你会看到类似输出:
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
把这个 database_id 记下来,后面需要写入 wrangler.toml。
六、配置 wrangler.toml
打开 wrangler.toml,修改为如下内容:
name = "cf-workflow-demo"
main = "src/index.ts"
compatibility_date = "2024-09-23"
[triggers]
crons = ["*/10 * * * *"]
[[d1_databases]]
binding = "DB"
database_name = "workflow_demo_db"
database_id = "替换为你的 database_id"
[vars]
ALERT_WEBHOOK = ""
说明:
*/10 * * * *
表示每 10 分钟执行一次定时任务。
如果你不想发送告警,可以暂时把 ALERT_WEBHOOK 留空。
如果你希望使用飞书、钉钉、企业微信等 Webhook,可以把对应地址填进去。
七、创建数据库表结构
在项目根目录新建 schema.sql:
DROP TABLE IF EXISTS sites;
DROP TABLE IF EXISTS check_records;
CREATE TABLE sites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
url TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE check_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_id INTEGER NOT NULL,
site_name TEXT NOT NULL,
url TEXT NOT NULL,
status TEXT NOT NULL,
status_code INTEGER,
response_time_ms INTEGER,
error_message TEXT,
checked_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO sites (name, url, enabled)
VALUES
('Cloudflare', 'https://www.cloudflare.com', 1),
('Example', 'https://example.com', 1);
执行本地初始化:
wrangler d1 execute workflow_demo_db --local --file=./schema.sql
执行远程初始化:
wrangler d1 execute workflow_demo_db --remote --file=./schema.sql
如果你只是本地测试,先执行 --local 即可。
如果你准备部署到线上,则需要执行 --remote。
八、完整源码
下面是完整的 src/index.ts 代码。
export interface Env {
DB: D1Database;
ALERT_WEBHOOK: string;
}
type Site = {
id: number;
name: string;
url: string;
enabled: number;
};
type CheckResult = {
siteId: number;
siteName: string;
url: string;
status: "success" | "failed";
statusCode: number | null;
responseTimeMs: number;
errorMessage: string | null;
};
function json(data: unknown, status = 200): Response {
return new Response(JSON.stringify(data, null, 2), {
status,
headers: {
"Content-Type": "application/json; charset=utf-8",
},
});
}
async function getEnabledSites(env: Env): Promise {
const result = await env.DB.prepare(
`
SELECT id, name, url, enabled
FROM sites
WHERE enabled = 1
ORDER BY id ASC
`
).all();
return result.results || [];
}
async function checkSite(site: Site): Promise {
const start = Date.now();
try {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 10000);
const response = await fetch(site.url, {
method: "GET",
signal: controller.signal,
headers: {
"User-Agent": "Cloudflare-Workflow-Demo/1.0",
},
});
clearTimeout(timeout);
const responseTimeMs = Date.now() - start;
const ok = response.status >= 200 && response.status < 400;
return {
siteId: site.id,
siteName: site.name,
url: site.url,
status: ok ? "success" : "failed",
statusCode: response.status,
responseTimeMs,
errorMessage: ok ? null : `Unexpected status code: ${response.status}`,
};
} catch (error) {
const responseTimeMs = Date.now() - start;
return {
siteId: site.id,
siteName: site.name,
url: site.url,
status: "failed",
statusCode: null,
responseTimeMs,
errorMessage: error instanceof Error ? error.message : String(error),
};
}
}
async function saveRecord(env: Env, result: CheckResult): Promise {
await env.DB.prepare(
`
INSERT INTO check_records (
site_id,
site_name,
url,
status,
status_code,
response_time_ms,
error_message
)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
)
.bind(
result.siteId,
result.siteName,
result.url,
result.status,
result.statusCode,
result.responseTimeMs,
result.errorMessage
)
.run();
}
async function sendAlert(env: Env, result: CheckResult): Promise {
if (!env.ALERT_WEBHOOK) {
return;
}
const content = [
"【网站巡检告警】",
`站点:${result.siteName}`,
`地址:${result.url}`,
`状态:${result.status}`,
`状态码:${result.statusCode ?? "N/A"}`,
`耗时:${result.responseTimeMs}ms`,
`错误:${result.errorMessage ?? "无"}`,
`时间:${new Date().toISOString()}`,
].join("\n");
try {
await fetch(env.ALERT_WEBHOOK, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
text: content,
}),
});
} catch (error) {
console.error("Send alert failed:", error);
}
}
async function runWorkflow(env: Env): Promise<{
total: number;
success: number;
failed: number;
results: CheckResult[];
}> {
const sites = await getEnabledSites(env);
const results: CheckResult[] = [];
for (const site of sites) {
const result = await checkSite(site);
await saveRecord(env, result);
if (result.status === "failed") {
await sendAlert(env, result);
}
results.push(result);
}
return {
total: results.length,
success: results.filter((item) => item.status === "success").length,
failed: results.filter((item) => item.status === "failed").length,
results,
};
}
async function getRecords(env: Env, limit = 50): Promise {
const safeLimit = Math.min(Math.max(limit, 1), 100);
const result = await env.DB.prepare(
`
SELECT
id,
site_id,
site_name,
url,
status,
status_code,
response_time_ms,
error_message,
checked_at
FROM check_records
ORDER BY id DESC
LIMIT ?
`
)
.bind(safeLimit)
.all();
return result.results || [];
}
export default {
async fetch(request: Request, env: Env): Promise {
const url = new URL(request.url);
if (url.pathname === "/") {
return json({
name: "Cloudflare 工作流自动化示例",
description: "定时检测网站状态,并记录结果与发送告警。",
routes: {
"GET /": "查看服务说明",
"GET /health": "查看健康状态",
"GET /records?limit=20": "查询最近检测记录",
"POST /run": "手动触发巡检",
},
});
}
if (url.pathname === "/health") {
return json({
status: "ok",
time: new Date().toISOString(),
});
}
if (url.pathname === "/records" && request.method === "GET") {
const limit = Number(url.searchParams.get("limit") || "50");
const records = await getRecords(env, limit);
return json({
total: records.length,
records,
});
}
if (url.pathname === "/run" && request.method === "POST") {
const result = await runWorkflow(env);
return json({
message: "workflow executed",
...result,
});
}
return json(
{
error: "Not Found",
},
404
);
},
async scheduled(
event: ScheduledEvent,
env: Env,
ctx: ExecutionContext
): Promise {
ctx.waitUntil(runWorkflow(env));
},
};
九、源码核心逻辑解析
1. 获取启用的网站列表
async function getEnabledSites(env: Env): Promise {
const result = await env.DB.prepare(
`
SELECT id, name, url, enabled
FROM sites
WHERE enabled = 1
ORDER BY id ASC
`
).all();
return result.results || [];
}
这里从 D1 数据库中读取所有启用状态的网站。
enabled = 1 表示该站点需要参与巡检。
如果你想暂停某个站点,只需要把它的 enabled 改成 0。
例如:
UPDATE sites SET enabled = 0 WHERE id = 2;
2. 检测网站状态
async function checkSite(site: Site): Promise {
const start = Date.now();
try {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 10000);
const response = await fetch(site.url, {
method: "GET",
signal: controller.signal,
headers: {
"User-Agent": "Cloudflare-Workflow-Demo/1.0",
},
});
clearTimeout(timeout);
const responseTimeMs = Date.now() - start;
const ok = response.status >= 200 && response.status < 400;
return {
siteId: site.id,
siteName: site.name,
url: site.url,
status: ok ? "success" : "failed",
statusCode: response.status,
responseTimeMs,
errorMessage: ok ? null : `Unexpected status code: ${response.status}`,
};
} catch (error) {
const responseTimeMs = Date.now() - start;
return {
siteId: site.id,
siteName: site.name,
url: site.url,
status: "failed",
statusCode: null,
responseTimeMs,
errorMessage: error instanceof Error ? error.message : String(error),
};
}
}
这段代码负责真正访问目标网站。
这里有几个关键点:
第一,使用 AbortController 实现超时控制。
如果目标网站 10 秒内没有响应,就主动中断请求,避免整个工作流被卡住。
第二,使用状态码判断是否成功。
本文中把 200 <= status < 400 视为正常,其他状态码视为异常。
第三,记录响应耗时。
响应耗时是监控系统里非常重要的指标。一个网站即使没有宕机,但如果响应时间越来越长,也可能表示服务出现了性能问题。
3. 保存检测记录
async function saveRecord(env: Env, result: CheckResult): Promise {
await env.DB.prepare(
`
INSERT INTO check_records (
site_id,
site_name,
url,
status,
status_code,
response_time_ms,
error_message
)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
)
.bind(
result.siteId,
result.siteName,
result.url,
result.status,
result.statusCode,
result.responseTimeMs,
result.errorMessage
)
.run();
}
每次检测完成后,都会把结果写入 check_records 表。
这可以帮助我们后续实现:
- 历史状态查询;
- 可用率统计;
- 异常趋势分析;
- 慢请求分析;
- 告警恢复判断。
4. 发送告警通知
async function sendAlert(env: Env, result: CheckResult): Promise {
if (!env.ALERT_WEBHOOK) {
return;
}
const content = [
"【网站巡检告警】",
`站点:${result.siteName}`,
`地址:${result.url}`,
`状态:${result.status}`,
`状态码:${result.statusCode ?? "N/A"}`,
`耗时:${result.responseTimeMs}ms`,
`错误:${result.errorMessage ?? "无"}`,
`时间:${new Date().toISOString()}`,
].join("\n");
try {
await fetch(env.ALERT_WEBHOOK, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
text: content,
}),
});
} catch (error) {
console.error("Send alert failed:", error);
}
}
这里使用 Webhook 发送告警。
不同平台的 Webhook 消息格式可能不一样。例如:
企业微信机器人常见格式
{
"msgtype": "text",
"text": {
"content": "告警内容"
}
}
飞书机器人常见格式
{
"msg_type": "text",
"content": {
"text": "告警内容"
}
}
钉钉机器人常见格式
{
"msgtype": "text",
"text": {
"content": "告警内容"
}
}
本文为了保持通用,只写了:
{
"text": "告警内容"
}
实际项目中,你需要根据目标平台调整 sendAlert 方法。
十、本地运行测试
执行:
wrangler dev
启动后,会看到本地地址,例如:
http://localhost:8787
访问首页:
curl http://localhost:8787
查看健康状态:
curl http://localhost:8787/health
手动触发巡检:
curl -X POST http://localhost:8787/run
查询最近记录:
curl http://localhost:8787/records?limit=10
如果一切正常,你会看到类似结果:
{
"message": "workflow executed",
"total": 2,
"success": 2,
"failed": 0,
"results": [
{
"siteId": 1,
"siteName": "Cloudflare",
"url": "https://www.cloudflare.com",
"status": "success",
"statusCode": 200,
"responseTimeMs": 428,
"errorMessage": null
}
]
}
十一、部署到 Cloudflare
确认本地测试没问题后,执行部署:
wrangler deploy
部署成功后,终端会输出线上地址,例如:
https://cf-workflow-demo.your-name.workers.dev
然后测试线上接口:
curl https://cf-workflow-demo.your-name.workers.dev/health
手动触发线上巡检:
curl -X POST https://cf-workflow-demo.your-name.workers.dev/run
查询记录:
curl https://cf-workflow-demo.your-name.workers.dev/records
部署成功后,Cron Triggers 会按照 wrangler.toml 中配置的表达式自动运行。
十二、如何增加监控网站?
你可以通过 D1 SQL 直接插入新的站点。
本地插入:
wrangler d1 execute workflow_demo_db --local --command="INSERT INTO sites (name, url, enabled) VALUES ('My Blog', 'https://你的域名.com', 1);"
远程插入:
wrangler d1 execute workflow_demo_db --remote --command="INSERT INTO sites (name, url, enabled) VALUES ('My Blog', 'https://你的域名.com', 1);"
查看站点列表:
wrangler d1 execute workflow_demo_db --remote --command="SELECT * FROM sites;"
禁用某个站点:
wrangler d1 execute workflow_demo_db --remote --command="UPDATE sites SET enabled = 0 WHERE id = 1;"
重新启用:
wrangler d1 execute workflow_demo_db --remote --command="UPDATE sites SET enabled = 1 WHERE id = 1;"
十三、增加简单权限保护
当前 /run 和 /records 接口是公开的。实际项目中不建议这么做。
可以在 wrangler.toml 中增加一个 Token:
[vars]
ALERT_WEBHOOK = ""
API_TOKEN = "your-secret-token"
然后修改 Env:
export interface Env {
DB: D1Database;
ALERT_WEBHOOK: string;
API_TOKEN: string;
}
增加鉴权函数:
function requireAuth(request: Request, env: Env): boolean {
const token = request.headers.get("Authorization");
return token === `Bearer ${env.API_TOKEN}`;
}
在 /run 和 /records 中使用:
if (!requireAuth(request, env)) {
return json({ error: "Unauthorized" }, 401);
}
请求时携带 Token:
curl -H "Authorization: Bearer your-secret-token" \
https://cf-workflow-demo.your-name.workers.dev/records
这样可以避免别人随意调用你的接口。
十四、增加重试机制
在真实场景中,网络请求偶尔失败很正常。如果失败一次就告警,可能会产生大量误报。
我们可以增加一个简单重试函数:
async function retry(
task: () => Promise,
times = 3,
delayMs = 1000
): Promise {
let lastError: unknown;
for (let i = 0; i < times; i++) {
try {
return await task();
} catch (error) {
lastError = error;
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
throw lastError;
}
然后在检测时使用:
const response = await retry(() =>
fetch(site.url, {
method: "GET",
signal: controller.signal,
headers: {
"User-Agent": "Cloudflare-Workflow-Demo/1.0",
},
}),
3,
1000
);
不过要注意,Worker 的执行时间不是无限的。如果站点非常多,且每个站点都重试很多次,可能会造成执行时间过长。
更好的方式是使用 Cloudflare Queues,把每个站点检测拆成独立任务。
十五、进一步优化:使用队列拆分任务
如果你需要监控几十个甚至几百个网站,那么在一次 Cron 中顺序执行所有检测并不理想。
可以把架构改成:
Cron Trigger
│
▼
Worker 读取站点列表
│
▼
Cloudflare Queue
│
▼
Consumer Worker 逐个处理检测任务
│
▼
D1 记录结果
这样做的好处是:
- 任务可以异步处理;
- 单个任务失败不会影响整体;
- 可以利用队列重试;
- 更适合大规模任务;
- 更容易控制并发。
队列生产者示例:
export interface Env {
DB: D1Database;
CHECK_QUEUE: Queue;
}
async function enqueueSites(env: Env) {
const sites = await getEnabledSites(env);
for (const site of sites) {
await env.CHECK_QUEUE.send({
siteId: site.id,
name: site.name,
url: site.url,
});
}
return sites.length;
}
队列消费者示例:
export default {
async queue(batch: MessageBatch, env: Env): Promise {
for (const message of batch.messages) {
const site = message.body as Site;
const result = await checkSite(site);
await saveRecord(env, result);
if (result.status === "failed") {
await sendAlert(env, result);
}
message.ack();
}
},
};
队列适合更复杂的自动化任务,例如:
- 批量同步数据;
- 批量发送邮件;
- 批量生成静态页面;
- 批量处理图片;
- 大量 URL 可用性检测。
十六、常见问题
1. Cron 为什么没有立即执行?
Cloudflare Cron Triggers 不是保存配置后马上执行,它会按照配置的 cron 表达式运行。
如果你想立即测试,可以调用:
curl -X POST https://你的-worker-url/run
2. 本地 D1 和远程 D1 数据不一样?
是的,本地和远程是两套环境。
本地初始化:
wrangler d1 execute workflow_demo_db --local --file=./schema.sql
远程初始化:
wrangler d1 execute workflow_demo_db --remote --file=./schema.sql
3. 为什么告警没有收到?
请检查:
ALERT_WEBHOOK是否配置正确;- Webhook 消息格式是否符合平台要求;
- 目标平台是否要求签名;
- Worker 日志中是否有错误。
查看日志:
wrangler tail
4. Worker 是否适合长时间任务?
Cloudflare Workers 更适合短任务、事件驱动任务和边缘计算逻辑。
如果你的任务非常长,建议拆分为多个小任务,并结合:
- Queues;
- Durable Objects;
- D1;
- R2;
- Workflows;
- 外部 API。
十七、完整项目结构
最终项目结构如下:
cf-workflow-demo
├── package.json
├── schema.sql
├── src
│ └── index.ts
├── tsconfig.json
└── wrangler.toml
你可以把这些文件提交到 GitHub:
git init
git add .
git commit -m "init cloudflare workflow automation demo"
之后每次修改后执行:
wrangler deploy
即可更新线上版本。
十八、可以扩展成哪些自动化场景?
这个示例可以很容易扩展成更多工作流自动化系统:
1. 自动内容同步
定时从 RSS、CMS、第三方 API 抓取数据,写入 D1 或 KV。
2. 自动报表生成
每天凌晨统计业务数据,生成日报、周报,并推送到邮箱或群聊。
3. 自动清理任务
定期清理过期缓存、过期订单、过期验证码、无效会话。
4. 自动监控告警
监控网站、接口、证书有效期、CDN 状态,一旦异常立即通知。
5. 自动部署辅助
监听 Webhook,触发构建、刷新缓存、预热页面。
6. 自动 AI 处理
定时拉取待处理内容,调用 AI API 生成摘要、标签、翻译结果,再保存到数据库。
十九、最佳实践建议
在实际项目中,建议遵循以下原则:
-
任务要小
不要把一个巨大任务塞进一次 Worker 执行中,尽量拆成多个小任务。 -
状态要落库
自动化任务必须记录状态,否则失败后无法排查。 -
失败要可重试
网络请求、第三方 API 调用都可能失败,应该设计重试机制。 -
接口要鉴权
手动触发接口、查询接口不要裸奔。 -
日志要清晰
关键节点使用console.log或console.error,方便通过wrangler tail排查。 -
告警要降噪
不要失败一次就疯狂通知,可以增加连续失败次数判断。 -
配置要外置
网站列表、Webhook、Token 等配置不要硬编码在源码里。 -
注意平台限制
Worker、D1、Queues 都有各自的免费额度和限制,生产环境要提前评估。
二十、总结
本文从零实现了一个基于 Cloudflare Workers 的工作流自动化系统,功能包括:
- 定时触发;
- 网站状态检测;
- 数据库存储;
- 告警通知;
- 手动执行;
- 历史记录查询;
- 本地开发;
- 线上部署。
虽然示例是「网站巡检」,但它的结构可以复用于大多数自动化场景。
核心思想是:
使用 Cron 触发任务,使用 Worker 执行业务逻辑,使用 D1 保存状态,使用 Webhook 发送结果。
如果任务规模较小,这套方案已经足够简单、稳定、低成本。
如果任务规模变大,可以继续引入 Cloudflare Queues,把一个大工作流拆分成多个异步任务,从而提升可靠性和可维护性。