实现一个简易数据库
实现一个简易数据库
有一种说法在系统程序员中广泛流传:真正理解一个系统的唯一方式,是自己动手实现它。数据库是这个规律最典型的体现。你可以读完整本《数据库系统概念》,你可以背出 B+树的每一个插入规则,但直到你亲手写出一个能运行 SELECT * FROM users WHERE age > 18 的引擎,你才会真正理解为什么数据库的设计是现在这个样子。
这一章我们用 Go 构建一个完整的小型数据库引擎。不是玩具级别的内存哈希表,而是一个真正支持磁盘存储、崩溃恢复、事务隔离的系统。我们会经历:键值存储 → B+树索引 → WAL 日志 → MVCC 事务 → SQL 解析器。每一步都会揭示现代数据库引擎的一块核心机制。
Level 1 · 数据库引擎的本质
存储引擎 vs 查询引擎
当你运行 SELECT name FROM employees WHERE salary > 10000 ORDER BY name 时,数据库内部发生了两件完全不同的事情。
查询引擎(Query Engine)负责理解你的意图:解析 SQL 文本,生成执行计划,决定先过滤还是先排序,决定用哪个索引。查询引擎是个翻译器——把人类的语言翻译成机器的操作序列。
存储引擎(Storage Engine)负责数据的物理存取:数据在磁盘上怎么组织,怎么从磁盘读出来,怎么写回去,崩溃后怎么恢复。存储引擎是个物理系统——和硬件、操作系统直接打交道。
MySQL 的架构清晰地体现了这种分层:上层是优化器和执行器(查询引擎),下层是 InnoDB 或 MyISAM(存储引擎)。你甚至可以给同一张表换一个存储引擎,上层代码完全不需要修改。PostgreSQL 把存储引擎和查询引擎更紧密地耦合在一起,但逻辑层次依然清晰。
这种分层不是偶然的工程决策,而是数据库问题本质的体现:如何存数据 和 如何查数据 是两个根本不同的问题,复杂度来源不同,优化方向不同,变化速度也不同。
为什么这是最好的系统编程练习
构建数据库引擎会强迫你面对计算机系统中最核心的几个矛盾:
速度 vs 持久性。 内存快,磁盘慢。但内存是易失的,断电就没了。所有的崩溃恢复机制,本质上都是在解决这个矛盾。
并发 vs 一致性。 多个用户同时读写数据库。要让他们互不干扰,同时又要保证数据的一致性,这是并发控制的核心问题。MVCC 是现代数据库给出的答案之一。
空间 vs 时间。 B+树的每个节点应该多大?太小了节点太多,树太深,查找慢;太大了每次 I/O 读写的数据太多,浪费。缓冲池的大小怎么决定?这些都是空间和时间的权衡。
灵活性 vs 性能。 支持 SQL 意味着你需要解析和优化查询,这有额外的 CPU 开销。直接使用键值 API 更快,但表达能力受限。
每一个这样的矛盾,在数据库系统中都有对应的工程解法,而这些解法在其他系统(操作系统、分布式系统、文件系统)中也无处不在。学会数据库,实际上是学会了一套思考系统问题的框架。
Go 的适配性
Go 在这个项目中的优势是多方面的:
- 直接控制字节布局:
encoding/binary包让你精确控制磁盘上的字节格式,就像 C 的结构体内存布局一样清晰,但不会有 C 的内存安全问题。 - 接口抽象层次自然:存储引擎接口、编解码器接口、事务接口,用 Go interface 表达非常简洁。
- 并发原语完备:MVCC 需要版本链的并发读写,
sync.RWMutex、sync/atomic、channel 都会用到。 - 文件操作库完整:
os.File的ReadAt/WriteAt支持随机 I/O,是实现缓冲池的基础。
Level 2 · 核心机制原理
B+树:数据库索引的基石
B+树是绝大多数关系型数据库的默认索引结构。理解为什么选 B+树而不是其他结构,需要先理解数据库面对的 I/O 现实。
一次磁盘随机 I/O 的延迟是 5-10ms,而一次内存访问是 100ns,相差 5 万倍。对于一个有 100 万条记录的表,二叉搜索树(BST)的高度约为 20,每次查找需要 20 次 I/O,也就是 100-200ms。这完全不可接受。
B+树的解决方案是:让每个节点尽可能大,减少树的高度。典型的 B+树节点大小等于磁盘 I/O 的最小单位——页(Page),通常是 4KB 或 16KB(InnoDB 默认 16KB)。一个 16KB 的节点可以存储约 1000 个 64 位的键,所以一棵 3 层的 B+树可以索引 10 亿条记录,而只需要 3 次 I/O。
B+树的节点结构
B+树分为内部节点(Internal Node)和叶节点(Leaf Node):
内部节点:
+--------+----+--------+----+--------+
| ptr[0] | k1 | ptr[1] | k2 | ptr[2] |
+--------+----+--------+----+--------+
叶节点:
+-----+-----+-----+------+
| k1 | k2 | k3 | next | → 指向下一个叶节点(支持范围扫描)
+-----+-----+-----+------+
| v1 | v2 | v3 |
内部节点只存键和子节点指针,不存值。所有的实际数据(或主键,在 InnoDB 的二级索引中)都在叶节点。叶节点之间用指针串成双向链表,这使得范围扫描变得极其高效——找到起点后,顺着链表走就行了,完全不需要回溯树。
插入与分裂
当一个叶节点满了,B+树会把它分裂成两个节点,把中间的键提升到父节点。如果父节点也满了,继续向上分裂,最坏情况下一路分裂到根节点,这时树增高一层。
分裂前(节点满,capacity=3):
[10, 20, 30]
插入 25 后分裂:
父节点获得 25
[10, 20] [25, 30]
删除与合并
删除后,如果节点的键数少于最低阈值(通常是 capacity/2),有两个选项:
- 借用:从相邻兄弟节点借一个键(通过父节点中转)。
- 合并:和兄弟节点合并,父节点中的分隔键下移。
合并会触发父节点的键减少,可能引发递归合并,最坏情况下树降低一层。
WAL:顺序写实现崩溃恢复
数据库最基本的承诺之一是持久性(Durability):事务一旦提交,数据不会因为崩溃而丢失。
最直接的实现方式是:提交时把脏页(被修改但尚未写回磁盘的页)全部刷新到磁盘。但这有两个问题:随机 I/O 极慢(在磁盘上,随机写性能约为顺序写的 1/100),而且如果修改跨越多个页,崩溃在中间会导致数据不一致。
WAL(Write-Ahead Log,预写式日志)的解决方案极其优雅:
核心规则:在修改磁盘上的任何数据页之前,必须先把对应的日志记录写入 WAL 文件。
WAL 的写入是纯顺序追加(append-only),顺序写性能极高。数据页可以延迟写,只要 WAL 已经落盘,崩溃后就能从 WAL 重放所有操作,恢复到一致状态。
WAL 日志记录格式:
+----------+----------+---------+---------+----------+
| LSN (8B) | TxID (8B)| PageID | Offset | Data |
+----------+----------+---------+---------+----------+
LSN(Log Sequence Number)是单调递增的序列号,用来确定日志记录的顺序,也用来标记数据页的"最新程度"。每个数据页的头部都存储着最后一次修改它的 WAL 记录的 LSN(称为 pageLSN)。
恢复流程(ARIES 算法简化版):
- 分析阶段:从最近的检查点(Checkpoint)开始扫描 WAL,确定哪些事务提交了,哪些没有。
- 重做阶段:重放所有 WAL 记录,把数据库恢复到崩溃前的状态(包括未提交的事务的修改)。
- 撤销阶段:回滚所有未提交的事务(通过 WAL 中的补偿日志记录)。
MVCC:快照隔离的实现
读写不互相阻塞——这是 MVCC(Multi-Version Concurrency Control,多版本并发控制)最重要的特性。它让读操作看到一个一致的历史快照,而不阻塞写操作。
版本链
每一行数据都有一条版本链,链上的每个版本记录了:
- 数据内容
- 创建这个版本的事务 ID(
begin_ts) - 删除这个版本的事务 ID(
end_ts,NULL 表示当前版本)
行 id=1 的版本链:
v3: name="Charlie", begin_ts=200, end_ts=NULL ← 最新版本
v2: name="Bob", begin_ts=100, end_ts=200
v1: name="Alice", begin_ts=1, end_ts=100
快照读
每个事务开始时,记录当前的事务 ID 作为快照时间戳(snap_ts)。读操作只能看到满足以下条件的版本:
begin_ts <= snap_ts(版本在事务开始之前已存在)end_ts > snap_ts或end_ts == NULL(版本在事务开始之前未被删除)
这样,事务 A 在整个生命周期内看到的是一个一致的快照——就算其他事务在 A 运行期间修改了数据,A 也看不到这些修改。
写操作
写操作不修改现有版本,而是创建新版本(INSERT)或标记旧版本失效(DELETE/UPDATE = DELETE + INSERT)。这就是"多版本"的含义。
缓冲池:页面缓存管理
数据库不直接读写磁盘,而是通过缓冲池(Buffer Pool)。缓冲池是内存中的一块区域,缓存了最近访问的磁盘页。
缓冲池结构:
+---------+---------+---------+---------+
| Frame 0 | Frame 1 | Frame 2 | Frame 3 | ← 固定大小的内存块
+---------+---------+---------+---------+
↓
页表(Page Table): PageID → Frame 号
替换策略(LRU/Clock): 决定驱逐哪个 Frame
当需要读一个页时:
- 查页表,如果命中(缓冲池中有),直接返回。
- 未命中,选一个牺牲帧(Victim Frame),如果被选中的帧是脏页,先把它刷到磁盘。
- 从磁盘读取需要的页,放入牺牲帧,更新页表。
替换策略决定选哪个帧作为牺牲者。最简单的是 LRU(最近最少使用),实际的数据库(如 InnoDB)使用改进的 LRU——分为新/旧两个子列表,防止全表扫描把热数据驱逐出去。
Level 3 · 从零构建
第一步:页式存储和缓冲池
我们的数据库以 4KB 为一个页(Page)进行磁盘管理。每个页有唯一的 PageID。
package storage
import (
"encoding/binary"
"fmt"
"os"
"sync"
)
const PageSize = 4096
type PageID uint32
type Page [PageSize]byte
// DiskManager 管理磁盘文件,以页为单位读写
type DiskManager struct {
file *os.File
numPages PageID
mu sync.Mutex
}
func NewDiskManager(path string) (*DiskManager, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}
info, _ := f.Stat()
numPages := PageID(info.Size() / PageSize)
return &DiskManager{file: f, numPages: numPages}, nil
}
func (dm *DiskManager) ReadPage(id PageID, p *Page) error {
_, err := dm.file.ReadAt(p[:], int64(id)*PageSize)
return err
}
func (dm *DiskManager) WritePage(id PageID, p *Page) error {
_, err := dm.file.WriteAt(p[:], int64(id)*PageSize)
return err
}
func (dm *DiskManager) AllocatePage() PageID {
dm.mu.Lock()
defer dm.mu.Unlock()
id := dm.numPages
dm.numPages++
// 在文件末尾写一个空页,确保文件扩展
var empty Page
dm.file.WriteAt(empty[:], int64(id)*PageSize)
return id
}
// BufferPool 缓冲池:缓存磁盘页,支持 LRU 替换
type frame struct {
page Page
pageID PageID
dirty bool
pinCount int
}
type BufferPool struct {
dm *DiskManager
frames []frame
pageTable map[PageID]int // pageID → frame index
freeList []int // 空闲 frame 列表
lruList []int // LRU 顺序(头部最近使用)
mu sync.Mutex
}
func NewBufferPool(dm *DiskManager, poolSize int) *BufferPool {
bp := &BufferPool{
dm: dm,
frames: make([]frame, poolSize),
pageTable: make(map[PageID]int),
freeList: make([]int, poolSize),
}
for i := range bp.freeList {
bp.freeList[i] = i
}
return bp
}
// FetchPage 获取一个页面(pin 住,防止被驱逐)
func (bp *BufferPool) FetchPage(id PageID) (*Page, error) {
bp.mu.Lock()
defer bp.mu.Unlock()
// 命中缓冲池
if fi, ok := bp.pageTable[id]; ok {
bp.frames[fi].pinCount++
bp.moveFront(&bp.lruList, fi)
return &bp.frames[fi].page, nil
}
// 找空闲 frame 或驱逐
fi, err := bp.evict()
if err != nil {
return nil, err
}
// 从磁盘读页
f := &bp.frames[fi]
if err := bp.dm.ReadPage(id, &f.page); err != nil {
return nil, fmt.Errorf("read page %d: %w", id, err)
}
f.pageID = id
f.dirty = false
f.pinCount = 1
bp.pageTable[id] = fi
bp.lruList = append([]int{fi}, bp.lruList...)
return &f.page, nil
}
func (bp *BufferPool) UnpinPage(id PageID, dirty bool) {
bp.mu.Lock()
defer bp.mu.Unlock()
if fi, ok := bp.pageTable[id]; ok {
bp.frames[fi].pinCount--
if dirty {
bp.frames[fi].dirty = true
}
}
}
func (bp *BufferPool) FlushPage(id PageID) error {
bp.mu.Lock()
defer bp.mu.Unlock()
fi, ok := bp.pageTable[id]
if !ok {
return nil
}
f := &bp.frames[fi]
if f.dirty {
if err := bp.dm.WritePage(id, &f.page); err != nil {
return err
}
f.dirty = false
}
return nil
}
func (bp *BufferPool) evict() (int, error) {
if len(bp.freeList) > 0 {
fi := bp.freeList[0]
bp.freeList = bp.freeList[1:]
return fi, nil
}
// LRU 驱逐:从 lruList 尾部找 pinCount=0 的 frame
for i := len(bp.lruList) - 1; i >= 0; i-- {
fi := bp.lruList[i]
f := &bp.frames[fi]
if f.pinCount == 0 {
if f.dirty {
if err := bp.dm.WritePage(f.pageID, &f.page); err != nil {
return 0, err
}
}
delete(bp.pageTable, f.pageID)
bp.lruList = append(bp.lruList[:i], bp.lruList[i+1:]...)
return fi, nil
}
}
return 0, fmt.Errorf("buffer pool full: all pages are pinned")
}
func (bp *BufferPool) moveFront(list *[]int, val int) {
for i, v := range *list {
if v == val {
*list = append([]int{val}, append((*list)[:i], (*list)[i+1:]...)...)
return
}
}
}
// 将 uint64 写入 Page 的指定偏移
func PutUint64(p *Page, offset int, v uint64) {
binary.LittleEndian.PutUint64(p[offset:], v)
}
func GetUint64(p *Page, offset int) uint64 {
return binary.LittleEndian.Uint64(p[offset:])
}
第二步:B+树键值存储
package btree
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/yourname/minidb/storage"
)
// 叶节点格式(简化):
// [numKeys:2][key0:8][val0:8]...[keyN:8][valN:8][nextLeaf:4]
// 内部节点格式:
// [numKeys:2][child0:4][key0:8][child1:4][key1:8]...[childN:4]
const (
PageSize = storage.PageSize
MaxKeys = 200 // 每个节点最多 key 数,可以根据页大小调整
LeafFlag = 0x01 // 页头部标志位,区分叶/内部节点
)
type Key uint64
type Value uint64
type PageID = storage.PageID
type BTree struct {
bp *storage.BufferPool
dm *storage.DiskManager
rootID PageID
}
func NewBTree(bp *storage.BufferPool, dm *storage.DiskManager) *BTree {
// 第 0 页是元数据页(存 rootID)
rootID := dm.AllocatePage() // 分配根节点页
bt := &BTree{bp: bp, dm: dm, rootID: rootID}
bt.initLeafPage(rootID)
return bt
}
func (bt *BTree) initLeafPage(id PageID) {
p, _ := bt.bp.FetchPage(id)
p[0] = LeafFlag // 标记为叶节点
binary.LittleEndian.PutUint16(p[1:], 0) // numKeys = 0
bt.bp.UnpinPage(id, true)
}
func (bt *BTree) Get(key Key) (Value, bool) {
leafID := bt.findLeaf(bt.rootID, key)
p, err := bt.bp.FetchPage(leafID)
if err != nil {
return 0, false
}
defer bt.bp.UnpinPage(leafID, false)
numKeys := int(binary.LittleEndian.Uint16(p[1:]))
// 从偏移 3 开始: [key:8][val:8] 每对 16 字节
for i := 0; i < numKeys; i++ {
offset := 3 + i*16
k := Key(binary.LittleEndian.Uint64(p[offset:]))
if k == key {
v := Value(binary.LittleEndian.Uint64(p[offset+8:]))
return v, true
}
}
return 0, false
}
func (bt *BTree) Put(key Key, val Value) error {
leafID := bt.findLeaf(bt.rootID, key)
newKey, newPageID, split := bt.insertLeaf(leafID, key, val)
if split {
// 向上传递分裂
bt.insertIntoParent(bt.rootID, leafID, newKey, newPageID)
}
return nil
}
// insertLeaf 向叶节点插入,如果满了则分裂
// 返回:分裂时提升的键,新右节点的 PageID,是否分裂
func (bt *BTree) insertLeaf(id PageID, key Key, val Value) (Key, PageID, bool) {
p, _ := bt.bp.FetchPage(id)
numKeys := int(binary.LittleEndian.Uint16(p[1:]))
if numKeys < MaxKeys {
// 找到插入位置并移位
pos := 0
for pos < numKeys {
k := Key(binary.LittleEndian.Uint64(p[3+pos*16:]))
if k >= key {
break
}
pos++
}
// 向右移
copy(p[3+(pos+1)*16:], p[3+pos*16:3+numKeys*16])
binary.LittleEndian.PutUint64(p[3+pos*16:], uint64(key))
binary.LittleEndian.PutUint64(p[3+pos*16+8:], uint64(val))
binary.LittleEndian.PutUint16(p[1:], uint16(numKeys+1))
bt.bp.UnpinPage(id, true)
return 0, 0, false
}
// 叶节点已满,需要分裂
// 分裂点:mid = MaxKeys / 2
mid := MaxKeys / 2
newID := bt.dm.AllocatePage()
newPage, _ := bt.bp.FetchPage(newID)
newPage[0] = LeafFlag
// 右半边复制到新页
rightCount := numKeys - mid
copy(newPage[3:], p[3+mid*16:3+numKeys*16])
binary.LittleEndian.PutUint16(newPage[1:], uint16(rightCount))
// 原页只保留左半边
binary.LittleEndian.PutUint16(p[1:], uint16(mid))
// 更新叶链表:新页 next = 原页 next,原页 next = 新页
origNext := PageID(binary.LittleEndian.Uint32(p[PageSize-4:]))
binary.LittleEndian.PutUint32(newPage[PageSize-4:], uint32(origNext))
binary.LittleEndian.PutUint32(p[PageSize-4:], uint32(newID))
// 提升的键是新右页的第一个键
promoteKey := Key(binary.LittleEndian.Uint64(newPage[3:]))
bt.bp.UnpinPage(id, true)
bt.bp.UnpinPage(newID, true)
// 插入到正确的半边
if key < promoteKey {
bt.insertLeaf(id, key, val)
} else {
bt.insertLeaf(newID, key, val)
}
return promoteKey, newID, true
}
func (bt *BTree) findLeaf(nodeID PageID, key Key) PageID {
p, _ := bt.bp.FetchPage(nodeID)
if p[0] == LeafFlag {
bt.bp.UnpinPage(nodeID, false)
return nodeID
}
// 内部节点:二分查找子节点
numKeys := int(binary.LittleEndian.Uint16(p[1:]))
child := PageID(binary.LittleEndian.Uint32(p[3:])) // child[0]
for i := 0; i < numKeys; i++ {
// 格式: child[i]:4, key[i]:8
k := Key(binary.LittleEndian.Uint64(p[3+4+i*12:]))
if key >= k {
child = PageID(binary.LittleEndian.Uint32(p[3+4+i*12+8:]))
}
}
bt.bp.UnpinPage(nodeID, false)
return bt.findLeaf(child, key)
}
// RangeScan 范围扫描,返回 [start, end] 内的所有键值对
func (bt *BTree) RangeScan(start, end Key, fn func(Key, Value)) {
leafID := bt.findLeaf(bt.rootID, start)
for leafID != 0 {
p, _ := bt.bp.FetchPage(leafID)
numKeys := int(binary.LittleEndian.Uint16(p[1:]))
finished := false
for i := 0; i < numKeys; i++ {
k := Key(binary.LittleEndian.Uint64(p[3+i*16:]))
v := Value(binary.LittleEndian.Uint64(p[3+i*16+8:]))
if k > end {
finished = true
break
}
if k >= start {
fn(k, v)
}
}
next := PageID(binary.LittleEndian.Uint32(p[PageSize-4:]))
bt.bp.UnpinPage(leafID, false)
if finished || next == 0 {
break
}
leafID = next
}
}
// insertIntoParent 是简化版,完整实现需要维护父节点指针
// 这里仅演示根分裂场景
func (bt *BTree) insertIntoParent(rootID, leftID PageID, key Key, rightID PageID) {
// 当 leftID 是 root 时:创建新根
if leftID == bt.rootID {
newRootID := bt.dm.AllocatePage()
p, _ := bt.bp.FetchPage(newRootID)
p[0] = 0 // 内部节点标志
binary.LittleEndian.PutUint16(p[1:], 1) // 1 个 key
binary.LittleEndian.PutUint32(p[3:], uint32(leftID))
binary.LittleEndian.PutUint64(p[7:], uint64(key))
binary.LittleEndian.PutUint32(p[15:], uint32(rightID))
bt.bp.UnpinPage(newRootID, true)
bt.rootID = newRootID
}
// 完整实现需要向上传递,这里从略
}
第三步:WAL 崩溃恢复
package wal
import (
"encoding/binary"
"fmt"
"io"
"os"
"sync"
"sync/atomic"
)
// LogRecord WAL 日志记录
type LogRecord struct {
LSN uint64
TxID uint64
Type uint8 // 0=Update, 1=Commit, 2=Abort, 3=Begin
PageID uint32
Offset uint32
Data []byte
}
const (
RecordUpdate = 0
RecordCommit = 1
RecordAbort = 2
RecordBegin = 3
)
type WAL struct {
file *os.File
nextLSN uint64
mu sync.Mutex
}
func NewWAL(path string) (*WAL, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
w := &WAL{file: f, nextLSN: 1}
return w, nil
}
// Append 追加写 WAL 记录,返回 LSN
func (w *WAL) Append(rec *LogRecord) (uint64, error) {
w.mu.Lock()
defer w.mu.Unlock()
lsn := atomic.AddUint64(&w.nextLSN, 1) - 1
rec.LSN = lsn
// 序列化: [total_len:4][LSN:8][TxID:8][Type:1][PageID:4][Offset:4][DataLen:4][Data...]
dataLen := len(rec.Data)
totalLen := 4 + 8 + 8 + 1 + 4 + 4 + 4 + dataLen
buf := make([]byte, totalLen+4) // 前 4 字节是 total_len
binary.LittleEndian.PutUint32(buf[0:], uint32(totalLen))
binary.LittleEndian.PutUint64(buf[4:], lsn)
binary.LittleEndian.PutUint64(buf[12:], rec.TxID)
buf[20] = rec.Type
binary.LittleEndian.PutUint32(buf[21:], rec.PageID)
binary.LittleEndian.PutUint32(buf[25:], rec.Offset)
binary.LittleEndian.PutUint32(buf[29:], uint32(dataLen))
copy(buf[33:], rec.Data)
if _, err := w.file.Write(buf); err != nil {
return 0, fmt.Errorf("wal append: %w", err)
}
// fsync 确保落盘
if err := w.file.Sync(); err != nil {
return 0, fmt.Errorf("wal sync: %w", err)
}
return lsn, nil
}
// Recover 重放 WAL,返回已提交事务的操作列表
func (w *WAL) Recover() ([]LogRecord, error) {
w.file.Seek(0, io.SeekStart)
var records []LogRecord
committedTx := make(map[uint64]bool)
var allRecords []LogRecord
for {
var totalLen uint32
if err := binary.Read(w.file, binary.LittleEndian, &totalLen); err != nil {
if err == io.EOF {
break
}
return nil, err
}
buf := make([]byte, totalLen)
if _, err := io.ReadFull(w.file, buf); err != nil {
break // 不完整记录,崩溃点
}
rec := LogRecord{
LSN: binary.LittleEndian.Uint64(buf[0:]),
TxID: binary.LittleEndian.Uint64(buf[8:]),
Type: buf[16],
PageID: binary.LittleEndian.Uint32(buf[17:]),
Offset: binary.LittleEndian.Uint32(buf[21:]),
}
dataLen := binary.LittleEndian.Uint32(buf[25:])
rec.Data = make([]byte, dataLen)
copy(rec.Data, buf[29:])
allRecords = append(allRecords, rec)
if rec.Type == RecordCommit {
committedTx[rec.TxID] = true
}
}
// 只重放已提交事务的操作
for _, rec := range allRecords {
if rec.Type == RecordUpdate && committedTx[rec.TxID] {
records = append(records, rec)
}
}
return records, nil
}
第四步:MVCC 事务
package mvcc
import (
"fmt"
"sync"
"sync/atomic"
)
// Version 代表一行的一个版本
type Version struct {
BeginTS uint64
EndTS uint64 // 0 表示"当前有效"
Value []byte
Next *Version // 旧版本链
}
// Row 代表一行数据
type Row struct {
Key uint64
Head *Version // 最新版本
mu sync.RWMutex
}
// MVCC 事务管理器
type MVCC struct {
nextTS uint64 // 单调递增的事务时间戳
rows sync.Map // key(uint64) → *Row
}
func NewMVCC() *MVCC {
return &MVCC{}
}
func (m *MVCC) Begin() uint64 {
return atomic.AddUint64(&m.nextTS, 1)
}
// Read 快照读:读取时间戳 ts 之前提交的最新版本
func (m *MVCC) Read(key, snapTS uint64) ([]byte, bool) {
val, ok := m.rows.Load(key)
if !ok {
return nil, false
}
row := val.(*Row)
row.mu.RLock()
defer row.mu.RUnlock()
for v := row.Head; v != nil; v = v.Next {
if v.BeginTS <= snapTS && (v.EndTS == 0 || v.EndTS > snapTS) {
result := make([]byte, len(v.Value))
copy(result, v.Value)
return result, true
}
}
return nil, false
}
// Write 写入一个新版本(commitTS 是提交时间戳)
func (m *MVCC) Write(key, commitTS uint64, value []byte) error {
actual, _ := m.rows.LoadOrStore(key, &Row{Key: key})
row := actual.(*Row)
row.mu.Lock()
defer row.mu.Unlock()
newVer := &Version{
BeginTS: commitTS,
EndTS: 0,
Value: append([]byte{}, value...),
Next: row.Head,
}
// 标记旧的当前版本为结束
if row.Head != nil && row.Head.EndTS == 0 {
row.Head.EndTS = commitTS
}
row.Head = newVer
return nil
}
// Delete 标记删除:把最新版本的 EndTS 设置为 commitTS
func (m *MVCC) Delete(key, commitTS uint64) error {
val, ok := m.rows.Load(key)
if !ok {
return fmt.Errorf("key %d not found", key)
}
row := val.(*Row)
row.mu.Lock()
defer row.mu.Unlock()
if row.Head != nil && row.Head.EndTS == 0 {
row.Head.EndTS = commitTS
}
return nil
}
// Tx 代表一个完整的 MVCC 事务
type Tx struct {
mvcc *MVCC
snapTS uint64
writes []writeIntent
committed bool
}
type writeIntent struct {
key uint64
value []byte // nil 表示删除
}
func (m *MVCC) BeginTx() *Tx {
return &Tx{mvcc: m, snapTS: m.Begin()}
}
func (tx *Tx) Get(key uint64) ([]byte, bool) {
return tx.mvcc.Read(key, tx.snapTS)
}
func (tx *Tx) Set(key uint64, value []byte) {
tx.writes = append(tx.writes, writeIntent{key: key, value: value})
}
func (tx *Tx) Del(key uint64) {
tx.writes = append(tx.writes, writeIntent{key: key, value: nil})
}
func (tx *Tx) Commit() error {
commitTS := atomic.AddUint64(&tx.mvcc.nextTS, 1)
for _, w := range tx.writes {
if w.value == nil {
tx.mvcc.Delete(w.key, commitTS)
} else {
tx.mvcc.Write(w.key, commitTS, w.value)
}
}
tx.committed = true
return nil
}
func (tx *Tx) Rollback() {
tx.writes = nil
}
第五步:简易 SQL 解析器
package sql
import (
"fmt"
"strings"
)
// 支持 SELECT/INSERT/UPDATE/DELETE 的极简 SQL 解析器
type StatementType int
const (
StmtSelect StatementType = iota
StmtInsert
StmtUpdate
StmtDelete
)
type Statement struct {
Type StatementType
Table string
Columns []string
Values []string
Where *WhereClause
SetClauses []SetClause
}
type WhereClause struct {
Column string
Op string // "=", ">", "<", ">=", "<="
Value string
}
type SetClause struct {
Column string
Value string
}
// Parse 解析一条 SQL 语句(简化版,不处理所有边界情况)
func Parse(query string) (*Statement, error) {
tokens := tokenize(query)
if len(tokens) == 0 {
return nil, fmt.Errorf("empty query")
}
switch strings.ToUpper(tokens[0]) {
case "SELECT":
return parseSelect(tokens)
case "INSERT":
return parseInsert(tokens)
case "UPDATE":
return parseUpdate(tokens)
case "DELETE":
return parseDelete(tokens)
default:
return nil, fmt.Errorf("unknown statement type: %s", tokens[0])
}
}
func parseSelect(tokens []string) (*Statement, error) {
// SELECT col1, col2 FROM table WHERE col = val
stmt := &Statement{Type: StmtSelect}
i := 1
for i < len(tokens) && strings.ToUpper(tokens[i]) != "FROM" {
col := strings.Trim(tokens[i], ", ")
if col != "" && col != "," {
stmt.Columns = append(stmt.Columns, col)
}
i++
}
if i >= len(tokens) {
return nil, fmt.Errorf("missing FROM clause")
}
i++ // skip FROM
if i >= len(tokens) {
return nil, fmt.Errorf("missing table name")
}
stmt.Table = tokens[i]
i++
if i < len(tokens) && strings.ToUpper(tokens[i]) == "WHERE" {
w, err := parseWhere(tokens[i+1:])
if err != nil {
return nil, err
}
stmt.Where = w
}
return stmt, nil
}
func parseInsert(tokens []string) (*Statement, error) {
// INSERT INTO table (col1, col2) VALUES (val1, val2)
if len(tokens) < 5 || strings.ToUpper(tokens[1]) != "INTO" {
return nil, fmt.Errorf("invalid INSERT syntax")
}
stmt := &Statement{Type: StmtInsert, Table: tokens[2]}
// 简化:不解析列名,直接解析 VALUES 后的值
valIdx := -1
for i, t := range tokens {
if strings.ToUpper(t) == "VALUES" {
valIdx = i
break
}
}
if valIdx == -1 {
return nil, fmt.Errorf("missing VALUES keyword")
}
for i := valIdx + 1; i < len(tokens); i++ {
v := strings.Trim(tokens[i], "(),\"' ")
if v != "" {
stmt.Values = append(stmt.Values, v)
}
}
return stmt, nil
}
func parseUpdate(tokens []string) (*Statement, error) {
// UPDATE table SET col1=val1 WHERE col2=val2
if len(tokens) < 4 {
return nil, fmt.Errorf("invalid UPDATE syntax")
}
stmt := &Statement{Type: StmtUpdate, Table: tokens[1]}
i := 3 // skip SET
for i < len(tokens) && strings.ToUpper(tokens[i]) != "WHERE" {
part := strings.Trim(tokens[i], ", ")
if eq := strings.Index(part, "="); eq != -1 {
stmt.SetClauses = append(stmt.SetClauses, SetClause{
Column: strings.TrimSpace(part[:eq]),
Value: strings.Trim(part[eq+1:], "'\""),
})
}
i++
}
if i < len(tokens) && strings.ToUpper(tokens[i]) == "WHERE" {
w, err := parseWhere(tokens[i+1:])
if err != nil {
return nil, err
}
stmt.Where = w
}
return stmt, nil
}
func parseDelete(tokens []string) (*Statement, error) {
// DELETE FROM table WHERE col=val
if len(tokens) < 3 {
return nil, fmt.Errorf("invalid DELETE syntax")
}
stmt := &Statement{Type: StmtDelete, Table: tokens[2]}
for i, t := range tokens {
if strings.ToUpper(t) == "WHERE" && i+1 < len(tokens) {
w, err := parseWhere(tokens[i+1:])
if err != nil {
return nil, err
}
stmt.Where = w
break
}
}
return stmt, nil
}
func parseWhere(tokens []string) (*WhereClause, error) {
// col op val
if len(tokens) < 3 {
return nil, fmt.Errorf("incomplete WHERE clause")
}
ops := []string{">=", "<=", "=", ">", "<"}
combined := strings.Join(tokens, " ")
for _, op := range ops {
if idx := strings.Index(combined, op); idx != -1 {
col := strings.TrimSpace(combined[:idx])
val := strings.TrimSpace(combined[idx+len(op):])
return &WhereClause{Column: col, Op: op, Value: strings.Trim(val, "'\"")}, nil
}
}
return nil, fmt.Errorf("no operator found in WHERE clause")
}
func tokenize(query string) []string {
// 简单空格分词(不处理字符串字面量中的空格)
return strings.Fields(strings.ReplaceAll(query, ",", " , "))
}
// 使用示例
func Example() {
queries := []string{
"SELECT name, age FROM users WHERE age > 18",
"INSERT INTO users VALUES (1, 'Alice', 30)",
"UPDATE users SET name='Bob' WHERE id=1",
"DELETE FROM users WHERE id=1",
}
for _, q := range queries {
stmt, err := Parse(q)
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fmt.Printf("Table: %s, Type: %d\n", stmt.Table, stmt.Type)
}
}
Level 4 · 进阶与边界
无锁读与 MVCC 的深度权衡
MVCC 的无锁读是有代价的。版本链越长,读操作需要遍历的版本就越多。在高写入场景下,版本链可能变得很长,导致读性能退化。
垃圾回收(Vacuum)机制定期清理过期版本。PostgreSQL 的 VACUUM 是著名的设计难题——它需要找到"最老的活跃事务"(oldest active transaction),早于这个时间戳的所有非当前版本都可以安全删除。
// Compaction:删除 olderThan 之前的历史版本
func (m *MVCC) Compaction(olderThan uint64) {
m.rows.Range(func(key, val interface{}) bool {
row := val.(*Row)
row.mu.Lock()
defer row.mu.Unlock()
// 找到第一个 BeginTS <= olderThan 的版本,截断之后的链
cur := row.Head
for cur != nil && cur.BeginTS > olderThan {
cur = cur.Next
}
if cur != nil {
cur.Next = nil // 截断旧版本链
}
return true
})
}
LSM 树:写优化的 B+树替代方案
B+树对写操作不够友好:每次写都需要随机 I/O 更新一个或多个页。LSM 树(Log-Structured Merge Tree)把所有写都变成顺序写,代价是读操作需要合并多个文件。
LSM 树结构:
MemTable(内存,红黑树) ──写满后刷盘──► L0 SSTable(不可变,有序)
L0 SSTables ──Compaction──► L1 SSTables
L1 SSTables ──Compaction──► L2 SSTables
...
RocksDB、BadgerDB、LevelDB 都使用 LSM 树。写密集型场景(日志、时序数据)LSM 表现更好;读多写少场景(OLTP)B+树更合适。这就是 InnoDB 和 MyISAM 选择 B+树,而 Cassandra 和 HBase 选择 LSM 树的根本原因。
与 BoltDB 和 BadgerDB 的对比
BoltDB:纯 B+树实现,写事务通过 Copy-on-Write(COW)实现,不使用 WAL——修改时复制整条路径上的所有节点,原始页保持不变。提交时更新根页指针(原子操作)。这种设计使读事务完全无锁,但写吞吐量受限于 B+树的随机 I/O。
BadgerDB:LSM 树实现,Key 和 Value 分开存储(Key 在 LSM 树中,Value 在 Value Log 中)。这一设计(称为 WiscKey)大幅减少了 LSM 树的写放大(Write Amplification),对大 Value 场景性能提升显著。
性能对比(粗略参考,取决于工作负载):
| 场景 | B+树(BoltDB) | LSM(BadgerDB) |
|---|---|---|
| 随机读 | 快 | 慢(需合并多层) |
| 顺序读 | 快 | 快 |
| 随机写 | 慢(随机 I/O) | 快(顺序追加) |
| 磁盘空间 | 低碎片 | 有空间放大 |
嵌入式库 vs 独立服务器
我们实现的引擎适合作为嵌入式库(像 SQLite、BoltDB 那样),直接链接到应用进程中。这消除了网络 I/O 延迟,适合单机场景。
要变成独立服务器,需要:
- 网络层:监听 TCP 连接,实现协议(MySQL 协议或自定义二进制协议)。
- 连接管理:每个连接一个 goroutine,连接池管理。
- 事务协调:跨连接的事务必须串行化提交(全局事务 ID 生成器)。
- 认证与权限:用户管理、表级/行级权限。
SQLite 选择了嵌入式,MySQL/PostgreSQL 选择了服务器模式,各有其设计哲学。
这个项目的完整代码(包括测试用例)可以在 GitHub 上找到(模拟地址:github.com/yourname/minidb)。构建完成后,你会对任何一个成熟的数据库的设计决策有更深刻的直觉。