Chapter 74

Atropos RL Fine-Tuning: Trajectory Collection and Training

Chapter Introduction

In Greek mythology, Atropos was the Fate who cut the thread of life, deciding which destinies should continue and which should end. NousResearch named their reinforcement learning framework accordingly: Atropos RL's core function is judging which agent behavior trajectories are worth preserving and which should be discarded, then using those judgments as a training signal so that Hermes agents improve through trial and error. This chapter walks through the full Atropos technical stack: trajectory collection, judge design, data quality filtering, LoRA fine-tuning, and evaluation of the results.


74.1 Atropos Framework Architecture

Core Design Philosophy

The fundamental difference between Atropos and traditional RLHF (Reinforcement Learning from Human Feedback) is that the feedback source shifts from humans to automated judges, enabling highly scalable training signal generation.

Traditional RLHF:
Agent output → Human rating → Reward model → Policy update
(Bottleneck: human rating is slow, expensive, hard to scale)

Atropos RL:
Agent action → Environment feedback + auto-judge scoring → Training signal → Policy update
(Advantages: fully automated, low cost, massively scalable)

System Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Atropos RL Framework                     │
│                                                             │
│  [Trajectory Collection Layer]                              │
│  Hermes Agent runs → logs complete trajectory               │
│  {prompt, thoughts, tool_calls, observations, final}        │
│                           ↓                                 │
│  [Judge Layer]                                              │
│  ┌───────────────┐  ┌───────────────┐                       │
│  │ Outcome Judge │  │ Process Judge │                       │
│  │               │  │               │                       │
│  │ Task complete?│  │ Tool use OK?  │                       │
│  │ Answer right? │  │ Reasoning OK? │                       │
│  └───────────────┘  └───────────────┘                       │
│          ↓                  ↓                               │
│  Combined reward score r ∈ [-1, 1]                          │
│                           ↓                                 │
│  [Quality Filter Layer]                                     │
│  Dedup → length filter → reward balance → diversity         │
│                           ↓                                 │
│  [LoRA Fine-Tuning Layer]                                   │
│  Prepare dataset → LoRA config → Train → Eval → Merge       │
└─────────────────────────────────────────────────────────────┘

Trajectory Data Format

# atropos/trajectory.py
from dataclasses import dataclass, field
from typing import List, Optional, Any
import json

@dataclass
class ToolCall:
    name: str
    arguments: dict
    result: Any
    duration_ms: int
    success: bool

@dataclass
class ThinkingStep:
    step_index: int
    thought: str
    tool_calls: List[ToolCall]
    observation: str

@dataclass
class Trajectory:
    trajectory_id: str
    task: str
    task_type: str
    initial_prompt: str
    system_prompt: str
    thinking_steps: List[ThinkingStep]
    final_response: str

    outcome_score: Optional[float] = None  # [-1, 1]
    process_score: Optional[float] = None
    combined_score: Optional[float] = None

    model_name: str = ""
    timestamp: str = ""
    duration_seconds: float = 0.0
    total_tokens: int = 0

    def to_training_format(self) -> dict:
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": self.initial_prompt}
        ]
        for step in self.thinking_steps:
            if step.tool_calls:
                messages.append({
                    "role": "assistant",
                    "content": step.thought,
                    "tool_calls": [
                        {"type": "function",
                         "function": {"name": tc.name,
                                     "arguments": json.dumps(tc.arguments)}}
                        for tc in step.tool_calls
                    ]
                })
                for tc in step.tool_calls:
                    messages.append({
                        "role": "tool",
                        "content": json.dumps(tc.result)
                    })
            else:
                messages.append({"role": "assistant", "content": step.thought})

        messages.append({"role": "assistant", "content": self.final_response})
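        return {
            "trajectory_id": self.trajectory_id,
            "messages": messages,
            "reward": self.combined_score,
            "task_type": self.task_type,
        }

    def to_json(self) -> str:
        """Serialize the full trajectory; TrajectoryCollector calls this when saving."""
        # Local import keeps this method self-contained.
        from dataclasses import asdict
        # default=str covers tool results that are not JSON-serializable.
        return json.dumps(asdict(self), ensure_ascii=False, indent=2, default=str)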
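
Before records in this format reach the trainer, it can help to check that their shape is what the chat template expects. The following is a minimal sketch, not part of Atropos itself: `validate_record` is a hypothetical helper, and the rules it enforces (system and user messages first, assistant message last, tool results only after a tool call, reward in [-1, 1]) are assumptions read off the `to_training_format` output above.

```python
from typing import List


def validate_record(record: dict) -> List[str]:
    """Return a list of problems found in one training record (empty if none)."""
    problems = []
    messages = record.get("messages", [])
    roles = [m.get("role") for m in messages]

    if roles[:2] != ["system", "user"]:
        problems.append("record must start with system and user messages")
    if not roles or roles[-1] != "assistant":
        problems.append("record must end with an assistant message")

    # A tool message only makes sense after an assistant turn that requested
    # tool calls; count how many results that turn is still owed.
    pending_tool_results = 0
    for i, m in enumerate(messages):
        if m.get("role") == "assistant":
            pending_tool_results = len(m.get("tool_calls", []))
        elif m.get("role") == "tool":
            if pending_tool_results == 0:
                problems.append(f"message {i}: tool result without a tool call")
            else:
                pending_tool_results -= 1

    reward = record.get("reward")
    if reward is not None and not -1.0 <= reward <= 1.0:
        problems.append(f"reward {reward} is outside [-1, 1]")
    return problems


# Illustrative record with one tool-using step (values are made up).
example = {
    "trajectory_id": "demo",
    "messages": [
        {"role": "system", "content": "You are a helpful agent."},
        {"role": "user", "content": "What is 2 + 3?"},
        {"role": "assistant", "content": "I will use the calculator.",
         "tool_calls": [{"type": "function",
                         "function": {"name": "add", "arguments": "{\"a\": 2, \"b\": 3}"}}]},
        {"role": "tool", "content": "{\"sum\": 5}"},
        {"role": "assistant", "content": "The answer is 5."},
    ],
    "reward": 0.7,
    "task_type": "math",
}
print(validate_record(example))  # prints []
```

Running a check like this over a sample of records before training is cheaper than discovering a malformed conversation through a chat-template error midway through dataset preparation.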

74.2 Trajectory Collection from Agent Logs

Collection Middleware

# atropos/collector.py
import json, os, time, uuid
from datetime import datetime
from openai import OpenAI
from .trajectory import Trajectory, ThinkingStep, ToolCall

class TrajectoryCollector:
    def __init__(self, client: OpenAI, model: str, output_dir: str = "./trajectories"):
        self.client = client
        self.model = model
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

    def run_and_collect(self, task, task_type, system_prompt, user_message,
                        tools, tool_dispatcher, max_iterations=20) -> Trajectory:
        traj_id = str(uuid.uuid4())
        start = time.time()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message}
        ]
        thinking_steps = []
        total_tokens = 0

        for step_idx in range(max_iterations):
            resp = self.client.chat.completions.create(
                model=self.model, messages=messages, tools=tools,
                tool_choice="auto", temperature=0.3
            )
            msg = resp.choices[0].message
            total_tokens += resp.usage.total_tokens if resp.usage else 0
            messages.append(msg)

            tool_calls_in_step = []
            observation = ""

            if msg.tool_calls:
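                for tc in msg.tool_calls:
                    tc_start = time.time()
                    try:
                        args = json.loads(tc.function.arguments)
                    except json.JSONDecodeError as exc:
                        # Malformed arguments from the model: record the failure
                        # instead of aborting the whole trajectory.
                        args, result = {}, {"error": f"invalid tool arguments: {exc}"}
                    else:
                        result = tool_dispatcher(tc.function.name, args)
                    tool_calls_in_step.append(ToolCall(
                        name=tc.function.name, arguments=args, result=result,
                        duration_ms=int((time.time() - tc_start) * 1000),
                        # Heuristic: any result that mentions "error" counts as a failure.
                        success="error" not in str(result).lower()
                    ))
                    # default=str keeps non-JSON-serializable results from raising.
                    result_str = json.dumps(result, default=str)
                    observation += f"[{tc.function.name}]: {result_str[:500]}\n"
                    messages.append({"role": "tool", "tool_call_id": tc.id, "content": result_str})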

                thinking_steps.append(ThinkingStep(
                    step_index=step_idx, thought=msg.content or "",
                    tool_calls=tool_calls_in_step, observation=observation
                ))
            else:
                thinking_steps.append(ThinkingStep(
                    step_index=step_idx, thought=msg.content or "",
                    tool_calls=[], observation=""
                ))
                break

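        # The final answer is the last assistant turn that made no tool calls.
        # If the loop hit max_iterations while still calling tools, there is no
        # final answer, so record an empty string rather than a raw tool result.
        last_step = thinking_steps[-1] if thinking_steps else None
        final = last_step.thought if last_step and not last_step.tool_calls else ""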
        traj = Trajectory(
            trajectory_id=traj_id, task=task, task_type=task_type,
            initial_prompt=user_message, system_prompt=system_prompt,
            thinking_steps=thinking_steps, final_response=final,
            model_name=self.model, timestamp=datetime.now().isoformat(),
            duration_seconds=time.time() - start, total_tokens=total_tokens
        )
        with open(f"{self.output_dir}/{traj_id}.json", "w") as f:
            f.write(traj.to_json())
        return traj
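
The collector takes a `tool_dispatcher` callable but the listing does not show one. A minimal sketch of what such a dispatcher could look like follows; `ToolRegistry` and the `add` tool are hypothetical names introduced here for illustration, and the only contract assumed is the one visible above: it is called as `tool_dispatcher(name, args)` and returns a JSON-serializable result.

```python
from typing import Any, Callable, Dict


class ToolRegistry:
    """Maps tool names to Python callables; instances are usable as a dispatcher."""

    def __init__(self) -> None:
        self._tools: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str):
        """Decorator that registers a function under the given tool name."""
        def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
            self._tools[name] = fn
            return fn
        return decorator

    def __call__(self, name: str, args: dict) -> Any:
        fn = self._tools.get(name)
        if fn is None:
            return {"error": f"unknown tool: {name}"}
        try:
            return fn(**args)
        except Exception as exc:
            # Return failures as data so the agent loop keeps running and the
            # trajectory records what went wrong.
            return {"error": f"{type(exc).__name__}: {exc}"}


registry = ToolRegistry()


@registry.register("add")
def add(a: float, b: float) -> dict:
    return {"sum": a + b}


print(registry("add", {"a": 2, "b": 3}))  # prints {'sum': 5}
```

Returning errors as `{"error": ...}` dictionaries fits the collector's success heuristic, which marks a call as failed when the string "error" appears in the result.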

74.3 Judge Design & Reward Functions

# atropos/judges/base.py
from abc import ABC, abstractmethod

class BaseJudge(ABC):
    @abstractmethod
    def score(self, trajectory) -> float:
        """Return reward in [-1, 1]"""
        pass

class ProcessJudge(BaseJudge):
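    def score(self, traj) -> float:
        # Each heuristic contributes one component in [-1, 1]; the result is their mean.
        scores = []
        all_calls = [tc for step in traj.thinking_steps for tc in step.tool_calls]
        if all_calls:
            # Map the tool success rate from [0, 1] to [-1, 1].
            success_rate = sum(1 for tc in all_calls if tc.success) / len(all_calls)
            scores.append(success_rate * 2 - 1)
        # Every step should carry more than 20 characters of reasoning.
        has_reasoning = all(len(s.thought.strip()) > 20 for s in traj.thinking_steps)
        scores.append(0.5 if has_reasoning else -0.3)
        # Prefer short trajectories: up to 5 steps is best, more than 10 is penalized.
        step_count = len(traj.thinking_steps)
        scores.append(0.5 if step_count <= 5 else 0.2 if step_count <= 10 else -0.3)
        # Penalize final responses shorter than 100 characters.
        scores.append(0.3 if len(traj.final_response) >= 100 else -0.5)
        return sum(scores) / len(scores) if scores else 0.0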

class CompositeJudge(BaseJudge):
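    def __init__(self, judges, weights=None):
        self.judges = judges
        raw = list(weights) if weights is not None else [1.0] * len(judges)
        if len(raw) != len(judges):
            raise ValueError("weights must match judges in length")
        total = sum(raw)
        if total <= 0:
            raise ValueError("weights must sum to a positive value")
        # Normalize so the combined score stays in [-1, 1]. A weight of 0 is
        # honored and disables that judge instead of being replaced by 1.0.
        self.weights = [w / total for w in raw]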

    def score(self, traj) -> float:
        return sum(j.score(traj) * w for j, w in zip(self.judges, self.weights))
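
The architecture diagram pairs the process judge with an outcome judge, which the listing above does not define. The sketch below shows one possible shape for it, assuming keyword criteria per task type similar to the benchmark later in this chapter. `KeywordOutcomeJudge`, `combine`, and the 0.7 outcome weight are all illustrative assumptions, not Atropos APIs or recommended values.

```python
from types import SimpleNamespace
from typing import Dict, List


class KeywordOutcomeJudge:
    """Scores a trajectory by the share of expected keywords in its final response."""

    def __init__(self, criteria_by_task_type: Dict[str, List[str]]):
        self.criteria_by_task_type = criteria_by_task_type

    def score(self, traj) -> float:
        criteria = self.criteria_by_task_type.get(traj.task_type, [])
        if not criteria:
            return 0.0  # no criteria known for this task type: stay neutral
        response = traj.final_response.lower()
        hits = sum(1 for c in criteria if c.lower() in response)
        # Map the hit ratio from [0, 1] to the reward range [-1, 1].
        return 2.0 * hits / len(criteria) - 1.0


def combine(outcome: float, process: float, outcome_weight: float = 0.7) -> float:
    """Weighted mean of the two scores, clamped to [-1, 1]."""
    combined = outcome_weight * outcome + (1.0 - outcome_weight) * process
    return max(-1.0, min(1.0, combined))


judge = KeywordOutcomeJudge({"research": ["embedding", "retrieval"]})
# SimpleNamespace stands in for a Trajectory; only two attributes are read.
traj = SimpleNamespace(task_type="research",
                       final_response="Dense retrieval relies on embedding models.")
print(judge.score(traj))  # prints 1.0
```

Keyword matching is easy to game, so a judge like this is best treated as a weak signal to be combined with process scoring or replaced by task-specific checks such as unit tests.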

74.4 Data Quality Filtering

# atropos/filter.py
import hashlib, random
from typing import List, Tuple

class TrajectoryFilter:
    def __init__(self, min_reward=-0.5, min_steps=1, max_steps=20,
                 min_response_length=50, max_response_length=8000):
        self.min_reward = min_reward
        self.min_steps = min_steps
        self.max_steps = max_steps
        self.min_response_length = min_response_length
        self.max_response_length = max_response_length

    def filter_batch(self, trajectories) -> Tuple[list, dict]:
        stats = {"total": len(trajectories), "passed": 0, "filtered": {}}
        passed, seen = [], set()

        for traj in trajectories:
            reason = self._check(traj, seen)
            if reason is None:
                passed.append(traj)
                stats["passed"] += 1
                seen.add(self._fp(traj))
            else:
                stats["filtered"][reason] = stats["filtered"].get(reason, 0) + 1

        return passed, stats

    def _check(self, traj, seen) -> str | None:
        if traj.combined_score is None: return "no_score"
        if traj.combined_score < self.min_reward: return "low_reward"
        steps = len(traj.thinking_steps)
        if steps < self.min_steps: return "too_few_steps"
        if steps > self.max_steps: return "too_many_steps"
        resp = len(traj.final_response)
        if resp < self.min_response_length: return "too_short"
        if resp > self.max_response_length: return "too_long"
        if self._fp(traj) in seen: return "duplicate"
        return None

    def _fp(self, traj) -> str:
        key = traj.task[:100] + traj.final_response[:200]
        return hashlib.md5(key.encode()).hexdigest()

    def balance_rewards(self, trajectories, bins=5, max_per_bin=1000) -> list:
        """Cap the number of trajectories in each equal-width reward bin over [-1, 1]."""
        buckets = [[] for _ in range(bins)]
        for t in trajectories:
            if t.combined_score is None:
                continue  # unscored trajectories cannot be binned
            # Clamp to [-1, 1], then map to a bin index; 1.0 lands in the last bin.
            score = max(-1.0, min(1.0, t.combined_score))
            idx = min(int((score + 1.0) / 2.0 * bins), bins - 1)
            buckets[idx].append(t)
        balanced = []
        for bucket in buckets:
            balanced.extend(random.sample(bucket, min(len(bucket), max_per_bin)))
        return balanced
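
The filter layer in the architecture diagram ends with a diversity step that the class above does not implement. One simple reading of that step is a cap on how many trajectories any single task type may contribute. The sketch below takes that reading; `cap_per_task_type` and its parameters are hypothetical, and other diversity measures (for example embedding-based clustering) would be equally valid.

```python
import random
from collections import defaultdict
from types import SimpleNamespace


def cap_per_task_type(trajectories, max_per_type: int, seed: int = 0) -> list:
    """Randomly downsample each task_type group to at most max_per_type items."""
    rng = random.Random(seed)  # seeded for reproducible dataset builds
    groups = defaultdict(list)
    for t in trajectories:
        groups[t.task_type].append(t)

    kept = []
    for task_type in sorted(groups):  # sorted for a stable output order
        group = groups[task_type]
        kept.extend(rng.sample(group, min(len(group), max_per_type)))
    return kept


# SimpleNamespace stands in for Trajectory; only task_type is read here.
sample = ([SimpleNamespace(task_type="coding", trajectory_id=f"c{i}") for i in range(5)]
          + [SimpleNamespace(task_type="research", trajectory_id=f"r{i}") for i in range(2)])
balanced = cap_per_task_type(sample, max_per_type=3)
print(len(balanced))  # prints 5
```

Applied after `balance_rewards`, a cap like this keeps one dominant task type from crowding the others out of the training set.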

74.5 Training Configuration & Script

# atropos/training/train.py
import json, os, torch
from datasets import Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from peft import LoraConfig, get_peft_model, TaskType
from trl import SFTTrainer

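# Suggested LoRA configuration for Hermes models on Llama/Mistral-style
# architectures. The target_modules names below match those layers; Mixtral's
# expert MLPs use different module names, so check model.named_modules() first.
LORA_CONFIG = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=64,                # Rank: higher = more expressiveness, more params
    lora_alpha=128,      # Scaling: alpha/r = 2 (a common choice)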
    lora_dropout=0.05,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
                    "gate_proj", "up_proj", "down_proj"],
    bias="none",
)

TRAINING_ARGS = TrainingArguments(
    output_dir="./checkpoints/hermes-atropos-v1",
    num_train_epochs=3,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=8,   # effective batch = 16
    learning_rate=2e-4,
    lr_scheduler_type="cosine",
    warmup_ratio=0.05,
    max_grad_norm=1.0,
    bf16=True,
    gradient_checkpointing=True,
    logging_steps=10,
    save_steps=100,
    eval_steps=50,
    report_to=["wandb"],
    run_name="atropos-hermes-r64",
)


def prepare_dataset(path: str, tokenizer, max_seq: int = 4096,
                    reward_threshold: float = 0.0) -> Dataset:
    """Load JSONL training records, keeping those at or above reward_threshold."""
    data = []
    with open(path) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            item = json.loads(line)
            reward = item.get("reward")
            # Unscored records (missing or null reward) are skipped, not treated as 0.
            if reward is None or reward < reward_threshold:
                continue
            text = tokenizer.apply_chat_template(
                item["messages"], tokenize=False, add_generation_prompt=False
            )
            # Only the token count is needed here, so skip tensor conversion.
            n_tokens = len(tokenizer(text)["input_ids"])
            if n_tokens <= max_seq:
                data.append({"text": text, "reward": reward})
    print(f"[Dataset] Loaded {len(data)} training samples")
    return Dataset.from_list(data)


def train(base_model: str, train_path: str, eval_path: str = None):
    tokenizer = AutoTokenizer.from_pretrained(base_model, padding_side="right")
    if not tokenizer.pad_token:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        base_model, torch_dtype=torch.bfloat16, device_map="auto"
    )
    model.gradient_checkpointing_enable()
    model = get_peft_model(model, LORA_CONFIG)
    model.print_trainable_parameters()

    train_ds = prepare_dataset(train_path, tokenizer)
    eval_ds = prepare_dataset(eval_path, tokenizer) if eval_path else None

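    args = TRAINING_ARGS
    if eval_ds is not None:
        # TRAINING_ARGS sets eval_steps but no evaluation strategy, so periodic
        # evaluation stays off unless it is enabled here. Rebuilding from
        # vars(args) fails because it includes fields the constructor rejects.
        # Recent transformers releases name this argument eval_strategy.
        import dataclasses
        args = dataclasses.replace(args, evaluation_strategy="steps")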

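    # These keyword arguments follow older TRL releases. Newer releases move
    # dataset_text_field, max_seq_length and packing into SFTConfig, so check
    # the signature for the TRL version you have installed.
    trainer = SFTTrainer(
        model=model, args=args, tokenizer=tokenizer,
        train_dataset=train_ds, eval_dataset=eval_ds,
        dataset_text_field="text", max_seq_length=4096, packing=True,
    )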
    trainer.train()
    trainer.save_model(os.path.join(args.output_dir, "final"))
    tokenizer.save_pretrained(os.path.join(args.output_dir, "final"))
    print("Training complete.")
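
The numbers in `TRAINING_ARGS` interact: batch size, gradient accumulation, and dataset size together determine how many optimizer steps a run takes, and therefore how many warmup steps and checkpoints it produces. The following sketch works through that arithmetic. `schedule_summary` is a hypothetical helper, and the result is an approximation: the Trainer's own rounding differs slightly, and `packing=True` changes the effective sample count.

```python
import math


def schedule_summary(num_samples: int, per_device_batch: int = 2,
                     grad_accum: int = 8, num_devices: int = 1,
                     epochs: int = 3, warmup_ratio: float = 0.05,
                     save_steps: int = 100) -> dict:
    """Approximate optimizer-step counts implied by a training configuration."""
    effective_batch = per_device_batch * grad_accum * num_devices
    steps_per_epoch = math.ceil(num_samples / effective_batch)
    total_steps = steps_per_epoch * epochs
    return {
        "effective_batch": effective_batch,
        "steps_per_epoch": steps_per_epoch,
        "total_steps": total_steps,
        "warmup_steps": round(total_steps * warmup_ratio),
        "checkpoints": total_steps // save_steps,
    }


# 1,600 samples is an illustrative dataset size, not a recommendation.
summary = schedule_summary(num_samples=1600)
print(summary["effective_batch"], summary["total_steps"], summary["warmup_steps"])
# prints 16 300 15
```

With a small filtered dataset the total step count can fall below `save_steps`, in which case no intermediate checkpoint is written and only the final model is saved.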

74.6 Evaluating Improvement

LoRA Hyperparameter Impact

Parameter      | Low value                     | High value                   | Recommendation
lora_r         | Fewer params, less expressive | More params, more expressive | 16-128 (start at 64)
lora_alpha     | Smaller scaling               | Larger scaling               | 2 * r
learning_rate  | Slow convergence              | Instability                  | 1e-4 to 3e-4
num_epochs     | Underfit                      | Overfit                      | 2-5
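
To make the rank trade-off concrete: LoRA replaces the update to a weight matrix of shape d_out x d_in with two low-rank factors, adding r * (d_in + d_out) trainable parameters per adapted layer, and scales their product by lora_alpha / r. The sketch below works through that arithmetic. The 4096 x 4096 projection is an illustrative size chosen here, not a property of any specific Hermes model.

```python
def lora_params(d_in: int, d_out: int, r: int) -> int:
    """Trainable parameters LoRA adds to one linear layer: A is r x d_in, B is d_out x r."""
    return r * (d_in + d_out)


def lora_scaling(lora_alpha: float, r: int) -> float:
    """Factor applied to the low-rank update B @ A."""
    return lora_alpha / r


d = 4096  # illustrative hidden size (assumption)
full = d * d
for rank in (16, 64, 128):
    added = lora_params(d, d, rank)
    print(f"r={rank:>3}: {added:>9,} params ({added / full:.2%} of the full matrix)")

print(lora_scaling(128, 64))  # prints 2.0
```

The parameter count grows linearly with rank, so moving from r=64 to r=128 doubles adapter size, while keeping lora_alpha = 2 * r holds the scaling factor at 2 across ranks.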

Benchmark Evaluation Script

# atropos/evaluation/benchmark.py

TASKS = [
    {
        "id": "code_debug", "task_type": "coding",
        "prompt": "Find and fix the bug in this Python function: [...]",
        "criteria": ["memoization", "lru_cache", "dynamic"]
    },
    {
        "id": "research", "task_type": "research",
        "prompt": "Summarize the latest developments in RAG technology",
        "criteria": ["vector database", "embedding", "retrieval"]
    },
]

def run_benchmark(agent_runner, model_version="baseline"):
    results = []
    for task in TASKS:
        traj = agent_runner(task["prompt"], task["task_type"])
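        resp = traj.final_response.lower()
        # Proxy metric: the fraction of expected keywords found in the final
        # response. It measures keyword coverage, not verified task success.
        hits = sum(1 for c in task["criteria"] if c.lower() in resp)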
        results.append({
            "id": task["id"],
            "success_rate": hits / len(task["criteria"]),
            "steps": len(traj.thinking_steps),
            "duration": traj.duration_seconds
        })

    avg_success = sum(r["success_rate"] for r in results) / len(results)
    print(f"\nBenchmark [{model_version}]")
    print(f"  Avg success rate: {avg_success:.1%}")
    print(f"  Avg steps: {sum(r['steps'] for r in results)/len(results):.1f}")
    return {"version": model_version, "avg_success": avg_success, "tasks": results}
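
The benchmark is only informative when a fine-tuned model is compared against a baseline on the same tasks. A minimal sketch of that comparison follows, assuming two result dictionaries in the shape `run_benchmark` returns. `compare_runs` is a hypothetical helper, and the numbers in the example are made up for illustration, not measured results.

```python
def compare_runs(baseline: dict, candidate: dict) -> dict:
    """Per-task and average deltas between two run_benchmark-style results."""
    base_tasks = {t["id"]: t for t in baseline["tasks"]}
    per_task = {}
    for task in candidate["tasks"]:
        base = base_tasks.get(task["id"])
        if base is None:
            continue  # task missing from the baseline run: nothing to compare
        per_task[task["id"]] = {
            "success_delta": task["success_rate"] - base["success_rate"],
            "steps_delta": task["steps"] - base["steps"],
        }
    return {
        "avg_success_delta": candidate["avg_success"] - baseline["avg_success"],
        "per_task": per_task,
        # Tasks where the candidate scored lower than the baseline.
        "regressions": sorted(tid for tid, d in per_task.items()
                              if d["success_delta"] < 0),
    }


# Made-up results, for illustration only.
baseline = {"version": "baseline", "avg_success": 0.5, "tasks": [
    {"id": "code_debug", "success_rate": 0.5, "steps": 6, "duration": 12.0},
    {"id": "research", "success_rate": 0.5, "steps": 4, "duration": 9.0}]}
candidate = {"version": "atropos-v1", "avg_success": 0.625, "tasks": [
    {"id": "code_debug", "success_rate": 1.0, "steps": 4, "duration": 8.0},
    {"id": "research", "success_rate": 0.25, "steps": 5, "duration": 10.0}]}

report = compare_runs(baseline, candidate)
print(report["avg_success_delta"], report["regressions"])  # prints 0.125 ['research']
```

Listing regressions separately matters because an improved average can hide a task type that got worse, which is one early symptom of overfitting to the judge.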

Chapter Summary

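This chapter covered the complete Atropos RL technical stack: trajectory collection from agent runs, outcome and process judges, data quality filtering, LoRA fine-tuning, and benchmark evaluation.

Atropos RL's core value is removing the bottleneck of requiring human feedback to improve an agent: agents generate their own training signal during task execution, so they can keep improving without human raters in the loop.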

Discussion Questions

  1. When a judge produces incorrect reward signals, how do you design detection and correction mechanisms?
  2. How much does LoRA rank choice affect final model quality? Is there a principled selection method?
  3. How do you prevent the model from "reward hacking" โ€” optimizing judge scores without actually improving?
  4. For complex multi-tool tasks, how do you design a fair process reward function?