第 74 章

Atropos RL 微调:轨迹采集与训练

第七十四章:Atropos RL 微调:轨迹采集与训练

章节导语

在古希腊神话中,阿特罗波斯(Atropos)是命运三女神中负责"剪断生命之线"的那位——她决定什么样的命运应该延续,什么样的命运应该终结。NousResearch 以此命名他们的强化学习框架,寓意深刻:Atropos RL 的核心工作正是判断哪些 Agent 行为轨迹值得保留,哪些应该被淘汰,并以此为训练信号,让 Hermes Agent 在不断试错中进化。本章将深入 Atropos 框架的技术细节,从轨迹采集到评判者设计,从数据质量过滤到 LoRA 微调训练,构建完整的 Agent 自我改进闭环。


74.1 Atropos 框架架构详解

核心设计哲学

Atropos 与传统 RLHF(人类反馈强化学习)的根本区别在于:反馈来源从人类转向自动化评判者,实现了高度可扩展的训练信号生成。

传统 RLHF:
Agent 输出 → 人类评分 → 奖励模型 → 策略更新
(瓶颈:人类评分速度慢、成本高、难以扩展)

Atropos RL:
Agent 行动 → 环境反馈 + 自动评判者打分 → 直接训练信号 → 策略更新
(优势:全自动、低成本、可大规模扩展)

系统架构图

┌─────────────────────────────────────────────────────────────┐
│                      Atropos RL Framework                    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  轨迹采集层                          │   │
│  │                                                     │   │
│  │  Hermes Agent 运行 → 记录完整轨迹                   │   │
│  │  {prompt, thoughts, tool_calls, observations, final}│   │
│  └────────────────────────┬────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  评判者层(Judge)                   │   │
│  │                                                     │   │
│  │  ┌───────────────┐  ┌───────────────┐              │   │
│  │  │  结果评判者   │  │  过程评判者   │              │   │
│  │  │  (Outcome)    │  │  (Process)    │              │   │
│  │  │               │  │               │              │   │
│  │  │ 任务是否完成?│  │ 工具使用合理?│              │   │
│  │  │ 答案是否正确?│  │ 推理是否清晰?│              │   │
│  │  └───────────────┘  └───────────────┘              │   │
│  │         ↓                   ↓                      │   │
│  │         └────────┬──────────┘                      │   │
│  │                  ▼                                  │   │
│  │          综合奖励分数 r ∈ [-1, 1]                   │   │
│  └────────────────────────┬────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  数据质量过滤层                      │   │
│  │  去重 → 长度过滤 → 奖励分布平衡 → 多样性保证         │   │
│  └────────────────────────┬────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  LoRA 微调训练层                     │   │
│  │  准备训练集 → LoRA 配置 → 训练 → 评估 → 合并        │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

轨迹数据格式

# atropos/trajectory.py
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
import json

@dataclass
class ToolCall:
    """工具调用记录"""
    name: str
    arguments: dict
    result: Any
    duration_ms: int
    success: bool

@dataclass
class ThinkingStep:
    """Agent 的思考步骤"""
    step_index: int
    thought: str                    # 推理文本
    tool_calls: List[ToolCall]     # 本步骤的工具调用
    observation: str               # 工具返回结果的观察

@dataclass
class Trajectory:
    """完整 Agent 运行轨迹"""
    trajectory_id: str
    task: str                      # 任务描述
    task_type: str                 # 任务类型分类
    initial_prompt: str
    system_prompt: str
    
    thinking_steps: List[ThinkingStep]
    final_response: str
    
    # 评分信息(训练前为空)
    outcome_score: Optional[float] = None  # [-1, 1]
    process_score: Optional[float] = None  # [-1, 1]
    combined_score: Optional[float] = None
    
    # 元数据
    model_name: str = ""
    timestamp: str = ""
    duration_seconds: float = 0.0
    total_tokens: int = 0
    
    def to_training_format(self) -> dict:
        """转换为 SFT/RL 训练格式"""
        # 构建对话历史
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": self.initial_prompt}
        ]
        
        for step in self.thinking_steps:
            # 添加 Agent 的思考(作为 assistant 消息)
            assistant_content = step.thought
            if step.tool_calls:
                # 格式化工具调用
                tool_calls_formatted = [
                    {
                        "type": "function",
                        "function": {
                            "name": tc.name,
                            "arguments": json.dumps(tc.arguments)
                        }
                    }
                    for tc in step.tool_calls
                ]
                messages.append({
                    "role": "assistant",
                    "content": assistant_content,
                    "tool_calls": tool_calls_formatted
                })
                # 添加工具结果
                for tc in step.tool_calls:
                    messages.append({
                        "role": "tool",
                        "content": json.dumps(tc.result, ensure_ascii=False)
                    })
            else:
                messages.append({"role": "assistant", "content": assistant_content})
        
        # 最终响应
        messages.append({"role": "assistant", "content": self.final_response})
        
        return {
            "trajectory_id": self.trajectory_id,
            "messages": messages,
            "reward": self.combined_score,
            "task_type": self.task_type,
            "metadata": {
                "model": self.model_name,
                "duration": self.duration_seconds,
                "steps": len(self.thinking_steps)
            }
        }
    
    def to_json(self) -> str:
        return json.dumps({
            "trajectory_id": self.trajectory_id,
            "task": self.task,
            "task_type": self.task_type,
            "thinking_steps": [
                {
                    "step": s.step_index,
                    "thought": s.thought,
                    "tool_calls": [
                        {"name": tc.name, "args": tc.arguments, 
                         "result": tc.result, "success": tc.success}
                        for tc in s.tool_calls
                    ],
                    "observation": s.observation
                }
                for s in self.thinking_steps
            ],
            "final_response": self.final_response,
            "scores": {
                "outcome": self.outcome_score,
                "process": self.process_score,
                "combined": self.combined_score
            },
            "metadata": {
                "model": self.model_name,
                "timestamp": self.timestamp,
                "duration_s": self.duration_seconds,
                "total_tokens": self.total_tokens
            }
        }, ensure_ascii=False, indent=2)

74.2 从运行日志提取训练轨迹

轨迹采集中间件

# atropos/collector.py
import json
import time
import uuid
from datetime import datetime
from typing import List, Optional
from openai import OpenAI
from .trajectory import Trajectory, ThinkingStep, ToolCall

class TrajectoryCollector:
    """轨迹采集器:包装 OpenAI 调用,自动记录完整轨迹"""
    
    def __init__(self, client: OpenAI, model: str, output_dir: str = "./trajectories"):
        self.client = client
        self.model = model
        self.output_dir = output_dir
        import os
        os.makedirs(output_dir, exist_ok=True)
    
    def run_and_collect(
        self,
        task: str,
        task_type: str,
        system_prompt: str,
        initial_user_message: str,
        tools: list,
        tool_dispatcher: callable,
        max_iterations: int = 20
    ) -> Trajectory:
        """运行 Agent 并收集完整轨迹"""
        
        traj_id = str(uuid.uuid4())
        start_time = time.time()
        
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": initial_user_message}
        ]
        
        thinking_steps = []
        total_tokens = 0
        step_index = 0
        
        while step_index < max_iterations:
            # 调用模型
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=tools,
                tool_choice="auto",
                temperature=0.3,
            )
            
            msg = response.choices[0].message
            total_tokens += response.usage.total_tokens if response.usage else 0
            messages.append(msg)
            
            # 提取思考内容
            thought_text = msg.content or ""
            tool_calls_in_step = []
            observation_text = ""
            
            if msg.tool_calls:
                for tc in msg.tool_calls:
                    tc_start = time.time()
                    args = json.loads(tc.function.arguments)
                    result = tool_dispatcher(tc.function.name, args)
                    tc_duration = int((time.time() - tc_start) * 1000)
                    
                    tool_calls_in_step.append(ToolCall(
                        name=tc.function.name,
                        arguments=args,
                        result=result,
                        duration_ms=tc_duration,
                        success="error" not in str(result).lower()
                    ))
                    
                    result_str = json.dumps(result, ensure_ascii=False)
                    observation_text += f"[{tc.function.name}]: {result_str[:500]}\n"
                    
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": result_str
                    })
                
                thinking_steps.append(ThinkingStep(
                    step_index=step_index,
                    thought=thought_text,
                    tool_calls=tool_calls_in_step,
                    observation=observation_text
                ))
                step_index += 1
            else:
                # 最终响应,无工具调用
                thinking_steps.append(ThinkingStep(
                    step_index=step_index,
                    thought=thought_text,
                    tool_calls=[],
                    observation=""
                ))
                break
        
        final_response = messages[-1].get("content", "") if isinstance(messages[-1], dict) else (messages[-1].content or "")
        
        trajectory = Trajectory(
            trajectory_id=traj_id,
            task=task,
            task_type=task_type,
            initial_prompt=initial_user_message,
            system_prompt=system_prompt,
            thinking_steps=thinking_steps,
            final_response=final_response,
            model_name=self.model,
            timestamp=datetime.now().isoformat(),
            duration_seconds=time.time() - start_time,
            total_tokens=total_tokens
        )
        
        # 保存原始轨迹
        output_path = f"{self.output_dir}/{traj_id}.json"
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(trajectory.to_json())
        
        print(f"[Collector] 轨迹已保存: {output_path}")
        return trajectory

74.3 评判者设计与奖励函数

评判者基类

# atropos/judges/base.py
from abc import ABC, abstractmethod
from ..trajectory import Trajectory

class BaseJudge(ABC):
    """评判者基类"""
    
    @abstractmethod
    def score(self, trajectory: Trajectory) -> float:
        """返回 [-1, 1] 的奖励分数"""
        pass
    
    def batch_score(self, trajectories: list) -> list:
        """批量评分"""
        return [self.score(t) for t in trajectories]
    
    @property
    def name(self) -> str:
        return self.__class__.__name__


class OutcomeJudge(BaseJudge):
    """结果评判者:评估任务是否成功完成"""
    
    def __init__(self, success_checker: callable):
        """
        success_checker: (trajectory) -> (success: bool, score: float)
        """
        self.success_checker = success_checker
    
    def score(self, trajectory: Trajectory) -> float:
        success, raw_score = self.success_checker(trajectory)
        return raw_score if success else -raw_score


class ProcessJudge(BaseJudge):
    """过程评判者:评估工具使用和推理过程的质量"""
    
    def score(self, trajectory: Trajectory) -> float:
        scores = []
        
        # 1. 工具使用成功率
        all_calls = []
        for step in trajectory.thinking_steps:
            all_calls.extend(step.tool_calls)
        
        if all_calls:
            success_rate = sum(1 for tc in all_calls if tc.success) / len(all_calls)
            scores.append(success_rate * 2 - 1)  # 映射到 [-1, 1]
        
        # 2. 推理链完整性
        has_reasoning = all(
            len(step.thought.strip()) > 20
            for step in trajectory.thinking_steps
        )
        scores.append(0.5 if has_reasoning else -0.3)
        
        # 3. 步骤效率(过多步骤扣分)
        step_count = len(trajectory.thinking_steps)
        if step_count <= 5:
            scores.append(0.5)
        elif step_count <= 10:
            scores.append(0.2)
        else:
            scores.append(-0.3)
        
        # 4. 响应完整性
        final_len = len(trajectory.final_response)
        if final_len >= 100:
            scores.append(0.3)
        elif final_len >= 20:
            scores.append(0.1)
        else:
            scores.append(-0.5)
        
        return sum(scores) / len(scores) if scores else 0.0


class LLMJudge(BaseJudge):
    """LLM 评判者:使用另一个 LLM 评估轨迹质量"""
    
    def __init__(self, client, model: str = "gpt-4o"):
        self.client = client
        self.model = model
    
    def score(self, trajectory: Trajectory) -> float:
        # 构建评判 prompt
        traj_summary = self._summarize_trajectory(trajectory)
        
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": "你是一个 AI Agent 轨迹评判者。请评估以下 Agent 的表现,返回 JSON 格式的评分。"
                },
                {
                    "role": "user",
                    "content": f"""请评估此 Agent 轨迹的质量:

任务:{trajectory.task}
轨迹摘要:{traj_summary}
最终输出:{trajectory.final_response[:500]}

请以 JSON 格式返回:
{{
  "task_completion": 0-10,
  "reasoning_quality": 0-10,
  "tool_use_efficiency": 0-10,
  "response_quality": 0-10,
  "overall_score": 0-10,
  "brief_comment": "简短评价"
}}"""
                }
            ],
            response_format={"type": "json_object"},
            temperature=0.1
        )
        
        import json
        result = json.loads(response.choices[0].message.content)
        # 将 0-10 分转换为 -1 到 1
        raw = result.get("overall_score", 5) / 10
        return raw * 2 - 1
    
    def _summarize_trajectory(self, traj: Trajectory) -> str:
        steps_summary = []
        for step in traj.thinking_steps[:5]:  # 只看前5步
            tool_names = [tc.name for tc in step.tool_calls]
            steps_summary.append(f"步骤{step.step_index}: 工具={tool_names}, 成功={all(tc.success for tc in step.tool_calls)}")
        return "\n".join(steps_summary)


class CompositeJudge(BaseJudge):
    """组合评判者:多个评判者加权综合"""
    
    def __init__(self, judges: list, weights: list = None):
        self.judges = judges
        self.weights = weights or [1.0] * len(judges)
        total = sum(self.weights)
        self.weights = [w / total for w in self.weights]
    
    def score(self, trajectory: Trajectory) -> float:
        scores = [j.score(trajectory) for j in self.judges]
        return sum(s * w for s, w in zip(scores, self.weights))

74.4 数据质量过滤标准

# atropos/filter.py
import json
from typing import List, Tuple
from .trajectory import Trajectory

class TrajectoryFilter:
    """轨迹数据质量过滤器"""
    
    def __init__(
        self,
        min_reward: float = -0.5,      # 过滤过低奖励
        max_reward: float = 1.0,
        min_steps: int = 1,            # 最少步骤数
        max_steps: int = 20,
        min_response_length: int = 50, # 最终响应最少字符
        max_response_length: int = 8000,
        dedup_threshold: float = 0.9,  # 相似度去重阈值
    ):
        self.min_reward = min_reward
        self.max_reward = max_reward
        self.min_steps = min_steps
        self.max_steps = max_steps
        self.min_response_length = min_response_length
        self.max_response_length = max_response_length
        self.dedup_threshold = dedup_threshold
    
    def filter_batch(
        self, trajectories: List[Trajectory]
    ) -> Tuple[List[Trajectory], dict]:
        """批量过滤,返回通过的轨迹和过滤统计"""
        
        stats = {
            "total": len(trajectories),
            "passed": 0,
            "filtered_reasons": {
                "no_score": 0,
                "low_reward": 0,
                "too_few_steps": 0,
                "too_many_steps": 0,
                "response_too_short": 0,
                "response_too_long": 0,
                "duplicate": 0
            }
        }
        
        passed = []
        seen_fingerprints = set()
        
        for traj in trajectories:
            reason = self._check_single(traj, seen_fingerprints)
            if reason is None:
                passed.append(traj)
                stats["passed"] += 1
                # 记录指纹防重
                fp = self._fingerprint(traj)
                seen_fingerprints.add(fp)
            else:
                stats["filtered_reasons"][reason] = stats["filtered_reasons"].get(reason, 0) + 1
        
        print(f"[Filter] 过滤完成: {stats['passed']}/{stats['total']} 通过")
        print(f"  过滤原因: {stats['filtered_reasons']}")
        
        return passed, stats
    
    def _check_single(self, traj: Trajectory, seen: set) -> str | None:
        """检查单条轨迹,返回 None 表示通过,否则返回过滤原因"""
        
        if traj.combined_score is None:
            return "no_score"
        
        if traj.combined_score < self.min_reward:
            return "low_reward"
        
        step_count = len(traj.thinking_steps)
        if step_count < self.min_steps:
            return "too_few_steps"
        if step_count > self.max_steps:
            return "too_many_steps"
        
        resp_len = len(traj.final_response)
        if resp_len < self.min_response_length:
            return "response_too_short"
        if resp_len > self.max_response_length:
            return "response_too_long"
        
        fp = self._fingerprint(traj)
        if fp in seen:
            return "duplicate"
        
        return None  # 通过
    
    def _fingerprint(self, traj: Trajectory) -> str:
        """生成轨迹指纹用于去重"""
        key = traj.task[:100] + traj.final_response[:200]
        import hashlib
        return hashlib.md5(key.encode()).hexdigest()
    
    def balance_reward_distribution(
        self, trajectories: List[Trajectory], 
        bins: int = 5, max_per_bin: int = 1000
    ) -> List[Trajectory]:
        """平衡奖励分布:确保正负样本比例合理"""
        import numpy as np
        
        rewards = [t.combined_score for t in trajectories]
        bin_edges = np.linspace(-1, 1, bins + 1)
        
        binned = [[] for _ in range(bins)]
        for traj in trajectories:
            for i in range(bins):
                if bin_edges[i] <= traj.combined_score <= bin_edges[i + 1]:
                    binned[i].append(traj)
                    break
        
        balanced = []
        for bin_trajs in binned:
            # 每个分桶最多保留 max_per_bin 条
            import random
            selected = random.sample(bin_trajs, min(len(bin_trajs), max_per_bin))
            balanced.extend(selected)
        
        print(f"[Filter] 奖励分布平衡: {len(trajectories)} → {len(balanced)}")
        return balanced

74.5 训练配置与完整训练脚本

LoRA 训练配置

# atropos/training/config.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class AtroposTrainingConfig:
    """Atropos RL 微调训练配置"""
    
    # 基础模型
    base_model: str = "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO"
    model_revision: str = "main"
    
    # LoRA 配置
    lora_r: int = 64               # rank,越大表达能力越强但参数越多
    lora_alpha: int = 128          # scaling factor = alpha/r
    lora_dropout: float = 0.05
    lora_target_modules: list = None  # None=自动检测
    
    # 训练超参数
    learning_rate: float = 2e-4
    num_train_epochs: int = 3
    per_device_train_batch_size: int = 2
    gradient_accumulation_steps: int = 8  # 有效批大小 = 2*8 = 16
    warmup_ratio: float = 0.05
    lr_scheduler_type: str = "cosine"
    max_grad_norm: float = 1.0
    
    # 序列长度
    max_seq_length: int = 4096
    
    # 数据集
    train_data_path: str = "./data/train.jsonl"
    eval_data_path: str = "./data/eval.jsonl"
    
    # RL 特定配置
    use_reward_scaling: bool = True     # 是否对奖励归一化
    reward_baseline: float = 0.0       # 奖励基线(减去基线减少方差)
    kl_coef: float = 0.1              # KL 散度惩罚系数(防止偏离参考模型太远)
    
    # 输出
    output_dir: str = "./checkpoints"
    save_steps: int = 100
    eval_steps: int = 50
    logging_steps: int = 10
    
    # 硬件
    fp16: bool = False
    bf16: bool = True             # 推荐:A100/H100 使用 bf16
    gradient_checkpointing: bool = True
    
    def __post_init__(self):
        if self.lora_target_modules is None:
            # Mistral/Mixtral 架构的 LoRA 目标层
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj",
                "gate_proj", "up_proj", "down_proj"
            ]

完整训练脚本

# atropos/training/train.py
import json
import os
import torch
from datasets import Dataset
from transformers import (
    AutoModelForCausalLM, AutoTokenizer, TrainingArguments
)
from peft import LoraConfig, get_peft_model, TaskType
from trl import SFTTrainer, DataCollatorForCompletionOnlyLM

from .config import AtroposTrainingConfig
from ..trajectory import Trajectory


def prepare_dataset(
    trajectories_path: str,
    tokenizer,
    max_seq_length: int = 4096,
    reward_threshold: float = 0.0
) -> Dataset:
    """准备训练数据集"""
    
    data = []
    with open(trajectories_path, "r", encoding="utf-8") as f:
        for line in f:
            item = json.loads(line.strip())
            reward = item.get("reward", 0)
            
            # 只使用奖励 > 阈值的轨迹做 SFT
            if reward < reward_threshold:
                continue
            
            # 格式化为对话模板
            messages = item.get("messages", [])
            text = tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=False
            )
            
            # 过滤超长序列
            tokens = tokenizer(text, return_tensors="pt")
            if tokens["input_ids"].shape[1] > max_seq_length:
                continue
            
            data.append({
                "text": text,
                "reward": reward,
                "task_type": item.get("metadata", {}).get("task_type", "unknown")
            })
    
    print(f"[Training] 加载 {len(data)} 条训练样本")
    return Dataset.from_list(data)


def load_model_and_tokenizer(config: AtroposTrainingConfig):
    """加载基础模型和 tokenizer"""
    print(f"[Training] 加载模型: {config.base_model}")
    
    tokenizer = AutoTokenizer.from_pretrained(
        config.base_model,
        trust_remote_code=True,
        padding_side="right"
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    
    model = AutoModelForCausalLM.from_pretrained(
        config.base_model,
        torch_dtype=torch.bfloat16 if config.bf16 else torch.float16,
        device_map="auto",
        trust_remote_code=True,
    )
    
    if config.gradient_checkpointing:
        model.gradient_checkpointing_enable()
    
    return model, tokenizer


def apply_lora(model, config: AtroposTrainingConfig):
    """应用 LoRA 适配器"""
    lora_config = LoraConfig(
        task_type=TaskType.CAUSAL_LM,
        r=config.lora_r,
        lora_alpha=config.lora_alpha,
        lora_dropout=config.lora_dropout,
        target_modules=config.lora_target_modules,
        bias="none",
        inference_mode=False,
    )
    
    model = get_peft_model(model, lora_config)
    model.print_trainable_parameters()
    return model


def train(config: AtroposTrainingConfig):
    """执行完整训练流程"""
    
    # 1. 加载模型
    model, tokenizer = load_model_and_tokenizer(config)
    model = apply_lora(model, config)
    
    # 2. 准备数据集
    train_dataset = prepare_dataset(
        config.train_data_path, tokenizer,
        config.max_seq_length, reward_threshold=0.0
    )
    eval_dataset = prepare_dataset(
        config.eval_data_path, tokenizer,
        config.max_seq_length, reward_threshold=0.0
    ) if os.path.exists(config.eval_data_path) else None
    
    # 3. 训练参数
    training_args = TrainingArguments(
        output_dir=config.output_dir,
        num_train_epochs=config.num_train_epochs,
        per_device_train_batch_size=config.per_device_train_batch_size,
        gradient_accumulation_steps=config.gradient_accumulation_steps,
        learning_rate=config.learning_rate,
        lr_scheduler_type=config.lr_scheduler_type,
        warmup_ratio=config.warmup_ratio,
        max_grad_norm=config.max_grad_norm,
        fp16=config.fp16,
        bf16=config.bf16,
        logging_steps=config.logging_steps,
        save_steps=config.save_steps,
        eval_steps=config.eval_steps if eval_dataset else None,
        evaluation_strategy="steps" if eval_dataset else "no",
        save_total_limit=3,
        load_best_model_at_end=True if eval_dataset else False,
        report_to=["wandb"],  # 推荐使用 wandb 监控训练
        run_name=f"atropos-hermes-{config.lora_r}r",
    )
    
    # 4. 创建 Trainer
    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        tokenizer=tokenizer,
        dataset_text_field="text",
        max_seq_length=config.max_seq_length,
        packing=True,  # 提高 GPU 利用率
    )
    
    # 5. 开始训练
    print("[Training] 开始训练...")
    trainer.train()
    
    # 6. 保存最终模型
    final_output = os.path.join(config.output_dir, "final")
    trainer.save_model(final_output)
    tokenizer.save_pretrained(final_output)
    print(f"[Training] 模型已保存至: {final_output}")
    
    return trainer


if __name__ == "__main__":
    config = AtroposTrainingConfig(
        base_model="NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
        lora_r=64,
        lora_alpha=128,
        learning_rate=2e-4,
        num_train_epochs=3,
        per_device_train_batch_size=2,
        gradient_accumulation_steps=8,
        train_data_path="./data/trajectories_train.jsonl",
        eval_data_path="./data/trajectories_eval.jsonl",
        output_dir="./checkpoints/hermes-atropos-v1",
        bf16=True,
    )
    
    train(config)

74.6 评估改进效果

# atropos/evaluation/benchmark.py
import json
from typing import List, Dict

BENCHMARK_TASKS = [
    {
        "id": "code_debug_001",
        "task_type": "code_debugging",
        "prompt": "以下 Python 函数有 bug,请找出并修复:\n```python\ndef fibonacci(n):\n    if n <= 1:\n        return n\n    return fibonacci(n-1) + fibonacci(n-2)\n```\n要求:修复性能问题,支持 n=50",
        "success_criteria": ["memoization", "lru_cache", "dynamic programming"]
    },
    {
        "id": "research_001",
        "task_type": "research",
        "prompt": "请研究并总结 RAG(检索增强生成)技术的最新进展",
        "success_criteria": ["vector database", "embedding", "retrieval"]
    },
    {
        "id": "data_analysis_001",
        "task_type": "data_analysis",
        "prompt": "给定一份销售数据 CSV,分析月度趋势并找出最佳销售员",
        "success_criteria": ["trend", "analysis", "recommendation"]
    }
]

def run_benchmark(
    agent_runner: callable,
    tasks: List[dict] = None,
    model_version: str = "baseline"
) -> Dict:
    """运行基准测试评估模型改进效果"""
    if tasks is None:
        tasks = BENCHMARK_TASKS
    
    results = []
    for task in tasks:
        print(f"[Benchmark] 运行任务: {task['id']}")
        try:
            trajectory = agent_runner(task["prompt"], task["task_type"])
            
            # 检查成功标准
            response_lower = trajectory.final_response.lower()
            criteria_met = sum(
                1 for criterion in task["success_criteria"]
                if criterion.lower() in response_lower
            )
            success_rate = criteria_met / len(task["success_criteria"])
            
            results.append({
                "task_id": task["id"],
                "task_type": task["task_type"],
                "success_rate": success_rate,
                "criteria_met": criteria_met,
                "total_criteria": len(task["success_criteria"]),
                "steps_used": len(trajectory.thinking_steps),
                "duration_s": trajectory.duration_seconds
            })
        except Exception as e:
            results.append({
                "task_id": task["id"],
                "error": str(e),
                "success_rate": 0.0
            })
    
    summary = {
        "model_version": model_version,
        "total_tasks": len(tasks),
        "avg_success_rate": sum(r.get("success_rate", 0) for r in results) / len(results),
        "avg_steps": sum(r.get("steps_used", 0) for r in results) / len(results),
        "avg_duration_s": sum(r.get("duration_s", 0) for r in results) / len(results),
        "task_results": results
    }
    
    print(f"\n[Benchmark] 结果 ({model_version}):")
    print(f"  平均成功率: {summary['avg_success_rate']:.1%}")
    print(f"  平均步骤数: {summary['avg_steps']:.1f}")
    print(f"  平均耗时: {summary['avg_duration_s']:.1f}s")
    
    return summary

本章小结

本章深入 Atropos RL 框架的完整技术栈:

Atropos RL 的核心价值在于打破了"需要人类反馈才能改进 AI"的瓶颈——它让 Agent 在任务执行中自动产生训练信号,实现了真正意义上的自主进化

思考题

  1. 当评判者本身出错(给出错误的奖励信号)时,如何设计检测和纠错机制?
  2. LoRA rank 的选择对模型最终效果有多大影响?有什么系统化的选取方法?
  3. 如何防止模型在优化评判者奖励的同时"作弊"(奖励黑客问题)?
  4. 对于多工具协作的复杂任务,如何设计公平的过程奖励函数?
本章评分
4.6  / 5  (3 评分)

💬 留言讨论