第 79 章

案例二:企业知识库 Agent(RAG + Memory Tool + Managed Agents 完整实现)

第七十九章:构建代码审查 Agent:自动 PR Review 与质量门控流水线

79.1 代码审查 Agent 的价值与挑战

代码审查(Code Review)是软件工程中成本最高的人工活动之一。研究表明,经验丰富的工程师平均每小时只能仔细审查 200-400 行代码。对于快速迭代的团队,代码审查往往成为发布周期的瓶颈。

AI 代码审查 Agent 可以:

然而,自动化代码审查也面临独特挑战:

本章提供一套完整的生产级代码审查 Agent 实现方案。

79.2 系统架构

GitHub/GitLab
     ↓ Webhook (PR opened/updated)
[Webhook Server]
     ↓
[PR 上下文收集器]
  - 获取 diff
  - 获取变更文件列表
  - 获取相关上下文文件
  - 获取 PR 描述和标题
     ↓
[代码审查 Agent (Claude)]
  - 安全漏洞扫描
  - 代码规范检查
  - 逻辑错误检测
  - 性能问题识别
  - 文档完整性检查
     ↓
[审查结果处理器]
  - 格式化评论
  - 判断质量门控结果
  - 通过 GitHub API 发布评论
     ↓
[质量门控]
  - Pass: 自动 Approve(可选)
  - Fail: 请求修改并阻断合并

79.3 Webhook 服务器

79.3.1 接收 GitHub Webhook

from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
import hmac
import hashlib
import json
import httpx

app = FastAPI(title="Code Review Agent")

GITHUB_WEBHOOK_SECRET = "your-webhook-secret"
GITHUB_TOKEN = "your-github-token"
ANTHROPIC_API_KEY = "your-anthropic-key"

def verify_github_signature(payload: bytes, signature: str) -> bool:
    """验证 GitHub Webhook 签名"""
    expected = "sha256=" + hmac.new(
        GITHUB_WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

@app.post("/webhook/github")
async def handle_github_webhook(
    request: Request,
    background_tasks: BackgroundTasks
):
    """GitHub Webhook 入口"""
    # 验证签名
    signature = request.headers.get("X-Hub-Signature-256", "")
    payload = await request.body()
    
    if not verify_github_signature(payload, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    
    event = request.headers.get("X-GitHub-Event")
    data = json.loads(payload)
    
    # 只处理 PR 事件
    if event == "pull_request":
        action = data.get("action")
        if action in ["opened", "synchronize", "reopened"]:
            # 异步处理,立即返回 200
            background_tasks.add_task(
                process_pull_request,
                data
            )
    
    return {"status": "received"}

79.3.2 PR 上下文收集

class GitHubPRContextCollector:
    """收集 PR 的完整上下文信息"""
    
    def __init__(self, token: str):
        self.token = token
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github.v3+json"
        }
        self.base_url = "https://api.github.com"
    
    async def collect(self, repo_full_name: str, pr_number: int) -> dict:
        """
        收集 PR 的完整上下文
        """
        async with httpx.AsyncClient() as client:
            # 获取 PR 基础信息
            pr_info = await self._get_pr_info(client, repo_full_name, pr_number)
            
            # 获取 PR diff
            diff = await self._get_pr_diff(client, repo_full_name, pr_number)
            
            # 获取变更文件列表
            files = await self._get_pr_files(client, repo_full_name, pr_number)
            
            # 获取相关配置文件
            config_files = await self._get_config_files(
                client, repo_full_name, pr_info.get("base", {}).get("sha", "HEAD")
            )
        
        return {
            "pr_title": pr_info.get("title", ""),
            "pr_description": pr_info.get("body", ""),
            "pr_author": pr_info.get("user", {}).get("login", ""),
            "base_branch": pr_info.get("base", {}).get("ref", ""),
            "head_branch": pr_info.get("head", {}).get("ref", ""),
            "diff": diff,
            "changed_files": files,
            "config_files": config_files,
            "stats": {
                "additions": pr_info.get("additions", 0),
                "deletions": pr_info.get("deletions", 0),
                "changed_files_count": pr_info.get("changed_files", 0)
            }
        }
    
    async def _get_pr_diff(self, client, repo: str, pr_number: int) -> str:
        """获取 PR 的 unified diff"""
        response = await client.get(
            f"{self.base_url}/repos/{repo}/pulls/{pr_number}",
            headers={**self.headers, "Accept": "application/vnd.github.v3.diff"},
        )
        return response.text[:50000]  # 截取前 50K 字符,避免超出上下文
    
    async def _get_pr_files(self, client, repo: str, pr_number: int) -> list:
        """获取变更文件列表"""
        response = await client.get(
            f"{self.base_url}/repos/{repo}/pulls/{pr_number}/files",
            headers=self.headers,
            params={"per_page": 100}
        )
        files = response.json()
        return [
            {
                "filename": f["filename"],
                "status": f["status"],  # added/modified/removed
                "additions": f["additions"],
                "deletions": f["deletions"],
                "patch": f.get("patch", "")[:3000]  # 截取 patch
            }
            for f in files
        ]
    
    async def _get_config_files(self, client, repo: str, sha: str) -> dict:
        """获取项目配置文件"""
        config_paths = [
            ".eslintrc.json", ".eslintrc.js", "pyproject.toml", 
            "setup.cfg", ".flake8", "CONTRIBUTING.md", ".github/PULL_REQUEST_TEMPLATE.md"
        ]
        
        configs = {}
        for path in config_paths:
            try:
                response = await client.get(
                    f"{self.base_url}/repos/{repo}/contents/{path}",
                    headers=self.headers,
                    params={"ref": sha}
                )
                if response.status_code == 200:
                    import base64
                    content = base64.b64decode(response.json()["content"]).decode()
                    configs[path] = content[:2000]  # 截取配置文件
            except Exception:
                pass
        
        return configs
    
    async def _get_pr_info(self, client, repo: str, pr_number: int) -> dict:
        response = await client.get(
            f"{self.base_url}/repos/{repo}/pulls/{pr_number}",
            headers=self.headers
        )
        return response.json()

79.4 代码审查 Agent

79.4.1 审查 Prompt 设计

from anthropic import Anthropic

client = Anthropic()

CODE_REVIEW_SYSTEM = """你是一位经验丰富的高级工程师,专注于代码质量、安全性和可维护性。

审查原则:
1. **建设性**:指出问题时,始终提供具体的改进建议
2. **优先级分级**:区分 BLOCKER(阻断合并)、MAJOR(重要建议)、MINOR(细节建议)、PRAISE(值得称赞)
3. **精确定位**:指出具体文件名和行号
4. **上下文感知**:考虑变更的整体目的,避免对合理的设计决策提出无意义的批评
5. **误报控制**:宁可少报,不要过度报告不确定的问题

审查重点(按优先级):
- 🔴 BLOCKER:安全漏洞(SQL注入、XSS、硬编码凭据、权限绕过)
- 🔴 BLOCKER:数据丢失风险(未处理的异常、资源泄漏)
- 🟡 MAJOR:逻辑错误、边界条件未处理、性能O(n²)以上的热路径
- 🟡 MAJOR:缺少必要的错误处理
- 🟢 MINOR:代码规范、命名、注释
- 💪 PRAISE:值得肯定的设计决策或改进"""

def build_review_prompt(pr_context: dict) -> str:
    """构建代码审查 Prompt"""
    
    files_summary = "\n".join([
        f"- {f['filename']} ({f['status']}, +{f['additions']}/-{f['deletions']})"
        for f in pr_context["changed_files"][:20]
    ])
    
    config_context = ""
    if pr_context.get("config_files"):
        config_context = "\n\n项目配置参考:\n" + "\n".join([
            f"### {path}\n```\n{content[:500]}\n```"
            for path, content in list(pr_context["config_files"].items())[:3]
        ])
    
    return f"""请审查以下 Pull Request。

## PR 基本信息
- **标题**:{pr_context['pr_title']}
- **描述**:{pr_context['pr_description'] or '(无描述)'}
- **作者**:{pr_context['pr_author']}
- **目标分支**:{pr_context['base_branch']}
- **变更统计**:+{pr_context['stats']['additions']} / -{pr_context['stats']['deletions']}

## 变更文件列表
{files_summary}

{config_context}

## 代码变更(diff)

```diff
{pr_context['diff'][:40000]}

请按以下格式输出审查结果:

<review_summary> 对这次 PR 的整体评价(2-3句话) </review_summary>

[ {{ "severity": "BLOCKER|MAJOR|MINOR|PRAISE", "file": "文件路径", "line": 行号或null, "category": "security|logic|performance|style|docs|design", "title": "简短标题", "description": "详细说明", "suggestion": "具体改进建议(如有)" }} ] {{ "recommendation": "APPROVE|REQUEST_CHANGES|COMMENT", "blocker_count": 0, "major_count": 0, "minor_count": 0, "praise_count": 0, "summary": "一句话总结" }} """ ```

79.4.2 审查结果处理与发布

import re

class ReviewResultProcessor:
    """处理审查结果并发布到 GitHub"""
    
    def __init__(self, github_token: str):
        self.token = github_token
        self.headers = {
            "Authorization": f"Bearer {github_token}",
            "Accept": "application/vnd.github.v3+json"
        }
    
    def parse_review_response(self, raw_response: str) -> dict:
        """解析 Claude 的审查响应"""
        result = {
            "summary": "",
            "issues": [],
            "verdict": {}
        }
        
        # 解析摘要
        if "<review_summary>" in raw_response:
            result["summary"] = raw_response.split("<review_summary>")[1]\
                                            .split("</review_summary>")[0].strip()
        
        # 解析问题列表
        if "<issues>" in raw_response:
            issues_text = raw_response.split("<issues>")[1].split("</issues>")[0].strip()
            try:
                result["issues"] = json.loads(issues_text)
            except json.JSONDecodeError:
                # 尝试提取部分结构
                result["issues"] = self._extract_issues_fuzzy(issues_text)
        
        # 解析裁决
        if "<verdict>" in raw_response:
            verdict_text = raw_response.split("<verdict>")[1].split("</verdict>")[0].strip()
            try:
                result["verdict"] = json.loads(verdict_text)
            except json.JSONDecodeError:
                result["verdict"] = {"recommendation": "COMMENT", "summary": "解析失败"}
        
        return result
    
    def format_pr_comment(self, review_result: dict) -> str:
        """将审查结果格式化为 GitHub Markdown 评论"""
        
        verdict = review_result.get("verdict", {})
        issues = review_result.get("issues", [])
        
        # 统计图标
        recommendation = verdict.get("recommendation", "COMMENT")
        rec_emoji = {"APPROVE": "✅", "REQUEST_CHANGES": "❌", "COMMENT": "💬"}.get(recommendation, "💬")
        
        blockers = [i for i in issues if i.get("severity") == "BLOCKER"]
        majors = [i for i in issues if i.get("severity") == "MAJOR"]
        minors = [i for i in issues if i.get("severity") == "MINOR"]
        praises = [i for i in issues if i.get("severity") == "PRAISE"]
        
        comment = f"""## {rec_emoji} AI 代码审查报告

**总体评估**:{review_result.get('summary', '')}

**审查统计**:🔴 {len(blockers)} BLOCKER | 🟡 {len(majors)} MAJOR | 🟢 {len(minors)} MINOR | 💪 {len(praises)} PRAISE

---

"""
        
        # 输出问题
        if blockers:
            comment += "### 🔴 BLOCKER(必须修复)\n\n"
            for issue in blockers:
                comment += self._format_issue(issue)
        
        if majors:
            comment += "### 🟡 MAJOR(建议修复)\n\n"
            for issue in majors:
                comment += self._format_issue(issue)
        
        if minors:
            comment += "### 🟢 MINOR(可选优化)\n\n"
            for issue in minors:
                comment += self._format_issue(issue)
        
        if praises:
            comment += "### 💪 值得肯定\n\n"
            for issue in praises:
                comment += self._format_issue(issue)
        
        comment += f"""
---
*由 AI Code Review Agent 自动生成 | 模型:Claude Opus | [查看配置](.github/review-agent.yml)*
*此审查为辅助参考,最终决策由人工审查员负责*"""
        
        return comment
    
    def _format_issue(self, issue: dict) -> str:
        """格式化单个问题"""
        location = f"`{issue.get('file', 'unknown')}`"
        if issue.get('line'):
            location += f" (第 {issue['line']} 行)"
        
        return f"""**{issue.get('title', 'Issue')}** — {location}

{issue.get('description', '')}

{f"> 💡 建议:{issue['suggestion']}" if issue.get('suggestion') else ""}

"""
    
    async def post_review(
        self,
        repo: str,
        pr_number: int,
        review_result: dict
    ) -> bool:
        """发布审查评论到 GitHub"""
        comment_body = self.format_pr_comment(review_result)
        verdict = review_result.get("verdict", {})
        recommendation = verdict.get("recommendation", "COMMENT")
        
        # GitHub review event 映射
        event_map = {
            "APPROVE": "APPROVE",
            "REQUEST_CHANGES": "REQUEST_CHANGES",
            "COMMENT": "COMMENT"
        }
        
        async with httpx.AsyncClient() as client:
            # 发布 review
            response = await client.post(
                f"https://api.github.com/repos/{repo}/pulls/{pr_number}/reviews",
                headers=self.headers,
                json={
                    "body": comment_body,
                    "event": event_map.get(recommendation, "COMMENT")
                }
            )
            
            return response.status_code in [200, 201]

79.5 质量门控流水线

79.5.1 质量门控规则

class QualityGate:
    """质量门控:决定 PR 是否可以合并"""
    
    def __init__(self, config: dict):
        """
        config 示例:
        {
            "block_on_blocker": true,
            "block_on_major_count": 5,  # 超过5个 MAJOR 阻断
            "required_coverage": 0.80,  # 需要80%测试覆盖率
            "blocked_file_patterns": ["secrets.py", "*.pem"],
            "max_pr_size": 500  # 超过500行变更需要拆分
        }
        """
        self.config = config
    
    def evaluate(self, pr_context: dict, review_result: dict) -> dict:
        """评估 PR 是否通过质量门控"""
        failures = []
        warnings = []
        
        verdict = review_result.get("verdict", {})
        issues = review_result.get("issues", [])
        
        blockers = [i for i in issues if i.get("severity") == "BLOCKER"]
        majors = [i for i in issues if i.get("severity") == "MAJOR"]
        
        # 规则 1:BLOCKER 问题阻断合并
        if self.config.get("block_on_blocker", True) and blockers:
            failures.append({
                "rule": "no_blockers",
                "message": f"存在 {len(blockers)} 个 BLOCKER 问题,必须修复后才能合并",
                "issues": [b.get("title") for b in blockers]
            })
        
        # 规则 2:MAJOR 数量超限
        max_major = self.config.get("block_on_major_count", 999)
        if len(majors) > max_major:
            failures.append({
                "rule": "major_count_limit",
                "message": f"MAJOR 问题数量 ({len(majors)}) 超过限制 ({max_major})"
            })
        
        # 规则 3:PR 大小检查
        max_size = self.config.get("max_pr_size", 1000)
        total_changes = pr_context["stats"]["additions"] + pr_context["stats"]["deletions"]
        if total_changes > max_size:
            warnings.append({
                "rule": "pr_size",
                "message": f"PR 变更量 ({total_changes} 行) 较大,建议拆分为多个小 PR"
            })
        
        # 规则 4:检查敏感文件
        blocked_patterns = self.config.get("blocked_file_patterns", [])
        for file in pr_context["changed_files"]:
            filename = file["filename"]
            for pattern in blocked_patterns:
                if self._matches_pattern(filename, pattern):
                    failures.append({
                        "rule": "blocked_file",
                        "message": f"变更了受保护文件:{filename}"
                    })
        
        passed = len(failures) == 0
        
        return {
            "passed": passed,
            "failures": failures,
            "warnings": warnings,
            "recommendation": "APPROVE" if passed else "REQUEST_CHANGES"
        }
    
    def _matches_pattern(self, filename: str, pattern: str) -> bool:
        import fnmatch
        return fnmatch.fnmatch(filename, pattern)

79.5.2 完整的 PR 处理流程

async def process_pull_request(data: dict):
    """完整的 PR 审查流程"""
    
    repo = data["repository"]["full_name"]
    pr_number = data["pull_request"]["number"]
    
    print(f"处理 PR #{pr_number} in {repo}")
    
    try:
        # Step 1: 收集上下文
        collector = GitHubPRContextCollector(GITHUB_TOKEN)
        pr_context = await collector.collect(repo, pr_number)
        
        # Step 2: 调用 Claude 审查
        review_prompt = build_review_prompt(pr_context)
        
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=4000,
            system=CODE_REVIEW_SYSTEM,
            messages=[{"role": "user", "content": review_prompt}]
        )
        
        raw_review = response.content[0].text
        
        # Step 3: 解析审查结果
        processor = ReviewResultProcessor(GITHUB_TOKEN)
        review_result = processor.parse_review_response(raw_review)
        
        # Step 4: 质量门控评估
        gate_config = {
            "block_on_blocker": True,
            "block_on_major_count": 5,
            "max_pr_size": 800
        }
        gate = QualityGate(gate_config)
        gate_result = gate.evaluate(pr_context, review_result)
        
        # 合并质量门控结果到 review
        if not gate_result["passed"] and gate_result["recommendation"] == "REQUEST_CHANGES":
            review_result["verdict"]["recommendation"] = "REQUEST_CHANGES"
        
        # Step 5: 发布审查评论
        success = await processor.post_review(repo, pr_number, review_result)
        
        print(f"PR #{pr_number} 审查完成,通过质量门控:{gate_result['passed']}")
        
    except Exception as e:
        print(f"PR #{pr_number} 审查失败:{e}")
        # 发送失败通知
        await post_error_comment(repo, pr_number, str(e))

79.6 高级功能

79.6.1 增量审查(只审查新增变更)

def get_incremental_diff(
    full_diff: str,
    previous_review_sha: str,
    current_sha: str
) -> str:
    """
    只获取相对于上次审查的新增变更
    避免对已审查的代码重复评论
    """
    # 生产环境中需要调用 GitHub compare API
    # https://api.github.com/repos/{owner}/{repo}/compare/{base}...{head}
    pass

79.6.2 专项审查模式

对于特定类型的文件或变更,可以启用专项审查模式:

SPECIALIZED_REVIEW_PROMPTS = {
    "security": """特别关注以下安全问题:
- SQL 注入:是否有未参数化的数据库查询?
- XSS:是否有未转义的用户输入直接输出到 HTML?
- 硬编码密钥:代码中是否有 API key、密码、token?
- 权限检查:关键操作是否有适当的鉴权?
- 依赖安全:新增依赖是否有已知漏洞?""",
    
    "performance": """特别关注以下性能问题:
- N+1 查询:是否有在循环内进行数据库查询?
- 未使用索引:高频查询是否利用了索引?
- 内存泄漏:是否有资源未被正确释放?
- 同步阻塞:是否有不必要的同步操作可以改为异步?""",
    
    "api_design": """特别关注 API 设计质量:
- RESTful 规范遵守
- 错误响应格式一致性
- 向后兼容性(是否破坏了现有客户端)
- API 文档完整性"""
}

小结

代码审查 Agent 的核心工程模式是:Webhook 触发 → 上下文收集 → AI 分析 → 结构化输出 → API 发布。每个环节都有工程挑战:Webhook 的可靠性与安全性、上下文收集的完整性与 Token 限制的平衡、审查结果的结构化解析、以及质量门控的阈值调优。

关键成功因素是误报控制:一个发出太多无意义评论的审查机器人会比没有机器人更糟糕,因为工程师会开始忽视所有机器人评论,最终失去信任。从保守的阈值开始,随着数据积累逐步调整,是最稳健的上线策略。

本章评分
4.6  / 5  (3 评分)

💬 留言讨论