如何从零编写一个具备事务能力的 B+ 树引擎?基于 Go 的物理布局设计

各位开发者、技术爱好者们,大家好!

今天,我们将共同踏上一段激动人心的旅程:从零开始,设计并实现一个具备事务能力的 B+ 树引擎。我们将以 Go 语言作为我们的工具,深入探讨其物理布局设计,并逐步构建起一个能够处理并发、保证数据一致性和持久性的存储核心。

为什么选择 B+ 树?因为它是现代数据库系统中索引和数据存储的基石。它的扇出特性使其非常适合磁盘I/O操作,而事务能力则是任何可靠数据库引擎不可或缺的灵魂。

我们将以讲座的形式,一步步揭示这个复杂系统的奥秘。


第一章:B+ 树引擎的基石 — 物理布局与页面管理

任何数据库引擎的起点都是对物理存储的抽象和管理。我们的 B+ 树引擎将数据存储在磁盘文件中,并将其划分为固定大小的“页面”(Page)。页面是磁盘 I/O 的最小单位。

1.1 文件系统交互与页面抽象

我们的引擎将与操作系统文件系统交互,通过 os.File 读取和写入数据。为了简化,我们暂时不深入探讨 mmap 等高级内存映射技术,而是直接使用 ReadAtWriteAt 进行随机访问。

页面(Page) 是我们存储管理的核心单元。每个页面都有一个唯一的 PageID,并且拥有固定的大小,例如 4KB 或 8KB。固定大小的页面简化了磁盘空间的管理,也符合操作系统和硬件对 I/O 的优化。

// page.go
package engine

import (
    "encoding/binary"
    "fmt"
    "os"
    "sync"
)

// PageID 是页面的唯一标识符
type PageID uint64

// PageSize 定义了每个页面的大小(例如 4KB)
const PageSize = 4096

// PageType 定义了页面的类型
type PageType byte

const (
    PageTypeEmpty    PageType = 0x00 // 空页面,可用于分配
    PageTypeBPlusInternal PageType = 0x01 // B+树内部节点
    PageTypeBPlusLeaf PageType = 0x02    // B+树叶子节点
    PageTypeMeta      PageType = 0x03    // 元数据页面,例如存储B+树的根页ID
)

// Page 是内存中的页面结构体
type Page struct {
    ID       PageID    // 页面ID
    Data     []byte    // 实际的页面数据,大小为 PageSize
    IsDirty  bool      // 页面是否被修改过
    PinCount int32     // 页面被引用的次数
    mu       sync.Mutex // 保护页面状态的互斥锁
    LSN      uint64    // Log Sequence Number,用于恢复
}

// NewPage 创建一个新的页面实例
func NewPage(id PageID) *Page {
    return &Page{
        ID:       id,
        Data:     make([]byte, PageSize),
        IsDirty:  false,
        PinCount: 0,
        LSN:      0, // 新页面初始LSN为0
    }
}

// GetPageType 从页面数据中读取页面类型
func (p *Page) GetPageType() PageType {
    return PageType(p.Data[0])
}

// SetPageType 设置页面类型
func (p *Page) SetPageType(t PageType) {
    p.Data[0] = byte(t)
    p.IsDirty = true
}

// GetFreeSpaceOffset 获取页面内自由空间起始偏移量
func (p *Page) GetFreeSpaceOffset() uint16 {
    return binary.LittleEndian.Uint16(p.Data[1:3])
}

// SetFreeSpaceOffset 设置页面内自由空间起始偏移量
func (p *Page) SetFreeSpaceOffset(offset uint16) {
    binary.LittleEndian.PutUint16(p.Data[1:3], offset)
    p.IsDirty = true
}

// GetNumEntries 获取页面内条目数量
func (p *Page) GetNumEntries() uint16 {
    return binary.LittleEndian.Uint16(p.Data[3:5])
}

// SetNumEntries 设置页面内条目数量
func (p *Page) SetNumEntries(num uint16) {
    binary.LittleEndian.PutUint16(p.Data[3:5], num)
    p.IsDirty = true
}

// GetLSN 获取页面的LSN
func (p *Page) GetLSN() uint64 {
    return binary.LittleEndian.Uint64(p.Data[5:13])
}

// SetLSN 设置页面的LSN
func (p *Page) SetLSN(lsn uint64) {
    binary.LittleEndian.PutUint64(p.Data[5:13], lsn)
    p.IsDirty = true
}

// GetParentPageID 获取父页面ID
func (p *Page) GetParentPageID() PageID {
    return PageID(binary.LittleEndian.Uint64(p.Data[13:21]))
}

// SetParentPageID 设置父页面ID
func (p *Page) SetParentPageID(parentID PageID) {
    binary.LittleEndian.PutUint64(p.Data[13:21], uint64(parentID))
    p.IsDirty = true
}

1.2 页面头部结构

每个页面都会有一个固定的头部,用于存储页面的元数据。这对于快速识别页面类型、管理页面内部空间至关重要。

偏移量 大小 (字节) 描述
0 1 页面类型 (PageType)
1 2 自由空间起始偏移量
3 2 页面内条目数量
5 8 LSN (Log Sequence Number)
13 8 父页面 ID (ParentPageID)
21 其他元数据 (例如 B+ 树特定字段)

1.3 缓冲区池 (Buffer Pool)

直接从磁盘读取或写入每个页面效率低下。缓冲区池是内存中的一个缓存区域,用于存储经常访问的页面。它通过减少磁盘 I/O 来显著提升性能。

缓冲区池的核心功能包括:

  • 获取页面 (FetchPage): 从缓冲区池中查找页面;如果不在,则从磁盘加载。
  • 创建新页面 (NewPage): 分配一个新的页面 ID,并将其添加到缓冲区池。
  • 固定/解除固定 (Pin/Unpin): 当一个页面正在被使用时,我们“固定”它,防止它被驱逐。使用完毕后,“解除固定”。
  • 刷回磁盘 (FlushPage): 将脏页面(被修改过的页面)写回磁盘。
  • 驱逐策略 (Eviction Policy): 当缓冲区池满时,需要选择一个页面驱逐,通常使用 LRU (Least Recently Used) 或 Clock 算法。为简化,我们先关注 PinCount。
// buffer_pool.go
package engine

import (
    "fmt"
    "os"
    "sync"
)

// BufferPool 管理内存中的页面缓存
type BufferPool struct {
    file       *os.File              // 数据库文件句柄
    pages      map[PageID]*Page      // 缓存的页面
    replacer   *LRUReplacer          // 页面替换器 (LRU)
    nextPageID PageID                // 下一个可用的页面ID
    poolSize   int                   // 缓冲区池大小
    mu         sync.Mutex            // 保护缓冲区池的互斥锁
}

// NewBufferPool 创建一个新的缓冲区池
func NewBufferPool(filePath string, poolSize int) (*BufferPool, error) {
    file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0666)
    if err != nil {
        return nil, fmt.Errorf("failed to open database file: %w", err)
    }

    // 初始化nextPageID,如果文件已存在,需要读取元数据页来获取
    // 暂时简化处理,假设为新文件,或通过某种方式获取最大PageID
    fi, err := file.Stat()
    if err != nil {
        return nil, fmt.Errorf("failed to stat file: %w", err)
    }
    nextPageID := PageID(fi.Size() / PageSize)
    if nextPageID == 0 { // 文件为空,分配第一个页面作为元数据页
        nextPageID = 1 // 从1开始,0可能保留
    }

    bp := &BufferPool{
        file:       file,
        pages:      make(map[PageID]*Page, poolSize),
        replacer:   NewLRUReplacer(poolSize), // 实际LRU实现会更复杂
        nextPageID: nextPageID,
        poolSize:   poolSize,
    }

    // 尝试加载元数据页
    metaPageID := PageID(0) // 约定0号页面为元数据页面
    if nextPageID > 0 { // 文件非空,尝试读取元数据
        metaPage, err := bp.FetchPage(metaPageID, false) // 首次加载不写入LSN
        if err == nil && metaPage.GetPageType() == PageTypeMeta {
            // 从元数据页读取根页ID等信息
            // 假设元数据页存储了下一个可用的PageID
            bp.nextPageID = PageID(binary.LittleEndian.Uint64(metaPage.Data[21:29])) // 示例偏移量
        }
        bp.UnpinPage(metaPageID, false) // 解除固定
    } else {
        // 文件为空,创建元数据页
        metaPage, err := bp.NewPage()
        if err != nil {
            return nil, fmt.Errorf("failed to create meta page: %w", err)
        }
        metaPage.SetPageType(PageTypeMeta)
        // 写入初始的nextPageID到元数据页
        binary.LittleEndian.PutUint64(metaPage.Data[21:29], uint64(bp.nextPageID))
        bp.UnpinPage(metaPage.ID, true) // 标记为脏
    }

    return bp, nil
}

// FetchPage 从缓冲区池中获取一个页面。如果页面不在池中,则从磁盘读取。
// pin 参数表示是否立即固定页面。
func (bp *BufferPool) FetchPage(id PageID, pin bool) (*Page, error) {
    bp.mu.Lock()
    defer bp.mu.Unlock()

    if page, ok := bp.pages[id]; ok {
        // 页面已在池中
        if pin {
            page.mu.Lock()
            page.PinCount++
            page.mu.Unlock()
            bp.replacer.Pin(id) // 通知替换器页面被固定
        }
        return page, nil
    }

    // 页面不在池中,需要从磁盘读取
    if len(bp.pages) >= bp.poolSize {
        // 缓冲区池已满,需要驱逐一个页面
        evictID, ok := bp.replacer.Victim()
        if !ok {
            return nil, fmt.Errorf("buffer pool is full and no page can be evicted")
        }
        evictPage := bp.pages[evictID]
        if evictPage.IsDirty {
            if err := bp.flushPageInternal(evictPage); err != nil {
                return nil, fmt.Errorf("failed to flush evicted page %d: %w", evictID, err)
            }
        }
        delete(bp.pages, evictID)
    }

    page := NewPage(id)
    offset := int64(id) * PageSize
    n, err := bp.file.ReadAt(page.Data, offset)
    if err != nil && err != os.EOF { // 允许EOF,表示页面可能还未被写入
        return nil, fmt.Errorf("failed to read page %d from disk: %w", id, err)
    }
    if n > 0 && n < PageSize { // 读取到的数据不足一个页面大小
        return nil, fmt.Errorf("partial read for page %d: expected %d bytes, got %d", id, PageSize, n)
    }

    if pin {
        page.PinCount = 1
        bp.replacer.Pin(id)
    } else {
        page.PinCount = 0 // 未固定,可以立即被驱逐
    }
    bp.pages[id] = page
    return page, nil
}

// NewPage 创建一个新的页面并分配一个ID,将其添加到缓冲区池。
func (bp *BufferPool) NewPage() (*Page, error) {
    bp.mu.Lock()
    defer bp.mu.Unlock()

    if len(bp.pages) >= bp.poolSize {
        evictID, ok := bp.replacer.Victim()
        if !ok {
            return nil, fmt.Errorf("buffer pool is full and no page can be evicted")
        }
        evictPage := bp.pages[evictID]
        if evictPage.IsDirty {
            if err := bp.flushPageInternal(evictPage); err != nil {
                return nil, fmt.Errorf("failed to flush evicted page %d: %w", evictID, err)
            }
        }
        delete(bp.pages, evictID)
    }

    id := bp.nextPageID
    bp.nextPageID++

    page := NewPage(id)
    page.PinCount = 1 // 新创建的页面默认被固定
    bp.pages[id] = page
    bp.replacer.Pin(id) // 通知替换器页面被固定

    // 更新元数据页中的 nextPageID
    metaPage, err := bp.FetchPage(0, true) // 获取元数据页并固定
    if err != nil {
        return nil, fmt.Errorf("failed to fetch meta page to update nextPageID: %w", err)
    }
    binary.LittleEndian.PutUint64(metaPage.Data[21:29], uint64(bp.nextPageID))
    bp.UnpinPage(metaPage.ID, true) // 解除固定并标记为脏

    return page, nil
}

// UnpinPage 解除页面的固定。如果isDirty为true,则标记页面为脏。
func (bp *BufferPool) UnpinPage(id PageID, isDirty bool) {
    bp.mu.Lock()
    defer bp.mu.Unlock()

    if page, ok := bp.pages[id]; ok {
        page.mu.Lock()
        defer page.mu.Unlock()

        if page.PinCount > 0 {
            page.PinCount--
            if page.PinCount == 0 {
                bp.replacer.Unpin(id) // 通知替换器页面可以被驱逐了
            }
        }
        if isDirty {
            page.IsDirty = true
        }
    }
}

// FlushPage 将指定页面刷回磁盘
func (bp *BufferPool) FlushPage(id PageID) error {
    bp.mu.Lock()
    defer bp.mu.Unlock()
    if page, ok := bp.pages[id]; ok {
        return bp.flushPageInternal(page)
    }
    return nil // 页面不在池中,无需刷新
}

// flushPageInternal 是内部的刷盘函数,不加锁
func (bp *BufferPool) flushPageInternal(page *Page) error {
    page.mu.Lock()
    defer page.mu.Unlock()

    if !page.IsDirty {
        return nil
    }

    offset := int64(page.ID) * PageSize
    _, err := bp.file.WriteAt(page.Data, offset)
    if err != nil {
        return fmt.Errorf("failed to write page %d to disk: %w", page.ID, err)
    }
    page.IsDirty = false
    return nil
}

// FlushAllPages 将所有脏页面刷回磁盘
func (bp *BufferPool) FlushAllPages() {
    bp.mu.Lock()
    defer bp.mu.Unlock()
    for _, page := range bp.pages {
        bp.flushPageInternal(page)
    }
}

// Close 关闭缓冲区池,刷回所有脏页面并关闭文件
func (bp *BufferPool) Close() error {
    bp.FlushAllPages()
    return bp.file.Close()
}

// LRUReplacer 简单的LRU替换器接口和实现
// 实际生产环境会更复杂,这里仅为示例
type LRUReplacer struct {
    poolSize int
    list     *list.List // 双向链表,最近使用的在前端
    elements map[PageID]*list.Element
    mu       sync.Mutex
}

type lruEntry struct {
    pageID PageID
}

func NewLRUReplacer(poolSize int) *LRUReplacer {
    return &LRUReplacer{
        poolSize: poolSize,
        list:     list.New(),
        elements: make(map[PageID]*list.Element),
    }
}

func (r *LRUReplacer) Pin(pageID PageID) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if elem, ok := r.elements[pageID]; ok {
        r.list.Remove(elem)
        delete(r.elements, pageID)
    }
}

func (r *LRUReplacer) Unpin(pageID PageID) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if _, ok := r.elements[pageID]; !ok {
        if r.list.Len() >= r.poolSize {
            // LRU满了,需要驱逐一个,这里简化处理,实际要找可驱逐的
            // 在此实现中,Unpin只是加入LRU,驱逐逻辑在BufferPool中
        }
        elem := r.list.PushFront(&lruEntry{pageID: pageID})
        r.elements[pageID] = elem
    }
}

func (r *LRUReplacer) Victim() (PageID, bool) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.list.Len() == 0 {
        return 0, false
    }
    elem := r.list.Back()
    pageID := elem.Value.(*lruEntry).pageID
    r.list.Remove(elem)
    delete(r.elements, pageID)
    return pageID, true
}

// 确保引入 "container/list"
// import "container/list"

注意:LRUReplacer 的实现简化了驱逐逻辑,实际的 LRU 替换器需要更精细地管理固定页面和可驱逐页面。为了保持讲座核心,我们暂时不深入细节。

第二章:B+ 树结构与操作

B+ 树是一种多路平衡查找树,它的所有值都存储在叶子节点上,并且叶子节点通过链表连接,便于范围查询。内部节点只存储键和指向子节点的指针。

2.1 节点结构

B+ 树有两种主要类型的节点:内部节点(Internal Node)和叶子节点(Leaf Node)。它们都共享页面的基本结构,但有各自特定的元数据和数据布局。

内部节点 (Internal Node) 页面布局:
内部节点存储 (key, child_page_id) 对。

  • 页面头部
  • 键值对区域:[key_length][key_data][child_page_id]
  • 最右侧子节点指针 (通常在头部或固定位置)

叶子节点 (Leaf Node) 页面布局:
叶子节点存储 (key, value) 对,并且它们之间通过 next_leaf_page_idprev_leaf_page_id 形成双向链表。

  • 页面头部
    • NextLeafPageID (8 bytes)
    • PrevLeafPageID (8 bytes)
  • 键值对区域:[key_length][key_data][value_length][value_data]
  • 槽数组 (Slot Array):在页面尾部,指向实际键值对的偏移量和长度,便于删除和碎片整理。
// bptree.go
package engine

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "sort"
    "sync"
)

// KeyType 定义键的类型,例如 []byte
type KeyType []byte

// ValueType 定义值的类型,例如 []byte
type ValueType []byte

// BPlusTree 是B+树引擎的核心结构
type BPlusTree struct {
    bp        *BufferPool
    rootPageID PageID
    metaPageID PageID // 存储B+树元数据(例如根页ID)的页面
    mu        sync.RWMutex // 保护树结构的读写锁
}

// NewBPlusTree 创建或加载一个B+树
func NewBPlusTree(bp *BufferPool) (*BPlusTree, error) {
    tree := &BPlusTree{
        bp:        bp,
        metaPageID: 0, // 约定0号页面为元数据页面
    }

    // 尝试加载元数据页
    metaPage, err := bp.FetchPage(tree.metaPageID, true)
    if err != nil {
        return nil, fmt.Errorf("failed to fetch meta page: %w", err)
    }
    defer bp.UnpinPage(tree.metaPageID, false) // 立即解除固定

    if metaPage.GetPageType() == PageTypeMeta {
        // 从元数据页读取根页ID
        tree.rootPageID = PageID(binary.LittleEndian.Uint64(metaPage.Data[29:37])) // 示例偏移量
        if tree.rootPageID == 0 { // 根页ID为0表示树为空,需要初始化
            return tree, tree.initTree()
        }
    } else {
        // 0号页面不是元数据页,可能是新文件,需要初始化
        metaPage.SetPageType(PageTypeMeta)
        return tree, tree.initTree()
    }

    return tree, nil
}

// initTree 初始化B+树,创建第一个叶子节点作为根节点
func (t *BPlusTree) initTree() error {
    leafPage, err := t.bp.NewPage()
    if err != nil {
        return fmt.Errorf("failed to create initial leaf page: %w", err)
    }
    defer t.bp.UnpinPage(leafPage.ID, true) // 标记为脏

    leafPage.SetPageType(PageTypeBPlusLeaf)
    leafPage.SetFreeSpaceOffset(uint16(PageSize - 20)) // 初始自由空间在页面尾部,预留槽数组空间
    leafPage.SetNumEntries(0)
    // 设置next/prev leaf page ID为0 (空)
    binary.LittleEndian.PutUint64(leafPage.Data[PageHeaderSize:PageHeaderSize+8], 0) // NextLeafPageID
    binary.LittleEndian.PutUint64(leafPage.Data[PageHeaderSize+8:PageHeaderSize+16], 0) // PrevLeafPageID

    t.rootPageID = leafPage.ID
    return t.updateMetaPage()
}

// updateMetaPage 更新元数据页中的B+树根页ID
func (t *BPlusTree) updateMetaPage() error {
    metaPage, err := t.bp.FetchPage(t.metaPageID, true)
    if err != nil {
        return fmt.Errorf("failed to fetch meta page for update: %w", err)
    }
    defer t.bp.UnpinPage(t.metaPageID, true) // 标记为脏

    binary.LittleEndian.PutUint64(metaPage.Data[29:37], uint64(t.rootPageID)) // 示例偏移量
    return nil
}

// PageHeaderSize 页面头部大小
const PageHeaderSize = 21 // PageType + FreeSpaceOffset + NumEntries + LSN + ParentPageID

// LeafNodeHeaderSize 叶子节点特有头部大小
const LeafNodeHeaderSize = PageHeaderSize + 8 + 8 // Base Header + NextLeafPageID + PrevLeafPageID

// InternalNodeHeaderSize 内部节点特有头部大小
const InternalNodeHeaderSize = PageHeaderSize

// Helper functions for reading/writing leaf node specific headers
func getNextLeafPageID(page *Page) PageID {
    return PageID(binary.LittleEndian.Uint64(page.Data[PageHeaderSize : PageHeaderSize+8]))
}
func setNextLeafPageID(page *Page, id PageID) {
    binary.LittleEndian.PutUint64(page.Data[PageHeaderSize:PageHeaderSize+8], uint64(id))
    page.IsDirty = true
}
func getPrevLeafPageID(page *Page) PageID {
    return PageID(binary.LittleEndian.Uint64(page.Data[PageHeaderSize+8 : PageHeaderSize+16]))
}
func setPrevLeafPageID(page *Page, id PageID) {
    binary.LittleEndian.PutUint64(page.Data[PageHeaderSize+8:PageHeaderSize+16], uint64(id))
    page.IsDirty = true
}

// Record format in Leaf Page:
// [KeyLen (2 bytes)][KeyData][ValueLen (2 bytes)][ValueData]
// Slot Array: [RecordOffset (2 bytes)][RecordLength (2 bytes)]... (from end of page)

// GetRecordFromLeafPage 从叶子页面获取指定索引的记录
func GetRecordFromLeafPage(page *Page, idx uint16) (KeyType, ValueType, error) {
    numEntries := page.GetNumEntries()
    if idx >= numEntries {
        return nil, nil, fmt.Errorf("record index out of bounds: %d >= %d", idx, numEntries)
    }

    slotOffset := PageSize - (idx+1)*4 // 槽数组从页面尾部开始
    recordOffset := binary.LittleEndian.Uint16(page.Data[slotOffset : slotOffset+2])
    recordLength := binary.LittleEndian.Uint16(page.Data[slotOffset+2 : slotOffset+4])

    keyLen := binary.LittleEndian.Uint16(page.Data[recordOffset : recordOffset+2])
    keyData := page.Data[recordOffset+2 : recordOffset+2+keyLen]
    valueLen := binary.LittleEndian.Uint16(page.Data[recordOffset+2+keyLen : recordOffset+2+keyLen+2])
    valueData := page.Data[recordOffset+2+keyLen+2 : recordOffset+2+keyLen+2+valueLen]

    return keyData, valueData, nil
}

// FindSlotInLeafPage 在叶子页面中查找键的插入位置或存在位置
func FindSlotInLeafPage(page *Page, key KeyType) (int, bool) {
    numEntries := page.GetNumEntries()
    idx := sort.Search(int(numEntries), func(i int) bool {
        k, _, _ := GetRecordFromLeafPage(page, uint16(i))
        return bytes.Compare(k, key) >= 0
    })
    if idx < int(numEntries) {
        k, _, _ := GetRecordFromLeafPage(page, uint16(idx))
        if bytes.Equal(k, key) {
            return idx, true // 键已存在
        }
    }
    return idx, false // 键不存在,idx是插入位置
}

// GetFreeSpaceInLeafPage 计算叶子页面剩余可用空间
func GetFreeSpaceInLeafPage(page *Page) uint16 {
    headerEnd := LeafNodeHeaderSize
    freeSpaceOffset := page.GetFreeSpaceOffset()
    numEntries := page.GetNumEntries()
    return freeSpaceOffset - headerEnd - (numEntries * 4) // 数据区 + 槽数组
}

// InsertRecordIntoLeafPage 将记录插入叶子页面
func InsertRecordIntoLeafPage(page *Page, key KeyType, value ValueType, idx int) error {
    recordLen := uint16(len(key)) + uint16(len(value)) + 4 // 2 for keyLen, 2 for valueLen
    requiredSpace := recordLen + 4 // record data + slot entry

    if GetFreeSpaceInLeafPage(page) < requiredSpace {
        return fmt.Errorf("not enough space in leaf page %d to insert record", page.ID)
    }

    numEntries := page.GetNumEntries()
    freeSpaceOffset := page.GetFreeSpaceOffset()

    // 移动槽数组,为新槽位腾出空间
    for i := numEntries; i > uint16(idx); i-- {
        srcSlotOffset := PageSize - i*4
        dstSlotOffset := PageSize - (i+1)*4
        copy(page.Data[dstSlotOffset:dstSlotOffset+4], page.Data[srcSlotOffset:srcSlotOffset+4])
    }

    // 写入记录数据
    newRecordOffset := freeSpaceOffset - recordLen
    binary.LittleEndian.PutUint16(page.Data[newRecordOffset:newRecordOffset+2], uint16(len(key)))
    copy(page.Data[newRecordOffset+2:newRecordOffset+2+uint16(len(key))], key)
    binary.LittleEndian.PutUint16(page.Data[newRecordOffset+2+uint16(len(key)):newRecordOffset+2+uint16(len(key))+2], uint16(len(value)))
    copy(page.Data[newRecordOffset+2+uint19(len(key))+2:newRecordOffset+2+uint16(len(key))+2+uint16(len(value))], value)

    // 写入槽位
    slotOffset := PageSize - (uint16(idx)+1)*4
    binary.LittleEndian.PutUint16(page.Data[slotOffset:slotOffset+2], newRecordOffset)
    binary.LittleEndian.PutUint16(page.Data[slotOffset+2:slotOffset+4], recordLen)

    page.SetFreeSpaceOffset(newRecordOffset)
    page.SetNumEntries(numEntries + 1)
    page.IsDirty = true
    return nil
}

// DeleteRecordFromLeafPage 从叶子页面删除指定索引的记录
func DeleteRecordFromLeafPage(page *Page, idx int) error {
    numEntries := page.GetNumEntries()
    if idx >= int(numEntries) {
        return fmt.Errorf("record index out of bounds: %d >= %d", idx, numEntries)
    }

    // 读取要删除记录的槽位信息
    slotOffset := PageSize - (uint16(idx)+1)*4
    recordOffset := binary.LittleEndian.Uint16(page.Data[slotOffset : slotOffset+2])
    recordLength := binary.LittleEndian.Uint16(page.Data[slotOffset+2 : slotOffset+4])

    // 移动数据区 (此处不实际移动数据,而是通过调整freeSpaceOffset和重新整理槽数组来实现碎片整理)
    // 最简单的实现是只移动槽数组,等到页面分裂/合并时再进行数据整理

    // 移动槽数组,覆盖被删除的槽位
    for i := uint16(idx); i < numEntries-1; i++ {
        srcSlotOffset := PageSize - (i+2)*4 // 下一个槽位
        dstSlotOffset := PageSize - (i+1)*4 // 当前槽位
        copy(page.Data[dstSlotOffset:dstSlotOffset+4], page.Data[srcSlotOffset:srcSlotOffset+4])
    }
    // 清空最后一个槽位 (可选,但推荐)
    binary.LittleEndian.PutUint32(page.Data[PageSize-numEntries*4:PageSize-(numEntries-1)*4], 0)

    page.SetNumEntries(numEntries - 1)
    page.IsDirty = true
    // 简单的碎片整理:如果删除的记录导致了大量碎片,可能需要进行页面重组
    // 这里我们暂时不实现复杂的碎片整理,只调整freeSpaceOffset
    // 复杂的碎片整理可能需要将所有记录重新排列,并更新槽数组
    // 这里只是将freeSpaceOffset设置为已使用的最高偏移量,而不是真正将数据移动
    // 实际的数据库引擎会有一个后台任务或在分裂/合并时进行整理
    if recordOffset == page.GetFreeSpaceOffset() {
        // 如果删除的是最后一个记录,需要重新计算freeSpaceOffset
        // 这是一个复杂的问题,简单的做法是:只有当页面重组时才真正调整
        // 暂时保持freeSpaceOffset不变,只减少numEntries
    }
    return nil
}

// 示例:查找函数(简化版,不涉及事务)
func (t *BPlusTree) Search(key KeyType) (ValueType, error) {
    t.mu.RLock()
    defer t.mu.RUnlock()

    currentPageID := t.rootPageID
    for {
        page, err := t.bp.FetchPage(currentPageID, true)
        if err != nil {
            return nil, err
        }

        pageType := page.GetPageType()
        if pageType == PageTypeBPlusLeaf {
            // 在叶子节点中查找
            idx, found := FindSlotInLeafPage(page, key)
            t.bp.UnpinPage(page.ID, false)
            if found {
                _, value, _ := GetRecordFromLeafPage(page, uint16(idx))
                return value, nil
            }
            return nil, fmt.Errorf("key not found")
        } else if pageType == PageTypeBPlusInternal {
            // 在内部节点中查找下一个子节点
            // 内部节点布局: [PageHeader][key_len][key_data][child_page_id]... [rightmost_child_page_id]
            // 查找逻辑:找到第一个大于等于key的键,其左侧指针是我们要找的
            // 如果所有键都小于key,则去最右侧子节点

            // 简化:假设内部节点存储 (key, pageID) 对
            // binary.LittleEndian.Uint64(page.Data[InternalNodeHeaderSize + i*(KeyMaxLen + 8) + KeyMaxLen : ...])
            // 实际需要更灵活的存储方式

            // 内部节点查找下一页面的逻辑,此处只是一个占位符
            // 实际需要遍历内部节点中的键和指针
            nextPageID := PageID(0) // 占位符
            numEntries := page.GetNumEntries()
            found := false
            for i := uint16(0); i < numEntries; i++ {
                // 假设内部节点格式为 (key, childID)
                // 从页面数据中读取 key 和 childID
                // keyLen := binary.LittleEndian.Uint16(page.Data[InternalNodeHeaderSize + ...])
                // childID := PageID(binary.LittleEndian.Uint64(page.Data[InternalNodeHeaderSize + ...]))
                // if bytes.Compare(key, readKey) < 0 {
                //    nextPageID = prevChildID
                //    found = true
                //    break
                // }
                // prevChildID = childID
                // 复杂性在于内部节点也需要灵活的键值对存储和查找
            }
            if !found {
                // nextPageID = rightmost_child_page_id
            }

            t.bp.UnpinPage(page.ID, false)
            currentPageID = nextPageID
        } else {
            t.bp.UnpinPage(page.ID, false)
            return nil, fmt.Errorf("invalid page type %d for page %d", pageType, page.ID)
        }
    }
}

注意:B+树的 SearchInsertDelete 操作的实现非常复杂,尤其是内部节点的键值对存储和查找。为了聚焦事务能力,我们在这里提供了叶子节点操作的详细实现,而内部节点的处理则做了简化,只给出了逻辑上的指导。一个完整的 B+ 树需要一个 Node 接口和 InternalNodeLeafNode 具体实现来封装这些逻辑。

2.2 B+ 树操作的核心逻辑

  • 查找 (Search): 从根节点开始,根据键值遍历内部节点,直到到达叶子节点,然后在叶子节点中查找目标键。
  • 插入 (Insert): 找到合适的叶子节点,插入键值对。如果叶子节点空间不足,则需要进行“分裂”(Split)操作,将一半数据移到新页面,并将新页面的最小键和页面 ID 提升到父节点。如果父节点也满,则继续向上分裂,直到根节点。如果根节点分裂,则树的高度增加。
  • 删除 (Delete): 找到合适的叶子节点,删除键值对。如果叶子节点数据量过少(低于某个阈值),则可能需要与兄弟节点进行“合并”(Merge)或“再分配”(Redistribute)操作,以保持树的平衡和空间利用率。这些操作也可能向上级联。

第三章:事务管理 — ACID 的基石

数据库事务是并发控制和故障恢复的基础。它确保数据库操作的原子性、一致性、隔离性和持久性 (ACID)。

3.1 原子性 (Atomicity) 与持久性 (Durability) — WAL

原子性意味着一个事务要么全部成功,要么全部失败。持久性意味着一旦事务提交,其更改就永久保存。这两者通常通过预写日志 (Write-Ahead Logging, WAL) 来实现。

WAL 原理:

  1. 所有数据修改首先写入日志文件,而不是直接写入数据文件。
  2. 日志记录包含足够的信息来重做(Redo)或撤销(Undo)一个操作。
  3. 只有当事务的所有日志记录都写入并刷盘后,事务才能被标记为提交。
  4. 数据页面可以稍后异步地刷回磁盘。

日志记录类型:

  • Update: 记录页面 ID、偏移量、旧值、新值。
  • Insert: 记录页面 ID、插入的记录数据。
  • Delete: 记录页面 ID、删除的记录数据。
  • Page_Split/Page_Merge: 记录涉及的页面 ID 和操作详情。
  • Begin/Commit/Abort: 事务生命周期事件。
  • Checkpoint: 记录当前脏页面和活跃事务的状态,用于加速恢复。

每个日志记录都有一个唯一的 LSN (Log Sequence Number)。页面的头部也存储一个 LSN,表示该页面最后一次被修改时的日志 LSN。

// log.go
package engine

import (
    "encoding/binary"
    "fmt"
    "os"
    "sync"
)

// LogRecordType 日志记录类型
type LogRecordType byte

const (
    LogTypeInvalid LogRecordType = 0x00
    LogTypeBegin   LogRecordType = 0x01
    LogTypeCommit  LogRecordType = 0x02
    LogTypeAbort   LogRecordType = 0x03
    LogTypeInsert  LogRecordType = 0x04
    LogTypeDelete  LogRecordType = 0x05
    LogTypeUpdate  LogRecordType = 0x06
    LogTypePageSplit LogRecordType = 0x07
    LogTypePageMerge LogRecordType = 0x08
    LogTypeCheckpoint LogRecordType = 0x09
)

// BaseLogRecord 基础日志记录结构,所有日志记录都包含这些字段
type BaseLogRecord struct {
    LSN    uint64        // Log Sequence Number
    PrevLSN uint64        // 前一个日志记录的LSN (同一个事务)
    TxID   uint64        // 事务ID
    Type   LogRecordType // 日志记录类型
    Size   uint32        // 整个日志记录的字节大小
}

// LogManager 负责日志的写入和管理
type LogManager struct {
    logFile *os.File
    nextLSN uint64      // 下一个可用的LSN
    offset  int64       // 当前写入文件的偏移量
    mu      sync.Mutex  // 保护日志管理器状态的互斥锁
    buffer  bytes.Buffer // 日志缓冲区
}

// NewLogManager 创建一个新的日志管理器
func NewLogManager(logFilePath string) (*LogManager, error) {
    file, err := os.OpenFile(logFilePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
    if err != nil {
        return nil, fmt.Errorf("failed to open log file: %w", err)
    }

    fi, err := file.Stat()
    if err != nil {
        return nil, fmt.Errorf("failed to stat log file: %w", err)
    }

    lm := &LogManager{
        logFile: file,
        nextLSN: uint64(fi.Size()), // 初始LSN为文件大小
        offset:  fi.Size(),
        buffer:  bytes.Buffer{},
    }
    // 在启动时需要进行恢复,确定准确的nextLSN,这里简化为文件大小
    // 实际需要Scan整个日志文件来确定最后一个有效的LSN

    return lm, nil
}

// AppendLogRecord 将日志记录写入缓冲区
func (lm *LogManager) AppendLogRecord(record []byte) (uint64, error) {
    lm.mu.Lock()
    defer lm.mu.Unlock()

    currentLSN := lm.nextLSN
    lm.nextLSN += uint64(len(record))

    _, err := lm.buffer.Write(record)
    if err != nil {
        return 0, fmt.Errorf("failed to write log to buffer: %w", err)
    }

    return currentLSN, nil
}

// Flush 将缓冲区中的日志数据刷回磁盘
func (lm *LogManager) Flush() error {
    lm.mu.Lock()
    defer lm.mu.Unlock()

    if lm.buffer.Len() == 0 {
        return nil
    }

    _, err := lm.logFile.Write(lm.buffer.Bytes())
    if err != nil {
        return fmt.Errorf("failed to write log buffer to file: %w", err)
    }
    lm.buffer.Reset() // 清空缓冲区

    err = lm.logFile.Sync() // 确保数据写入物理磁盘
    if err != nil {
        return fmt.Errorf("failed to sync log file: %w", err)
    }
    return nil
}

// Close 关闭日志管理器
func (lm *LogManager) Close() error {
    if err := lm.Flush(); err != nil {
        return fmt.Errorf("failed to flush log before closing: %w", err)
    }
    return lm.logFile.Close()
}

// GetCurrentLSN 获取当前LSN
func (lm *LogManager) GetCurrentLSN() uint64 {
    lm.mu.Lock()
    defer lm.mu.Unlock()
    return lm.nextLSN
}

// ReadLogRecord 读取指定LSN的日志记录
func (lm *LogManager) ReadLogRecord(lsn uint64) ([]byte, error) {
    lm.mu.Lock()
    defer lm.mu.Unlock()

    // 实际需要从文件读取,并解析日志记录的头部来获取完整大小
    // 这里简化为直接读取一个固定大小的记录,实际日志记录是变长的
    buf := make([]byte, 1024) // 假设最大日志记录大小
    n, err := lm.logFile.ReadAt(buf, int64(lsn))
    if err != nil {
        return nil, fmt.Errorf("failed to read log record at LSN %d: %w", lsn, err)
    }
    // 需要从buf中解析出日志记录的实际大小,然后返回正确的数据
    if n < 4 { // 至少要能读到Size字段
        return nil, fmt.Errorf("too small to be a valid log record at LSN %d", lsn)
    }
    recordSize := binary.LittleEndian.Uint32(buf[16:20]) // 假设BaseLogRecord的Size字段在偏移16
    return buf[:recordSize], nil
}

// 示例:InsertLogRecord 结构和序列化
type InsertLogRecord struct {
    BaseLogRecord
    PageID       PageID
    Offset       uint16 // 插入到页面的哪个偏移量
    RecordData   []byte // 完整的记录数据 (KeyLen+KeyData+ValLen+ValData)
}

func (r *InsertLogRecord) Serialize() []byte {
    var buf bytes.Buffer
    // 写入 BaseLogRecord 字段
    binary.Write(&buf, binary.LittleEndian, r.LSN)
    binary.Write(&buf, binary.LittleEndian, r.PrevLSN)
    binary.Write(&buf, binary.LittleEndian, r.TxID)
    buf.WriteByte(byte(r.Type))
    binary.Write(&buf, binary.LittleEndian, r.Size)
    // 写入 InsertLogRecord 独有字段
    binary.Write(&buf, binary.LittleEndian, r.PageID)
    binary.Write(&buf, binary.LittleEndian, r.Offset)
    buf.Write(r.RecordData)
    return buf.Bytes()
}

func DeserializeInsertLogRecord(data []byte) (*InsertLogRecord, error) {
    // 逆序列化逻辑,与Serialize对应
    // ... 复杂,此处省略
    return nil, nil
}

// 计算BaseLogRecord的大小
const BaseLogRecordSize = 8 + 8 + 8 + 1 + 4 // LSN + PrevLSN + TxID + Type + Size

3.2 隔离性 (Isolation) — 并发控制

隔离性确保并发执行的事务互不干扰,每个事务看起来都是独立执行的。这通常通过锁 (Locking)多版本并发控制 (MVCC) 实现。这里我们聚焦于锁。

两阶段锁 (Two-Phase Locking, 2PL):

  1. 增长阶段 (Growing Phase): 事务可以获取锁,但不能释放锁。
  2. 收缩阶段 (Shrinking Phase): 事务可以释放锁,但不能获取锁。
    确保所有事务在提交或中止前,都持有其所需的所有锁,从而避免脏读、不可重复读和幻读。

锁的粒度:

  • 页面锁 (Page Lock): 锁定整个页面,简单但并发度低。
  • 记录锁 (Record Lock): 锁定单个记录,并发度高但管理复杂。

我们将从页面锁开始,因为它们与缓冲区池和页面操作天然契合。

// lock_manager.go
package engine

import (
    "fmt"
    "sync"
)

// LockMode 锁模式
type LockMode int

const (
    LockModeShared    LockMode = iota // 共享锁 (读锁)
    LockModeExclusive                 // 排他锁 (写锁)
)

// LockManager 管理页面锁
type LockManager struct {
    pageLocks map[PageID]*sync.RWMutex // 存储每个页面的读写锁
    mu        sync.Mutex             // 保护pageLocks map的互斥锁
}

// NewLockManager 创建一个新的锁管理器
func NewLockManager() *LockManager {
    return &LockManager{
        pageLocks: make(map[PageID]*sync.RWMutex),
    }
}

// GetPageLock 获取指定页面的锁对象
func (lm *LockManager) getPageLock(pageID PageID) *sync.RWMutex {
    lm.mu.Lock()
    defer lm.mu.Unlock()

    lock, ok := lm.pageLocks[pageID]
    if !ok {
        lock = &sync.RWMutex{}
        lm.pageLocks[pageID] = lock
    }
    return lock
}

// AcquireLock 获取页面锁
func (lm *LockManager) AcquireLock(pageID PageID, mode LockMode) {
    lock := lm.getPageLock(pageID)
    if mode == LockModeShared {
        lock.RLock()
    } else {
        lock.Lock()
    }
}

// ReleaseLock 释放页面锁
func (lm *LockManager) ReleaseLock(pageID PageID, mode LockMode) {
    lock := lm.getPageLock(pageID)
    if mode == LockModeShared {
        lock.RUnlock()
    } else {
        lock.Unlock()
    }
}

3.3 事务管理器 (Transaction Manager)

事务管理器协调所有事务操作,包括启动、提交、回滚,并与日志管理器、锁管理器和缓冲区池交互。

// transaction.go
package engine

import (
    "fmt"
    "sync"
)

// TxID 事务ID
type TxID uint64

// Transaction 事务结构体
type Transaction struct {
    ID          TxID
    State       TxState
    PrevLSN     uint64                // 当前事务上一个操作的LSN
    ModifiedPages map[PageID]*Page     // 事务修改过的页面,用于回滚
    HeldLocks   map[PageID]LockMode   // 事务持有的锁
    mu          sync.Mutex
}

// TxState 事务状态
type TxState int

const (
    TxStateRunning TxState = iota
    TxStateCommitting
    TxStateAborting
    TxStateCommitted
    TxStateAborted
)

// TransactionManager 管理所有事务
type TransactionManager struct {
    bp        *BufferPool
    lm        *LogManager
    lockMgr   *LockManager
    nextTxID  TxID
    activeTxs map[TxID]*Transaction // 活跃事务列表
    mu        sync.Mutex            // 保护nextTxID和activeTxs
}

// NewTransactionManager 创建一个新的事务管理器
func NewTransactionManager(bp *BufferPool, lm *LogManager, lockMgr *LockManager) *TransactionManager {
    return &TransactionManager{
        bp:        bp,
        lm:        lm,
        lockMgr:   lockMgr,
        nextTxID:  1, // 事务ID从1开始
        activeTxs: make(map[TxID]*Transaction),
    }
}

// Begin 启动一个新事务
func (tm *TransactionManager) Begin() (*Transaction, error) {
    tm.mu.Lock()
    defer tm.mu.Unlock()

    txID := tm.nextTxID
    tm.nextTxID++

    tx := &Transaction{
        ID:          txID,
        State:       TxStateRunning,
        PrevLSN:     0, // 初始PrevLSN为0
        ModifiedPages: make(map[PageID]*Page),
        HeldLocks:   make(map[PageID]LockMode),
    }
    tm.activeTxs[txID] = tx

    // 写入Begin日志
    beginRecord := &BaseLogRecord{
        LSN:    tm.lm.GetCurrentLSN(),
        PrevLSN: 0,
        TxID:   uint64(txID),
        Type:   LogTypeBegin,
        Size:   BaseLogRecordSize,
    }
    lsn, err := tm.lm.AppendLogRecord(beginRecord.Serialize())
    if err != nil {
        return nil, fmt.Errorf("failed to write Begin log for Tx %d: %w", txID, err)
    }
    tx.PrevLSN = lsn

    return tx, nil
}

// Commit 提交事务
func (tm *TransactionManager) Commit(tx *Transaction) error {
    tx.mu.Lock()
    if tx.State != TxStateRunning {
        tx.mu.Unlock()
        return fmt.Errorf("transaction %d is not in running state", tx.ID)
    }
    tx.State = TxStateCommitting
    tx.mu.Unlock()

    // 写入Commit日志
    commitRecord := &BaseLogRecord{
        LSN:    tm.lm.GetCurrentLSN(),
        PrevLSN: tx.PrevLSN,
        TxID:   uint64(tx.ID),
        Type:   LogTypeCommit,
        Size:   BaseLogRecordSize,
    }
    _, err := tm.lm.AppendLogRecord(commitRecord.Serialize())
    if err != nil {
        return fmt.Errorf("failed to write Commit log for Tx %d: %w", tx.ID, err)
    }

    // 刷盘日志,确保持久性
    if err := tm.lm.Flush(); err != nil {
        return fmt.Errorf("failed to flush logs for Tx %d commit: %w", tx.ID, err)
    }

    // 释放所有锁
    for pageID, mode := range tx.HeldLocks {
        tm.lockMgr.ReleaseLock(pageID, mode)
    }

    tx.mu.Lock()
    tx.State = TxStateCommitted
    tm.mu.Unlock() // 保护activeTxs

    tm.mu.Lock()
    delete(tm.activeTxs, tx.ID)
    tm.mu.Unlock()

    return nil
}

// Abort 中止事务
func (tm *TransactionManager) Abort(tx *Transaction) error {
    tx.mu.Lock()
    if tx.State != TxStateRunning {
        tx.mu.Unlock()
        return fmt.Errorf("transaction %d is not in running state", tx.ID)
    }
    tx.State = TxStateAborting
    tx.mu.Unlock()

    // 执行Undo操作 (根据日志记录回滚修改)
    // 这是一个复杂的过程,需要从PrevLSN开始反向读取日志并应用Undo操作
    // For simplicity, we skip the detailed Undo implementation here.
    fmt.Printf("Tx %d aborting: Performing Undo operations...n", tx.ID)
    // TODO: Implement actual undo logic by reading log records in reverse LSN order.

    // 写入Abort日志
    abortRecord := &BaseLogRecord{
        LSN:    tm.lm.GetCurrentLSN(),
        PrevLSN: tx.PrevLSN,
        TxID:   uint64(tx.ID),
        Type:   LogTypeAbort,
        Size:   BaseLogRecordSize,
    }
    _, err := tm.lm.AppendLogRecord(abortRecord.Serialize())
    if err != nil {
        return fmt.Errorf("failed to write Abort log for Tx %d: %w", tx.ID, err)
    }

    // 刷盘日志
    if err := tm.lm.Flush(); err != nil {
        return fmt.Errorf("failed to flush logs for Tx %d abort: %w", tx.ID, err)
    }

    // 释放所有锁
    for pageID, mode := range tx.HeldLocks {
        tm.lockMgr.ReleaseLock(pageID, mode)
    }

    tx.mu.Lock()
    tx.State = TxStateAborted
    tx.mu.Unlock()

    tm.mu.Lock()
    delete(tm.activeTxs, tx.ID)
    tm.mu.Unlock()

    return nil
}

// GetPageWithLock 获取页面并根据操作类型加锁
func (tm *TransactionManager) GetPageWithLock(tx *Transaction, pageID PageID, mode LockMode) (*Page, error) {
    // 获取锁
    tm.lockMgr.AcquireLock(pageID, mode)
    tx.mu.Lock()
    tx.HeldLocks[pageID] = mode
    tx.mu.Unlock()

    // 从缓冲区池获取页面
    page, err := tm.bp.FetchPage(pageID, true)
    if err != nil {
        // 获取页面失败,需要释放锁
        tm.lockMgr.ReleaseLock(pageID, mode)
        tx.mu.Lock()
        delete(tx.HeldLocks, pageID)
        tx.mu.Unlock()
        return nil, err
    }
    return page, nil
}

// UnpinPageWithLog 更新页面并解除固定,记录日志
func (tm *TransactionManager) UnpinPageWithLog(tx *Transaction, page *Page, isDirty bool, logRecord []byte) error {
    if isDirty {
        // 记录日志,并更新页面的LSN
        lsn, err := tm.lm.AppendLogRecord(logRecord)
        if err != nil {
            return fmt.Errorf("failed to append log for page %d in Tx %d: %w", page.ID, tx.ID, err)
        }
        page.SetLSN(lsn)
        tx.PrevLSN = lsn // 更新事务的PrevLSN

        // 事务修改过的页面需要标记
        tx.mu.Lock()
        tx.ModifiedPages[page.ID] = page
        tx.mu.Unlock()
    }
    tm.bp.UnpinPage(page.ID, isDirty)
    return nil
}

3.4 恢复管理器 (Recovery Manager)

当数据库崩溃后重启时,恢复管理器会使用 WAL 日志来确保所有已提交的事务都得以持久化 (Redo),所有未提交的事务都得以回滚 (Undo),从而恢复到一致的状态。

ARIES 恢复算法 (简要概述):

  1. 分析阶段 (Analysis Phase): 从最近的检查点开始扫描日志,识别活跃事务和脏页面。
  2. 重做阶段 (Redo Phase): 从日志的某个点开始,重新应用所有修改操作,使所有页面都达到日志记录的最新状态。
  3. 撤销阶段 (Undo Phase): 回滚所有在崩溃时仍活跃的事务的修改。
// recovery_manager.go
package engine

import (
    "fmt"
    "io"
    "os"
)

// RecoveryManager 负责数据库的故障恢复
type RecoveryManager struct {
    bp *BufferPool
    lm *LogManager
    // 其他组件,如事务管理器,用于协调恢复过程
}

// NewRecoveryManager 创建一个新的恢复管理器
func NewRecoveryManager(bp *BufferPool, lm *LogManager) *RecoveryManager {
    return &RecoveryManager{
        bp: bp,
        lm: lm,
    }
}

// Recover 启动恢复过程
func (rm *RecoveryManager) Recover() error {
    fmt.Println("Starting database recovery...")

    // 1. Analysis Phase: 识别活跃事务和脏页面
    // 实际需要扫描日志文件,找到最后一个Checkpoint,并从Checkpoint开始分析
    // 这里简化为从头开始扫描,并跳过细节
    fmt.Println("Analysis Phase: (Simplified) Identify active transactions and dirty pages.")

    // 2. Redo Phase: 重做所有操作
    fmt.Println("Redo Phase: Reapplying all log records.")
    if err := rm.redoAllLogs(); err != nil {
        return fmt.Errorf("redo phase failed: %w", err)
    }

    // 3. Undo Phase: 回滚未提交事务
    fmt.Println("Undo Phase: (Simplified) Rollback uncommitted transactions.")
    // 实际需要根据Analysis Phase的结果,逆序扫描日志进行Undo

    fmt.Println("Recovery complete.")
    return nil
}

// redoAllLogs 示例性地重做所有日志记录
// 实际需要从上次Checkpoint开始,并根据页面的LSN判断是否需要重做
func (rm *RecoveryManager) redoAllLogs() error {
    logFile, err := os.Open(rm.lm.logFile.Name())
    if err != nil {
        return fmt.Errorf("failed to open log file for redo: %w", err)
    }
    defer logFile.Close()

    reader := NewLogReader(logFile) // 假设有一个LogReader来逐条读取日志

    for {
        recordBytes, lsn, err := reader.Next() // 读取下一条日志记录
        if err == io.EOF {
            break
        }
        if err != nil {
            return fmt.Errorf("error reading log for redo: %w", err)
        }

        // 假设我们能反序列化所有日志记录
        baseRecord, err := DeserializeBaseLogRecord(recordBytes) // 假设有这样的函数
        if err != nil {
            return fmt.Errorf("failed to deserialize base log record at LSN %d: %w", lsn, err)
        }

        // 对于数据修改日志,执行Redo操作
        switch baseRecord.Type {
        case LogTypeInsert:
            insertRecord, err := DeserializeInsertLogRecord(recordBytes) // 假设有这样的函数
            if err != nil { /* error handling */ }
            page, err := rm.bp.FetchPage(insertRecord.PageID, true)
            if err != nil { /* error handling */ }

            // 只有当页面的LSN小于日志记录的LSN时才需要Redo
            if page.GetLSN() < insertRecord.LSN {
                // 实际的Redo操作:根据日志记录应用更改
                // 例如:在页面的指定偏移量写入数据
                // copy(page.Data[insertRecord.Offset:], insertRecord.RecordData)
                page.SetLSN(insertRecord.LSN)
                page.IsDirty = true // 标记为脏,稍后刷盘
            }
            rm.bp.UnpinPage(page.ID, true) // Redo操作后页面变为脏
        // 其他LogType,如Update, Delete, PageSplit等,也需要相应的Redo逻辑
        case LogTypeCommit:
            // 标记事务已提交
        case LogTypeAbort:
            // 标记事务已中止
        }
    }

    // 在Redo阶段结束后,需要将所有脏页面刷回磁盘
    rm.bp.FlushAllPages()
    return nil
}

// LogReader 辅助读取日志文件
type LogReader struct {
    file   *os.File
    offset int64
}

func NewLogReader(file *os.File) *LogReader {
    return &LogReader{file: file, offset: 0}
}

// Next 读取下一条日志记录
func (lr *LogReader) Next() ([]byte, uint64, error) {
    currentLSN := uint64(lr.offset)

    // 假设日志记录的格式是 BaseLogRecord + payload
    // 我们需要先读取 BaseLogRecord 的 Size 字段
    headerBuf := make([]byte, BaseLogRecordSize)
    n, err := lr.file.ReadAt(headerBuf, lr.offset)
    if err == io.EOF {
        return nil, 0, io.EOF
    }
    if err != nil {
        return nil, 0, fmt.Errorf("failed to read log record header: %w", err)
    }
    if n < BaseLogRecordSize {
        return nil, 0, fmt.Errorf("partial log record header read: expected %d, got %d", BaseLogRecordSize, n)
    }

    recordSize := binary.LittleEndian.Uint32(headerBuf[BaseLogRecordSize-4 : BaseLogRecordSize]) // BaseLogRecord的Size字段

    fullRecordBuf := make([]byte, recordSize)
    n, err = lr.file.ReadAt(fullRecordBuf, lr.offset)
    if err != nil {
        return nil, 0, fmt.Errorf("failed to read full log record: %w", err)
    }
    if uint32(n) < recordSize {
        return nil, 0, fmt.Errorf("partial log record read: expected %d, got %d", recordSize, n)
    }

    lr.offset += int64(recordSize)
    return fullRecordBuf, currentLSN, nil
}

// 辅助函数:反序列化BaseLogRecord (简化)
func DeserializeBaseLogRecord(data []byte) (*BaseLogRecord, error) {
    if len(data) < BaseLogRecordSize {
        return nil, fmt.Errorf("data too short for BaseLogRecord")
    }
    return &BaseLogRecord{
        LSN:    binary.LittleEndian.Uint64(data[0:8]),
        PrevLSN: binary.LittleEndian.Uint64(data[8:16]),
        TxID:   binary.LittleEndian.Uint64(data[16:24]),
        Type:   LogRecordType(data[24]),
        Size:   binary.LittleEndian.Uint32(data[25:29]), // 假设Type占1字节,Size在25-29
    }, nil
}

第四章:整合与挑战

现在我们有了 B+ 树的基本结构、页面管理器、缓冲区池、日志管理器、锁管理器和事务管理器。如何将它们整合起来,并让 B+ 树的操作具备事务能力呢?

B+ 树的 InsertDelete 操作不再是简单的页面修改,而是需要:

  1. 开始事务: txMgr.Begin()
  2. 获取页面(并加锁): txMgr.GetPageWithLock(tx, pageID, LockModeExclusive)
  3. 记录日志: 在修改页面之前,将旧值和新值(或操作详情)写入日志缓冲区。
  4. 修改页面: 在内存中的页面数据上执行实际的 B+ 树操作。
  5. 更新页面 LSN: 将页面头部 LSN 设置为最新日志记录的 LSN。
  6. 解除页面固定(并标记脏页): txMgr.UnpinPageWithLog(tx, page, true, logRecordBytes)
  7. 提交或中止事务: txMgr.Commit(tx)txMgr.Abort(tx)

这要求 B+ 树的每个操作方法都接受一个 *Transaction 参数。

挑战与进一步考虑:

  • 死锁检测与处理: 2PL 可能导致死锁。需要实现死锁检测(例如,等待图)和死锁恢复(例如,选择一个事务回滚)。
  • 并发性能优化:
    • 锁粒度: 页面锁可能成为瓶颈,需要考虑更细粒度的记录锁。
    • 锁升级/降级: 事务可能在运行时改变对资源的锁定需求。
    • 闩 (Latch) 与锁 (Lock) 的区别: 闩是轻量级、短期的互斥量,用于保护内存中的数据结构(如 B+ 树节点内部结构、缓冲区池的哈希表),而锁是长期持有、保护逻辑数据的。
  • 内存管理: 避免 Go 垃圾回收对性能的影响,可以考虑使用 sync.Pool 预分配页面缓冲区。
  • 磁盘空间管理: 如何高效地分配和回收页面 ID,避免文件碎片。
  • 检查点 (Checkpointing): 定期执行检查点,减少恢复时间。
  • 数据一致性检查: 定期检查 B+ 树的结构是否正确,数据是否一致。
  • 分布式事务: 如果引擎需要扩展到多节点环境,还需要考虑 2PC (Two-Phase Commit) 等分布式事务协议。
  • 错误处理: 健壮的错误处理机制对于数据库引擎至关重要。

结束语

从零开始构建一个具备事务能力的 B+ 树引擎是一项艰巨而充满挑战的任务,但它也带来了对数据库内部机制深刻的理解。我们探讨了物理布局、页面管理、B+ 树结构、预写日志、并发控制和恢复机制,这些都是现代数据库系统不可或缺的组件。这趟旅程远未结束,它仅仅是通往构建高性能、高可靠性数据存储系统道路上的第一步。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注