← Back to Skills Marketplace
bingfoon

Batch Processing Patterns

by BingFoon · GitHub ↗ · v1.0.0
cross-platform ⚠ suspicious
486
Downloads
0
Stars
1
Active Installs
1
Versions
Install in OpenClaw
/install batch-processing-patterns
Description
批量处理与长时任务编排模式。涵盖队列管理、并发调度、中断恢复、熔断器、远程任务轮询、进度报告和反风控策略。适用于批量文件处理、AI API 调用、爬虫和后台任务场景。
README (SKILL.md)

批量处理与长时任务指南

来自生产级桌面应用的实战经验,覆盖批量文件处理、远程 API 轮询、并发控制和错误恢复。

适用场景

  • 批量文件处理(转码、压缩、水印)
  • 远程 AI 任务轮询(视频生成、语音合成)
  • 爬虫/批量 HTTP 请求
  • 后台队列任务

1. 批处理架构

任务队列
├── 并发调度器(动态调整并发数)
│   ├── Worker 1 → processItem()
│   ├── Worker 2 → processItem()
│   └── Worker N → processItem()
├── 中止控制器(shouldStop + 子进程清理)
├── 熔断器(连续失败 N 次暂停)
├── 跳过检查(断点续传 / 前置过滤)
└── 进度报告(per-item + overall)

核心原则

  1. 逐项处理,逐项报告 — 不等全部完成
  2. 中断即停 — 每个 item 之间检查 abort 信号
  3. 失败不中断 — 单项失败标记 failed,继续处理其他
  4. 熔断保护 — 连续失败超阈值暂停整个队列

2. 并发调度

自适应并发池

根据每个 item 的处理耗时动态调整并发数:

class AdaptiveScheduler {
  private concurrency: number;
  private running = 0;
  private queue: (() => void)[] = [];

  constructor(
    private min: number,
    private max: number,
    private slowThresholdMs: number
  ) {
    this.concurrency = Math.ceil((min + max) / 2);
  }

  async run\x3CT>(fn: () => Promise\x3CT>): Promise\x3CT> {
    if (this.running >= this.concurrency) {
      await new Promise\x3Cvoid>((resolve) => this.queue.push(resolve));
    }
    this.running++;
    const start = Date.now();

    try {
      return await fn();
    } finally {
      const elapsed = Date.now() - start;
      this.running--;

      // 自适应调整
      if (elapsed > this.slowThresholdMs && this.concurrency > this.min) {
        this.concurrency--;
      } else if (elapsed \x3C this.slowThresholdMs / 2 && this.concurrency \x3C this.max) {
        this.concurrency++;
      }

      if (this.queue.length > 0) {
        this.queue.shift()!();
      }
    }
  }
}

预设配置

场景 初始 最小 最大 慢阈值
CPU 密集(FFmpeg 转码) 4 1 CPU 核数 3s
API 调用(AI 服务) 3 1 5 8s
文件 I/O 2 1 3 30s
串行(必须顺序) 1 1 1 -

简易并发池(不需要自适应时)

async function runPool\x3CT>(
  items: T[],
  fn: (item: T) => Promise\x3Cvoid>,
  concurrency: number,
  signal?: AbortSignal
): Promise\x3C{ completed: number; failed: number }> {
  let completed = 0, failed = 0;
  const running = new Set\x3CPromise\x3Cvoid>>();

  for (const item of items) {
    if (signal?.aborted) break;

    const p = fn(item)
      .then(() => { completed++; })
      .catch(() => { failed++; });

    running.add(p.then(() => { running.delete(p); }));

    if (running.size >= concurrency) {
      await Promise.race(running);
    }
  }
  await Promise.all(running);
  return { completed, failed };
}

3. 中断与恢复

AbortController 模式

class BatchAbortController {
  private _aborted = false;
  private callbacks: (() => void)[] = [];

  get aborted() { return this._aborted; }

  abort() {
    if (this._aborted) return; // 幂等
    this._aborted = true;
    this.callbacks.forEach((cb) => cb());
  }

  onAbort(cb: () => void) {
    if (this._aborted) { cb(); return; }
    this.callbacks.push(cb);
  }

  reset() {
    this._aborted = false;
    this.callbacks = [];
  }
}

断点续传(shouldSkip)

const completedSet = new Set(loadCompletedFromDisk());

function shouldSkip(item: FileItem): string | null {
  if (completedSet.has(item.path)) return '已完成(断点续传)';
  if (item.size \x3C= targetSize) return '已满足目标条件';
  return null; // 正常处理
}

// 在批处理循环中
for (const item of items) {
  if (abortController.aborted) break;

  const skipReason = shouldSkip(item);
  if (skipReason) {
    onItemSkip(item, skipReason);
    continue;
  }

  try {
    await processItem(item);
    completedSet.add(item.path);
    saveCompletedToDisk(completedSet); // 持久化进度
    onItemComplete(item);
  } catch (err) {
    onItemError(item, err);
  }
}

4. 熔断器

连续失败过多时自动暂停,避免无意义的重试浪费资源。

class CircuitBreaker {
  private consecutiveFailures = 0;

  constructor(
    private maxFailures: number = 5,
    private onTrip?: (failures: number) => void
  ) {}

  recordSuccess() {
    this.consecutiveFailures = 0;
  }

  recordFailure(): boolean {
    this.consecutiveFailures++;
    if (this.consecutiveFailures >= this.maxFailures) {
      this.onTrip?.(this.consecutiveFailures);
      return true; // tripped
    }
    return false;
  }

  get isTripped() {
    return this.consecutiveFailures >= this.maxFailures;
  }

  reset() {
    this.consecutiveFailures = 0;
  }
}

5. 远程任务轮询

递归 setTimeout 模式(推荐)

function startPolling(taskId: string, interval = 5000) {
  const poll = async () => {
    try {
      const result = await queryTaskStatus(taskId);

      if (result.status === 'completed') {
        onComplete(result);
        return; // 停止轮询
      }
      if (result.status === 'failed') {
        onFailed(result);
        return;
      }

      // 超时检查
      if (Date.now() - startTime > MAX_POLL_TIME) {
        onTimeout(taskId);
        return;
      }

      setTimeout(poll, interval); // 继续
    } catch (err) {
      if (isCriticalError(err)) return; // 停止
      setTimeout(poll, interval); // 瞬态错误继续
    }
  };

  poll();
}

关键要点

  • setTimeout 而非 setInterval — 确保前一次完成后再调度下一次
  • 超时保护 — 总轮询时间有上限,避免永久轮询
  • 错误分类 — 瞬态错误继续轮询,致命错误立即停止
  • 去重 — 用 Set 防止同一任务重复轮询

批量轮询(多任务顺序轮询)

// 多任务并发轮询可能触发限流,改为顺序轮询
const pollAll = async () => {
  const activeTasks = getActiveTasks();

  for (const task of activeTasks) {
    if (aborted) return;
    await pollOne(task.id);
    // 任务间延迟,尊重 QPS
    if (activeTasks.length > 1) {
      await sleep(2000);
    }
  }

  if (getActiveTasks().length > 0) {
    setTimeout(pollAll, POLL_INTERVAL);
  }
};

6. 错误分类与处理

错误类型 行为 示例
瞬态网络错误 重试 timeout, ECONNRESET
401 Unauthorized 停止 API Key 无效
403 / 余额不足 停止 账户问题
429 Too Many Requests 退避重试 限流
500+ Server Error 有限重试(3 次) 服务端异常
文件不存在 跳过此项 输入文件被删
磁盘空间不足 停止全部 ENOSPC

指数退避重试

async function withRetry\x3CT>(
  fn: () => Promise\x3CT>,
  maxRetries = 3,
  baseDelay = 2000
): Promise\x3CT> {
  for (let attempt = 0; attempt \x3C= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt === maxRetries) throw err;
      if (isFatalError(err)) throw err; // 不重试

      const delay = baseDelay * Math.pow(2, attempt)
        + Math.random() * 1000; // 抖动
      await sleep(delay);
    }
  }
  throw new Error('unreachable');
}

7. 进度报告

双层进度

interface BatchProgress {
  // 整体进度
  current: number;       // 已处理项数
  total: number;         // 总项数
  percentage: number;    // 0-100

  // 当前项进度(可选)
  currentItem?: {
    name: string;
    itemProgress: number; // 0-100
  };
}

// 报告整体进度
function reportProgress(done: number, total: number) {
  send({ current: done, total, percentage: Math.round(done / total * 100) });
}

进度节流

// GUI 场景必须节流,否则高频更新会卡 UI
const throttledProgress = throttle(reportProgress, 100); // 10fps

8. 反风控策略(批量 HTTP 请求)

三层防护

请求节流(同域名限速)→ 风控检测(响应分析)→ 指数退避重试

请求节流

const lastRequest = new Map\x3Cstring, number>();

async function throttleByDomain(domain: string) {
  const last = lastRequest.get(domain) || 0;
  const minInterval = 1500 + Math.random() * 1500; // 1.5-3s 随机
  const wait = minInterval - (Date.now() - last);
  if (wait > 0) await sleep(wait);
  lastRequest.set(domain, Date.now());
}

风控检测

即使 HTTP 200 也要检查响应内容:

  • 验证码页面关键词(captcha, 滑块验证)
  • 空壳页面(body \x3C 500 字节)
  • 重定向到登录页

UA 轮换

const USER_AGENTS = [
  'Mozilla/5.0 (Macintosh; ...) Chrome/...',
  'Mozilla/5.0 (Windows NT ...) Chrome/...',
  'Mozilla/5.0 (iPhone; ...) Safari/...',
];

function randomUA(): string {
  return USER_AGENTS[Math.floor(Math.random() * USER_AGENTS.length)];
}

9. Checklist

新增批处理功能

  • 有 AbortController 中止机制
  • 有并发控制(不超过 CPU 核数)
  • 单项失败不中断整体
  • 有熔断器(连续失败 5 次暂停)
  • 进度通过节流后的回调报告
  • 错误分类处理(瞬态 vs 致命)
  • 临时文件在 finally 中清理

远程任务轮询

  • 用 setTimeout 而非 setInterval
  • 有超时上限
  • 有去重机制
  • 401/403 立即停止轮询
  • 429 指数退避

批量 HTTP 请求

  • 同域名请求间隔 ≥ 1.5s
  • 检测风控响应(captcha/空页面)
  • 退避重试(2s → 4s → 8s)

来源

提炼自生产级 Electron 桌面应用,覆盖 6+ 个批处理模块和 3 个远程轮询场景的实战经验。

Usage Guidance
This SKILL.md is purely instructional and appears coherent for building batch jobs. Before using the patterns in production: (1) review and implement the placeholder functions (disk persistence, network calls) carefully so you don't accidentally read/write sensitive paths or persist secrets; (2) enforce input validation, limits, and timeouts to avoid runaway jobs; (3) set conservative circuit-breaker and retry parameters and respect remote API rate limits/quotas; (4) run implementations in a staging environment and audit what files/network endpoints they access. If you plan to integrate with real APIs, provide credentials only to your implementation code and not to this document — the skill itself does not request credentials.
Capability Analysis
Type: OpenClaw Skill Name: batch-processing-patterns Version: 1.0.0 The skill provides comprehensive patterns for batch processing and long-running tasks, including concurrency control, error handling, and progress reporting. However, the 'Anti-Risk Control Strategies' section in SKILL.md describes techniques like request throttling, risk control detection (e.g., bypassing CAPTCHA, empty pages, login redirects), and User-Agent rotation. While these can be used for legitimate web scraping, they are also commonly employed to circumvent automated defenses and could facilitate activities that violate terms of service or are otherwise unethical, indicating a risky capability without explicit malicious intent within the skill itself.
Capability Assessment
Purpose & Capability
The name/description (batch processing, polling, retries, circuit breakers, abort/recovery) match the SKILL.md content. No unrelated binaries, env vars, or config paths are requested.
Instruction Scope
SKILL.md contains code patterns and implementation guidance (concurrency pools, abort controller, circuit breaker, polling, retry/backoff). It references reading/writing progress to disk and calling remote APIs via placeholders (loadCompletedFromDisk, queryTaskStatus) — these are expected for this domain and are scoped to batch processing. The instructions do not tell the agent to read arbitrary system files, exfiltrate data, or send results to unexpected external endpoints.
Install Mechanism
No install spec and no code files beyond SKILL.md. Because it's instruction-only, nothing is downloaded or executed by the installer and there is no elevated install risk.
Credentials
The skill declares no required environment variables, secrets, or credentials. The content mentions handling API errors (401/403/429) but does not request API keys itself; this is proportional for guidance material.
Persistence & Privilege
always is false; the skill is user-invocable and can be invoked autonomously (default), which is normal. The skill does not request persistent system presence or modify other skills' configuration.
How to Use
  1. Make sure OpenClaw is installed (local or Docker)
  2. Run the install command in chat: /install batch-processing-patterns
  3. After installation, invoke the skill by name or use /batch-processing-patterns
  4. Provide required inputs per the skill's parameter spec and get structured output
Version History
v1.0.0
批处理与长时任务编排模式。自适应并发池、AbortController、熔断器、远程任务轮询、错误分类、反风控策略。
Metadata
Slug batch-processing-patterns
Version 1.0.0
License
All-time Installs 1
Active Installs 1
Total Versions 1
Frequently Asked Questions

What is Batch Processing Patterns?

批量处理与长时任务编排模式。涵盖队列管理、并发调度、中断恢复、熔断器、远程任务轮询、进度报告和反风控策略。适用于批量文件处理、AI API 调用、爬虫和后台任务场景。 It is an AI Agent Skill for Claude Code / OpenClaw, with 486 downloads so far.

How do I install Batch Processing Patterns?

Run "/install batch-processing-patterns" in the OpenClaw or Claude Code chat to install it in one step — no extra setup required.

Is Batch Processing Patterns free?

Yes, Batch Processing Patterns is completely free (open-source). You can download, install and use it at no cost.

Which platforms does Batch Processing Patterns support?

Batch Processing Patterns is cross-platform and runs anywhere OpenClaw / Claude Code is available (cross-platform).

Who created Batch Processing Patterns?

It is built and maintained by BingFoon (@bingfoon); the current version is v1.0.0.

💬 Comments