第 41 章

实现一个简易数据库

实现一个简易数据库

有一种说法在系统程序员中广泛流传:真正理解一个系统的唯一方式,是自己动手实现它。数据库是这个规律最典型的体现。你可以读完整本《数据库系统概念》,你可以背出 B+树的每一个插入规则,但直到你亲手写出一个能运行 SELECT * FROM users WHERE age > 18 的引擎,你才会真正理解为什么数据库的设计是现在这个样子。

这一章我们用 Go 构建一个完整的小型数据库引擎。不是玩具级别的内存哈希表,而是一个真正支持磁盘存储、崩溃恢复、事务隔离的系统。我们会经历:键值存储 → B+树索引 → WAL 日志 → MVCC 事务 → SQL 解析器。每一步都会揭示现代数据库引擎的一块核心机制。

Level 1 · 数据库引擎的本质

存储引擎 vs 查询引擎

当你运行 SELECT name FROM employees WHERE salary > 10000 ORDER BY name 时,数据库内部发生了两件完全不同的事情。

查询引擎(Query Engine)负责理解你的意图:解析 SQL 文本,生成执行计划,决定先过滤还是先排序,决定用哪个索引。查询引擎是个翻译器——把人类的语言翻译成机器的操作序列。

存储引擎(Storage Engine)负责数据的物理存取:数据在磁盘上怎么组织,怎么从磁盘读出来,怎么写回去,崩溃后怎么恢复。存储引擎是个物理系统——和硬件、操作系统直接打交道。

MySQL 的架构清晰地体现了这种分层:上层是优化器和执行器(查询引擎),下层是 InnoDB 或 MyISAM(存储引擎)。你甚至可以给同一张表换一个存储引擎,上层代码完全不需要修改。PostgreSQL 把存储引擎和查询引擎更紧密地耦合在一起,但逻辑层次依然清晰。

这种分层不是偶然的工程决策,而是数据库问题本质的体现:如何存数据如何查数据 是两个根本不同的问题,复杂度来源不同,优化方向不同,变化速度也不同。

为什么这是最好的系统编程练习

构建数据库引擎会强迫你面对计算机系统中最核心的几个矛盾:

速度 vs 持久性。 内存快,磁盘慢。但内存是易失的,断电就没了。所有的崩溃恢复机制,本质上都是在解决这个矛盾。

并发 vs 一致性。 多个用户同时读写数据库。要让他们互不干扰,同时又要保证数据的一致性,这是并发控制的核心问题。MVCC 是现代数据库给出的答案之一。

空间 vs 时间。 B+树的每个节点应该多大?太小了节点太多,树太深,查找慢;太大了每次 I/O 读写的数据太多,浪费。缓冲池的大小怎么决定?这些都是空间和时间的权衡。

灵活性 vs 性能。 支持 SQL 意味着你需要解析和优化查询,这有额外的 CPU 开销。直接使用键值 API 更快,但表达能力受限。

每一个这样的矛盾,在数据库系统中都有对应的工程解法,而这些解法在其他系统(操作系统、分布式系统、文件系统)中也无处不在。学会数据库,实际上是学会了一套思考系统问题的框架。

Go 的适配性

Go 在这个项目中的优势是多方面的:

Level 2 · 核心机制原理

B+树:数据库索引的基石

B+树是绝大多数关系型数据库的默认索引结构。理解为什么选 B+树而不是其他结构,需要先理解数据库面对的 I/O 现实。

一次磁盘随机 I/O 的延迟是 5-10ms,而一次内存访问是 100ns,相差 5 万倍。对于一个有 100 万条记录的表,二叉搜索树(BST)的高度约为 20,每次查找需要 20 次 I/O,也就是 100-200ms。这完全不可接受。

B+树的解决方案是:让每个节点尽可能大,减少树的高度。典型的 B+树节点大小等于磁盘 I/O 的最小单位——页(Page),通常是 4KB 或 16KB(InnoDB 默认 16KB)。一个 16KB 的节点可以存储约 1000 个 64 位的键,所以一棵 3 层的 B+树可以索引 10 亿条记录,而只需要 3 次 I/O。

B+树的节点结构

B+树分为内部节点(Internal Node)和叶节点(Leaf Node):

内部节点:
+--------+----+--------+----+--------+
| ptr[0] | k1 | ptr[1] | k2 | ptr[2] |
+--------+----+--------+----+--------+

叶节点:
+-----+-----+-----+------+
| k1  | k2  | k3  | next | → 指向下一个叶节点(支持范围扫描)
+-----+-----+-----+------+
| v1  | v2  | v3  |

内部节点只存键和子节点指针,不存值。所有的实际数据(或主键,在 InnoDB 的二级索引中)都在叶节点。叶节点之间用指针串成双向链表,这使得范围扫描变得极其高效——找到起点后,顺着链表走就行了,完全不需要回溯树。

插入与分裂

当一个叶节点满了,B+树会把它分裂成两个节点,把中间的键提升到父节点。如果父节点也满了,继续向上分裂,最坏情况下一路分裂到根节点,这时树增高一层。

分裂前(节点满,capacity=3):
[10, 20, 30]

插入 25 后分裂:
父节点获得 25
[10, 20] [25, 30]

删除与合并

删除后,如果节点的键数少于最低阈值(通常是 capacity/2),有两个选项:

  1. 借用:从相邻兄弟节点借一个键(通过父节点中转)。
  2. 合并:和兄弟节点合并,父节点中的分隔键下移。

合并会触发父节点的键减少,可能引发递归合并,最坏情况下树降低一层。

WAL:顺序写实现崩溃恢复

数据库最基本的承诺之一是持久性(Durability):事务一旦提交,数据不会因为崩溃而丢失。

最直接的实现方式是:提交时把脏页(被修改但尚未写回磁盘的页)全部刷新到磁盘。但这有两个问题:随机 I/O 极慢(在磁盘上,随机写性能约为顺序写的 1/100),而且如果修改跨越多个页,崩溃在中间会导致数据不一致。

WAL(Write-Ahead Log,预写式日志)的解决方案极其优雅:

核心规则:在修改磁盘上的任何数据页之前,必须先把对应的日志记录写入 WAL 文件。

WAL 的写入是纯顺序追加(append-only),顺序写性能极高。数据页可以延迟写,只要 WAL 已经落盘,崩溃后就能从 WAL 重放所有操作,恢复到一致状态。

WAL 日志记录格式:
+----------+----------+---------+---------+----------+
| LSN (8B) | TxID (8B)| PageID  | Offset  | Data     |
+----------+----------+---------+---------+----------+

LSN(Log Sequence Number)是单调递增的序列号,用来确定日志记录的顺序,也用来标记数据页的"最新程度"。每个数据页的头部都存储着最后一次修改它的 WAL 记录的 LSN(称为 pageLSN)。

恢复流程(ARIES 算法简化版):

  1. 分析阶段:从最近的检查点(Checkpoint)开始扫描 WAL,确定哪些事务提交了,哪些没有。
  2. 重做阶段:重放所有 WAL 记录,把数据库恢复到崩溃前的状态(包括未提交的事务的修改)。
  3. 撤销阶段:回滚所有未提交的事务(通过 WAL 中的补偿日志记录)。

MVCC:快照隔离的实现

读写不互相阻塞——这是 MVCC(Multi-Version Concurrency Control,多版本并发控制)最重要的特性。它让读操作看到一个一致的历史快照,而不阻塞写操作。

版本链

每一行数据都有一条版本链,链上的每个版本记录了:

行 id=1 的版本链:
v3: name="Charlie", begin_ts=200, end_ts=NULL    ← 最新版本
v2: name="Bob",     begin_ts=100, end_ts=200
v1: name="Alice",   begin_ts=1,   end_ts=100

快照读

每个事务开始时,记录当前的事务 ID 作为快照时间戳(snap_ts)。读操作只能看到满足以下条件的版本:

这样,事务 A 在整个生命周期内看到的是一个一致的快照——就算其他事务在 A 运行期间修改了数据,A 也看不到这些修改。

写操作

写操作不修改现有版本,而是创建新版本(INSERT)或标记旧版本失效(DELETE/UPDATE = DELETE + INSERT)。这就是"多版本"的含义。

缓冲池:页面缓存管理

数据库不直接读写磁盘,而是通过缓冲池(Buffer Pool)。缓冲池是内存中的一块区域,缓存了最近访问的磁盘页。

缓冲池结构:
+---------+---------+---------+---------+
| Frame 0 | Frame 1 | Frame 2 | Frame 3 |  ← 固定大小的内存块
+---------+---------+---------+---------+
     ↓
  页表(Page Table): PageID → Frame 号
  替换策略(LRU/Clock): 决定驱逐哪个 Frame

当需要读一个页时:

  1. 查页表,如果命中(缓冲池中有),直接返回。
  2. 未命中,选一个牺牲帧(Victim Frame),如果被选中的帧是脏页,先把它刷到磁盘。
  3. 从磁盘读取需要的页,放入牺牲帧,更新页表。

替换策略决定选哪个帧作为牺牲者。最简单的是 LRU(最近最少使用),实际的数据库(如 InnoDB)使用改进的 LRU——分为新/旧两个子列表,防止全表扫描把热数据驱逐出去。

Level 3 · 从零构建

第一步:页式存储和缓冲池

我们的数据库以 4KB 为一个页(Page)进行磁盘管理。每个页有唯一的 PageID。

package storage

import (
    "encoding/binary"
    "fmt"
    "os"
    "sync"
)

const PageSize = 4096

type PageID uint32
type Page [PageSize]byte

// DiskManager 管理磁盘文件,以页为单位读写
type DiskManager struct {
    file       *os.File
    numPages   PageID
    mu         sync.Mutex
}

func NewDiskManager(path string) (*DiskManager, error) {
    f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
    if err != nil {
        return nil, fmt.Errorf("open file: %w", err)
    }
    info, _ := f.Stat()
    numPages := PageID(info.Size() / PageSize)
    return &DiskManager{file: f, numPages: numPages}, nil
}

func (dm *DiskManager) ReadPage(id PageID, p *Page) error {
    _, err := dm.file.ReadAt(p[:], int64(id)*PageSize)
    return err
}

func (dm *DiskManager) WritePage(id PageID, p *Page) error {
    _, err := dm.file.WriteAt(p[:], int64(id)*PageSize)
    return err
}

func (dm *DiskManager) AllocatePage() PageID {
    dm.mu.Lock()
    defer dm.mu.Unlock()
    id := dm.numPages
    dm.numPages++
    // 在文件末尾写一个空页,确保文件扩展
    var empty Page
    dm.file.WriteAt(empty[:], int64(id)*PageSize)
    return id
}

// BufferPool 缓冲池:缓存磁盘页,支持 LRU 替换
type frame struct {
    page    Page
    pageID  PageID
    dirty   bool
    pinCount int
}

type BufferPool struct {
    dm      *DiskManager
    frames  []frame
    pageTable map[PageID]int  // pageID → frame index
    freeList  []int           // 空闲 frame 列表
    lruList   []int           // LRU 顺序(头部最近使用)
    mu        sync.Mutex
}

func NewBufferPool(dm *DiskManager, poolSize int) *BufferPool {
    bp := &BufferPool{
        dm:        dm,
        frames:    make([]frame, poolSize),
        pageTable: make(map[PageID]int),
        freeList:  make([]int, poolSize),
    }
    for i := range bp.freeList {
        bp.freeList[i] = i
    }
    return bp
}

// FetchPage 获取一个页面(pin 住,防止被驱逐)
func (bp *BufferPool) FetchPage(id PageID) (*Page, error) {
    bp.mu.Lock()
    defer bp.mu.Unlock()

    // 命中缓冲池
    if fi, ok := bp.pageTable[id]; ok {
        bp.frames[fi].pinCount++
        bp.moveFront(&bp.lruList, fi)
        return &bp.frames[fi].page, nil
    }

    // 找空闲 frame 或驱逐
    fi, err := bp.evict()
    if err != nil {
        return nil, err
    }

    // 从磁盘读页
    f := &bp.frames[fi]
    if err := bp.dm.ReadPage(id, &f.page); err != nil {
        return nil, fmt.Errorf("read page %d: %w", id, err)
    }
    f.pageID = id
    f.dirty = false
    f.pinCount = 1
    bp.pageTable[id] = fi
    bp.lruList = append([]int{fi}, bp.lruList...)
    return &f.page, nil
}

func (bp *BufferPool) UnpinPage(id PageID, dirty bool) {
    bp.mu.Lock()
    defer bp.mu.Unlock()
    if fi, ok := bp.pageTable[id]; ok {
        bp.frames[fi].pinCount--
        if dirty {
            bp.frames[fi].dirty = true
        }
    }
}

func (bp *BufferPool) FlushPage(id PageID) error {
    bp.mu.Lock()
    defer bp.mu.Unlock()
    fi, ok := bp.pageTable[id]
    if !ok {
        return nil
    }
    f := &bp.frames[fi]
    if f.dirty {
        if err := bp.dm.WritePage(id, &f.page); err != nil {
            return err
        }
        f.dirty = false
    }
    return nil
}

func (bp *BufferPool) evict() (int, error) {
    if len(bp.freeList) > 0 {
        fi := bp.freeList[0]
        bp.freeList = bp.freeList[1:]
        return fi, nil
    }
    // LRU 驱逐:从 lruList 尾部找 pinCount=0 的 frame
    for i := len(bp.lruList) - 1; i >= 0; i-- {
        fi := bp.lruList[i]
        f := &bp.frames[fi]
        if f.pinCount == 0 {
            if f.dirty {
                if err := bp.dm.WritePage(f.pageID, &f.page); err != nil {
                    return 0, err
                }
            }
            delete(bp.pageTable, f.pageID)
            bp.lruList = append(bp.lruList[:i], bp.lruList[i+1:]...)
            return fi, nil
        }
    }
    return 0, fmt.Errorf("buffer pool full: all pages are pinned")
}

func (bp *BufferPool) moveFront(list *[]int, val int) {
    for i, v := range *list {
        if v == val {
            *list = append([]int{val}, append((*list)[:i], (*list)[i+1:]...)...)
            return
        }
    }
}

// 将 uint64 写入 Page 的指定偏移
func PutUint64(p *Page, offset int, v uint64) {
    binary.LittleEndian.PutUint64(p[offset:], v)
}

func GetUint64(p *Page, offset int) uint64 {
    return binary.LittleEndian.Uint64(p[offset:])
}

第二步:B+树键值存储

package btree

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "github.com/yourname/minidb/storage"
)

// 叶节点格式(简化):
// [numKeys:2][key0:8][val0:8]...[keyN:8][valN:8][nextLeaf:4]
// 内部节点格式:
// [numKeys:2][child0:4][key0:8][child1:4][key1:8]...[childN:4]

const (
    PageSize   = storage.PageSize
    MaxKeys    = 200   // 每个节点最多 key 数,可以根据页大小调整
    LeafFlag   = 0x01  // 页头部标志位,区分叶/内部节点
)

type Key uint64
type Value uint64
type PageID = storage.PageID

type BTree struct {
    bp     *storage.BufferPool
    dm     *storage.DiskManager
    rootID PageID
}

func NewBTree(bp *storage.BufferPool, dm *storage.DiskManager) *BTree {
    // 第 0 页是元数据页(存 rootID)
    rootID := dm.AllocatePage() // 分配根节点页
    bt := &BTree{bp: bp, dm: dm, rootID: rootID}
    bt.initLeafPage(rootID)
    return bt
}

func (bt *BTree) initLeafPage(id PageID) {
    p, _ := bt.bp.FetchPage(id)
    p[0] = LeafFlag // 标记为叶节点
    binary.LittleEndian.PutUint16(p[1:], 0) // numKeys = 0
    bt.bp.UnpinPage(id, true)
}

func (bt *BTree) Get(key Key) (Value, bool) {
    leafID := bt.findLeaf(bt.rootID, key)
    p, err := bt.bp.FetchPage(leafID)
    if err != nil {
        return 0, false
    }
    defer bt.bp.UnpinPage(leafID, false)

    numKeys := int(binary.LittleEndian.Uint16(p[1:]))
    // 从偏移 3 开始: [key:8][val:8] 每对 16 字节
    for i := 0; i < numKeys; i++ {
        offset := 3 + i*16
        k := Key(binary.LittleEndian.Uint64(p[offset:]))
        if k == key {
            v := Value(binary.LittleEndian.Uint64(p[offset+8:]))
            return v, true
        }
    }
    return 0, false
}

func (bt *BTree) Put(key Key, val Value) error {
    leafID := bt.findLeaf(bt.rootID, key)
    newKey, newPageID, split := bt.insertLeaf(leafID, key, val)
    if split {
        // 向上传递分裂
        bt.insertIntoParent(bt.rootID, leafID, newKey, newPageID)
    }
    return nil
}

// insertLeaf 向叶节点插入,如果满了则分裂
// 返回:分裂时提升的键,新右节点的 PageID,是否分裂
func (bt *BTree) insertLeaf(id PageID, key Key, val Value) (Key, PageID, bool) {
    p, _ := bt.bp.FetchPage(id)
    numKeys := int(binary.LittleEndian.Uint16(p[1:]))

    if numKeys < MaxKeys {
        // 找到插入位置并移位
        pos := 0
        for pos < numKeys {
            k := Key(binary.LittleEndian.Uint64(p[3+pos*16:]))
            if k >= key {
                break
            }
            pos++
        }
        // 向右移
        copy(p[3+(pos+1)*16:], p[3+pos*16:3+numKeys*16])
        binary.LittleEndian.PutUint64(p[3+pos*16:], uint64(key))
        binary.LittleEndian.PutUint64(p[3+pos*16+8:], uint64(val))
        binary.LittleEndian.PutUint16(p[1:], uint16(numKeys+1))
        bt.bp.UnpinPage(id, true)
        return 0, 0, false
    }

    // 叶节点已满,需要分裂
    // 分裂点:mid = MaxKeys / 2
    mid := MaxKeys / 2
    newID := bt.dm.AllocatePage()
    newPage, _ := bt.bp.FetchPage(newID)
    newPage[0] = LeafFlag

    // 右半边复制到新页
    rightCount := numKeys - mid
    copy(newPage[3:], p[3+mid*16:3+numKeys*16])
    binary.LittleEndian.PutUint16(newPage[1:], uint16(rightCount))

    // 原页只保留左半边
    binary.LittleEndian.PutUint16(p[1:], uint16(mid))

    // 更新叶链表:新页 next = 原页 next,原页 next = 新页
    origNext := PageID(binary.LittleEndian.Uint32(p[PageSize-4:]))
    binary.LittleEndian.PutUint32(newPage[PageSize-4:], uint32(origNext))
    binary.LittleEndian.PutUint32(p[PageSize-4:], uint32(newID))

    // 提升的键是新右页的第一个键
    promoteKey := Key(binary.LittleEndian.Uint64(newPage[3:]))

    bt.bp.UnpinPage(id, true)
    bt.bp.UnpinPage(newID, true)

    // 插入到正确的半边
    if key < promoteKey {
        bt.insertLeaf(id, key, val)
    } else {
        bt.insertLeaf(newID, key, val)
    }
    return promoteKey, newID, true
}

func (bt *BTree) findLeaf(nodeID PageID, key Key) PageID {
    p, _ := bt.bp.FetchPage(nodeID)
    if p[0] == LeafFlag {
        bt.bp.UnpinPage(nodeID, false)
        return nodeID
    }
    // 内部节点:二分查找子节点
    numKeys := int(binary.LittleEndian.Uint16(p[1:]))
    child := PageID(binary.LittleEndian.Uint32(p[3:])) // child[0]
    for i := 0; i < numKeys; i++ {
        // 格式: child[i]:4, key[i]:8
        k := Key(binary.LittleEndian.Uint64(p[3+4+i*12:]))
        if key >= k {
            child = PageID(binary.LittleEndian.Uint32(p[3+4+i*12+8:]))
        }
    }
    bt.bp.UnpinPage(nodeID, false)
    return bt.findLeaf(child, key)
}

// RangeScan 范围扫描,返回 [start, end] 内的所有键值对
func (bt *BTree) RangeScan(start, end Key, fn func(Key, Value)) {
    leafID := bt.findLeaf(bt.rootID, start)
    for leafID != 0 {
        p, _ := bt.bp.FetchPage(leafID)
        numKeys := int(binary.LittleEndian.Uint16(p[1:]))
        finished := false
        for i := 0; i < numKeys; i++ {
            k := Key(binary.LittleEndian.Uint64(p[3+i*16:]))
            v := Value(binary.LittleEndian.Uint64(p[3+i*16+8:]))
            if k > end {
                finished = true
                break
            }
            if k >= start {
                fn(k, v)
            }
        }
        next := PageID(binary.LittleEndian.Uint32(p[PageSize-4:]))
        bt.bp.UnpinPage(leafID, false)
        if finished || next == 0 {
            break
        }
        leafID = next
    }
}

// insertIntoParent 是简化版,完整实现需要维护父节点指针
// 这里仅演示根分裂场景
func (bt *BTree) insertIntoParent(rootID, leftID PageID, key Key, rightID PageID) {
    // 当 leftID 是 root 时:创建新根
    if leftID == bt.rootID {
        newRootID := bt.dm.AllocatePage()
        p, _ := bt.bp.FetchPage(newRootID)
        p[0] = 0 // 内部节点标志
        binary.LittleEndian.PutUint16(p[1:], 1) // 1 个 key
        binary.LittleEndian.PutUint32(p[3:], uint32(leftID))
        binary.LittleEndian.PutUint64(p[7:], uint64(key))
        binary.LittleEndian.PutUint32(p[15:], uint32(rightID))
        bt.bp.UnpinPage(newRootID, true)
        bt.rootID = newRootID
    }
    // 完整实现需要向上传递,这里从略
}

第三步:WAL 崩溃恢复

package wal

import (
    "encoding/binary"
    "fmt"
    "io"
    "os"
    "sync"
    "sync/atomic"
)

// LogRecord WAL 日志记录
type LogRecord struct {
    LSN    uint64
    TxID   uint64
    Type   uint8  // 0=Update, 1=Commit, 2=Abort, 3=Begin
    PageID uint32
    Offset uint32
    Data   []byte
}

const (
    RecordUpdate = 0
    RecordCommit = 1
    RecordAbort  = 2
    RecordBegin  = 3
)

type WAL struct {
    file    *os.File
    nextLSN uint64
    mu      sync.Mutex
}

func NewWAL(path string) (*WAL, error) {
    f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }
    w := &WAL{file: f, nextLSN: 1}
    return w, nil
}

// Append 追加写 WAL 记录,返回 LSN
func (w *WAL) Append(rec *LogRecord) (uint64, error) {
    w.mu.Lock()
    defer w.mu.Unlock()

    lsn := atomic.AddUint64(&w.nextLSN, 1) - 1
    rec.LSN = lsn

    // 序列化: [total_len:4][LSN:8][TxID:8][Type:1][PageID:4][Offset:4][DataLen:4][Data...]
    dataLen := len(rec.Data)
    totalLen := 4 + 8 + 8 + 1 + 4 + 4 + 4 + dataLen

    buf := make([]byte, totalLen+4) // 前 4 字节是 total_len
    binary.LittleEndian.PutUint32(buf[0:], uint32(totalLen))
    binary.LittleEndian.PutUint64(buf[4:], lsn)
    binary.LittleEndian.PutUint64(buf[12:], rec.TxID)
    buf[20] = rec.Type
    binary.LittleEndian.PutUint32(buf[21:], rec.PageID)
    binary.LittleEndian.PutUint32(buf[25:], rec.Offset)
    binary.LittleEndian.PutUint32(buf[29:], uint32(dataLen))
    copy(buf[33:], rec.Data)

    if _, err := w.file.Write(buf); err != nil {
        return 0, fmt.Errorf("wal append: %w", err)
    }
    // fsync 确保落盘
    if err := w.file.Sync(); err != nil {
        return 0, fmt.Errorf("wal sync: %w", err)
    }
    return lsn, nil
}

// Recover 重放 WAL,返回已提交事务的操作列表
func (w *WAL) Recover() ([]LogRecord, error) {
    w.file.Seek(0, io.SeekStart)
    var records []LogRecord
    committedTx := make(map[uint64]bool)

    var allRecords []LogRecord
    for {
        var totalLen uint32
        if err := binary.Read(w.file, binary.LittleEndian, &totalLen); err != nil {
            if err == io.EOF {
                break
            }
            return nil, err
        }
        buf := make([]byte, totalLen)
        if _, err := io.ReadFull(w.file, buf); err != nil {
            break // 不完整记录,崩溃点
        }
        rec := LogRecord{
            LSN:    binary.LittleEndian.Uint64(buf[0:]),
            TxID:   binary.LittleEndian.Uint64(buf[8:]),
            Type:   buf[16],
            PageID: binary.LittleEndian.Uint32(buf[17:]),
            Offset: binary.LittleEndian.Uint32(buf[21:]),
        }
        dataLen := binary.LittleEndian.Uint32(buf[25:])
        rec.Data = make([]byte, dataLen)
        copy(rec.Data, buf[29:])
        allRecords = append(allRecords, rec)
        if rec.Type == RecordCommit {
            committedTx[rec.TxID] = true
        }
    }

    // 只重放已提交事务的操作
    for _, rec := range allRecords {
        if rec.Type == RecordUpdate && committedTx[rec.TxID] {
            records = append(records, rec)
        }
    }
    return records, nil
}

第四步:MVCC 事务

package mvcc

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// Version 代表一行的一个版本
type Version struct {
    BeginTS uint64
    EndTS   uint64 // 0 表示"当前有效"
    Value   []byte
    Next    *Version // 旧版本链
}

// Row 代表一行数据
type Row struct {
    Key     uint64
    Head    *Version // 最新版本
    mu      sync.RWMutex
}

// MVCC 事务管理器
type MVCC struct {
    nextTS  uint64      // 单调递增的事务时间戳
    rows    sync.Map    // key(uint64) → *Row
}

func NewMVCC() *MVCC {
    return &MVCC{}
}

func (m *MVCC) Begin() uint64 {
    return atomic.AddUint64(&m.nextTS, 1)
}

// Read 快照读:读取时间戳 ts 之前提交的最新版本
func (m *MVCC) Read(key, snapTS uint64) ([]byte, bool) {
    val, ok := m.rows.Load(key)
    if !ok {
        return nil, false
    }
    row := val.(*Row)
    row.mu.RLock()
    defer row.mu.RUnlock()

    for v := row.Head; v != nil; v = v.Next {
        if v.BeginTS <= snapTS && (v.EndTS == 0 || v.EndTS > snapTS) {
            result := make([]byte, len(v.Value))
            copy(result, v.Value)
            return result, true
        }
    }
    return nil, false
}

// Write 写入一个新版本(commitTS 是提交时间戳)
func (m *MVCC) Write(key, commitTS uint64, value []byte) error {
    actual, _ := m.rows.LoadOrStore(key, &Row{Key: key})
    row := actual.(*Row)
    row.mu.Lock()
    defer row.mu.Unlock()

    newVer := &Version{
        BeginTS: commitTS,
        EndTS:   0,
        Value:   append([]byte{}, value...),
        Next:    row.Head,
    }
    // 标记旧的当前版本为结束
    if row.Head != nil && row.Head.EndTS == 0 {
        row.Head.EndTS = commitTS
    }
    row.Head = newVer
    return nil
}

// Delete 标记删除:把最新版本的 EndTS 设置为 commitTS
func (m *MVCC) Delete(key, commitTS uint64) error {
    val, ok := m.rows.Load(key)
    if !ok {
        return fmt.Errorf("key %d not found", key)
    }
    row := val.(*Row)
    row.mu.Lock()
    defer row.mu.Unlock()
    if row.Head != nil && row.Head.EndTS == 0 {
        row.Head.EndTS = commitTS
    }
    return nil
}

// Tx 代表一个完整的 MVCC 事务
type Tx struct {
    mvcc    *MVCC
    snapTS  uint64
    writes  []writeIntent
    committed bool
}

type writeIntent struct {
    key   uint64
    value []byte // nil 表示删除
}

func (m *MVCC) BeginTx() *Tx {
    return &Tx{mvcc: m, snapTS: m.Begin()}
}

func (tx *Tx) Get(key uint64) ([]byte, bool) {
    return tx.mvcc.Read(key, tx.snapTS)
}

func (tx *Tx) Set(key uint64, value []byte) {
    tx.writes = append(tx.writes, writeIntent{key: key, value: value})
}

func (tx *Tx) Del(key uint64) {
    tx.writes = append(tx.writes, writeIntent{key: key, value: nil})
}

func (tx *Tx) Commit() error {
    commitTS := atomic.AddUint64(&tx.mvcc.nextTS, 1)
    for _, w := range tx.writes {
        if w.value == nil {
            tx.mvcc.Delete(w.key, commitTS)
        } else {
            tx.mvcc.Write(w.key, commitTS, w.value)
        }
    }
    tx.committed = true
    return nil
}

func (tx *Tx) Rollback() {
    tx.writes = nil
}

第五步:简易 SQL 解析器

package sql

import (
    "fmt"
    "strings"
)

// 支持 SELECT/INSERT/UPDATE/DELETE 的极简 SQL 解析器

type StatementType int

const (
    StmtSelect StatementType = iota
    StmtInsert
    StmtUpdate
    StmtDelete
)

type Statement struct {
    Type      StatementType
    Table     string
    Columns   []string
    Values    []string
    Where     *WhereClause
    SetClauses []SetClause
}

type WhereClause struct {
    Column string
    Op     string // "=", ">", "<", ">=", "<="
    Value  string
}

type SetClause struct {
    Column string
    Value  string
}

// Parse 解析一条 SQL 语句(简化版,不处理所有边界情况)
func Parse(query string) (*Statement, error) {
    tokens := tokenize(query)
    if len(tokens) == 0 {
        return nil, fmt.Errorf("empty query")
    }

    switch strings.ToUpper(tokens[0]) {
    case "SELECT":
        return parseSelect(tokens)
    case "INSERT":
        return parseInsert(tokens)
    case "UPDATE":
        return parseUpdate(tokens)
    case "DELETE":
        return parseDelete(tokens)
    default:
        return nil, fmt.Errorf("unknown statement type: %s", tokens[0])
    }
}

func parseSelect(tokens []string) (*Statement, error) {
    // SELECT col1, col2 FROM table WHERE col = val
    stmt := &Statement{Type: StmtSelect}
    i := 1
    for i < len(tokens) && strings.ToUpper(tokens[i]) != "FROM" {
        col := strings.Trim(tokens[i], ", ")
        if col != "" && col != "," {
            stmt.Columns = append(stmt.Columns, col)
        }
        i++
    }
    if i >= len(tokens) {
        return nil, fmt.Errorf("missing FROM clause")
    }
    i++ // skip FROM
    if i >= len(tokens) {
        return nil, fmt.Errorf("missing table name")
    }
    stmt.Table = tokens[i]
    i++
    if i < len(tokens) && strings.ToUpper(tokens[i]) == "WHERE" {
        w, err := parseWhere(tokens[i+1:])
        if err != nil {
            return nil, err
        }
        stmt.Where = w
    }
    return stmt, nil
}

func parseInsert(tokens []string) (*Statement, error) {
    // INSERT INTO table (col1, col2) VALUES (val1, val2)
    if len(tokens) < 5 || strings.ToUpper(tokens[1]) != "INTO" {
        return nil, fmt.Errorf("invalid INSERT syntax")
    }
    stmt := &Statement{Type: StmtInsert, Table: tokens[2]}
    // 简化:不解析列名,直接解析 VALUES 后的值
    valIdx := -1
    for i, t := range tokens {
        if strings.ToUpper(t) == "VALUES" {
            valIdx = i
            break
        }
    }
    if valIdx == -1 {
        return nil, fmt.Errorf("missing VALUES keyword")
    }
    for i := valIdx + 1; i < len(tokens); i++ {
        v := strings.Trim(tokens[i], "(),\"' ")
        if v != "" {
            stmt.Values = append(stmt.Values, v)
        }
    }
    return stmt, nil
}

func parseUpdate(tokens []string) (*Statement, error) {
    // UPDATE table SET col1=val1 WHERE col2=val2
    if len(tokens) < 4 {
        return nil, fmt.Errorf("invalid UPDATE syntax")
    }
    stmt := &Statement{Type: StmtUpdate, Table: tokens[1]}
    i := 3 // skip SET
    for i < len(tokens) && strings.ToUpper(tokens[i]) != "WHERE" {
        part := strings.Trim(tokens[i], ", ")
        if eq := strings.Index(part, "="); eq != -1 {
            stmt.SetClauses = append(stmt.SetClauses, SetClause{
                Column: strings.TrimSpace(part[:eq]),
                Value:  strings.Trim(part[eq+1:], "'\""),
            })
        }
        i++
    }
    if i < len(tokens) && strings.ToUpper(tokens[i]) == "WHERE" {
        w, err := parseWhere(tokens[i+1:])
        if err != nil {
            return nil, err
        }
        stmt.Where = w
    }
    return stmt, nil
}

func parseDelete(tokens []string) (*Statement, error) {
    // DELETE FROM table WHERE col=val
    if len(tokens) < 3 {
        return nil, fmt.Errorf("invalid DELETE syntax")
    }
    stmt := &Statement{Type: StmtDelete, Table: tokens[2]}
    for i, t := range tokens {
        if strings.ToUpper(t) == "WHERE" && i+1 < len(tokens) {
            w, err := parseWhere(tokens[i+1:])
            if err != nil {
                return nil, err
            }
            stmt.Where = w
            break
        }
    }
    return stmt, nil
}

func parseWhere(tokens []string) (*WhereClause, error) {
    // col op val
    if len(tokens) < 3 {
        return nil, fmt.Errorf("incomplete WHERE clause")
    }
    ops := []string{">=", "<=", "=", ">", "<"}
    combined := strings.Join(tokens, " ")
    for _, op := range ops {
        if idx := strings.Index(combined, op); idx != -1 {
            col := strings.TrimSpace(combined[:idx])
            val := strings.TrimSpace(combined[idx+len(op):])
            return &WhereClause{Column: col, Op: op, Value: strings.Trim(val, "'\"")}, nil
        }
    }
    return nil, fmt.Errorf("no operator found in WHERE clause")
}

func tokenize(query string) []string {
    // 简单空格分词(不处理字符串字面量中的空格)
    return strings.Fields(strings.ReplaceAll(query, ",", " , "))
}

// 使用示例
func Example() {
    queries := []string{
        "SELECT name, age FROM users WHERE age > 18",
        "INSERT INTO users VALUES (1, 'Alice', 30)",
        "UPDATE users SET name='Bob' WHERE id=1",
        "DELETE FROM users WHERE id=1",
    }
    for _, q := range queries {
        stmt, err := Parse(q)
        if err != nil {
            fmt.Printf("Error: %v\n", err)
            continue
        }
        fmt.Printf("Table: %s, Type: %d\n", stmt.Table, stmt.Type)
    }
}

Level 4 · 进阶与边界

无锁读与 MVCC 的深度权衡

MVCC 的无锁读是有代价的。版本链越长,读操作需要遍历的版本就越多。在高写入场景下,版本链可能变得很长,导致读性能退化。

垃圾回收(Vacuum)机制定期清理过期版本。PostgreSQL 的 VACUUM 是著名的设计难题——它需要找到"最老的活跃事务"(oldest active transaction),早于这个时间戳的所有非当前版本都可以安全删除。

// Compaction:删除 olderThan 之前的历史版本
func (m *MVCC) Compaction(olderThan uint64) {
    m.rows.Range(func(key, val interface{}) bool {
        row := val.(*Row)
        row.mu.Lock()
        defer row.mu.Unlock()
        // 找到第一个 BeginTS <= olderThan 的版本,截断之后的链
        cur := row.Head
        for cur != nil && cur.BeginTS > olderThan {
            cur = cur.Next
        }
        if cur != nil {
            cur.Next = nil // 截断旧版本链
        }
        return true
    })
}

LSM 树:写优化的 B+树替代方案

B+树对写操作不够友好:每次写都需要随机 I/O 更新一个或多个页。LSM 树(Log-Structured Merge Tree)把所有写都变成顺序写,代价是读操作需要合并多个文件。

LSM 树结构:
MemTable(内存,红黑树) ──写满后刷盘──► L0 SSTable(不可变,有序)
L0 SSTables ──Compaction──► L1 SSTables
L1 SSTables ──Compaction──► L2 SSTables
...

RocksDB、BadgerDB、LevelDB 都使用 LSM 树。写密集型场景(日志、时序数据)LSM 表现更好;读多写少场景(OLTP)B+树更合适。这就是 InnoDB 和 MyISAM 选择 B+树,而 Cassandra 和 HBase 选择 LSM 树的根本原因。

与 BoltDB 和 BadgerDB 的对比

BoltDB:纯 B+树实现,写事务通过 Copy-on-Write(COW)实现,不使用 WAL——修改时复制整条路径上的所有节点,原始页保持不变。提交时更新根页指针(原子操作)。这种设计使读事务完全无锁,但写吞吐量受限于 B+树的随机 I/O。

BadgerDB:LSM 树实现,Key 和 Value 分开存储(Key 在 LSM 树中,Value 在 Value Log 中)。这一设计(称为 WiscKey)大幅减少了 LSM 树的写放大(Write Amplification),对大 Value 场景性能提升显著。

性能对比(粗略参考,取决于工作负载)

场景 B+树(BoltDB) LSM(BadgerDB)
随机读 慢(需合并多层)
顺序读
随机写 慢(随机 I/O) 快(顺序追加)
磁盘空间 低碎片 有空间放大

嵌入式库 vs 独立服务器

我们实现的引擎适合作为嵌入式库(像 SQLite、BoltDB 那样),直接链接到应用进程中。这消除了网络 I/O 延迟,适合单机场景。

要变成独立服务器,需要:

  1. 网络层:监听 TCP 连接,实现协议(MySQL 协议或自定义二进制协议)。
  2. 连接管理:每个连接一个 goroutine,连接池管理。
  3. 事务协调:跨连接的事务必须串行化提交(全局事务 ID 生成器)。
  4. 认证与权限:用户管理、表级/行级权限。

SQLite 选择了嵌入式,MySQL/PostgreSQL 选择了服务器模式,各有其设计哲学。

这个项目的完整代码(包括测试用例)可以在 GitHub 上找到(模拟地址:github.com/yourname/minidb)。构建完成后,你会对任何一个成熟的数据库的设计决策有更深刻的直觉。

本章评分
4.9  / 5  (3 评分)

💬 留言讨论