什么是 ‘Log-structured Streaming’:在 Go 中实现类似 Kafka 的分区日志流处理引擎

什么是 ‘Log-structured Streaming’:在 Go 中实现类似 Kafka 的分区日志流处理引擎

各位同仁,欢迎来到今天的讲座。我们今天将深入探讨一个在现代分布式系统中无处不在但又常常被低估的核心概念:日志结构化流(Log-structured Streaming)。这个概念是许多高性能、高可用分布式系统的基石,其中最著名的莫过于 Apache Kafka。我们将一起揭示其内部工作原理,并通过 Go 语言,逐步构建一个简化版的、类似 Kafka 的分区日志流处理引擎。

一、日志结构化流的核心理念

日志结构化流,顾名思义,其核心是一种数据存储和访问模式,它将所有数据操作视为对一个不可变、只追加(append-only)的日志的追加操作。想象一下一本传统的账本,每一笔交易都被按时间顺序记录下来,一旦写入,就不能修改,只能在末尾添加新的记录。这就是日志结构化流的基本思想。

具体来说,日志结构化流系统通常由以下几个关键特性组成:

  1. 只追加(Append-Only):数据总是被写入日志的末尾。没有就地更新或删除操作。
  2. 不可变性(Immutability):一旦数据被写入日志,它就永远不会被修改。任何“更新”或“删除”操作实际上都是在日志末尾追加一条新的记录,表明某个键的新状态或标记某个键为已删除。
  3. 有序性(Ordering):日志中的所有记录都按照它们被写入的顺序严格排序。这种顺序性对于许多分布式系统中的一致性、事务处理和事件溯源至关重要。
  4. 持久化(Durability):数据被持久化到磁盘上,通常以一种顺序写入的方式,这对于传统机械硬盘和现代 SSD 都非常高效。
  5. 偏移量(Offset):每条记录在日志中都有一个唯一的、递增的逻辑偏移量。消费者通过指定偏移量来读取数据。

这种设计模式带来了巨大的优势:

  • 高性能写入:只追加操作转化为对磁盘的顺序写入,这是最快的 I/O 模式。
  • 简化并发控制:写入是并发安全的,因为它们只在日志末尾操作,减少了锁竞争。
  • 高吞吐量读取:消费者可以批量读取,并且由于数据是有序的,缓存命中率高。
  • 天然的容错性与恢复能力:崩溃恢复变得简单,只需重放日志即可恢复状态。
  • 事件溯源:完整的历史记录都被保留,可以用于审计、回溯和复杂的分析。

Kafka、Pulsar 等现代消息队列和流处理系统都广泛采用了日志结构化存储作为其核心。通过将逻辑日志划分为多个分区(partitions),这些系统实现了水平扩展和高并发。

我们的目标,就是用 Go 语言,从零开始构建一个简化版的、具备核心日志结构化特性的流处理引擎,我们称之为 GoStream

二、GoStream 的核心组件设计

在构建 GoStream 之前,我们首先需要明确其核心组件及其职责。一个分区日志流处理引擎至少需要以下几个关键抽象:

  1. 记录 (Record):这是日志中最小的数据单元。
  2. 日志段 (Log Segment):日志并非无限增长的单个文件。为了管理方便和实现数据保留策略,日志会被切分成多个文件,每个文件就是一个日志段。
  3. 段索引 (Segment Index):为了高效地从日志段中查找记录,我们需要一个索引来映射逻辑偏移量到物理文件位置。
  4. 分区 (Partition):一个分区由一系列按顺序排列的日志段组成,它负责管理这些段的生命周期、写入和读取。
  5. 代理/集群 (Broker):这是 GoStream 的核心服务,它管理多个主题(Topic),每个主题又包含多个分区。它对外提供生产(Produce)和消费(Consume)数据的接口。

下面,我们逐一详细设计这些组件。

2.1 记录 (Record)

每条记录都应该包含必要的元数据和实际数据。为了保持简单,我们定义一个包含键、值、时间戳和偏移量的记录结构。

package gostream

import (
    "encoding/binary"
    "hash/crc32"
    "time"
)

const (
    // RecordHeaderSize 是记录头部固定部分的最小大小
    // CRC32 (4 bytes) + MagicByte (1 byte) + Timestamp (8 bytes) + KeyLength (4 bytes) + ValueLength (4 bytes)
    RecordHeaderSize = 4 + 1 + 8 + 4 + 4
    MagicByte        = 0x1 // 用于版本控制或验证
)

// Record 代表日志中的一条数据记录
type Record struct {
    Offset    uint64    // 逻辑偏移量,在日志中唯一标识这条记录
    Timestamp time.Time // 记录的生产时间
    Key       []byte    // 记录的键
    Value     []byte    // 记录的值
    CRC       uint32    // 用于数据完整性校验
}

// EncodeRecord 将 Record 编码为字节切片,以便写入磁盘
// 格式: CRC32 | MagicByte | Timestamp(ms) | KeyLength | Key | ValueLength | Value
func EncodeRecord(record *Record) ([]byte, error) {
    // 计算大小
    keyLen := len(record.Key)
    valLen := len(record.Value)
    totalSize := RecordHeaderSize + keyLen + valLen

    buf := make([]byte, totalSize)
    offset := 0

    // 预留CRC32位置
    offset += 4

    // MagicByte
    buf[offset] = MagicByte
    offset += 1

    // Timestamp (毫秒)
    binary.BigEndian.PutUint64(buf[offset:offset+8], uint64(record.Timestamp.UnixMilli()))
    offset += 8

    // KeyLength
    binary.BigEndian.PutUint32(buf[offset:offset+4], uint32(keyLen))
    offset += 4

    // Key
    copy(buf[offset:offset+keyLen], record.Key)
    offset += keyLen

    // ValueLength
    binary.BigEndian.PutUint32(buf[offset:offset+4], uint32(valLen))
    offset += 4

    // Value
    copy(buf[offset:offset+valLen], record.Value)

    // 计算CRC32,覆盖 MagicByte 到 Value 的所有数据
    record.CRC = crc32.ChecksumIEEE(buf[4:])
    binary.BigEndian.PutUint32(buf[0:4], record.CRC)

    return buf, nil
}

// DecodeRecord 从字节切片解码 Record
// 返回解码后的 Record 和读取的字节数
func DecodeRecord(data []byte) (*Record, int, error) {
    if len(data) < RecordHeaderSize {
        return nil, 0, io.EOF // 数据不足以形成一个记录头
    }

    offset := 0
    crc := binary.BigEndian.Uint32(data[offset : offset+4])
    offset += 4

    actualCRC := crc32.ChecksumIEEE(data[offset:]) // 验证CRC时,计算从MagicByte开始的校验和
    if crc != actualCRC {
        return nil, 0, fmt.Errorf("CRC mismatch: expected %d, got %d", crc, actualCRC)
    }

    magicByte := data[offset]
    if magicByte != MagicByte {
        return nil, 0, fmt.Errorf("unsupported magic byte: %x", magicByte)
    }
    offset += 1

    timestampMilli := binary.BigEndian.Uint64(data[offset : offset+8])
    offset += 8

    keyLen := binary.BigEndian.Uint32(data[offset : offset+4])
    offset += 4

    if len(data) < offset+int(keyLen) {
        return nil, 0, io.EOF
    }
    key := data[offset : offset+int(keyLen)]
    offset += int(keyLen)

    valLen := binary.BigEndian.Uint32(data[offset : offset+4])
    offset += 4

    if len(data) < offset+int(valLen) {
        return nil, 0, io.EOF
    }
    value := data[offset : offset+int(valLen)]
    offset += int(valLen)

    record := &Record{
        CRC:       crc,
        Timestamp: time.UnixMMilli(int64(timestampMilli)),
        Key:       key,
        Value:     value,
    }

    return record, offset, nil
}

记录格式表:

字段 类型 长度 (字节) 描述
CRC32 uint32 4 整个记录(除 CRC 本身)的校验和
MagicByte byte 1 版本或格式标识
Timestamp uint64 8 记录创建的时间戳 (Unix 毫秒)
KeyLength uint32 4 Key 字段的长度
Key []byte KeyLength 记录的键(实际数据)
ValueLength uint32 4 Value 字段的长度
Value []byte ValueLength 记录的值(实际数据)

这种长度前缀的格式允许我们读取变长的数据(键和值),同时通过 CRC32 保证了数据的完整性。

2.2 日志段 (Log Segment)

日志段是 GoStream 存储数据的基本单位。每个日志段对应磁盘上的一个物理文件,通常有一个最大大小限制。当一个日志段达到其最大大小时,系统会“滚动”到一个新的日志段。

一个日志段文件通常以其起始偏移量 (base offset) 命名。例如,00000000000000000000.log 表示这个段从偏移量 0 开始。

package gostream

import (
    "fmt"
    "io"
    "os"
    "path/filepath"
    "sync"
)

const (
    MaxLogSegmentSize = 1024 * 1024 * 128 // 128MB
    LogFileExtension  = ".log"
    IndexFileExtension = ".index"
)

// LogSegment 代表一个日志段文件,负责读写记录和管理其索引
type LogSegment struct {
    mu          sync.RWMutex
    file        *os.File     // 数据文件句柄
    index       *SegmentIndex // 对应的索引
    baseOffset  uint64       // 该段的起始逻辑偏移量
    nextOffset  uint64       // 下一个可用的逻辑偏移量(当前段的最高偏移量 + 1)
    currentSize int64        // 当前文件大小
    path        string       // 段文件路径
    maxSize     int64        // 最大允许的段文件大小
}

// NewLogSegment 创建或打开一个日志段
// dataDir: 数据存储目录
// baseOffset: 该段的起始偏移量
// maxSize: 段文件最大大小
// recover: 是否在启动时进行恢复 (从文件重建nextOffset和index)
func NewLogSegment(dataDir string, baseOffset uint64, maxSize int64, recover bool) (*LogSegment, error) {
    logFilePath := filepath.Join(dataDir, fmt.Sprintf("%020d%s", baseOffset, LogFileExtension))
    idxFilePath := filepath.Join(dataDir, fmt.Sprintf("%020d%s", baseOffset, IndexFileExtension))

    file, err := os.OpenFile(logFilePath, os.O_RDWR|os.O_CREATE, 0644)
    if err != nil {
        return nil, fmt.Errorf("failed to open log file %s: %w", logFilePath, err)
    }

    stat, err := file.Stat()
    if err != nil {
        file.Close()
        return nil, fmt.Errorf("failed to stat log file %s: %w", logFilePath, err)
    }

    segment := &LogSegment{
        file:        file,
        baseOffset:  baseOffset,
        nextOffset:  baseOffset, // 初始时,下一个偏移量就是baseOffset
        currentSize: stat.Size(),
        path:        logFilePath,
        maxSize:     maxSize,
    }

    segment.index, err = NewSegmentIndex(idxFilePath, baseOffset, recover)
    if err != nil {
        file.Close()
        return nil, fmt.Errorf("failed to create segment index: %w", err)
    }

    if recover && segment.currentSize > 0 {
        if err := segment.recoverNextOffsetAndIndex(); err != nil {
            segment.Close()
            return nil, fmt.Errorf("failed to recover log segment %s: %w", logFilePath, err)
        }
    } else if segment.currentSize == 0 {
        // 新创建的段,nextOffset就是baseOffset,index也是空的
    } else {
        // 如果不recover,但文件有内容,则nextOffset和index需要从文件中读取
        // 实际上NewSegmentIndex的recover参数会处理index
        // nextOffset需要在Append时更新
        // 对于已有的段,nextOffset应该从index中获取最后一条记录的偏移量+1
        if lastIdxEntry, err := segment.index.LastEntry(); err == nil {
            segment.nextOffset = segment.baseOffset + lastIdxEntry.RelativeOffset + 1
        }
    }

    return segment, nil
}

// Append 将一条记录写入日志段。
// 返回这条记录在整个分区中的全局偏移量。
func (s *LogSegment) Append(record *Record) (uint64, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 设置记录的全局偏移量
    record.Offset = s.nextOffset

    // 编码记录
    encodedRecord, err := EncodeRecord(record)
    if err != nil {
        return 0, fmt.Errorf("failed to encode record: %w", err)
    }

    // 写入文件
    n, err := s.file.Write(encodedRecord)
    if err != nil {
        return 0, fmt.Errorf("failed to write record to log file: %w", err)
    }

    // 更新文件大小
    s.currentSize += int64(n)

    // 将相对偏移量和物理位置添加到索引
    // 相对偏移量 = 全局偏移量 - 段的起始偏移量
    relativeOffset := record.Offset - s.baseOffset
    if err := s.index.Append(relativeOffset, s.currentSize-int64(n)); err != nil {
        // 如果索引写入失败,日志文件可能已经写入,这会导致不一致
        // 严格来说,这里需要回滚文件写入或者标记段为损坏
        // 简单起见,这里我们只返回错误,但在实际系统中需要更复杂的处理
        return 0, fmt.Errorf("failed to append to index: %w", err)
    }

    // 更新下一个可用偏移量
    s.nextOffset++

    return record.Offset, nil
}

// Read 从日志段中读取一条记录
// offset: 要读取的记录的全局偏移量
func (s *LogSegment) Read(offset uint64) (*Record, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    if offset < s.baseOffset || offset >= s.nextOffset {
        return nil, fmt.Errorf("offset %d is out of range for segment %d-%d", offset, s.baseOffset, s.nextOffset-1)
    }

    // 计算相对偏移量
    relativeOffset := offset - s.baseOffset

    // 从索引查找物理位置
    pos, err := s.index.Lookup(relativeOffset)
    if err != nil {
        return nil, fmt.Errorf("failed to lookup offset %d in index: %w", offset, err)
    }

    // 从文件中读取数据
    // 为了简单起见,我们假设记录不会跨越文件末尾,并且我们可以一次性读取整个记录
    // 实际情况可能需要更复杂的缓冲和读取逻辑
    // 这里先读取一个足够大的缓冲区,然后解码
    // 更优化的方法是先读取头部,获取长度,再读取完整记录
    // 为了简化,我们读取从pos开始的一大块,然后用DecodeRecord处理
    // 假设最大记录大小不会超过一个合理的值
    maxRecordReadSize := int64(MaxLogSegmentSize / 4) // 假设一条记录不会太大

    // 尝试读取一个大块数据
    buf := make([]byte, maxRecordReadSize)
    n, err := s.file.ReadAt(buf, pos)
    if err != nil && err != io.EOF { // io.EOF 可能是因为读到了文件末尾,但仍然可能有一部分记录
        return nil, fmt.Errorf("failed to read from log file at pos %d: %w", pos, err)
    }
    if n == 0 {
        return nil, fmt.Errorf("no data found at position %d for offset %d", pos, offset)
    }

    // 解码记录
    record, bytesRead, err := DecodeRecord(buf[:n])
    if err != nil {
        return nil, fmt.Errorf("failed to decode record at pos %d for offset %d: %w", pos, offset, err)
    }

    // 验证解码出的记录的偏移量是否匹配
    // DecodeRecord目前没有设置Offset,需要手动设置
    record.Offset = offset

    // 再次验证CRC,虽然DecodeRecord已经做了,但这里可以进一步确保
    expectedCRC := crc32.ChecksumIEEE(buf[4:bytesRead])
    if record.CRC != expectedCRC {
        return nil, fmt.Errorf("CRC mismatch after decoding for offset %d", offset)
    }

    return record, nil
}

// recoverNextOffsetAndIndex 在启动时扫描日志文件,重建nextOffset和索引
func (s *LogSegment) recoverNextOffsetAndIndex() error {
    s.index.Clear() // 清空内存中的旧索引条目
    s.index.Flush() // 清空磁盘上的旧索引文件

    _, err := s.file.Seek(0, io.SeekStart)
    if err != nil {
        return fmt.Errorf("failed to seek to start of log file: %w", err)
    }

    reader := bufio.NewReader(s.file)
    currentPos := int64(0)
    recoveredOffset := s.baseOffset

    for {
        // 尝试读取记录头部的固定部分,以便知道键值长度
        headerBuf := make([]byte, RecordHeaderSize)
        _, err := io.ReadFull(reader, headerBuf)
        if err == io.EOF {
            break // 到达文件末尾
        }
        if err != nil {
            return fmt.Errorf("failed to read record header for recovery at pos %d: %w", currentPos, err)
        }

        // 解析长度信息
        keyLen := binary.BigEndian.Uint32(headerBuf[13:17]) // 1 (Magic) + 8 (Timestamp) + 4 (KeyLen) = 13
        valLen := binary.BigEndian.Uint32(headerBuf[17:21])

        // 计算整个记录的实际大小
        recordSize := RecordHeaderSize + int(keyLen) + int(valLen)

        // 读取剩余的键和值部分
        fullRecordBuf := make([]byte, recordSize)
        copy(fullRecordBuf, headerBuf) // 将已读取的头部复制过去
        _, err = io.ReadFull(reader, fullRecordBuf[RecordHeaderSize:])
        if err == io.EOF {
            break // 到达文件末尾,但可能是一个不完整的记录,我们停止
        }
        if err != nil {
            return fmt.Errorf("failed to read remaining record for recovery at pos %d: %w", currentPos, err)
        }

        // 尝试解码记录以验证完整性 (可选,但推荐)
        _, bytesRead, err := DecodeRecord(fullRecordBuf)
        if err != nil {
            // 如果解码失败,说明记录损坏,停止恢复
            fmt.Printf("Warning: Corrupted record found at position %d, stopping recovery: %vn", currentPos, err)
            break
        }
        if bytesRead != recordSize {
            fmt.Printf("Warning: Inconsistent record size at position %d, stopping recoveryn", currentPos)
            break
        }

        // 添加到索引
        if err := s.index.Append(recoveredOffset-s.baseOffset, currentPos); err != nil {
            return fmt.Errorf("failed to append to index during recovery: %w", err)
        }

        currentPos += int64(recordSize)
        recoveredOffset++
    }

    s.nextOffset = recoveredOffset
    s.currentSize = currentPos // 更新实际文件大小
    fmt.Printf("Segment %s recovered. nextOffset: %d, currentSize: %dn", filepath.Base(s.path), s.nextOffset, s.currentSize)
    return nil
}

// IsFull 检查日志段是否已满
func (s *LogSegment) IsFull() bool {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.currentSize >= s.maxSize
}

// Close 关闭日志段的文件句柄和索引
func (s *LogSegment) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if err := s.file.Close(); err != nil {
        return fmt.Errorf("failed to close log file: %w", err)
    }
    if err := s.index.Close(); err != nil {
        return fmt.Errorf("failed to close index file: %w", err)
    }
    return nil
}

// Delete 删除日志段文件和索引文件
func (s *LogSegment) Delete() error {
    s.Close() // 确保文件句柄已关闭
    if err := os.Remove(s.path); err != nil {
        return fmt.Errorf("failed to remove log file %s: %w", s.path, err)
    }
    if err := os.Remove(s.index.path); err != nil {
        return fmt.Errorf("failed to remove index file %s: %w", s.index.path, err)
    }
    return nil
}

// BaseOffset 返回日志段的起始偏移量
func (s *LogSegment) BaseOffset() uint64 {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.baseOffset
}

// NextOffset 返回日志段的下一个可用偏移量
func (s *LogSegment) NextOffset() uint64 {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.nextOffset
}

2.3 段索引 (Segment Index)

线性扫描日志文件来查找记录是不可接受的。因此,我们需要一个索引来快速将逻辑偏移量映射到文件中的物理位置。对于每个日志段,我们都会有一个对应的索引文件。

段索引存储的是 (相对偏移量, 物理文件位置) 对。相对偏移量是记录的全局偏移量减去该日志段的 baseOffset

package gostream

import (
    "encoding/binary"
    "fmt"
    "io"
    "os"
    "path/filepath"
    "sync"
)

const (
    IndexEntrySize = 12 // 相对偏移量 (4字节) + 物理位置 (8字节)
)

// IndexEntry 代表索引中的一个条目
type IndexEntry struct {
    RelativeOffset uint32 // 相对当前日志段的起始偏移量
    Position       int64  // 记录在日志文件中的物理起始位置
}

// SegmentIndex 负责管理一个日志段的索引文件
type SegmentIndex struct {
    mu          sync.RWMutex
    file        *os.File       // 索引文件句柄
    entries     []IndexEntry   // 内存中的索引条目缓存
    baseOffset  uint64         // 对应的日志段的起始偏移量
    path        string         // 索引文件路径
    currentSize int64          // 当前索引文件大小
}

// NewSegmentIndex 创建或打开一个段索引
// idxFilePath: 索引文件路径
// baseOffset: 对应的日志段的起始偏移量
// recover: 是否在启动时进行恢复 (从文件加载索引到内存)
func NewSegmentIndex(idxFilePath string, baseOffset uint64, recover bool) (*SegmentIndex, error) {
    file, err := os.OpenFile(idxFilePath, os.O_RDWR|os.O_CREATE, 0644)
    if err != nil {
        return nil, fmt.Errorf("failed to open index file %s: %w", idxFilePath, err)
    }

    stat, err := file.Stat()
    if err != nil {
        file.Close()
        return nil, fmt.Errorf("failed to stat index file %s: %w", idxFilePath, err)
    }

    idx := &SegmentIndex{
        file:        file,
        entries:     make([]IndexEntry, 0, 1024), // 预分配一些容量
        baseOffset:  baseOffset,
        path:        idxFilePath,
        currentSize: stat.Size(),
    }

    if recover && idx.currentSize > 0 {
        if err := idx.Recover(); err != nil {
            idx.Close()
            return nil, fmt.Errorf("failed to recover segment index %s: %w", idxFilePath, err)
        }
    }

    return idx, nil
}

// Append 将一个新的索引条目追加到索引中
func (s *SegmentIndex) Append(relativeOffset uint32, position int64) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    entry := IndexEntry{
        RelativeOffset: relativeOffset,
        Position:       position,
    }

    // 编码索引条目
    buf := make([]byte, IndexEntrySize)
    binary.BigEndian.PutUint32(buf[0:4], entry.RelativeOffset)
    binary.BigEndian.PutUint64(buf[4:12], uint64(entry.Position))

    // 写入文件
    _, err := s.file.Write(buf)
    if err != nil {
        return fmt.Errorf("failed to write index entry to file: %w", err)
    }

    // 更新内存缓存
    s.entries = append(s.entries, entry)
    s.currentSize += IndexEntrySize

    return nil
}

// Lookup 通过相对偏移量查找记录的物理位置
// 使用二分查找在内存中的 entries 缓存中进行查找
func (s *SegmentIndex) Lookup(relativeOffset uint64) (int64, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    // 在内存缓存中进行二分查找
    // 找到第一个相对偏移量 >= targetRelativeOffset 的条目
    idx := search(len(s.entries), func(i int) bool {
        return s.entries[i].RelativeOffset >= uint32(relativeOffset)
    })

    if idx < len(s.entries) && s.entries[idx].RelativeOffset == uint32(relativeOffset) {
        // 找到了精确匹配
        return s.entries[idx].Position, nil
    }

    // 如果没有精确匹配,但找到了一个大于目标偏移量的条目,
    // 并且 idx > 0,则说明目标偏移量可能在 entries[idx-1] 和 entries[idx] 之间
    // 考虑到我们的索引是稀疏的(Kafka索引是稀疏的,但我们这里是全量索引),
    // 这里应该能找到精确匹配,除非索引损坏或目标偏移量不存在
    if idx > 0 {
        // 如果是稀疏索引,我们应该返回前一个条目的位置
        // 但我们这里是全量索引,如果没找到精确匹配,就说明不存在
        return 0, fmt.Errorf("relative offset %d not found in index", relativeOffset)
    }

    return 0, fmt.Errorf("relative offset %d not found in index", relativeOffset)
}

// search 是一个辅助函数,用于二分查找
func search(n int, f func(i int) bool) int {
    // Define f(x) for the condition (x >= target)
    // search returns the smallest index i at which f(i) is true.
    // If no such index exists, it returns n.
    i, j := 0, n
    for i < j {
        h := int(uint(i+j) >> 1) // avoid overflow when i+j is big
        if f(h) {
            j = h
        } else {
            i = h + 1
        }
    }
    return i
}

// Recover 从磁盘文件加载索引条目到内存
func (s *SegmentIndex) Recover() error {
    s.mu.Lock()
    defer s.mu.Unlock()

    s.entries = s.entries[:0] // 清空内存缓存

    _, err := s.file.Seek(0, io.SeekStart)
    if err != nil {
        return fmt.Errorf("failed to seek to start of index file: %w", err)
    }

    reader := bufio.NewReader(s.file)
    buf := make([]byte, IndexEntrySize)

    for {
        n, err := io.ReadFull(reader, buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            return fmt.Errorf("failed to read index entry during recovery: %w", err)
        }
        if n != IndexEntrySize {
            return fmt.Errorf("incomplete index entry read during recovery, expected %d bytes, got %d", IndexEntrySize, n)
        }

        entry := IndexEntry{
            RelativeOffset: binary.BigEndian.Uint32(buf[0:4]),
            Position:       int64(binary.BigEndian.Uint64(buf[4:12])),
        }
        s.entries = append(s.entries, entry)
    }
    return nil
}

// LastEntry 返回最后一个索引条目
func (s *SegmentIndex) LastEntry() (*IndexEntry, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    if len(s.entries) == 0 {
        return nil, fmt.Errorf("index is empty")
    }
    return &s.entries[len(s.entries)-1], nil
}

// Clear 清空内存中的索引条目
func (s *SegmentIndex) Clear() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.entries = s.entries[:0]
}

// Flush 将内存中的索引数据写入磁盘 (如果需要,例如在Close之前)
// 但我们的Append操作已经直接写入磁盘,所以这里仅用于确保文件刷新
func (s *SegmentIndex) Flush() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.file.Sync() // 确保数据写入磁盘
}

// Close 关闭索引文件句柄
func (s *SegmentIndex) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if err := s.file.Close(); err != nil {
        return fmt.Errorf("failed to close index file: %w", err)
    }
    return nil
}

索引文件格式表:

字段 类型 长度 (字节) 描述
RelativeOffset uint32 4 记录相对于日志段起始的逻辑偏移量
Position uint64 8 记录在日志文件中的物理起始位置

请注意,这里的 Lookup 假设索引是密集的,即每个记录都有一个索引条目。在生产级的 Kafka 中,索引通常是稀疏的,每隔一定数量的记录才有一个索引条目,这可以节省大量内存和磁盘空间。查询时,稀疏索引会找到最接近且小于目标偏移量的条目,然后从该物理位置开始线性扫描。为了简化,我们暂时使用密集索引。

2.4 分区 (Partition)

分区是 GoStream 中数据组织的核心逻辑单元。一个分区包含一系列有序的日志段,负责管理这些段的生命周期(创建、滚动、删除)以及协调记录的写入和读取。

package gostream

import (
    "fmt"
    "io/fs"
    "os"
    "path/filepath"
    "sort"
    "strconv"
    "strings"
    "sync"
)

// Partition 是一个逻辑分区,由多个日志段组成
type Partition struct {
    mu            sync.RWMutex
    id            int                 // 分区ID
    topic         string              // 所属主题名称
    dataDir       string              // 分区数据存储目录
    segments      []*LogSegment       // 有序的日志段列表
    activeSegment *LogSegment         // 当前活动的写入日志段
    maxSegmentSize int64              // 每个日志段的最大大小
    retentionTime time.Duration       // 数据保留时间
    nextOffset    uint64              // 分区中下一个可用的全局偏移量
}

// NewPartition 创建或加载一个分区
func NewPartition(dataDir string, topic string, id int, maxSegmentSize int64, retentionTime time.Duration) (*Partition, error) {
    partitionPath := filepath.Join(dataDir, topic, fmt.Sprintf("partition-%d", id))
    if err := os.MkdirAll(partitionPath, 0755); err != nil {
        return nil, fmt.Errorf("failed to create partition directory %s: %w", partitionPath, err)
    }

    p := &Partition{
        id:            id,
        topic:         topic,
        dataDir:       partitionPath,
        segments:      make([]*LogSegment, 0),
        maxSegmentSize: maxSegmentSize,
        retentionTime: retentionTime,
    }

    if err := p.loadSegments(); err != nil {
        return nil, fmt.Errorf("failed to load segments for partition %s-%d: %w", topic, id, err)
    }

    if len(p.segments) == 0 {
        // 如果没有现有段,则创建一个新的起始段
        if err := p.rollSegment(0); err != nil {
            return nil, fmt.Errorf("failed to create initial log segment for partition %s-%d: %w", topic, id, err)
        }
    } else {
        // 如果有现有段,最后一个段就是活动段
        p.activeSegment = p.segments[len(p.segments)-1]
        p.nextOffset = p.activeSegment.NextOffset()
    }

    return p, nil
}

// loadSegments 从文件系统加载所有日志段
func (p *Partition) loadSegments() error {
    entries, err := os.ReadDir(p.dataDir)
    if err != nil {
        return fmt.Errorf("failed to read partition directory %s: %w", p.dataDir, err)
    }

    var segmentBaseOffsets []uint64
    for _, entry := range entries {
        if !entry.IsDir() && strings.HasSuffix(entry.Name(), LogFileExtension) {
            fileName := strings.TrimSuffix(entry.Name(), LogFileExtension)
            offset, parseErr := strconv.ParseUint(fileName, 10, 64)
            if parseErr == nil {
                segmentBaseOffsets = append(segmentBaseOffsets, offset)
            }
        }
    }

    // 按偏移量升序排序
    sort.Slice(segmentBaseOffsets, func(i, j int) bool {
        return segmentBaseOffsets[i] < segmentBaseOffsets[j]
    })

    for _, baseOffset := range segmentBaseOffsets {
        segment, err := NewLogSegment(p.dataDir, baseOffset, p.maxSegmentSize, true) // Recover=true
        if err != nil {
            return fmt.Errorf("failed to load log segment %d: %w", baseOffset, err)
        }
        p.segments = append(p.segments, segment)
    }
    return nil
}

// Append 将一条记录追加到分区
func (p *Partition) Append(record *Record) (uint64, error) {
    p.mu.Lock()
    defer p.mu.Unlock()

    // 检查当前活动段是否已满
    if p.activeSegment.IsFull() {
        if err := p.rollSegment(p.activeSegment.NextOffset()); err != nil {
            return 0, fmt.Errorf("failed to roll segment: %w", err)
        }
    }

    // 写入记录
    offset, err := p.activeSegment.Append(record)
    if err != nil {
        return 0, fmt.Errorf("failed to append record to active segment: %w", err)
    }

    p.nextOffset = offset + 1 // 更新分区下一个可用偏移量

    return offset, nil
}

// Read 从指定偏移量开始读取记录
func (p *Partition) Read(offset uint64, maxBytes int) ([]*Record, error) {
    p.mu.RLock()
    defer p.mu.RUnlock()

    if offset >= p.nextOffset {
        return nil, fmt.Errorf("read offset %d is beyond current next offset %d", offset, p.nextOffset)
    }

    var records []*Record
    bytesRead := 0

    // 找到包含目标偏移量的日志段
    segmentIdx := sort.Search(len(p.segments), func(i int) bool {
        return p.segments[i].BaseOffset() > offset
    })
    // segmentIdx 现在是第一个 BaseOffset > offset 的段的索引。
    // 所以,实际包含目标偏移量的段是 segmentIdx - 1
    if segmentIdx > 0 {
        segmentIdx--
    } else if segmentIdx == 0 && offset < p.segments[0].BaseOffset() {
        // 目标偏移量在所有段的起始偏移量之前,这不应该发生,除非是新分区
        return nil, fmt.Errorf("offset %d is before the first segment's base offset %d", offset, p.segments[0].BaseOffset())
    }

    // 从找到的段开始读取
    for i := segmentIdx; i < len(p.segments); i++ {
        seg := p.segments[i]
        currentReadOffset := offset

        // 确保从当前段的起始偏移量或指定偏移量中较大的那个开始读
        if currentReadOffset < seg.BaseOffset() {
            currentReadOffset = seg.BaseOffset()
        }

        for currentReadOffset < seg.NextOffset() {
            record, err := seg.Read(currentReadOffset)
            if err != nil {
                // 如果是文件末尾或者无法解码,可能需要跳过或停止
                if err == io.EOF || strings.Contains(err.Error(), "CRC mismatch") {
                    // 遇到损坏记录或文件末尾,停止当前段的读取
                    fmt.Printf("Warning: Failed to read record at offset %d in segment %d: %v. Moving to next segment.n", currentReadOffset, seg.BaseOffset(), err)
                    break
                }
                return nil, fmt.Errorf("failed to read record from segment %d at offset %d: %w", seg.BaseOffset(), currentReadOffset, err)
            }

            // 粗略估计记录大小,如果超过 maxBytes,则停止
            recordSize := RecordHeaderSize + len(record.Key) + len(record.Value)
            if bytesRead+recordSize > maxBytes && len(records) > 0 {
                return records, nil // 达到最大字节限制,返回已读取的记录
            }

            records = append(records, record)
            bytesRead += recordSize
            currentReadOffset++ // 移动到下一条记录
        }

        // 更新全局偏移量为当前段的最后一个读取偏移量 + 1
        offset = seg.NextOffset()
    }

    return records, nil
}

// rollSegment 创建一个新的日志段,并将其设置为活动段
func (p *Partition) rollSegment(newBaseOffset uint64) error {
    if p.activeSegment != nil {
        if err := p.activeSegment.Close(); err != nil {
            return fmt.Errorf("failed to close old active segment %d: %w", p.activeSegment.BaseOffset(), err)
        }
    }

    newSegment, err := NewLogSegment(p.dataDir, newBaseOffset, p.maxSegmentSize, false) // Recover=false for new segment
    if err != nil {
        return fmt.Errorf("failed to create new log segment %d: %w", newBaseOffset, err)
    }

    p.segments = append(p.segments, newSegment)
    p.activeSegment = newSegment
    p.nextOffset = newBaseOffset // 新段的下一个偏移量从其baseOffset开始

    fmt.Printf("Partition %s-%d rolled to new segment with base offset %dn", p.topic, p.id, newBaseOffset)
    return nil
}

// TruncateBefore 删除所有 baseOffset 小于指定偏移量的日志段
// 通常用于数据保留策略
func (p *Partition) TruncateBefore(offset uint64) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    var segmentsToKeep []*LogSegment
    for _, seg := range p.segments {
        if seg.BaseOffset() < offset {
            // 该段需要被删除
            fmt.Printf("Deleting segment %d for partition %s-%d as part of truncation before offset %dn", seg.BaseOffset(), p.topic, p.id, offset)
            if err := seg.Delete(); err != nil {
                fmt.Printf("Warning: Failed to delete segment %d: %vn", seg.BaseOffset(), err)
                // 即使删除失败,也尝试继续,但这是一个问题
            }
        } else {
            segmentsToKeep = append(segmentsToKeep, seg)
        }
    }
    p.segments = segmentsToKeep
    return nil
}

// ApplyRetentionPolicy 根据保留时间删除旧的日志段
func (p *Partition) ApplyRetentionPolicy() error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.retentionTime <= 0 {
        return nil // 没有设置保留策略
    }

    // 计算要保留的最早时间戳
    cutoffTime := time.Now().Add(-p.retentionTime)
    var earliestOffsetToKeep uint64 = 0

    // 找到第一个应该保留的段的起始偏移量
    for _, seg := range p.segments {
        // 简单起见,我们假设段的创建时间与其baseOffset相关
        // 实际Kafka会通过索引中的时间戳来判断
        // 这里我们简化为:如果一个段的下一个偏移量对应的记录是旧的,那么整个段就可能被删除
        // 更准确的策略是:如果段中所有记录都已过期,则删除
        // 暂时简化为:如果一个段的最后一个记录的时间戳早于cutoffTime,则删除
        // 但这需要读取最后一个记录,效率不高。
        // 更实用的方法是根据段文件的最后修改时间来判断(粗略)
        // 或者在日志段中存储段的创建时间/最大时间戳
        // 这里,我们假设如果这个段的“最老”数据已经过期,那么就可以删除
        // 假设我们有一个方法可以获取段中最早的记录时间戳
        // 为了简化,我们只保留 activeSegment
        if seg == p.activeSegment {
            earliestOffsetToKeep = seg.BaseOffset()
            break
        }
        // 假设我们通过段的创建时间(文件名)来近似判断其年龄
        // 这是一个简化的假设,生产系统会更复杂
        fileInfo, err := os.Stat(seg.path)
        if err == nil && fileInfo.ModTime().Before(cutoffTime) {
            // 这个段可能已过期
            // 暂时不做任何操作,因为我们不能直接删除activeSegment之前的段,除非它的所有记录都过期
            // 真正的Kafka会找到一个可以安全删除的偏移量
            continue
        } else if err != nil {
            fmt.Printf("Warning: Failed to stat segment file %s: %vn", seg.path, err)
        }
        earliestOffsetToKeep = seg.BaseOffset() // 找到第一个不应该被删除的段的BaseOffset
        break
    }

    if earliestOffsetToKeep > 0 {
        return p.TruncateBefore(earliestOffsetToKeep)
    }
    return nil
}

// Close 关闭分区中的所有日志段
func (p *Partition) Close() error {
    p.mu.Lock()
    defer p.mu.Unlock()
    var errs []error
    for _, seg := range p.segments {
        if err := seg.Close(); err != nil {
            errs = append(errs, fmt.Errorf("failed to close segment %d: %w", seg.BaseOffset(), err))
        }
    }
    if len(errs) > 0 {
        return fmt.Errorf("errors closing partition %s-%d: %v", p.topic, p.id, errs)
    }
    return nil
}

// NextOffset 获取分区中下一个可用的全局偏移量
func (p *Partition) NextOffset() uint64 {
    p.mu.RLock()
    defer p.mu.RUnlock()
    return p.nextOffset
}

2.5 代理/集群 (Broker)

BrokerGoStream 的顶层服务,它管理着所有的主题和分区,并对外提供生产和消费数据的 API。

package gostream

import (
    "fmt"
    "os"
    "path/filepath"
    "strconv"
    "strings"
    "sync"
    "time"
)

// Broker 是 GoStream 服务的核心,管理所有主题和分区
type Broker struct {
    mu            sync.RWMutex
    dataPath      string                                   // 数据存储根目录
    topics        map[string]map[int]*Partition            // topic -> partitionID -> Partition
    maxSegmentSize int64
    retentionTime time.Duration
    stopCh        chan struct{}
    wg            sync.WaitGroup
}

// NewBroker 创建并初始化 Broker
func NewBroker(dataPath string, maxSegmentSize int64, retentionTime time.Duration) (*Broker, error) {
    if err := os.MkdirAll(dataPath, 0755); err != nil {
        return nil, fmt.Errorf("failed to create data directory %s: %w", dataPath, err)
    }

    broker := &Broker{
        dataPath:      dataPath,
        topics:        make(map[string]map[int]*Partition),
        maxSegmentSize: maxSegmentSize,
        retentionTime: retentionTime,
        stopCh:        make(chan struct{}),
    }

    if err := broker.loadTopicsAndPartitions(); err != nil {
        return nil, fmt.Errorf("failed to load existing topics and partitions: %w", err)
    }

    return broker, nil
}

// loadTopicsAndPartitions 从文件系统加载所有现有主题和分区
func (b *Broker) loadTopicsAndPartitions() error {
    b.mu.Lock()
    defer b.mu.Unlock()

    topicDirs, err := os.ReadDir(b.dataPath)
    if err != nil {
        return fmt.Errorf("failed to read data directory %s: %w", b.dataPath, err)
    }

    for _, topicEntry := range topicDirs {
        if topicEntry.IsDir() {
            topicName := topicEntry.Name()
            b.topics[topicName] = make(map[int]*Partition)

            partitionDirs, err := os.ReadDir(filepath.Join(b.dataPath, topicName))
            if err != nil {
                return fmt.Errorf("failed to read topic directory %s: %w", topicName, err)
            }

            for _, partitionEntry := range partitionDirs {
                if partitionEntry.IsDir() && strings.HasPrefix(partitionEntry.Name(), "partition-") {
                    idStr := strings.TrimPrefix(partitionEntry.Name(), "partition-")
                    id, parseErr := strconv.Atoi(idStr)
                    if parseErr != nil {
                        fmt.Printf("Warning: Invalid partition directory name %s for topic %s: %vn", partitionEntry.Name(), topicName, parseErr)
                        continue
                    }

                    partition, err := NewPartition(b.dataPath, topicName, id, b.maxSegmentSize, b.retentionTime)
                    if err != nil {
                        return fmt.Errorf("failed to load partition %s-%d: %w", topicName, id, err)
                    }
                    b.topics[topicName][id] = partition
                    fmt.Printf("Loaded partition %s-%dn", topicName, id)
                }
            }
        }
    }
    return nil
}

// CreateTopic 创建一个新的主题,并指定分区数量
func (b *Broker) CreateTopic(topicName string, numPartitions int) error {
    b.mu.Lock()
    defer b.mu.Unlock()

    if _, exists := b.topics[topicName]; exists {
        return fmt.Errorf("topic %s already exists", topicName)
    }

    b.topics[topicName] = make(map[int]*Partition)
    for i := 0; i < numPartitions; i++ {
        partition, err := NewPartition(b.dataPath, topicName, i, b.maxSegmentSize, b.retentionTime)
        if err != nil {
            // 如果创建某个分区失败,可能需要回滚已创建的分区
            return fmt.Errorf("failed to create partition %d for topic %s: %w", i, topicName, err)
        }
        b.topics[topicName][i] = partition
    }
    fmt.Printf("Created topic %s with %d partitionsn", topicName, numPartitions)
    return nil
}

// Produce 将记录生产到指定主题和分区
func (b *Broker) Produce(topic string, partitionID int, key, value []byte) (uint64, error) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    topicPartitions, ok := b.topics[topic]
    if !ok {
        return 0, fmt.Errorf("topic %s not found", topic)
    }

    partition, ok := topicPartitions[partitionID]
    if !ok {
        return 0, fmt.Errorf("partition %d for topic %s not found", partitionID, topic)
    }

    record := &Record{
        Timestamp: time.Now(),
        Key:       key,
        Value:     value,
    }

    offset, err := partition.Append(record)
    if err != nil {
        return 0, fmt.Errorf("failed to append record to partition %s-%d: %w", topic, partitionID, err)
    }
    return offset, nil
}

// Consume 从指定主题、分区和偏移量开始消费记录
func (b *Broker) Consume(topic string, partitionID int, offset uint64, maxBytes int) ([]*Record, error) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    topicPartitions, ok := b.topics[topic]
    if !ok {
        return nil, fmt.Errorf("topic %s not found", topic)
    }

    partition, ok := topicPartitions[partitionID]
    if !ok {
        return nil, fmt.Errorf("partition %d for topic %s not found", partitionID, topic)
    }

    records, err := partition.Read(offset, maxBytes)
    if err != nil {
        return nil, fmt.Errorf("failed to read records from partition %s-%d at offset %d: %w", topic, partitionID, offset, err)
    }
    return records, nil
}

// GetPartitionNextOffset 获取指定分区下一个可用的偏移量
func (b *Broker) GetPartitionNextOffset(topic string, partitionID int) (uint64, error) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    topicPartitions, ok := b.topics[topic]
    if !ok {
        return 0, fmt.Errorf("topic %s not found", topic)
    }

    partition, ok := topicPartitions[partitionID]
    if !ok {
        return 0, fmt.Errorf("partition %d for topic %s not found", partitionID, topic)
    }
    return partition.NextOffset(), nil
}

// Start 启动 Broker 的后台任务,例如数据保留策略
func (b *Broker) Start() {
    b.wg.Add(1)
    go b.runRetentionPolicy()
    fmt.Println("Broker started retention policy goroutine.")
}

// runRetentionPolicy 定期运行数据保留策略
func (b *Broker) runRetentionPolicy() {
    defer b.wg.Done()
    ticker := time.NewTicker(5 * time.Minute) // 每5分钟运行一次
    defer ticker.Stop()

    for {
        select {
        case <-b.stopCh:
            fmt.Println("Retention policy goroutine stopping.")
            return
        case <-ticker.C:
            fmt.Println("Running retention policy...")
            b.mu.RLock() // 只读锁,因为只读分区信息并触发其内部的Truncate
            for topicName, partitions := range b.topics {
                for partitionID, p := range partitions {
                    if err := p.ApplyRetentionPolicy(); err != nil {
                        fmt.Printf("Error applying retention policy for partition %s-%d: %vn", topicName, partitionID, err)
                    }
                }
            }
            b.mu.RUnlock()
            fmt.Println("Retention policy finished.")
        }
    }
}

// Stop 停止 Broker 服务
func (b *Broker) Stop() error {
    fmt.Println("Stopping Broker...")
    close(b.stopCh) // 通知后台任务停止
    b.wg.Wait()     // 等待所有后台任务完成

    b.mu.Lock()
    defer b.mu.Unlock()

    var errs []error
    for _, topicPartitions := range b.topics {
        for _, partition := range topicPartitions {
            if err := partition.Close(); err != nil {
                errs = append(errs, fmt.Errorf("failed to close partition %s-%d: %w", partition.topic, partition.id, err))
            }
        }
    }

    if len(errs) > 0 {
        return fmt.Errorf("errors stopping broker: %v", errs)
    }
    fmt.Println("Broker stopped successfully.")
    return nil
}

三、Go 语言特性的应用与考量

在实现 GoStream 的过程中,我们充分利用了 Go 语言的以下特性:

  • 并发原语 (sync 包)sync.Mutexsync.RWMutex 用于保护共享数据结构,如 LogSegmentPartition 中的文件句柄和内存缓存,以及 Broker 中的主题分区映射。读写锁 (RWMutex) 在读多写少的场景下提供了更好的性能。
  • 文件 I/O (os, io, bufio 包)
    • os.OpenFile 提供了灵活的文件打开模式(读写、创建、追加)。
    • io.ReadFullio.Seek 用于精确控制文件读取。
    • bufio.NewReader 提供了带缓冲的 I/O,提高了读取效率,尤其是在 recoverNextOffsetAndIndex 中逐字节读取记录时。
    • file.Sync() (fsync) 是确保数据持久化到磁盘的关键。
  • 错误处理:Go 惯用的 error 返回值模式被广泛使用,通过返回 fmt.Errorf 封装底层错误,提供了清晰的错误链。
  • 结构体和接口:结构体用于定义数据模型(Record, IndexEntry)和核心组件(LogSegment, SegmentIndex, Partition, Broker)。虽然我们没有显式定义很多接口,但这些结构体的方法签名可以很自然地构成接口,方便未来的扩展和测试。
  • encoding/binary:用于将 Go 的基本数据类型(如 uint32, uint64, int64)序列化和反序列化为字节切片,这是构建自定义二进制协议的关键。大端字节序(binary.BigEndian)的选择保证了跨平台数据的一致性。
  • path/filepath:用于跨平台地构建文件路径,确保程序在不同操作系统上的兼容性。
  • time:处理时间戳和数据保留策略。

持久化与容错性考虑:

  • WAL (Write-Ahead Log) 原则:日志结构化设计本身就是一种 WAL。数据总是先追加到日志文件,然后才被视为“持久化”。
  • fsync:在 LogSegment.AppendSegmentIndex.Append 中,我们没有直接调用 file.Sync()。在生产系统中,这是非常关键的。为了性能,Kafka 通常会批量写入后进行 fsync,或者依赖操作系统的 flush 周期。在我们的简化版中,这可以作为一个优化点。不调用 fsync 意味着在系统崩溃时,最近写入的数据可能丢失。
  • 恢复机制NewLogSegmentNewSegmentIndex 中的 recover 参数以及 LogSegment.recoverNextOffsetAndIndex 方法,展示了从磁盘文件重建内存状态的能力。这是日志结构化系统实现容错的关键。即使程序崩溃,只要日志文件存在,就可以恢复到崩溃前的状态。

潜在的改进点:

  • 稀疏索引:像 Kafka 一样实现稀疏索引,可以显著减少索引文件大小和内存占用。
  • 零拷贝(Zero-Copy):在 Read 操作中,直接将文件内容映射到内存(mmap),可以避免数据在内核空间和用户空间之间的拷贝,提高吞吐量。Go 的 syscall 包提供了 mmap 的能力。
  • 批量写入:生产者可以批量发送多条记录,减少 I/O 操作和 fsync 的频率。
  • 数据压缩:对记录进行压缩可以减少磁盘空间占用和网络传输开销。
  • 更强大的数据保留策略:除了按时间保留,还可以按大小保留,或者基于键的日志压缩(Log Compaction),只保留每个键的最新值。
  • 网络层:目前 Broker 只是一个内存中的对象。要使其成为一个真正的分布式系统,需要添加网络协议(如 gRPC 或自定义 TCP 协议)来处理来自生产者和消费者的请求。

四、实践与运行 GoStream

为了演示 GoStream 的基本功能,我们可以编写一个简单的 main 函数来模拟生产者和消费者。

package main

import (
    "fmt"
    "gostream" // 假设你的gostream包在当前模块下
    "os"
    "time"
)

func main() {
    dataPath := "./gostream_data"
    maxSegmentSize := int64(1 * 1024 * 1024) // 1MB for quick segment rolling
    retentionTime := 10 * time.Minute // 数据保留10分钟

    // 清理旧数据目录 (仅用于测试)
    os.RemoveAll(dataPath)

    // 1. 初始化 Broker
    broker, err := gostream.NewBroker(dataPath, maxSegmentSize, retentionTime)
    if err != nil {
        fmt.Printf("Failed to create broker: %vn", err)
        return
    }
    defer broker.Stop() // 确保Broker在程序退出时关闭

    broker.Start() // 启动后台任务,例如数据保留策略

    // 2. 创建主题
    topic := "my-topic"
    numPartitions := 2
    err = broker.CreateTopic(topic, numPartitions)
    if err != nil {
        fmt.Printf("Failed to create topic: %vn", err)
        return
    }

    fmt.Println("--- Producers ---")
    // 3. 模拟生产者
    for i := 0; i < 500; i++ { // 写入500条记录,触发段滚动
        partitionID := i % numPartitions
        key := []byte(fmt.Sprintf("key-%d", i))
        value := []byte(fmt.Sprintf("hello world from record %d", i))

        offset, err := broker.Produce(topic, partitionID, key, value)
        if err != nil {
            fmt.Printf("Producer failed for record %d: %vn", i, err)
            continue
        }
        fmt.Printf("Produced to topic %s, partition %d, offset %d: %sn", topic, partitionID, offset, string(value))
        time.Sleep(1 * time.Millisecond) // 模拟生产间隔
    }

    fmt.Println("n--- Consumers ---")
    // 4. 模拟消费者
    // 消费者1从分区0的0偏移量开始消费
    consumerOffset0 := uint64(0)
    fmt.Printf("Consumer 1 (Partition 0) starting from offset %dn", consumerOffset0)
    for {
        records, err := broker.Consume(topic, 0, consumerOffset0, 1024*10) // 每次最多读取10KB数据
        if err != nil {
            fmt.Printf("Consumer 1 (Partition 0) error: %vn", err)
            break
        }
        if len(records) == 0 {
            fmt.Printf("Consumer 1 (Partition 0) no new records, waiting...n")
            time.Sleep(1 * time.Second) // 没有新记录,等待一段时间
            // 在实际系统中,这里会进行长轮询或者等待通知
            continue
        }

        for _, record := range records {
            fmt.Printf("Consumer 1 (Partition 0) received: offset=%d, key=%s, value=%sn", record.Offset, string(record.Key), string(record.Value))
            consumerOffset0 = record.Offset + 1
        }
        // 为了演示,只消费一次就退出
        break
    }

    // 消费者2从分区1的当前最新偏移量开始消费
    consumerOffset1, err := broker.GetPartitionNextOffset(topic, 1)
    if err != nil {
        fmt.Printf("Consumer 2 (Partition 1) failed to get next offset: %vn", err)
        return
    }
    fmt.Printf("Consumer 2 (Partition 1) starting from offset %dn", consumerOffset1)
    for {
        records, err := broker.Consume(topic, 1, consumerOffset1, 1024*10)
        if err != nil {
            fmt.Printf("Consumer 2 (Partition 1) error: %vn", err)
            break
        }
        if len(records) == 0 {
            fmt.Printf("Consumer 2 (Partition 1) no new records, waiting...n")
            time.Sleep(1 * time.Second)
            continue
        }

        for _, record := range records {
            fmt.Printf("Consumer 2 (Partition 1) received: offset=%d, key=%s, value=%sn", record.Offset, string(record.Key), string(record.Value))
            consumerOffset1 = record.Offset + 1
        }
        // 为了演示,只消费一次就退出
        break
    }

    fmt.Println("n--- Demonstrating Broker Restart and Recovery ---")
    // 关闭并重新启动 Broker,模拟系统重启
    broker.Stop()
    fmt.Println("Broker stopped. Reinitializing...")

    // 重新创建 Broker,它应该能从磁盘加载所有数据
    broker2, err := gostream.NewBroker(dataPath, maxSegmentSize, retentionTime)
    if err != nil {
        fmt.Printf("Failed to re-create broker: %vn", err)
        return
    }
    defer broker2.Stop()
    broker2.Start()

    // 再次尝试消费,验证数据是否恢复
    fmt.Printf("Consumer after restart (Partition 0) starting from offset %dn", 0)
    recordsAfterRestart, err := broker2.Consume(topic, 0, 0, 1024*100) // 读取所有数据
    if err != nil {
        fmt.Printf("Consumer after restart (Partition 0) error: %vn", err)
        return
    }
    fmt.Printf("Consumer after restart (Partition 0) received %d records.n", len(recordsAfterRestart))
    // 可以进一步验证记录内容是否正确

    fmt.Println("n--- Demonstrating Retention Policy ---")
    fmt.Println("Waiting for retention policy to potentially delete old segments...")
    time.Sleep(15 * time.Minute) // 等待超过 retentionTime

    // 再次启动一个消费者,看旧数据是否被删除
    broker3, err := gostream.NewBroker(dataPath, maxSegmentSize, retentionTime)
    if err != nil {
        fmt.Printf("Failed to re-create broker for retention check: %vn", err)
        return
    }
    defer broker3.Stop()
    broker3.Start()
    recordsAfterRetention, err := broker3.Consume(topic, 0, 0, 1024*100)
    if err != nil {
        fmt.Printf("Consumer after retention (Partition 0) error: %vn", err)
        return
    }
    fmt.Printf("Consumer after retention (Partition 0) received %d records. (Expected fewer if retention worked)n", len(recordsAfterRetention))

    fmt.Println("GoStream demo finished.")
}

五、总结与展望

通过本讲座,我们深入探讨了日志结构化流的核心原理,并使用 Go 语言从头开始实现了一个简化版的 GoStream 引擎。我们设计并实现了记录、日志段、段索引、分区和代理等关键组件,并讨论了 Go 语言在实现此类系统中的优势和注意事项。这个 GoStream 虽然是一个简化模型,但它包含了日志结构化流处理引擎的核心要素,例如顺序写入、索引查找、段滚动、数据持久化与恢复。

构建一个生产级的 Kafka 替代品需要考虑更复杂的分布式协调、复制机制、流量控制、消费者组管理、监控和管理工具等。然而,理解并实现像 GoStream 这样的核心组件,是掌握这些复杂系统基石的关键一步。它不仅能帮助我们更好地理解如 Kafka 这类系统的内部工作,也为我们设计和实现其他高性能、高可用分布式存储和流处理系统提供了宝贵的实践经验。日志结构化设计以其简洁、高效和强大的容错能力,将继续在未来的数据基础设施中扮演核心角色。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注