第 23 章

实战:内容生成流水线——批量处理与质量控制

第23章:实战——内容生成流水线——批量处理与质量控制

把 AI 内容生成从"偶尔用用"变成企业的核心生产力——本章教你构建可批量运行、有质量保证、能持续迭代的内容生成流水线。

本章导读

电商详情页、营销文案、SEO 内容、产品说明书翻译……内容需求几乎无穷无尽。很多团队在尝到 LLM 生成内容的甜头后,面临的第一个瓶颈不是"怎么写好",而是"怎么批量写"和"怎么保证批量写的质量"。

本章围绕一个真实案例展开:某跨境电商平台(SKU 数量 12 万),需要将中文产品详情翻译并本地化为英文、日文、西班牙文、德文、法文 5 种语言,同时生成对应语言的 SEO 标题、Meta 描述和营销文案。

业务规模


Level 1:基础认知(1-3 年经验)

内容生成流水线的架构选择

选项A:Dify 批量运行(Batch Run)

Dify 工作流支持"批量运行"模式,可以上传 CSV 文件批量处理。

优点:无需额外开发,操作简单 缺点:并发数有限,不适合超大规模批量任务

选项B:Dify API + 外部调度器

通过 Dify Workflow API,结合 Celery/Airflow 等任务队列管理器,实现真正的生产级批量处理。

优点:可控性强,支持断点续传、错误重试、进度监控 缺点:需要开发工作

选项C:本地 LangChain/LlamaIndex 脚本

绕过 Dify,直接用 LangChain 编写批量处理脚本。

优点:灵活性最高 缺点:失去 Dify 的 Prompt 管理、版本控制、监控等优势

推荐:该企业采用 选项B(Dify API + Celery),本章重点介绍此方案。

Dify Workflow 设计:多语言内容生成

首先在 Dify Console 中设计工作流:

输入:
  - product_id: 产品 ID
  - product_name: 产品名称(中文)
  - product_description: 产品描述(中文)
  - target_language: 目标语言(en/ja/es/de/fr)
  - product_category: 产品类别

工作流节点:
  1. 语言适配配置节点(根据 target_language 设置语言规则)
  2. 核心信息提取节点(提取产品核心卖点)
  3. 翻译节点(翻译产品描述)
  4. SEO 内容生成节点(生成 Title/Meta/H1)
  5. 营销文案节点(生成 3 版本营销文案)
  6. 质量检查节点(自动检查长度、关键词、禁词)
  7. 输出整合节点

输出:
  - translated_description: 翻译后的描述
  - seo_title: SEO 标题(≤ 60 字符)
  - meta_description: Meta 描述(≤ 160 字符)
  - marketing_copy_a: 营销文案版本 A(主打价值)
  - marketing_copy_b: 营销文案版本 B(主打情感)
  - marketing_copy_c: 营销文案版本 C(主打促销)
  - quality_score: 质量评分(0-100)
  - quality_issues: 质量问题列表

各节点核心 Prompt(英文生成示例)

# 节点2:核心信息提取
你是一位产品文案专家。从以下中文产品描述中提取核心信息:

产品名称:{{product_name}}
产品描述:{{product_description}}
产品类别:{{product_category}}

请提取并输出 JSON:
{
  "core_features": ["核心特性1", "核心特性2", "核心特性3"],
  "target_audience": "目标用户描述",
  "key_benefits": ["主要好处1", "主要好处2"],
  "technical_specs": {"规格名": "规格值"},
  "unique_selling_point": "最核心的差异化卖点"
}
# 节点3:翻译节点(英文)
你是一位专业的中英文产品文案翻译,专注于跨境电商领域。

翻译以下中文产品描述为英文,要求:
1. 语言地道自然,符合英语母语用户的阅读习惯
2. 保留所有技术参数和规格
3. 翻译品牌名称时保持一致(不翻译)
4. 适当使用 Amazon/Shopify 风格的电商语言

核心信息(已提取):{{extracted_info}}
原始描述:{{product_description}}

请直接输出翻译结果,不要加任何解释或注释:
# 节点4:SEO 内容生成(英文)
基于以下信息生成 SEO 优化内容:

产品名称(英文):{{translated_title}}
核心特性:{{core_features}}
目标关键词:{{target_keywords}}

输出 JSON:
{
  "seo_title": "SEO 标题(必须包含主关键词,50-60字符)",
  "meta_description": "Meta描述(包含CTA,140-160字符)",
  "h1_tag": "页面H1标签(与SEO标题略有差异,自然语言)",
  "suggested_keywords": ["建议关键词1", "建议关键词2"]
}

使用 Dify 批量运行模式(快速验证)

如果你的批量规模在 1000 个以下,可以先用 Dify 内置的批量运行:

  1. 准备 CSV 文件(列名与工作流输入变量对应):
product_id,product_name,product_description,target_language,product_category
SKU001,无线蓝牙耳机,"音质出色,续航20小时...",en,Electronics
SKU002,木质餐桌,"实木材质,可拆卸设计...",en,Furniture
  1. 在 Dify Workflow 页面点击"批量运行"
  2. 上传 CSV,设置并发数(建议 5-10)
  3. 下载结果 CSV

Level 2:机制深解(3-5 年经验)

生产级批量处理系统架构

对于 12 万 SKU 的规模,需要一个真正的生产级批量处理系统:

            ┌─────────────────────────────┐
            │      任务管理数据库          │
            │    (PostgreSQL + Redis)      │
            └──────────────┬──────────────┘
                           │
            ┌──────────────▼──────────────┐
            │      任务调度器              │
            │  (Celery + Redis Broker)    │
            │  - 优先级队列               │
            │  - 并发控制(50 workers)   │
            │  - 重试机制(最多3次)      │
            └──────────────┬──────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         ▼                 ▼                 ▼
   ┌──────────┐    ┌──────────┐    ┌──────────┐
   │  Worker  │    │  Worker  │    │  Worker  │
   │  (x10)   │    │  (x10)   │    │  (x10)   │
   └──────────┘    └──────────┘    └──────────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                           │
            ┌──────────────▼──────────────┐
            │      Dify Workflow API      │
            │    (负载均衡 + 限速控制)    │
            └──────────────┬──────────────┘
                           │
            ┌──────────────▼──────────────┐
            │        LLM Provider         │
            │     (OpenAI / Claude)       │
            └─────────────────────────────┘

核心代码实现

# tasks/content_generation.py — Celery 任务定义

from celery import Celery
from celery.utils.log import get_task_logger
import requests
import time
import json
from database import Session, ContentTask, TaskStatus

app = Celery('content_pipeline', broker='redis://redis:6379/2')
logger = get_task_logger(__name__)

# 配置 Celery
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_backend='redis://redis:6379/3',
    task_acks_late=True,  # 任务执行完才确认,防止丢失
    worker_prefetch_multiplier=1,  # 每个 worker 一次只取一个任务
    task_routes={
        'tasks.generate_content': {'queue': 'content'},
        'tasks.quality_check': {'queue': 'quality'},
    },
    # 限速:每秒最多处理 5 个任务(防止触发 OpenAI 限速)
    task_annotations={
        'tasks.content_generation.generate_content': {'rate_limit': '5/s'}
    }
)


DIFY_API_URL = "https://dify.yourcompany.com/v1"
DIFY_WORKFLOW_TOKEN = "workflow-api-token"


@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=30,  # 重试间隔 30 秒
    soft_time_limit=120,     # 2 分钟软超时
    time_limit=180,          # 3 分钟硬超时
    name='tasks.generate_content'
)
def generate_content(self, task_id: int, sku_data: dict, target_language: str):
    """
    为单个 SKU 生成多语言内容
    
    Args:
        task_id: 数据库任务 ID
        sku_data: SKU 数据字典
        target_language: 目标语言代码
    """
    session = Session()
    task = session.get(ContentTask, task_id)
    
    try:
        # 更新任务状态
        task.status = TaskStatus.PROCESSING
        task.started_at = datetime.now()
        session.commit()
        
        # 调用 Dify Workflow API
        response = call_dify_workflow(sku_data, target_language)
        
        if response['status'] == 'succeeded':
            outputs = response['outputs']
            
            # 保存结果
            task.result = {
                'translated_description': outputs['translated_description'],
                'seo_title': outputs['seo_title'],
                'meta_description': outputs['meta_description'],
                'marketing_copy_a': outputs['marketing_copy_a'],
                'marketing_copy_b': outputs['marketing_copy_b'],
                'marketing_copy_c': outputs['marketing_copy_c'],
                'quality_score': outputs['quality_score'],
                'quality_issues': outputs['quality_issues'],
            }
            task.status = TaskStatus.COMPLETED
            task.tokens_used = response.get('total_tokens', 0)
            task.cost_usd = response.get('total_steps', 0) * 0.001  # 估算
        else:
            raise Exception(f"Dify workflow failed: {response.get('error')}")
        
        session.commit()
        logger.info(f"Task {task_id} completed for SKU {sku_data['product_id']}")
        
    except Exception as exc:
        task.status = TaskStatus.FAILED
        task.error_message = str(exc)
        task.retry_count = (task.retry_count or 0) + 1
        session.commit()
        
        # 重试(最多3次)
        if self.request.retries < self.max_retries:
            logger.warning(f"Task {task_id} failed, retrying... ({self.request.retries + 1}/3)")
            raise self.retry(exc=exc)
        else:
            logger.error(f"Task {task_id} permanently failed after 3 retries")
    
    finally:
        session.close()


def call_dify_workflow(sku_data: dict, target_language: str, timeout: int = 90) -> dict:
    """调用 Dify Workflow API"""
    headers = {
        'Authorization': f'Bearer {DIFY_WORKFLOW_TOKEN}',
        'Content-Type': 'application/json'
    }
    
    payload = {
        'inputs': {
            'product_id': sku_data['product_id'],
            'product_name': sku_data['product_name'],
            'product_description': sku_data['product_description'],
            'target_language': target_language,
            'product_category': sku_data.get('category', ''),
            'target_keywords': sku_data.get('keywords', ''),
        },
        'response_mode': 'blocking',
        'user': 'batch_processor'
    }
    
    start_time = time.time()
    
    response = requests.post(
        f'{DIFY_API_URL}/workflows/run',
        headers=headers,
        json=payload,
        timeout=timeout
    )
    
    elapsed = time.time() - start_time
    logger.info(f"Dify API call took {elapsed:.2f}s for product {sku_data['product_id']}")
    
    if response.status_code != 200:
        raise Exception(f"Dify API error {response.status_code}: {response.text}")
    
    return response.json()

批量任务调度器

# scheduler.py — 批量任务调度

from sqlalchemy import create_engine
from models import ContentTask, TaskStatus
import pandas as pd

def create_batch_job(
    csv_file: str,
    target_languages: list = ['en', 'ja', 'es', 'de', 'fr'],
    priority: str = 'normal'
) -> dict:
    """
    从 CSV 文件创建批量任务
    
    Args:
        csv_file: SKU 数据 CSV 文件路径
        target_languages: 目标语言列表
        priority: 优先级(high/normal/low)
    
    Returns:
        job_info: 任务统计信息
    """
    
    df = pd.read_csv(csv_file)
    required_columns = ['product_id', 'product_name', 'product_description', 'category']
    
    # 验证列
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"CSV 缺少必要列:{missing}")
    
    total_tasks = len(df) * len(target_languages)
    print(f"准备创建 {len(df)} 个 SKU × {len(target_languages)} 种语言 = {total_tasks} 个任务")
    
    with Session() as session:
        tasks_created = 0
        
        for _, row in df.iterrows():
            sku_data = row.to_dict()
            
            for lang in target_languages:
                # 检查是否已有相同任务(幂等性)
                existing = session.query(ContentTask).filter_by(
                    product_id=sku_data['product_id'],
                    target_language=lang,
                    status=TaskStatus.COMPLETED
                ).first()
                
                if existing:
                    continue  # 跳过已完成的
                
                # 创建任务记录
                task = ContentTask(
                    product_id=sku_data['product_id'],
                    target_language=lang,
                    input_data=json.dumps(sku_data),
                    status=TaskStatus.PENDING,
                    priority=priority
                )
                session.add(task)
                session.flush()
                
                # 提交到 Celery 队列
                celery_task = generate_content.apply_async(
                    args=[task.id, sku_data, lang],
                    priority={'high': 9, 'normal': 5, 'low': 1}[priority],
                    countdown=0  # 立即执行
                )
                task.celery_task_id = celery_task.id
                tasks_created += 1
        
        session.commit()
    
    return {
        'total_skus': len(df),
        'target_languages': target_languages,
        'tasks_created': tasks_created,
        'estimated_completion': estimate_completion_time(tasks_created)
    }


def estimate_completion_time(task_count: int) -> str:
    """估算完成时间"""
    # 基于:50 workers,每个任务平均 15 秒(含 LLM 生成时间)
    workers = 50
    avg_task_time = 15  # 秒
    # 考虑 OpenAI 限速,实际并发约 30
    effective_concurrency = 30
    
    total_seconds = (task_count / effective_concurrency) * avg_task_time
    hours = total_seconds / 3600
    
    return f"约 {hours:.1f} 小时"

质量控制体系

批量生成内容必须有自动化质量控制,否则大量劣质内容会直接上线:

# quality_control.py — 内容质量自动检查

import re
from dataclasses import dataclass
from typing import List

@dataclass
class QualityIssue:
    severity: str  # critical/warning/info
    rule: str
    description: str
    fix_suggestion: str

class ContentQualityChecker:
    
    def __init__(self, language: str):
        self.language = language
        self.load_config(language)
    
    def load_config(self, language: str):
        """加载语言相关的质量规则"""
        self.config = {
            'en': {
                'seo_title_min': 30,
                'seo_title_max': 60,
                'meta_desc_min': 120,
                'meta_desc_max': 160,
                'min_description_words': 100,
                'max_description_words': 500,
                'forbidden_words': ['cheap', 'knockoff', 'replica', 'fake', 'counterfeit'],
                'required_cta_patterns': [r'\b(buy|shop|order|get|discover)\b'],
                'max_consecutive_caps': 3,  # 连续大写单词不超过3个
            },
            'ja': {
                'seo_title_min': 10,
                'seo_title_max': 40,
                'meta_desc_min': 60,
                'meta_desc_max': 120,
                'min_description_chars': 200,
                'forbidden_words': ['安い', '偽物'],
                'required_cta_patterns': [r'(購入|注文|今すぐ|詳細)'],
            }
        }
    
    def check(self, content: dict) -> tuple[int, List[QualityIssue]]:
        """
        检查生成内容质量
        
        Returns:
            (quality_score, issues)
        """
        issues = []
        score = 100
        
        lang_config = self.config.get(self.language, self.config['en'])
        
        # 检查1:SEO 标题长度
        seo_title = content.get('seo_title', '')
        if len(seo_title) < lang_config['seo_title_min']:
            issues.append(QualityIssue(
                severity='warning',
                rule='seo_title_too_short',
                description=f"SEO 标题过短 ({len(seo_title)} 字符)",
                fix_suggestion=f"SEO 标题应在 {lang_config['seo_title_min']}-{lang_config['seo_title_max']} 字符之间"
            ))
            score -= 10
        elif len(seo_title) > lang_config['seo_title_max']:
            issues.append(QualityIssue(
                severity='critical',
                rule='seo_title_too_long',
                description=f"SEO 标题过长 ({len(seo_title)} 字符),Google 会截断",
                fix_suggestion=f"SEO 标题必须 ≤ {lang_config['seo_title_max']} 字符"
            ))
            score -= 20
        
        # 检查2:Meta 描述长度
        meta_desc = content.get('meta_description', '')
        if len(meta_desc) > lang_config['meta_desc_max']:
            issues.append(QualityIssue(
                severity='critical',
                rule='meta_too_long',
                description=f"Meta 描述过长 ({len(meta_desc)} 字符)",
                fix_suggestion=f"Meta 描述必须 ≤ {lang_config['meta_desc_max']} 字符"
            ))
            score -= 15
        
        # 检查3:禁词检测
        desc = content.get('translated_description', '').lower()
        for word in lang_config.get('forbidden_words', []):
            if word.lower() in desc:
                issues.append(QualityIssue(
                    severity='critical',
                    rule='forbidden_word',
                    description=f"内容包含禁词:'{word}'",
                    fix_suggestion=f"移除或替换禁词 '{word}'"
                ))
                score -= 30
        
        # 检查4:内容长度
        if self.language == 'en':
            word_count = len(desc.split())
            if word_count < lang_config['min_description_words']:
                issues.append(QualityIssue(
                    severity='warning',
                    rule='description_too_short',
                    description=f"描述词数过少 ({word_count} 词)",
                    fix_suggestion=f"描述应至少包含 {lang_config['min_description_words']} 词"
                ))
                score -= 10
        
        # 检查5:是否包含 CTA
        marketing_copies = [
            content.get('marketing_copy_a', ''),
            content.get('marketing_copy_b', ''),
            content.get('marketing_copy_c', ''),
        ]
        
        cta_patterns = lang_config.get('required_cta_patterns', [])
        if cta_patterns:
            for i, copy in enumerate(marketing_copies):
                has_cta = any(
                    re.search(pattern, copy, re.IGNORECASE)
                    for pattern in cta_patterns
                )
                if not has_cta:
                    issues.append(QualityIssue(
                        severity='warning',
                        rule=f'missing_cta_copy_{i+1}',
                        description=f"营销文案 {chr(65+i)} 缺少行动号召(CTA)",
                        fix_suggestion="添加如 'Shop Now'、'Order Today' 等 CTA 词语"
                    ))
                    score -= 5
        
        # 检查6:连续大写(英文)
        if self.language == 'en':
            caps_pattern = r'\b[A-Z]{4,}\b'
            caps_matches = re.findall(caps_pattern, desc)
            if len(caps_matches) > lang_config.get('max_consecutive_caps', 3):
                issues.append(QualityIssue(
                    severity='warning',
                    rule='excessive_caps',
                    description=f"过多全大写单词:{caps_matches[:5]}",
                    fix_suggestion="全大写单词应仅用于品牌名或缩写"
                ))
                score -= 5
        
        # 检查7:AI 幻觉检测(关键数字一致性)
        # 提取描述中的数字
        original_numbers = set(re.findall(r'\d+(?:\.\d+)?', 
                                          content.get('original_description', '')))
        translated_numbers = set(re.findall(r'\d+(?:\.\d+)?', 
                                           content.get('translated_description', '')))
        
        # 检查是否有数字在翻译中消失(可能是幻觉)
        missing_numbers = original_numbers - translated_numbers
        if len(missing_numbers) > 2:  # 允许少量差异(如格式变化)
            issues.append(QualityIssue(
                severity='warning',
                rule='potential_hallucination',
                description=f"翻译中可能丢失了数据:{list(missing_numbers)[:5]}",
                fix_suggestion="检查翻译是否保留了所有技术规格数字"
            ))
            score -= 15
        
        return max(0, score), issues

自动重生成失败内容

当质量检查发现 Critical 问题时,自动触发重生成:

# auto_regen.py — 自动重生成流程

async def process_with_quality_check(
    task_id: int,
    sku_data: dict,
    target_language: str,
    max_attempts: int = 3
) -> dict:
    """
    生成内容并自动检查质量,质量不达标时重生成
    """
    
    checker = ContentQualityChecker(target_language)
    
    for attempt in range(max_attempts):
        logger.info(f"Task {task_id}: 第 {attempt+1} 次尝试生成")
        
        # 生成内容(第2次起附加质量问题反馈)
        feedback = ""
        if attempt > 0:
            prev_issues = previous_issues
            feedback = f"\n\n请注意,上次生成的内容有以下质量问题,请避免:\n"
            for issue in prev_issues:
                feedback += f"- {issue.description}:{issue.fix_suggestion}\n"
        
        content = await call_dify_workflow(sku_data, target_language, extra_prompt=feedback)
        
        # 质量检查
        score, issues = checker.check(content)
        
        critical_issues = [i for i in issues if i.severity == 'critical']
        
        if not critical_issues or attempt == max_attempts - 1:
            # 没有严重问题,或已达最大尝试次数
            content['quality_score'] = score
            content['quality_issues'] = [i.__dict__ for i in issues]
            content['generation_attempts'] = attempt + 1
            
            if critical_issues and attempt == max_attempts - 1:
                content['requires_human_review'] = True
                logger.warning(f"Task {task_id}: 达到最大重试次数,标记需要人工审核")
            
            return content
        
        # 记录问题,准备重试
        previous_issues = issues
        logger.warning(
            f"Task {task_id}: 第 {attempt+1} 次生成质量不达标 "
            f"(score={score}, critical issues={len(critical_issues)}),重新生成"
        )
    
    return content

Level 3:源码与原理(5 年以上)

速率限制与成本优化

批量处理的最大挑战是 OpenAI API 的速率限制(Rate Limit):

理解 OpenAI 限速层级

Tier 1(默认):  TPM 200K tokens/min, RPM 500 requests/min
Tier 2($50+):  TPM 2M tokens/min, RPM 5,000 requests/min
Tier 3($100+): TPM 4M tokens/min, RPM 5,000 requests/min
Tier 4($500+): TPM 10M tokens/min, RPM 10,000 requests/min

令牌桶算法实现速率控制

# rate_limiter.py — 令牌桶算法

import asyncio
import time

class TokenBucketRateLimiter:
    """
    令牌桶速率限制器
    同时限制 RPM(请求/分钟)和 TPM(Token/分钟)
    """
    
    def __init__(self, rpm_limit: int, tpm_limit: int):
        self.rpm_limit = rpm_limit
        self.tpm_limit = tpm_limit
        
        # 请求速率桶
        self.request_tokens = rpm_limit
        self.request_refill_rate = rpm_limit / 60  # 每秒补充
        self.request_last_refill = time.time()
        
        # Token 速率桶
        self.token_tokens = tpm_limit
        self.token_refill_rate = tpm_limit / 60
        self.token_last_refill = time.time()
        
        self.lock = asyncio.Lock()
    
    def _refill(self):
        """补充令牌"""
        now = time.time()
        
        # 补充请求令牌
        elapsed = now - self.request_last_refill
        self.request_tokens = min(
            self.rpm_limit,
            self.request_tokens + elapsed * self.request_refill_rate
        )
        self.request_last_refill = now
        
        # 补充 Token 令牌
        elapsed = now - self.token_last_refill
        self.token_tokens = min(
            self.tpm_limit,
            self.token_tokens + elapsed * self.token_refill_rate
        )
        self.token_last_refill = now
    
    async def acquire(self, estimated_tokens: int = 1000):
        """
        获取执行权限(阻塞等待,直到有足够令牌)
        
        Args:
            estimated_tokens: 预估本次请求消耗的 Token 数
        """
        async with self.lock:
            while True:
                self._refill()
                
                if self.request_tokens >= 1 and self.token_tokens >= estimated_tokens:
                    self.request_tokens -= 1
                    self.token_tokens -= estimated_tokens
                    return
                
                # 计算需要等待的时间
                request_wait = max(0, (1 - self.request_tokens) / self.request_refill_rate)
                token_wait = max(0, (estimated_tokens - self.token_tokens) / self.token_refill_rate)
                wait_time = max(request_wait, token_wait)
                
                await asyncio.sleep(min(wait_time, 1.0))  # 最多等 1 秒再检查

# 全局速率限制器(Tier 3 限制)
rate_limiter = TokenBucketRateLimiter(rpm_limit=4000, tpm_limit=3_000_000)


async def rate_limited_dify_call(sku_data: dict, language: str) -> dict:
    """带速率限制的 Dify 调用"""
    estimated_tokens = estimate_tokens(sku_data['product_description']) + 2000
    await rate_limiter.acquire(estimated_tokens)
    return await call_dify_workflow_async(sku_data, language)

断点续传与进度监控

# progress_monitor.py — 进度监控与断点续传

from flask import Flask, jsonify
import redis

app = Flask(__name__)
redis_client = redis.Redis()

@app.route('/api/batch/<job_id>/progress')
def get_progress(job_id: str):
    """获取批量任务进度"""
    with Session() as session:
        total = session.query(ContentTask).filter_by(job_id=job_id).count()
        completed = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.COMPLETED
        ).count()
        failed = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.FAILED
        ).count()
        processing = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.PROCESSING
        ).count()
        
        # 计算进度和估算剩余时间
        done_ratio = (completed + failed) / total if total > 0 else 0
        
        # 从 Redis 获取速率统计
        recent_count = redis_client.llen(f"job:{job_id}:recent_completions")
        
        return jsonify({
            'job_id': job_id,
            'total': total,
            'completed': completed,
            'failed': failed,
            'processing': processing,
            'pending': total - completed - failed - processing,
            'progress_pct': round(done_ratio * 100, 1),
            'avg_quality_score': get_avg_quality_score(job_id),
            'estimated_remaining_hours': estimate_remaining(
                total - completed - failed, recent_count
            ),
            'cost_usd_so_far': get_total_cost(job_id),
        })

@app.route('/api/batch/<job_id>/resume', methods=['POST'])
def resume_job(job_id: str):
    """恢复失败的任务"""
    with Session() as session:
        failed_tasks = session.query(ContentTask).filter_by(
            job_id=job_id,
            status=TaskStatus.FAILED,
        ).filter(ContentTask.retry_count < 3).all()
        
        resumed = 0
        for task in failed_tasks:
            sku_data = json.loads(task.input_data)
            generate_content.apply_async(
                args=[task.id, sku_data, task.target_language],
                priority=5
            )
            task.status = TaskStatus.PENDING
            resumed += 1
        
        session.commit()
    
    return jsonify({'resumed_tasks': resumed})

成本精确追踪

# cost_tracker.py — 精确追踪批量任务成本

OPENAI_PRICING = {
    'gpt-4': {'input': 0.03 / 1000, 'output': 0.06 / 1000},
    'gpt-4-turbo': {'input': 0.01 / 1000, 'output': 0.03 / 1000},
    'gpt-3.5-turbo': {'input': 0.001 / 1000, 'output': 0.002 / 1000},
}

def calculate_job_cost(job_id: str) -> dict:
    """计算批量任务总成本"""
    with Session() as session:
        tasks = session.query(ContentTask).filter_by(
            job_id=job_id,
            status=TaskStatus.COMPLETED
        ).all()
        
        cost_breakdown = {}
        total_cost = 0
        total_input_tokens = 0
        total_output_tokens = 0
        
        for task in tasks:
            if task.token_details:
                details = json.loads(task.token_details)
                model = details.get('model', 'gpt-4-turbo')
                input_tokens = details.get('prompt_tokens', 0)
                output_tokens = details.get('completion_tokens', 0)
                
                pricing = OPENAI_PRICING.get(model, OPENAI_PRICING['gpt-4-turbo'])
                task_cost = (
                    input_tokens * pricing['input'] +
                    output_tokens * pricing['output']
                )
                
                cost_breakdown[model] = cost_breakdown.get(model, 0) + task_cost
                total_cost += task_cost
                total_input_tokens += input_tokens
                total_output_tokens += output_tokens
        
        return {
            'job_id': job_id,
            'total_cost_usd': round(total_cost, 2),
            'cost_per_sku': round(total_cost / len(tasks), 4) if tasks else 0,
            'total_input_tokens': total_input_tokens,
            'total_output_tokens': total_output_tokens,
            'cost_breakdown_by_model': cost_breakdown,
        }

Level 4:生产陷阱与决策(专家视角)

真实案例数据

该跨境电商在运行了 3 个月批量内容生成后:

成本统计

质量评估

陷阱1:批量任务中的幻觉问题更严重

单次使用时,LLM 幻觉(编造规格参数)不明显,但批量运行时很快会出现大量错误数据进入生产数据库。

预防措施:

class HallucinationDetector:
    def __init__(self, original_content: str):
        self.original_numbers = self._extract_numbers(original_content)
        self.original_brand_names = self._extract_brand_names(original_content)
    
    def _extract_numbers(self, text: str) -> set:
        """提取文本中的数字(包括小数)"""
        return set(re.findall(r'\b\d+(?:\.\d+)?\s*(?:cm|mm|kg|g|W|V|mAh|MHz|GB|TB)?\b', text))
    
    def check(self, translated_content: str) -> list:
        """检查翻译中是否有幻觉迹象"""
        warnings = []
        
        translated_numbers = self._extract_numbers(translated_content)
        
        # 检查原文数字是否在译文中(不要求完全一致,因为单位可能变化)
        original_values = {re.search(r'\d+(?:\.\d+)?', n).group() 
                          for n in self.original_numbers if re.search(r'\d+', n)}
        translated_values = {re.search(r'\d+(?:\.\d+)?', n).group() 
                            for n in translated_numbers if re.search(r'\d+', n)}
        
        missing = original_values - translated_values
        new_numbers = translated_values - original_values
        
        if len(missing) > 1:
            warnings.append(f"数字丢失(可能被省略):{list(missing)[:5]}")
        if len(new_numbers) > 2:
            warnings.append(f"出现原文没有的数字(可能是幻觉):{list(new_numbers)[:5]}")
        
        return warnings

陷阱2:同一 SKU 的多语言版本不一致

批量生成时,同一产品的英文版和日文版可能对同一规格有不同的描述(因为是独立生成的)。

解决:先生成英文作为"中间语言",其他语言基于英文翻译,而不是各自基于中文:

# 生成顺序:中文 → 英文(核心版本) → 其他语言基于英文
async def generate_multilingual_sequential(sku_data: dict, languages: list):
    # 第一步:生成英文版本(质量最高)
    en_content = await generate_content(sku_data, 'en')
    
    # 第二步:其他语言基于英文版本生成
    other_languages = [l for l in languages if l != 'en']
    
    tasks = [
        generate_content(
            {**sku_data, 'source_language_content': en_content['translated_description']},
            lang
        )
        for lang in other_languages
    ]
    
    other_contents = await asyncio.gather(*tasks)
    
    return {'en': en_content, **dict(zip(other_languages, other_contents))}

批量内容的版本管理

# 支持内容版本历史,方便 A/B 测试和回滚
class ContentVersion:
    def save(self, product_id: str, language: str, content: dict, version: int):
        """保存内容版本"""
        pass
    
    def get_active(self, product_id: str, language: str) -> dict:
        """获取当前激活版本"""
        pass
    
    def rollback(self, product_id: str, language: str, version: int):
        """回滚到指定版本"""
        pass
    
    def ab_test(self, product_id: str, language: str, 
                version_a: int, version_b: int, 
                traffic_split: float = 0.5):
        """启动 A/B 测试"""
        pass

本章小结

核心要点

  1. 架构选择:< 1000 个任务用 Dify 内置批量运行;> 1000 个用 Celery + Dify API 的外部调度架构。

  2. 质量控制不可省略:批量生成必须有自动质量检查,Critical 问题必须自动重生成,而不是直接上线。

  3. 速率限制是关键约束:必须实现令牌桶算法控制 RPM 和 TPM,否则 OpenAI 会拒绝请求,任务大量失败。

  4. 幻觉在批量场景中被放大:100 个任务可能只有 1 个幻觉,但 10 万个任务就有 1000 个错误数据进生产库。

  5. 多语言一致性:使用"中间语言"策略(所有语言基于英文翻译),而不是各自独立翻译,保证版本一致性。

  6. 成本必须精确追踪:建立 Token 级别的成本追踪,方便对比不同模型和策略的 ROI。

关键参数参考(12 万 SKU 项目):

参数 配置值 原因
Celery Workers 50 平衡并发和限速
任务超时 180 秒 覆盖最慢的 LLM 响应
最大重试 3 次 临时错误可以恢复
Quality Score 阈值 70 低于 70 标记人工审核
自动重生成阈值 有 Critical 问题 严重问题必须重生成
RPM 控制 4000 (留 20% 缓冲) 避免触发限速
本章评分
4.8  / 5  (6 评分)

💬 留言讨论