用 Cloudflare Workers 搭一个自动化任务系统:队列、KV 与源码实战
Cloudflare 工作流自动化教程|附源码
在现代 Web 开发与运维体系中,“自动化”几乎已经成为提升效率、降低错误率、缩短交付周期的必备能力。无论是自动部署、定时任务、接口编排、数据同步,还是告警通知、图片处理、订单状态流转,只要流程中存在重复步骤,都可以通过工作流自动化来优化。
Cloudflare 近年来已经不再只是一个 CDN 和 DNS 服务商,它提供了包括 Workers、Pages、KV、D1、R2、Queues、Cron Triggers、Workflows 等一整套边缘计算能力。借助这些能力,我们可以在 Cloudflare 的全球边缘网络上构建轻量、稳定、低成本的自动化系统。
本文将以一个完整示例为主线,介绍如何使用 Cloudflare Workers 实现一个“工作流自动化”项目:通过接口触发任务,将任务写入队列,由 Worker 后台消费处理,并在处理完成后记录状态。文章会包含项目结构、配置方式、核心源码、部署步骤以及扩展建议。
一、什么是 Cloudflare 工作流自动化?
所谓“工作流自动化”,本质上是把一个业务流程拆分成多个可执行步骤,并通过程序自动完成这些步骤。
例如:
- 用户提交表单;
- 系统校验数据;
- 生成任务;
- 写入消息队列;
- 后台异步处理;
- 调用第三方 API;
- 保存处理结果;
- 发送通知。
如果这些步骤全部由人工操作,不仅效率低,而且容易出错。而使用 Cloudflare Workers 这类无服务器平台,我们可以把这些步骤部署到边缘节点,由系统自动完成。
Cloudflare 的优势主要包括:
- 无需维护服务器:不用购买 VPS,也不用配置 Nginx、PM2、Docker 等;
- 全球边缘网络:请求就近执行,延迟低;
- 按量计费:小项目成本非常低;
- 生态完整:Workers、KV、D1、R2、Queues 可组合使用;
- 适合自动化任务:支持 HTTP 触发、定时触发、队列消费等模式。
二、本文要实现的功能
本文将实现一个简单但完整的自动化工作流系统。
假设我们有一个内容处理场景:
用户通过 API 提交一段文本,系统自动创建任务,将任务加入队列,后台 Worker 异步处理文本,最后保存任务状态和结果。
整体流程如下:
客户端请求
↓
Cloudflare Worker API
↓
参数校验
↓
写入任务状态到 KV
↓
发送消息到 Cloudflare Queue
↓
Queue Consumer 异步消费
↓
执行文本处理逻辑
↓
更新任务结果到 KV
↓
客户端查询任务状态
我们将实现三个核心接口:
| 接口 | 方法 | 说明 |
|---|---|---|
/api/tasks |
POST | 创建自动化任务 |
/api/tasks/:id |
GET | 查询任务状态 |
/api/health |
GET | 健康检查 |
同时,我们会使用:
- Cloudflare Workers:运行 API 和消费任务;
- Cloudflare Queues:实现异步任务队列;
- Cloudflare KV:保存任务状态和执行结果;
- Wrangler:本地开发与部署工具。
三、准备工作
在开始之前,你需要准备以下内容:
1. Cloudflare 账号
前往 Cloudflare 官网注册账号:
https://dash.cloudflare.com/
2. 安装 Node.js
建议使用 Node.js 18 或更高版本。
查看版本:
node -v
npm -v
3. 安装 Wrangler
Wrangler 是 Cloudflare Workers 的官方 CLI 工具。
npm install -g wrangler
登录 Cloudflare:
wrangler login
登录成功后,Wrangler 会在本地保存认证信息。
四、创建项目
使用 npm 创建项目目录:
mkdir cloudflare-workflow-demo
cd cloudflare-workflow-demo
npm init -y
安装必要依赖:
npm install nanoid
npm install -D wrangler typescript
初始化 Worker 项目也可以使用:
npm create cloudflare@latest
不过为了便于理解,本文采用手动创建文件的方式。
五、项目目录结构
最终项目结构如下:
cloudflare-workflow-demo
├── package.json
├── wrangler.toml
├── tsconfig.json
└── src
├── index.ts
├── router.ts
├── task.ts
└── types.ts
各文件作用如下:
| 文件 | 说明 |
|---|---|
index.ts |
Worker 入口文件,处理 fetch 和 queue |
router.ts |
简易路由分发 |
task.ts |
任务创建、查询、处理逻辑 |
types.ts |
类型定义 |
wrangler.toml |
Cloudflare 配置文件 |
六、配置 wrangler.toml
首先创建 wrangler.toml:
name = "cloudflare-workflow-demo"
main = "src/index.ts"
compatibility_date = "2024-06-01"
[vars]
APP_NAME = "Cloudflare Workflow Demo"
[[kv_namespaces]]
binding = "TASK_KV"
id = "替换为你的KV命名空间ID"
[[queues.producers]]
binding = "TASK_QUEUE"
queue = "task-workflow-queue"
[[queues.consumers]]
queue = "task-workflow-queue"
max_batch_size = 5
max_batch_timeout = 10
这里有几个重点:
TASK_KV:在 Worker 中访问 KV 的绑定名称;TASK_QUEUE:发送任务消息时使用的队列绑定;task-workflow-queue:Cloudflare Queue 的名称;max_batch_size:一次最多消费多少条消息;max_batch_timeout:最多等待多少秒后触发消费。
七、创建 Cloudflare KV
执行命令创建 KV 命名空间:
wrangler kv namespace create TASK_KV
创建成功后会看到类似输出:
🌀 Creating namespace with title "cloudflare-workflow-demo-TASK_KV"
✨ Success!
Add the following to your configuration file in your kv_namespaces array:
{ binding = "TASK_KV", id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }
将输出中的 id 填入 wrangler.toml。
如果你还需要预览环境,可以创建 preview namespace:
wrangler kv namespace create TASK_KV --preview
八、创建 Cloudflare Queue
创建队列:
wrangler queues create task-workflow-queue
创建完成后,确认 wrangler.toml 中配置的队列名称与这里一致。
查看队列列表:
wrangler queues list
九、编写类型定义
创建 src/types.ts:
export type TaskStatus = "pending" | "processing" | "success" | "failed";
export interface CreateTaskPayload {
text: string;
action?: "uppercase" | "lowercase" | "summary";
}
export interface TaskRecord {
id: string;
text: string;
action: "uppercase" | "lowercase" | "summary";
status: TaskStatus;
result?: string;
error?: string;
createdAt: string;
updatedAt: string;
}
export interface QueueMessageBody {
taskId: string;
}
export interface Env {
APP_NAME: string;
TASK_KV: KVNamespace;
TASK_QUEUE: Queue;
}
这里定义了任务状态、创建任务的请求体、任务记录结构、队列消息体以及 Cloudflare Worker 的环境变量类型。
任务状态说明:
| 状态 | 含义 |
|---|---|
pending |
任务已创建,等待处理 |
processing |
任务正在处理 |
success |
任务处理成功 |
failed |
任务处理失败 |
十、编写任务逻辑
创建 src/task.ts:
import { nanoid } from "nanoid";
import type {
CreateTaskPayload,
Env,
QueueMessageBody,
TaskRecord,
} from "./types";
function now() {
return new Date().toISOString();
}
function getTaskKey(taskId: string) {
return `task:${taskId}`;
}
export async function createTask(env: Env, payload: CreateTaskPayload) {
if (!payload || typeof payload.text !== "string") {
return jsonResponse(
{
success: false,
message: "字段 text 必须是字符串",
},
400
);
}
const text = payload.text.trim();
if (!text) {
return jsonResponse(
{
success: false,
message: "字段 text 不能为空",
},
400
);
}
if (text.length > 5000) {
return jsonResponse(
{
success: false,
message: "text 长度不能超过 5000 个字符",
},
400
);
}
const action = payload.action || "uppercase";
if (!["uppercase", "lowercase", "summary"].includes(action)) {
return jsonResponse(
{
success: false,
message: "action 只支持 uppercase、lowercase、summary",
},
400
);
}
const taskId = nanoid();
const record: TaskRecord = {
id: taskId,
text,
action,
status: "pending",
createdAt: now(),
updatedAt: now(),
};
await env.TASK_KV.put(getTaskKey(taskId), JSON.stringify(record));
const message: QueueMessageBody = {
taskId,
};
await env.TASK_QUEUE.send(message);
return jsonResponse({
success: true,
message: "任务已创建",
data: {
taskId,
status: record.status,
},
});
}
export async function getTask(env: Env, taskId: string) {
if (!taskId) {
return jsonResponse(
{
success: false,
message: "缺少任务 ID",
},
400
);
}
const recordText = await env.TASK_KV.get(getTaskKey(taskId));
if (!recordText) {
return jsonResponse(
{
success: false,
message: "任务不存在",
},
404
);
}
const record = JSON.parse(recordText) as TaskRecord;
return jsonResponse({
success: true,
data: record,
});
}
export async function processTask(env: Env, taskId: string) {
const key = getTaskKey(taskId);
const recordText = await env.TASK_KV.get(key);
if (!recordText) {
throw new Error(`任务不存在:${taskId}`);
}
const record = JSON.parse(recordText) as TaskRecord;
if (record.status === "success") {
return;
}
record.status = "processing";
record.updatedAt = now();
await env.TASK_KV.put(key, JSON.stringify(record));
try {
const result = await runAction(record.text, record.action);
record.status = "success";
record.result = result;
record.updatedAt = now();
await env.TASK_KV.put(key, JSON.stringify(record));
} catch (error) {
record.status = "failed";
record.error = error instanceof Error ? error.message : String(error);
record.updatedAt = now();
await env.TASK_KV.put(key, JSON.stringify(record));
throw error;
}
}
async function runAction(
text: string,
action: "uppercase" | "lowercase" | "summary"
) {
await sleep(500);
if (action === "uppercase") {
return text.toUpperCase();
}
if (action === "lowercase") {
return text.toLowerCase();
}
if (action === "summary") {
return mockSummary(text);
}
return text;
}
function mockSummary(text: string) {
if (text.length <= 80) {
return text;
}
return `${text.slice(0, 80)}...`;
}
function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export function jsonResponse(data: unknown, status = 200) {
return new Response(JSON.stringify(data, null, 2), {
status,
headers: {
"content-type": "application/json;charset=UTF-8",
"access-control-allow-origin": "*",
"access-control-allow-methods": "GET,POST,OPTIONS",
"access-control-allow-headers": "content-type,authorization",
},
});
}
这段代码完成了几个关键动作:
- 接收并校验用户输入;
- 创建任务 ID;
- 将任务初始状态写入 KV;
- 将任务 ID 发送到 Queue;
- 队列消费时读取任务;
- 执行具体动作;
- 更新任务状态和结果。
在真实项目中,runAction 可以替换为更复杂的逻辑,例如调用 OpenAI、发送邮件、同步数据库、处理图片、请求第三方接口等。
十一、编写路由逻辑
创建 src/router.ts:
import type { Env } from "./types";
import { createTask, getTask, jsonResponse } from "./task";
export async function handleRequest(request: Request, env: Env) {
const url = new URL(request.url);
const method = request.method.toUpperCase();
if (method === "OPTIONS") {
return jsonResponse({
success: true,
});
}
if (url.pathname === "/api/health" && method === "GET") {
return jsonResponse({
success: true,
message: "ok",
app: env.APP_NAME,
timestamp: new Date().toISOString(),
});
}
if (url.pathname === "/api/tasks" && method === "POST") {
let payload;
try {
payload = await request.json();
} catch {
return jsonResponse(
{
success: false,
message: "请求体必须是合法 JSON",
},
400
);
}
return createTask(env, payload);
}
const taskMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)$/);
if (taskMatch && method === "GET") {
const taskId = taskMatch[1];
return getTask(env, taskId);
}
return jsonResponse(
{
success: false,
message: "接口不存在",
},
404
);
}
这里没有使用任何第三方 Web 框架,而是直接基于 Request 和 URL 实现了一个轻量路由。对于小型自动化工具来说,这种写法足够清晰,也减少了依赖。
如果你的项目接口较多,也可以考虑使用:
- Hono;
- itty-router;
- hono-openapi;
- zod;
- valibot。
十二、编写 Worker 入口
创建 src/index.ts:
import type { Env, QueueMessageBody } from "./types";
import { handleRequest } from "./router";
import { processTask } from "./task";
export default {
async fetch(request: Request, env: Env): Promise {
return handleRequest(request, env);
},
async queue(
batch: MessageBatch,
env: Env
): Promise {
for (const message of batch.messages) {
try {
await processTask(env, message.body.taskId);
message.ack();
} catch (error) {
console.error("任务处理失败:", error);
message.retry();
}
}
},
};
Cloudflare Workers 支持多个事件入口:
fetch:处理 HTTP 请求;scheduled:处理定时任务;queue:处理队列消息;email:处理邮件事件。
本文示例中,我们同时使用了 fetch 和 queue。
当用户调用 /api/tasks 创建任务后,Worker 会把消息发送到队列;Cloudflare Queue 随后触发同一个 Worker 的 queue 方法进行异步消费。
十三、配置 TypeScript
创建 tsconfig.json:
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "Bundler",
"lib": ["ES2022", "WebWorker"],
"types": ["@cloudflare/workers-types"],
"strict": true,
"skipLibCheck": true,
"noEmit": true
},
"include": ["src"]
}
安装 Cloudflare 类型:
npm install -D @cloudflare/workers-types
十四、配置 package.json
修改 package.json:
{
"name": "cloudflare-workflow-demo",
"version": "1.0.0",
"description": "Cloudflare workflow automation demo",
"type": "module",
"scripts": {
"dev": "wrangler dev",
"deploy": "wrangler deploy",
"tail": "wrangler tail"
},
"dependencies": {
"nanoid": "^5.0.7"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20240620.0",
"typescript": "^5.4.5",
"wrangler": "^3.60.0"
}
}
十五、本地运行
启动本地开发环境:
npm run dev
如果配置正确,终端会显示本地访问地址,例如:
http://localhost:8787
测试健康检查接口:
curl http://localhost:8787/api/health
返回示例:
{
"success": true,
"message": "ok",
"app": "Cloudflare Workflow Demo",
"timestamp": "2024-06-01T12:00:00.000Z"
}
十六、创建任务
发送 POST 请求:
curl -X POST http://localhost:8787/api/tasks \
-H "content-type: application/json" \
-d '{
"text": "Hello Cloudflare Workflow Automation",
"action": "uppercase"
}'
返回示例:
{
"success": true,
"message": "任务已创建",
"data": {
"taskId": "abc123",
"status": "pending"
}
}
保存 taskId,稍后用于查询任务状态。
十七、查询任务状态
curl http://localhost:8787/api/tasks/abc123
如果任务还未处理完成,可能返回:
{
"success": true,
"data": {
"id": "abc123",
"text": "Hello Cloudflare Workflow Automation",
"action": "uppercase",
"status": "processing",
"createdAt": "2024-06-01T12:00:00.000Z",
"updatedAt": "2024-06-01T12:00:01.000Z"
}
}
处理完成后返回:
{
"success": true,
"data": {
"id": "abc123",
"text": "Hello Cloudflare Workflow Automation",
"action": "uppercase",
"status": "success",
"result": "HELLO CLOUDFLARE WORKFLOW AUTOMATION",
"createdAt": "2024-06-01T12:00:00.000Z",
"updatedAt": "2024-06-01T12:00:02.000Z"
}
}
十八、部署到 Cloudflare
确认已经登录 Wrangler:
wrangler whoami
部署:
npm run deploy
部署成功后,会看到类似地址:
https://cloudflare-workflow-demo.yourname.workers.dev
在线测试:
curl https://cloudflare-workflow-demo.yourname.workers.dev/api/health
如果返回正常,说明部署成功。
十九、加入定时触发能力
除了 API 触发,很多自动化场景还需要定时执行。例如:
- 每天凌晨同步数据;
- 每 10 分钟检查订单状态;
- 每小时清理过期任务;
- 每周生成报表;
- 定时抓取 RSS 或 Sitemap。
Cloudflare Workers 支持 Cron Triggers。
在 wrangler.toml 添加:
[triggers]
crons = ["*/10 * * * *"]
表示每 10 分钟执行一次。
然后修改 src/index.ts:
import type { Env, QueueMessageBody } from "./types";
import { handleRequest } from "./router";
import { processTask } from "./task";
export default {
async fetch(request: Request, env: Env): Promise {
return handleRequest(request, env);
},
async queue(
batch: MessageBatch,
env: Env
): Promise {
for (const message of batch.messages) {
try {
await processTask(env, message.body.taskId);
message.ack();
} catch (error) {
console.error("任务处理失败:", error);
message.retry();
}
}
},
async scheduled(
event: ScheduledEvent,
env: Env,
ctx: ExecutionContext
): Promise {
ctx.waitUntil(runScheduledJob(env));
},
};
async function runScheduledJob(env: Env) {
console.log("定时任务触发:", new Date().toISOString());
// 示例:可以在这里扫描外部 API、创建任务、发送队列消息等
}
这样,我们就同时拥有了:
- HTTP API 触发;
- Queue 异步消费;
- Cron 定时触发。
这已经可以覆盖大多数轻量工作流自动化需求。
二十、实际业务扩展示例
1. 自动发送邮件
在 runAction 中可以调用邮件服务商接口,例如 Resend、SendGrid、Mailgun:
await fetch("https://api.resend.com/emails", {
method: "POST",
headers: {
authorization: `Bearer ${env.RESEND_API_KEY}`,
"content-type": "application/json",
},
body: JSON.stringify({
from: "noreply@example.com",
to: "user@example.com",
subject: "任务完成通知",
html: "你的任务已经处理完成。
",
}),
});
2. 自动调用 AI 接口
可以将文本处理替换为 AI 总结:
const response = await fetch("https://api.openai.com/v1/chat/completions", {
method: "POST",
headers: {
authorization: `Bearer ${env.OPENAI_API_KEY}`,
"content-type": "application/json",
},
body: JSON.stringify({
model: "gpt-4o-mini",
messages: [
{
role: "user",
content: `请总结以下内容:${text}`,
},
],
}),
});
3. 自动同步数据
定时从一个接口拉取数据,再写入 D1 数据库或外部系统:
const res = await fetch("https://example.com/api/orders");
const orders = await res.json();
for (const order of orders) {
await env.TASK_QUEUE.send({
taskId: order.id,
});
}
4. 自动清理过期任务
KV 可以设置过期时间,也可以通过定时任务清理旧数据:
await env.TASK_KV.put(key, JSON.stringify(record), {
expirationTtl: 60 * 60 * 24 * 7,
});
表示任务记录 7 天后自动过期。
二十一、错误处理与重试机制
在自动化工作流中,错误处理非常重要。常见问题包括:
- 第三方 API 超时;
- 网络失败;
- 请求频率限制;
- 数据格式不符合预期;
- 队列消息重复消费;
- 某一步处理成功但后续失败。
Cloudflare Queues 支持消息重试。本文中的代码:
message.retry();
表示当前消息处理失败后交给队列重试。
不过在生产环境中,建议加入以下机制:
1. 幂等处理
同一个任务可能被重复消费,所以任务处理逻辑应当可以重复执行而不产生错误结果。
例如:
if (record.status === "success") {
return;
}
这就是一种简单的幂等判断。
2. 最大重试次数
可以在任务记录中增加 retryCount 字段,超过次数后标记为失败,避免无限重试。
3. 死信队列
对于无法处理的消息,可以发送到另一个队列,例如:
task-dead-letter-queue
后续由人工或专门 Worker 处理。
4. 结构化日志
建议打印包含任务 ID、状态、耗时的信息,方便排查问题:
console.log(JSON.stringify({
event: "task_completed",
taskId,
duration,
status: "success"
}));
二十二、安全建议
自动化接口通常会触发实际业务动作,因此一定要重视安全。
1. 加入 API Token
可以在请求头中要求携带 Token:
const token = request.headers.get("authorization");
if (token !== `Bearer ${env.API_TOKEN}`) {
return jsonResponse(
{
success: false,
message: "Unauthorized",
},
401
);
}
然后在 wrangler.toml 或 Cloudflare Dashboard 中配置环境变量。
更推荐使用 Secret:
wrangler secret put API_TOKEN
2. 限制请求大小
本文已经对 text.length 做了限制。对于文件、图片、JSON 批量数据,也应该控制大小,防止滥用。
3. 校验来源
如果接口只供你的前端使用,可以结合:
- Cloudflare Access;
- Turnstile;
- 自定义签名;
- IP 白名单;
- HMAC 校验。
4. 不要把密钥写进源码
所有 API Key、Token、Webhook Secret 都应使用 wrangler secret put 管理。
二十三、KV、D1、R2 如何选择?
在 Cloudflare 生态中,不同存储适合不同场景。
| 产品 | 适合场景 |
|---|---|
| KV | 简单键值存储、缓存、任务状态 |
| D1 | 关系型数据、订单、用户、报表 |
| R2 | 文件、图片、日志归档、大对象 |
| Queues | 异步任务、削峰、解耦 |
| Durable Objects | 强一致状态、协同、聊天室、计数器 |
本文选择 KV,是因为任务状态查询非常适合键值模式:
task:abc123 -> JSON
如果你的任务需要复杂查询,例如按用户查询、按状态分页、统计成功率,那么可以考虑使用 D1。
二十四、完整源码汇总
为了方便复制,这里给出核心源码汇总。
src/types.ts
export type TaskStatus = "pending" | "processing" | "success" | "failed";
export interface CreateTaskPayload {
text: string;
action?: "uppercase" | "lowercase" | "summary";
}
export interface TaskRecord {
id: string;
text: string;
action: "uppercase" | "lowercase" | "summary";
status: TaskStatus;
result?: string;
error?: string;
createdAt: string;
updatedAt: string;
}
export interface QueueMessageBody {
taskId: string;
}
export interface Env {
APP_NAME: string;
TASK_KV: KVNamespace;
TASK_QUEUE: Queue;
}
src/index.ts
import type { Env, QueueMessageBody } from "./types";
import { handleRequest } from "./router";
import { processTask } from "./task";
export default {
async fetch(request: Request, env: Env): Promise {
return handleRequest(request, env);
},
async queue(
batch: MessageBatch,
env: Env
): Promise {
for (const message of batch.messages) {
try {
await processTask(env, message.body.taskId);
message.ack();
} catch (error) {
console.error("任务处理失败:", error);
message.retry();
}
}
},
};
完整项目还包括 router.ts、task.ts、wrangler.toml 和 package.json,上文已经给出。
二十五、总结
本文从零实现了一个基于 Cloudflare Workers 的工作流自动化示例。它虽然是一个简单的文本处理系统,但已经包含了自动化工作流中的核心思想:
- 使用 API 创建任务;
- 使用 KV 保存任务状态;
- 使用 Queue 解耦请求与后台处理;
- 使用 Worker 消费队列;
- 使用状态字段追踪任务生命周期;
- 支持进一步扩展定时任务、通知、AI 处理和数据同步。
Cloudflare 的边缘计算能力非常适合构建轻量级自动化系统。对于个人开发者、小团队、独立站、SaaS 工具、内部运维平台来说,它可以用很低的成本完成许多过去需要服务器、消息队列、定时任务系统才能实现的功能。
如果你正在做以下事情:
- 自动部署;
- 自动采集;
- 自动通知;
- 表单自动处理;
- AI 内容加工;
- Webhook 编排;
- 订单状态同步;
- 定时数据报表;
那么 Cloudflare Workers + Queues + KV/D1 是一个非常值得尝试的组合。你可以从本文的示例开始,把 runAction 替换成自己的业务逻辑,一个实用的工作流自动化系统就搭建完成了。