第 75 章
数据飞轮:持续改进的闭环机制
第七十五章:数据飞轮:持续改进的闭环机制
章节导语
飞轮(Flywheel)是工业时代最重要的发明之一——它利用旋转惯性,将断续的动力输入转化为平滑的持续输出。亚马逊创始人贝佐斯将飞轮效应引入商业战略:低价 → 更多客户 → 更多卖家 → 更低成本 → 更低价格。而在 AI Agent 时代,数据飞轮具有同样的惊人力量:更多使用 → 更多轨迹 → 更好的训练数据 → 更好的模型 → 更多使用。本章将系统设计 Hermes Agent 的数据飞轮架构,从冷启动、数据筛选、A/B 测试到企业级部署,构建真正能够自主加速的 AI 改进引擎。
75.1 数据飞轮的完整闭环设计
飞轮闭环的六个环节
┌─────────────────────────────────────────────────────────────┐
│ 数据飞轮闭环 │
│ │
│ ① 用户使用 │
│ ╱ ╲ │
│ ⑥ 更好使用 ② 产生轨迹 │
│ │ │ │
│ │ 数据飞轮 │ │
│ │ ↻ ↻ ↻ │ │
│ ⑤ 部署上线 ③ 隐式反馈 │
│ ╲ ╱ │
│ ④ 数据筛选 │
│ ↓ │
│ 训练微调 │
└─────────────────────────────────────────────────────────────┘
① 用户使用:用户通过 Hermes Agent 完成各类任务
② 产生轨迹:系统自动记录完整的 Agent 运行轨迹
③ 隐式反馈:从用户行为中提取质量信号(无需主动评分)
④ 数据筛选:过滤低质量轨迹,平衡训练数据分布
↓ 微调训练(Atropos RL)
⑤ 部署上线:新模型经 A/B 测试验证后上线
⑥ 更好使用:更好的模型吸引更多用户,产生更多轨迹
隐式反馈信号设计
与传统 RLHF 不同,数据飞轮的核心是从用户行为中自动提取质量信号,无需用户主动打分。
| 信号类型 | 来源行为 | 正信号(好) | 负信号(差) |
|---|---|---|---|
| 任务完成率 | 用户是否接受了 Agent 的最终输出 | 直接采纳 | 多次要求重试 |
| 会话长度 | 完成任务所需的交互轮数 | 1-3轮完成 | 10+ 轮仍未完成 |
| 用户重做率 | 输出后用户是否手动修改 | 无修改采纳 | 大幅修改后使用 |
| 会话继续 | 完成后是否继续使用 | 继续追加任务 | 立即退出 |
| 错误报告 | 用户是否报告了错误 | 无投诉 | 提交错误反馈 |
| 分享行为 | 是否分享了 Agent 的输出 | 分享/引用 | 不使用 |
# flywheel/feedback_extractor.py
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
@dataclass
class ImplicitFeedback:
"""从用户行为提取的隐式反馈"""
session_id: str
trajectory_id: str
# 行为信号
task_accepted: bool = False # 用户是否接受最终输出
retry_count: int = 0 # 重试次数
manual_edit_ratio: float = 0.0 # 手动修改比例 (0-1)
session_continued: bool = False # 完成后是否继续使用
error_reported: bool = False # 是否报告错误
completion_time_s: float = 0.0 # 任务完成时间
# 计算综合质量分
def quality_score(self) -> float:
score = 0.5 # 基础分
if self.task_accepted:
score += 0.3
else:
score -= 0.3
if self.retry_count == 0:
score += 0.1
elif self.retry_count >= 3:
score -= 0.2
# 修改比例越高,质量越差
score -= self.manual_edit_ratio * 0.3
if self.session_continued:
score += 0.1
if self.error_reported:
score -= 0.4
return max(-1.0, min(1.0, score)) # 裁剪到 [-1, 1]
class FeedbackCollector:
"""收集和聚合隐式反馈"""
def __init__(self, db_connection):
self.db = db_connection
def record_session_event(
self,
session_id: str,
trajectory_id: str,
event_type: str,
event_data: dict
):
"""记录一个会话事件"""
self.db.execute("""
INSERT INTO session_events
(session_id, trajectory_id, event_type, event_data, created_at)
VALUES (?, ?, ?, ?, ?)
""", (session_id, trajectory_id, event_type,
json.dumps(event_data), datetime.now().isoformat()))
def extract_feedback_for_session(self, session_id: str) -> ImplicitFeedback:
"""从一个会话的所有事件中提取综合反馈"""
events = self.db.fetch_all(
"SELECT event_type, event_data FROM session_events WHERE session_id = ?",
(session_id,)
)
feedback = ImplicitFeedback(session_id=session_id, trajectory_id="")
for event_type, event_data_str in events:
data = json.loads(event_data_str)
if event_type == "output_accepted":
feedback.trajectory_id = data.get("trajectory_id", "")
feedback.task_accepted = True
elif event_type == "retry_requested":
feedback.retry_count += 1
elif event_type == "output_edited":
feedback.manual_edit_ratio = data.get("edit_ratio", 0)
elif event_type == "session_continued":
feedback.session_continued = True
elif event_type == "error_reported":
feedback.error_reported = True
return feedback
75.2 冷启动问题解决方案
冷启动的三个阶段
阶段 0 - 零数据启动(第1-2周)
┌────────────────────────────────────────────────────────┐
│ 策略:合成数据 + 人工标注种子 │
│ │
│ 1. 从公开数据集提取高质量 Agent 轨迹示例 │
│ - AgentBench / ToolBench 数据集 │
│ - ShareGPT 中的工具调用案例 │
│ 2. 手工设计 20-50 个核心任务的"黄金轨迹" │
│ 3. 用 GPT-4 生成多样化的合成训练数据 │
│ 4. 部署基础版 Hermes 开始收集真实轨迹 │
└────────────────────────────────────────────────────────┘
阶段 1 - 数据积累期(第3-8周)
┌────────────────────────────────────────────────────────┐
│ 策略:小批量高频微调 │
│ │
│ 1. 收集 500-2000 条真实用户轨迹 │
│ 2. 每周微调一次(小数据量用 LoRA 快速迭代) │
│ 3. 重点关注高置信度正样本(用户明确采纳的轨迹) │
│ 4. A/B 测试验证改进效果 │
└────────────────────────────────────────────────────────┘
阶段 2 - 飞轮成熟期(第9周+)
┌────────────────────────────────────────────────────────┐
│ 策略:全自动飞轮 + 持续改进 │
│ │
│ 1. 每天自动筛选新轨迹 │
│ 2. 每周/每月触发训练流水线 │
│ 3. 自动 A/B 测试 + 自动上线 │
│ 4. 重点优化飞轮速度和数据质量 │
└────────────────────────────────────────────────────────┘
合成数据生成
# flywheel/cold_start/synthetic_data.py
import os
import json
from openai import OpenAI
# 使用 GPT-4 生成合成训练数据
teacher_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
TASK_TEMPLATES = [
{
"task_type": "code_review",
"templates": [
"请审查以下 {language} 代码并指出潜在问题:{code_snippet}",
"这段 {language} 函数有没有性能问题?{code_snippet}"
]
},
{
"task_type": "data_analysis",
"templates": [
"分析这份 CSV 数据并找出异常值",
"对比两个数据集,找出关键差异"
]
},
{
"task_type": "research",
"templates": [
"研究 {topic} 的最新进展并写一份摘要",
"对比 {topic_a} 和 {topic_b} 的优劣"
]
}
]
def generate_synthetic_trajectory(
task_type: str,
task_description: str,
tools_available: list
) -> dict:
"""使用 GPT-4 生成一条合成训练轨迹"""
prompt = f"""你是一个 AI Agent 训练数据生成专家。
请为以下任务生成一条完整的 Agent 轨迹,包含推理步骤、工具调用和最终响应。
任务类型: {task_type}
任务描述: {task_description}
可用工具: {json.dumps(tools_available, ensure_ascii=False)}
请生成一个真实、高质量的 Agent 执行轨迹,格式如下:
{{
"task": "{task_description}",
"task_type": "{task_type}",
"thinking_steps": [
{{
"step_index": 0,
"thought": "我需要先...",
"tool_calls": [
{{"name": "工具名", "arguments": {{}}, "result": "工具返回值", "success": true}}
],
"observation": "观察到..."
}}
],
"final_response": "最终回答...",
"quality_label": "high"
}}"""
response = teacher_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
temperature=0.7
)
return json.loads(response.choices[0].message.content)
def generate_cold_start_dataset(
output_path: str,
num_samples: int = 200
) -> int:
"""生成冷启动数据集"""
import random
generated = []
task_types = ["code_review", "data_analysis", "research", "debugging", "planning"]
for i in range(num_samples):
task_type = random.choice(task_types)
# 根据任务类型选择模板
task_desc = f"示例{task_type}任务 #{i+1}"
try:
traj = generate_synthetic_trajectory(
task_type=task_type,
task_description=task_desc,
tools_available=["search_web", "run_code", "read_file", "write_file"]
)
traj["reward"] = 0.8 # 合成数据给予较高但非满分的奖励
traj["is_synthetic"] = True
generated.append(traj)
if (i + 1) % 10 == 0:
print(f" 已生成 {i+1}/{num_samples}")
except Exception as e:
print(f" 生成失败: {e}")
with open(output_path, "w", encoding="utf-8") as f:
for item in generated:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"[ColdStart] 生成 {len(generated)} 条合成数据 → {output_path}")
return len(generated)
75.3 数据质量 vs 数据量的权衡
质量与数量的效益曲线
改进效果
▲
1.0│
│ ●●●●●●●●●●●●●●●●(高质量数据)
0.8│ ●●●
│ ●●
0.6│ ●●
│ ●
0.4│●
│
0.2│●●●●●●●●●●●●●●●●●●●●●●●●(低质量数据,无论多少改进有限)
│
└─────────────────────────────→
100 500 1K 5K 10K 数据量
数据质量分级体系
# flywheel/quality/scorer.py
class DataQualityTier:
"""数据质量分级"""
TIER_DEFINITIONS = {
"platinum": {
"description": "人工验证的高质量轨迹",
"reward_range": (0.85, 1.0),
"weight_multiplier": 5.0, # 训练时权重 5 倍
"selection_strategy": "manual_review"
},
"gold": {
"description": "高置信度自动评判轨迹",
"reward_range": (0.6, 0.85),
"weight_multiplier": 2.0,
"selection_strategy": "auto_judge"
},
"silver": {
"description": "中等质量,有明显用户采纳信号",
"reward_range": (0.3, 0.6),
"weight_multiplier": 1.0,
"selection_strategy": "implicit_feedback"
},
"bronze": {
"description": "低质量但有训练价值(作为负样本)",
"reward_range": (-0.5, 0.3),
"weight_multiplier": 0.3,
"selection_strategy": "negative_mining"
}
}
@classmethod
def classify(cls, reward: float) -> str:
for tier, defn in cls.TIER_DEFINITIONS.items():
lo, hi = defn["reward_range"]
if lo <= reward <= hi:
return tier
return "bronze"
@classmethod
def get_weight(cls, tier: str) -> float:
return cls.TIER_DEFINITIONS.get(tier, {}).get("weight_multiplier", 1.0)
def select_training_data(
trajectories: list,
target_count: int = 5000,
quality_budget: dict = None
) -> list:
"""
按质量预算选择训练数据
quality_budget: {"platinum": 200, "gold": 1000, "silver": 2000, "bronze": 500}
"""
if quality_budget is None:
quality_budget = {
"platinum": max(50, target_count // 25),
"gold": max(200, target_count // 5),
"silver": max(500, target_count // 2),
"bronze": max(100, target_count // 10)
}
# 按质量分桶
buckets = {"platinum": [], "gold": [], "silver": [], "bronze": []}
for traj in trajectories:
if traj.combined_score is not None:
tier = DataQualityTier.classify(traj.combined_score)
buckets[tier].append(traj)
selected = []
import random
for tier, budget in quality_budget.items():
available = buckets[tier]
count = min(len(available), budget)
chosen = random.sample(available, count)
selected.extend(chosen)
print(f" [{tier}] 可用: {len(available)}, 选取: {count}")
random.shuffle(selected)
print(f"[DataSelection] 共选取 {len(selected)} 条训练数据")
return selected
75.4 A/B 测试新模型的方法
A/B 测试框架
# flywheel/ab_testing/framework.py
import hashlib
import random
from datetime import datetime, timedelta
from typing import Callable, Optional
class ABTestManager:
"""A/B 测试管理器"""
def __init__(self, db_connection):
self.db = db_connection
self._active_tests = {}
def create_test(
self,
test_name: str,
control_version: str, # 当前生产模型版本
treatment_version: str, # 新模型版本
traffic_split: float = 0.1, # 新模型流量比例(10%)
min_samples: int = 1000,
duration_days: int = 14,
success_metric: str = "task_acceptance_rate"
) -> str:
"""创建 A/B 测试"""
test_id = f"ab_{test_name}_{datetime.now().strftime('%Y%m%d')}"
self._active_tests[test_id] = {
"test_id": test_id,
"control": control_version,
"treatment": treatment_version,
"traffic_split": traffic_split,
"min_samples": min_samples,
"start_time": datetime.now().isoformat(),
"end_time": (datetime.now() + timedelta(days=duration_days)).isoformat(),
"success_metric": success_metric,
"status": "running",
"results": {"control": [], "treatment": []}
}
print(f"[AB Test] 启动测试: {test_id}")
print(f" Control: {control_version}, Treatment: {treatment_version}")
print(f" 流量分配: {(1-traffic_split)*100:.0f}% control / {traffic_split*100:.0f}% treatment")
return test_id
def assign_variant(self, user_id: str, test_id: str) -> str:
"""根据用户 ID 稳定分配变体(同一用户始终看到同一变体)"""
if test_id not in self._active_tests:
return "control"
test = self._active_tests[test_id]
# 使用哈希保证分配稳定性
hash_val = int(hashlib.md5(f"{user_id}:{test_id}".encode()).hexdigest(), 16)
normalized = hash_val / (2 ** 128) # 归一化到 [0, 1]
if normalized < test["traffic_split"]:
return "treatment"
return "control"
def record_outcome(
self,
test_id: str,
variant: str,
session_id: str,
metric_value: float
):
"""记录一个测试样本的结果"""
if test_id not in self._active_tests:
return
self._active_tests[test_id]["results"][variant].append({
"session_id": session_id,
"metric_value": metric_value,
"timestamp": datetime.now().isoformat()
})
def analyze_results(self, test_id: str) -> dict:
"""分析测试结果,进行统计显著性检验"""
from scipy import stats
import numpy as np
if test_id not in self._active_tests:
return {"error": "测试不存在"}
test = self._active_tests[test_id]
control_values = [r["metric_value"] for r in test["results"]["control"]]
treatment_values = [r["metric_value"] for r in test["results"]["treatment"]]
if len(control_values) < 30 or len(treatment_values) < 30:
return {
"status": "insufficient_data",
"control_samples": len(control_values),
"treatment_samples": len(treatment_values)
}
# t 检验
t_stat, p_value = stats.ttest_ind(control_values, treatment_values)
control_mean = np.mean(control_values)
treatment_mean = np.mean(treatment_values)
relative_improvement = (treatment_mean - control_mean) / control_mean * 100
# 判断是否达到显著性
is_significant = p_value < 0.05
is_improvement = treatment_mean > control_mean
result = {
"test_id": test_id,
"control_version": test["control"],
"treatment_version": test["treatment"],
"control_samples": len(control_values),
"treatment_samples": len(treatment_values),
"control_mean": round(control_mean, 4),
"treatment_mean": round(treatment_mean, 4),
"relative_improvement": round(relative_improvement, 2),
"p_value": round(p_value, 4),
"is_statistically_significant": is_significant,
"recommendation": self._get_recommendation(
is_significant, is_improvement, relative_improvement
)
}
return result
def _get_recommendation(
self, is_significant: bool, is_improvement: bool, improvement_pct: float
) -> str:
if is_significant and is_improvement and improvement_pct >= 2:
return "SHIP: 新模型显著优于控制组,建议全量上线"
elif is_significant and not is_improvement:
return "ROLLBACK: 新模型显著差于控制组,不要上线"
elif not is_significant:
return "CONTINUE: 差异不显著,继续收集数据"
else:
return "CAUTION: 改进幅度较小,评估是否值得上线"
def auto_rollout(
self,
test_id: str,
rollout_threshold: float = 0.02, # 相对改进 >= 2% 才自动上线
confidence_level: float = 0.95
) -> bool:
"""基于测试结果自动决策是否全量上线"""
result = self.analyze_results(test_id)
if result.get("status") == "insufficient_data":
print(f"[AutoRollout] 数据不足,继续观察")
return False
should_ship = (
result["is_statistically_significant"] and
result["relative_improvement"] >= rollout_threshold * 100 and
result["control_samples"] >= self._active_tests[test_id]["min_samples"]
)
if should_ship:
print(f"[AutoRollout] ✅ 自动上线 {result['treatment_version']}")
print(f" 改进: +{result['relative_improvement']}%, p={result['p_value']}")
self._deploy_model(result["treatment_version"])
else:
print(f"[AutoRollout] ⏸️ 不满足自动上线条件")
return should_ship
def _deploy_model(self, version: str):
"""触发模型部署(实际实现需接入 CD 系统)"""
print(f"[Deploy] 部署模型版本: {version}")
# 实际实现:
# - 更新 Kubernetes deployment
# - 切换 API Gateway 路由
# - 通知团队
75.5 飞轮速度影响因素分析
飞轮速度的量化模型
飞轮速度 = f(使用量, 数据质量, 训练频率, 模型改进效率)
具体公式:
V = (U × Q × F) / (C × E)
其中:
U = 日活用户数 × 平均任务数/用户 (使用量)
Q = 平均数据质量分 (0-1) (数据质量)
F = 每月训练频率 (训练频率)
C = 每次训练成本(GPU小时数) (成本)
E = 训练→部署平均天数 (工程效率)
关键瓶颈分析
| 飞轮阶段 | 常见瓶颈 | 解决策略 | 预期加速 |
|---|---|---|---|
| 轨迹采集 | 用户量少 | 内部使用、邀测、合成数据 | 2-5x |
| 数据质量 | 信号噪声大 | 多评判者融合、人工抽检 | 1.5-3x |
| 训练触发 | 人工判断延迟 | 自动化触发条件 | 2-4x |
| 训练速度 | GPU 不足 | 高效 LoRA、梯度累积 | 2-3x |
| 部署验证 | 人工审批慢 | 自动化 A/B + 自动上线 | 3-10x |
飞轮监控仪表盘
# flywheel/monitoring/dashboard.py
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class FlywheelMetrics:
"""飞轮核心指标快照"""
snapshot_time: str
# 数据积累
trajectories_collected_today: int
trajectories_collected_7d: int
high_quality_rate: float # 质量分 > 0.7 的比例
# 训练状态
last_training_date: str
training_frequency_days: float # 平均训练间隔天数
latest_model_version: str
# 模型性能
current_task_acceptance_rate: float
baseline_task_acceptance_rate: float
cumulative_improvement: float # 相比初始版本的累计改进
# 飞轮速度
active_users_7d: int
avg_tasks_per_user: float
flywheel_velocity_score: float # 综合飞轮速度评分 (0-100)
def print_summary(self):
print(f"""
╔══════════════════════════════════════════════╗
║ Hermes Agent 数据飞轮状态报告 ║
║ {self.snapshot_time} ║
╠══════════════════════════════════════════════╣
║ 数据积累 ║
║ 今日轨迹: {self.trajectories_collected_today:>6} 条 ║
║ 7日轨迹: {self.trajectories_collected_7d:>7} 条 ║
║ 高质量率: {self.high_quality_rate:.1%} ║
╠══════════════════════════════════════════════╣
║ 训练状态 ║
║ 最近训练: {self.last_training_date} ║
║ 当前版本: {self.latest_model_version} ║
║ 训练频率: 每 {self.training_frequency_days:.1f} 天 ║
╠══════════════════════════════════════════════╣
║ 模型效果 ║
║ 任务采纳率: {self.current_task_acceptance_rate:.1%} ║
║ 基线采纳率: {self.baseline_task_acceptance_rate:.1%} ║
║ 累计改进: +{self.cumulative_improvement:.1%} ║
╠══════════════════════════════════════════════╣
║ 飞轮速度评分: {self.flywheel_velocity_score:.1f}/100 ║
╚══════════════════════════════════════════════╝
""")
75.6 企业级数据飞轮架构
完整系统架构图
┌─────────────────────────────────────────────────────────────────┐
│ 企业级 Hermes 数据飞轮架构 │
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 用户层 │ │
│ │ Web App │ API 集成 │ 企业内网部署 │ SaaS 多租户 │ │
│ └─────────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────▼─────────────────────────────┐ │
│ │ API 网关层 │ │
│ │ 负载均衡 │ A/B 流量分配 │ 认证鉴权 │ 请求日志 │ │
│ └─────────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│
│ │ Hermes v1.2 │ │ Hermes v1.3 │ │ Hermes v1.4 ││
│ │ (90% 流量) │ │ (9% A/B测试) │ │ (1% 金丝雀) ││
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘│
│ └────────────────────┼────────────────────┘ │
│ │ │
│ ┌─────────────────────────────▼─────────────────────────────┐ │
│ │ 轨迹采集与存储层 │ │
│ │ │ │
│ │ Kafka → 轨迹流式处理 → 分级存储 │ │
│ │ ├── 热存储(Redis):最近24小时轨迹 │ │
│ │ ├── 温存储(S3):近30天完整轨迹 │ │
│ │ └── 冷存储(Glacier):历史归档 │ │
│ └─────────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────▼─────────────────────────────┐ │
│ │ 数据处理流水线 │ │
│ │ │ │
│ │ 隐式反馈提取 → 自动评判打分 → 质量过滤 → 训练集构建 │ │
│ │ (每日触发,Airflow 调度) │ │
│ └─────────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────▼─────────────────────────────┐ │
│ │ 训练与部署层 │ │
│ │ │ │
│ │ 满足条件 → 触发 LoRA 微调(GPU 集群) │ │
│ │ → 自动评估 → A/B 测试 → 满足条件 → 自动全量上线 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
训练触发条件配置
# flywheel/automation/trigger.py
from dataclasses import dataclass
from typing import Optional
@dataclass
class TrainingTriggerConfig:
"""自动触发训练的条件配置"""
# 数据量触发
min_new_trajectories: int = 500 # 新增轨迹 >= 500 才触发
min_high_quality_rate: float = 0.4 # 高质量轨迹 >= 40%
# 时间触发
max_days_since_last_training: int = 14 # 最多 14 天强制触发一次
min_days_between_training: int = 3 # 最少间隔 3 天
# 性能触发
performance_degradation_threshold: float = -0.05 # 性能下降 > 5% 紧急触发
# 资源限制
training_budget_usd_per_run: float = 200.0
max_concurrent_trainings: int = 1
class AutoTrainingTrigger:
"""自动化训练触发器"""
def __init__(self, config: TrainingTriggerConfig, db_connection):
self.config = config
self.db = db_connection
def should_trigger_training(self) -> tuple[bool, str]:
"""判断是否应该触发训练,返回 (是否触发, 原因)"""
# 1. 检查间隔限制
days_since_last = self._days_since_last_training()
if days_since_last < self.config.min_days_between_training:
return False, f"距上次训练仅 {days_since_last} 天,未达到最小间隔"
# 2. 强制触发:超过最大间隔
if days_since_last >= self.config.max_days_since_last_training:
return True, f"已 {days_since_last} 天未训练,强制触发"
# 3. 性能下降紧急触发
perf_change = self._get_recent_performance_change()
if perf_change <= self.config.performance_degradation_threshold:
return True, f"性能下降 {perf_change:.1%},紧急触发"
# 4. 数据量触发
new_count = self._count_new_trajectories()
hq_rate = self._high_quality_rate()
if (new_count >= self.config.min_new_trajectories and
hq_rate >= self.config.min_high_quality_rate):
return True, f"新增 {new_count} 条高质量轨迹({hq_rate:.1%}),数据就绪"
return False, f"条件未满足: 新增 {new_count} 条(需 {self.config.min_new_trajectories}),高质量率 {hq_rate:.1%}"
def _days_since_last_training(self) -> int:
# 从数据库查询最近一次训练时间
result = self.db.fetch_one("SELECT MAX(completed_at) FROM training_runs WHERE status = 'success'")
if not result or not result[0]:
return 999 # 从未训练过
from datetime import datetime
last = datetime.fromisoformat(result[0])
return (datetime.now() - last).days
def _count_new_trajectories(self) -> int:
result = self.db.fetch_one(
"SELECT COUNT(*) FROM trajectories WHERE used_in_training = 0 AND combined_score IS NOT NULL"
)
return result[0] if result else 0
def _high_quality_rate(self) -> float:
total = self._count_new_trajectories()
if total == 0:
return 0.0
hq = self.db.fetch_one(
"SELECT COUNT(*) FROM trajectories WHERE used_in_training = 0 AND combined_score >= 0.7"
)[0]
return hq / total
def _get_recent_performance_change(self) -> float:
# 比较最近7天 vs 前7天的平均任务采纳率
# 简化版实现
return 0.0 # 实际需从监控系统查询
75.7 未来展望:Hermes Agent 的演进方向
近期路线图(1-2年)
当前状态 → 下一步演进
① 单机微调 → 联邦学习(多租户数据共同改进,隐私保护)
② 文本工具 → 多模态工具(图像、音频、视频的理解与生成)
③ 单次对话 → 长期记忆(跨会话记住用户偏好和历史)
④ 人工设计奖励 → 自动奖励发现(Agent 自动发现什么行为值得鼓励)
⑤ 单 Agent → 自组织多 Agent 生态(Agent 动态组合成团队)
数据飞轮的终极形态
未来的 Hermes 数据飞轮:
用户使用 → 轨迹采集 → 自动评判 → 模型微调
↑ │
└────────────────────────────────────┘
│
▼ 增益
评判者也会持续改进(评判者飞轮)
工具也会持续改进(工具飞轮)
数据飞轮自身的设计也会被优化(元飞轮)
最终:Hermes Agent 具备真正的自主进化能力
本章小结
本章构建了 Hermes Agent 数据飞轮的完整闭环体系:
- 闭环设计:六环飞轮从用户使用到模型改进的完整路径
- 冷启动:合成数据 + 种子轨迹 + 三阶段启动策略
- 质量与量:四级质量分层(铂金/黄金/白银/铜级)精确控制训练数据配比
- A/B 测试:统计显著性检验 + 自动上线决策,消除人工判断瓶颈
- 飞轮监控:量化飞轮速度,持续识别瓶颈并优化
数据飞轮不是一蹴而就的——它需要工程、数据、ML 三个团队的紧密协作,需要耐心地从冷启动到成熟,但一旦飞轮开始真正转动,它会以指数级的速度加速,形成竞争对手几乎无法追赶的护城河。
思考题
- 在多租户 SaaS 场景中,不同客户的数据是否应该隔离训练?如何设计隔离但又互益的飞轮?
- 如何检测飞轮中的"负反馈循环"——坏模型产生坏数据,坏数据训练出更坏的模型?
- 数据飞轮中的数据隐私如何保护?轨迹中可能包含用户的敏感信息。
- 当飞轮速度达到瓶颈时(如数据质量已经很高,但用户增长停滞),下一步增长动力来自哪里?