RAG 管道:检索增强生成
RAG 管道:检索增强生成
L1:概念层——为什么 RAG 存在
LLM 的三大根本局限
大型语言模型是人类知识的压缩表示,但这个"压缩"过程引入了三个根本性的局限,这些局限无法通过让模型变得更大来解决。
局限一:知识截止日期(Knowledge Cutoff)
LLM 的训练数据在某个时间点截止。Claude 不知道今天发生的新闻,GPT-4 不了解最新的 API 变更,任何模型都无法访问你公司内部的业务数据。这不是 bug,而是所有基于训练数据的系统必然具有的特性。
局限二:私有数据不可见
你的公司可能有数以万计的内部文档、客服记录、技术文档、会议纪要。这些数据从未出现在任何公开的训练数据集中。即使你使用了最强大的 LLM,它对这些数据也一无所知。
局限三:幻觉(Hallucination)
当 LLM 被问及其不确定的问题时,它倾向于生成听起来合理但实际错误的内容——这就是"幻觉"。幻觉的根本原因是:LLM 的目标是生成语言上连贯、语义上合理的文本,而非严格与事实一致。
RAG 的核心思路:先检索,再生成
RAG(Retrieval-Augmented Generation,检索增强生成)的思路非常直接:
在向 LLM 提问之前,先从知识库中检索出与问题最相关的文档片段,然后将这些片段作为上下文一起发送给 LLM,让 LLM 基于这些证据来生成答案。
这个简单的想法解决了上述三个局限:
- 知识库可以随时更新(解决知识截止问题)
- 私有数据可以加入知识库(解决私有数据问题)
- LLM 基于真实证据作答,幻觉大幅减少(解决幻觉问题)
从信息检索的视角看,RAG 就是把传统搜索引擎(检索相关文档)和 LLM(理解和生成文本)结合起来。每一部分都做它最擅长的事。
RAG vs 微调:应该选哪个?
初学者常常困惑:为什么不直接用私有数据微调(fine-tune)LLM,而要用 RAG?
微调(Fine-tuning)的优势:
- 模型可以学会特定领域的语言风格和术语
- 推理速度更快(无需检索步骤)
- 适合让模型学会特定的输出格式或行为模式
微调的局限:
- 数据更新成本高:每次知识库更新都需要重新微调,时间和计算成本极高
- 无法处理超长文档:微调只能改变模型的"直觉",无法让模型"记住"海量文档的具体内容
- 幻觉问题仍然存在:微调不能保证模型严格基于训练数据作答
- 成本高昂:高质量微调需要大量 GPU 计算和专业标注数据
RAG 的优势:
- 知识库可以实时更新(添加新文档,删除过时内容)
- 答案可溯源(可以告诉用户"这个答案来自文档 X 的第 Y 页")
- 成本可控(只需维护向量数据库,无需重新训练模型)
- 与幻觉作斗争更有效(LLM 被明确要求"基于以下内容回答")
结论:大多数企业 AI 应用应该先尝试 RAG,只有当 RAG 的效果确实不够好,且有足够的高质量标注数据时,才考虑微调。
L2:原理层——RAG 管道的两个阶段
阶段一:数据摄入(Ingestion Pipeline)
摄入管道将原始文档转换为可检索的向量索引。这个过程分为四个步骤:
步骤 1:加载(Load)
加载不同格式的原始文档:
PDF → 文本提取(保留章节结构)
Word → 段落提取
Markdown → 直接使用
HTML → 去除标签,保留内容
CSV/Excel → 行内容转文本
数据库记录 → 格式化为文本
文档加载是看似简单实则复杂的步骤。难点在于:
- PDF 可能是扫描图像(需要 OCR)
- 文档可能包含表格(如何保留表格语义?)
- 有些文档有特殊的章节结构(如法律合同、技术手册)
步骤 2:分块(Chunk)
将长文档切分为较小的块(chunk)。这一步是 RAG 性能的关键决定因素之一。
为什么需要分块?
- Embedding 模型有 token 长度限制(通常 512-8192 tokens)
- 检索时返回的是块,块太大会引入不相关内容,块太小会丢失上下文
- 向量相似度在语义内聚的文本块上效果最好
三种主要分块策略:
固定大小分块(Fixed-size chunking):
按照固定的 token 数量切分,相邻块有重叠(overlap)
优点:实现简单,适合结构不明显的文档
缺点:可能在句子中间切断,破坏语义完整性
语义分块(Semantic chunking):
在语义边界处切分(段落、节、句子)
优点:保留了完整的语义单元
缺点:块大小不均匀,实现复杂
递归字符分块(Recursive character chunking):
按照分隔符层次递归切分:["\n\n", "\n", ".", " "]
优先在段落边界切分,其次是句子边界,最后是词边界
兼顾了固定大小和语义边界的优点
步骤 3:嵌入(Embed)
将文本块转换为密集向量(dense vector)。这一步使用 Embedding 模型,将语义相近的文本映射到向量空间中相邻的位置。
主流 Embedding 模型对比:
| 模型 | 维度 | 最大 tokens | 特点 |
|---|---|---|---|
| text-embedding-3-small | 1536 | 8191 | OpenAI,性价比高 |
| text-embedding-3-large | 3072 | 8191 | OpenAI,精度最高 |
| voyage-3 | 1024 | 32000 | Anthropic 推荐,长文档效果好 |
| nomic-embed-text | 768 | 8192 | 开源,可本地部署 |
步骤 4:存储(Store)
将向量存入向量数据库。主流向量数据库对比:
| 数据库 | 类型 | 特点 |
|---|---|---|
| pgvector | PostgreSQL 扩展 | 与现有 PG 栈无缝集成,适合中等规模 |
| Qdrant | 独立服务 | 性能优秀,丰富的过滤功能 |
| Chroma | 嵌入式 | 开发简单,适合本地原型 |
| Pinecone | 云服务 | 全托管,适合大规模生产 |
| Weaviate | 独立服务 | 内置混合搜索 |
阶段二:检索与生成(Retrieval and Generation)
步骤 1:嵌入查询
用户的问题使用同一个 Embedding 模型转换为向量。这一步的关键是:查询向量和文档向量必须使用同一个模型,才能在同一个向量空间中比较。
步骤 2:近似最近邻搜索(ANN Search)
在向量数据库中找出与查询向量最相似的 K 个文档块。相似度度量方式:
余弦相似度(Cosine Similarity): $$\text{sim}(A, B) = \frac{A \cdot B}{|A| |B|}$$ 最常用,不受向量长度影响,适合文本 Embedding。
内积(Dot Product): $$\text{sim}(A, B) = A \cdot B$$ 适合已归一化的向量(归一化后等同于余弦相似度)。
欧氏距离(Euclidean Distance): $$d(A, B) = \sqrt{\sum_i (A_i - B_i)^2}$$ 适合图像 Embedding 等场景,文本场景较少使用。
步骤 3:重排序(Rerank)
初始检索(使用向量相似度)的精度有限。重排序步骤使用交叉编码器(cross-encoder)模型对检索结果进行精排:
双编码器(向量相似度):快,但精度有限
交叉编码器(重排序):慢,但精度更高
工作流:先用双编码器快速召回 Top-50,再用交叉编码器精排为 Top-5
常用重排序服务:Cohere Rerank、Jina Reranker、BGE Reranker(开源)。
步骤 4:上下文组装与生成
将检索到的文档块组装成上下文,发送给 LLM:
[系统提示]
你是一个专业助手。请仅基于以下参考资料回答问题。
如果参考资料中没有相关信息,请明确说明。
[参考资料]
来源:文档A,第3章
内容:...
来源:文档B,第7页
内容:...
[用户问题]
请问...
L3:代码实践——构建完整 Go RAG 管道
文档加载器
// rag/loader.go
package rag
import (
"bufio"
"fmt"
"os"
"path/filepath"
"strings"
)
// Document 代表一个加载的文档
type Document struct {
ID string
Content string
Source string // 文件路径或 URL
Metadata map[string]string
}
// MarkdownLoader 加载 Markdown 文件
type MarkdownLoader struct{}
func (l *MarkdownLoader) Load(path string) ([]Document, error) {
files, err := filepath.Glob(path)
if err != nil {
return nil, err
}
var docs []Document
for _, file := range files {
content, err := os.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("read %s: %w", file, err)
}
docs = append(docs, Document{
ID: file,
Content: string(content),
Source: file,
Metadata: map[string]string{
"filename": filepath.Base(file),
"type": "markdown",
},
})
}
return docs, nil
}
// DirectoryLoader 递归加载目录下所有文档
type DirectoryLoader struct {
Extensions []string // 支持的文件扩展名
}
func (l *DirectoryLoader) Load(dirPath string) ([]Document, error) {
var docs []Document
err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
ext := strings.ToLower(filepath.Ext(path))
for _, supported := range l.Extensions {
if ext == supported {
content, err := os.ReadFile(path)
if err != nil {
return err
}
docs = append(docs, Document{
ID: path,
Content: string(content),
Source: path,
Metadata: map[string]string{
"filename": info.Name(),
"type": ext[1:],
},
})
break
}
}
return nil
})
return docs, err
}
递归文本分割器
// rag/splitter.go
package rag
import (
"strings"
"unicode/utf8"
)
// Chunk 代表一个文本块
type Chunk struct {
Content string
Source string
ChunkIdx int
Metadata map[string]string
}
// RecursiveTextSplitter 实现递归字符分割
type RecursiveTextSplitter struct {
ChunkSize int // 最大 token 数(近似为字符数/4)
ChunkOverlap int // 重叠字符数
Separators []string // 分隔符优先级列表
}
func NewRecursiveTextSplitter(chunkSize, overlap int) *RecursiveTextSplitter {
return &RecursiveTextSplitter{
ChunkSize: chunkSize,
ChunkOverlap: overlap,
Separators: []string{"\n\n", "\n", ". ", "! ", "? ", ", ", " ", ""},
}
}
// Split 将一个文档切分为多个 Chunk
func (s *RecursiveTextSplitter) Split(doc Document) []Chunk {
texts := s.splitText(doc.Content, s.Separators)
var chunks []Chunk
for i, text := range texts {
if strings.TrimSpace(text) == "" {
continue
}
chunk := Chunk{
Content: text,
Source: doc.Source,
ChunkIdx: i,
Metadata: doc.Metadata,
}
chunks = append(chunks, chunk)
}
return chunks
}
func (s *RecursiveTextSplitter) splitText(text string, separators []string) []string {
if len(separators) == 0 || utf8.RuneCountInString(text) <= s.ChunkSize {
return []string{text}
}
separator := separators[len(separators)-1] // 默认:字符级别
for _, sep := range separators {
if strings.Contains(text, sep) {
separator = sep
break
}
}
var goodSplits []string
var currentChunk strings.Builder
splits := strings.Split(text, separator)
for _, split := range splits {
if currentChunk.Len()+len(split)+len(separator) <= s.ChunkSize*4 { // 近似 token 计数
if currentChunk.Len() > 0 {
currentChunk.WriteString(separator)
}
currentChunk.WriteString(split)
} else {
if currentChunk.Len() > 0 {
goodSplits = append(goodSplits, currentChunk.String())
// 保留重叠内容
overlap := s.getOverlap(currentChunk.String())
currentChunk.Reset()
currentChunk.WriteString(overlap)
if currentChunk.Len() > 0 {
currentChunk.WriteString(separator)
}
}
currentChunk.WriteString(split)
}
}
if currentChunk.Len() > 0 {
goodSplits = append(goodSplits, currentChunk.String())
}
// 如果某个片段仍然太大,递归使用更细粒度的分隔符
var result []string
for _, gs := range goodSplits {
if utf8.RuneCountInString(gs) > s.ChunkSize*4 {
result = append(result, s.splitText(gs, separators[1:])...)
} else {
result = append(result, gs)
}
}
return result
}
func (s *RecursiveTextSplitter) getOverlap(text string) string {
runes := []rune(text)
overlapChars := s.ChunkOverlap * 4
if len(runes) <= overlapChars {
return text
}
return string(runes[len(runes)-overlapChars:])
}
Embedding API 客户端
// rag/embedder.go
package rag
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"time"
)
// Embedder 将文本转换为向量
type Embedder interface {
Embed(ctx context.Context, texts []string) ([][]float32, error)
}
// OpenAIEmbedder 使用 OpenAI 的 Embedding API
type OpenAIEmbedder struct {
apiKey string
model string
client *http.Client
}
func NewOpenAIEmbedder(model string) *OpenAIEmbedder {
return &OpenAIEmbedder{
apiKey: os.Getenv("OPENAI_API_KEY"),
model: model,
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (e *OpenAIEmbedder) Embed(ctx context.Context, texts []string) ([][]float32, error) {
body, _ := json.Marshal(map[string]interface{}{
"model": e.model,
"input": texts,
})
req, err := http.NewRequestWithContext(ctx, "POST",
"https://api.openai.com/v1/embeddings", bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+e.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := e.client.Do(req)
if err != nil {
return nil, fmt.Errorf("embedding request: %w", err)
}
defer resp.Body.Close()
var result struct {
Data []struct {
Embedding []float32 `json:"embedding"`
} `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
embeddings := make([][]float32, len(result.Data))
for i, d := range result.Data {
embeddings[i] = d.Embedding
}
return embeddings, nil
}
// VoyageEmbedder 使用 Voyage AI 的 Embedding API(Anthropic 推荐)
type VoyageEmbedder struct {
apiKey string
model string
client *http.Client
}
func NewVoyageEmbedder(model string) *VoyageEmbedder {
return &VoyageEmbedder{
apiKey: os.Getenv("VOYAGE_API_KEY"),
model: model,
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (e *VoyageEmbedder) Embed(ctx context.Context, texts []string) ([][]float32, error) {
body, _ := json.Marshal(map[string]interface{}{
"model": e.model,
"input": texts,
})
req, err := http.NewRequestWithContext(ctx, "POST",
"https://api.voyageai.com/v1/embeddings", bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+e.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := e.client.Do(req)
if err != nil {
return nil, fmt.Errorf("voyage embedding request: %w", err)
}
defer resp.Body.Close()
var result struct {
Data []struct {
Embedding []float32 `json:"embedding"`
} `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
embeddings := make([][]float32, len(result.Data))
for i, d := range result.Data {
embeddings[i] = d.Embedding
}
return embeddings, nil
}
pgvector 向量存储
// rag/store.go
package rag
import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/lib/pq"
_ "github.com/lib/pq"
)
// SearchResult 代表一个检索结果
type SearchResult struct {
Chunk Chunk
Similarity float64
}
// PgVectorStore 使用 pgvector 存储和检索向量
type PgVectorStore struct {
db *sql.DB
tableName string
dimension int
}
func NewPgVectorStore(dsn, tableName string, dimension int) (*PgVectorStore, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
store := &PgVectorStore{db: db, tableName: tableName, dimension: dimension}
if err := store.initialize(context.Background()); err != nil {
return nil, err
}
return store, nil
}
func (s *PgVectorStore) initialize(ctx context.Context) error {
queries := []string{
"CREATE EXTENSION IF NOT EXISTS vector",
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
id BIGSERIAL PRIMARY KEY,
content TEXT NOT NULL,
source TEXT,
chunk_idx INTEGER,
metadata JSONB DEFAULT '{}',
embedding vector(%d)
)`, s.tableName, s.dimension),
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_embedding_idx
ON %s USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100)`, s.tableName, s.tableName),
}
for _, q := range queries {
if _, err := s.db.ExecContext(ctx, q); err != nil {
return fmt.Errorf("initialize: %w", err)
}
}
return nil
}
// Insert 批量插入文档块和对应的向量
func (s *PgVectorStore) Insert(ctx context.Context, chunks []Chunk, embeddings [][]float32) error {
if len(chunks) != len(embeddings) {
return fmt.Errorf("chunks and embeddings length mismatch")
}
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(
`INSERT INTO %s (content, source, chunk_idx, embedding)
VALUES ($1, $2, $3, $4)`, s.tableName))
if err != nil {
return err
}
defer stmt.Close()
for i, chunk := range chunks {
embStr := float32SliceToString(embeddings[i])
if _, err := stmt.ExecContext(ctx, chunk.Content, chunk.Source, chunk.ChunkIdx, embStr); err != nil {
return fmt.Errorf("insert chunk %d: %w", i, err)
}
}
return tx.Commit()
}
// Search 使用余弦相似度检索最相关的 K 个块
func (s *PgVectorStore) Search(ctx context.Context, queryEmbedding []float32, k int) ([]SearchResult, error) {
embStr := float32SliceToString(queryEmbedding)
rows, err := s.db.QueryContext(ctx, fmt.Sprintf(
`SELECT content, source, chunk_idx,
1 - (embedding <=> $1::vector) AS similarity
FROM %s
ORDER BY embedding <=> $1::vector
LIMIT $2`, s.tableName),
embStr, k)
if err != nil {
return nil, fmt.Errorf("search: %w", err)
}
defer rows.Close()
var results []SearchResult
for rows.Next() {
var r SearchResult
if err := rows.Scan(&r.Chunk.Content, &r.Chunk.Source,
&r.Chunk.ChunkIdx, &r.Similarity); err != nil {
return nil, err
}
results = append(results, r)
}
return results, rows.Err()
}
// HybridSearch 结合向量搜索和全文搜索(BM25)
func (s *PgVectorStore) HybridSearch(ctx context.Context, query string, queryEmbedding []float32, k int) ([]SearchResult, error) {
embStr := float32SliceToString(queryEmbedding)
// RRF (Reciprocal Rank Fusion) 融合两种检索结果
rows, err := s.db.QueryContext(ctx, fmt.Sprintf(
`WITH vector_search AS (
SELECT id, 1 - (embedding <=> $1::vector) AS vector_score
FROM %s
ORDER BY embedding <=> $1::vector
LIMIT 50
),
text_search AS (
SELECT id, ts_rank(to_tsvector('english', content), plainto_tsquery('english', $2)) AS text_score
FROM %s
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $2)
LIMIT 50
),
rrf AS (
SELECT COALESCE(v.id, t.id) AS id,
COALESCE(1.0/(60+ROW_NUMBER() OVER (ORDER BY v.vector_score DESC)), 0) +
COALESCE(1.0/(60+ROW_NUMBER() OVER (ORDER BY t.text_score DESC)), 0) AS rrf_score
FROM vector_search v
FULL OUTER JOIN text_search t ON v.id = t.id
)
SELECT d.content, d.source, d.chunk_idx, r.rrf_score
FROM rrf r JOIN %s d ON r.id = d.id
ORDER BY r.rrf_score DESC
LIMIT $3`, s.tableName, s.tableName, s.tableName),
embStr, query, k)
if err != nil {
return nil, err
}
defer rows.Close()
var results []SearchResult
for rows.Next() {
var r SearchResult
if err := rows.Scan(&r.Chunk.Content, &r.Chunk.Source,
&r.Chunk.ChunkIdx, &r.Similarity); err != nil {
return nil, err
}
results = append(results, r)
}
return results, rows.Err()
}
func float32SliceToString(v []float32) string {
parts := make([]string, len(v))
for i, f := range v {
parts[i] = fmt.Sprintf("%f", f)
}
return "[" + strings.Join(parts, ",") + "]"
}
完整 RAG 管道
// rag/pipeline.go
package rag
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"time"
)
// Pipeline 是完整的 RAG 管道
type Pipeline struct {
splitter *RecursiveTextSplitter
embedder Embedder
store *PgVectorStore
reranker Reranker
llm LLMClient
}
type Reranker interface {
Rerank(ctx context.Context, query string, docs []SearchResult, topK int) ([]SearchResult, error)
}
type LLMClient interface {
Complete(ctx context.Context, systemPrompt, userMessage string) (string, error)
}
func NewPipeline(embedder Embedder, store *PgVectorStore, reranker Reranker, llm LLMClient) *Pipeline {
return &Pipeline{
splitter: NewRecursiveTextSplitter(1000, 200),
embedder: embedder,
store: store,
reranker: reranker,
llm: llm,
}
}
// Ingest 摄入文档
func (p *Pipeline) Ingest(ctx context.Context, docs []Document) error {
var allChunks []Chunk
for _, doc := range docs {
chunks := p.splitter.Split(doc)
allChunks = append(allChunks, chunks...)
}
// 批量 Embed(每批 100 个)
batchSize := 100
for i := 0; i < len(allChunks); i += batchSize {
end := i + batchSize
if end > len(allChunks) {
end = len(allChunks)
}
batch := allChunks[i:end]
texts := make([]string, len(batch))
for j, c := range batch {
texts[j] = c.Content
}
embeddings, err := p.embedder.Embed(ctx, texts)
if err != nil {
return fmt.Errorf("embed batch %d: %w", i/batchSize, err)
}
if err := p.store.Insert(ctx, batch, embeddings); err != nil {
return fmt.Errorf("insert batch %d: %w", i/batchSize, err)
}
fmt.Printf("Ingested %d/%d chunks\n", end, len(allChunks))
}
return nil
}
// Query 处理用户查询,返回 LLM 生成的答案
func (p *Pipeline) Query(ctx context.Context, question string) (string, error) {
// 1. Embed 查询
embeddings, err := p.embedder.Embed(ctx, []string{question})
if err != nil {
return "", fmt.Errorf("embed query: %w", err)
}
queryEmbedding := embeddings[0]
// 2. 混合搜索(向量 + BM25)
results, err := p.store.HybridSearch(ctx, question, queryEmbedding, 20)
if err != nil {
return "", fmt.Errorf("search: %w", err)
}
if len(results) == 0 {
return "I couldn't find relevant information in the knowledge base to answer your question.", nil
}
// 3. 重排序(选出最相关的 5 个)
if p.reranker != nil {
results, err = p.reranker.Rerank(ctx, question, results, 5)
if err != nil {
// 降级:不重排序,使用原始结果
if len(results) > 5 {
results = results[:5]
}
}
} else if len(results) > 5 {
results = results[:5]
}
// 4. 组装上下文
context := p.assembleContext(results)
// 5. 生成答案
systemPrompt := `You are a helpful assistant. Answer the user's question based ONLY on the provided reference materials.
If the reference materials don't contain enough information to answer the question, say so clearly.
Always cite the source (filename) when referencing information.`
userMessage := fmt.Sprintf("Reference materials:\n\n%s\n\nQuestion: %s", context, question)
return p.llm.Complete(ctx, systemPrompt, userMessage)
}
// assembleContext 将检索结果组装为上下文字符串
func (p *Pipeline) assembleContext(results []SearchResult) string {
var sb strings.Builder
for i, r := range results {
sb.WriteString(fmt.Sprintf("[Source %d: %s]\n%s\n\n",
i+1, r.Chunk.Source, r.Chunk.Content))
}
return sb.String()
}
// CohereReranker 使用 Cohere Rerank API
type CohereReranker struct {
apiKey string
model string
client *http.Client
}
func NewCohereReranker() *CohereReranker {
return &CohereReranker{
apiKey: os.Getenv("COHERE_API_KEY"),
model: "rerank-english-v3.0",
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (r *CohereReranker) Rerank(ctx context.Context, query string, docs []SearchResult, topK int) ([]SearchResult, error) {
documents := make([]string, len(docs))
for i, d := range docs {
documents[i] = d.Chunk.Content
}
body, _ := json.Marshal(map[string]interface{}{
"model": r.model,
"query": query,
"documents": documents,
"top_n": topK,
})
req, err := http.NewRequestWithContext(ctx, "POST",
"https://api.cohere.ai/v1/rerank", bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+r.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := r.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result struct {
Results []struct {
Index int `json:"index"`
RelevanceScore float64 `json:"relevance_score"`
} `json:"results"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
reranked := make([]SearchResult, len(result.Results))
for i, r := range result.Results {
reranked[i] = docs[r.Index]
reranked[i].Similarity = r.RelevanceScore
}
return reranked, nil
}
L4:进阶——Late Chunking、HyDE、RAGAS 评估与流式 RAG
Late Chunking(延迟分块)
传统分块在 Embed 之前进行,导致块之间的上下文联系丢失。Late Chunking 先对完整文档做 Embedding,再在向量空间中切块:
// LateChunker 实现延迟分块
type LateChunker struct {
// 使用支持长上下文的 Embedding 模型(如 jina-embeddings-v2)
embedder LongContextEmbedder
}
// EmbedWithContext 先对完整文档 Embed,再在 token 层面分块
func (c *LateChunker) EmbedWithContext(ctx context.Context, doc Document) ([]ChunkEmbedding, error) {
// 1. 获取完整文档的 token 级别向量(每个 token 一个向量)
tokenEmbeddings, err := c.embedder.EmbedTokens(ctx, doc.Content)
if err != nil {
return nil, err
}
// 2. 确定分块边界(使用语义边界)
boundaries := findSemanticBoundaries(doc.Content)
// 3. 对每个块,对其 token 向量做均值池化
var result []ChunkEmbedding
for i := 0; i < len(boundaries)-1; i++ {
start, end := boundaries[i], boundaries[i+1]
chunkTokens := tokenEmbeddings[start:end]
// 均值池化
pooled := meanPool(chunkTokens)
result = append(result, ChunkEmbedding{
Content: extractText(doc.Content, start, end),
Embedding: pooled,
})
}
return result, nil
}
func meanPool(vectors [][]float32) []float32 {
if len(vectors) == 0 {
return nil
}
dim := len(vectors[0])
result := make([]float32, dim)
for _, v := range vectors {
for i, f := range v {
result[i] += f
}
}
n := float32(len(vectors))
for i := range result {
result[i] /= n
}
return result
}
HyDE(Hypothetical Document Embeddings)
HyDE 是一个精妙的技巧:先让 LLM 根据问题生成一个"假设文档",再用这个假设文档的向量去搜索真实的知识库。
其原理是:问题的向量和答案的向量在语义空间中可能并不相近(比如"什么是 HNSW?"和"HNSW 是..."),但两个答案类型的文本则更可能相近。
// HyDERetriever 实现 HyDE 检索策略
type HyDERetriever struct {
llm LLMClient
embedder Embedder
store *PgVectorStore
}
func (r *HyDERetriever) Search(ctx context.Context, question string, k int) ([]SearchResult, error) {
// 1. 让 LLM 生成假设文档
hypotheticalDoc, err := r.llm.Complete(ctx,
"Generate a short, factual paragraph that would answer the following question. Write as if from a technical document.",
question)
if err != nil {
return nil, fmt.Errorf("generate hypothetical doc: %w", err)
}
// 2. 对假设文档做 Embedding
embeddings, err := r.embedder.Embed(ctx, []string{hypotheticalDoc})
if err != nil {
return nil, err
}
// 3. 用假设文档的向量搜索真实知识库
return r.store.Search(ctx, embeddings[0], k)
}
RAGAS 评估框架
RAGAS(RAG Assessment)提供了系统化评估 RAG 管道的指标:
// RAGASEvaluator 评估 RAG 管道质量
type RAGASEvaluator struct {
llm LLMClient
}
// EvaluateFaithfulness 评估答案是否忠实于检索内容(0-1)
// 核心问题:答案中的每个声明是否都可以从上下文中推断?
func (e *RAGASEvaluator) EvaluateFaithfulness(ctx context.Context, question, answer, context string) (float64, error) {
prompt := fmt.Sprintf(`Given the following context and answer, evaluate whether each statement in the answer is supported by the context.
Context: %s
Answer: %s
For each statement in the answer, determine if it is:
1. Fully supported by the context
2. Partially supported
3. Not supported (hallucination)
Return a JSON object with:
- "statements": list of statements found in the answer
- "supported": list of booleans indicating support
- "score": ratio of fully supported statements (0.0 to 1.0)`, context, answer)
response, err := e.llm.Complete(ctx, "You are an expert at evaluating RAG systems.", prompt)
if err != nil {
return 0, err
}
var result struct {
Score float64 `json:"score"`
}
// 从响应中提取 JSON
if err := extractJSON(response, &result); err != nil {
return 0, err
}
return result.Score, nil
}
// EvaluateAnswerRelevancy 评估答案与问题的相关性(0-1)
func (e *RAGASEvaluator) EvaluateAnswerRelevancy(ctx context.Context, question, answer string, embedder Embedder) (float64, error) {
// 让 LLM 根据答案反向生成问题,测量反向问题与原始问题的相似度
genQuestionsPrompt := fmt.Sprintf(`Generate 3 different questions that this answer could be responding to.
Answer: %s
Return just the questions, one per line.`, answer)
response, err := e.llm.Complete(ctx, "Generate questions based on the given answer.", genQuestionsPrompt)
if err != nil {
return 0, err
}
generatedQuestions := splitLines(response)
// 计算生成问题与原始问题的平均余弦相似度
texts := append([]string{question}, generatedQuestions...)
embeddings, err := embedder.Embed(ctx, texts)
if err != nil {
return 0, err
}
questionEmb := embeddings[0]
var totalSim float64
for _, genEmb := range embeddings[1:] {
totalSim += cosineSimilarity(questionEmb, genEmb)
}
return totalSim / float64(len(generatedQuestions)), nil
}
func cosineSimilarity(a, b []float32) float64 {
var dot, normA, normB float64
for i := range a {
dot += float64(a[i]) * float64(b[i])
normA += float64(a[i]) * float64(a[i])
normB += float64(b[i]) * float64(b[i])
}
if normA == 0 || normB == 0 {
return 0
}
return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}
流式 RAG 响应
对于长答案,流式输出可以显著改善用户体验:
// StreamingQuery 支持流式输出的 RAG 查询
func (p *Pipeline) StreamingQuery(ctx context.Context, question string, out chan<- string) error {
defer close(out)
// 检索步骤(同步)
embeddings, err := p.embedder.Embed(ctx, []string{question})
if err != nil {
return err
}
results, err := p.store.Search(ctx, embeddings[0], 5)
if err != nil {
return err
}
context := p.assembleContext(results)
// 流式生成
return p.llm.StreamComplete(ctx,
"Answer based on the provided context.",
fmt.Sprintf("Context:\n%s\n\nQuestion: %s", context, question),
out)
}
小结
RAG 管道的性能受多个因素影响,优化顺序建议:
- 先优化分块策略:这是影响最大的单一因素。错误的分块会导致检索到不完整的上下文。
- 选择合适的 Embedding 模型:voyage-3 在中文和技术文档上效果更好。
- 添加混合搜索:BM25 对关键词搜索(人名、型号、代码片段)更好,向量搜索对语义更好。
- 加入重排序:Cohere Rerank 可以显著提升精度,成本可控。
- 最后优化 Prompt:上下文组装方式和系统提示对最终质量有明显影响。
- 用 RAGAS 评估:建立评估数据集,量化每次改进的效果。
RAG 不是一次性工程,而是一个持续迭代优化的系统。