第 75 章

数据飞轮:持续改进的闭环机制

第七十五章:数据飞轮:持续改进的闭环机制

章节导语

飞轮(Flywheel)是工业时代最重要的发明之一——它利用旋转惯性,将断续的动力输入转化为平滑的持续输出。亚马逊创始人贝佐斯将飞轮效应引入商业战略:低价 → 更多客户 → 更多卖家 → 更低成本 → 更低价格。而在 AI Agent 时代,数据飞轮具有同样的惊人力量:更多使用 → 更多轨迹 → 更好的训练数据 → 更好的模型 → 更多使用。本章将系统设计 Hermes Agent 的数据飞轮架构,从冷启动、数据筛选、A/B 测试到企业级部署,构建真正能够自主加速的 AI 改进引擎。


75.1 数据飞轮的完整闭环设计

飞轮闭环的六个环节

┌─────────────────────────────────────────────────────────────┐
│                     数据飞轮闭环                             │
│                                                             │
│                    ① 用户使用                               │
│                  ╱           ╲                              │
│          ⑥ 更好使用          ② 产生轨迹                    │
│               │                  │                          │
│               │   数据飞轮       │                          │
│               │   ↻ ↻ ↻         │                          │
│          ⑤ 部署上线          ③ 隐式反馈                    │
│                  ╲           ╱                              │
│                    ④ 数据筛选                               │
│                    ↓                                        │
│                   训练微调                                   │
└─────────────────────────────────────────────────────────────┘

① 用户使用:用户通过 Hermes Agent 完成各类任务
② 产生轨迹:系统自动记录完整的 Agent 运行轨迹
③ 隐式反馈:从用户行为中提取质量信号(无需主动评分)
④ 数据筛选:过滤低质量轨迹,平衡训练数据分布
   ↓ 微调训练(Atropos RL)
⑤ 部署上线:新模型经 A/B 测试验证后上线
⑥ 更好使用:更好的模型吸引更多用户,产生更多轨迹

隐式反馈信号设计

与传统 RLHF 不同,数据飞轮的核心是从用户行为中自动提取质量信号,无需用户主动打分。

信号类型 来源行为 正信号(好) 负信号(差)
任务完成率 用户是否接受了 Agent 的最终输出 直接采纳 多次要求重试
会话长度 完成任务所需的交互轮数 1-3轮完成 10+ 轮仍未完成
用户重做率 输出后用户是否手动修改 无修改采纳 大幅修改后使用
会话继续 完成后是否继续使用 继续追加任务 立即退出
错误报告 用户是否报告了错误 无投诉 提交错误反馈
分享行为 是否分享了 Agent 的输出 分享/引用 不使用
# flywheel/feedback_extractor.py
from dataclasses import dataclass
from typing import Optional
from datetime import datetime

@dataclass
class ImplicitFeedback:
    """从用户行为提取的隐式反馈"""
    session_id: str
    trajectory_id: str
    
    # 行为信号
    task_accepted: bool = False      # 用户是否接受最终输出
    retry_count: int = 0             # 重试次数
    manual_edit_ratio: float = 0.0  # 手动修改比例 (0-1)
    session_continued: bool = False  # 完成后是否继续使用
    error_reported: bool = False     # 是否报告错误
    completion_time_s: float = 0.0  # 任务完成时间
    
    # 计算综合质量分
    def quality_score(self) -> float:
        score = 0.5  # 基础分
        
        if self.task_accepted:
            score += 0.3
        else:
            score -= 0.3
        
        if self.retry_count == 0:
            score += 0.1
        elif self.retry_count >= 3:
            score -= 0.2
        
        # 修改比例越高,质量越差
        score -= self.manual_edit_ratio * 0.3
        
        if self.session_continued:
            score += 0.1
        
        if self.error_reported:
            score -= 0.4
        
        return max(-1.0, min(1.0, score))  # 裁剪到 [-1, 1]


class FeedbackCollector:
    """收集和聚合隐式反馈"""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    def record_session_event(
        self,
        session_id: str,
        trajectory_id: str,
        event_type: str,
        event_data: dict
    ):
        """记录一个会话事件"""
        self.db.execute("""
            INSERT INTO session_events 
            (session_id, trajectory_id, event_type, event_data, created_at)
            VALUES (?, ?, ?, ?, ?)
        """, (session_id, trajectory_id, event_type, 
              json.dumps(event_data), datetime.now().isoformat()))
    
    def extract_feedback_for_session(self, session_id: str) -> ImplicitFeedback:
        """从一个会话的所有事件中提取综合反馈"""
        events = self.db.fetch_all(
            "SELECT event_type, event_data FROM session_events WHERE session_id = ?",
            (session_id,)
        )
        
        feedback = ImplicitFeedback(session_id=session_id, trajectory_id="")
        
        for event_type, event_data_str in events:
            data = json.loads(event_data_str)
            
            if event_type == "output_accepted":
                feedback.trajectory_id = data.get("trajectory_id", "")
                feedback.task_accepted = True
            elif event_type == "retry_requested":
                feedback.retry_count += 1
            elif event_type == "output_edited":
                feedback.manual_edit_ratio = data.get("edit_ratio", 0)
            elif event_type == "session_continued":
                feedback.session_continued = True
            elif event_type == "error_reported":
                feedback.error_reported = True
        
        return feedback

75.2 冷启动问题解决方案

冷启动的三个阶段

阶段 0 - 零数据启动(第1-2周)
┌────────────────────────────────────────────────────────┐
│ 策略:合成数据 + 人工标注种子                           │
│                                                        │
│ 1. 从公开数据集提取高质量 Agent 轨迹示例               │
│    - AgentBench / ToolBench 数据集                     │
│    - ShareGPT 中的工具调用案例                         │
│ 2. 手工设计 20-50 个核心任务的"黄金轨迹"              │
│ 3. 用 GPT-4 生成多样化的合成训练数据                  │
│ 4. 部署基础版 Hermes 开始收集真实轨迹                  │
└────────────────────────────────────────────────────────┘

阶段 1 - 数据积累期(第3-8周)
┌────────────────────────────────────────────────────────┐
│ 策略:小批量高频微调                                    │
│                                                        │
│ 1. 收集 500-2000 条真实用户轨迹                        │
│ 2. 每周微调一次(小数据量用 LoRA 快速迭代)            │
│ 3. 重点关注高置信度正样本(用户明确采纳的轨迹)        │
│ 4. A/B 测试验证改进效果                               │
└────────────────────────────────────────────────────────┘

阶段 2 - 飞轮成熟期(第9周+)
┌────────────────────────────────────────────────────────┐
│ 策略:全自动飞轮 + 持续改进                             │
│                                                        │
│ 1. 每天自动筛选新轨迹                                  │
│ 2. 每周/每月触发训练流水线                             │
│ 3. 自动 A/B 测试 + 自动上线                           │
│ 4. 重点优化飞轮速度和数据质量                          │
└────────────────────────────────────────────────────────┘

合成数据生成

# flywheel/cold_start/synthetic_data.py
import os
import json
from openai import OpenAI

# 使用 GPT-4 生成合成训练数据
teacher_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

TASK_TEMPLATES = [
    {
        "task_type": "code_review",
        "templates": [
            "请审查以下 {language} 代码并指出潜在问题:{code_snippet}",
            "这段 {language} 函数有没有性能问题?{code_snippet}"
        ]
    },
    {
        "task_type": "data_analysis",
        "templates": [
            "分析这份 CSV 数据并找出异常值",
            "对比两个数据集,找出关键差异"
        ]
    },
    {
        "task_type": "research",
        "templates": [
            "研究 {topic} 的最新进展并写一份摘要",
            "对比 {topic_a} 和 {topic_b} 的优劣"
        ]
    }
]

def generate_synthetic_trajectory(
    task_type: str, 
    task_description: str,
    tools_available: list
) -> dict:
    """使用 GPT-4 生成一条合成训练轨迹"""
    
    prompt = f"""你是一个 AI Agent 训练数据生成专家。
    
请为以下任务生成一条完整的 Agent 轨迹,包含推理步骤、工具调用和最终响应。

任务类型: {task_type}
任务描述: {task_description}
可用工具: {json.dumps(tools_available, ensure_ascii=False)}

请生成一个真实、高质量的 Agent 执行轨迹,格式如下:
{{
  "task": "{task_description}",
  "task_type": "{task_type}",
  "thinking_steps": [
    {{
      "step_index": 0,
      "thought": "我需要先...",
      "tool_calls": [
        {{"name": "工具名", "arguments": {{}}, "result": "工具返回值", "success": true}}
      ],
      "observation": "观察到..."
    }}
  ],
  "final_response": "最终回答...",
  "quality_label": "high"
}}"""
    
    response = teacher_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        temperature=0.7
    )
    
    return json.loads(response.choices[0].message.content)


def generate_cold_start_dataset(
    output_path: str,
    num_samples: int = 200
) -> int:
    """生成冷启动数据集"""
    import random
    
    generated = []
    task_types = ["code_review", "data_analysis", "research", "debugging", "planning"]
    
    for i in range(num_samples):
        task_type = random.choice(task_types)
        
        # 根据任务类型选择模板
        task_desc = f"示例{task_type}任务 #{i+1}"
        
        try:
            traj = generate_synthetic_trajectory(
                task_type=task_type,
                task_description=task_desc,
                tools_available=["search_web", "run_code", "read_file", "write_file"]
            )
            traj["reward"] = 0.8  # 合成数据给予较高但非满分的奖励
            traj["is_synthetic"] = True
            generated.append(traj)
            
            if (i + 1) % 10 == 0:
                print(f"  已生成 {i+1}/{num_samples}")
        except Exception as e:
            print(f"  生成失败: {e}")
    
    with open(output_path, "w", encoding="utf-8") as f:
        for item in generated:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")
    
    print(f"[ColdStart] 生成 {len(generated)} 条合成数据 → {output_path}")
    return len(generated)

75.3 数据质量 vs 数据量的权衡

质量与数量的效益曲线

改进效果

   ▲
1.0│
   │         ●●●●●●●●●●●●●●●●(高质量数据)
0.8│      ●●●
   │    ●●
0.6│  ●●
   │ ●
0.4│●
   │
0.2│●●●●●●●●●●●●●●●●●●●●●●●●(低质量数据,无论多少改进有限)
   │
   └─────────────────────────────→
        100  500  1K   5K   10K   数据量

数据质量分级体系

# flywheel/quality/scorer.py

class DataQualityTier:
    """数据质量分级"""
    
    TIER_DEFINITIONS = {
        "platinum": {
            "description": "人工验证的高质量轨迹",
            "reward_range": (0.85, 1.0),
            "weight_multiplier": 5.0,   # 训练时权重 5 倍
            "selection_strategy": "manual_review"
        },
        "gold": {
            "description": "高置信度自动评判轨迹",
            "reward_range": (0.6, 0.85),
            "weight_multiplier": 2.0,
            "selection_strategy": "auto_judge"
        },
        "silver": {
            "description": "中等质量,有明显用户采纳信号",
            "reward_range": (0.3, 0.6),
            "weight_multiplier": 1.0,
            "selection_strategy": "implicit_feedback"
        },
        "bronze": {
            "description": "低质量但有训练价值(作为负样本)",
            "reward_range": (-0.5, 0.3),
            "weight_multiplier": 0.3,
            "selection_strategy": "negative_mining"
        }
    }
    
    @classmethod
    def classify(cls, reward: float) -> str:
        for tier, defn in cls.TIER_DEFINITIONS.items():
            lo, hi = defn["reward_range"]
            if lo <= reward <= hi:
                return tier
        return "bronze"
    
    @classmethod
    def get_weight(cls, tier: str) -> float:
        return cls.TIER_DEFINITIONS.get(tier, {}).get("weight_multiplier", 1.0)


def select_training_data(
    trajectories: list,
    target_count: int = 5000,
    quality_budget: dict = None
) -> list:
    """
    按质量预算选择训练数据
    quality_budget: {"platinum": 200, "gold": 1000, "silver": 2000, "bronze": 500}
    """
    if quality_budget is None:
        quality_budget = {
            "platinum": max(50, target_count // 25),
            "gold": max(200, target_count // 5),
            "silver": max(500, target_count // 2),
            "bronze": max(100, target_count // 10)
        }
    
    # 按质量分桶
    buckets = {"platinum": [], "gold": [], "silver": [], "bronze": []}
    for traj in trajectories:
        if traj.combined_score is not None:
            tier = DataQualityTier.classify(traj.combined_score)
            buckets[tier].append(traj)
    
    selected = []
    import random
    
    for tier, budget in quality_budget.items():
        available = buckets[tier]
        count = min(len(available), budget)
        chosen = random.sample(available, count)
        selected.extend(chosen)
        print(f"  [{tier}] 可用: {len(available)}, 选取: {count}")
    
    random.shuffle(selected)
    print(f"[DataSelection] 共选取 {len(selected)} 条训练数据")
    return selected

75.4 A/B 测试新模型的方法

A/B 测试框架

# flywheel/ab_testing/framework.py
import hashlib
import random
from datetime import datetime, timedelta
from typing import Callable, Optional

class ABTestManager:
    """A/B 测试管理器"""
    
    def __init__(self, db_connection):
        self.db = db_connection
        self._active_tests = {}
    
    def create_test(
        self,
        test_name: str,
        control_version: str,    # 当前生产模型版本
        treatment_version: str,  # 新模型版本
        traffic_split: float = 0.1,  # 新模型流量比例(10%)
        min_samples: int = 1000,
        duration_days: int = 14,
        success_metric: str = "task_acceptance_rate"
    ) -> str:
        """创建 A/B 测试"""
        test_id = f"ab_{test_name}_{datetime.now().strftime('%Y%m%d')}"
        
        self._active_tests[test_id] = {
            "test_id": test_id,
            "control": control_version,
            "treatment": treatment_version,
            "traffic_split": traffic_split,
            "min_samples": min_samples,
            "start_time": datetime.now().isoformat(),
            "end_time": (datetime.now() + timedelta(days=duration_days)).isoformat(),
            "success_metric": success_metric,
            "status": "running",
            "results": {"control": [], "treatment": []}
        }
        
        print(f"[AB Test] 启动测试: {test_id}")
        print(f"  Control: {control_version}, Treatment: {treatment_version}")
        print(f"  流量分配: {(1-traffic_split)*100:.0f}% control / {traffic_split*100:.0f}% treatment")
        
        return test_id
    
    def assign_variant(self, user_id: str, test_id: str) -> str:
        """根据用户 ID 稳定分配变体(同一用户始终看到同一变体)"""
        if test_id not in self._active_tests:
            return "control"
        
        test = self._active_tests[test_id]
        
        # 使用哈希保证分配稳定性
        hash_val = int(hashlib.md5(f"{user_id}:{test_id}".encode()).hexdigest(), 16)
        normalized = hash_val / (2 ** 128)  # 归一化到 [0, 1]
        
        if normalized < test["traffic_split"]:
            return "treatment"
        return "control"
    
    def record_outcome(
        self,
        test_id: str,
        variant: str,
        session_id: str,
        metric_value: float
    ):
        """记录一个测试样本的结果"""
        if test_id not in self._active_tests:
            return
        
        self._active_tests[test_id]["results"][variant].append({
            "session_id": session_id,
            "metric_value": metric_value,
            "timestamp": datetime.now().isoformat()
        })
    
    def analyze_results(self, test_id: str) -> dict:
        """分析测试结果,进行统计显著性检验"""
        from scipy import stats
        import numpy as np
        
        if test_id not in self._active_tests:
            return {"error": "测试不存在"}
        
        test = self._active_tests[test_id]
        
        control_values = [r["metric_value"] for r in test["results"]["control"]]
        treatment_values = [r["metric_value"] for r in test["results"]["treatment"]]
        
        if len(control_values) < 30 or len(treatment_values) < 30:
            return {
                "status": "insufficient_data",
                "control_samples": len(control_values),
                "treatment_samples": len(treatment_values)
            }
        
        # t 检验
        t_stat, p_value = stats.ttest_ind(control_values, treatment_values)
        
        control_mean = np.mean(control_values)
        treatment_mean = np.mean(treatment_values)
        relative_improvement = (treatment_mean - control_mean) / control_mean * 100
        
        # 判断是否达到显著性
        is_significant = p_value < 0.05
        is_improvement = treatment_mean > control_mean
        
        result = {
            "test_id": test_id,
            "control_version": test["control"],
            "treatment_version": test["treatment"],
            "control_samples": len(control_values),
            "treatment_samples": len(treatment_values),
            "control_mean": round(control_mean, 4),
            "treatment_mean": round(treatment_mean, 4),
            "relative_improvement": round(relative_improvement, 2),
            "p_value": round(p_value, 4),
            "is_statistically_significant": is_significant,
            "recommendation": self._get_recommendation(
                is_significant, is_improvement, relative_improvement
            )
        }
        
        return result
    
    def _get_recommendation(
        self, is_significant: bool, is_improvement: bool, improvement_pct: float
    ) -> str:
        if is_significant and is_improvement and improvement_pct >= 2:
            return "SHIP: 新模型显著优于控制组,建议全量上线"
        elif is_significant and not is_improvement:
            return "ROLLBACK: 新模型显著差于控制组,不要上线"
        elif not is_significant:
            return "CONTINUE: 差异不显著,继续收集数据"
        else:
            return "CAUTION: 改进幅度较小,评估是否值得上线"
    
    def auto_rollout(
        self,
        test_id: str,
        rollout_threshold: float = 0.02,  # 相对改进 >= 2% 才自动上线
        confidence_level: float = 0.95
    ) -> bool:
        """基于测试结果自动决策是否全量上线"""
        result = self.analyze_results(test_id)
        
        if result.get("status") == "insufficient_data":
            print(f"[AutoRollout] 数据不足,继续观察")
            return False
        
        should_ship = (
            result["is_statistically_significant"] and
            result["relative_improvement"] >= rollout_threshold * 100 and
            result["control_samples"] >= self._active_tests[test_id]["min_samples"]
        )
        
        if should_ship:
            print(f"[AutoRollout] ✅ 自动上线 {result['treatment_version']}")
            print(f"  改进: +{result['relative_improvement']}%, p={result['p_value']}")
            self._deploy_model(result["treatment_version"])
        else:
            print(f"[AutoRollout] ⏸️ 不满足自动上线条件")
        
        return should_ship
    
    def _deploy_model(self, version: str):
        """触发模型部署(实际实现需接入 CD 系统)"""
        print(f"[Deploy] 部署模型版本: {version}")
        # 实际实现:
        # - 更新 Kubernetes deployment
        # - 切换 API Gateway 路由
        # - 通知团队

75.5 飞轮速度影响因素分析

飞轮速度的量化模型

飞轮速度 = f(使用量, 数据质量, 训练频率, 模型改进效率)

具体公式:
V = (U × Q × F) / (C × E)

其中:
U = 日活用户数 × 平均任务数/用户  (使用量)
Q = 平均数据质量分 (0-1)          (数据质量)
F = 每月训练频率                   (训练频率)
C = 每次训练成本(GPU小时数)       (成本)
E = 训练→部署平均天数              (工程效率)

关键瓶颈分析

飞轮阶段 常见瓶颈 解决策略 预期加速
轨迹采集 用户量少 内部使用、邀测、合成数据 2-5x
数据质量 信号噪声大 多评判者融合、人工抽检 1.5-3x
训练触发 人工判断延迟 自动化触发条件 2-4x
训练速度 GPU 不足 高效 LoRA、梯度累积 2-3x
部署验证 人工审批慢 自动化 A/B + 自动上线 3-10x

飞轮监控仪表盘

# flywheel/monitoring/dashboard.py
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class FlywheelMetrics:
    """飞轮核心指标快照"""
    snapshot_time: str
    
    # 数据积累
    trajectories_collected_today: int
    trajectories_collected_7d: int
    high_quality_rate: float          # 质量分 > 0.7 的比例
    
    # 训练状态
    last_training_date: str
    training_frequency_days: float    # 平均训练间隔天数
    latest_model_version: str
    
    # 模型性能
    current_task_acceptance_rate: float
    baseline_task_acceptance_rate: float
    cumulative_improvement: float     # 相比初始版本的累计改进
    
    # 飞轮速度
    active_users_7d: int
    avg_tasks_per_user: float
    flywheel_velocity_score: float   # 综合飞轮速度评分 (0-100)
    
    def print_summary(self):
        print(f"""
╔══════════════════════════════════════════════╗
║        Hermes Agent 数据飞轮状态报告          ║
║  {self.snapshot_time}                        ║
╠══════════════════════════════════════════════╣
║ 数据积累                                      ║
║  今日轨迹: {self.trajectories_collected_today:>6} 条               ║
║  7日轨迹: {self.trajectories_collected_7d:>7} 条               ║
║  高质量率: {self.high_quality_rate:.1%}                    ║
╠══════════════════════════════════════════════╣
║ 训练状态                                      ║
║  最近训练: {self.last_training_date}                ║
║  当前版本: {self.latest_model_version}              ║
║  训练频率: 每 {self.training_frequency_days:.1f} 天                 ║
╠══════════════════════════════════════════════╣
║ 模型效果                                      ║
║  任务采纳率: {self.current_task_acceptance_rate:.1%}               ║
║  基线采纳率: {self.baseline_task_acceptance_rate:.1%}               ║
║  累计改进: +{self.cumulative_improvement:.1%}                ║
╠══════════════════════════════════════════════╣
║ 飞轮速度评分: {self.flywheel_velocity_score:.1f}/100               ║
╚══════════════════════════════════════════════╝
        """)

75.6 企业级数据飞轮架构

完整系统架构图

┌─────────────────────────────────────────────────────────────────┐
│                    企业级 Hermes 数据飞轮架构                     │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                      用户层                                │ │
│  │  Web App │ API 集成 │ 企业内网部署 │ SaaS 多租户          │ │
│  └─────────────────────────────┬─────────────────────────────┘ │
│                                │                               │
│  ┌─────────────────────────────▼─────────────────────────────┐ │
│  │                    API 网关层                              │ │
│  │  负载均衡 │ A/B 流量分配 │ 认证鉴权 │ 请求日志            │ │
│  └─────────────────────────────┬─────────────────────────────┘ │
│                                │                               │
│           ┌────────────────────┼────────────────────┐          │
│           ▼                    ▼                    ▼          │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐│
│  │  Hermes v1.2    │  │  Hermes v1.3    │  │  Hermes v1.4    ││
│  │  (90% 流量)     │  │  (9% A/B测试)   │  │  (1% 金丝雀)    ││
│  └────────┬────────┘  └────────┬────────┘  └────────┬────────┘│
│           └────────────────────┼────────────────────┘          │
│                                │                               │
│  ┌─────────────────────────────▼─────────────────────────────┐ │
│  │                   轨迹采集与存储层                          │ │
│  │                                                           │ │
│  │  Kafka → 轨迹流式处理 → 分级存储                           │ │
│  │  ├── 热存储(Redis):最近24小时轨迹                       │ │
│  │  ├── 温存储(S3):近30天完整轨迹                         │ │
│  │  └── 冷存储(Glacier):历史归档                          │ │
│  └─────────────────────────────┬─────────────────────────────┘ │
│                                │                               │
│  ┌─────────────────────────────▼─────────────────────────────┐ │
│  │                    数据处理流水线                           │ │
│  │                                                           │ │
│  │  隐式反馈提取 → 自动评判打分 → 质量过滤 → 训练集构建       │ │
│  │  (每日触发,Airflow 调度)                                │ │
│  └─────────────────────────────┬─────────────────────────────┘ │
│                                │                               │
│  ┌─────────────────────────────▼─────────────────────────────┐ │
│  │                    训练与部署层                            │ │
│  │                                                           │ │
│  │  满足条件 → 触发 LoRA 微调(GPU 集群)                     │ │
│  │  → 自动评估 → A/B 测试 → 满足条件 → 自动全量上线           │ │
│  └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

训练触发条件配置

# flywheel/automation/trigger.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class TrainingTriggerConfig:
    """自动触发训练的条件配置"""
    
    # 数据量触发
    min_new_trajectories: int = 500      # 新增轨迹 >= 500 才触发
    min_high_quality_rate: float = 0.4  # 高质量轨迹 >= 40%
    
    # 时间触发
    max_days_since_last_training: int = 14  # 最多 14 天强制触发一次
    min_days_between_training: int = 3      # 最少间隔 3 天
    
    # 性能触发
    performance_degradation_threshold: float = -0.05  # 性能下降 > 5% 紧急触发
    
    # 资源限制
    training_budget_usd_per_run: float = 200.0
    max_concurrent_trainings: int = 1


class AutoTrainingTrigger:
    """自动化训练触发器"""
    
    def __init__(self, config: TrainingTriggerConfig, db_connection):
        self.config = config
        self.db = db_connection
    
    def should_trigger_training(self) -> tuple[bool, str]:
        """判断是否应该触发训练,返回 (是否触发, 原因)"""
        
        # 1. 检查间隔限制
        days_since_last = self._days_since_last_training()
        if days_since_last < self.config.min_days_between_training:
            return False, f"距上次训练仅 {days_since_last} 天,未达到最小间隔"
        
        # 2. 强制触发:超过最大间隔
        if days_since_last >= self.config.max_days_since_last_training:
            return True, f"已 {days_since_last} 天未训练,强制触发"
        
        # 3. 性能下降紧急触发
        perf_change = self._get_recent_performance_change()
        if perf_change <= self.config.performance_degradation_threshold:
            return True, f"性能下降 {perf_change:.1%},紧急触发"
        
        # 4. 数据量触发
        new_count = self._count_new_trajectories()
        hq_rate = self._high_quality_rate()
        
        if (new_count >= self.config.min_new_trajectories and
                hq_rate >= self.config.min_high_quality_rate):
            return True, f"新增 {new_count} 条高质量轨迹({hq_rate:.1%}),数据就绪"
        
        return False, f"条件未满足: 新增 {new_count} 条(需 {self.config.min_new_trajectories}),高质量率 {hq_rate:.1%}"
    
    def _days_since_last_training(self) -> int:
        # 从数据库查询最近一次训练时间
        result = self.db.fetch_one("SELECT MAX(completed_at) FROM training_runs WHERE status = 'success'")
        if not result or not result[0]:
            return 999  # 从未训练过
        from datetime import datetime
        last = datetime.fromisoformat(result[0])
        return (datetime.now() - last).days
    
    def _count_new_trajectories(self) -> int:
        result = self.db.fetch_one(
            "SELECT COUNT(*) FROM trajectories WHERE used_in_training = 0 AND combined_score IS NOT NULL"
        )
        return result[0] if result else 0
    
    def _high_quality_rate(self) -> float:
        total = self._count_new_trajectories()
        if total == 0:
            return 0.0
        hq = self.db.fetch_one(
            "SELECT COUNT(*) FROM trajectories WHERE used_in_training = 0 AND combined_score >= 0.7"
        )[0]
        return hq / total
    
    def _get_recent_performance_change(self) -> float:
        # 比较最近7天 vs 前7天的平均任务采纳率
        # 简化版实现
        return 0.0  # 实际需从监控系统查询

75.7 未来展望:Hermes Agent 的演进方向

近期路线图(1-2年)

当前状态 → 下一步演进

① 单机微调 → 联邦学习(多租户数据共同改进,隐私保护)
② 文本工具 → 多模态工具(图像、音频、视频的理解与生成)
③ 单次对话 → 长期记忆(跨会话记住用户偏好和历史)
④ 人工设计奖励 → 自动奖励发现(Agent 自动发现什么行为值得鼓励)
⑤ 单 Agent → 自组织多 Agent 生态(Agent 动态组合成团队)

数据飞轮的终极形态

未来的 Hermes 数据飞轮:

用户使用 → 轨迹采集 → 自动评判 → 模型微调
    ↑                                    │
    └────────────────────────────────────┘
                    │
                    ▼ 增益
    
    评判者也会持续改进(评判者飞轮)
    工具也会持续改进(工具飞轮)
    数据飞轮自身的设计也会被优化(元飞轮)
    
最终:Hermes Agent 具备真正的自主进化能力

本章小结

本章构建了 Hermes Agent 数据飞轮的完整闭环体系:

数据飞轮不是一蹴而就的——它需要工程、数据、ML 三个团队的紧密协作,需要耐心地从冷启动到成熟,但一旦飞轮开始真正转动,它会以指数级的速度加速,形成竞争对手几乎无法追赶的护城河。

思考题

  1. 在多租户 SaaS 场景中,不同客户的数据是否应该隔离训练?如何设计隔离但又互益的飞轮?
  2. 如何检测飞轮中的"负反馈循环"——坏模型产生坏数据,坏数据训练出更坏的模型?
  3. 数据飞轮中的数据隐私如何保护?轨迹中可能包含用户的敏感信息。
  4. 当飞轮速度达到瓶颈时(如数据质量已经很高,但用户增长停滞),下一步增长动力来自哪里?
本章评分
4.8  / 5  (3 评分)

💬 留言讨论