Atropos RL Fine-Tuning: Trajectory Collection and Training
Chapter 74: Atropos RL Fine-Tuning — Trajectory Collection and Training
Chapter Introduction
In Greek mythology, Atropos was the Fate who cut the thread of life — deciding which destinies should continue and which should end. NousResearch named their reinforcement learning framework accordingly: Atropos RL's core function is judging which agent behavior trajectories are worth preserving and which should be discarded, then using those judgments as a training signal that lets Hermes agents evolve through trial and error. This chapter dives into Atropos's full technical stack: trajectory collection, judge design, data quality filtering, LoRA fine-tuning, and effect evaluation.
74.1 Atropos Framework Architecture
Core Design Philosophy
The fundamental difference between Atropos and traditional RLHF (Reinforcement Learning from Human Feedback) is that the feedback source shifts from humans to automated judges, enabling highly scalable training signal generation.
Traditional RLHF:
Agent output → Human rating → Reward model → Policy update
(Bottleneck: human rating is slow, expensive, hard to scale)
Atropos RL:
Agent action → Environment feedback + auto-judge scoring → Training signal → Policy update
(Advantages: fully automated, low cost, massively scalable)
System Architecture
┌─────────────────────────────────────────────────────────────┐
│ Atropos RL Framework │
│ │
│ [Trajectory Collection Layer] │
│ Hermes Agent runs → logs complete trajectory │
│ {prompt, thoughts, tool_calls, observations, final} │
│ ↓ │
│ [Judge Layer] │
│ ┌───────────────┐ ┌───────────────┐ │
│ │ Outcome Judge │ │ Process Judge │ │
│ │ │ │ │ │
│ │ Task complete?│ │ Tool use OK? │ │
│ │ Answer right? │ │ Reasoning OK? │ │
│ └───────────────┘ └───────────────┘ │
│ ↓ ↓ │
│ Combined reward score r ∈ [-1, 1] │
│ ↓ │
│ [Quality Filter Layer] │
│ Dedup → length filter → reward balance → diversity │
│ ↓ │
│ [LoRA Fine-Tuning Layer] │
│ Prepare dataset → LoRA config → Train → Eval → Merge │
└─────────────────────────────────────────────────────────────┘
Trajectory Data Format
# atropos/trajectory.py
from dataclasses import dataclass, field
from typing import List, Optional, Any
import json
@dataclass
class ToolCall:
    """Record of a single tool invocation made during an agent step."""

    name: str  # name of the tool/function that was called
    arguments: dict  # arguments the tool was called with
    result: Any  # value the tool returned
    duration_ms: int  # how long the call took, in milliseconds
    success: bool  # whether the call was judged successful
@dataclass
class ThinkingStep:
    """One agent turn: the assistant's text plus any tool activity it caused."""

    step_index: int  # position of this step within the trajectory
    thought: str  # assistant text produced at this step
    tool_calls: List[ToolCall]  # tool invocations issued in this step (may be empty)
    observation: str  # text summary of the tool results seen in this step
@dataclass
class Trajectory:
    """A complete agent run on one task, plus the scores assigned to it.

    The identifying fields and the recorded steps are required; the three
    score fields stay ``None`` until a judge fills them in, and the remaining
    fields are run metadata with neutral defaults.
    """

    trajectory_id: str
    task: str
    task_type: str
    initial_prompt: str
    system_prompt: str
    # String annotation: resolved lazily, so this class does not depend on
    # ThinkingStep being defined before it.
    thinking_steps: "List[ThinkingStep]"
    final_response: str
    outcome_score: Optional[float] = None  # [-1, 1]
    process_score: Optional[float] = None
    combined_score: Optional[float] = None
    model_name: str = ""
    timestamp: str = ""
    duration_seconds: float = 0.0
    total_tokens: int = 0

    def to_training_format(self) -> dict:
        """Flatten the trajectory into a chat-message training sample.

        Returns:
            A dict with ``trajectory_id``, ``messages`` (system, user, then one
            assistant message per step followed by one ``tool`` message per
            tool call, then the final answer), ``reward`` (the combined score,
            possibly ``None``) and ``task_type``.
        """
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": self.initial_prompt},
        ]
        for step in self.thinking_steps:
            if not step.tool_calls:
                messages.append({"role": "assistant", "content": step.thought})
                continue
            messages.append({
                "role": "assistant",
                "content": step.thought,
                "tool_calls": [
                    {
                        "type": "function",
                        "function": {
                            "name": tc.name,
                            "arguments": json.dumps(tc.arguments),
                        },
                    }
                    for tc in step.tool_calls
                ],
            })
            # Tool results are typed ``Any``; default=str keeps one odd value
            # from aborting the export of the whole trajectory.
            messages.extend(
                {"role": "tool", "content": json.dumps(tc.result, default=str)}
                for tc in step.tool_calls
            )

        # When the last recorded step is a tool-free turn whose text is the
        # final answer, it has already been emitted above. Appending it again
        # would yield two identical consecutive assistant messages.
        last = messages[-1]
        already_emitted = (
            last["role"] == "assistant"
            and "tool_calls" not in last
            and last["content"] == self.final_response
        )
        if not already_emitted:
            messages.append({"role": "assistant", "content": self.final_response})

        return {
            "trajectory_id": self.trajectory_id,
            "messages": messages,
            "reward": self.combined_score,
            "task_type": self.task_type,
        }

    def to_json(self) -> str:
        """Serialize the whole trajectory (all fields, nested steps) to JSON.

        Values that JSON cannot represent natively (for example an arbitrary
        tool result object) are stored as their ``str()`` form.
        """
        from dataclasses import asdict

        return json.dumps(asdict(self), ensure_ascii=False, default=str)
74.2 Trajectory Collection from Agent Logs
Collection Middleware
# atropos/collector.py
import json, time, uuid
from datetime import datetime
from openai import OpenAI
from .trajectory import Trajectory, ThinkingStep, ToolCall
class TrajectoryCollector:
    """Runs an agent loop against a chat-completions client and records it.

    Each run is captured as a ``Trajectory`` and written to
    ``<output_dir>/<trajectory_id>.json``.
    """

    def __init__(self, client: OpenAI, model: str, output_dir: str = "./trajectories"):
        """Store the client and model, and create ``output_dir`` if missing."""
        # Imported locally: this module's header does not import os.
        import os

        self.client = client
        self.model = model
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

    def run_and_collect(self, task, task_type, system_prompt, user_message,
                        tools, tool_dispatcher, max_iterations=20) -> Trajectory:
        """Run the agent until it answers without tools, recording every step.

        Args:
            task: Task description stored on the trajectory.
            task_type: Task category stored on the trajectory.
            system_prompt: Content of the system message.
            user_message: Content of the initial user message.
            tools: Tool definitions forwarded to the completion call.
            tool_dispatcher: Callable invoked as ``tool_dispatcher(name, args)``
                for every tool call the model issues.
            max_iterations: Upper bound on the number of model calls.

        Returns:
            The recorded trajectory. ``final_response`` is the text of the
            first tool-free assistant turn, or ``""`` when the iteration
            budget ran out before the model produced one.
        """
        traj_id = str(uuid.uuid4())
        start = time.time()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ]
        thinking_steps = []
        total_tokens = 0
        # Only a tool-free assistant turn counts as the final answer. Reading
        # messages[-1] after the loop would pick up a tool result (budget
        # exhausted) or the user prompt (max_iterations == 0).
        final = ""

        for step_idx in range(max_iterations):
            resp = self.client.chat.completions.create(
                model=self.model, messages=messages, tools=tools,
                tool_choice="auto", temperature=0.3,
            )
            msg = resp.choices[0].message
            total_tokens += resp.usage.total_tokens if resp.usage else 0
            messages.append(msg)
            thought = msg.content or ""

            if not msg.tool_calls:
                thinking_steps.append(ThinkingStep(
                    step_index=step_idx, thought=thought,
                    tool_calls=[], observation="",
                ))
                final = thought
                break

            tool_calls_in_step = []
            observation = ""
            for tc in msg.tool_calls:
                record, result_str = self._execute_tool_call(tc, tool_dispatcher)
                tool_calls_in_step.append(record)
                # The observation keeps only a 500-character preview per tool.
                observation += f"[{tc.function.name}]: {result_str[:500]}\n"
                messages.append({"role": "tool", "tool_call_id": tc.id, "content": result_str})
            thinking_steps.append(ThinkingStep(
                step_index=step_idx, thought=thought,
                tool_calls=tool_calls_in_step, observation=observation,
            ))

        traj = Trajectory(
            trajectory_id=traj_id, task=task, task_type=task_type,
            initial_prompt=user_message, system_prompt=system_prompt,
            thinking_steps=thinking_steps, final_response=final,
            model_name=self.model, timestamp=datetime.now().isoformat(),
            duration_seconds=time.time() - start, total_tokens=total_tokens,
        )
        with open(f"{self.output_dir}/{traj_id}.json", "w", encoding="utf-8") as f:
            f.write(traj.to_json())
        return traj

    @staticmethod
    def _execute_tool_call(tc, tool_dispatcher):
        """Run one model-requested tool call; return (ToolCall record, JSON result)."""
        started = time.time()
        try:
            # Empty argument payloads are treated as "no arguments".
            args = json.loads(tc.function.arguments or "{}")
        except json.JSONDecodeError as exc:
            # Malformed arguments come from the model; report them back as a
            # failed call instead of aborting the whole trajectory.
            args = {}
            result = {"error": f"invalid tool arguments: {exc}"}
        else:
            result = tool_dispatcher(tc.function.name, args)
        record = ToolCall(
            name=tc.function.name, arguments=args, result=result,
            duration_ms=int((time.time() - started) * 1000),
            # Heuristic: any result mentioning "error" counts as a failure.
            success="error" not in str(result).lower(),
        )
        # default=str: tool results are arbitrary objects and must not crash
        # the run just because they are not JSON-native.
        return record, json.dumps(result, default=str)
74.3 Judge Design & Reward Functions
# atropos/judges/base.py
from abc import ABC, abstractmethod
class BaseJudge(ABC):
    """Common interface for trajectory judges; subclasses implement ``score``."""

    @abstractmethod
    def score(self, trajectory) -> float:
        """Score ``trajectory`` and return its reward, a float in [-1, 1]."""
class ProcessJudge(BaseJudge):
    """Heuristic judge of how the agent worked, not of what it answered.

    The score is the mean of up to four components:

    * tool success rate mapped onto [-1, 1] (only when tools were used),
    * whether every step carries a non-trivial thought,
    * how few steps were needed,
    * whether the final response is reasonably long.
    """

    def score(self, traj) -> float:
        """Return the mean of the process components for ``traj``."""
        steps = traj.thinking_steps
        final_response = traj.final_response or ""  # tolerate a missing answer
        scores = []

        all_calls = [tc for step in steps for tc in step.tool_calls]
        if all_calls:
            success_rate = sum(1 for tc in all_calls if tc.success) / len(all_calls)
            scores.append(success_rate * 2 - 1)

        # all() over zero steps is vacuously True; a trajectory without any
        # step must not be rewarded for "reasoning".
        has_reasoning = bool(steps) and all(len(s.thought.strip()) > 20 for s in steps)
        scores.append(0.5 if has_reasoning else -0.3)

        step_count = len(steps)
        if step_count <= 5:
            scores.append(0.5)
        elif step_count <= 10:
            scores.append(0.2)
        else:
            scores.append(-0.3)

        scores.append(0.3 if len(final_response) >= 100 else -0.5)

        # At least three components are always present, so the mean is defined.
        return sum(scores) / len(scores)
class CompositeJudge(BaseJudge):
    """Weighted average of several judges.

    Weights are normalised to sum to 1, so the combined score stays within
    the range spanned by the individual judges' scores.
    """

    def __init__(self, judges, weights=None):
        """Store the judges and their normalised weights.

        Args:
            judges: Judges to combine.
            weights: One non-negative weight per judge; omitted or empty means
                equal weighting. A weight of 0 disables that judge.

        Raises:
            ValueError: If the number of weights differs from the number of
                judges, a weight is negative, or all weights are zero.
        """
        self.judges = list(judges)
        if weights is None or len(weights) == 0:
            raw = [1.0] * len(self.judges)
        else:
            raw = [float(w) for w in weights]

        if len(raw) != len(self.judges):
            raise ValueError(
                f"expected {len(self.judges)} weights, got {len(raw)}"
            )
        if any(w < 0 for w in raw):
            raise ValueError("judge weights must be non-negative")
        total = sum(raw)
        if raw and total == 0:
            raise ValueError("at least one judge weight must be positive")

        # Divide the weights as given. Substituting 1.0 for a zero weight
        # after computing the total would break the normalisation.
        self.weights = [w / total for w in raw]

    def score(self, traj) -> float:
        """Return the weighted sum of every judge's score for ``traj``."""
        return sum(j.score(traj) * w for j, w in zip(self.judges, self.weights))
74.4 Data Quality Filtering
# atropos/filter.py
import hashlib, random
from typing import List, Tuple
class TrajectoryFilter:
def __init__(self, min_reward=-0.5, min_steps=1, max_steps=20,
min_response_length=50, max_response_length=8000):
self.min_reward = min_reward
self.min_steps = min_steps
self.max_steps = max_steps
self.min_response_length = min_response_length
self.max_response_length = max_response_length
def filter_batch(self, trajectories) -> Tuple[list, dict]:
stats = {"total": len(trajectories), "passed": 0, "filtered": {}}
passed, seen = [], set()
for traj in trajectories:
reason = self._check(traj, seen)
if reason is None:
passed.append(traj)
stats["passed"] += 1
seen.add(self._fp(traj))
else:
stats["filtered"][reason] = stats["filtered"].get(reason, 0) + 1
return passed, stats
def _check(self, traj, seen) -> str | None:
if traj.combined_score is None: return "no_score"
if traj.combined_score < self.min_reward: return "low_reward"
steps = len(traj.thinking_steps)
if steps < self.min_steps: return "too_few_steps"
if steps > self.max_steps: return "too_many_steps"
resp = len(traj.final_response)
if resp < self.min_response_length: return "too_short"
if resp > self.max_response_length: return "too_long"
if self._fp(traj) in seen: return "duplicate"
return None
def _fp(self, traj) -> str:
key = traj.task[:100] + traj.final_response[:200]
return hashlib.md5(key.encode()).hexdigest()
def balance_rewards(self, trajectories, bins=5, max_per_bin=1000) -> list:
import numpy as np
edges = np.linspace(-1, 1, bins + 1)
buckets = [[] for _ in range(bins)]
for t in trajectories:
for i in range(bins):
if edges[i] <= t.combined_score <= edges[i + 1]:
buckets[i].append(t); break
balanced = []
for bucket in buckets:
balanced.extend(random.sample(bucket, min(len(bucket), max_per_bin)))
return balanced
74.5 Training Configuration & Script
# atropos/training/train.py
import json, os, torch
from datasets import Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from peft import LoraConfig, get_peft_model, TaskType
from trl import SFTTrainer
# LoRA adapter settings recommended for Hermes/Mixtral.
LORA_CONFIG = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    # Rank: a higher value is more expressive but adds more parameters.
    r=64,
    # Scaling factor; alpha / r = 2 here.
    lora_alpha=128,
    lora_dropout=0.05,
    bias="none",
    # Adapters are attached to these projection modules.
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)
# Baseline trainer settings shared by every run of this script.
TRAINING_ARGS = TrainingArguments(
    # Where checkpoints go and how the run is labelled.
    output_dir="./checkpoints/hermes-atropos-v1",
    run_name="atropos-hermes-r64",
    report_to=["wandb"],
    # Batch shape: 2 samples x 8 accumulation steps = effective batch of 16.
    num_train_epochs=3,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=8,
    # Optimisation schedule.
    learning_rate=2e-4,
    lr_scheduler_type="cosine",
    warmup_ratio=0.05,
    max_grad_norm=1.0,
    # Precision / memory options.
    bf16=True,
    gradient_checkpointing=True,
    # Logging, checkpointing and evaluation cadence (in steps).
    logging_steps=10,
    save_steps=100,
    eval_steps=50,
)
def prepare_dataset(path: str, tokenizer, max_seq: int = 4096,
                    reward_threshold: float = 0.0) -> Dataset:
    """Load a JSONL file of training samples into a text dataset.

    Each non-blank line must be a JSON object with ``messages`` and, to be
    kept, a numeric ``reward``. A sample is kept when its reward is at least
    ``reward_threshold`` and its chat-templated text tokenizes to at most
    ``max_seq`` tokens.

    Args:
        path: JSONL file to read.
        tokenizer: Tokenizer providing ``apply_chat_template`` and ``__call__``.
        max_seq: Maximum token count of a kept sample.
        reward_threshold: Minimum reward of a kept sample.

    Returns:
        A dataset with ``text`` and ``reward`` columns.
    """
    data = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            item = json.loads(line)
            reward = item.get("reward")
            # Unscored samples (missing or null reward) cannot be compared
            # against the threshold and are skipped rather than crashing.
            if reward is None or reward < reward_threshold:
                continue
            text = tokenizer.apply_chat_template(
                item["messages"], tokenize=False, add_generation_prompt=False
            )
            # Only the length is needed, so no tensors are materialised.
            token_count = len(tokenizer(text)["input_ids"])
            if token_count <= max_seq:
                data.append({"text": text, "reward": reward})
    print(f"[Dataset] Loaded {len(data)} training samples")
    return Dataset.from_list(data)
def train(base_model: str, train_path: str, eval_path: str = None):
    """LoRA-fine-tune ``base_model`` on the samples in ``train_path``.

    Args:
        base_model: Model name or path passed to ``from_pretrained``.
        train_path: JSONL file with training samples.
        eval_path: Optional JSONL file with evaluation samples. Step-based
            evaluation is enabled only when it yields at least one sample.

    The adapter and tokenizer are saved under ``<output_dir>/final``.
    """
    from dataclasses import replace

    tokenizer = AutoTokenizer.from_pretrained(base_model, padding_side="right")
    if not tokenizer.pad_token:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        base_model, torch_dtype=torch.bfloat16, device_map="auto"
    )
    # Enabled before wrapping the model with the LoRA adapter.
    model.gradient_checkpointing_enable()
    model = get_peft_model(model, LORA_CONFIG)
    model.print_trainable_parameters()

    train_ds = prepare_dataset(train_path, tokenizer)
    eval_ds = prepare_dataset(eval_path, tokenizer) if eval_path else None
    if eval_ds is not None and len(eval_ds) == 0:
        eval_ds = None  # nothing survived filtering; train without evaluation

    # TRAINING_ARGS sets eval_steps but no evaluation strategy, so evaluation
    # has to be switched on explicitly when eval data exists. replace() copies
    # only the dataclass init fields; rebuilding from vars() would also pass
    # internal attributes that the constructor rejects.
    args = TRAINING_ARGS
    if eval_ds is not None:
        args = replace(TRAINING_ARGS, evaluation_strategy="steps")

    trainer = SFTTrainer(
        model=model, args=args, tokenizer=tokenizer,
        train_dataset=train_ds, eval_dataset=eval_ds,
        dataset_text_field="text", max_seq_length=4096, packing=True,
    )
    trainer.train()

    final_dir = os.path.join(args.output_dir, "final")
    trainer.save_model(final_dir)
    tokenizer.save_pretrained(final_dir)
    print("Training complete.")
74.6 Evaluating Improvement
LoRA Hyperparameter Impact
| Parameter | Low Value | High Value | Recommendation |
|---|---|---|---|
| `lora_r` | Fewer params, less expressive | More params, more expressive | 16-128 (start at 64) |
| `lora_alpha` | Smaller scaling | Larger scaling | = 2 * r |
| `learning_rate` | Slow convergence | Instability | 1e-4 to 3e-4 |
| `num_epochs` | Underfit | Overfit | 2-5 |
Benchmark Evaluation Script
# atropos/evaluation/benchmark.py
# Benchmark tasks: each entry has an id, a task type, the prompt sent to the
# agent, and the keywords ("criteria") looked for in the final response.
TASKS = [
    dict(
        id="code_debug",
        task_type="coding",
        prompt="Find and fix the bug in this Python function: [...]",
        criteria=["memoization", "lru_cache", "dynamic"],
    ),
    dict(
        id="research",
        task_type="research",
        prompt="Summarize the latest developments in RAG technology",
        criteria=["vector database", "embedding", "retrieval"],
    ),
]
def run_benchmark(agent_runner, model_version="baseline", tasks=None):
    """Run every benchmark task through ``agent_runner`` and summarise results.

    Args:
        agent_runner: Callable invoked as ``agent_runner(prompt, task_type)``;
            it must return an object exposing ``final_response``,
            ``thinking_steps`` and ``duration_seconds``.
        model_version: Label stored in the result and printed in the summary.
        tasks: Task dicts with ``id``, ``task_type``, ``prompt`` and
            ``criteria`` keys. Defaults to the module-level ``TASKS``.

    Returns:
        ``{"version", "avg_success", "tasks"}`` where ``tasks`` lists, per
        task, the share of criteria keywords found in the response (0.0 when a
        task has no criteria), the step count and the duration. Averages are
        0.0 when there are no tasks.
    """
    task_list = TASKS if tasks is None else list(tasks)

    results = []
    for task in task_list:
        traj = agent_runner(task["prompt"], task["task_type"])
        resp = (traj.final_response or "").lower()  # tolerate a missing answer
        criteria = task["criteria"]
        hits = sum(1 for c in criteria if c.lower() in resp)
        results.append({
            "id": task["id"],
            "success_rate": hits / len(criteria) if criteria else 0.0,
            "steps": len(traj.thinking_steps),
            "duration": traj.duration_seconds,
        })

    # Guard the averages so an empty task list reports zeros instead of
    # raising ZeroDivisionError.
    count = len(results)
    avg_success = sum(r["success_rate"] for r in results) / count if count else 0.0
    avg_steps = sum(r["steps"] for r in results) / count if count else 0.0

    print(f"\nBenchmark [{model_version}]")
    print(f" Avg success rate: {avg_success:.1%}")
    print(f" Avg steps: {avg_steps:.1f}")
    return {"version": model_version, "avg_success": avg_success, "tasks": results}
Chapter Summary
This chapter covered the complete Atropos RL technical stack:
- Trajectory format: Complete data structure covering thinking steps, tool calls, and observations
- Judge design: Outcome + Process + LLM judge composition for robust reward signals
- Data filtering: Multi-dimension filtering + reward distribution balancing for training data quality
- LoRA fine-tuning: Complete training configuration and script from data prep to model saving
- Effect evaluation: Standardized benchmark framework to quantify model improvements
Atropos RL's core value is breaking the bottleneck of "requiring human feedback to improve AI" — it lets agents automatically generate training signals during task execution, enabling true autonomous evolution.
Discussion Questions
- When a judge produces incorrect reward signals, how do you design detection and correction mechanisms?
- How much does LoRA rank choice affect final model quality? Is there a principled selection method?
- How do you prevent the model from "reward hacking" — optimizing judge scores without actually improving?
- For complex multi-tool tasks, how do you design a fair process reward function?