Chapter 23

Case Study: Content Generation Pipeline — Batch Processing and QA

Chapter 23: Project — Content Generation Pipeline — Batch Processing and Quality Control

Transform AI content generation from occasional usage into a core enterprise capability — this chapter shows you how to build a batch-capable, quality-assured, continuously improvable content generation pipeline.

Chapter Overview

Product detail pages, marketing copy, SEO content, product manual translations — content demand is nearly endless. After teams taste the power of LLM-generated content, their first bottleneck isn't "how to write well," but "how to write at scale" and "how to guarantee quality at scale."

This chapter is built around a real case: a cross-border e-commerce platform (120,000 SKUs) needed to translate and localize Chinese product content into 5 languages (English, Japanese, Spanish, German, French), while generating SEO titles, meta descriptions, and marketing copy in each language.

Business scale:


Level 1: Core Concepts (1–3 Years Experience)

Architecture Options for Content Pipelines

Option A: Dify Batch Run (built-in)

Dify Workflow supports "Batch Run" mode — upload a CSV file to process multiple records.

Pros: No additional development needed, simple to operate Cons: Limited concurrency, unsuitable for very large batches

Option B: Dify API + External Scheduler

Use Dify Workflow API with Celery/Airflow task queues for true production-grade batch processing.

Pros: Full control, supports resume-from-failure, error retry, progress monitoring Cons: Requires development effort

Option C: Local LangChain/LlamaIndex scripts

Bypass Dify, write batch processing scripts directly.

Pros: Maximum flexibility Cons: Loses Dify's prompt management, version control, and monitoring advantages

Recommendation: This company chose Option B (Dify API + Celery). This chapter covers that approach in detail.

Dify Workflow Design: Multilingual Content Generation

Design the workflow in Dify Console:

Inputs:
  - product_id
  - product_name (Chinese)
  - product_description (Chinese)
  - target_language (en/ja/es/de/fr)
  - product_category

Workflow nodes:
  1. Language config node (set language-specific rules)
  2. Core information extraction node (extract key selling points)
  3. Translation node (translate product description)
  4. SEO content generation node (generate Title/Meta/H1)
  5. Marketing copy node (generate 3 copy variations)
  6. Quality check node (auto-check length, keywords, forbidden words)
  7. Output merge node

Outputs:
  - translated_description
  - seo_title (≤ 60 characters)
  - meta_description (≤ 160 characters)
  - marketing_copy_a (value-focused)
  - marketing_copy_b (emotion-focused)
  - marketing_copy_c (promotion-focused)
  - quality_score (0–100)
  - quality_issues (list)

Core prompt examples:

# Node 2: Core information extraction
You are a product copywriter. Extract key information from this Chinese product description:

Product name: {{product_name}}
Description: {{product_description}}
Category: {{product_category}}

Output JSON:
{
  "core_features": ["feature1", "feature2", "feature3"],
  "target_audience": "target user description",
  "key_benefits": ["benefit1", "benefit2"],
  "technical_specs": {"spec_name": "spec_value"},
  "unique_selling_point": "single most important differentiator"
}
# Node 4: SEO content generation (English)
Based on the following information, generate SEO-optimized content:

Product name (English): {{translated_title}}
Core features: {{core_features}}
Target keywords: {{target_keywords}}

Output JSON:
{
  "seo_title": "SEO title containing primary keyword, 50-60 characters",
  "meta_description": "Meta description with CTA, 140-160 characters",
  "h1_tag": "Page H1 tag, slightly different from SEO title, natural language",
  "suggested_keywords": ["keyword1", "keyword2"]
}

Level 2: Mechanism Deep Dive (3–5 Years Experience)

Production Batch Processing Architecture

            +-----------------------------+
            |     Task Management DB       |
            |    (PostgreSQL + Redis)      |
            +------------|----------------+
                         |
            +------------|----------------+
            |     Task Scheduler          |
            |  (Celery + Redis Broker)    |
            |  - Priority queues          |
            |  - Concurrency control      |
            |  - Retry logic (3 retries)  |
            +------------|----------------+
                         |
         +---------------+---------------+
         v               v               v
   +---------+     +---------+     +---------+
   | Worker  |     | Worker  |     | Worker  |
   | (x10)   |     | (x10)   |     | (x10)   |
   +---------+     +---------+     +---------+
         |               |               |
         +---------------+---------------+
                         |
            +------------|----------------+
            |     Dify Workflow API       |
            |   (Load balanced + Rate     |
            |    limited)                 |
            +------------|----------------+
                         |
            +------------|----------------+
            |       LLM Provider          |
            |    (OpenAI / Claude)        |
            +-----------------------------+

Core Celery task:

from celery import Celery
import requests
import json

app = Celery('content_pipeline', broker='redis://redis:6379/2')

app.conf.update(
    task_acks_late=True,       # Acknowledge only after completion
    worker_prefetch_multiplier=1,  # One task at a time per worker
    task_annotations={
        'tasks.generate_content': {'rate_limit': '5/s'}
    }
)

DIFY_API_URL = "https://dify.yourcompany.com/v1"
DIFY_WORKFLOW_TOKEN = "workflow-api-token"

@app.task(bind=True, max_retries=3, default_retry_delay=30,
          soft_time_limit=120, time_limit=180)
def generate_content(self, task_id: int, sku_data: dict, target_language: str):
    session = Session()
    task = session.get(ContentTask, task_id)
    
    try:
        task.status = TaskStatus.PROCESSING
        task.started_at = datetime.now()
        session.commit()
        
        response = call_dify_workflow(sku_data, target_language)
        
        if response['status'] == 'succeeded':
            outputs = response['outputs']
            task.result = {k: outputs[k] for k in [
                'translated_description', 'seo_title', 'meta_description',
                'marketing_copy_a', 'marketing_copy_b', 'marketing_copy_c',
                'quality_score', 'quality_issues'
            ]}
            task.status = TaskStatus.COMPLETED
        else:
            raise Exception(f"Workflow failed: {response.get('error')}")
        
        session.commit()
    
    except Exception as exc:
        task.status = TaskStatus.FAILED
        task.error_message = str(exc)
        session.commit()
        if self.request.retries < self.max_retries:
            raise self.retry(exc=exc)
    finally:
        session.close()

def call_dify_workflow(sku_data: dict, target_language: str) -> dict:
    response = requests.post(
        f'{DIFY_API_URL}/workflows/run',
        headers={'Authorization': f'Bearer {DIFY_WORKFLOW_TOKEN}'},
        json={
            'inputs': {
                'product_id': sku_data['product_id'],
                'product_name': sku_data['product_name'],
                'product_description': sku_data['product_description'],
                'target_language': target_language,
                'product_category': sku_data.get('category', ''),
            },
            'response_mode': 'blocking',
            'user': 'batch_processor'
        },
        timeout=90
    )
    if response.status_code != 200:
        raise Exception(f"Dify API error {response.status_code}: {response.text}")
    return response.json()

Quality Control System

import re
from dataclasses import dataclass
from typing import List

@dataclass
class QualityIssue:
    severity: str  # critical/warning/info
    rule: str
    description: str
    fix_suggestion: str

class ContentQualityChecker:
    def __init__(self, language: str):
        self.language = language
        self.config = {
            'en': {
                'seo_title_min': 30, 'seo_title_max': 60,
                'meta_desc_min': 120, 'meta_desc_max': 160,
                'min_description_words': 100,
                'forbidden_words': ['cheap', 'knockoff', 'replica', 'fake', 'counterfeit'],
                'required_cta_patterns': [r'\b(buy|shop|order|get|discover)\b'],
            }
        }
    
    def check(self, content: dict) -> tuple:
        issues = []
        score = 100
        cfg = self.config.get(self.language, self.config['en'])
        
        # Check SEO title length
        title = content.get('seo_title', '')
        if len(title) > cfg['seo_title_max']:
            issues.append(QualityIssue(
                'critical', 'seo_title_too_long',
                f"SEO title too long ({len(title)} chars) — Google will truncate",
                f"SEO title must be ≤ {cfg['seo_title_max']} characters"
            ))
            score -= 20
        
        # Check forbidden words
        desc = content.get('translated_description', '').lower()
        for word in cfg.get('forbidden_words', []):
            if word.lower() in desc:
                issues.append(QualityIssue(
                    'critical', 'forbidden_word',
                    f"Content contains forbidden word: '{word}'",
                    f"Remove or replace '{word}'"
                ))
                score -= 30
        
        # Check for missing CTA in marketing copy
        for i, copy_key in enumerate(['marketing_copy_a', 'marketing_copy_b', 'marketing_copy_c']):
            copy = content.get(copy_key, '')
            has_cta = any(
                re.search(p, copy, re.IGNORECASE)
                for p in cfg.get('required_cta_patterns', [])
            )
            if not has_cta:
                issues.append(QualityIssue(
                    'warning', f'missing_cta_copy_{i+1}',
                    f"Marketing copy {chr(65+i)} missing call-to-action",
                    "Add CTA phrases like 'Shop Now', 'Order Today'"
                ))
                score -= 5
        
        # Hallucination check: numbers present in original but missing in translation
        orig_numbers = set(re.findall(r'\b\d+(?:\.\d+)?\b', 
                                      content.get('original_description', '')))
        trans_numbers = set(re.findall(r'\b\d+(?:\.\d+)?\b',
                                       content.get('translated_description', '')))
        missing = orig_numbers - trans_numbers
        if len(missing) > 2:
            issues.append(QualityIssue(
                'warning', 'potential_hallucination',
                f"Numbers from original may be missing in translation: {list(missing)[:5]}",
                "Check that all technical specs are preserved"
            ))
            score -= 15
        
        return max(0, score), issues

Auto-Regeneration on Quality Failure

async def process_with_quality_check(
    task_id: int, sku_data: dict, target_language: str, max_attempts: int = 3
) -> dict:
    checker = ContentQualityChecker(target_language)
    previous_issues = []
    
    for attempt in range(max_attempts):
        feedback = ""
        if attempt > 0:
            feedback = "\n\nPlease avoid these quality issues from the previous attempt:\n"
            for issue in previous_issues:
                feedback += f"- {issue.description}: {issue.fix_suggestion}\n"
        
        content = await call_dify_workflow(sku_data, target_language, extra_prompt=feedback)
        score, issues = checker.check(content)
        critical_issues = [i for i in issues if i.severity == 'critical']
        
        if not critical_issues or attempt == max_attempts - 1:
            content['quality_score'] = score
            content['quality_issues'] = [i.__dict__ for i in issues]
            content['generation_attempts'] = attempt + 1
            if critical_issues:
                content['requires_human_review'] = True
            return content
        
        previous_issues = issues
    
    return content

Level 3: Source Code and Architecture (5+ Years)

Rate Limiting with Token Bucket Algorithm

import asyncio
import time

class TokenBucketRateLimiter:
    def __init__(self, rpm_limit: int, tpm_limit: int):
        self.rpm_limit = rpm_limit
        self.tpm_limit = tpm_limit
        self.request_tokens = float(rpm_limit)
        self.token_tokens = float(tpm_limit)
        self.request_refill_rate = rpm_limit / 60.0
        self.token_refill_rate = tpm_limit / 60.0
        self.last_refill = time.time()
        self.lock = asyncio.Lock()
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.request_tokens = min(self.rpm_limit,
                                   self.request_tokens + elapsed * self.request_refill_rate)
        self.token_tokens = min(self.tpm_limit,
                                 self.token_tokens + elapsed * self.token_refill_rate)
        self.last_refill = now
    
    async def acquire(self, estimated_tokens: int = 1000):
        async with self.lock:
            while True:
                self._refill()
                if self.request_tokens >= 1 and self.token_tokens >= estimated_tokens:
                    self.request_tokens -= 1
                    self.token_tokens -= estimated_tokens
                    return
                wait_needed = max(
                    (1 - self.request_tokens) / self.request_refill_rate,
                    (estimated_tokens - self.token_tokens) / self.token_refill_rate
                )
                await asyncio.sleep(min(wait_needed, 1.0))

# Global rate limiter for OpenAI Tier 3
rate_limiter = TokenBucketRateLimiter(rpm_limit=4000, tpm_limit=3_000_000)

Progress Monitoring API

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/api/batch/<job_id>/progress')
def get_progress(job_id: str):
    with Session() as session:
        total = session.query(ContentTask).filter_by(job_id=job_id).count()
        completed = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.COMPLETED).count()
        failed = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.FAILED).count()
        
        return jsonify({
            'job_id': job_id,
            'total': total,
            'completed': completed,
            'failed': failed,
            'progress_pct': round((completed + failed) / total * 100, 1) if total else 0,
            'cost_usd_so_far': get_total_cost(job_id),
        })

@app.route('/api/batch/<job_id>/resume', methods=['POST'])
def resume_job(job_id: str):
    with Session() as session:
        failed_tasks = session.query(ContentTask).filter_by(
            job_id=job_id, status=TaskStatus.FAILED
        ).filter(ContentTask.retry_count < 3).all()
        
        for task in failed_tasks:
            sku_data = json.loads(task.input_data)
            generate_content.apply_async(args=[task.id, sku_data, task.target_language])
            task.status = TaskStatus.PENDING
        
        session.commit()
        return jsonify({'resumed_tasks': len(failed_tasks)})

Level 4: Production Traps and Decisions (Expert Perspective)

Real Case Data

After 3 months of running the batch content generation pipeline:

Cost analysis:

Quality outcomes:

Trap 1: Hallucinations Are Amplified in Batch Processing

Single-use LLM hallucinations are noticeable but manageable. In batch processing, even a 1% hallucination rate means 1,200 SKUs with fabricated specs in the database.

Prevention:

class HallucinationDetector:
    def check(self, original: str, translated: str) -> list:
        warnings = []
        orig_values = set(re.findall(r'\b\d+(?:\.\d+)?\b', original))
        trans_values = set(re.findall(r'\b\d+(?:\.\d+)?\b', translated))
        missing = orig_values - trans_values
        new_numbers = trans_values - orig_values
        
        if len(missing) > 1:
            warnings.append(f"Numbers dropped in translation: {list(missing)[:5]}")
        if len(new_numbers) > 2:
            warnings.append(f"New numbers appeared (possible hallucination): {list(new_numbers)[:5]}")
        return warnings

Trap 2: Multilingual Version Inconsistency

When generating English and Japanese versions independently from Chinese, the same product spec may be described differently in each language.

Solution: Use English as the intermediate language — all other languages translate from English, not from Chinese:

async def generate_multilingual_sequential(sku_data: dict, languages: list):
    # Step 1: Generate English as the master version
    en_content = await generate_content(sku_data, 'en')
    
    # Step 2: Other languages translate from English
    other_languages = [l for l in languages if l != 'en']
    tasks = [
        generate_content(
            {**sku_data, 'source_content': en_content['translated_description']},
            lang
        )
        for lang in other_languages
    ]
    other_contents = await asyncio.gather(*tasks)
    return {'en': en_content, **dict(zip(other_languages, other_contents))}

Trap 3: No Version History for A/B Testing

Without version tracking, you cannot roll back poor-quality generated content or run A/B tests to compare AI vs. human content performance.

Always store version history and maintain the ability to activate, deactivate, or roll back content versions per SKU per language.


Chapter Summary

Key takeaways:

  1. Choose the right architecture: Under 1,000 tasks → Dify built-in batch run; over 1,000 tasks → Celery + Dify API external scheduler.

  2. Quality control is non-negotiable: Every batch must have automated quality checks. Critical failures must trigger automatic regeneration before content reaches production.

  3. Rate limiting is the critical constraint: Implement a token bucket algorithm controlling both RPM and TPM, otherwise OpenAI will reject requests and tasks fail in bulk.

  4. Hallucinations are amplified at scale: A 1% hallucination rate creates 1,200 bad records out of 120,000 SKUs. Build hallucination detection from day one.

  5. Use an intermediate language for multilingual consistency: All language versions should derive from a single master (English), not each independently from the source (Chinese).

  6. Track costs precisely: Token-level cost tracking enables accurate ROI comparisons across models and strategies.

Reference parameters (for a 120,000-SKU project):

Parameter Value Reason
Celery workers 50 Balance concurrency vs. rate limits
Task timeout 180 seconds Covers slowest LLM responses
Max retries 3 Recovers from transient failures
Quality score threshold 70 Below 70 → flag for human review
Auto-regen trigger Any critical issue Never let critical failures reach production
RPM control 4,000 (20% buffer) Stay below OpenAI's 5,000 RPM Tier 3 limit
Rate this chapter
4.8  / 5  (6 ratings)

💬 Comments