Case Study: Content Generation Pipeline — Batch Processing and QA
Chapter 23: Project — Content Generation Pipeline — Batch Processing and Quality Control
Transform AI content generation from occasional usage into a core enterprise capability — this chapter shows you how to build a batch-capable, quality-assured, continuously improvable content generation pipeline.
Chapter Overview
Product detail pages, marketing copy, SEO content, product manual translations — content demand is nearly endless. After teams taste the power of LLM-generated content, their first bottleneck isn't "how to write well," but "how to write at scale" and "how to guarantee quality at scale."
This chapter is built around a real case: a cross-border e-commerce platform (120,000 SKUs) needed to translate and localize Chinese product content into 5 languages (English, Japanese, Spanish, German, French), while generating SEO titles, meta descriptions, and marketing copy in each language.
Business scale:
- 120,000 SKUs, approximately 500 Chinese characters per SKU
- ~300 new products listed daily, requiring immediate multilingual content
- Backlog of content needing batch processing: 120,000 SKUs within 30 days (~4,000 SKU/day)
- Quality targets: translation accuracy ≥ 95%, SEO content compliant with Google guidelines, marketing copy conversion rate no more than 15% below the human-written baseline
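To make the scale concrete, the figures above can be turned into a required average throughput. This is a small arithmetic sketch that assumes one generation task per SKU per target language (the same unit the cost analysis later in the chapter uses); the function name is illustrative.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def required_tasks_per_second(skus_per_day: int, languages: int) -> float:
    """Average generation tasks per second needed to keep pace."""
    return skus_per_day * languages / SECONDS_PER_DAY


# Backlog: 120,000 SKUs in 30 days is 4,000 SKUs/day, each in 5 languages
backlog_rate = required_tasks_per_second(120_000 // 30, 5)
# Steady state: about 300 new SKUs/day, each in 5 languages
daily_rate = required_tasks_per_second(300, 5)

print(f"backlog: {backlog_rate:.3f} tasks/s, new listings: {daily_rate:.4f} tasks/s")
```

The averages are small; the hard part is sustaining them for 30 days with retries, quality checks, and provider rate limits in the way.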
Level 1: Core Concepts (1–3 Years Experience)
Architecture Options for Content Pipelines
Option A: Dify Batch Run (built-in)
Dify Workflow supports "Batch Run" mode — upload a CSV file to process multiple records.
Pros: No additional development needed, simple to operate
Cons: Limited concurrency, unsuitable for very large batches
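A batch-run CSV can be produced from SKU records with the standard library. The sketch below assumes the CSV columns are named after the workflow's input variables (the five inputs defined later in this chapter) and that one row is needed per SKU per language; check the template offered by your Dify version before relying on this layout.

```python
import csv
import io

# Assumed column names, mirroring the workflow inputs used in this chapter
COLUMNS = ["product_id", "product_name", "product_description",
           "target_language", "product_category"]


def build_batch_csv(skus: list, languages: list) -> str:
    """Return CSV text with one row per (SKU, target language) pair."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=COLUMNS)
    writer.writeheader()
    for sku in skus:
        for lang in languages:
            row = {column: sku.get(column, "") for column in COLUMNS}
            row["target_language"] = lang
            writer.writerow(row)
    return buffer.getvalue()


sample_skus = [
    {"product_id": "SKU-1", "product_name": "保温杯",
     "product_description": "容量500毫升", "product_category": "kitchen"},
    {"product_id": "SKU-2", "product_name": "蓝牙耳机",
     "product_description": "续航30小时", "product_category": "electronics"},
]
csv_text = build_batch_csv(sample_skus, ["en", "ja"])
```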
Option B: Dify API + External Scheduler
Use Dify Workflow API with Celery/Airflow task queues for true production-grade batch processing.
Pros: Full control, supports resume-from-failure, error retry, progress monitoring
Cons: Requires development effort
Option C: Local LangChain/LlamaIndex scripts
Bypass Dify, write batch processing scripts directly.
Pros: Maximum flexibility
Cons: Loses Dify's prompt management, version control, and monitoring advantages
Recommendation: This company chose Option B (Dify API + Celery). This chapter covers that approach in detail.
Dify Workflow Design: Multilingual Content Generation
Design the workflow in Dify Console:
Inputs:
- product_id
- product_name (Chinese)
- product_description (Chinese)
- target_language (en/ja/es/de/fr)
- product_category
Workflow nodes:
1. Language config node (set language-specific rules)
2. Core information extraction node (extract key selling points)
3. Translation node (translate product description)
4. SEO content generation node (generate Title/Meta/H1)
5. Marketing copy node (generate 3 copy variations)
6. Quality check node (auto-check length, keywords, forbidden words)
7. Output merge node
Outputs:
- translated_description
- seo_title (≤ 60 characters)
- meta_description (≤ 160 characters)
- marketing_copy_a (value-focused)
- marketing_copy_b (emotion-focused)
- marketing_copy_c (promotion-focused)
- quality_score (0–100)
- quality_issues (list)
Core prompt examples:
# Node 2: Core information extraction
You are a product copywriter. Extract key information from this Chinese product description:
Product name: {{product_name}}
Description: {{product_description}}
Category: {{product_category}}
Output JSON:
{
  "core_features": ["feature1", "feature2", "feature3"],
  "target_audience": "target user description",
  "key_benefits": ["benefit1", "benefit2"],
  "technical_specs": {"spec_name": "spec_value"},
  "unique_selling_point": "single most important differentiator"
}
# Node 4: SEO content generation (English)
Based on the following information, generate SEO-optimized content:
Product name (English): {{translated_title}}
Core features: {{core_features}}
Target keywords: {{target_keywords}}
Output JSON:
{
  "seo_title": "SEO title containing primary keyword, 50-60 characters",
  "meta_description": "Meta description with CTA, 140-160 characters",
  "h1_tag": "Page H1 tag, slightly different from SEO title, natural language",
  "suggested_keywords": ["keyword1", "keyword2"]
}
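Both prompts ask the model for JSON, but models sometimes wrap the object in a Markdown code fence or omit a key. A minimal sketch of defensive parsing that a downstream code node or the batch worker could apply; the function name and the set of required keys are assumptions based on the Node 4 prompt above.

```python
import json
import re

FENCE = "`" * 3  # Markdown code-fence marker
FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)

# Keys requested by the Node 4 prompt
SEO_KEYS = {"seo_title", "meta_description", "h1_tag", "suggested_keywords"}


def parse_llm_json(raw: str, required_keys: set) -> dict:
    """Parse a JSON object from model output, tolerating a surrounding code fence."""
    text = raw.strip()
    match = FENCED_BLOCK.search(text)
    if match:
        text = match.group(1).strip()
    data = json.loads(text)  # raises json.JSONDecodeError (a ValueError) if malformed
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    missing = required_keys - set(data)
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    return data
```

Raising on malformed output, rather than silently accepting it, lets the retry and quality-control layers treat it like any other failed generation.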
Level 2: Mechanism Deep Dive (3–5 Years Experience)
Production Batch Processing Architecture
            +-----------------------------+
            |     Task Management DB      |
            |    (PostgreSQL + Redis)     |
            +--------------+--------------+
                           |
            +--------------+--------------+
            |       Task Scheduler        |
            |   (Celery + Redis Broker)   |
            |  - Priority queues          |
            |  - Concurrency control      |
            |  - Retry logic (3 retries)  |
            +--------------+--------------+
                           |
           +---------------+---------------+
           v               v               v
      +---------+     +---------+     +---------+
      | Worker  |     | Worker  |     | Worker  |
      |  (x10)  |     |  (x10)  |     |  (x10)  |
      +---------+     +---------+     +---------+
           |               |               |
           +---------------+---------------+
                           |
            +--------------+--------------+
            |      Dify Workflow API      |
            |   (Load balanced + Rate     |
            |    limited)                 |
            +--------------+--------------+
                           |
            +--------------+--------------+
            |        LLM Provider         |
            |      (OpenAI / Claude)      |
            +-----------------------------+
Core Celery task:
from datetime import datetime

import requests
from celery import Celery

# Session, ContentTask and TaskStatus are the project's SQLAlchemy session factory,
# task model and status enum; the module name "models" is an assumption.
from models import ContentTask, Session, TaskStatus

app = Celery('content_pipeline', broker='redis://redis:6379/2')
app.conf.update(
    task_acks_late=True,            # Acknowledge only after completion
    worker_prefetch_multiplier=1,   # One task at a time per worker
    task_annotations={
        # The key must match the registered task name.
        # Celery enforces rate_limit per worker instance, not globally.
        'tasks.generate_content': {'rate_limit': '5/s'}
    }
)

DIFY_API_URL = "https://dify.yourcompany.com/v1"
DIFY_WORKFLOW_TOKEN = "workflow-api-token"  # Placeholder; load from a secret store in production

OUTPUT_KEYS = [
    'translated_description', 'seo_title', 'meta_description',
    'marketing_copy_a', 'marketing_copy_b', 'marketing_copy_c',
    'quality_score', 'quality_issues'
]


@app.task(bind=True, max_retries=3, default_retry_delay=30,
          soft_time_limit=120, time_limit=180)
def generate_content(self, task_id: int, sku_data: dict, target_language: str):
    session = Session()
    task = session.get(ContentTask, task_id)
    try:
        task.status = TaskStatus.PROCESSING
        task.started_at = datetime.now()
        session.commit()

        response = call_dify_workflow(sku_data, target_language)
        # Dify's blocking response nests the run result under "data";
        # fall back to the top level in case the shape differs
        run = response.get('data', response)
        if run.get('status') == 'succeeded':
            outputs = run['outputs']
            task.result = {k: outputs[k] for k in OUTPUT_KEYS}
            task.status = TaskStatus.COMPLETED
        else:
            raise Exception(f"Workflow failed: {run.get('error')}")
        session.commit()
    except Exception as exc:
        session.rollback()  # Discard any half-finished transaction before recording the failure
        task.status = TaskStatus.FAILED
        task.error_message = str(exc)
        session.commit()
        # Retry up to max_retries; after that the task stays FAILED
        if self.request.retries < self.max_retries:
            raise self.retry(exc=exc)
    finally:
        session.close()


def call_dify_workflow(sku_data: dict, target_language: str) -> dict:
    """Run the workflow synchronously and return the parsed JSON response."""
    response = requests.post(
        f'{DIFY_API_URL}/workflows/run',
        headers={'Authorization': f'Bearer {DIFY_WORKFLOW_TOKEN}'},
        json={
            'inputs': {
                'product_id': sku_data['product_id'],
                'product_name': sku_data['product_name'],
                'product_description': sku_data['product_description'],
                'target_language': target_language,
                'product_category': sku_data.get('category', ''),
            },
            'response_mode': 'blocking',
            'user': 'batch_processor'
        },
        timeout=90  # Below the 120 s soft limit so the HTTP call fails first
    )
    if response.status_code != 200:
        raise Exception(f"Dify API error {response.status_code}: {response.text}")
    return response.json()
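The task above retries after a fixed 30-second delay. When many tasks fail at the same moment (for example during a provider outage), a fixed delay makes them all retry together. One common alternative is exponential backoff with jitter; the sketch below is illustrative, and the base and cap values are assumptions rather than figures from this project. The result could be passed as `countdown=` to `self.retry(...)`.

```python
import random
from typing import Optional


def retry_delay(retries: int, base: float = 30.0, cap: float = 300.0,
                rng: Optional[random.Random] = None) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**retries)] seconds."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** retries))
    return rng.uniform(0, ceiling)


# Upper bounds for the first four attempts with the defaults: 30, 60, 120, 240 seconds
ceilings = [min(300.0, 30.0 * 2 ** n) for n in range(4)]
```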
Quality Control System
import re
from dataclasses import dataclass
from typing import List, Tuple

# No \b anchors: Python treats CJK characters as word characters, so
# \b\d+\b finds nothing in source text such as "重量500克"
NUMBER_PATTERN = re.compile(r'\d+(?:\.\d+)?')


@dataclass
class QualityIssue:
    severity: str  # critical/warning/info
    rule: str
    description: str
    fix_suggestion: str


class ContentQualityChecker:
    def __init__(self, language: str):
        self.language = language
        # Only English rules are defined here; check() falls back to them for other languages
        self.config = {
            'en': {
                'seo_title_min': 30, 'seo_title_max': 60,
                'meta_desc_min': 120, 'meta_desc_max': 160,
                'min_description_words': 100,
                'forbidden_words': ['cheap', 'knockoff', 'replica', 'fake', 'counterfeit'],
                'required_cta_patterns': [r'\b(buy|shop|order|get|discover)\b'],
            }
        }

    def check(self, content: dict) -> Tuple[int, List[QualityIssue]]:
        """Return (score, issues). content must also carry 'original_description'."""
        issues = []
        score = 100
        cfg = self.config.get(self.language, self.config['en'])

        # Check SEO title length
        title = content.get('seo_title', '')
        if len(title) > cfg['seo_title_max']:
            issues.append(QualityIssue(
                'critical', 'seo_title_too_long',
                f"SEO title too long ({len(title)} chars); Google will truncate",
                f"SEO title must be ≤ {cfg['seo_title_max']} characters"
            ))
            score -= 20

        # Check forbidden words (substring match, case-insensitive)
        desc = content.get('translated_description', '').lower()
        for word in cfg.get('forbidden_words', []):
            if word.lower() in desc:
                issues.append(QualityIssue(
                    'critical', 'forbidden_word',
                    f"Content contains forbidden word: '{word}'",
                    f"Remove or replace '{word}'"
                ))
                score -= 30

        # Check for missing CTA in marketing copy
        for i, copy_key in enumerate(['marketing_copy_a', 'marketing_copy_b', 'marketing_copy_c']):
            copy = content.get(copy_key, '')
            has_cta = any(
                re.search(p, copy, re.IGNORECASE)
                for p in cfg.get('required_cta_patterns', [])
            )
            if not has_cta:
                issues.append(QualityIssue(
                    'warning', f'missing_cta_copy_{i+1}',
                    f"Marketing copy {chr(65+i)} missing call-to-action",
                    "Add CTA phrases like 'Shop Now', 'Order Today'"
                ))
                score -= 5

        # Spec check: numbers present in the original but missing in the translation
        orig_numbers = set(NUMBER_PATTERN.findall(content.get('original_description', '')))
        trans_numbers = set(NUMBER_PATTERN.findall(content.get('translated_description', '')))
        missing = orig_numbers - trans_numbers
        if len(missing) > 2:
            issues.append(QualityIssue(
                'warning', 'potential_hallucination',
                f"Numbers from original may be missing in translation: {list(missing)[:5]}",
                "Check that all technical specs are preserved"
            ))
            score -= 15

        return max(0, score), issues
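The forbidden-word check above matches substrings, so a rule for "replica" also fires on "replicate". If that is not intended, a word-boundary match is one option. The helper below is a sketch with a hypothetical name; it suits space-delimited languages (English, Spanish, German, French), while Japanese text would need a different approach because `\b` does not separate Japanese words.

```python
import re


def find_forbidden_words(text: str, forbidden: list) -> list:
    """Return the forbidden words that occur in text as whole words (case-insensitive)."""
    found = []
    for word in forbidden:
        pattern = rf"\b{re.escape(word)}\b"
        if re.search(pattern, text, re.IGNORECASE):
            found.append(word)
    return found


words = ["cheap", "knockoff", "replica", "fake", "counterfeit"]
clean = find_forbidden_words("Engineered to replicate studio sound.", words)
flagged = find_forbidden_words("A faithful Replica of the original, not a fake.", words)
```

Here `clean` is empty and `flagged` contains "replica" and "fake", whereas a plain substring test would have flagged the first sentence as well.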
Auto-Regeneration on Quality Failure
async def process_with_quality_check(
    task_id: int, sku_data: dict, target_language: str, max_attempts: int = 3
) -> dict:
    # Assumes an async variant of call_dify_workflow that accepts extra_prompt
    # (passed to the workflow as an additional input) and returns the outputs dict
    checker = ContentQualityChecker(target_language)
    previous_issues = []
    content = {}
    for attempt in range(max_attempts):
        feedback = ""
        if attempt > 0:
            feedback = "\n\nPlease avoid these quality issues from the previous attempt:\n"
            for issue in previous_issues:
                feedback += f"- {issue.description}: {issue.fix_suggestion}\n"

        content = await call_dify_workflow(sku_data, target_language, extra_prompt=feedback)
        # The number check compares against the source text, so supply it explicitly
        score, issues = checker.check(
            {**content, 'original_description': sku_data['product_description']}
        )
        critical_issues = [i for i in issues if i.severity == 'critical']

        # Accept the result if it is clean, or if this was the last allowed attempt
        if not critical_issues or attempt == max_attempts - 1:
            content['quality_score'] = score
            content['quality_issues'] = [i.__dict__ for i in issues]
            content['generation_attempts'] = attempt + 1
            if critical_issues:
                content['requires_human_review'] = True
            return content
        previous_issues = issues
    return content  # Only reached when max_attempts < 1
Level 3: Source Code and Architecture (5+ Years)
Rate Limiting with Token Bucket Algorithm
import asyncio
import time


class TokenBucketRateLimiter:
    def __init__(self, rpm_limit: int, tpm_limit: int):
        self.rpm_limit = rpm_limit
        self.tpm_limit = tpm_limit
        self.request_tokens = float(rpm_limit)
        self.token_tokens = float(tpm_limit)
        self.request_refill_rate = rpm_limit / 60.0   # requests per second
        self.token_refill_rate = tpm_limit / 60.0     # LLM tokens per second
        self.last_refill = time.monotonic()           # monotonic: immune to wall-clock changes
        self.lock = asyncio.Lock()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.request_tokens = min(self.rpm_limit,
                                  self.request_tokens + elapsed * self.request_refill_rate)
        self.token_tokens = min(self.tpm_limit,
                                self.token_tokens + elapsed * self.token_refill_rate)
        self.last_refill = now

    async def acquire(self, estimated_tokens: int = 1000):
        if estimated_tokens > self.tpm_limit:
            # The bucket can never hold this many tokens, so waiting would never end
            raise ValueError("estimated_tokens exceeds tpm_limit")
        # Holding the lock while waiting serves callers one at a time, in arrival order
        async with self.lock:
            while True:
                self._refill()
                if self.request_tokens >= 1 and self.token_tokens >= estimated_tokens:
                    self.request_tokens -= 1
                    self.token_tokens -= estimated_tokens
                    return
                wait_needed = max(
                    (1 - self.request_tokens) / self.request_refill_rate,
                    (estimated_tokens - self.token_tokens) / self.token_refill_rate
                )
                await asyncio.sleep(min(wait_needed, 1.0))


# Global rate limiter for OpenAI Tier 3 (shared within one process only)
rate_limiter = TokenBucketRateLimiter(rpm_limit=4000, tpm_limit=3_000_000)
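With two limits in play, it helps to know which one binds. The arithmetic sketch below uses the limiter settings above and the chapter's estimate of about 3,500 tokens per task; `llm_calls_per_task` is an assumption to adjust, since one workflow run may issue several LLM calls.

```python
def max_tasks_per_minute(rpm_limit: int, tpm_limit: int,
                         tokens_per_task: int, llm_calls_per_task: int = 1) -> float:
    """Sustained task rate allowed by whichever limit (requests or tokens) is tighter."""
    by_requests = rpm_limit / llm_calls_per_task
    by_tokens = tpm_limit / tokens_per_task
    return min(by_requests, by_tokens)


rate = max_tasks_per_minute(rpm_limit=4000, tpm_limit=3_000_000, tokens_per_task=3500)
hours_for_backlog = 600_000 / rate / 60  # 600,000 tasks = 120,000 SKUs x 5 languages

print(f"{rate:.0f} tasks/min, about {hours_for_backlog:.1f} hours for the full backlog")
```

Under these assumptions the token limit, not the request limit, is the constraint, so accurate `estimated_tokens` values matter more than the request count.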
Progress Monitoring API
import json

from flask import Flask, jsonify

# Session, ContentTask, TaskStatus, generate_content and get_total_cost
# are defined elsewhere in the pipeline project

app = Flask(__name__)


@app.route('/api/batch/<job_id>/progress')
def get_progress(job_id: str):
    with Session() as session:
        total = session.query(ContentTask).filter_by(job_id=job_id).count()
        completed = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.COMPLETED).count()
        failed = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.FAILED).count()
    return jsonify({
        'job_id': job_id,
        'total': total,
        'completed': completed,
        'failed': failed,
        'progress_pct': round((completed + failed) / total * 100, 1) if total else 0,
        'cost_usd_so_far': get_total_cost(job_id),
    })


@app.route('/api/batch/<job_id>/resume', methods=['POST'])
def resume_job(job_id: str):
    with Session() as session:
        failed_tasks = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.FAILED
        ).filter(ContentTask.retry_count < 3).all()
        # Commit the PENDING status before enqueueing, so a fast worker
        # never has its update overwritten by this request
        for task in failed_tasks:
            task.status = TaskStatus.PENDING
        session.commit()
        for task in failed_tasks:
            sku_data = json.loads(task.input_data)
            generate_content.apply_async(args=[task.id, sku_data, task.target_language])
        resumed = len(failed_tasks)
    return jsonify({'resumed_tasks': resumed})
Level 4: Production Traps and Decisions (Expert Perspective)
Real Case Data
After 3 months of running the batch content generation pipeline:
Cost analysis:
- 120,000 SKUs × 5 languages = 600,000 generation tasks
- Average ~3,500 tokens per task (2,000 input + 1,500 output)
- Using GPT-4 Turbo at a blended rate of about $0.015 per 1,000 tokens: 600,000 × 3,500 / 1,000 × $0.015 ≈ $31,500
- Compared with human translation: 600,000 words × 5 languages × $0.10/word ≈ $300,000
- Roughly 90% cost reduction (($300,000 − $31,500) / $300,000 ≈ 89.5%)
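The cost figures above can be reproduced in a few lines. The sketch treats $0.015 per 1,000 tokens as a single blended rate, as the calculation above does; `split_cost` is an added helper for providers that price input and output tokens separately, and its rates are parameters rather than quoted prices.

```python
TASKS = 120_000 * 5               # SKUs x languages
TOKENS_PER_TASK = 2_000 + 1_500   # input + output
BLENDED_USD_PER_1K = 0.015


def blended_cost(tasks: int, tokens_per_task: int, usd_per_1k: float) -> float:
    """Total cost when every token is billed at one blended rate."""
    return tasks * tokens_per_task / 1_000 * usd_per_1k


def split_cost(tasks: int, input_tokens: int, output_tokens: int,
               input_usd_per_1k: float, output_usd_per_1k: float) -> float:
    """Total cost when input and output tokens are billed at different rates."""
    per_task = (input_tokens / 1_000 * input_usd_per_1k
                + output_tokens / 1_000 * output_usd_per_1k)
    return tasks * per_task


llm_cost = blended_cost(TASKS, TOKENS_PER_TASK, BLENDED_USD_PER_1K)
human_cost = 600_000 * 5 * 0.10   # the chapter's human-translation estimate
saving = 1 - llm_cost / human_cost

print(f"LLM ${llm_cost:,.0f} vs human ${human_cost:,.0f}: {saving:.1%} saving")
```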
Quality outcomes:
- Human spot-check pass rate: 87% on first generation, 94% after quality control regeneration
- SEO performance: keyword rankings improved an average 12% vs. human-written content after 3 months
- Marketing copy conversion rate: ~8% below pure human baseline (within acceptable range; electronics category showed improvement)
Trap 1: Hallucinations Are Amplified in Batch Processing
Single-use LLM hallucinations are noticeable but manageable. In batch processing, even a 1% hallucination rate means 1,200 SKUs with fabricated specs in the database.
Prevention:
import re

# No \b anchors: they fail next to CJK characters (e.g. "重量500克")
NUMBER_PATTERN = re.compile(r'\d+(?:\.\d+)?')


class HallucinationDetector:
    def check(self, original: str, translated: str) -> list:
        """Compare the sets of numbers in the source and the translation."""
        warnings = []
        orig_values = set(NUMBER_PATTERN.findall(original))
        trans_values = set(NUMBER_PATTERN.findall(translated))
        missing = orig_values - trans_values
        new_numbers = trans_values - orig_values
        if len(missing) > 1:
            warnings.append(f"Numbers dropped in translation: {list(missing)[:5]}")
        if len(new_numbers) > 2:
            warnings.append(f"New numbers appeared (possible hallucination): {list(new_numbers)[:5]}")
        return warnings
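Number extraction from Chinese source text has a subtlety worth showing. Python's `\b` treats CJK characters as word characters, so a pattern like `\b\d+\b` finds nothing in `重量500克`, and full-width digits such as `５００` do not compare equal to `500`. A sketch of an extraction helper (hypothetical name) that avoids word boundaries and normalizes with NFKC first:

```python
import re
import unicodedata

NUMBER = re.compile(r"\d+(?:\.\d+)?")


def extract_numbers(text: str) -> set:
    """Return the distinct integer or decimal literals in text, as strings."""
    normalized = unicodedata.normalize("NFKC", text)  # full-width digits become ASCII
    return set(NUMBER.findall(normalized))


source = "重量500克，电池容量4000mAh"
with_boundaries = re.findall(r"\b\d+\b", source)   # no match next to CJK characters
without_boundaries = extract_numbers(source)
fullwidth = extract_numbers("容量５００毫升")
```

Numbers written with thousands separators (for example "1,200") still split into two values here; handling them would need a locale-aware rule for each target language.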
Trap 2: Multilingual Version Inconsistency
When generating English and Japanese versions independently from Chinese, the same product spec may be described differently in each language.
Solution: Use English as the intermediate language — all other languages translate from English, not from Chinese:
import asyncio


async def generate_multilingual_sequential(sku_data: dict, languages: list):
    # generate_content here is an async helper that returns the outputs dict,
    # not the Celery task of the same name

    # Step 1: Generate English as the master version
    en_content = await generate_content(sku_data, 'en')

    # Step 2: Other languages translate from English
    other_languages = [lang for lang in languages if lang != 'en']
    tasks = [
        generate_content(
            {**sku_data, 'source_content': en_content['translated_description']},
            lang
        )
        for lang in other_languages
    ]
    other_contents = await asyncio.gather(*tasks)
    return {'en': en_content, **dict(zip(other_languages, other_contents))}
Trap 3: No Version History for A/B Testing
Without version tracking, you cannot roll back poor-quality generated content or run A/B tests to compare AI vs. human content performance.
Always store version history and maintain the ability to activate, deactivate, or roll back content versions per SKU per language.
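As an illustration of what "activate, deactivate, or roll back per SKU per language" can mean in practice, here is a minimal in-memory sketch. The class and field names are hypothetical, and a production system would keep this in database tables rather than dictionaries.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, str]  # (sku, language)


@dataclass
class ContentVersion:
    version: int
    content: dict
    source: str  # e.g. "ai" or "human", useful for A/B comparisons


@dataclass
class VersionStore:
    versions: Dict[Key, List[ContentVersion]] = field(default_factory=dict)
    active_version: Dict[Key, int] = field(default_factory=dict)

    def add(self, sku: str, lang: str, content: dict, source: str,
            activate: bool = True) -> int:
        """Append a new version; history is never overwritten."""
        history = self.versions.setdefault((sku, lang), [])
        number = len(history) + 1
        history.append(ContentVersion(number, content, source))
        if activate:
            self.active_version[(sku, lang)] = number
        return number

    def activate(self, sku: str, lang: str, version: int) -> None:
        """Switch the live version; rolling back is activating an older number."""
        if not 1 <= version <= len(self.versions.get((sku, lang), [])):
            raise KeyError(f"no version {version} for {sku}/{lang}")
        self.active_version[(sku, lang)] = version

    def deactivate(self, sku: str, lang: str) -> None:
        self.active_version.pop((sku, lang), None)

    def active(self, sku: str, lang: str) -> Optional[ContentVersion]:
        number = self.active_version.get((sku, lang))
        return None if number is None else self.versions[(sku, lang)][number - 1]
```

Because every version records its `source`, the same structure can serve both rollback and AI-versus-human A/B comparisons.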
Chapter Summary
Key takeaways:
- Choose the right architecture: Under 1,000 tasks → Dify built-in batch run; over 1,000 tasks → Celery + Dify API external scheduler.
- Quality control is non-negotiable: Every batch must have automated quality checks. Critical failures must trigger automatic regeneration before content reaches production.
- Rate limiting is the critical constraint: Implement a token bucket algorithm controlling both RPM and TPM, otherwise OpenAI will reject requests and tasks fail in bulk.
- Hallucinations are amplified at scale: A 1% hallucination rate creates 1,200 bad records out of 120,000 SKUs. Build hallucination detection from day one.
- Use an intermediate language for multilingual consistency: All language versions should derive from a single master (English), not each independently from the source (Chinese).
- Track costs precisely: Token-level cost tracking enables accurate ROI comparisons across models and strategies.
Reference parameters (for a 120,000-SKU project):
| Parameter | Value | Reason |
|---|---|---|
| Celery workers | 50 | Balance concurrency vs. rate limits |
| Task timeout | 180 seconds | Covers slowest LLM responses |
| Max retries | 3 | Recovers from transient failures |
| Quality score threshold | 70 | Below 70 → flag for human review |
| Auto-regen trigger | Any critical issue | Never let critical failures reach production |
| RPM control | 4,000 (20% buffer) | Stay below OpenAI's 5,000 RPM Tier 3 limit |
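
The quality-related rows of the table combine into a single routing decision. The sketch below is one way to express it, assuming the rules stated in this chapter: any critical issue triggers regeneration until the attempt limit is reached, and a score under 70 goes to human review. The function name and return labels are illustrative.

```python
def route(quality_score: int, critical_issues: int, attempts: int,
          max_attempts: int = 3, threshold: int = 70) -> str:
    """Decide what happens to one generated result."""
    if critical_issues > 0:
        # Critical failures never reach production: retry, then escalate
        return "regenerate" if attempts < max_attempts else "human_review"
    if quality_score < threshold:
        return "human_review"
    return "publish"
```

For example, a result scoring 90 with one critical issue on its first attempt is regenerated, while the same result on its third attempt is escalated to a reviewer.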