上一篇 下一篇 分享链接 返回 返回顶部

把定时任务搬到 Cloudflare:一套生产可用的自动化工作流实践

发布人:慈云数据-客服中心 发布时间:1 天前 阅读量:4

Cloudflare 工作流自动化教程|生产环境实测

在过去很长一段时间里,很多团队在做自动化任务时,常见方案是:服务器定时任务、GitHub Actions、Jenkins、Airflow、n8n、自建 Node.js 脚本,或者直接在业务系统里写一堆异步逻辑。
这些方案当然都能用,但一旦进入生产环境,就会遇到几个很现实的问题:

  • 服务器需要维护;
  • 定时任务失败后不好恢复;
  • 异步任务链路过长,排查困难;
  • 跨系统调用容易超时;
  • 大批量任务需要限流和重试;
  • 日志、告警、状态追踪成本较高;
  • 海外业务对网络延迟和稳定性要求更高。

如果你的业务已经在使用 Cloudflare,例如 Workers、Pages、R2、D1、KV、Queues、Cron Triggers,那么用 Cloudflare 来做工作流自动化会是一个非常自然的选择。

本文将以生产环境实测角度,介绍如何使用 Cloudflare 体系搭建一套稳定的工作流自动化方案,适合用于:

  • 定时同步数据;
  • 批量处理订单;
  • 自动生成报表;
  • 内容抓取与清洗;
  • Webhook 分发;
  • AI 内容处理任务;
  • 文件上传后的异步处理;
  • 失败自动重试;
  • 跨服务自动化编排。

说明:本文重点讲的是基于 Cloudflare Workers、Queues、Cron Triggers、R2、D1、KV 等能力组合实现工作流自动化。不同账号、套餐、区域以及 Cloudflare 产品更新情况可能有所差异,生产环境请以官方文档和实际控制台为准。


一、为什么选择 Cloudflare 做工作流自动化?

Cloudflare 给人的第一印象通常是 CDN、DNS、防护、WAF,但近几年 Cloudflare 的开发者平台已经越来越完整。尤其是 Cloudflare Workers,可以理解为运行在边缘网络上的 Serverless 运行环境。

和传统服务器相比,它有几个明显优势。

1. 不需要维护服务器

传统方式中,你可能需要购买 VPS、配置 Node.js / Python 环境、安装 PM2、配置 Nginx、写 crontab、做日志轮转、处理磁盘空间、监控进程存活。

而 Cloudflare Workers 是 Serverless 模式,你只需要关注代码本身,不需要关心服务器运维。

对于中小型团队来说,这一点非常重要。因为很多自动化任务本质上不是核心业务,却会消耗大量维护精力。

2. 全球边缘节点,网络质量好

如果你的自动化任务涉及海外 API,比如 Stripe、GitHub、OpenAI、Telegram、Shopify、Notion、Slack 等,Cloudflare Workers 的网络环境通常比普通国内服务器或廉价 VPS 更稳定。

生产环境里,网络质量直接影响任务成功率。尤其是 Webhook、支付回调、第三方接口同步场景,低延迟和稳定连接非常关键。

3. 原生支持定时任务

Cloudflare Workers 支持 Cron Triggers,可以定时执行 Worker,例如:

  • 每分钟执行一次;
  • 每小时执行一次;
  • 每天凌晨执行;
  • 每周固定时间执行。

这对于自动化非常实用。例如:

  • 每天同步订单;
  • 每小时生成统计数据;
  • 每 10 分钟检查失败任务;
  • 每天清理过期缓存;
  • 定时调用接口拉取数据。

4. 可以结合队列做异步任务

Cloudflare Queues 可以用于消息队列场景。简单来说,你可以把任务先投递到队列,然后由消费者异步处理。

这在生产环境非常重要,因为很多任务不应该在 HTTP 请求中直接执行。

例如用户上传文件后,你不应该让用户等待文件压缩、识别、转码、AI 分析全部完成,而是应该:

  1. 用户上传文件;
  2. Worker 记录任务;
  3. 投递消息到 Queue;
  4. 后台异步处理;
  5. 处理完成后更新状态;
  6. 用户再查询结果。

5. 可以搭配 R2、D1、KV 使用

Cloudflare 提供了不少存储能力:

产品 适合场景
KV 简单键值缓存、配置、临时状态
D1 SQLite 数据库,适合结构化数据
R2 对象存储,适合文件、图片、日志、报表
Durable Objects 强一致状态、连接管理、协同控制
Queues 异步任务队列
Workers Analytics Engine 分析数据写入与查询

使用这些能力可以构建一套完整的自动化系统。


二、生产环境中的典型工作流架构

我们先看一个实际可落地的工作流架构。

假设我们要实现一个“每日自动生成业务报表并发送通知”的任务,流程如下:

  1. 每天凌晨 2 点触发定时任务;
  2. 从业务 API 拉取订单数据;
  3. 清洗并统计数据;
  4. 将统计结果写入 D1;
  5. 将报表文件写入 R2;
  6. 发送通知到 Slack / 企业微信 / Telegram;
  7. 如果某一步失败,则自动重试;
  8. 失败超过次数后记录日志并告警。

对应到 Cloudflare,可以这样设计:

Cron Trigger
     ↓
Cloudflare Worker
     ↓
Cloudflare Queue
     ↓
Queue Consumer Worker
     ↓
D1 / R2 / KV
     ↓
Webhook 通知

为什么不让 Cron Trigger 直接完成所有任务?

因为生产环境中,任务可能会比较耗时。如果直接在定时触发器里完成全部逻辑,可能会遇到超时、重试困难、无法拆分任务等问题。

更合理的方式是:

  • Cron Trigger 只负责生成任务;
  • Queue 负责承载任务;
  • Consumer 负责实际处理;
  • D1 / KV 负责记录任务状态;
  • R2 负责保存文件;
  • Webhook 负责通知结果。

这种模式更加稳定,也更容易扩展。


三、准备工作

在开始之前,你需要准备以下内容:

  1. 一个 Cloudflare 账号;
  2. 已安装 Node.js;
  3. 已安装 Wrangler CLI;
  4. 一个 Cloudflare Worker 项目;
  5. 根据需要创建 D1、R2、Queue 等资源。

安装 Wrangler:

npm install -g wrangler

登录 Cloudflare:

wrangler login

创建 Worker 项目:

npm create cloudflare@latest cloudflare-workflow-demo

进入项目:

cd cloudflare-workflow-demo

本地启动:

npm run dev

部署:

npm run deploy

或者使用:

wrangler deploy

四、配置 Cron 定时任务

wrangler.toml 中配置定时任务:

name = "cloudflare-workflow-demo"
main = "src/index.ts"
compatibility_date = "2024-09-01"

[triggers]
crons = ["0 2 * * *"]

上面表示每天凌晨 2 点执行一次。

如果你想每 10 分钟执行一次:

[triggers]
crons = ["*/10 * * * *"]

Worker 中处理定时任务:

export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    console.log("Cron triggered:", event.cron);

    const task = {
      type: "daily_report",
      createdAt: new Date().toISOString(),
      retry: 0
    };

    await env.REPORT_QUEUE.send(task);
  },

  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    return new Response("Workflow service is running");
  }
};

这里有一个关键点:定时任务触发后,不直接做复杂处理,而是把任务发送到队列。


五、配置 Cloudflare Queues

创建队列:

wrangler queues create report-queue

wrangler.toml 中绑定队列:

[[queues.producers]]
queue = "report-queue"
binding = "REPORT_QUEUE"

[[queues.consumers]]
queue = "report-queue"
max_batch_size = 5
max_batch_timeout = 30

队列消费者代码:

export default {
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
    for (const message of batch.messages) {
      try {
        const task = message.body;

        if (task.type === "daily_report") {
          await handleDailyReport(task, env);
        }

        message.ack();
      } catch (error) {
        console.error("Task failed:", error);
        message.retry();
      }
    }
  }
};

async function handleDailyReport(task: any, env: Env) {
  const data = await fetchOrderData(env);
  const report = buildReport(data);

  await saveReportToR2(report, env);
  await saveReportRecordToD1(report, env);
  await sendNotification(report, env);
}

这里使用了 message.ack()message.retry()
在生产环境中,务必明确任务成功和失败的处理逻辑,不要让任务“静默失败”。


六、使用 D1 记录任务状态

自动化工作流最常见的问题之一是:任务到底执行到哪一步了?

如果没有状态记录,出现问题时只能翻日志,非常痛苦。

建议为每个任务建立状态表。

创建 D1 数据库:

wrangler d1 create workflow-db

创建表结构:

CREATE TABLE workflow_tasks (
  id TEXT PRIMARY KEY,
  type TEXT NOT NULL,
  status TEXT NOT NULL,
  retry_count INTEGER DEFAULT 0,
  payload TEXT,
  result TEXT,
  error_message TEXT,
  created_at TEXT,
  updated_at TEXT
);

绑定 D1:

[[d1_databases]]
binding = "DB"
database_name = "workflow-db"
database_id = "你的 database_id"

写入任务状态:

async function createTaskRecord(env: Env, task: any) {
  const id = crypto.randomUUID();
  const now = new Date().toISOString();

  await env.DB.prepare(`
    INSERT INTO workflow_tasks 
    (id, type, status, retry_count, payload, created_at, updated_at)
    VALUES (?, ?, ?, ?, ?, ?, ?)
  `).bind(
    id,
    task.type,
    "pending",
    0,
    JSON.stringify(task),
    now,
    now
  ).run();

  return id;
}

更新任务状态:

async function updateTaskStatus(env: Env, id: string, status: string, result?: any, error?: any) {
  await env.DB.prepare(`
    UPDATE workflow_tasks
    SET status = ?, result = ?, error_message = ?, updated_at = ?
    WHERE id = ?
  `).bind(
    status,
    result ? JSON.stringify(result) : null,
    error ? String(error) : null,
    new Date().toISOString(),
    id
  ).run();
}

推荐使用以下状态:

状态 含义
pending 已创建,等待执行
running 正在执行
success 执行成功
failed 执行失败
retrying 等待重试
cancelled 已取消

生产环境中,任务状态非常重要。它不仅用于排查问题,也可以作为后台管理页面的数据来源。


七、使用 R2 保存报表和文件

如果你的工作流会生成文件,例如:

  • CSV 报表;
  • JSON 数据快照;
  • PDF 文件;
  • 图片处理结果;
  • AI 分析结果;
  • 日志归档文件;

那么建议使用 R2 存储。

创建 R2 Bucket:

wrangler r2 bucket create workflow-reports

绑定 R2:

[[r2_buckets]]
binding = "REPORT_BUCKET"
bucket_name = "workflow-reports"

保存报表:

async function saveReportToR2(report: any, env: Env) {
  const date = new Date().toISOString().slice(0, 10);
  const key = `reports/${date}/daily-report.json`;

  await env.REPORT_BUCKET.put(
    key,
    JSON.stringify(report, null, 2),
    {
      httpMetadata: {
        contentType: "application/json"
      }
    }
  );

  return key;
}

如果是 CSV:

function toCSV(rows: any[]) {
  if (!rows.length) return "";

  const headers = Object.keys(rows[0]);
  const csv = [
    headers.join(","),
    ...rows.map(row => headers.map(h => JSON.stringify(row[h] ?? "")).join(","))
  ].join("\n");

  return csv;
}

上传 CSV:

await env.REPORT_BUCKET.put("reports/daily.csv", csv, {
  httpMetadata: {
    contentType: "text/csv"
  }
});

八、失败重试与幂等设计

生产环境中,失败是常态,不是例外。

常见失败包括:

  • 第三方 API 超时;
  • 网络波动;
  • 数据库写入失败;
  • Webhook 发送失败;
  • 任务执行超时;
  • 队列重复投递;
  • 请求被限流;
  • 外部系统返回 500。

因此,工作流必须考虑两个核心问题:

  1. 如何重试?
  2. 如何避免重复执行造成脏数据?

1. 重试机制

Cloudflare Queues 本身支持失败重试,但你仍然应该在业务层做重试次数控制。

例如:

async function processTask(message: any, env: Env) {
  const task = message.body;

  try {
    await updateTaskStatus(env, task.id, "running");
    await handleDailyReport(task, env);
    await updateTaskStatus(env, task.id, "success");
  } catch (error) {
    const retryCount = task.retryCount || 0;

    if (retryCount >= 3) {
      await updateTaskStatus(env, task.id, "failed", null, error);
      throw error;
    }

    await updateTaskStatus(env, task.id, "retrying", null, error);

    await env.REPORT_QUEUE.send({
      ...task,
      retryCount: retryCount + 1
    });
  }
}

2. 幂等设计

所谓幂等,就是同一个任务执行多次,结果仍然一致,不会产生重复数据。

例如生成日报时,可以使用固定任务 ID:

const date = new Date().toISOString().slice(0, 10);
const taskId = `daily_report_${date}`;

插入数据库时使用 INSERT OR IGNORE

INSERT OR IGNORE INTO workflow_tasks 
(id, type, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?);

这样即使定时任务被重复触发,也不会重复创建任务。

对于报表文件,也可以使用固定路径:

reports/2025-01-01/daily-report.json

这样重复执行只会覆盖同一个文件,而不会生成一堆重复文件。


九、生产环境实测经验

下面是实际在生产环境使用 Cloudflare 自动化工作流时,总结出的经验。

1. 不要把所有逻辑写在一个 Worker 里

一开始很多人会把定时任务、HTTP 接口、队列消费、数据处理、通知发送全部写在同一个文件里。短期看很方便,长期维护会非常痛苦。

建议按模块拆分:

src/
  index.ts
  handlers/
    cron.ts
    queue.ts
    http.ts
  services/
    report.ts
    notification.ts
    storage.ts
    database.ts
  utils/
    retry.ts
    logger.ts

这样的结构更适合生产维护。

2. 外部 API 一定要设置超时

不要直接裸写:

await fetch(url)

建议封装超时:

async function fetchWithTimeout(url: string, options: RequestInit = {}, timeout = 10000) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeout);

  try {
    return await fetch(url, {
      ...options,
      signal: controller.signal
    });
  } finally {
    clearTimeout(timer);
  }
}

否则某些外部接口异常时,会拖慢整个任务。

3. 日志要包含 taskId

生产排查问题时,最重要的是能快速定位某个任务的完整链路。

建议所有日志都带上 taskId:

console.log(`[task:${task.id}] start processing`);
console.log(`[task:${task.id}] fetch orders success`);
console.log(`[task:${task.id}] report saved`);

这样在 Cloudflare Dashboard 里查看日志时更容易过滤。

4. Webhook 通知要降级处理

很多自动化任务最终会发送通知,例如 Slack、Telegram、企业微信、飞书。

但是通知失败不一定代表主任务失败。
例如报表已经生成成功,只是通知没发出去,这种情况可以单独记录通知失败,而不要把整个任务标记为失败。

建议区分主流程和附加流程:

try {
  await sendNotification(report, env);
} catch (error) {
  console.error("Notification failed:", error);
}

当然,如果通知本身就是关键业务流程,那就另当别论。

5. 注意 API 限流

很多第三方 API 都有限流,比如每分钟 60 次、每秒 10 次等。
如果队列消费速度太快,可能会导致大量请求被拒绝。

解决方法:

  • 降低 max_batch_size
  • 增加任务间隔;
  • 对请求做分批;
  • 在业务层实现简单限速;
  • 失败后指数退避重试。

6. 避免大任务一次性执行

如果你要处理 10 万条数据,不建议一个任务全部处理完。
更好的方式是拆分成多个小任务。

例如:

sync_orders_2025_01_01_page_1
sync_orders_2025_01_01_page_2
sync_orders_2025_01_01_page_3

每个任务只处理一页数据。
这样即使某一页失败,也可以单独重试,而不是整个任务重来。


十、一个完整的工作流示例

下面给出一个简化版完整示例。

export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    const date = new Date().toISOString().slice(0, 10);
    const taskId = `daily_report_${date}`;

    await env.DB.prepare(`
      INSERT OR IGNORE INTO workflow_tasks
      (id, type, status, retry_count, created_at, updated_at)
      VALUES (?, ?, ?, ?, ?, ?)
    `).bind(
      taskId,
      "daily_report",
      "pending",
      0,
      new Date().toISOString(),
      new Date().toISOString()
    ).run();

    await env.REPORT_QUEUE.send({
      id: taskId,
      type: "daily_report",
      date,
      retryCount: 0
    });

    console.log(`[task:${taskId}] queued`);
  },

  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
    for (const message of batch.messages) {
      const task = message.body;

      try {
        await env.DB.prepare(`
          UPDATE workflow_tasks
          SET status = ?, updated_at = ?
          WHERE id = ?
        `).bind("running", new Date().toISOString(), task.id).run();

        const orders = await fetchOrders(task.date, env);
        const report = buildDailyReport(orders);

        const key = `reports/${task.date}/daily-report.json`;

        await env.REPORT_BUCKET.put(key, JSON.stringify(report, null, 2), {
          httpMetadata: {
            contentType: "application/json"
          }
        });

        await env.DB.prepare(`
          UPDATE workflow_tasks
          SET status = ?, result = ?, updated_at = ?
          WHERE id = ?
        `).bind(
          "success",
          JSON.stringify({ r2Key: key, total: report.total }),
          new Date().toISOString(),
          task.id
        ).run();

        try {
          await sendTelegramMessage(`日报生成成功:${task.date},订单数:${report.total}`, env);
        } catch (noticeError) {
          console.error(`[task:${task.id}] notice failed`, noticeError);
        }

        message.ack();
      } catch (error) {
        console.error(`[task:${task.id}] failed`, error);

        const retryCount = task.retryCount || 0;

        if (retryCount >= 3) {
          await env.DB.prepare(`
            UPDATE workflow_tasks
            SET status = ?, error_message = ?, updated_at = ?
            WHERE id = ?
          `).bind(
            "failed",
            String(error),
            new Date().toISOString(),
            task.id
          ).run();

          message.ack();
        } else {
          await env.DB.prepare(`
            UPDATE workflow_tasks
            SET status = ?, retry_count = ?, error_message = ?, updated_at = ?
            WHERE id = ?
          `).bind(
            "retrying",
            retryCount + 1,
            String(error),
            new Date().toISOString(),
            task.id
          ).run();

          await env.REPORT_QUEUE.send({
            ...task,
            retryCount: retryCount + 1
          });

          message.ack();
        }
      }
    }
  },

  async fetch(request: Request, env: Env) {
    const url = new URL(request.url);

    if (url.pathname === "/health") {
      return Response.json({ status: "ok" });
    }

    return new Response("Not Found", { status: 404 });
  }
};

async function fetchOrders(date: string, env: Env) {
  const res = await fetch(`${env.API_BASE_URL}/orders?date=${date}`, {
    headers: {
      Authorization: `Bearer ${env.API_TOKEN}`
    }
  });

  if (!res.ok) {
    throw new Error(`Fetch orders failed: ${res.status}`);
  }

  return await res.json();
}

function buildDailyReport(orders: any[]) {
  return {
    total: orders.length,
    amount: orders.reduce((sum, item) => sum + Number(item.amount || 0), 0),
    generatedAt: new Date().toISOString()
  };
}

async function sendTelegramMessage(text: string, env: Env) {
  const url = `https://api.telegram.org/bot${env.TELEGRAM_BOT_TOKEN}/sendMessage`;

  const res = await fetch(url, {
    method: "POST",
    headers: {
      "Content-Type": "application/json"
    },
    body: JSON.stringify({
      chat_id: env.TELEGRAM_CHAT_ID,
      text
    })
  });

  if (!res.ok) {
    throw new Error(`Telegram notice failed: ${res.status}`);
  }
}

这个示例虽然简化,但已经包含生产环境中最重要的几个点:

  • 定时触发;
  • 队列异步处理;
  • D1 记录状态;
  • R2 保存结果;
  • 通知发送;
  • 失败重试;
  • 幂等任务 ID;
  • 健康检查接口。

十一、上线前检查清单

生产环境上线前,建议逐项检查:

  • [ ] Cron 表达式是否正确;
  • [ ] 时区是否符合预期;
  • [ ] 队列是否绑定成功;
  • [ ] D1 表结构是否已初始化;
  • [ ] R2 Bucket 是否存在;
  • [ ] 环境变量是否配置完整;
  • [ ] API Token 是否使用 Secret 管理;
  • [ ] 外部 API 是否设置超时;
  • [ ] 任务是否具备幂等性;
  • [ ] 重试次数是否有限制;
  • [ ] 失败后是否有告警;
  • [ ] 日志是否包含 taskId;
  • [ ] 是否有 /health 健康检查接口;
  • [ ] 是否有手动触发任务的入口;
  • [ ] 是否区分测试环境和生产环境。

配置 Secret:

wrangler secret put API_TOKEN
wrangler secret put TELEGRAM_BOT_TOKEN
wrangler secret put TELEGRAM_CHAT_ID

不要把密钥直接写进代码或 wrangler.toml


十二、适合 Cloudflare 自动化的场景

根据实测经验,Cloudflare 非常适合以下自动化任务:

1. Webhook 中转与分发

比如 GitHub、Stripe、Shopify、Paddle、Lemon Squeezy 的 Webhook,可以先进入 Worker,再分发到内部系统。

这样可以实现:

  • 签名校验;
  • 请求过滤;
  • 重试;
  • 日志记录;
  • 安全隔离;
  • 多目标转发。

2. 定时报表

每天生成销售日报、访问日报、订单日报,然后保存到 R2 并发送通知。

3. 文件异步处理

用户上传文件后,通过队列异步处理,例如:

  • 图片压缩;
  • JSON 清洗;
  • CSV 导入;
  • 日志分析;
  • AI 摘要;
  • 文档解析。

4. 轻量级数据同步

比如每小时从第三方系统拉取数据,写入 D1 或转发到业务服务器。

5. 自动告警

定时检测接口状态、库存数量、余额、证书过期时间、订单异常等,一旦触发条件就发送通知。


十三、不适合的场景

Cloudflare 虽然强大,但并不是所有任务都适合。

以下场景需要谨慎:

  1. 超长时间运行任务
    如果任务需要连续运行几十分钟甚至几小时,Workers 并不是最佳选择。

  2. 重 CPU 计算任务
    例如视频转码、大模型本地推理、大规模压缩计算,更适合专用服务器或云计算平台。

  3. 复杂 DAG 工作流
    如果你需要非常复杂的任务依赖、可视化编排、人工审批、任务回滚,Airflow、Temporal、Prefect 等专业工作流系统可能更合适。

  4. 强事务数据库场景
    D1 适合轻量结构化数据,但如果你的业务要求高并发复杂事务,仍然需要评估传统数据库。


十四、总结

Cloudflare 工作流自动化的核心思路不是“把所有事情都塞进 Worker”,而是充分利用 Cloudflare 平台能力进行拆分:

  • Cron Triggers 做定时触发;
  • Workers 做业务入口和调度;
  • Queues 做异步解耦;
  • D1 记录任务状态;
  • R2 保存文件结果;
  • KV 存储配置和轻量缓存;
  • Webhook 做通知和系统联动。

从生产环境实测来看,这套方案非常适合中小型自动化系统,尤其适合需要快速上线、低维护成本、全球网络稳定、与第三方 API 高频交互的业务。

但要想稳定运行,必须重视以下几点:

  1. 任务要拆小;
  2. 状态要可追踪;
  3. 失败要可重试;
  4. 重复执行要幂等;
  5. 日志要可定位;
  6. 密钥要安全管理;
  7. 关键任务要有告警。

如果你正在维护一堆服务器脚本、crontab 或 GitHub Actions 自动化任务,那么可以尝试把其中一部分迁移到 Cloudflare Workers 体系。
对于很多轻量到中等规模的自动化需求,Cloudflare 已经足够稳定、灵活,并且维护成本非常低。

目录结构
全文