各位开发者、技术爱好者们,大家好!
今天,我们将共同踏上一段激动人心的旅程:从零开始,设计并实现一个具备事务能力的 B+ 树引擎。我们将以 Go 语言作为我们的工具,深入探讨其物理布局设计,并逐步构建起一个能够处理并发、保证数据一致性和持久性的存储核心。
为什么选择 B+ 树?因为它是现代数据库系统中索引和数据存储的基石。它的扇出特性使其非常适合磁盘I/O操作,而事务能力则是任何可靠数据库引擎不可或缺的灵魂。
我们将以讲座的形式,一步步揭示这个复杂系统的奥秘。
第一章:B+ 树引擎的基石 — 物理布局与页面管理
任何数据库引擎的起点都是对物理存储的抽象和管理。我们的 B+ 树引擎将数据存储在磁盘文件中,并将其划分为固定大小的“页面”(Page)。页面是磁盘 I/O 的最小单位。
1.1 文件系统交互与页面抽象
我们的引擎将与操作系统文件系统交互,通过 os.File 读取和写入数据。为了简化,我们暂时不深入探讨 mmap 等高级内存映射技术,而是直接使用 ReadAt 和 WriteAt 进行随机访问。
页面(Page) 是我们存储管理的核心单元。每个页面都有一个唯一的 PageID,并且拥有固定的大小,例如 4KB 或 8KB。固定大小的页面简化了磁盘空间的管理,也符合操作系统和硬件对 I/O 的优化。
// page.go
package engine
import (
"encoding/binary"
"fmt"
"os"
"sync"
)
// PageID 是页面的唯一标识符
type PageID uint64
// PageSize 定义了每个页面的大小(例如 4KB)
const PageSize = 4096
// PageType 定义了页面的类型
type PageType byte
const (
PageTypeEmpty PageType = 0x00 // 空页面,可用于分配
PageTypeBPlusInternal PageType = 0x01 // B+树内部节点
PageTypeBPlusLeaf PageType = 0x02 // B+树叶子节点
PageTypeMeta PageType = 0x03 // 元数据页面,例如存储B+树的根页ID
)
// Page 是内存中的页面结构体
type Page struct {
ID PageID // 页面ID
Data []byte // 实际的页面数据,大小为 PageSize
IsDirty bool // 页面是否被修改过
PinCount int32 // 页面被引用的次数
mu sync.Mutex // 保护页面状态的互斥锁
LSN uint64 // Log Sequence Number,用于恢复
}
// NewPage 创建一个新的页面实例
func NewPage(id PageID) *Page {
return &Page{
ID: id,
Data: make([]byte, PageSize),
IsDirty: false,
PinCount: 0,
LSN: 0, // 新页面初始LSN为0
}
}
// GetPageType 从页面数据中读取页面类型
func (p *Page) GetPageType() PageType {
return PageType(p.Data[0])
}
// SetPageType 设置页面类型
func (p *Page) SetPageType(t PageType) {
p.Data[0] = byte(t)
p.IsDirty = true
}
// GetFreeSpaceOffset 获取页面内自由空间起始偏移量
func (p *Page) GetFreeSpaceOffset() uint16 {
return binary.LittleEndian.Uint16(p.Data[1:3])
}
// SetFreeSpaceOffset 设置页面内自由空间起始偏移量
func (p *Page) SetFreeSpaceOffset(offset uint16) {
binary.LittleEndian.PutUint16(p.Data[1:3], offset)
p.IsDirty = true
}
// GetNumEntries 获取页面内条目数量
func (p *Page) GetNumEntries() uint16 {
return binary.LittleEndian.Uint16(p.Data[3:5])
}
// SetNumEntries 设置页面内条目数量
func (p *Page) SetNumEntries(num uint16) {
binary.LittleEndian.PutUint16(p.Data[3:5], num)
p.IsDirty = true
}
// GetLSN 获取页面的LSN
func (p *Page) GetLSN() uint64 {
return binary.LittleEndian.Uint64(p.Data[5:13])
}
// SetLSN 设置页面的LSN
func (p *Page) SetLSN(lsn uint64) {
binary.LittleEndian.PutUint64(p.Data[5:13], lsn)
p.IsDirty = true
}
// GetParentPageID 获取父页面ID
func (p *Page) GetParentPageID() PageID {
return PageID(binary.LittleEndian.Uint64(p.Data[13:21]))
}
// SetParentPageID 设置父页面ID
func (p *Page) SetParentPageID(parentID PageID) {
binary.LittleEndian.PutUint64(p.Data[13:21], uint64(parentID))
p.IsDirty = true
}
1.2 页面头部结构
每个页面都会有一个固定的头部,用于存储页面的元数据。这对于快速识别页面类型、管理页面内部空间至关重要。
| 偏移量 | 大小 (字节) | 描述 |
|---|---|---|
| 0 | 1 | 页面类型 (PageType) |
| 1 | 2 | 自由空间起始偏移量 |
| 3 | 2 | 页面内条目数量 |
| 5 | 8 | LSN (Log Sequence Number) |
| 13 | 8 | 父页面 ID (ParentPageID) |
| 21 | … | 其他元数据 (例如 B+ 树特定字段) |
1.3 缓冲区池 (Buffer Pool)
直接从磁盘读取或写入每个页面效率低下。缓冲区池是内存中的一个缓存区域,用于存储经常访问的页面。它通过减少磁盘 I/O 来显著提升性能。
缓冲区池的核心功能包括:
- 获取页面 (FetchPage): 从缓冲区池中查找页面;如果不在,则从磁盘加载。
- 创建新页面 (NewPage): 分配一个新的页面 ID,并将其添加到缓冲区池。
- 固定/解除固定 (Pin/Unpin): 当一个页面正在被使用时,我们“固定”它,防止它被驱逐。使用完毕后,“解除固定”。
- 刷回磁盘 (FlushPage): 将脏页面(被修改过的页面)写回磁盘。
- 驱逐策略 (Eviction Policy): 当缓冲区池满时,需要选择一个页面驱逐,通常使用 LRU (Least Recently Used) 或 Clock 算法。为简化,我们先关注 PinCount。
// buffer_pool.go
package engine
import (
"fmt"
"os"
"sync"
)
// BufferPool 管理内存中的页面缓存
type BufferPool struct {
file *os.File // 数据库文件句柄
pages map[PageID]*Page // 缓存的页面
replacer *LRUReplacer // 页面替换器 (LRU)
nextPageID PageID // 下一个可用的页面ID
poolSize int // 缓冲区池大小
mu sync.Mutex // 保护缓冲区池的互斥锁
}
// NewBufferPool 创建一个新的缓冲区池
func NewBufferPool(filePath string, poolSize int) (*BufferPool, error) {
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return nil, fmt.Errorf("failed to open database file: %w", err)
}
// 初始化nextPageID,如果文件已存在,需要读取元数据页来获取
// 暂时简化处理,假设为新文件,或通过某种方式获取最大PageID
fi, err := file.Stat()
if err != nil {
return nil, fmt.Errorf("failed to stat file: %w", err)
}
nextPageID := PageID(fi.Size() / PageSize)
if nextPageID == 0 { // 文件为空,分配第一个页面作为元数据页
nextPageID = 1 // 从1开始,0可能保留
}
bp := &BufferPool{
file: file,
pages: make(map[PageID]*Page, poolSize),
replacer: NewLRUReplacer(poolSize), // 实际LRU实现会更复杂
nextPageID: nextPageID,
poolSize: poolSize,
}
// 尝试加载元数据页
metaPageID := PageID(0) // 约定0号页面为元数据页面
if nextPageID > 0 { // 文件非空,尝试读取元数据
metaPage, err := bp.FetchPage(metaPageID, false) // 首次加载不写入LSN
if err == nil && metaPage.GetPageType() == PageTypeMeta {
// 从元数据页读取根页ID等信息
// 假设元数据页存储了下一个可用的PageID
bp.nextPageID = PageID(binary.LittleEndian.Uint64(metaPage.Data[21:29])) // 示例偏移量
}
bp.UnpinPage(metaPageID, false) // 解除固定
} else {
// 文件为空,创建元数据页
metaPage, err := bp.NewPage()
if err != nil {
return nil, fmt.Errorf("failed to create meta page: %w", err)
}
metaPage.SetPageType(PageTypeMeta)
// 写入初始的nextPageID到元数据页
binary.LittleEndian.PutUint64(metaPage.Data[21:29], uint64(bp.nextPageID))
bp.UnpinPage(metaPage.ID, true) // 标记为脏
}
return bp, nil
}
// FetchPage 从缓冲区池中获取一个页面。如果页面不在池中,则从磁盘读取。
// pin 参数表示是否立即固定页面。
func (bp *BufferPool) FetchPage(id PageID, pin bool) (*Page, error) {
bp.mu.Lock()
defer bp.mu.Unlock()
if page, ok := bp.pages[id]; ok {
// 页面已在池中
if pin {
page.mu.Lock()
page.PinCount++
page.mu.Unlock()
bp.replacer.Pin(id) // 通知替换器页面被固定
}
return page, nil
}
// 页面不在池中,需要从磁盘读取
if len(bp.pages) >= bp.poolSize {
// 缓冲区池已满,需要驱逐一个页面
evictID, ok := bp.replacer.Victim()
if !ok {
return nil, fmt.Errorf("buffer pool is full and no page can be evicted")
}
evictPage := bp.pages[evictID]
if evictPage.IsDirty {
if err := bp.flushPageInternal(evictPage); err != nil {
return nil, fmt.Errorf("failed to flush evicted page %d: %w", evictID, err)
}
}
delete(bp.pages, evictID)
}
page := NewPage(id)
offset := int64(id) * PageSize
n, err := bp.file.ReadAt(page.Data, offset)
if err != nil && err != os.EOF { // 允许EOF,表示页面可能还未被写入
return nil, fmt.Errorf("failed to read page %d from disk: %w", id, err)
}
if n > 0 && n < PageSize { // 读取到的数据不足一个页面大小
return nil, fmt.Errorf("partial read for page %d: expected %d bytes, got %d", id, PageSize, n)
}
if pin {
page.PinCount = 1
bp.replacer.Pin(id)
} else {
page.PinCount = 0 // 未固定,可以立即被驱逐
}
bp.pages[id] = page
return page, nil
}
// NewPage 创建一个新的页面并分配一个ID,将其添加到缓冲区池。
func (bp *BufferPool) NewPage() (*Page, error) {
bp.mu.Lock()
defer bp.mu.Unlock()
if len(bp.pages) >= bp.poolSize {
evictID, ok := bp.replacer.Victim()
if !ok {
return nil, fmt.Errorf("buffer pool is full and no page can be evicted")
}
evictPage := bp.pages[evictID]
if evictPage.IsDirty {
if err := bp.flushPageInternal(evictPage); err != nil {
return nil, fmt.Errorf("failed to flush evicted page %d: %w", evictID, err)
}
}
delete(bp.pages, evictID)
}
id := bp.nextPageID
bp.nextPageID++
page := NewPage(id)
page.PinCount = 1 // 新创建的页面默认被固定
bp.pages[id] = page
bp.replacer.Pin(id) // 通知替换器页面被固定
// 更新元数据页中的 nextPageID
metaPage, err := bp.FetchPage(0, true) // 获取元数据页并固定
if err != nil {
return nil, fmt.Errorf("failed to fetch meta page to update nextPageID: %w", err)
}
binary.LittleEndian.PutUint64(metaPage.Data[21:29], uint64(bp.nextPageID))
bp.UnpinPage(metaPage.ID, true) // 解除固定并标记为脏
return page, nil
}
// UnpinPage 解除页面的固定。如果isDirty为true,则标记页面为脏。
func (bp *BufferPool) UnpinPage(id PageID, isDirty bool) {
bp.mu.Lock()
defer bp.mu.Unlock()
if page, ok := bp.pages[id]; ok {
page.mu.Lock()
defer page.mu.Unlock()
if page.PinCount > 0 {
page.PinCount--
if page.PinCount == 0 {
bp.replacer.Unpin(id) // 通知替换器页面可以被驱逐了
}
}
if isDirty {
page.IsDirty = true
}
}
}
// FlushPage 将指定页面刷回磁盘
func (bp *BufferPool) FlushPage(id PageID) error {
bp.mu.Lock()
defer bp.mu.Unlock()
if page, ok := bp.pages[id]; ok {
return bp.flushPageInternal(page)
}
return nil // 页面不在池中,无需刷新
}
// flushPageInternal 是内部的刷盘函数,不加锁
func (bp *BufferPool) flushPageInternal(page *Page) error {
page.mu.Lock()
defer page.mu.Unlock()
if !page.IsDirty {
return nil
}
offset := int64(page.ID) * PageSize
_, err := bp.file.WriteAt(page.Data, offset)
if err != nil {
return fmt.Errorf("failed to write page %d to disk: %w", page.ID, err)
}
page.IsDirty = false
return nil
}
// FlushAllPages 将所有脏页面刷回磁盘
func (bp *BufferPool) FlushAllPages() {
bp.mu.Lock()
defer bp.mu.Unlock()
for _, page := range bp.pages {
bp.flushPageInternal(page)
}
}
// Close 关闭缓冲区池,刷回所有脏页面并关闭文件
func (bp *BufferPool) Close() error {
bp.FlushAllPages()
return bp.file.Close()
}
// LRUReplacer 简单的LRU替换器接口和实现
// 实际生产环境会更复杂,这里仅为示例
type LRUReplacer struct {
poolSize int
list *list.List // 双向链表,最近使用的在前端
elements map[PageID]*list.Element
mu sync.Mutex
}
type lruEntry struct {
pageID PageID
}
func NewLRUReplacer(poolSize int) *LRUReplacer {
return &LRUReplacer{
poolSize: poolSize,
list: list.New(),
elements: make(map[PageID]*list.Element),
}
}
func (r *LRUReplacer) Pin(pageID PageID) {
r.mu.Lock()
defer r.mu.Unlock()
if elem, ok := r.elements[pageID]; ok {
r.list.Remove(elem)
delete(r.elements, pageID)
}
}
func (r *LRUReplacer) Unpin(pageID PageID) {
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.elements[pageID]; !ok {
if r.list.Len() >= r.poolSize {
// LRU满了,需要驱逐一个,这里简化处理,实际要找可驱逐的
// 在此实现中,Unpin只是加入LRU,驱逐逻辑在BufferPool中
}
elem := r.list.PushFront(&lruEntry{pageID: pageID})
r.elements[pageID] = elem
}
}
func (r *LRUReplacer) Victim() (PageID, bool) {
r.mu.Lock()
defer r.mu.Unlock()
if r.list.Len() == 0 {
return 0, false
}
elem := r.list.Back()
pageID := elem.Value.(*lruEntry).pageID
r.list.Remove(elem)
delete(r.elements, pageID)
return pageID, true
}
// 确保引入 "container/list"
// import "container/list"
注意:LRUReplacer 的实现简化了驱逐逻辑,实际的 LRU 替换器需要更精细地管理固定页面和可驱逐页面。为了保持讲座核心,我们暂时不深入细节。
第二章:B+ 树结构与操作
B+ 树是一种多路平衡查找树,它的所有值都存储在叶子节点上,并且叶子节点通过链表连接,便于范围查询。内部节点只存储键和指向子节点的指针。
2.1 节点结构
B+ 树有两种主要类型的节点:内部节点(Internal Node)和叶子节点(Leaf Node)。它们都共享页面的基本结构,但有各自特定的元数据和数据布局。
内部节点 (Internal Node) 页面布局:
内部节点存储 (key, child_page_id) 对。
- 页面头部
- 键值对区域:
[key_length][key_data][child_page_id]… - 最右侧子节点指针 (通常在头部或固定位置)
叶子节点 (Leaf Node) 页面布局:
叶子节点存储 (key, value) 对,并且它们之间通过 next_leaf_page_id 和 prev_leaf_page_id 形成双向链表。
- 页面头部
NextLeafPageID(8 bytes)PrevLeafPageID(8 bytes)
- 键值对区域:
[key_length][key_data][value_length][value_data]… - 槽数组 (Slot Array):在页面尾部,指向实际键值对的偏移量和长度,便于删除和碎片整理。
// bptree.go
package engine
import (
"bytes"
"encoding/binary"
"fmt"
"sort"
"sync"
)
// KeyType 定义键的类型,例如 []byte
type KeyType []byte
// ValueType 定义值的类型,例如 []byte
type ValueType []byte
// BPlusTree 是B+树引擎的核心结构
type BPlusTree struct {
bp *BufferPool
rootPageID PageID
metaPageID PageID // 存储B+树元数据(例如根页ID)的页面
mu sync.RWMutex // 保护树结构的读写锁
}
// NewBPlusTree 创建或加载一个B+树
func NewBPlusTree(bp *BufferPool) (*BPlusTree, error) {
tree := &BPlusTree{
bp: bp,
metaPageID: 0, // 约定0号页面为元数据页面
}
// 尝试加载元数据页
metaPage, err := bp.FetchPage(tree.metaPageID, true)
if err != nil {
return nil, fmt.Errorf("failed to fetch meta page: %w", err)
}
defer bp.UnpinPage(tree.metaPageID, false) // 立即解除固定
if metaPage.GetPageType() == PageTypeMeta {
// 从元数据页读取根页ID
tree.rootPageID = PageID(binary.LittleEndian.Uint64(metaPage.Data[29:37])) // 示例偏移量
if tree.rootPageID == 0 { // 根页ID为0表示树为空,需要初始化
return tree, tree.initTree()
}
} else {
// 0号页面不是元数据页,可能是新文件,需要初始化
metaPage.SetPageType(PageTypeMeta)
return tree, tree.initTree()
}
return tree, nil
}
// initTree 初始化B+树,创建第一个叶子节点作为根节点
func (t *BPlusTree) initTree() error {
leafPage, err := t.bp.NewPage()
if err != nil {
return fmt.Errorf("failed to create initial leaf page: %w", err)
}
defer t.bp.UnpinPage(leafPage.ID, true) // 标记为脏
leafPage.SetPageType(PageTypeBPlusLeaf)
leafPage.SetFreeSpaceOffset(uint16(PageSize - 20)) // 初始自由空间在页面尾部,预留槽数组空间
leafPage.SetNumEntries(0)
// 设置next/prev leaf page ID为0 (空)
binary.LittleEndian.PutUint64(leafPage.Data[PageHeaderSize:PageHeaderSize+8], 0) // NextLeafPageID
binary.LittleEndian.PutUint64(leafPage.Data[PageHeaderSize+8:PageHeaderSize+16], 0) // PrevLeafPageID
t.rootPageID = leafPage.ID
return t.updateMetaPage()
}
// updateMetaPage 更新元数据页中的B+树根页ID
func (t *BPlusTree) updateMetaPage() error {
metaPage, err := t.bp.FetchPage(t.metaPageID, true)
if err != nil {
return fmt.Errorf("failed to fetch meta page for update: %w", err)
}
defer t.bp.UnpinPage(t.metaPageID, true) // 标记为脏
binary.LittleEndian.PutUint64(metaPage.Data[29:37], uint64(t.rootPageID)) // 示例偏移量
return nil
}
// PageHeaderSize 页面头部大小
const PageHeaderSize = 21 // PageType + FreeSpaceOffset + NumEntries + LSN + ParentPageID
// LeafNodeHeaderSize 叶子节点特有头部大小
const LeafNodeHeaderSize = PageHeaderSize + 8 + 8 // Base Header + NextLeafPageID + PrevLeafPageID
// InternalNodeHeaderSize 内部节点特有头部大小
const InternalNodeHeaderSize = PageHeaderSize
// Helper functions for reading/writing leaf node specific headers
func getNextLeafPageID(page *Page) PageID {
return PageID(binary.LittleEndian.Uint64(page.Data[PageHeaderSize : PageHeaderSize+8]))
}
func setNextLeafPageID(page *Page, id PageID) {
binary.LittleEndian.PutUint64(page.Data[PageHeaderSize:PageHeaderSize+8], uint64(id))
page.IsDirty = true
}
func getPrevLeafPageID(page *Page) PageID {
return PageID(binary.LittleEndian.Uint64(page.Data[PageHeaderSize+8 : PageHeaderSize+16]))
}
func setPrevLeafPageID(page *Page, id PageID) {
binary.LittleEndian.PutUint64(page.Data[PageHeaderSize+8:PageHeaderSize+16], uint64(id))
page.IsDirty = true
}
// Record format in Leaf Page:
// [KeyLen (2 bytes)][KeyData][ValueLen (2 bytes)][ValueData]
// Slot Array: [RecordOffset (2 bytes)][RecordLength (2 bytes)]... (from end of page)
// GetRecordFromLeafPage 从叶子页面获取指定索引的记录
func GetRecordFromLeafPage(page *Page, idx uint16) (KeyType, ValueType, error) {
numEntries := page.GetNumEntries()
if idx >= numEntries {
return nil, nil, fmt.Errorf("record index out of bounds: %d >= %d", idx, numEntries)
}
slotOffset := PageSize - (idx+1)*4 // 槽数组从页面尾部开始
recordOffset := binary.LittleEndian.Uint16(page.Data[slotOffset : slotOffset+2])
recordLength := binary.LittleEndian.Uint16(page.Data[slotOffset+2 : slotOffset+4])
keyLen := binary.LittleEndian.Uint16(page.Data[recordOffset : recordOffset+2])
keyData := page.Data[recordOffset+2 : recordOffset+2+keyLen]
valueLen := binary.LittleEndian.Uint16(page.Data[recordOffset+2+keyLen : recordOffset+2+keyLen+2])
valueData := page.Data[recordOffset+2+keyLen+2 : recordOffset+2+keyLen+2+valueLen]
return keyData, valueData, nil
}
// FindSlotInLeafPage 在叶子页面中查找键的插入位置或存在位置
func FindSlotInLeafPage(page *Page, key KeyType) (int, bool) {
numEntries := page.GetNumEntries()
idx := sort.Search(int(numEntries), func(i int) bool {
k, _, _ := GetRecordFromLeafPage(page, uint16(i))
return bytes.Compare(k, key) >= 0
})
if idx < int(numEntries) {
k, _, _ := GetRecordFromLeafPage(page, uint16(idx))
if bytes.Equal(k, key) {
return idx, true // 键已存在
}
}
return idx, false // 键不存在,idx是插入位置
}
// GetFreeSpaceInLeafPage 计算叶子页面剩余可用空间
func GetFreeSpaceInLeafPage(page *Page) uint16 {
headerEnd := LeafNodeHeaderSize
freeSpaceOffset := page.GetFreeSpaceOffset()
numEntries := page.GetNumEntries()
return freeSpaceOffset - headerEnd - (numEntries * 4) // 数据区 + 槽数组
}
// InsertRecordIntoLeafPage 将记录插入叶子页面
func InsertRecordIntoLeafPage(page *Page, key KeyType, value ValueType, idx int) error {
recordLen := uint16(len(key)) + uint16(len(value)) + 4 // 2 for keyLen, 2 for valueLen
requiredSpace := recordLen + 4 // record data + slot entry
if GetFreeSpaceInLeafPage(page) < requiredSpace {
return fmt.Errorf("not enough space in leaf page %d to insert record", page.ID)
}
numEntries := page.GetNumEntries()
freeSpaceOffset := page.GetFreeSpaceOffset()
// 移动槽数组,为新槽位腾出空间
for i := numEntries; i > uint16(idx); i-- {
srcSlotOffset := PageSize - i*4
dstSlotOffset := PageSize - (i+1)*4
copy(page.Data[dstSlotOffset:dstSlotOffset+4], page.Data[srcSlotOffset:srcSlotOffset+4])
}
// 写入记录数据
newRecordOffset := freeSpaceOffset - recordLen
binary.LittleEndian.PutUint16(page.Data[newRecordOffset:newRecordOffset+2], uint16(len(key)))
copy(page.Data[newRecordOffset+2:newRecordOffset+2+uint16(len(key))], key)
binary.LittleEndian.PutUint16(page.Data[newRecordOffset+2+uint16(len(key)):newRecordOffset+2+uint16(len(key))+2], uint16(len(value)))
copy(page.Data[newRecordOffset+2+uint19(len(key))+2:newRecordOffset+2+uint16(len(key))+2+uint16(len(value))], value)
// 写入槽位
slotOffset := PageSize - (uint16(idx)+1)*4
binary.LittleEndian.PutUint16(page.Data[slotOffset:slotOffset+2], newRecordOffset)
binary.LittleEndian.PutUint16(page.Data[slotOffset+2:slotOffset+4], recordLen)
page.SetFreeSpaceOffset(newRecordOffset)
page.SetNumEntries(numEntries + 1)
page.IsDirty = true
return nil
}
// DeleteRecordFromLeafPage 从叶子页面删除指定索引的记录
func DeleteRecordFromLeafPage(page *Page, idx int) error {
numEntries := page.GetNumEntries()
if idx >= int(numEntries) {
return fmt.Errorf("record index out of bounds: %d >= %d", idx, numEntries)
}
// 读取要删除记录的槽位信息
slotOffset := PageSize - (uint16(idx)+1)*4
recordOffset := binary.LittleEndian.Uint16(page.Data[slotOffset : slotOffset+2])
recordLength := binary.LittleEndian.Uint16(page.Data[slotOffset+2 : slotOffset+4])
// 移动数据区 (此处不实际移动数据,而是通过调整freeSpaceOffset和重新整理槽数组来实现碎片整理)
// 最简单的实现是只移动槽数组,等到页面分裂/合并时再进行数据整理
// 移动槽数组,覆盖被删除的槽位
for i := uint16(idx); i < numEntries-1; i++ {
srcSlotOffset := PageSize - (i+2)*4 // 下一个槽位
dstSlotOffset := PageSize - (i+1)*4 // 当前槽位
copy(page.Data[dstSlotOffset:dstSlotOffset+4], page.Data[srcSlotOffset:srcSlotOffset+4])
}
// 清空最后一个槽位 (可选,但推荐)
binary.LittleEndian.PutUint32(page.Data[PageSize-numEntries*4:PageSize-(numEntries-1)*4], 0)
page.SetNumEntries(numEntries - 1)
page.IsDirty = true
// 简单的碎片整理:如果删除的记录导致了大量碎片,可能需要进行页面重组
// 这里我们暂时不实现复杂的碎片整理,只调整freeSpaceOffset
// 复杂的碎片整理可能需要将所有记录重新排列,并更新槽数组
// 这里只是将freeSpaceOffset设置为已使用的最高偏移量,而不是真正将数据移动
// 实际的数据库引擎会有一个后台任务或在分裂/合并时进行整理
if recordOffset == page.GetFreeSpaceOffset() {
// 如果删除的是最后一个记录,需要重新计算freeSpaceOffset
// 这是一个复杂的问题,简单的做法是:只有当页面重组时才真正调整
// 暂时保持freeSpaceOffset不变,只减少numEntries
}
return nil
}
// 示例:查找函数(简化版,不涉及事务)
func (t *BPlusTree) Search(key KeyType) (ValueType, error) {
t.mu.RLock()
defer t.mu.RUnlock()
currentPageID := t.rootPageID
for {
page, err := t.bp.FetchPage(currentPageID, true)
if err != nil {
return nil, err
}
pageType := page.GetPageType()
if pageType == PageTypeBPlusLeaf {
// 在叶子节点中查找
idx, found := FindSlotInLeafPage(page, key)
t.bp.UnpinPage(page.ID, false)
if found {
_, value, _ := GetRecordFromLeafPage(page, uint16(idx))
return value, nil
}
return nil, fmt.Errorf("key not found")
} else if pageType == PageTypeBPlusInternal {
// 在内部节点中查找下一个子节点
// 内部节点布局: [PageHeader][key_len][key_data][child_page_id]... [rightmost_child_page_id]
// 查找逻辑:找到第一个大于等于key的键,其左侧指针是我们要找的
// 如果所有键都小于key,则去最右侧子节点
// 简化:假设内部节点存储 (key, pageID) 对
// binary.LittleEndian.Uint64(page.Data[InternalNodeHeaderSize + i*(KeyMaxLen + 8) + KeyMaxLen : ...])
// 实际需要更灵活的存储方式
// 内部节点查找下一页面的逻辑,此处只是一个占位符
// 实际需要遍历内部节点中的键和指针
nextPageID := PageID(0) // 占位符
numEntries := page.GetNumEntries()
found := false
for i := uint16(0); i < numEntries; i++ {
// 假设内部节点格式为 (key, childID)
// 从页面数据中读取 key 和 childID
// keyLen := binary.LittleEndian.Uint16(page.Data[InternalNodeHeaderSize + ...])
// childID := PageID(binary.LittleEndian.Uint64(page.Data[InternalNodeHeaderSize + ...]))
// if bytes.Compare(key, readKey) < 0 {
// nextPageID = prevChildID
// found = true
// break
// }
// prevChildID = childID
// 复杂性在于内部节点也需要灵活的键值对存储和查找
}
if !found {
// nextPageID = rightmost_child_page_id
}
t.bp.UnpinPage(page.ID, false)
currentPageID = nextPageID
} else {
t.bp.UnpinPage(page.ID, false)
return nil, fmt.Errorf("invalid page type %d for page %d", pageType, page.ID)
}
}
}
注意:B+树的 Search、Insert、Delete 操作的实现非常复杂,尤其是内部节点的键值对存储和查找。为了聚焦事务能力,我们在这里提供了叶子节点操作的详细实现,而内部节点的处理则做了简化,只给出了逻辑上的指导。一个完整的 B+ 树需要一个 Node 接口和 InternalNode、LeafNode 具体实现来封装这些逻辑。
2.2 B+ 树操作的核心逻辑
- 查找 (Search): 从根节点开始,根据键值遍历内部节点,直到到达叶子节点,然后在叶子节点中查找目标键。
- 插入 (Insert): 找到合适的叶子节点,插入键值对。如果叶子节点空间不足,则需要进行“分裂”(Split)操作,将一半数据移到新页面,并将新页面的最小键和页面 ID 提升到父节点。如果父节点也满,则继续向上分裂,直到根节点。如果根节点分裂,则树的高度增加。
- 删除 (Delete): 找到合适的叶子节点,删除键值对。如果叶子节点数据量过少(低于某个阈值),则可能需要与兄弟节点进行“合并”(Merge)或“再分配”(Redistribute)操作,以保持树的平衡和空间利用率。这些操作也可能向上级联。
第三章:事务管理 — ACID 的基石
数据库事务是并发控制和故障恢复的基础。它确保数据库操作的原子性、一致性、隔离性和持久性 (ACID)。
3.1 原子性 (Atomicity) 与持久性 (Durability) — WAL
原子性意味着一个事务要么全部成功,要么全部失败。持久性意味着一旦事务提交,其更改就永久保存。这两者通常通过预写日志 (Write-Ahead Logging, WAL) 来实现。
WAL 原理:
- 所有数据修改首先写入日志文件,而不是直接写入数据文件。
- 日志记录包含足够的信息来重做(Redo)或撤销(Undo)一个操作。
- 只有当事务的所有日志记录都写入并刷盘后,事务才能被标记为提交。
- 数据页面可以稍后异步地刷回磁盘。
日志记录类型:
- Update: 记录页面 ID、偏移量、旧值、新值。
- Insert: 记录页面 ID、插入的记录数据。
- Delete: 记录页面 ID、删除的记录数据。
- Page_Split/Page_Merge: 记录涉及的页面 ID 和操作详情。
- Begin/Commit/Abort: 事务生命周期事件。
- Checkpoint: 记录当前脏页面和活跃事务的状态,用于加速恢复。
每个日志记录都有一个唯一的 LSN (Log Sequence Number)。页面的头部也存储一个 LSN,表示该页面最后一次被修改时的日志 LSN。
// log.go
package engine
import (
"encoding/binary"
"fmt"
"os"
"sync"
)
// LogRecordType 日志记录类型
type LogRecordType byte
const (
LogTypeInvalid LogRecordType = 0x00
LogTypeBegin LogRecordType = 0x01
LogTypeCommit LogRecordType = 0x02
LogTypeAbort LogRecordType = 0x03
LogTypeInsert LogRecordType = 0x04
LogTypeDelete LogRecordType = 0x05
LogTypeUpdate LogRecordType = 0x06
LogTypePageSplit LogRecordType = 0x07
LogTypePageMerge LogRecordType = 0x08
LogTypeCheckpoint LogRecordType = 0x09
)
// BaseLogRecord 基础日志记录结构,所有日志记录都包含这些字段
type BaseLogRecord struct {
LSN uint64 // Log Sequence Number
PrevLSN uint64 // 前一个日志记录的LSN (同一个事务)
TxID uint64 // 事务ID
Type LogRecordType // 日志记录类型
Size uint32 // 整个日志记录的字节大小
}
// LogManager 负责日志的写入和管理
type LogManager struct {
logFile *os.File
nextLSN uint64 // 下一个可用的LSN
offset int64 // 当前写入文件的偏移量
mu sync.Mutex // 保护日志管理器状态的互斥锁
buffer bytes.Buffer // 日志缓冲区
}
// NewLogManager 创建一个新的日志管理器
func NewLogManager(logFilePath string) (*LogManager, error) {
file, err := os.OpenFile(logFilePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return nil, fmt.Errorf("failed to open log file: %w", err)
}
fi, err := file.Stat()
if err != nil {
return nil, fmt.Errorf("failed to stat log file: %w", err)
}
lm := &LogManager{
logFile: file,
nextLSN: uint64(fi.Size()), // 初始LSN为文件大小
offset: fi.Size(),
buffer: bytes.Buffer{},
}
// 在启动时需要进行恢复,确定准确的nextLSN,这里简化为文件大小
// 实际需要Scan整个日志文件来确定最后一个有效的LSN
return lm, nil
}
// AppendLogRecord 将日志记录写入缓冲区
func (lm *LogManager) AppendLogRecord(record []byte) (uint64, error) {
lm.mu.Lock()
defer lm.mu.Unlock()
currentLSN := lm.nextLSN
lm.nextLSN += uint64(len(record))
_, err := lm.buffer.Write(record)
if err != nil {
return 0, fmt.Errorf("failed to write log to buffer: %w", err)
}
return currentLSN, nil
}
// Flush 将缓冲区中的日志数据刷回磁盘
func (lm *LogManager) Flush() error {
lm.mu.Lock()
defer lm.mu.Unlock()
if lm.buffer.Len() == 0 {
return nil
}
_, err := lm.logFile.Write(lm.buffer.Bytes())
if err != nil {
return fmt.Errorf("failed to write log buffer to file: %w", err)
}
lm.buffer.Reset() // 清空缓冲区
err = lm.logFile.Sync() // 确保数据写入物理磁盘
if err != nil {
return fmt.Errorf("failed to sync log file: %w", err)
}
return nil
}
// Close 关闭日志管理器
func (lm *LogManager) Close() error {
if err := lm.Flush(); err != nil {
return fmt.Errorf("failed to flush log before closing: %w", err)
}
return lm.logFile.Close()
}
// GetCurrentLSN 获取当前LSN
func (lm *LogManager) GetCurrentLSN() uint64 {
lm.mu.Lock()
defer lm.mu.Unlock()
return lm.nextLSN
}
// ReadLogRecord 读取指定LSN的日志记录
func (lm *LogManager) ReadLogRecord(lsn uint64) ([]byte, error) {
lm.mu.Lock()
defer lm.mu.Unlock()
// 实际需要从文件读取,并解析日志记录的头部来获取完整大小
// 这里简化为直接读取一个固定大小的记录,实际日志记录是变长的
buf := make([]byte, 1024) // 假设最大日志记录大小
n, err := lm.logFile.ReadAt(buf, int64(lsn))
if err != nil {
return nil, fmt.Errorf("failed to read log record at LSN %d: %w", lsn, err)
}
// 需要从buf中解析出日志记录的实际大小,然后返回正确的数据
if n < 4 { // 至少要能读到Size字段
return nil, fmt.Errorf("too small to be a valid log record at LSN %d", lsn)
}
recordSize := binary.LittleEndian.Uint32(buf[16:20]) // 假设BaseLogRecord的Size字段在偏移16
return buf[:recordSize], nil
}
// 示例:InsertLogRecord 结构和序列化
type InsertLogRecord struct {
BaseLogRecord
PageID PageID
Offset uint16 // 插入到页面的哪个偏移量
RecordData []byte // 完整的记录数据 (KeyLen+KeyData+ValLen+ValData)
}
func (r *InsertLogRecord) Serialize() []byte {
var buf bytes.Buffer
// 写入 BaseLogRecord 字段
binary.Write(&buf, binary.LittleEndian, r.LSN)
binary.Write(&buf, binary.LittleEndian, r.PrevLSN)
binary.Write(&buf, binary.LittleEndian, r.TxID)
buf.WriteByte(byte(r.Type))
binary.Write(&buf, binary.LittleEndian, r.Size)
// 写入 InsertLogRecord 独有字段
binary.Write(&buf, binary.LittleEndian, r.PageID)
binary.Write(&buf, binary.LittleEndian, r.Offset)
buf.Write(r.RecordData)
return buf.Bytes()
}
func DeserializeInsertLogRecord(data []byte) (*InsertLogRecord, error) {
// 逆序列化逻辑,与Serialize对应
// ... 复杂,此处省略
return nil, nil
}
// 计算BaseLogRecord的大小
const BaseLogRecordSize = 8 + 8 + 8 + 1 + 4 // LSN + PrevLSN + TxID + Type + Size
3.2 隔离性 (Isolation) — 并发控制
隔离性确保并发执行的事务互不干扰,每个事务看起来都是独立执行的。这通常通过锁 (Locking) 或 多版本并发控制 (MVCC) 实现。这里我们聚焦于锁。
两阶段锁 (Two-Phase Locking, 2PL):
- 增长阶段 (Growing Phase): 事务可以获取锁,但不能释放锁。
- 收缩阶段 (Shrinking Phase): 事务可以释放锁,但不能获取锁。
确保所有事务在提交或中止前,都持有其所需的所有锁,从而避免脏读、不可重复读和幻读。
锁的粒度:
- 页面锁 (Page Lock): 锁定整个页面,简单但并发度低。
- 记录锁 (Record Lock): 锁定单个记录,并发度高但管理复杂。
我们将从页面锁开始,因为它们与缓冲区池和页面操作天然契合。
// lock_manager.go
package engine
import (
"fmt"
"sync"
)
// LockMode 锁模式
type LockMode int
const (
LockModeShared LockMode = iota // 共享锁 (读锁)
LockModeExclusive // 排他锁 (写锁)
)
// LockManager 管理页面锁
type LockManager struct {
pageLocks map[PageID]*sync.RWMutex // 存储每个页面的读写锁
mu sync.Mutex // 保护pageLocks map的互斥锁
}
// NewLockManager 创建一个新的锁管理器
func NewLockManager() *LockManager {
return &LockManager{
pageLocks: make(map[PageID]*sync.RWMutex),
}
}
// GetPageLock 获取指定页面的锁对象
func (lm *LockManager) getPageLock(pageID PageID) *sync.RWMutex {
lm.mu.Lock()
defer lm.mu.Unlock()
lock, ok := lm.pageLocks[pageID]
if !ok {
lock = &sync.RWMutex{}
lm.pageLocks[pageID] = lock
}
return lock
}
// AcquireLock 获取页面锁
func (lm *LockManager) AcquireLock(pageID PageID, mode LockMode) {
lock := lm.getPageLock(pageID)
if mode == LockModeShared {
lock.RLock()
} else {
lock.Lock()
}
}
// ReleaseLock 释放页面锁
func (lm *LockManager) ReleaseLock(pageID PageID, mode LockMode) {
lock := lm.getPageLock(pageID)
if mode == LockModeShared {
lock.RUnlock()
} else {
lock.Unlock()
}
}
3.3 事务管理器 (Transaction Manager)
事务管理器协调所有事务操作,包括启动、提交、回滚,并与日志管理器、锁管理器和缓冲区池交互。
// transaction.go
package engine
import (
"fmt"
"sync"
)
// TxID 事务ID
type TxID uint64
// Transaction 事务结构体
type Transaction struct {
ID TxID
State TxState
PrevLSN uint64 // 当前事务上一个操作的LSN
ModifiedPages map[PageID]*Page // 事务修改过的页面,用于回滚
HeldLocks map[PageID]LockMode // 事务持有的锁
mu sync.Mutex
}
// TxState 事务状态
type TxState int
const (
TxStateRunning TxState = iota
TxStateCommitting
TxStateAborting
TxStateCommitted
TxStateAborted
)
// TransactionManager 管理所有事务
type TransactionManager struct {
bp *BufferPool
lm *LogManager
lockMgr *LockManager
nextTxID TxID
activeTxs map[TxID]*Transaction // 活跃事务列表
mu sync.Mutex // 保护nextTxID和activeTxs
}
// NewTransactionManager 创建一个新的事务管理器
func NewTransactionManager(bp *BufferPool, lm *LogManager, lockMgr *LockManager) *TransactionManager {
return &TransactionManager{
bp: bp,
lm: lm,
lockMgr: lockMgr,
nextTxID: 1, // 事务ID从1开始
activeTxs: make(map[TxID]*Transaction),
}
}
// Begin 启动一个新事务
func (tm *TransactionManager) Begin() (*Transaction, error) {
tm.mu.Lock()
defer tm.mu.Unlock()
txID := tm.nextTxID
tm.nextTxID++
tx := &Transaction{
ID: txID,
State: TxStateRunning,
PrevLSN: 0, // 初始PrevLSN为0
ModifiedPages: make(map[PageID]*Page),
HeldLocks: make(map[PageID]LockMode),
}
tm.activeTxs[txID] = tx
// 写入Begin日志
beginRecord := &BaseLogRecord{
LSN: tm.lm.GetCurrentLSN(),
PrevLSN: 0,
TxID: uint64(txID),
Type: LogTypeBegin,
Size: BaseLogRecordSize,
}
lsn, err := tm.lm.AppendLogRecord(beginRecord.Serialize())
if err != nil {
return nil, fmt.Errorf("failed to write Begin log for Tx %d: %w", txID, err)
}
tx.PrevLSN = lsn
return tx, nil
}
// Commit 提交事务
func (tm *TransactionManager) Commit(tx *Transaction) error {
tx.mu.Lock()
if tx.State != TxStateRunning {
tx.mu.Unlock()
return fmt.Errorf("transaction %d is not in running state", tx.ID)
}
tx.State = TxStateCommitting
tx.mu.Unlock()
// 写入Commit日志
commitRecord := &BaseLogRecord{
LSN: tm.lm.GetCurrentLSN(),
PrevLSN: tx.PrevLSN,
TxID: uint64(tx.ID),
Type: LogTypeCommit,
Size: BaseLogRecordSize,
}
_, err := tm.lm.AppendLogRecord(commitRecord.Serialize())
if err != nil {
return fmt.Errorf("failed to write Commit log for Tx %d: %w", tx.ID, err)
}
// 刷盘日志,确保持久性
if err := tm.lm.Flush(); err != nil {
return fmt.Errorf("failed to flush logs for Tx %d commit: %w", tx.ID, err)
}
// 释放所有锁
for pageID, mode := range tx.HeldLocks {
tm.lockMgr.ReleaseLock(pageID, mode)
}
tx.mu.Lock()
tx.State = TxStateCommitted
tm.mu.Unlock() // 保护activeTxs
tm.mu.Lock()
delete(tm.activeTxs, tx.ID)
tm.mu.Unlock()
return nil
}
// Abort 中止事务
func (tm *TransactionManager) Abort(tx *Transaction) error {
tx.mu.Lock()
if tx.State != TxStateRunning {
tx.mu.Unlock()
return fmt.Errorf("transaction %d is not in running state", tx.ID)
}
tx.State = TxStateAborting
tx.mu.Unlock()
// 执行Undo操作 (根据日志记录回滚修改)
// 这是一个复杂的过程,需要从PrevLSN开始反向读取日志并应用Undo操作
// For simplicity, we skip the detailed Undo implementation here.
fmt.Printf("Tx %d aborting: Performing Undo operations...n", tx.ID)
// TODO: Implement actual undo logic by reading log records in reverse LSN order.
// 写入Abort日志
abortRecord := &BaseLogRecord{
LSN: tm.lm.GetCurrentLSN(),
PrevLSN: tx.PrevLSN,
TxID: uint64(tx.ID),
Type: LogTypeAbort,
Size: BaseLogRecordSize,
}
_, err := tm.lm.AppendLogRecord(abortRecord.Serialize())
if err != nil {
return fmt.Errorf("failed to write Abort log for Tx %d: %w", tx.ID, err)
}
// 刷盘日志
if err := tm.lm.Flush(); err != nil {
return fmt.Errorf("failed to flush logs for Tx %d abort: %w", tx.ID, err)
}
// 释放所有锁
for pageID, mode := range tx.HeldLocks {
tm.lockMgr.ReleaseLock(pageID, mode)
}
tx.mu.Lock()
tx.State = TxStateAborted
tx.mu.Unlock()
tm.mu.Lock()
delete(tm.activeTxs, tx.ID)
tm.mu.Unlock()
return nil
}
// GetPageWithLock 获取页面并根据操作类型加锁
func (tm *TransactionManager) GetPageWithLock(tx *Transaction, pageID PageID, mode LockMode) (*Page, error) {
// 获取锁
tm.lockMgr.AcquireLock(pageID, mode)
tx.mu.Lock()
tx.HeldLocks[pageID] = mode
tx.mu.Unlock()
// 从缓冲区池获取页面
page, err := tm.bp.FetchPage(pageID, true)
if err != nil {
// 获取页面失败,需要释放锁
tm.lockMgr.ReleaseLock(pageID, mode)
tx.mu.Lock()
delete(tx.HeldLocks, pageID)
tx.mu.Unlock()
return nil, err
}
return page, nil
}
// UnpinPageWithLog 更新页面并解除固定,记录日志
func (tm *TransactionManager) UnpinPageWithLog(tx *Transaction, page *Page, isDirty bool, logRecord []byte) error {
if isDirty {
// 记录日志,并更新页面的LSN
lsn, err := tm.lm.AppendLogRecord(logRecord)
if err != nil {
return fmt.Errorf("failed to append log for page %d in Tx %d: %w", page.ID, tx.ID, err)
}
page.SetLSN(lsn)
tx.PrevLSN = lsn // 更新事务的PrevLSN
// 事务修改过的页面需要标记
tx.mu.Lock()
tx.ModifiedPages[page.ID] = page
tx.mu.Unlock()
}
tm.bp.UnpinPage(page.ID, isDirty)
return nil
}
3.4 恢复管理器 (Recovery Manager)
当数据库崩溃后重启时,恢复管理器会使用 WAL 日志来确保所有已提交的事务都得以持久化 (Redo),所有未提交的事务都得以回滚 (Undo),从而恢复到一致的状态。
ARIES 恢复算法 (简要概述):
- 分析阶段 (Analysis Phase): 从最近的检查点开始扫描日志,识别活跃事务和脏页面。
- 重做阶段 (Redo Phase): 从日志的某个点开始,重新应用所有修改操作,使所有页面都达到日志记录的最新状态。
- 撤销阶段 (Undo Phase): 回滚所有在崩溃时仍活跃的事务的修改。
// recovery_manager.go
package engine
import (
"fmt"
"io"
"os"
)
// RecoveryManager 负责数据库的故障恢复
type RecoveryManager struct {
bp *BufferPool
lm *LogManager
// 其他组件,如事务管理器,用于协调恢复过程
}
// NewRecoveryManager 创建一个新的恢复管理器
func NewRecoveryManager(bp *BufferPool, lm *LogManager) *RecoveryManager {
return &RecoveryManager{
bp: bp,
lm: lm,
}
}
// Recover 启动恢复过程
func (rm *RecoveryManager) Recover() error {
fmt.Println("Starting database recovery...")
// 1. Analysis Phase: 识别活跃事务和脏页面
// 实际需要扫描日志文件,找到最后一个Checkpoint,并从Checkpoint开始分析
// 这里简化为从头开始扫描,并跳过细节
fmt.Println("Analysis Phase: (Simplified) Identify active transactions and dirty pages.")
// 2. Redo Phase: 重做所有操作
fmt.Println("Redo Phase: Reapplying all log records.")
if err := rm.redoAllLogs(); err != nil {
return fmt.Errorf("redo phase failed: %w", err)
}
// 3. Undo Phase: 回滚未提交事务
fmt.Println("Undo Phase: (Simplified) Rollback uncommitted transactions.")
// 实际需要根据Analysis Phase的结果,逆序扫描日志进行Undo
fmt.Println("Recovery complete.")
return nil
}
// redoAllLogs 示例性地重做所有日志记录
// 实际需要从上次Checkpoint开始,并根据页面的LSN判断是否需要重做
func (rm *RecoveryManager) redoAllLogs() error {
logFile, err := os.Open(rm.lm.logFile.Name())
if err != nil {
return fmt.Errorf("failed to open log file for redo: %w", err)
}
defer logFile.Close()
reader := NewLogReader(logFile) // 假设有一个LogReader来逐条读取日志
for {
recordBytes, lsn, err := reader.Next() // 读取下一条日志记录
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error reading log for redo: %w", err)
}
// 假设我们能反序列化所有日志记录
baseRecord, err := DeserializeBaseLogRecord(recordBytes) // 假设有这样的函数
if err != nil {
return fmt.Errorf("failed to deserialize base log record at LSN %d: %w", lsn, err)
}
// 对于数据修改日志,执行Redo操作
switch baseRecord.Type {
case LogTypeInsert:
insertRecord, err := DeserializeInsertLogRecord(recordBytes) // 假设有这样的函数
if err != nil { /* error handling */ }
page, err := rm.bp.FetchPage(insertRecord.PageID, true)
if err != nil { /* error handling */ }
// 只有当页面的LSN小于日志记录的LSN时才需要Redo
if page.GetLSN() < insertRecord.LSN {
// 实际的Redo操作:根据日志记录应用更改
// 例如:在页面的指定偏移量写入数据
// copy(page.Data[insertRecord.Offset:], insertRecord.RecordData)
page.SetLSN(insertRecord.LSN)
page.IsDirty = true // 标记为脏,稍后刷盘
}
rm.bp.UnpinPage(page.ID, true) // Redo操作后页面变为脏
// 其他LogType,如Update, Delete, PageSplit等,也需要相应的Redo逻辑
case LogTypeCommit:
// 标记事务已提交
case LogTypeAbort:
// 标记事务已中止
}
}
// 在Redo阶段结束后,需要将所有脏页面刷回磁盘
rm.bp.FlushAllPages()
return nil
}
// LogReader 辅助读取日志文件
type LogReader struct {
file *os.File
offset int64
}
func NewLogReader(file *os.File) *LogReader {
return &LogReader{file: file, offset: 0}
}
// Next 读取下一条日志记录
func (lr *LogReader) Next() ([]byte, uint64, error) {
currentLSN := uint64(lr.offset)
// 假设日志记录的格式是 BaseLogRecord + payload
// 我们需要先读取 BaseLogRecord 的 Size 字段
headerBuf := make([]byte, BaseLogRecordSize)
n, err := lr.file.ReadAt(headerBuf, lr.offset)
if err == io.EOF {
return nil, 0, io.EOF
}
if err != nil {
return nil, 0, fmt.Errorf("failed to read log record header: %w", err)
}
if n < BaseLogRecordSize {
return nil, 0, fmt.Errorf("partial log record header read: expected %d, got %d", BaseLogRecordSize, n)
}
recordSize := binary.LittleEndian.Uint32(headerBuf[BaseLogRecordSize-4 : BaseLogRecordSize]) // BaseLogRecord的Size字段
fullRecordBuf := make([]byte, recordSize)
n, err = lr.file.ReadAt(fullRecordBuf, lr.offset)
if err != nil {
return nil, 0, fmt.Errorf("failed to read full log record: %w", err)
}
if uint32(n) < recordSize {
return nil, 0, fmt.Errorf("partial log record read: expected %d, got %d", recordSize, n)
}
lr.offset += int64(recordSize)
return fullRecordBuf, currentLSN, nil
}
// 辅助函数:反序列化BaseLogRecord (简化)
func DeserializeBaseLogRecord(data []byte) (*BaseLogRecord, error) {
if len(data) < BaseLogRecordSize {
return nil, fmt.Errorf("data too short for BaseLogRecord")
}
return &BaseLogRecord{
LSN: binary.LittleEndian.Uint64(data[0:8]),
PrevLSN: binary.LittleEndian.Uint64(data[8:16]),
TxID: binary.LittleEndian.Uint64(data[16:24]),
Type: LogRecordType(data[24]),
Size: binary.LittleEndian.Uint32(data[25:29]), // 假设Type占1字节,Size在25-29
}, nil
}
第四章:整合与挑战
现在我们有了 B+ 树的基本结构、页面管理器、缓冲区池、日志管理器、锁管理器和事务管理器。如何将它们整合起来,并让 B+ 树的操作具备事务能力呢?
B+ 树的 Insert、Delete 操作不再是简单的页面修改,而是需要:
- 开始事务:
txMgr.Begin() - 获取页面(并加锁):
txMgr.GetPageWithLock(tx, pageID, LockModeExclusive) - 记录日志: 在修改页面之前,将旧值和新值(或操作详情)写入日志缓冲区。
- 修改页面: 在内存中的页面数据上执行实际的 B+ 树操作。
- 更新页面 LSN: 将页面头部 LSN 设置为最新日志记录的 LSN。
- 解除页面固定(并标记脏页):
txMgr.UnpinPageWithLog(tx, page, true, logRecordBytes) - 提交或中止事务:
txMgr.Commit(tx)或txMgr.Abort(tx)
这要求 B+ 树的每个操作方法都接受一个 *Transaction 参数。
挑战与进一步考虑:
- 死锁检测与处理: 2PL 可能导致死锁。需要实现死锁检测(例如,等待图)和死锁恢复(例如,选择一个事务回滚)。
- 并发性能优化:
- 锁粒度: 页面锁可能成为瓶颈,需要考虑更细粒度的记录锁。
- 锁升级/降级: 事务可能在运行时改变对资源的锁定需求。
- 闩 (Latch) 与锁 (Lock) 的区别: 闩是轻量级、短期的互斥量,用于保护内存中的数据结构(如 B+ 树节点内部结构、缓冲区池的哈希表),而锁是长期持有、保护逻辑数据的。
- 内存管理: 避免 Go 垃圾回收对性能的影响,可以考虑使用
sync.Pool预分配页面缓冲区。 - 磁盘空间管理: 如何高效地分配和回收页面 ID,避免文件碎片。
- 检查点 (Checkpointing): 定期执行检查点,减少恢复时间。
- 数据一致性检查: 定期检查 B+ 树的结构是否正确,数据是否一致。
- 分布式事务: 如果引擎需要扩展到多节点环境,还需要考虑 2PC (Two-Phase Commit) 等分布式事务协议。
- 错误处理: 健壮的错误处理机制对于数据库引擎至关重要。
结束语
从零开始构建一个具备事务能力的 B+ 树引擎是一项艰巨而充满挑战的任务,但它也带来了对数据库内部机制深刻的理解。我们探讨了物理布局、页面管理、B+ 树结构、预写日志、并发控制和恢复机制,这些都是现代数据库系统不可或缺的组件。这趟旅程远未结束,它仅仅是通往构建高性能、高可靠性数据存储系统道路上的第一步。