实战:内容生成流水线——批量处理与质量控制
第23章:实战——内容生成流水线——批量处理与质量控制
把 AI 内容生成从"偶尔用用"变成企业的核心生产力——本章教你构建可批量运行、有质量保证、能持续迭代的内容生成流水线。
本章导读
电商详情页、营销文案、SEO 内容、产品说明书翻译……内容需求几乎无穷无尽。很多团队在尝到 LLM 生成内容的甜头后,面临的第一个瓶颈不是"怎么写好",而是"怎么批量写"和"怎么保证批量写的质量"。
本章围绕一个真实案例展开:某跨境电商平台(SKU 数量 12 万),需要将中文产品详情翻译并本地化为英文、日文、西班牙文、德文、法文 5 种语言,同时生成对应语言的 SEO 标题、Meta 描述和营销文案。
业务规模:
- 12 万 SKU,每个 SKU 约 500 字中文详情
- 每天有约 300 个新品上架,需要即时生成多语言内容
- 存量内容需要在 30 天内完成批量处理(4 万 SKU/天)
- 质量要求:翻译准确率 ≥ 95%,SEO 内容通过 Google 指南,营销文案转化率对比纯人工下降 ≤ 15%
Level 1:基础认知(1-3 年经验)
内容生成流水线的架构选择
选项A:Dify 批量运行(Batch Run)
Dify 工作流支持"批量运行"模式,可以上传 CSV 文件批量处理。
优点:无需额外开发,操作简单 缺点:并发数有限,不适合超大规模批量任务
选项B:Dify API + 外部调度器
通过 Dify Workflow API,结合 Celery/Airflow 等任务队列管理器,实现真正的生产级批量处理。
优点:可控性强,支持断点续传、错误重试、进度监控 缺点:需要开发工作
选项C:本地 LangChain/LlamaIndex 脚本
绕过 Dify,直接用 LangChain 编写批量处理脚本。
优点:灵活性最高 缺点:失去 Dify 的 Prompt 管理、版本控制、监控等优势
推荐:该企业采用 选项B(Dify API + Celery),本章重点介绍此方案。
Dify Workflow 设计:多语言内容生成
首先在 Dify Console 中设计工作流:
输入:
- product_id: 产品 ID
- product_name: 产品名称(中文)
- product_description: 产品描述(中文)
- target_language: 目标语言(en/ja/es/de/fr)
- product_category: 产品类别
工作流节点:
1. 语言适配配置节点(根据 target_language 设置语言规则)
2. 核心信息提取节点(提取产品核心卖点)
3. 翻译节点(翻译产品描述)
4. SEO 内容生成节点(生成 Title/Meta/H1)
5. 营销文案节点(生成 3 版本营销文案)
6. 质量检查节点(自动检查长度、关键词、禁词)
7. 输出整合节点
输出:
- translated_description: 翻译后的描述
- seo_title: SEO 标题(≤ 60 字符)
- meta_description: Meta 描述(≤ 160 字符)
- marketing_copy_a: 营销文案版本 A(主打价值)
- marketing_copy_b: 营销文案版本 B(主打情感)
- marketing_copy_c: 营销文案版本 C(主打促销)
- quality_score: 质量评分(0-100)
- quality_issues: 质量问题列表
各节点核心 Prompt(英文生成示例):
# 节点2:核心信息提取
你是一位产品文案专家。从以下中文产品描述中提取核心信息:
产品名称:{{product_name}}
产品描述:{{product_description}}
产品类别:{{product_category}}
请提取并输出 JSON:
{
"core_features": ["核心特性1", "核心特性2", "核心特性3"],
"target_audience": "目标用户描述",
"key_benefits": ["主要好处1", "主要好处2"],
"technical_specs": {"规格名": "规格值"},
"unique_selling_point": "最核心的差异化卖点"
}
# 节点3:翻译节点(英文)
你是一位专业的中英文产品文案翻译,专注于跨境电商领域。
翻译以下中文产品描述为英文,要求:
1. 语言地道自然,符合英语母语用户的阅读习惯
2. 保留所有技术参数和规格
3. 翻译品牌名称时保持一致(不翻译)
4. 适当使用 Amazon/Shopify 风格的电商语言
核心信息(已提取):{{extracted_info}}
原始描述:{{product_description}}
请直接输出翻译结果,不要加任何解释或注释:
# 节点4:SEO 内容生成(英文)
基于以下信息生成 SEO 优化内容:
产品名称(英文):{{translated_title}}
核心特性:{{core_features}}
目标关键词:{{target_keywords}}
输出 JSON:
{
"seo_title": "SEO 标题(必须包含主关键词,50-60字符)",
"meta_description": "Meta描述(包含CTA,140-160字符)",
"h1_tag": "页面H1标签(与SEO标题略有差异,自然语言)",
"suggested_keywords": ["建议关键词1", "建议关键词2"]
}
使用 Dify 批量运行模式(快速验证)
如果你的批量规模在 1000 个以下,可以先用 Dify 内置的批量运行:
- 准备 CSV 文件(列名与工作流输入变量对应):
product_id,product_name,product_description,target_language,product_category
SKU001,无线蓝牙耳机,"音质出色,续航20小时...",en,Electronics
SKU002,木质餐桌,"实木材质,可拆卸设计...",en,Furniture
- 在 Dify Workflow 页面点击"批量运行"
- 上传 CSV,设置并发数(建议 5-10)
- 下载结果 CSV
Level 2:机制深解(3-5 年经验)
生产级批量处理系统架构
对于 12 万 SKU 的规模,需要一个真正的生产级批量处理系统:
┌─────────────────────────────┐
│ 任务管理数据库 │
│ (PostgreSQL + Redis) │
└──────────────┬──────────────┘
│
┌──────────────▼──────────────┐
│ 任务调度器 │
│ (Celery + Redis Broker) │
│ - 优先级队列 │
│ - 并发控制(50 workers) │
│ - 重试机制(最多3次) │
└──────────────┬──────────────┘
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker │ │ Worker │ │ Worker │
│ (x10) │ │ (x10) │ │ (x10) │
└──────────┘ └──────────┘ └──────────┘
│ │ │
└─────────────────┼─────────────────┘
│
┌──────────────▼──────────────┐
│ Dify Workflow API │
│ (负载均衡 + 限速控制) │
└──────────────┬──────────────┘
│
┌──────────────▼──────────────┐
│ LLM Provider │
│ (OpenAI / Claude) │
└─────────────────────────────┘
核心代码实现:
# tasks/content_generation.py — Celery 任务定义
from celery import Celery
from celery.utils.log import get_task_logger
import requests
import time
import json
from database import Session, ContentTask, TaskStatus
app = Celery('content_pipeline', broker='redis://redis:6379/2')
logger = get_task_logger(__name__)
# 配置 Celery
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_backend='redis://redis:6379/3',
task_acks_late=True, # 任务执行完才确认,防止丢失
worker_prefetch_multiplier=1, # 每个 worker 一次只取一个任务
task_routes={
'tasks.generate_content': {'queue': 'content'},
'tasks.quality_check': {'queue': 'quality'},
},
# 限速:每秒最多处理 5 个任务(防止触发 OpenAI 限速)
task_annotations={
'tasks.content_generation.generate_content': {'rate_limit': '5/s'}
}
)
DIFY_API_URL = "https://dify.yourcompany.com/v1"
DIFY_WORKFLOW_TOKEN = "workflow-api-token"
@app.task(
bind=True,
max_retries=3,
default_retry_delay=30, # 重试间隔 30 秒
soft_time_limit=120, # 2 分钟软超时
time_limit=180, # 3 分钟硬超时
name='tasks.generate_content'
)
def generate_content(self, task_id: int, sku_data: dict, target_language: str):
"""
为单个 SKU 生成多语言内容
Args:
task_id: 数据库任务 ID
sku_data: SKU 数据字典
target_language: 目标语言代码
"""
session = Session()
task = session.get(ContentTask, task_id)
try:
# 更新任务状态
task.status = TaskStatus.PROCESSING
task.started_at = datetime.now()
session.commit()
# 调用 Dify Workflow API
response = call_dify_workflow(sku_data, target_language)
if response['status'] == 'succeeded':
outputs = response['outputs']
# 保存结果
task.result = {
'translated_description': outputs['translated_description'],
'seo_title': outputs['seo_title'],
'meta_description': outputs['meta_description'],
'marketing_copy_a': outputs['marketing_copy_a'],
'marketing_copy_b': outputs['marketing_copy_b'],
'marketing_copy_c': outputs['marketing_copy_c'],
'quality_score': outputs['quality_score'],
'quality_issues': outputs['quality_issues'],
}
task.status = TaskStatus.COMPLETED
task.tokens_used = response.get('total_tokens', 0)
task.cost_usd = response.get('total_steps', 0) * 0.001 # 估算
else:
raise Exception(f"Dify workflow failed: {response.get('error')}")
session.commit()
logger.info(f"Task {task_id} completed for SKU {sku_data['product_id']}")
except Exception as exc:
task.status = TaskStatus.FAILED
task.error_message = str(exc)
task.retry_count = (task.retry_count or 0) + 1
session.commit()
# 重试(最多3次)
if self.request.retries < self.max_retries:
logger.warning(f"Task {task_id} failed, retrying... ({self.request.retries + 1}/3)")
raise self.retry(exc=exc)
else:
logger.error(f"Task {task_id} permanently failed after 3 retries")
finally:
session.close()
def call_dify_workflow(sku_data: dict, target_language: str, timeout: int = 90) -> dict:
"""调用 Dify Workflow API"""
headers = {
'Authorization': f'Bearer {DIFY_WORKFLOW_TOKEN}',
'Content-Type': 'application/json'
}
payload = {
'inputs': {
'product_id': sku_data['product_id'],
'product_name': sku_data['product_name'],
'product_description': sku_data['product_description'],
'target_language': target_language,
'product_category': sku_data.get('category', ''),
'target_keywords': sku_data.get('keywords', ''),
},
'response_mode': 'blocking',
'user': 'batch_processor'
}
start_time = time.time()
response = requests.post(
f'{DIFY_API_URL}/workflows/run',
headers=headers,
json=payload,
timeout=timeout
)
elapsed = time.time() - start_time
logger.info(f"Dify API call took {elapsed:.2f}s for product {sku_data['product_id']}")
if response.status_code != 200:
raise Exception(f"Dify API error {response.status_code}: {response.text}")
return response.json()
批量任务调度器:
# scheduler.py — 批量任务调度
from sqlalchemy import create_engine
from models import ContentTask, TaskStatus
import pandas as pd
def create_batch_job(
csv_file: str,
target_languages: list = ['en', 'ja', 'es', 'de', 'fr'],
priority: str = 'normal'
) -> dict:
"""
从 CSV 文件创建批量任务
Args:
csv_file: SKU 数据 CSV 文件路径
target_languages: 目标语言列表
priority: 优先级(high/normal/low)
Returns:
job_info: 任务统计信息
"""
df = pd.read_csv(csv_file)
required_columns = ['product_id', 'product_name', 'product_description', 'category']
# 验证列
missing = set(required_columns) - set(df.columns)
if missing:
raise ValueError(f"CSV 缺少必要列:{missing}")
total_tasks = len(df) * len(target_languages)
print(f"准备创建 {len(df)} 个 SKU × {len(target_languages)} 种语言 = {total_tasks} 个任务")
with Session() as session:
tasks_created = 0
for _, row in df.iterrows():
sku_data = row.to_dict()
for lang in target_languages:
# 检查是否已有相同任务(幂等性)
existing = session.query(ContentTask).filter_by(
product_id=sku_data['product_id'],
target_language=lang,
status=TaskStatus.COMPLETED
).first()
if existing:
continue # 跳过已完成的
# 创建任务记录
task = ContentTask(
product_id=sku_data['product_id'],
target_language=lang,
input_data=json.dumps(sku_data),
status=TaskStatus.PENDING,
priority=priority
)
session.add(task)
session.flush()
# 提交到 Celery 队列
celery_task = generate_content.apply_async(
args=[task.id, sku_data, lang],
priority={'high': 9, 'normal': 5, 'low': 1}[priority],
countdown=0 # 立即执行
)
task.celery_task_id = celery_task.id
tasks_created += 1
session.commit()
return {
'total_skus': len(df),
'target_languages': target_languages,
'tasks_created': tasks_created,
'estimated_completion': estimate_completion_time(tasks_created)
}
def estimate_completion_time(task_count: int) -> str:
"""估算完成时间"""
# 基于:50 workers,每个任务平均 15 秒(含 LLM 生成时间)
workers = 50
avg_task_time = 15 # 秒
# 考虑 OpenAI 限速,实际并发约 30
effective_concurrency = 30
total_seconds = (task_count / effective_concurrency) * avg_task_time
hours = total_seconds / 3600
return f"约 {hours:.1f} 小时"
质量控制体系
批量生成内容必须有自动化质量控制,否则大量劣质内容会直接上线:
# quality_control.py — 内容质量自动检查
import re
from dataclasses import dataclass
from typing import List
@dataclass
class QualityIssue:
severity: str # critical/warning/info
rule: str
description: str
fix_suggestion: str
class ContentQualityChecker:
def __init__(self, language: str):
self.language = language
self.load_config(language)
def load_config(self, language: str):
"""加载语言相关的质量规则"""
self.config = {
'en': {
'seo_title_min': 30,
'seo_title_max': 60,
'meta_desc_min': 120,
'meta_desc_max': 160,
'min_description_words': 100,
'max_description_words': 500,
'forbidden_words': ['cheap', 'knockoff', 'replica', 'fake', 'counterfeit'],
'required_cta_patterns': [r'\b(buy|shop|order|get|discover)\b'],
'max_consecutive_caps': 3, # 连续大写单词不超过3个
},
'ja': {
'seo_title_min': 10,
'seo_title_max': 40,
'meta_desc_min': 60,
'meta_desc_max': 120,
'min_description_chars': 200,
'forbidden_words': ['安い', '偽物'],
'required_cta_patterns': [r'(購入|注文|今すぐ|詳細)'],
}
}
def check(self, content: dict) -> tuple[int, List[QualityIssue]]:
"""
检查生成内容质量
Returns:
(quality_score, issues)
"""
issues = []
score = 100
lang_config = self.config.get(self.language, self.config['en'])
# 检查1:SEO 标题长度
seo_title = content.get('seo_title', '')
if len(seo_title) < lang_config['seo_title_min']:
issues.append(QualityIssue(
severity='warning',
rule='seo_title_too_short',
description=f"SEO 标题过短 ({len(seo_title)} 字符)",
fix_suggestion=f"SEO 标题应在 {lang_config['seo_title_min']}-{lang_config['seo_title_max']} 字符之间"
))
score -= 10
elif len(seo_title) > lang_config['seo_title_max']:
issues.append(QualityIssue(
severity='critical',
rule='seo_title_too_long',
description=f"SEO 标题过长 ({len(seo_title)} 字符),Google 会截断",
fix_suggestion=f"SEO 标题必须 ≤ {lang_config['seo_title_max']} 字符"
))
score -= 20
# 检查2:Meta 描述长度
meta_desc = content.get('meta_description', '')
if len(meta_desc) > lang_config['meta_desc_max']:
issues.append(QualityIssue(
severity='critical',
rule='meta_too_long',
description=f"Meta 描述过长 ({len(meta_desc)} 字符)",
fix_suggestion=f"Meta 描述必须 ≤ {lang_config['meta_desc_max']} 字符"
))
score -= 15
# 检查3:禁词检测
desc = content.get('translated_description', '').lower()
for word in lang_config.get('forbidden_words', []):
if word.lower() in desc:
issues.append(QualityIssue(
severity='critical',
rule='forbidden_word',
description=f"内容包含禁词:'{word}'",
fix_suggestion=f"移除或替换禁词 '{word}'"
))
score -= 30
# 检查4:内容长度
if self.language == 'en':
word_count = len(desc.split())
if word_count < lang_config['min_description_words']:
issues.append(QualityIssue(
severity='warning',
rule='description_too_short',
description=f"描述词数过少 ({word_count} 词)",
fix_suggestion=f"描述应至少包含 {lang_config['min_description_words']} 词"
))
score -= 10
# 检查5:是否包含 CTA
marketing_copies = [
content.get('marketing_copy_a', ''),
content.get('marketing_copy_b', ''),
content.get('marketing_copy_c', ''),
]
cta_patterns = lang_config.get('required_cta_patterns', [])
if cta_patterns:
for i, copy in enumerate(marketing_copies):
has_cta = any(
re.search(pattern, copy, re.IGNORECASE)
for pattern in cta_patterns
)
if not has_cta:
issues.append(QualityIssue(
severity='warning',
rule=f'missing_cta_copy_{i+1}',
description=f"营销文案 {chr(65+i)} 缺少行动号召(CTA)",
fix_suggestion="添加如 'Shop Now'、'Order Today' 等 CTA 词语"
))
score -= 5
# 检查6:连续大写(英文)
if self.language == 'en':
caps_pattern = r'\b[A-Z]{4,}\b'
caps_matches = re.findall(caps_pattern, desc)
if len(caps_matches) > lang_config.get('max_consecutive_caps', 3):
issues.append(QualityIssue(
severity='warning',
rule='excessive_caps',
description=f"过多全大写单词:{caps_matches[:5]}",
fix_suggestion="全大写单词应仅用于品牌名或缩写"
))
score -= 5
# 检查7:AI 幻觉检测(关键数字一致性)
# 提取描述中的数字
original_numbers = set(re.findall(r'\d+(?:\.\d+)?',
content.get('original_description', '')))
translated_numbers = set(re.findall(r'\d+(?:\.\d+)?',
content.get('translated_description', '')))
# 检查是否有数字在翻译中消失(可能是幻觉)
missing_numbers = original_numbers - translated_numbers
if len(missing_numbers) > 2: # 允许少量差异(如格式变化)
issues.append(QualityIssue(
severity='warning',
rule='potential_hallucination',
description=f"翻译中可能丢失了数据:{list(missing_numbers)[:5]}",
fix_suggestion="检查翻译是否保留了所有技术规格数字"
))
score -= 15
return max(0, score), issues
自动重生成失败内容
当质量检查发现 Critical 问题时,自动触发重生成:
# auto_regen.py — 自动重生成流程
async def process_with_quality_check(
task_id: int,
sku_data: dict,
target_language: str,
max_attempts: int = 3
) -> dict:
"""
生成内容并自动检查质量,质量不达标时重生成
"""
checker = ContentQualityChecker(target_language)
for attempt in range(max_attempts):
logger.info(f"Task {task_id}: 第 {attempt+1} 次尝试生成")
# 生成内容(第2次起附加质量问题反馈)
feedback = ""
if attempt > 0:
prev_issues = previous_issues
feedback = f"\n\n请注意,上次生成的内容有以下质量问题,请避免:\n"
for issue in prev_issues:
feedback += f"- {issue.description}:{issue.fix_suggestion}\n"
content = await call_dify_workflow(sku_data, target_language, extra_prompt=feedback)
# 质量检查
score, issues = checker.check(content)
critical_issues = [i for i in issues if i.severity == 'critical']
if not critical_issues or attempt == max_attempts - 1:
# 没有严重问题,或已达最大尝试次数
content['quality_score'] = score
content['quality_issues'] = [i.__dict__ for i in issues]
content['generation_attempts'] = attempt + 1
if critical_issues and attempt == max_attempts - 1:
content['requires_human_review'] = True
logger.warning(f"Task {task_id}: 达到最大重试次数,标记需要人工审核")
return content
# 记录问题,准备重试
previous_issues = issues
logger.warning(
f"Task {task_id}: 第 {attempt+1} 次生成质量不达标 "
f"(score={score}, critical issues={len(critical_issues)}),重新生成"
)
return content
Level 3:源码与原理(5 年以上)
速率限制与成本优化
批量处理的最大挑战是 OpenAI API 的速率限制(Rate Limit):
理解 OpenAI 限速层级:
Tier 1(默认): TPM 200K tokens/min, RPM 500 requests/min
Tier 2($50+): TPM 2M tokens/min, RPM 5,000 requests/min
Tier 3($100+): TPM 4M tokens/min, RPM 5,000 requests/min
Tier 4($500+): TPM 10M tokens/min, RPM 10,000 requests/min
令牌桶算法实现速率控制:
# rate_limiter.py — 令牌桶算法
import asyncio
import time
class TokenBucketRateLimiter:
"""
令牌桶速率限制器
同时限制 RPM(请求/分钟)和 TPM(Token/分钟)
"""
def __init__(self, rpm_limit: int, tpm_limit: int):
self.rpm_limit = rpm_limit
self.tpm_limit = tpm_limit
# 请求速率桶
self.request_tokens = rpm_limit
self.request_refill_rate = rpm_limit / 60 # 每秒补充
self.request_last_refill = time.time()
# Token 速率桶
self.token_tokens = tpm_limit
self.token_refill_rate = tpm_limit / 60
self.token_last_refill = time.time()
self.lock = asyncio.Lock()
def _refill(self):
"""补充令牌"""
now = time.time()
# 补充请求令牌
elapsed = now - self.request_last_refill
self.request_tokens = min(
self.rpm_limit,
self.request_tokens + elapsed * self.request_refill_rate
)
self.request_last_refill = now
# 补充 Token 令牌
elapsed = now - self.token_last_refill
self.token_tokens = min(
self.tpm_limit,
self.token_tokens + elapsed * self.token_refill_rate
)
self.token_last_refill = now
async def acquire(self, estimated_tokens: int = 1000):
"""
获取执行权限(阻塞等待,直到有足够令牌)
Args:
estimated_tokens: 预估本次请求消耗的 Token 数
"""
async with self.lock:
while True:
self._refill()
if self.request_tokens >= 1 and self.token_tokens >= estimated_tokens:
self.request_tokens -= 1
self.token_tokens -= estimated_tokens
return
# 计算需要等待的时间
request_wait = max(0, (1 - self.request_tokens) / self.request_refill_rate)
token_wait = max(0, (estimated_tokens - self.token_tokens) / self.token_refill_rate)
wait_time = max(request_wait, token_wait)
await asyncio.sleep(min(wait_time, 1.0)) # 最多等 1 秒再检查
# 全局速率限制器(Tier 3 限制)
rate_limiter = TokenBucketRateLimiter(rpm_limit=4000, tpm_limit=3_000_000)
async def rate_limited_dify_call(sku_data: dict, language: str) -> dict:
"""带速率限制的 Dify 调用"""
estimated_tokens = estimate_tokens(sku_data['product_description']) + 2000
await rate_limiter.acquire(estimated_tokens)
return await call_dify_workflow_async(sku_data, language)
断点续传与进度监控
# progress_monitor.py — 进度监控与断点续传
from flask import Flask, jsonify
import redis
app = Flask(__name__)
redis_client = redis.Redis()
@app.route('/api/batch/<job_id>/progress')
def get_progress(job_id: str):
"""获取批量任务进度"""
with Session() as session:
total = session.query(ContentTask).filter_by(job_id=job_id).count()
completed = session.query(ContentTask).filter_by(
job_id=job_id, status=TaskStatus.COMPLETED
).count()
failed = session.query(ContentTask).filter_by(
job_id=job_id, status=TaskStatus.FAILED
).count()
processing = session.query(ContentTask).filter_by(
job_id=job_id, status=TaskStatus.PROCESSING
).count()
# 计算进度和估算剩余时间
done_ratio = (completed + failed) / total if total > 0 else 0
# 从 Redis 获取速率统计
recent_count = redis_client.llen(f"job:{job_id}:recent_completions")
return jsonify({
'job_id': job_id,
'total': total,
'completed': completed,
'failed': failed,
'processing': processing,
'pending': total - completed - failed - processing,
'progress_pct': round(done_ratio * 100, 1),
'avg_quality_score': get_avg_quality_score(job_id),
'estimated_remaining_hours': estimate_remaining(
total - completed - failed, recent_count
),
'cost_usd_so_far': get_total_cost(job_id),
})
@app.route('/api/batch/<job_id>/resume', methods=['POST'])
def resume_job(job_id: str):
"""恢复失败的任务"""
with Session() as session:
failed_tasks = session.query(ContentTask).filter_by(
job_id=job_id,
status=TaskStatus.FAILED,
).filter(ContentTask.retry_count < 3).all()
resumed = 0
for task in failed_tasks:
sku_data = json.loads(task.input_data)
generate_content.apply_async(
args=[task.id, sku_data, task.target_language],
priority=5
)
task.status = TaskStatus.PENDING
resumed += 1
session.commit()
return jsonify({'resumed_tasks': resumed})
成本精确追踪
# cost_tracker.py — 精确追踪批量任务成本
OPENAI_PRICING = {
'gpt-4': {'input': 0.03 / 1000, 'output': 0.06 / 1000},
'gpt-4-turbo': {'input': 0.01 / 1000, 'output': 0.03 / 1000},
'gpt-3.5-turbo': {'input': 0.001 / 1000, 'output': 0.002 / 1000},
}
def calculate_job_cost(job_id: str) -> dict:
"""计算批量任务总成本"""
with Session() as session:
tasks = session.query(ContentTask).filter_by(
job_id=job_id,
status=TaskStatus.COMPLETED
).all()
cost_breakdown = {}
total_cost = 0
total_input_tokens = 0
total_output_tokens = 0
for task in tasks:
if task.token_details:
details = json.loads(task.token_details)
model = details.get('model', 'gpt-4-turbo')
input_tokens = details.get('prompt_tokens', 0)
output_tokens = details.get('completion_tokens', 0)
pricing = OPENAI_PRICING.get(model, OPENAI_PRICING['gpt-4-turbo'])
task_cost = (
input_tokens * pricing['input'] +
output_tokens * pricing['output']
)
cost_breakdown[model] = cost_breakdown.get(model, 0) + task_cost
total_cost += task_cost
total_input_tokens += input_tokens
total_output_tokens += output_tokens
return {
'job_id': job_id,
'total_cost_usd': round(total_cost, 2),
'cost_per_sku': round(total_cost / len(tasks), 4) if tasks else 0,
'total_input_tokens': total_input_tokens,
'total_output_tokens': total_output_tokens,
'cost_breakdown_by_model': cost_breakdown,
}
Level 4:生产陷阱与决策(专家视角)
真实案例数据
该跨境电商在运行了 3 个月批量内容生成后:
成本统计:
- 12 万 SKU × 5 语言 = 60 万个生成任务
- 平均每个任务消耗约 3,500 tokens(输入 2000 + 输出 1500)
- 使用 GPT-4 Turbo:60 万 × 3,500 / 1000 × $0.015 ≈ $31,500
- 对比人工翻译成本:60 万字 × 5 语言 × $0.1/字 ≈ $300,000
- 节省 90%
质量评估:
- AI 内容人工抽检通过率:首次生成 87%,经质量控制重生成后 94%
- SEO 内容:3 个月后关键词排名较人工内容平均提升 12%(得益于系统化关键词覆盖)
- 营销文案转化率:对比纯人工下降约 8%(在可接受范围内),某些品类(电子产品)甚至有所提升
陷阱1:批量任务中的幻觉问题更严重
单次使用时,LLM 幻觉(编造规格参数)不明显,但批量运行时很快会出现大量错误数据进入生产数据库。
预防措施:
class HallucinationDetector:
def __init__(self, original_content: str):
self.original_numbers = self._extract_numbers(original_content)
self.original_brand_names = self._extract_brand_names(original_content)
def _extract_numbers(self, text: str) -> set:
"""提取文本中的数字(包括小数)"""
return set(re.findall(r'\b\d+(?:\.\d+)?\s*(?:cm|mm|kg|g|W|V|mAh|MHz|GB|TB)?\b', text))
def check(self, translated_content: str) -> list:
"""检查翻译中是否有幻觉迹象"""
warnings = []
translated_numbers = self._extract_numbers(translated_content)
# 检查原文数字是否在译文中(不要求完全一致,因为单位可能变化)
original_values = {re.search(r'\d+(?:\.\d+)?', n).group()
for n in self.original_numbers if re.search(r'\d+', n)}
translated_values = {re.search(r'\d+(?:\.\d+)?', n).group()
for n in translated_numbers if re.search(r'\d+', n)}
missing = original_values - translated_values
new_numbers = translated_values - original_values
if len(missing) > 1:
warnings.append(f"数字丢失(可能被省略):{list(missing)[:5]}")
if len(new_numbers) > 2:
warnings.append(f"出现原文没有的数字(可能是幻觉):{list(new_numbers)[:5]}")
return warnings
陷阱2:同一 SKU 的多语言版本不一致
批量生成时,同一产品的英文版和日文版可能对同一规格有不同的描述(因为是独立生成的)。
解决:先生成英文作为"中间语言",其他语言基于英文翻译,而不是各自基于中文:
# 生成顺序:中文 → 英文(核心版本) → 其他语言基于英文
async def generate_multilingual_sequential(sku_data: dict, languages: list):
# 第一步:生成英文版本(质量最高)
en_content = await generate_content(sku_data, 'en')
# 第二步:其他语言基于英文版本生成
other_languages = [l for l in languages if l != 'en']
tasks = [
generate_content(
{**sku_data, 'source_language_content': en_content['translated_description']},
lang
)
for lang in other_languages
]
other_contents = await asyncio.gather(*tasks)
return {'en': en_content, **dict(zip(other_languages, other_contents))}
批量内容的版本管理
# 支持内容版本历史,方便 A/B 测试和回滚
class ContentVersion:
def save(self, product_id: str, language: str, content: dict, version: int):
"""保存内容版本"""
pass
def get_active(self, product_id: str, language: str) -> dict:
"""获取当前激活版本"""
pass
def rollback(self, product_id: str, language: str, version: int):
"""回滚到指定版本"""
pass
def ab_test(self, product_id: str, language: str,
version_a: int, version_b: int,
traffic_split: float = 0.5):
"""启动 A/B 测试"""
pass
本章小结
核心要点:
-
架构选择:< 1000 个任务用 Dify 内置批量运行;> 1000 个用 Celery + Dify API 的外部调度架构。
-
质量控制不可省略:批量生成必须有自动质量检查,Critical 问题必须自动重生成,而不是直接上线。
-
速率限制是关键约束:必须实现令牌桶算法控制 RPM 和 TPM,否则 OpenAI 会拒绝请求,任务大量失败。
-
幻觉在批量场景中被放大:100 个任务可能只有 1 个幻觉,但 10 万个任务就有 1000 个错误数据进生产库。
-
多语言一致性:使用"中间语言"策略(所有语言基于英文翻译),而不是各自独立翻译,保证版本一致性。
-
成本必须精确追踪:建立 Token 级别的成本追踪,方便对比不同模型和策略的 ROI。
关键参数参考(12 万 SKU 项目):
| 参数 | 配置值 | 原因 |
|---|---|---|
| Celery Workers | 50 | 平衡并发和限速 |
| 任务超时 | 180 秒 | 覆盖最慢的 LLM 响应 |
| 最大重试 | 3 次 | 临时错误可以恢复 |
| Quality Score 阈值 | 70 | 低于 70 标记人工审核 |
| 自动重生成阈值 | 有 Critical 问题 | 严重问题必须重生成 |
| RPM 控制 | 4000 (留 20% 缓冲) | 避免触发限速 |