第 79 章
案例二:企业知识库 Agent(RAG + Memory Tool + Managed Agents 完整实现)
第七十九章:构建代码审查 Agent:自动 PR Review 与质量门控流水线
79.1 代码审查 Agent 的价值与挑战
代码审查(Code Review)是软件工程中成本最高的人工活动之一。研究表明,经验丰富的工程师平均每小时只能仔细审查 200-400 行代码。对于快速迭代的团队,代码审查往往成为发布周期的瓶颈。
AI 代码审查 Agent 可以:
- 24/7 即时响应,将 PR Review 等待时间从小时级降至分钟级
- 一致地检查编码规范、安全漏洞、性能问题
- 让人类工程师腾出精力,专注于架构设计和业务逻辑审查
然而,自动化代码审查也面临独特挑战:
- 上下文理解:PR 变更不能脱离项目整体来理解
- 误报控制:过多的误报会让工程师失去信任
- 质量门控:如何将 AI 审查结果集成到 CI/CD 流水线
本章提供一套完整的生产级代码审查 Agent 实现方案。
79.2 系统架构
GitHub/GitLab
↓ Webhook (PR opened/updated)
[Webhook Server]
↓
[PR 上下文收集器]
- 获取 diff
- 获取变更文件列表
- 获取相关上下文文件
- 获取 PR 描述和标题
↓
[代码审查 Agent (Claude)]
- 安全漏洞扫描
- 代码规范检查
- 逻辑错误检测
- 性能问题识别
- 文档完整性检查
↓
[审查结果处理器]
- 格式化评论
- 判断质量门控结果
- 通过 GitHub API 发布评论
↓
[质量门控]
- Pass: 自动 Approve(可选)
- Fail: 请求修改并阻断合并
79.3 Webhook 服务器
79.3.1 接收 GitHub Webhook
import asyncio
import functools
import hashlib
import hmac
import json
import os

import httpx
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
# FastAPI application object; the webhook route below is registered on it.
app = FastAPI(title="Code Review Agent")

# Credentials are read from the environment so real secrets never need to be
# committed to the repository. The original placeholder strings are kept as
# fallbacks, so behaviour is unchanged when the variables are not set.
GITHUB_WEBHOOK_SECRET = os.environ.get("GITHUB_WEBHOOK_SECRET", "your-webhook-secret")
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "your-github-token")
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "your-anthropic-key")
def verify_github_signature(payload: bytes, signature: str) -> bool:
    """Check a GitHub webhook signature header against the request body.

    Args:
        payload: Raw request body bytes, exactly as received.
        signature: Value of the ``X-Hub-Signature-256`` header, expected in
            the form ``sha256=<hex digest>``; may be empty.

    Returns:
        True when ``signature`` equals the HMAC-SHA256 of ``payload`` keyed
        with ``GITHUB_WEBHOOK_SECRET``; False otherwise.
    """
    if not signature:
        return False
    expected = "sha256=" + hmac.new(
        GITHUB_WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    # Compare as bytes: compare_digest raises TypeError when given a str that
    # contains non-ASCII characters, which would turn a malformed header into
    # an unhandled error instead of a rejected signature.
    return hmac.compare_digest(expected.encode(), signature.encode())
@app.post("/webhook/github")
async def handle_github_webhook(
    request: Request,
    background_tasks: BackgroundTasks
):
    """Entry point for GitHub webhook deliveries.

    Rejects requests whose signature does not verify (HTTP 401). For
    ``pull_request`` events whose action is opened / synchronize / reopened,
    the review is scheduled as a background task so the response is returned
    immediately.
    """
    raw_body = await request.body()
    sig_header = request.headers.get("X-Hub-Signature-256", "")
    if not verify_github_signature(raw_body, sig_header):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event_name = request.headers.get("X-GitHub-Event")
    body = json.loads(raw_body)

    # Only these pull-request actions trigger a review.
    reviewable_actions = ["opened", "synchronize", "reopened"]
    if event_name == "pull_request" and body.get("action") in reviewable_actions:
        background_tasks.add_task(process_pull_request, body)

    return {"status": "received"}
79.3.2 PR 上下文收集
class GitHubPRContextCollector:
    """Collect the context of a pull request through the GitHub REST API.

    Attributes:
        token: GitHub token used for authentication.
        headers: Default request headers (bearer token + JSON media type).
        base_url: Root URL of the GitHub REST API.
    """

    # Page size requested from list endpoints, and the maximum number of pages
    # fetched (100 x 30 = 3000, the documented cap for the PR files endpoint).
    _PER_PAGE = 100
    _MAX_PAGES = 30

    def __init__(self, token: str):
        self.token = token
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github.v3+json"
        }
        self.base_url = "https://api.github.com"

    async def collect(self, repo_full_name: str, pr_number: int) -> dict:
        """Gather title, description, diff, changed files and config files.

        Args:
            repo_full_name: Repository in ``owner/name`` form.
            pr_number: Pull request number.

        Returns:
            A dict with the keys ``pr_title``, ``pr_description``,
            ``pr_author``, ``base_branch``, ``head_branch``, ``diff``,
            ``changed_files``, ``config_files`` and ``stats``.

        Raises:
            httpx.HTTPStatusError: If the PR info, diff or file-list request
                returns an error status.
        """
        async with httpx.AsyncClient() as client:
            pr_info = await self._get_pr_info(client, repo_full_name, pr_number)
            diff = await self._get_pr_diff(client, repo_full_name, pr_number)
            files = await self._get_pr_files(client, repo_full_name, pr_number)
            # `or {}` also covers keys that are present but null.
            base = pr_info.get("base") or {}
            head = pr_info.get("head") or {}
            author = pr_info.get("user") or {}
            config_files = await self._get_config_files(
                client, repo_full_name, base.get("sha") or "HEAD"
            )

        return {
            "pr_title": pr_info.get("title", ""),
            # A PR without a description has a null body; normalise to "".
            "pr_description": pr_info.get("body") or "",
            "pr_author": author.get("login", ""),
            "base_branch": base.get("ref", ""),
            "head_branch": head.get("ref", ""),
            "diff": diff,
            "changed_files": files,
            "config_files": config_files,
            "stats": {
                "additions": pr_info.get("additions", 0),
                "deletions": pr_info.get("deletions", 0),
                "changed_files_count": pr_info.get("changed_files", 0)
            }
        }

    async def _get_pr_diff(self, client, repo: str, pr_number: int) -> str:
        """Return the PR's unified diff, truncated to 50,000 characters."""
        response = await client.get(
            f"{self.base_url}/repos/{repo}/pulls/{pr_number}",
            headers={**self.headers, "Accept": "application/vnd.github.v3.diff"},
        )
        # Without this check an error body would be reviewed as if it were a diff.
        response.raise_for_status()
        return response.text[:50000]  # keep the prompt within the context budget

    async def _get_pr_files(self, client, repo: str, pr_number: int) -> list:
        """Return every changed file of the PR, following pagination.

        Each entry has ``filename``, ``status`` (added/modified/removed),
        ``additions``, ``deletions`` and ``patch`` (first 3,000 characters,
        empty when the API supplies no patch).
        """
        files = []
        for page in range(1, self._MAX_PAGES + 1):
            response = await client.get(
                f"{self.base_url}/repos/{repo}/pulls/{pr_number}/files",
                headers=self.headers,
                params={"per_page": self._PER_PAGE, "page": page}
            )
            response.raise_for_status()
            batch = response.json()
            files.extend(
                {
                    "filename": f["filename"],
                    "status": f["status"],
                    "additions": f["additions"],
                    "deletions": f["deletions"],
                    "patch": (f.get("patch") or "")[:3000],
                }
                for f in batch
            )
            # A short page is the last page.
            if len(batch) < self._PER_PAGE:
                break
        return files

    async def _get_config_files(self, client, repo: str, sha: str) -> dict:
        """Fetch well-known config files at ``sha``, best effort.

        Returns:
            Mapping of path to the first 2,000 characters of its content, for
            the files that exist and could be decoded.
        """
        import base64

        config_paths = [
            ".eslintrc.json", ".eslintrc.js", "pyproject.toml",
            "setup.cfg", ".flake8", "CONTRIBUTING.md", ".github/PULL_REQUEST_TEMPLATE.md"
        ]
        configs = {}
        for path in config_paths:
            try:
                response = await client.get(
                    f"{self.base_url}/repos/{repo}/contents/{path}",
                    headers=self.headers,
                    params={"ref": sha}
                )
                if response.status_code != 200:
                    continue
                content = base64.b64decode(response.json()["content"]).decode()
            except (httpx.HTTPError, KeyError, TypeError, ValueError):
                # Config files are optional context: a transport error, an
                # unexpected payload shape or undecodable content only means
                # this file is skipped, never that the review fails.
                continue
            configs[path] = content[:2000]
        return configs

    async def _get_pr_info(self, client, repo: str, pr_number: int) -> dict:
        """Return the pull request object as decoded JSON."""
        response = await client.get(
            f"{self.base_url}/repos/{repo}/pulls/{pr_number}",
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()
79.4 代码审查 Agent
79.4.1 审查 Prompt 设计
from anthropic import Anthropic
# Module-level Anthropic client shared by the review pipeline. It is built
# with no arguments and its messages.create() result is used without `await`
# in process_pull_request.
# NOTE(review): no api_key is passed here, so the ANTHROPIC_API_KEY constant
# defined earlier in this file is not used by this call - confirm how the
# client is expected to obtain its credentials.
client = Anthropic()
# System prompt for the reviewer model: states the review principles and the
# BLOCKER / MAJOR / MINOR / PRAISE severity scheme that the result formatter
# and the quality gate in this file filter on.
CODE_REVIEW_SYSTEM = """你是一位经验丰富的高级工程师,专注于代码质量、安全性和可维护性。
审查原则:
1. **建设性**:指出问题时,始终提供具体的改进建议
2. **优先级分级**:区分 BLOCKER(阻断合并)、MAJOR(重要建议)、MINOR(细节建议)、PRAISE(值得称赞)
3. **精确定位**:指出具体文件名和行号
4. **上下文感知**:考虑变更的整体目的,避免对合理的设计决策提出无意义的批评
5. **误报控制**:宁可少报,不要过度报告不确定的问题
审查重点(按优先级):
- 🔴 BLOCKER:安全漏洞(SQL注入、XSS、硬编码凭据、权限绕过)
- 🔴 BLOCKER:数据丢失风险(未处理的异常、资源泄漏)
- 🟡 MAJOR:逻辑错误、边界条件未处理、性能O(n²)以上的热路径
- 🟡 MAJOR:缺少必要的错误处理
- 🟢 MINOR:代码规范、命名、注释
- 💪 PRAISE:值得肯定的设计决策或改进"""
def build_review_prompt(pr_context: dict) -> str:
    """Build the user prompt for reviewing one pull request.

    Args:
        pr_context: Context dict with the keys ``pr_title``,
            ``pr_description``, ``pr_author``, ``base_branch``, ``stats``
            (``additions`` / ``deletions``), ``changed_files``, ``diff`` and
            optionally ``config_files``.

    Returns:
        The prompt text: PR metadata, up to 20 changed files, up to 3 config
        excerpts, the diff (first 40,000 characters) in a closed ``diff``
        fence, and the output format the response parser expects
        (``<review_summary>``, ``<issues>``, ``<verdict>``).
    """
    files_summary = "\n".join(
        f"- {f['filename']} ({f['status']}, +{f['additions']}/-{f['deletions']})"
        for f in pr_context["changed_files"][:20]
    )

    config_context = ""
    if pr_context.get("config_files"):
        config_context = "\n\n项目配置参考:\n" + "\n".join([
            f"### {path}\n```\n{content[:500]}\n```"
            for path, content in list(pr_context["config_files"].items())[:3]
        ])

    # Kept as a plain (non-f) string so the JSON braces need no escaping.
    # The tags and JSON keys mirror what ReviewResultProcessor reads:
    # severity / file / line / title / description / suggestion for issues,
    # recommendation / summary for the verdict.
    output_format = """请按以下格式输出审查结果:
<review_summary>
对这次 PR 的整体评价(2-3句话)
</review_summary>
<issues>
[
  {
    "severity": "BLOCKER | MAJOR | MINOR | PRAISE",
    "file": "文件路径",
    "line": 行号(整数,无法定位时为 null),
    "title": "问题标题",
    "description": "问题说明",
    "suggestion": "具体改进建议"
  }
]
</issues>
<verdict>
{"recommendation": "APPROVE | REQUEST_CHANGES | COMMENT", "summary": "一句话结论"}
</verdict>
<issues> 与 <verdict> 内必须是合法 JSON,不要使用 Markdown 代码块包裹。"""

    return f"""请审查以下 Pull Request。
## PR 基本信息
- **标题**:{pr_context['pr_title']}
- **描述**:{pr_context['pr_description'] or '(无描述)'}
- **作者**:{pr_context['pr_author']}
- **目标分支**:{pr_context['base_branch']}
- **变更统计**:+{pr_context['stats']['additions']} / -{pr_context['stats']['deletions']}
## 变更文件列表
{files_summary}
{config_context}
## 代码变更(diff)
```diff
{pr_context['diff'][:40000]}
```
{output_format}"""
79.4.2 审查结果处理与发布
import re
class ReviewResultProcessor:
    """Parse the model's review text and publish it as a GitHub PR review.

    Attributes:
        token: GitHub token used for authentication.
        headers: Request headers sent with the review (bearer token + JSON
            media type).
    """

    def __init__(self, github_token: str):
        self.token = github_token
        self.headers = {
            "Authorization": f"Bearer {github_token}",
            "Accept": "application/vnd.github.v3+json"
        }

    @staticmethod
    def _tag_body(raw_response: str, tag: str) -> str:
        """Return the stripped text between ``<tag>`` and ``</tag>``.

        The caller must have checked that ``<tag>`` occurs; a missing closing
        tag yields everything after the opening tag.
        """
        return raw_response.split(f"<{tag}>")[1].split(f"</{tag}>")[0].strip()

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding Markdown code fence (e.g. ```json) if present."""
        stripped = text.strip()
        if not stripped.startswith("```"):
            return stripped
        # Drop the opening fence line, then a trailing fence if there is one.
        _, _, rest = stripped.partition("\n")
        rest = rest.rstrip()
        if rest.endswith("```"):
            rest = rest[:-3]
        return rest.strip()

    @staticmethod
    def _extract_issues_fuzzy(issues_text: str) -> list:
        """Recover every complete JSON object from text that is not valid JSON.

        Used when the issues payload as a whole cannot be parsed (for example
        a response cut off mid-array): each ``{`` is tried as the start of a
        JSON object and the objects that decode cleanly are returned.
        """
        decoder = json.JSONDecoder()
        recovered = []
        pos = 0
        while True:
            start = issues_text.find("{", pos)
            if start == -1:
                break
            try:
                obj, consumed = decoder.raw_decode(issues_text[start:])
            except json.JSONDecodeError:
                pos = start + 1
                continue
            if isinstance(obj, dict):
                recovered.append(obj)
            pos = start + consumed
        return recovered

    @staticmethod
    def _normalize_issues(parsed) -> list:
        """Coerce a decoded issues payload into a list of issue dicts.

        A single object becomes a one-element list, non-dict entries are
        dropped, and string severities are upper-cased so that ``"blocker"``
        is treated the same as ``"BLOCKER"`` by the code that filters on it.
        """
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list):
            return []
        issues = []
        for item in parsed:
            if not isinstance(item, dict):
                continue
            severity = item.get("severity")
            if isinstance(severity, str):
                item = {**item, "severity": severity.strip().upper()}
            issues.append(item)
        return issues

    def parse_review_response(self, raw_response: str) -> dict:
        """Parse the tagged review response.

        Args:
            raw_response: Model output containing ``<review_summary>``,
                ``<issues>`` (JSON array) and ``<verdict>`` (JSON object).

        Returns:
            ``{"summary": str, "issues": list[dict], "verdict": dict}``.
            Missing sections keep their empty defaults; an unparsable verdict
            becomes ``{"recommendation": "COMMENT", "summary": "解析失败"}``.
        """
        result = {
            "summary": "",
            "issues": [],
            "verdict": {}
        }

        if "<review_summary>" in raw_response:
            result["summary"] = self._tag_body(raw_response, "review_summary")

        if "<issues>" in raw_response:
            issues_text = self._strip_code_fence(self._tag_body(raw_response, "issues"))
            try:
                parsed = json.loads(issues_text)
            except json.JSONDecodeError:
                parsed = self._extract_issues_fuzzy(issues_text)
            result["issues"] = self._normalize_issues(parsed)

        if "<verdict>" in raw_response:
            verdict_text = self._strip_code_fence(self._tag_body(raw_response, "verdict"))
            try:
                verdict = json.loads(verdict_text)
            except json.JSONDecodeError:
                verdict = None
            # Callers index into the verdict, so anything but an object is
            # treated as a parse failure.
            if not isinstance(verdict, dict):
                verdict = {"recommendation": "COMMENT", "summary": "解析失败"}
            result["verdict"] = verdict

        return result

    def format_pr_comment(self, review_result: dict) -> str:
        """Render the parsed review as a GitHub Markdown comment."""
        verdict = review_result.get("verdict", {})
        issues = review_result.get("issues", [])

        recommendation = verdict.get("recommendation", "COMMENT")
        rec_emoji = {"APPROVE": "✅", "REQUEST_CHANGES": "❌", "COMMENT": "💬"}.get(recommendation, "💬")

        blockers = [i for i in issues if i.get("severity") == "BLOCKER"]
        majors = [i for i in issues if i.get("severity") == "MAJOR"]
        minors = [i for i in issues if i.get("severity") == "MINOR"]
        praises = [i for i in issues if i.get("severity") == "PRAISE"]

        # The blank line before `---` matters: directly under a text line it
        # would turn that line into a setext heading instead of drawing a rule.
        comment = f"""## {rec_emoji} AI 代码审查报告
**总体评估**:{review_result.get('summary', '')}
**审查统计**:🔴 {len(blockers)} BLOCKER | 🟡 {len(majors)} MAJOR | 🟢 {len(minors)} MINOR | 💪 {len(praises)} PRAISE

---
"""

        sections = (
            ("### 🔴 BLOCKER(必须修复)\n\n", blockers),
            ("### 🟡 MAJOR(建议修复)\n\n", majors),
            ("### 🟢 MINOR(可选优化)\n\n", minors),
            ("### 💪 值得肯定\n\n", praises),
        )
        for heading, group in sections:
            if group:
                comment += heading
                comment += "".join(self._format_issue(issue) for issue in group)

        comment += """
---
*由 AI Code Review Agent 自动生成 | 模型:Claude Opus | [查看配置](.github/review-agent.yml)*
*此审查为辅助参考,最终决策由人工审查员负责*"""
        return comment

    def _format_issue(self, issue: dict) -> str:
        """Render one issue: title and location, description, optional suggestion.

        The result always ends with a blank line so that a following issue is
        not absorbed into this issue's suggestion blockquote.
        """
        location = f"`{issue.get('file', 'unknown')}`"
        if issue.get('line'):
            location += f" (第 {issue['line']} 行)"

        lines = [
            f"**{issue.get('title', 'Issue')}** — {location}",
            f"{issue.get('description', '')}",
        ]
        if issue.get('suggestion'):
            lines.append(f"> 💡 建议:{issue['suggestion']}")
        return "\n".join(lines) + "\n\n"

    async def post_review(
        self,
        repo: str,
        pr_number: int,
        review_result: dict
    ) -> bool:
        """Publish the review on the pull request.

        Args:
            repo: Repository in ``owner/name`` form.
            pr_number: Pull request number.
            review_result: Parsed review as returned by parse_review_response.

        Returns:
            True when GitHub answers with status 200 or 201.
        """
        comment_body = self.format_pr_comment(review_result)
        verdict = review_result.get("verdict", {})
        recommendation = verdict.get("recommendation", "COMMENT")

        # Any recommendation that is not a known review event is downgraded
        # to a plain comment.
        event_map = {
            "APPROVE": "APPROVE",
            "REQUEST_CHANGES": "REQUEST_CHANGES",
            "COMMENT": "COMMENT"
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"https://api.github.com/repos/{repo}/pulls/{pr_number}/reviews",
                headers=self.headers,
                json={
                    "body": comment_body,
                    "event": event_map.get(recommendation, "COMMENT")
                }
            )
        return response.status_code in [200, 201]
79.5 质量门控流水线
79.5.1 质量门控规则
class QualityGate:
    """Quality gate: decides whether a pull request may be merged."""

    def __init__(self, config: dict):
        """Store the gate configuration.

        Example config::

            {
                "block_on_blocker": true,
                "block_on_major_count": 5,     # more than 5 MAJOR issues blocks
                "required_coverage": 0.80,
                "blocked_file_patterns": ["secrets.py", "*.pem"],
                "max_pr_size": 500             # larger PRs get a split warning
            }

        Only ``block_on_blocker``, ``block_on_major_count``, ``max_pr_size``
        and ``blocked_file_patterns`` are read by ``evaluate``;
        ``required_coverage`` is not evaluated by this class.
        """
        self.config = config

    def evaluate(self, pr_context: dict, review_result: dict) -> dict:
        """Evaluate the PR against the configured rules.

        Args:
            pr_context: Must provide ``stats`` (``additions``, ``deletions``)
                and ``changed_files`` (dicts with ``filename``).
            review_result: Parsed review; its ``issues`` list is inspected.

        Returns:
            ``{"passed": bool, "failures": list, "warnings": list,
            "recommendation": "APPROVE" | "REQUEST_CHANGES"}``. Failures block
            the merge; warnings do not.
        """
        failures = []
        warnings = []
        issues = review_result.get("issues", [])
        blockers = [i for i in issues if i.get("severity") == "BLOCKER"]
        majors = [i for i in issues if i.get("severity") == "MAJOR"]

        # Rule 1: any BLOCKER issue blocks the merge.
        if self.config.get("block_on_blocker", True) and blockers:
            failures.append({
                "rule": "no_blockers",
                "message": f"存在 {len(blockers)} 个 BLOCKER 问题,必须修复后才能合并",
                "issues": [b.get("title") for b in blockers]
            })

        # Rule 2: too many MAJOR issues.
        max_major = self.config.get("block_on_major_count", 999)
        if len(majors) > max_major:
            failures.append({
                "rule": "major_count_limit",
                "message": f"MAJOR 问题数量 ({len(majors)}) 超过限制 ({max_major})"
            })

        # Rule 3: PR size - a warning only, it never blocks.
        max_size = self.config.get("max_pr_size", 1000)
        total_changes = pr_context["stats"]["additions"] + pr_context["stats"]["deletions"]
        if total_changes > max_size:
            warnings.append({
                "rule": "pr_size",
                "message": f"PR 变更量 ({total_changes} 行) 较大,建议拆分为多个小 PR"
            })

        # Rule 4: protected files. One failure per file, even when several
        # patterns match it.
        blocked_patterns = self.config.get("blocked_file_patterns", [])
        for file in pr_context["changed_files"]:
            filename = file["filename"]
            if any(self._matches_pattern(filename, p) for p in blocked_patterns):
                failures.append({
                    "rule": "blocked_file",
                    "message": f"变更了受保护文件:{filename}"
                })

        passed = not failures
        return {
            "passed": passed,
            "failures": failures,
            "warnings": warnings,
            "recommendation": "APPROVE" if passed else "REQUEST_CHANGES"
        }

    def _matches_pattern(self, filename: str, pattern: str) -> bool:
        """Return True if ``pattern`` matches the path or its final component.

        Changed files arrive as repository-relative paths, so a bare file-name
        pattern such as ``secrets.py`` must also be tested against the base
        name; otherwise it would only ever match a file in the repository root.
        """
        import fnmatch

        basename = filename.rsplit("/", 1)[-1]
        return fnmatch.fnmatch(filename, pattern) or fnmatch.fnmatch(basename, pattern)
79.5.2 完整的 PR 处理流程
async def process_pull_request(data: dict):
    """Run the full review pipeline for one pull-request webhook payload.

    Steps: collect the PR context, ask the model for a review, parse it,
    evaluate the quality gate, and publish the review on the PR. Any failure
    is printed and reported through ``post_error_comment``; nothing is raised
    to the caller.

    Args:
        data: Webhook payload; ``repository.full_name`` and
            ``pull_request.number`` are read from it.
    """
    repo = data["repository"]["full_name"]
    pr_number = data["pull_request"]["number"]
    print(f"处理 PR #{pr_number} in {repo}")

    try:
        # Step 1: collect context.
        collector = GitHubPRContextCollector(GITHUB_TOKEN)
        pr_context = await collector.collect(repo, pr_number)

        # Step 2: ask the model for a review. messages.create is a blocking
        # call, so it runs in the default executor instead of stalling the
        # event loop (and every other request) for the whole model call.
        review_prompt = build_review_prompt(pr_context)
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(
                client.messages.create,
                model="claude-opus-4-5",
                max_tokens=4000,
                system=CODE_REVIEW_SYSTEM,
                messages=[{"role": "user", "content": review_prompt}],
            ),
        )
        raw_review = response.content[0].text

        # Step 3: parse the review.
        processor = ReviewResultProcessor(GITHUB_TOKEN)
        review_result = processor.parse_review_response(raw_review)

        # Step 4: evaluate the quality gate.
        gate_config = {
            "block_on_blocker": True,
            "block_on_major_count": 5,
            "max_pr_size": 800
        }
        gate = QualityGate(gate_config)
        gate_result = gate.evaluate(pr_context, review_result)

        # A failed gate overrides the model's own recommendation, and the
        # reasons are added to the summary so the author can see why changes
        # are requested.
        if not gate_result["passed"]:
            verdict = review_result.get("verdict")
            if not isinstance(verdict, dict):
                verdict = {}
                review_result["verdict"] = verdict
            verdict["recommendation"] = "REQUEST_CHANGES"
            reasons = "\n".join(f"- {f['message']}" for f in gate_result["failures"])
            review_result["summary"] = (
                f"{review_result.get('summary', '')}\n\n**质量门控未通过**:\n{reasons}\n"
            )

        # Step 5: publish the review.
        success = await processor.post_review(repo, pr_number, review_result)
        if not success:
            print(f"PR #{pr_number} 审查评论发布失败")
        print(f"PR #{pr_number} 审查完成,通过质量门控:{gate_result['passed']}")

    except Exception as e:
        print(f"PR #{pr_number} 审查失败:{e}")
        # NOTE(review): post_error_comment is not defined in the visible part
        # of this file - confirm it exists. The notification is itself best
        # effort: a failure here must not escape the background task.
        try:
            await post_error_comment(repo, pr_number, str(e))
        except Exception as notify_error:
            print(f"PR #{pr_number} 失败通知发送失败:{notify_error}")
79.6 高级功能
79.6.1 增量审查(只审查新增变更)
def get_incremental_diff(
    full_diff: str,
    previous_review_sha: str,
    current_sha: str
) -> str:
    """Return the part of the diff that still needs reviewing.

    The goal is to avoid commenting again on code that was already reviewed.
    This function has no API access, so it cannot compute a true incremental
    diff; it applies the only decision available from its arguments and
    otherwise falls back to the full diff, always honouring the ``str``
    return type.

    Args:
        full_diff: Complete diff of the pull request.
        previous_review_sha: Head commit SHA at the time of the last review
            (empty when the PR has not been reviewed yet).
        current_sha: Current head commit SHA.

    Returns:
        An empty string when the head commit has not changed since the last
        review, otherwise ``full_diff``.
    """
    # TODO: in production, fetch the real incremental diff from the GitHub
    # compare API:
    # https://api.github.com/repos/{owner}/{repo}/compare/{base}...{head}
    if previous_review_sha and previous_review_sha == current_sha:
        return ""
    return full_diff
79.6.2 专项审查模式
对于特定类型的文件或变更,可以启用专项审查模式:
# Extra prompt text for focused review modes, keyed by mode name. Each value
# is a heading line followed by one checklist item per line.
SPECIALIZED_REVIEW_PROMPTS = {
    "security": "\n".join((
        "特别关注以下安全问题:",
        "- SQL 注入:是否有未参数化的数据库查询?",
        "- XSS:是否有未转义的用户输入直接输出到 HTML?",
        "- 硬编码密钥:代码中是否有 API key、密码、token?",
        "- 权限检查:关键操作是否有适当的鉴权?",
        "- 依赖安全:新增依赖是否有已知漏洞?",
    )),
    "performance": "\n".join((
        "特别关注以下性能问题:",
        "- N+1 查询:是否有在循环内进行数据库查询?",
        "- 未使用索引:高频查询是否利用了索引?",
        "- 内存泄漏:是否有资源未被正确释放?",
        "- 同步阻塞:是否有不必要的同步操作可以改为异步?",
    )),
    "api_design": "\n".join((
        "特别关注 API 设计质量:",
        "- RESTful 规范遵守",
        "- 错误响应格式一致性",
        "- 向后兼容性(是否破坏了现有客户端)",
        "- API 文档完整性",
    )),
}
小结
代码审查 Agent 的核心工程模式是:Webhook 触发 → 上下文收集 → AI 分析 → 结构化输出 → API 发布。每个环节都有工程挑战:Webhook 的可靠性与安全性、上下文收集的完整性与 Token 限制的平衡、审查结果的结构化解析、以及质量门控的阈值调优。
关键成功因素是误报控制:一个发出太多无意义评论的审查机器人会比没有机器人更糟糕,因为工程师会开始忽视所有机器人评论,最终失去信任。从保守的阈值开始,随着数据积累逐步调整,是最稳健的上线策略。