上一篇 下一篇 分享链接 返回 返回顶部

用 Cloudflare Workers 搭一个自动化任务系统:队列、KV 与源码实战

发布人:慈云数据-客服中心 发布时间:1 天前 阅读量:4

Cloudflare 工作流自动化教程|附源码

在现代 Web 开发与运维体系中,“自动化”几乎已经成为提升效率、降低错误率、缩短交付周期的必备能力。无论是自动部署、定时任务、接口编排、数据同步,还是告警通知、图片处理、订单状态流转,只要流程中存在重复步骤,都可以通过工作流自动化来优化。

Cloudflare 近年来已经不再只是一个 CDN 和 DNS 服务商,它提供了包括 Workers、Pages、KV、D1、R2、Queues、Cron Triggers、Workflows 等一整套边缘计算能力。借助这些能力,我们可以在 Cloudflare 的全球边缘网络上构建轻量、稳定、低成本的自动化系统。

本文将以一个完整示例为主线,介绍如何使用 Cloudflare Workers 实现一个“工作流自动化”项目:通过接口触发任务,将任务写入队列,由 Worker 后台消费处理,并在处理完成后记录状态。文章会包含项目结构、配置方式、核心源码、部署步骤以及扩展建议。


一、什么是 Cloudflare 工作流自动化?

所谓“工作流自动化”,本质上是把一个业务流程拆分成多个可执行步骤,并通过程序自动完成这些步骤。

例如:

  1. 用户提交表单;
  2. 系统校验数据;
  3. 生成任务;
  4. 写入消息队列;
  5. 后台异步处理;
  6. 调用第三方 API;
  7. 保存处理结果;
  8. 发送通知。

如果这些步骤全部由人工操作,不仅效率低,而且容易出错。而使用 Cloudflare Workers 这类无服务器平台,我们可以把这些步骤部署到边缘节点,由系统自动完成。

Cloudflare 的优势主要包括:

  • 无需维护服务器:不用购买 VPS,也不用配置 Nginx、PM2、Docker 等;
  • 全球边缘网络:请求就近执行,延迟低;
  • 按量计费:小项目成本非常低;
  • 生态完整:Workers、KV、D1、R2、Queues 可组合使用;
  • 适合自动化任务:支持 HTTP 触发、定时触发、队列消费等模式。

二、本文要实现的功能

本文将实现一个简单但完整的自动化工作流系统。

假设我们有一个内容处理场景:

用户通过 API 提交一段文本,系统自动创建任务,将任务加入队列,后台 Worker 异步处理文本,最后保存任务状态和结果。

整体流程如下:

客户端请求
   ↓
Cloudflare Worker API
   ↓
参数校验
   ↓
写入任务状态到 KV
   ↓
发送消息到 Cloudflare Queue
   ↓
Queue Consumer 异步消费
   ↓
执行文本处理逻辑
   ↓
更新任务结果到 KV
   ↓
客户端查询任务状态

我们将实现三个核心接口:

接口 方法 说明
/api/tasks POST 创建自动化任务
/api/tasks/:id GET 查询任务状态
/api/health GET 健康检查

同时,我们会使用:

  • Cloudflare Workers:运行 API 和消费任务;
  • Cloudflare Queues:实现异步任务队列;
  • Cloudflare KV:保存任务状态和执行结果;
  • Wrangler:本地开发与部署工具。

三、准备工作

在开始之前,你需要准备以下内容:

1. Cloudflare 账号

前往 Cloudflare 官网注册账号:

https://dash.cloudflare.com/

2. 安装 Node.js

建议使用 Node.js 18 或更高版本。

查看版本:

node -v
npm -v

3. 安装 Wrangler

Wrangler 是 Cloudflare Workers 的官方 CLI 工具。

npm install -g wrangler

登录 Cloudflare:

wrangler login

登录成功后,Wrangler 会在本地保存认证信息。


四、创建项目

使用 npm 创建项目目录:

mkdir cloudflare-workflow-demo
cd cloudflare-workflow-demo
npm init -y

安装必要依赖:

npm install nanoid
npm install -D wrangler typescript

初始化 Worker 项目也可以使用:

npm create cloudflare@latest

不过为了便于理解,本文采用手动创建文件的方式。


五、项目目录结构

最终项目结构如下:

cloudflare-workflow-demo
├── package.json
├── wrangler.toml
├── tsconfig.json
└── src
    ├── index.ts
    ├── router.ts
    ├── task.ts
    └── types.ts

各文件作用如下:

文件 说明
index.ts Worker 入口文件,处理 fetch 和 queue
router.ts 简易路由分发
task.ts 任务创建、查询、处理逻辑
types.ts 类型定义
wrangler.toml Cloudflare 配置文件

六、配置 wrangler.toml

首先创建 wrangler.toml

name = "cloudflare-workflow-demo"
main = "src/index.ts"
compatibility_date = "2024-06-01"

[vars]
APP_NAME = "Cloudflare Workflow Demo"

[[kv_namespaces]]
binding = "TASK_KV"
id = "替换为你的KV命名空间ID"

[[queues.producers]]
binding = "TASK_QUEUE"
queue = "task-workflow-queue"

[[queues.consumers]]
queue = "task-workflow-queue"
max_batch_size = 5
max_batch_timeout = 10

这里有几个重点:

  • TASK_KV:在 Worker 中访问 KV 的绑定名称;
  • TASK_QUEUE:发送任务消息时使用的队列绑定;
  • task-workflow-queue:Cloudflare Queue 的名称;
  • max_batch_size:一次最多消费多少条消息;
  • max_batch_timeout:最多等待多少秒后触发消费。

七、创建 Cloudflare KV

执行命令创建 KV 命名空间:

wrangler kv namespace create TASK_KV

创建成功后会看到类似输出:

🌀 Creating namespace with title "cloudflare-workflow-demo-TASK_KV"
✨ Success!
Add the following to your configuration file in your kv_namespaces array:
{ binding = "TASK_KV", id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }

将输出中的 id 填入 wrangler.toml

如果你还需要预览环境,可以创建 preview namespace:

wrangler kv namespace create TASK_KV --preview

八、创建 Cloudflare Queue

创建队列:

wrangler queues create task-workflow-queue

创建完成后,确认 wrangler.toml 中配置的队列名称与这里一致。

查看队列列表:

wrangler queues list

九、编写类型定义

创建 src/types.ts

export type TaskStatus = "pending" | "processing" | "success" | "failed";

export interface CreateTaskPayload {
  text: string;
  action?: "uppercase" | "lowercase" | "summary";
}

export interface TaskRecord {
  id: string;
  text: string;
  action: "uppercase" | "lowercase" | "summary";
  status: TaskStatus;
  result?: string;
  error?: string;
  createdAt: string;
  updatedAt: string;
}

export interface QueueMessageBody {
  taskId: string;
}

export interface Env {
  APP_NAME: string;
  TASK_KV: KVNamespace;
  TASK_QUEUE: Queue;
}

这里定义了任务状态、创建任务的请求体、任务记录结构、队列消息体以及 Cloudflare Worker 的环境变量类型。

任务状态说明:

状态 含义
pending 任务已创建,等待处理
processing 任务正在处理
success 任务处理成功
failed 任务处理失败

十、编写任务逻辑

创建 src/task.ts

import { nanoid } from "nanoid";
import type {
  CreateTaskPayload,
  Env,
  QueueMessageBody,
  TaskRecord,
} from "./types";

function now() {
  return new Date().toISOString();
}

function getTaskKey(taskId: string) {
  return `task:${taskId}`;
}

export async function createTask(env: Env, payload: CreateTaskPayload) {
  if (!payload || typeof payload.text !== "string") {
    return jsonResponse(
      {
        success: false,
        message: "字段 text 必须是字符串",
      },
      400
    );
  }

  const text = payload.text.trim();

  if (!text) {
    return jsonResponse(
      {
        success: false,
        message: "字段 text 不能为空",
      },
      400
    );
  }

  if (text.length > 5000) {
    return jsonResponse(
      {
        success: false,
        message: "text 长度不能超过 5000 个字符",
      },
      400
    );
  }

  const action = payload.action || "uppercase";

  if (!["uppercase", "lowercase", "summary"].includes(action)) {
    return jsonResponse(
      {
        success: false,
        message: "action 只支持 uppercase、lowercase、summary",
      },
      400
    );
  }

  const taskId = nanoid();
  const record: TaskRecord = {
    id: taskId,
    text,
    action,
    status: "pending",
    createdAt: now(),
    updatedAt: now(),
  };

  await env.TASK_KV.put(getTaskKey(taskId), JSON.stringify(record));

  const message: QueueMessageBody = {
    taskId,
  };

  await env.TASK_QUEUE.send(message);

  return jsonResponse({
    success: true,
    message: "任务已创建",
    data: {
      taskId,
      status: record.status,
    },
  });
}

export async function getTask(env: Env, taskId: string) {
  if (!taskId) {
    return jsonResponse(
      {
        success: false,
        message: "缺少任务 ID",
      },
      400
    );
  }

  const recordText = await env.TASK_KV.get(getTaskKey(taskId));

  if (!recordText) {
    return jsonResponse(
      {
        success: false,
        message: "任务不存在",
      },
      404
    );
  }

  const record = JSON.parse(recordText) as TaskRecord;

  return jsonResponse({
    success: true,
    data: record,
  });
}

export async function processTask(env: Env, taskId: string) {
  const key = getTaskKey(taskId);
  const recordText = await env.TASK_KV.get(key);

  if (!recordText) {
    throw new Error(`任务不存在:${taskId}`);
  }

  const record = JSON.parse(recordText) as TaskRecord;

  if (record.status === "success") {
    return;
  }

  record.status = "processing";
  record.updatedAt = now();

  await env.TASK_KV.put(key, JSON.stringify(record));

  try {
    const result = await runAction(record.text, record.action);

    record.status = "success";
    record.result = result;
    record.updatedAt = now();

    await env.TASK_KV.put(key, JSON.stringify(record));
  } catch (error) {
    record.status = "failed";
    record.error = error instanceof Error ? error.message : String(error);
    record.updatedAt = now();

    await env.TASK_KV.put(key, JSON.stringify(record));

    throw error;
  }
}

async function runAction(
  text: string,
  action: "uppercase" | "lowercase" | "summary"
) {
  await sleep(500);

  if (action === "uppercase") {
    return text.toUpperCase();
  }

  if (action === "lowercase") {
    return text.toLowerCase();
  }

  if (action === "summary") {
    return mockSummary(text);
  }

  return text;
}

function mockSummary(text: string) {
  if (text.length <= 80) {
    return text;
  }

  return `${text.slice(0, 80)}...`;
}

function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

export function jsonResponse(data: unknown, status = 200) {
  return new Response(JSON.stringify(data, null, 2), {
    status,
    headers: {
      "content-type": "application/json;charset=UTF-8",
      "access-control-allow-origin": "*",
      "access-control-allow-methods": "GET,POST,OPTIONS",
      "access-control-allow-headers": "content-type,authorization",
    },
  });
}

这段代码完成了几个关键动作:

  1. 接收并校验用户输入;
  2. 创建任务 ID;
  3. 将任务初始状态写入 KV;
  4. 将任务 ID 发送到 Queue;
  5. 队列消费时读取任务;
  6. 执行具体动作;
  7. 更新任务状态和结果。

在真实项目中,runAction 可以替换为更复杂的逻辑,例如调用 OpenAI、发送邮件、同步数据库、处理图片、请求第三方接口等。


十一、编写路由逻辑

创建 src/router.ts

import type { Env } from "./types";
import { createTask, getTask, jsonResponse } from "./task";

export async function handleRequest(request: Request, env: Env) {
  const url = new URL(request.url);
  const method = request.method.toUpperCase();

  if (method === "OPTIONS") {
    return jsonResponse({
      success: true,
    });
  }

  if (url.pathname === "/api/health" && method === "GET") {
    return jsonResponse({
      success: true,
      message: "ok",
      app: env.APP_NAME,
      timestamp: new Date().toISOString(),
    });
  }

  if (url.pathname === "/api/tasks" && method === "POST") {
    let payload;

    try {
      payload = await request.json();
    } catch {
      return jsonResponse(
        {
          success: false,
          message: "请求体必须是合法 JSON",
        },
        400
      );
    }

    return createTask(env, payload);
  }

  const taskMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)$/);

  if (taskMatch && method === "GET") {
    const taskId = taskMatch[1];
    return getTask(env, taskId);
  }

  return jsonResponse(
    {
      success: false,
      message: "接口不存在",
    },
    404
  );
}

这里没有使用任何第三方 Web 框架,而是直接基于 RequestURL 实现了一个轻量路由。对于小型自动化工具来说,这种写法足够清晰,也减少了依赖。

如果你的项目接口较多,也可以考虑使用:

  • Hono;
  • itty-router;
  • hono-openapi;
  • zod;
  • valibot。

十二、编写 Worker 入口

创建 src/index.ts

import type { Env, QueueMessageBody } from "./types";
import { handleRequest } from "./router";
import { processTask } from "./task";

export default {
  async fetch(request: Request, env: Env): Promise {
    return handleRequest(request, env);
  },

  async queue(
    batch: MessageBatch,
    env: Env
  ): Promise {
    for (const message of batch.messages) {
      try {
        await processTask(env, message.body.taskId);
        message.ack();
      } catch (error) {
        console.error("任务处理失败:", error);
        message.retry();
      }
    }
  },
};

Cloudflare Workers 支持多个事件入口:

  • fetch:处理 HTTP 请求;
  • scheduled:处理定时任务;
  • queue:处理队列消息;
  • email:处理邮件事件。

本文示例中,我们同时使用了 fetchqueue

当用户调用 /api/tasks 创建任务后,Worker 会把消息发送到队列;Cloudflare Queue 随后触发同一个 Worker 的 queue 方法进行异步消费。


十三、配置 TypeScript

创建 tsconfig.json

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "ESNext",
    "moduleResolution": "Bundler",
    "lib": ["ES2022", "WebWorker"],
    "types": ["@cloudflare/workers-types"],
    "strict": true,
    "skipLibCheck": true,
    "noEmit": true
  },
  "include": ["src"]
}

安装 Cloudflare 类型:

npm install -D @cloudflare/workers-types

十四、配置 package.json

修改 package.json

{
  "name": "cloudflare-workflow-demo",
  "version": "1.0.0",
  "description": "Cloudflare workflow automation demo",
  "type": "module",
  "scripts": {
    "dev": "wrangler dev",
    "deploy": "wrangler deploy",
    "tail": "wrangler tail"
  },
  "dependencies": {
    "nanoid": "^5.0.7"
  },
  "devDependencies": {
    "@cloudflare/workers-types": "^4.20240620.0",
    "typescript": "^5.4.5",
    "wrangler": "^3.60.0"
  }
}

十五、本地运行

启动本地开发环境:

npm run dev

如果配置正确,终端会显示本地访问地址,例如:

http://localhost:8787

测试健康检查接口:

curl http://localhost:8787/api/health

返回示例:

{
  "success": true,
  "message": "ok",
  "app": "Cloudflare Workflow Demo",
  "timestamp": "2024-06-01T12:00:00.000Z"
}

十六、创建任务

发送 POST 请求:

curl -X POST http://localhost:8787/api/tasks \
  -H "content-type: application/json" \
  -d '{
    "text": "Hello Cloudflare Workflow Automation",
    "action": "uppercase"
  }'

返回示例:

{
  "success": true,
  "message": "任务已创建",
  "data": {
    "taskId": "abc123",
    "status": "pending"
  }
}

保存 taskId,稍后用于查询任务状态。


十七、查询任务状态

curl http://localhost:8787/api/tasks/abc123

如果任务还未处理完成,可能返回:

{
  "success": true,
  "data": {
    "id": "abc123",
    "text": "Hello Cloudflare Workflow Automation",
    "action": "uppercase",
    "status": "processing",
    "createdAt": "2024-06-01T12:00:00.000Z",
    "updatedAt": "2024-06-01T12:00:01.000Z"
  }
}

处理完成后返回:

{
  "success": true,
  "data": {
    "id": "abc123",
    "text": "Hello Cloudflare Workflow Automation",
    "action": "uppercase",
    "status": "success",
    "result": "HELLO CLOUDFLARE WORKFLOW AUTOMATION",
    "createdAt": "2024-06-01T12:00:00.000Z",
    "updatedAt": "2024-06-01T12:00:02.000Z"
  }
}

十八、部署到 Cloudflare

确认已经登录 Wrangler:

wrangler whoami

部署:

npm run deploy

部署成功后,会看到类似地址:

https://cloudflare-workflow-demo.yourname.workers.dev

在线测试:

curl https://cloudflare-workflow-demo.yourname.workers.dev/api/health

如果返回正常,说明部署成功。


十九、加入定时触发能力

除了 API 触发,很多自动化场景还需要定时执行。例如:

  • 每天凌晨同步数据;
  • 每 10 分钟检查订单状态;
  • 每小时清理过期任务;
  • 每周生成报表;
  • 定时抓取 RSS 或 Sitemap。

Cloudflare Workers 支持 Cron Triggers。

wrangler.toml 添加:

[triggers]
crons = ["*/10 * * * *"]

表示每 10 分钟执行一次。

然后修改 src/index.ts

import type { Env, QueueMessageBody } from "./types";
import { handleRequest } from "./router";
import { processTask } from "./task";

export default {
  async fetch(request: Request, env: Env): Promise {
    return handleRequest(request, env);
  },

  async queue(
    batch: MessageBatch,
    env: Env
  ): Promise {
    for (const message of batch.messages) {
      try {
        await processTask(env, message.body.taskId);
        message.ack();
      } catch (error) {
        console.error("任务处理失败:", error);
        message.retry();
      }
    }
  },

  async scheduled(
    event: ScheduledEvent,
    env: Env,
    ctx: ExecutionContext
  ): Promise {
    ctx.waitUntil(runScheduledJob(env));
  },
};

async function runScheduledJob(env: Env) {
  console.log("定时任务触发:", new Date().toISOString());

  // 示例:可以在这里扫描外部 API、创建任务、发送队列消息等
}

这样,我们就同时拥有了:

  • HTTP API 触发;
  • Queue 异步消费;
  • Cron 定时触发。

这已经可以覆盖大多数轻量工作流自动化需求。


二十、实际业务扩展示例

1. 自动发送邮件

runAction 中可以调用邮件服务商接口,例如 Resend、SendGrid、Mailgun:

await fetch("https://api.resend.com/emails", {
  method: "POST",
  headers: {
    authorization: `Bearer ${env.RESEND_API_KEY}`,
    "content-type": "application/json",
  },
  body: JSON.stringify({
    from: "noreply@example.com",
    to: "user@example.com",
    subject: "任务完成通知",
    html: "

你的任务已经处理完成。

", }), });

2. 自动调用 AI 接口

可以将文本处理替换为 AI 总结:

const response = await fetch("https://api.openai.com/v1/chat/completions", {
  method: "POST",
  headers: {
    authorization: `Bearer ${env.OPENAI_API_KEY}`,
    "content-type": "application/json",
  },
  body: JSON.stringify({
    model: "gpt-4o-mini",
    messages: [
      {
        role: "user",
        content: `请总结以下内容:${text}`,
      },
    ],
  }),
});

3. 自动同步数据

定时从一个接口拉取数据,再写入 D1 数据库或外部系统:

const res = await fetch("https://example.com/api/orders");
const orders = await res.json();

for (const order of orders) {
  await env.TASK_QUEUE.send({
    taskId: order.id,
  });
}

4. 自动清理过期任务

KV 可以设置过期时间,也可以通过定时任务清理旧数据:

await env.TASK_KV.put(key, JSON.stringify(record), {
  expirationTtl: 60 * 60 * 24 * 7,
});

表示任务记录 7 天后自动过期。


二十一、错误处理与重试机制

在自动化工作流中,错误处理非常重要。常见问题包括:

  • 第三方 API 超时;
  • 网络失败;
  • 请求频率限制;
  • 数据格式不符合预期;
  • 队列消息重复消费;
  • 某一步处理成功但后续失败。

Cloudflare Queues 支持消息重试。本文中的代码:

message.retry();

表示当前消息处理失败后交给队列重试。

不过在生产环境中,建议加入以下机制:

1. 幂等处理

同一个任务可能被重复消费,所以任务处理逻辑应当可以重复执行而不产生错误结果。

例如:

if (record.status === "success") {
  return;
}

这就是一种简单的幂等判断。

2. 最大重试次数

可以在任务记录中增加 retryCount 字段,超过次数后标记为失败,避免无限重试。

3. 死信队列

对于无法处理的消息,可以发送到另一个队列,例如:

task-dead-letter-queue

后续由人工或专门 Worker 处理。

4. 结构化日志

建议打印包含任务 ID、状态、耗时的信息,方便排查问题:

console.log(JSON.stringify({
  event: "task_completed",
  taskId,
  duration,
  status: "success"
}));

二十二、安全建议

自动化接口通常会触发实际业务动作,因此一定要重视安全。

1. 加入 API Token

可以在请求头中要求携带 Token:

const token = request.headers.get("authorization");

if (token !== `Bearer ${env.API_TOKEN}`) {
  return jsonResponse(
    {
      success: false,
      message: "Unauthorized",
    },
    401
  );
}

然后在 wrangler.toml 或 Cloudflare Dashboard 中配置环境变量。

更推荐使用 Secret:

wrangler secret put API_TOKEN

2. 限制请求大小

本文已经对 text.length 做了限制。对于文件、图片、JSON 批量数据,也应该控制大小,防止滥用。

3. 校验来源

如果接口只供你的前端使用,可以结合:

  • Cloudflare Access;
  • Turnstile;
  • 自定义签名;
  • IP 白名单;
  • HMAC 校验。

4. 不要把密钥写进源码

所有 API Key、Token、Webhook Secret 都应使用 wrangler secret put 管理。


二十三、KV、D1、R2 如何选择?

在 Cloudflare 生态中,不同存储适合不同场景。

产品 适合场景
KV 简单键值存储、缓存、任务状态
D1 关系型数据、订单、用户、报表
R2 文件、图片、日志归档、大对象
Queues 异步任务、削峰、解耦
Durable Objects 强一致状态、协同、聊天室、计数器

本文选择 KV,是因为任务状态查询非常适合键值模式:

task:abc123 -> JSON

如果你的任务需要复杂查询,例如按用户查询、按状态分页、统计成功率,那么可以考虑使用 D1。


二十四、完整源码汇总

为了方便复制,这里给出核心源码汇总。

src/types.ts

export type TaskStatus = "pending" | "processing" | "success" | "failed";

export interface CreateTaskPayload {
  text: string;
  action?: "uppercase" | "lowercase" | "summary";
}

export interface TaskRecord {
  id: string;
  text: string;
  action: "uppercase" | "lowercase" | "summary";
  status: TaskStatus;
  result?: string;
  error?: string;
  createdAt: string;
  updatedAt: string;
}

export interface QueueMessageBody {
  taskId: string;
}

export interface Env {
  APP_NAME: string;
  TASK_KV: KVNamespace;
  TASK_QUEUE: Queue;
}

src/index.ts

import type { Env, QueueMessageBody } from "./types";
import { handleRequest } from "./router";
import { processTask } from "./task";

export default {
  async fetch(request: Request, env: Env): Promise {
    return handleRequest(request, env);
  },

  async queue(
    batch: MessageBatch,
    env: Env
  ): Promise {
    for (const message of batch.messages) {
      try {
        await processTask(env, message.body.taskId);
        message.ack();
      } catch (error) {
        console.error("任务处理失败:", error);
        message.retry();
      }
    }
  },
};

完整项目还包括 router.tstask.tswrangler.tomlpackage.json,上文已经给出。


二十五、总结

本文从零实现了一个基于 Cloudflare Workers 的工作流自动化示例。它虽然是一个简单的文本处理系统,但已经包含了自动化工作流中的核心思想:

  • 使用 API 创建任务;
  • 使用 KV 保存任务状态;
  • 使用 Queue 解耦请求与后台处理;
  • 使用 Worker 消费队列;
  • 使用状态字段追踪任务生命周期;
  • 支持进一步扩展定时任务、通知、AI 处理和数据同步。

Cloudflare 的边缘计算能力非常适合构建轻量级自动化系统。对于个人开发者、小团队、独立站、SaaS 工具、内部运维平台来说,它可以用很低的成本完成许多过去需要服务器、消息队列、定时任务系统才能实现的功能。

如果你正在做以下事情:

  • 自动部署;
  • 自动采集;
  • 自动通知;
  • 表单自动处理;
  • AI 内容加工;
  • Webhook 编排;
  • 订单状态同步;
  • 定时数据报表;

那么 Cloudflare Workers + Queues + KV/D1 是一个非常值得尝试的组合。你可以从本文的示例开始,把 runAction 替换成自己的业务逻辑,一个实用的工作流自动化系统就搭建完成了。

目录结构
全文