上一篇 下一篇 分享链接 返回 返回顶部

2026 用 Cloudflare 搭建自动化工作流:从 Worker、队列到数据库与定时任务

发布人:慈云数据-客服中心 发布时间:1 天前 阅读量:4

Cloudflare 工作流自动化教程|2026 最新版

适用对象:开发者、运维工程师、SaaS 创业团队、站长、自动化爱好者
关键词:Cloudflare Workers、Cloudflare Workflows、Queues、R2、D1、KV、Cron Triggers、API 自动化、Serverless


一、为什么要用 Cloudflare 做工作流自动化?

过去几年,Cloudflare 已经不再只是一个 CDN、DNS 或 WAF 服务商,而是逐渐发展成一个完整的边缘计算与开发平台。通过 Cloudflare Workers、Queues、R2、D1、KV、Durable Objects、Workflows、Cron Triggers 等产品,开发者可以在不维护传统服务器的情况下,搭建稳定、高性能、可扩展的自动化系统。

所谓“工作流自动化”,可以理解为:

将一系列原本需要人工执行的步骤,通过程序、触发器、队列、数据库和第三方 API 自动串联起来,让系统自动完成任务。

例如:

  • 网站有新用户注册后,自动写入数据库、发送欢迎邮件、同步到 CRM;
  • 每天定时抓取数据,清洗后保存到 R2 或 D1;
  • 用户上传图片后,自动压缩、转存、生成访问链接;
  • 电商订单支付成功后,自动通知仓库、发送短信、生成发票;
  • API 请求量过高时,自动加入队列,异步处理,避免系统崩溃;
  • 监控网站状态,异常时自动发送告警到 Telegram、Slack 或企业微信。

相比传统服务器方案,Cloudflare 的优势主要有:

  1. 无需维护服务器:不用配置 Nginx、Linux、防火墙、负载均衡等基础设施。
  2. 全球边缘节点执行:请求可以在离用户最近的位置处理,延迟更低。
  3. 按量计费:适合个人项目、创业项目和中小型业务。
  4. 生态完整:Workers 负责逻辑,Queues 负责异步任务,R2 负责对象存储,D1 负责关系型数据库,KV 负责高速键值存储。
  5. 天然适合自动化:支持定时任务、Webhook、API 调用、事件驱动和队列消费。

本文将以 2026 年的 Cloudflare 开发体系为基础,系统讲解如何搭建一套完整的工作流自动化方案。


二、Cloudflare 自动化核心组件介绍

在开始实战之前,我们先了解几个常用组件。

1. Cloudflare Workers

Workers 是 Cloudflare 的无服务器运行环境,可以理解为运行在 Cloudflare 边缘网络上的 JavaScript / TypeScript 函数。

它常用于:

  • 接收 Webhook;
  • 处理 API 请求;
  • 调用第三方接口;
  • 写入数据库;
  • 访问对象存储;
  • 执行权限校验;
  • 作为自动化工作流的入口。

一个最简单的 Worker 示例:

export default {
  async fetch(request, env, ctx) {
    return new Response("Hello Cloudflare Workflow!");
  }
}

2. Cron Triggers

Cron Triggers 用来执行定时任务。例如每天凌晨同步数据、每小时检测网站状态、每 5 分钟处理报表等。

配置示例:

[triggers]
crons = ["0 0 * * *"]

表示每天 UTC 时间 0 点执行一次。

3. Cloudflare Queues

Queues 是消息队列服务,非常适合处理异步任务。

例如用户提交订单后,不必在 HTTP 请求中立即完成所有任务,而是先把任务放入队列:

  • 发送邮件;
  • 生成发票;
  • 通知仓库;
  • 同步 CRM;
  • 记录日志。

这样可以提高接口响应速度,也能避免第三方 API 暂时不可用时导致主流程失败。

4. Cloudflare R2

R2 是 Cloudflare 的对象存储服务,类似 AWS S3,但没有出站流量费用,非常适合存放:

  • 图片;
  • 视频;
  • 日志文件;
  • 备份文件;
  • 报表文件;
  • 用户上传内容。

5. Cloudflare D1

D1 是 Cloudflare 提供的 Serverless SQLite 数据库,适合存储结构化数据,例如:

  • 用户信息;
  • 订单记录;
  • 任务状态;
  • 自动化执行日志;
  • API 调用记录。

6. Cloudflare KV

KV 是键值存储,适合读多写少、需要快速访问的场景,例如:

  • 配置项;
  • 用户 Token;
  • 缓存结果;
  • 临时状态;
  • 自动化开关。

7. Cloudflare Workflows

Cloudflare Workflows 是专门用于编排长时间运行、多步骤任务的服务。相比普通 Worker,它更适合需要状态管理、重试、暂停、恢复的自动化流程。

例如:

  1. 接收用户上传文件;
  2. 写入任务记录;
  3. 调用 AI 接口分析文件;
  4. 等待处理结果;
  5. 保存结果到数据库;
  6. 发送通知;
  7. 标记任务完成。

这类流程如果全部写在一个 Worker 请求里,很容易超时或难以维护,而 Workflows 更适合作为编排层。


三、准备工作:创建 Cloudflare 自动化项目

1. 安装 Node.js

建议使用 Node.js LTS 版本。可以通过以下命令检查:

node -v
npm -v

2. 安装 Wrangler

Wrangler 是 Cloudflare Workers 的官方命令行工具。

npm install -g wrangler

登录 Cloudflare:

wrangler login

登录后会打开浏览器授权,授权成功后即可在本地管理 Workers、D1、R2、Queues 等资源。

3. 创建 Workers 项目

npm create cloudflare@latest cloudflare-workflow-demo

进入项目目录:

cd cloudflare-workflow-demo

本地运行:

npm run dev

如果看到本地服务正常启动,说明项目初始化成功。


四、实战案例:搭建一个“订单自动处理工作流”

下面我们设计一个常见的自动化场景:

用户提交订单后,系统自动记录订单、放入队列、异步发送通知、生成日志,并支持定时检查未完成订单。

整体流程如下:

用户提交订单
   ↓
Worker 接收请求
   ↓
写入 D1 数据库
   ↓
发送消息到 Queue
   ↓
Queue Consumer 异步处理订单
   ↓
调用第三方通知接口
   ↓
更新订单状态
   ↓
保存日志到 R2 或 D1

五、创建 D1 数据库

1. 创建数据库

wrangler d1 create workflow_demo_db

执行后会返回数据库信息,例如:

[[d1_databases]]
binding = "DB"
database_name = "workflow_demo_db"
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

把它添加到 wrangler.toml 中。

2. 创建数据表

新建 schema.sql

CREATE TABLE IF NOT EXISTS orders (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  order_id TEXT NOT NULL UNIQUE,
  user_email TEXT NOT NULL,
  amount INTEGER NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending',
  created_at TEXT NOT NULL,
  updated_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_logs (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  order_id TEXT NOT NULL,
  message TEXT NOT NULL,
  created_at TEXT NOT NULL
);

执行迁移:

wrangler d1 execute workflow_demo_db --file=./schema.sql

六、创建 Queue 消息队列

1. 创建队列

wrangler queues create order-processing-queue

2. 配置 wrangler.toml

[[queues.producers]]
binding = "ORDER_QUEUE"
queue = "order-processing-queue"

[[queues.consumers]]
queue = "order-processing-queue"

这个配置表示:

  • Worker 可以通过 env.ORDER_QUEUE 向队列发送消息;
  • 当前 Worker 同时也是这个队列的消费者。

七、编写订单提交接口

src/index.js 中编写 Worker 入口。

export default {
  async fetch(request, env, ctx) {
    const url = new URL(request.url);

    if (request.method === "POST" && url.pathname === "/orders") {
      return await createOrder(request, env);
    }

    return new Response("Not Found", { status: 404 });
  },

  async queue(batch, env, ctx) {
    for (const message of batch.messages) {
      await processOrder(message.body, env);
      message.ack();
    }
  }
};

async function createOrder(request, env) {
  try {
    const body = await request.json();

    if (!body.order_id || !body.user_email || !body.amount) {
      return Response.json(
        { error: "order_id、user_email 和 amount 为必填字段" },
        { status: 400 }
      );
    }

    const now = new Date().toISOString();

    await env.DB.prepare(`
      INSERT INTO orders (order_id, user_email, amount, status, created_at, updated_at)
      VALUES (?, ?, ?, ?, ?, ?)
    `).bind(
      body.order_id,
      body.user_email,
      body.amount,
      "pending",
      now,
      now
    ).run();

    await env.ORDER_QUEUE.send({
      order_id: body.order_id,
      user_email: body.user_email,
      amount: body.amount
    });

    await env.DB.prepare(`
      INSERT INTO workflow_logs (order_id, message, created_at)
      VALUES (?, ?, ?)
    `).bind(
      body.order_id,
      "订单已创建,并加入处理队列",
      now
    ).run();

    return Response.json({
      success: true,
      message: "订单已创建,正在异步处理",
      order_id: body.order_id
    });

  } catch (error) {
    return Response.json(
      { error: error.message },
      { status: 500 }
    );
  }
}

async function processOrder(order, env) {
  const now = new Date().toISOString();

  try {
    await env.DB.prepare(`
      INSERT INTO workflow_logs (order_id, message, created_at)
      VALUES (?, ?, ?)
    `).bind(
      order.order_id,
      "开始处理订单",
      now
    ).run();

    // 模拟调用第三方通知接口
    // 实际项目中可以替换为邮件、短信、Slack、企业微信等 API
    await fakeNotify(order);

    await env.DB.prepare(`
      UPDATE orders
      SET status = ?, updated_at = ?
      WHERE order_id = ?
    `).bind(
      "completed",
      new Date().toISOString(),
      order.order_id
    ).run();

    await env.DB.prepare(`
      INSERT INTO workflow_logs (order_id, message, created_at)
      VALUES (?, ?, ?)
    `).bind(
      order.order_id,
      "订单处理完成",
      new Date().toISOString()
    ).run();

  } catch (error) {
    await env.DB.prepare(`
      UPDATE orders
      SET status = ?, updated_at = ?
      WHERE order_id = ?
    `).bind(
      "failed",
      new Date().toISOString(),
      order.order_id
    ).run();

    await env.DB.prepare(`
      INSERT INTO workflow_logs (order_id, message, created_at)
      VALUES (?, ?, ?)
    `).bind(
      order.order_id,
      `订单处理失败:${error.message}`,
      new Date().toISOString()
    ).run();

    throw error;
  }
}

async function fakeNotify(order) {
  console.log(`通知用户 ${order.user_email},订单 ${order.order_id} 已处理`);
}

八、测试订单自动化流程

本地开发模式下运行:

npm run dev

发送测试请求:

curl -X POST http://localhost:8787/orders \
  -H "Content-Type: application/json" \
  -d '{
    "order_id": "ORD-2026-0001",
    "user_email": "user@example.com",
    "amount": 199
  }'

如果返回:

{
  "success": true,
  "message": "订单已创建,正在异步处理",
  "order_id": "ORD-2026-0001"
}

说明接口已经成功接收订单并把任务放入队列。

可以查询数据库:

wrangler d1 execute workflow_demo_db --command="SELECT * FROM orders;"

查看日志:

wrangler d1 execute workflow_demo_db --command="SELECT * FROM workflow_logs;"

九、添加定时任务:自动检查异常订单

很多自动化工作流都需要定时补偿机制。例如订单长时间处于 pending 状态,可能说明队列消费失败、第三方接口异常或数据状态未正确更新。

我们可以使用 Cron Triggers 定时扫描异常订单。

1. 配置 Cron

wrangler.toml 中添加:

[triggers]
crons = ["*/10 * * * *"]

表示每 10 分钟执行一次。

2. 添加 scheduled 处理函数

修改 Worker:

export default {
  async fetch(request, env, ctx) {
    // HTTP 入口
  },

  async queue(batch, env, ctx) {
    // 队列入口
  },

  async scheduled(event, env, ctx) {
    ctx.waitUntil(checkPendingOrders(env));
  }
};

async function checkPendingOrders(env) {
  const result = await env.DB.prepare(`
    SELECT order_id, user_email, created_at
    FROM orders
    WHERE status = 'pending'
    LIMIT 50
  `).all();

  for (const order of result.results) {
    await env.DB.prepare(`
      INSERT INTO workflow_logs (order_id, message, created_at)
      VALUES (?, ?, ?)
    `).bind(
      order.order_id,
      "定时任务检测到订单仍处于 pending 状态",
      new Date().toISOString()
    ).run();

    await env.ORDER_QUEUE.send({
      order_id: order.order_id,
      user_email: order.user_email
    });
  }
}

这样,即使某些任务因为异常没有被正常处理,定时任务也可以重新加入队列,形成自动补偿。


十、集成第三方通知:以 Telegram 为例

自动化工作流中最常见的需求之一是发送通知。下面以 Telegram Bot 为例。

1. 保存密钥

不要把 Token 写死在代码中。使用 Wrangler Secret:

wrangler secret put TELEGRAM_BOT_TOKEN
wrangler secret put TELEGRAM_CHAT_ID

2. 编写通知函数

async function sendTelegramMessage(env, text) {
  const url = `https://api.telegram.org/bot${env.TELEGRAM_BOT_TOKEN}/sendMessage`;

  const response = await fetch(url, {
    method: "POST",
    headers: {
      "Content-Type": "application/json"
    },
    body: JSON.stringify({
      chat_id: env.TELEGRAM_CHAT_ID,
      text
    })
  });

  if (!response.ok) {
    throw new Error(`Telegram 通知失败:${response.status}`);
  }

  return await response.json();
}

在订单处理成功后调用:

await sendTelegramMessage(
  env,
  `订单 ${order.order_id} 已处理完成,用户:${order.user_email}`
);

这样,每当订单处理完成,你就可以在 Telegram 中收到通知。


十一、使用 R2 保存报表或日志文件

如果你的自动化系统需要保存大量日志、报表、导出文件,建议使用 R2,而不是全部写入 D1。

1. 创建 R2 Bucket

wrangler r2 bucket create workflow-logs

2. 配置 wrangler.toml

[[r2_buckets]]
binding = "LOG_BUCKET"
bucket_name = "workflow-logs"

3. 写入日志文件

async function saveLogToR2(env, orderId, content) {
  const key = `orders/${orderId}/${Date.now()}.json`;

  await env.LOG_BUCKET.put(
    key,
    JSON.stringify(content, null, 2),
    {
      httpMetadata: {
        contentType: "application/json"
      }
    }
  );

  return key;
}

适合保存:

  • 订单处理详情;
  • 第三方 API 响应;
  • 批量导入结果;
  • 日报、周报、月报;
  • 大体积调试日志。

十二、使用 KV 管理自动化开关

有时你可能希望临时关闭某个自动化流程,例如第三方接口维护、系统升级、活动结束等。

可以使用 KV 存储配置。

1. 创建 KV Namespace

wrangler kv namespace create WORKFLOW_CONFIG

将返回的配置添加到 wrangler.toml

[[kv_namespaces]]
binding = "WORKFLOW_CONFIG"
id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

2. 设置开关

wrangler kv key put --binding=WORKFLOW_CONFIG "order_workflow_enabled" "true"

3. 在代码中读取

async function isWorkflowEnabled(env) {
  const value = await env.WORKFLOW_CONFIG.get("order_workflow_enabled");
  return value !== "false";
}

在创建订单时判断:

const enabled = await isWorkflowEnabled(env);

if (!enabled) {
  return Response.json(
    { error: "订单自动化流程暂时关闭" },
    { status: 503 }
  );
}

这种方式非常适合做:

  • 功能开关;
  • 灰度发布;
  • 临时禁用流程;
  • API 限流配置;
  • 多环境配置管理。

十三、使用 Cloudflare Workflows 编排复杂任务

当自动化流程变得越来越复杂时,仅靠 Worker + Queue 可能会出现几个问题:

  • 任务步骤太多,代码难以维护;
  • 某些步骤需要等待较长时间;
  • 需要在失败后从指定步骤重试;
  • 需要记录每一步状态;
  • 任务可能跨越多个 API、多个系统;
  • 需要人工审批或延迟执行。

这时可以考虑使用 Cloudflare Workflows

一个典型工作流可以设计为:

step 1:验证订单
step 2:扣减库存
step 3:调用支付确认接口
step 4:生成发票
step 5:发送邮件
step 6:同步 CRM
step 7:完成任务

Workflows 的价值在于:它不是简单地“执行一个函数”,而是帮助你管理多步骤状态,让自动化流程更可靠。

在复杂业务中,推荐采用以下架构:

Worker:负责接收请求和鉴权
D1:记录业务数据和状态
Queue:处理高并发异步任务
Workflow:编排长流程任务
R2:保存文件和日志
KV:保存配置和开关
Cron:执行定时检查和补偿

十四、安全最佳实践

自动化系统一旦暴露 API,就必须重视安全。

1. 校验请求来源

对于 Webhook 或第三方回调,必须验证签名。例如:

function verifyToken(request, env) {
  const token = request.headers.get("X-Webhook-Token");
  return token === env.WEBHOOK_SECRET;
}

2. 使用 Secret 管理敏感信息

不要在代码中写入:

  • API Token;
  • 数据库密码;
  • Telegram Bot Token;
  • Slack Webhook;
  • 支付密钥。

统一使用:

wrangler secret put SECRET_NAME

3. 设置速率限制

可以结合 Cloudflare WAF、Rate Limiting 或代码级限流,避免接口被恶意刷爆。

4. 保留执行日志

每个自动化流程都应记录:

  • 任务 ID;
  • 执行步骤;
  • 执行时间;
  • 成功或失败状态;
  • 错误信息;
  • 第三方 API 返回摘要。

这样便于排查问题和审计。

5. 避免重复处理

队列和 Webhook 都可能出现重复投递,因此业务代码要具备幂等性。例如订单号唯一、任务状态判断、重复请求直接返回已有结果。

const existing = await env.DB.prepare(`
  SELECT order_id FROM orders WHERE order_id = ?
`).bind(orderId).first();

if (existing) {
  return Response.json({
    success: true,
    message: "订单已存在,无需重复创建"
  });
}

十五、部署到 Cloudflare

完成开发后,执行:

wrangler deploy

部署成功后会得到一个线上地址,例如:

https://cloudflare-workflow-demo.yourname.workers.dev

测试线上接口:

curl -X POST https://cloudflare-workflow-demo.yourname.workers.dev/orders \
  -H "Content-Type: application/json" \
  -d '{
    "order_id": "ORD-2026-0002",
    "user_email": "test@example.com",
    "amount": 299
  }'

十六、常见问题与解决方案

1. Worker 请求超时怎么办?

如果任务执行时间较长,不要在 HTTP 请求中同步完成全部逻辑。推荐做法是:

  • HTTP 请求只负责接收和校验;
  • 将任务写入 Queue;
  • 后台异步消费;
  • 使用 D1 保存任务状态。

2. 第三方 API 调用失败怎么办?

建议使用:

  • Queue 自动重试;
  • D1 记录失败状态;
  • Cron 定时补偿;
  • Workflows 管理步骤级重试;
  • 通知系统发送告警。

3. D1 适合存储大量日志吗?

D1 适合结构化业务数据,但如果日志量很大,建议把详细日志写入 R2,D1 只保存索引和状态。

4. KV 可以当数据库使用吗?

不建议。KV 更适合配置、缓存、简单键值数据。对于订单、用户、任务状态等结构化数据,建议使用 D1。

5. 如何避免队列重复消费?

业务层必须做幂等处理。例如:

  • 使用唯一任务 ID;
  • 处理前检查状态;
  • 已完成任务直接跳过;
  • 数据库使用唯一索引;
  • 日志中记录处理结果。

十七、推荐自动化架构模板

如果你正在设计 Cloudflare 自动化系统,可以参考下面这个模板:

外部系统 / 用户请求
        ↓
Cloudflare Worker
        ↓
鉴权 / 参数校验 / 幂等判断
        ↓
D1 写入任务记录
        ↓
Queue 异步任务处理
        ↓
Workflows 编排复杂步骤
        ↓
调用第三方 API / AI 接口 / 支付接口
        ↓
R2 保存文件或日志
        ↓
D1 更新最终状态
        ↓
Telegram / Slack / 邮件通知
        ↓
Cron 定时检查和补偿

这个架构适合大多数自动化场景,包括:

  • 订单处理系统;
  • AI 内容生成流程;
  • 网站监控告警;
  • 数据采集与清洗;
  • 自动报表生成;
  • 用户注册自动化;
  • 文件上传处理;
  • 跨平台内容同步;
  • SaaS 后台任务系统。

十八、总结

Cloudflare 在 2026 年已经具备了非常完整的 Serverless 自动化能力。对于大多数中小型项目来说,你完全可以不购买传统服务器,而是通过 Cloudflare Workers、Queues、D1、R2、KV、Cron Triggers 和 Workflows 搭建一套稳定、低成本、可扩展的自动化系统。

本文重点介绍了:

  • Cloudflare 自动化的核心组件;
  • 如何创建 Workers 项目;
  • 如何使用 D1 保存订单和日志;
  • 如何使用 Queue 异步处理任务;
  • 如何使用 Cron 定时补偿;
  • 如何集成 Telegram 通知;
  • 如何使用 R2 保存报表和大日志;
  • 如何使用 KV 管理自动化开关;
  • 如何利用 Workflows 编排复杂流程;
  • 自动化系统的安全和幂等最佳实践。

如果你只是做简单自动化,推荐从 Worker + Cron + D1 开始;如果任务量较大,加入 Queues;如果涉及文件和日志,加入 R2;如果流程复杂、步骤较多、需要状态恢复,则进一步引入 Cloudflare Workflows

最终的核心原则是:

同步请求只做轻量工作,复杂任务交给队列和工作流;状态写入数据库,文件写入对象存储;失败可重试,流程可追踪,系统可恢复。

按照这个思路,你就可以用 Cloudflare 构建出一套真正适合生产环境的工作流自动化平台。

目录结构
全文