Dissecting NVMe-oF (NVMe over Fabrics): Building a High-Performance Cross-Network Block Storage Client in Go

Dear fellow engineers,

Welcome to today's technical lecture. We will take a deep dive into a technology of growing importance in the modern data center: NVMe-oF (NVMe over Fabrics), focusing on how to build a high-performance cross-network block storage client in Go.

In today's data-driven world, data volumes are exploding and the performance demands on storage systems have reached unprecedented heights. Traditional storage stacks, whether SAS/SATA spinning disks or SSDs, are increasingly the bottleneck. The NVMe (Non-Volatile Memory Express) protocol revolutionized the storage interface: designed specifically for NAND flash and next-generation non-volatile memory, it attaches directly to the CPU over PCIe, dramatically reducing latency while raising IOPS and bandwidth.

However, the advantages of a local NVMe SSD are confined to a single physical server. When we need to share high-performance storage across multiple servers, or build large, highly available storage clusters, local NVMe falls short. This is where NVMe over Fabrics (NVMe-oF) comes in.

The goal of NVMe-oF is to extend the low latency and high throughput of the NVMe protocol across the data center network over a variety of transports (RDMA, TCP, Fibre Channel, and so on), enabling storage to be disaggregated, pooled, and accessed remotely while staying as close as possible to local NVMe performance. This provides a solid foundation for software-defined storage, hyperconverged infrastructure, and high-performance computing environments.

Our choice of Go for the NVMe-oF client is no accident. Go is known for its built-in concurrency primitives (goroutines and channels), excellent network programming support, efficient garbage collection, and runtime performance approaching C/C++. These traits make Go an ideal fit for high-performance, highly concurrent network services, whether the job is handling massive numbers of I/O requests or managing complex asynchronous workflows.

In today's lecture I will guide you from first principles through the core concepts of NVMe-oF, then step by step build an NVMe-oF TCP transport client in Go. Along the way we will dig into protocol details, Go implementation strategy, and considerations for performance and reliability.


Chapter 1: NVMe-oF Core Concepts and Architecture

Before diving into the Go implementation, we first need a solid understanding of NVMe-oF.

1.1 A Quick Review of NVMe

NVMe is a host controller interface specification that defines how host software communicates with PCIe SSDs. Its core strengths include:

  • Multiple queues: up to 65535 I/O queues, each holding up to 65536 commands, which greatly increases parallelism.
  • Streamlined command set: compared with SATA/SAS, the NVMe command path is shorter, cutting CPU overhead and latency.
  • Register-mapped queues: submission, completion, and admin queue pointers are mapped into memory, reducing interrupts and context switches.
  • SCSI replacement: designed to supersede the SCSI command set and better match the characteristics of flash storage.

1.2 The Birth of NVMe-oF: Beyond Local PCIe

NVMe-oF extends the NVMe command set and data transfer from the PCIe bus onto a network fabric, letting remote servers access shared NVMe storage as if it were a local NVMe SSD.

Core idea: encapsulate NVMe protocol frames inside different network transport protocols, hence "NVMe over Fabrics".

1.3 Key NVMe-oF Components

An NVMe-oF system typically involves the following roles:

  • NVMe-oF Host (Initiator): the compute node that initiates NVMe-oF connections and I/O requests. Our Go client plays this role.
  • NVMe-oF Controller (Target/Subsystem): the server or storage device that exposes NVMe storage. It receives and services NVMe-oF requests from hosts.
  • NVMe-oF Namespace: a logical block device exposed by the target controller to the host, analogous to a LUN. One target controller may expose multiple namespaces.
  • NVMe-oF Transport: the network protocol that carries NVMe-oF. Common options:
    • RDMA (Remote Direct Memory Access): covers RoCE (RDMA over Converged Ethernet) and iWARP (Internet Wide Area RDMA Protocol). Extremely low latency with CPU offload, but usually requires specialized NICs.
    • TCP (Transmission Control Protocol): runs over the standard Ethernet and TCP/IP stack. Broadly compatible with no special hardware, at the cost of somewhat higher latency and CPU overhead than RDMA.
    • Fibre Channel (FC-NVMe): maps NVMe commands onto the Fibre Channel protocol.
    • Shared memory: used for inter-process communication within a single host; not a network transport.

1.4 The NVMe-oF Protocol Stack at a Glance

The NVMe-oF protocol stack can be simplified into the following layers:

  • Application layer: user-issued block I/O (read, write, flush, ...); corresponds to NVMe commands.
  • NVMe-oF protocol layer: encapsulates NVMe commands and data into NVMe-oF-specific PDUs (Protocol Data Units).
  • Transport layer: reliable delivery of PDUs across the network, covering connections, flow control, and error recovery; TCP, RDMA, or FC.
  • Network layer: IP addressing and routing.
  • Data link layer: frame encapsulation/decapsulation and MAC addressing; Ethernet.
  • Physical layer: physical signal transmission over fiber or copper.

Our Go client focuses on the NVMe-oF protocol layer and the TCP transport layer.

1.5 NVMe-oF Connection Establishment

A typical NVMe-oF connection and I/O flow looks like this:

  1. Discovery: the host connects to the target's Discovery controller (usually on a well-known port, 8009 for TCP) and issues discovery requests to learn which NVMe subsystems are available, including their NQNs (NVMe Qualified Names), transport addresses, and ports.
  2. Connect: the host selects a subsystem and connects to its I/O controller (usually a different port, 4420 by default for TCP). During this phase host and target negotiate queue counts, queue sizes, and other parameters, and establish the Admin Queue and I/O Queues.
  3. Identify Controller/Namespace: once connected, the host issues Identify commands to retrieve controller and namespace details.
  4. I/O operations: the host submits read/write and other NVM commands through its I/O queues; the target executes them and returns completion status.

Chapter 2: Go and High-Performance Network Programming

Go has natural advantages in network programming, which makes it an ideal language for an NVMe-oF client.

2.1 Goroutines and Channels: The Foundation of Concurrency

Go's concurrency model is rooted in CSP (Communicating Sequential Processes), realized through lightweight goroutines and type-safe channels.

  • Goroutines: lightweight coroutines scheduled by the Go runtime. A Go program can comfortably run millions of goroutines without the per-thread overhead of OS threads, which is essential when managing large numbers of concurrent I/O requests.
  • Channels: the safe way to pass data between goroutines. They provide synchronization and help avoid the race conditions and deadlocks common in shared-memory concurrency.

In an NVMe-oF client, we can use goroutines to:

  • handle independent connections;
  • send and receive commands on different I/O queues;
  • wait asynchronously for command completions;
  • run background tasks such as keep-alives and error recovery.

2.2 The net Package: A Powerful Network Abstraction

Go's net package offers rich networking primitives covering TCP, UDP, Unix domain sockets, and more.

  • net.Dial establishes a TCP connection.
  • The net.Conn interface exposes Read and Write methods for convenient byte-stream I/O.
  • net.Listener is used to build the server side.

These abstractions let Go handle low-level network communication concisely and efficiently, without descending into OS-level socket programming.

2.3 The encoding/binary Package: Byte Order and Protocol Parsing

The NVMe-oF protocol defines various PDU and command structures whose fields require a specific byte order (endianness). Go's encoding/binary package provides convenient tools for endianness conversion, translating between Go structs and byte slices.

We will use binary.LittleEndian throughout, because the NVMe specification mandates little-endian byte order.


Chapter 3: Implementing the NVMe-oF TCP Transport Layer

We begin with the NVMe-oF TCP transport layer, the foundation of the whole client. The NVMe-oF TCP transport specification defines PDU formats that encapsulate NVMe commands and data.

3.1 NVMe-oF TCP PDU Structure

NVMe-oF TCP uses a uniform PDU format in which every PDU begins with a fixed-size header.

  • PDU Type (1 byte): the PDU type, e.g. ICREQ (Initialize Connection Request), ICRESP (Initialize Connection Response), COMMAND, RESPONSE, H2C_DATA (Host-to-Controller Data), C2H_DATA (Controller-to-Host Data).
  • Flags (1 byte): PDU-specific flag bits, e.g. HDGST (Header Digest present), DDGST (Data Digest present).
  • Header Length (2 bytes): total length of the PDU header, including optional fields.
  • PDU Length (4 bytes): length of the entire PDU, covering the header, optional header fields, and data (if any).
  • PDU Specific Fields (variable): fields that depend on the PDU type; a COMMAND PDU carries an NVMe command structure, an H2C_DATA PDU carries data.
  • Header Digest (4 bytes, optional): CRC32C checksum protecting the PDU header; present when the HDGST flag is set.
  • Data Digest (4 bytes, optional): CRC32C checksum protecting the PDU data; present when the DDGST flag is set.

(This layout is simplified for the lecture; the actual NVMe/TCP specification defines an 8-byte common header with PDU-Type, FLAGS, HLEN, PDO, and PLEN fields and different type encodings.)

We will define Go structs to represent these PDU headers and the various PDU types.

package nvmeof

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "hash/crc32"
    "io"
    "net"
    "sync"
    "time"
)

// NVMe-oF TCP PDU types.
// NOTE: these values are simplified for this lecture; the NVMe/TCP spec
// defines ICReq=0x00, ICResp=0x01, CapsuleCmd=0x04, CapsuleResp=0x05,
// H2CData=0x06, C2HData=0x07, among others.
const (
    PDU_TYPE_ICREQ     byte = 0x00 // Initialize Connection Request
    PDU_TYPE_ICRESP    byte = 0x01 // Initialize Connection Response
    PDU_TYPE_COMMAND   byte = 0x02
    PDU_TYPE_RESPONSE  byte = 0x03
    PDU_TYPE_H2C_DATA  byte = 0x04 // Host to Controller Data
    PDU_TYPE_C2H_DATA  byte = 0x05 // Controller to Host Data
    PDU_TYPE_TERM_REQ  byte = 0x06 // Termination Request
    PDU_TYPE_TERM_RSP  byte = 0x07 // Termination Response
    PDU_TYPE_KEEPALIVE byte = 0x08 // Keep-Alive
    // ... other PDU types
)

// PDU Flags
const (
    PDU_FLAG_HDGST byte = 0x01 // Header Digest present
    PDU_FLAG_DDGST byte = 0x02 // Data Digest present
)

// PDUHeader is the fixed NVMe-oF TCP PDU header.
type PDUHeader struct {
    PDUType    byte
    Flags      byte
    HeaderLen  uint16 // Total length of the PDU header, including optional fields
    PDU_Length uint32 // Total length of the PDU (Header + Payload)
}

// FixedPDUHeaderLen is the size of the fixed portion of PDUHeader.
const FixedPDUHeaderLen = 8 // PDUType(1) + Flags(1) + HeaderLen(2) + PDU_Length(4)

// ReadPDUHeader reads a PDU header from the connection.
func ReadPDUHeader(conn net.Conn) (*PDUHeader, error) {
    buf := make([]byte, FixedPDUHeaderLen)
    _, err := io.ReadFull(conn, buf)
    if err != nil {
        return nil, fmt.Errorf("failed to read PDU header: %w", err)
    }

    header := &PDUHeader{}
    reader := bytes.NewReader(buf)
    if err := binary.Read(reader, binary.LittleEndian, header); err != nil {
        return nil, fmt.Errorf("failed to parse PDU header: %w", err)
    }

    return header, nil
}

// WritePDUHeader writes a PDU header to the connection.
func WritePDUHeader(conn net.Conn, header *PDUHeader) error {
    buf := new(bytes.Buffer)
    if err := binary.Write(buf, binary.LittleEndian, header); err != nil {
        return fmt.Errorf("failed to serialize PDU header: %w", err)
    }

    if _, err := conn.Write(buf.Bytes()); err != nil {
        return fmt.Errorf("failed to write PDU header: %w", err)
    }
    return nil
}

// crc32cTable is built once; constructing the table on every call would be wasteful.
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)

// CalculateCRC32C computes the CRC32C checksum used by header/data digests.
func CalculateCRC32C(data []byte) uint32 {
    return crc32.Checksum(data, crc32cTable)
}

// NVMeoFConnection manages a single TCP connection to a target.
type NVMeoFConnection struct {
    conn          net.Conn
    connMtx       sync.Mutex // serializes writes so PDUs are never interleaved
    remoteAddr    string
    digestEnabled bool // whether Header/Data Digests are enabled
}

// NewNVMeoFConnection dials a target and wraps the connection.
func NewNVMeoFConnection(addr string) (*NVMeoFConnection, error) {
    conn, err := net.DialTimeout("tcp", addr, 5*time.Second) // 5s connect timeout
    if err != nil {
        return nil, fmt.Errorf("failed to dial NVMe-oF target %s: %w", addr, err)
    }
    return &NVMeoFConnection{
        conn:      conn,
        remoteAddr: addr,
        digestEnabled: false, // off by default; may be enabled during connection negotiation
    }, nil
}

// Close closes the connection.
func (c *NVMeoFConnection) Close() error {
    return c.conn.Close()
}

// SendPDU sends one complete PDU (header, optional digest, payload).
func (c *NVMeoFConnection) SendPDU(pduType byte, payload []byte) error {
    c.connMtx.Lock()
    defer c.connMtx.Unlock()

    headerLen := FixedPDUHeaderLen
    if c.digestEnabled {
        headerLen += 4 // room for the Header Digest
    }

    totalPDULen := uint32(headerLen + len(payload))

    header := PDUHeader{
        PDUType:    pduType,
        Flags:      0,
        HeaderLen:  uint16(headerLen),
        PDU_Length: totalPDULen,
    }

    if c.digestEnabled {
        header.Flags |= PDU_FLAG_HDGST
    }

    // Serialize the header.
    headerBuf := new(bytes.Buffer)
    if err := binary.Write(headerBuf, binary.LittleEndian, header); err != nil {
        return fmt.Errorf("failed to serialize PDU header: %w", err)
    }

    // If Header Digest is enabled, compute and append it.
    if c.digestEnabled {
        digest := CalculateCRC32C(headerBuf.Bytes())
        if err := binary.Write(headerBuf, binary.LittleEndian, digest); err != nil {
            return fmt.Errorf("failed to write header digest: %w", err)
        }
    }

    // Write the header.
    if _, err := c.conn.Write(headerBuf.Bytes()); err != nil {
        return fmt.Errorf("failed to write PDU header to connection: %w", err)
    }

    // Write the payload.
    if len(payload) > 0 {
        if _, err := c.conn.Write(payload); err != nil {
            return fmt.Errorf("failed to write PDU payload to connection: %w", err)
        }
    }

    return nil
}

// ReceivePDU receives one complete PDU and returns its type and payload.
func (c *NVMeoFConnection) ReceivePDU() (byte, []byte, error) {
    // Note: ReceivePDU takes no lock, because receiving normally runs in one
    // dedicated goroutine per connection. If several goroutines must receive
    // concurrently, add synchronization or give each its own channel.

    header, err := ReadPDUHeader(c.conn)
    if err != nil {
        return 0, nil, fmt.Errorf("failed to receive PDU header: %w", err)
    }

    expectedHeaderLen := FixedPDUHeaderLen
    if c.digestEnabled {
        expectedHeaderLen += 4 // Header Digest
    }

    if header.HeaderLen != uint16(expectedHeaderLen) {
        // If HeaderLen covered optional fields (such as a Header Digest) that
        // were not negotiated, a fuller client would read and skip them; this
        // simplified client checks strictly instead.
        return 0, nil, fmt.Errorf("unexpected PDU header length: got %d, expected %d", header.HeaderLen, expectedHeaderLen)
    }

    // If a Header Digest is present, read it (and, in a real client, verify it).
    if (header.Flags&PDU_FLAG_HDGST) != 0 && c.digestEnabled {
        // Proper verification recomputes the CRC over the header bytes and
        // compares it with the received digest; here we only skip the field.
        digestBuf := make([]byte, 4)
        _, err := io.ReadFull(c.conn, digestBuf)
        if err != nil {
            return 0, nil, fmt.Errorf("failed to read header digest: %w", err)
        }
        // TODO: verify the digest in a production client
    }

    payloadLen := int(header.PDU_Length) - int(header.HeaderLen)
    payload := make([]byte, payloadLen)
    if payloadLen > 0 {
        _, err := io.ReadFull(c.conn, payload)
        if err != nil {
            return 0, nil, fmt.Errorf("failed to read PDU payload: %w", err)
        }
    }

    // If a Data Digest is present, read it (again, we only skip it here
    // rather than verifying).
    if (header.Flags&PDU_FLAG_DDGST) != 0 && c.digestEnabled {
        digestBuf := make([]byte, 4)
        _, err := io.ReadFull(c.conn, digestBuf)
        if err != nil {
            return 0, nil, fmt.Errorf("failed to read data digest: %w", err)
        }
        // TODO: verify the digest in a production client
    }

    return header.PDUType, payload, nil
}

Code walkthrough:

  • PDUHeader: the fixed header fields of an NVMe-oF TCP PDU.
  • FixedPDUHeaderLen: the byte length of the fixed part of the header.
  • ReadPDUHeader, WritePDUHeader: helpers that read/write a PDU header on a net.Conn, handling little-endian conversion.
  • CalculateCRC32C: computes the CRC32C checksum, an optional but recommended integrity feature.
  • NVMeoFConnection: wraps net.Conn and provides SendPDU and ReceivePDU for whole NVMe-oF TCP PDUs.
    • SendPDU: builds the full PDU from type and payload, including the header and optional Header Digest, then sends it.
    • ReceivePDU: reads the PDU header, derives the type and total length, then reads and returns the payload. A real client needs stricter error checking and digest verification here.
  • connMtx: serializes writes on the net.Conn. net.Conn methods may be called concurrently, but locking around the whole PDU send guarantees one PDU's bytes are never interleaved with another's. Reads normally happen in a single dedicated goroutine, so ReceivePDU takes no lock.

Chapter 4: NVMe Commands and Data Structures

Next we define Go structs for NVMe commands and completion queue entries (CQEs). NVMe distinguishes Admin commands (management and configuration) and NVM commands (actual data I/O).

4.1 NVMe Command Format

All NVMe commands share a common 64-byte layout, usually described as Command Dwords 0-15 (CDW0-CDW15).

  • Opcode (1 byte): the command operation code (e.g. Identify, Read, Write).
  • Fuse (1 byte): fused-operation bits for atomic command pairs.
  • CID (2 bytes): Command Identifier, the host's unique handle for matching a command with its completion.
  • NSID (4 bytes): the Namespace Identifier the command operates on.
  • PRP Entry 1 (8 bytes): Physical Region Page Entry 1, the physical address of the data buffer, or part of an SGL.
  • PRP Entry 2 (8 bytes): Physical Region Page Entry 2, as above.
  • CDW10-CDW15 (24 bytes): command-specific fields, such as the LBA (logical block address) and block count.

We define a base NVMeCommand struct, plus command-specific wrappers for each command type.

// Common NVMe command opcodes.
const (
    // Admin commands
    NVME_ADMIN_OPC_IDENTIFY byte = 0x06
    NVME_ADMIN_OPC_CONNECT  byte = 0x7F // NVMe-oF Fabrics opcode; FCTYPE 0x01 selects Connect
    // NVM commands
    NVME_NVM_OPC_READ  byte = 0x02
    NVME_NVM_OPC_WRITE byte = 0x01
    NVME_NVM_OPC_FLUSH byte = 0x00
    // ... more commands
)

// NVMeCommand is the common 64-byte command layout.
type NVMeCommand struct {
    Opcode  byte
    Fuse    byte
    CID     uint16 // Command Identifier
    NSID    uint32 // Namespace Identifier (Admin commands may use 0 or FFFFFFFFh)
    _       uint64 // Reserved0 (DW2-3)
    MPTR    uint64 // Metadata Pointer (DW4-5)
    PRP1    uint64 // Physical Region Page Entry 1 (DW6-7)
    PRP2    uint64 // Physical Region Page Entry 2 (DW8-9)
    CDW10   uint32 // Command DWord 10 (command specific)
    CDW11   uint32 // Command DWord 11 (command specific)
    CDW12   uint32 // Command DWord 12 (command specific)
    CDW13   uint32 // Command DWord 13 (command specific)
    CDW14   uint32 // Command DWord 14 (command specific)
    CDW15   uint32 // Command DWord 15 (command specific)
}

// Admin Command: Identify (NVMe 1.4).
// This structure is the command payload, not the PDU. CNS goes in the
// embedded CDW10 and the Controller ID (for NVM subsystems) in CDW11. We do
// not redeclare those fields here: shadowing the embedded CDWs would make
// binary.Write emit more than 64 bytes and silently drop the values.
type IdentifyCommand struct {
    NVMeCommand
}

const (
    IDENTIFY_CNS_NAMESPACE      uint32 = 0x00 // Identify Namespace
    IDENTIFY_CNS_CONTROLLER     uint32 = 0x01 // Identify Controller
    IDENTIFY_CNS_NAMESPACE_LIST uint32 = 0x02 // Identify Active Namespace List
    // NOTE: the spec exposes the NVMe-oF Discovery Log via the Get Log Page
    // command (log page ID 0x70); we reuse an Identify CNS value here as a
    // lecture simplification.
    IDENTIFY_CNS_DISCOVERY_LOG uint32 = 0x10
)

// NVM Command: Read/Write.
// This structure is the command payload, not the PDU. The starting LBA spans
// CDW10 (low half) and CDW11 (high half); the zero-based block count (NLB)
// occupies CDW12 bits 15:0. We reuse the embedded CDW fields so the
// serialized command stays exactly 64 bytes.
type NVMReadWriteCommand struct {
    NVMeCommand
}

// SetLBA stores the starting logical block address into CDW10/CDW11.
func (c *NVMReadWriteCommand) SetLBA(lba uint64) {
    c.CDW10 = uint32(lba)
    c.CDW11 = uint32(lba >> 32)
}

// NVMe-oF Connect Command (NVMe-oF 1.0).
// An Admin-level Fabrics command used to establish a queue. The queue ID,
// queue size, and keep-alive timeout are encoded into the command CDWs per
// the spec, while HostID, HostNQN, and SubsystemNQN travel in the Connect
// data that follows the 64-byte command inside the capsule.
type ConnectCommand struct {
    NVMeCommand
}

const (
    CONNECT_QID_ADMIN uint16 = 0x0000 // the Admin Queue is always queue ID 0
    CONNECT_QID_IO    uint16 = 0x0001 // first I/O Queue ID
    // ...
)

// NVMeCompletion is a 16-byte completion queue entry, laid out as the spec
// defines it: command-specific result first, CID and status last.
type NVMeCompletion struct {
    Result uint32 // DW0: command-specific result
    _      uint32 // DW1: reserved
    SQHD   uint16 // Submission Queue Head Pointer
    SQID   uint16 // Submission Queue Identifier
    CID    uint16 // Command Identifier
    Status uint16 // bit 0: phase tag (unused on fabrics); bits 8:1: SC; bits 11:9: SCT
}

// NVMe Status Code (SC), bits 8:1 of the status field
const (
    NVME_STATUS_SUCCESS        uint16 = 0x00
    NVME_STATUS_INVALID_OPCODE uint16 = 0x01
    NVME_STATUS_INVALID_FIELD  uint16 = 0x02
    // ... more status codes
)

// NVMe Status Code Type (SCT), bits 11:9 of the status field
const (
    NVME_SCT_GENERIC uint16 = 0x0 // Generic Command Status
    NVME_SCT_COMMAND uint16 = 0x1 // Command Specific Status
    NVME_SCT_MEDIA   uint16 = 0x2 // Media Error Status
    // ...
)

func GetStatusMeaning(status uint16) string {
    sct := (status >> 9) & 0x7 // Status Code Type, bits 11:9
    sc := (status >> 1) & 0xFF // Status Code, bits 8:1 (bit 0 is the phase tag)

    // This is a simplified mapping, real NVMe spec has many more.
    switch sct {
    case NVME_SCT_GENERIC:
        switch sc {
        case NVME_STATUS_SUCCESS: return "Success"
        case NVME_STATUS_INVALID_OPCODE: return "Invalid Opcode"
        case NVME_STATUS_INVALID_FIELD: return "Invalid Field"
        default: return fmt.Sprintf("Generic Status Code: 0x%02x", sc)
        }
    case NVME_SCT_COMMAND:
        return fmt.Sprintf("Command Specific Status Code: 0x%02x", sc)
    case NVME_SCT_MEDIA:
        return fmt.Sprintf("Media Error Status Code: 0x%02x", sc)
    default:
        return fmt.Sprintf("Unknown Status Type: 0x%02x, Code: 0x%02x", sct, sc)
    }
}

// ConvertGoUUIDToNVMeHostID converts a textual UUID into the 16-byte NVMe
// Host Identifier; the bytes are copied in their canonical RFC 4122 order.
func ConvertGoUUIDToNVMeHostID(uuidStr string) ([16]byte, error) {
    uuidParsed, err := uuid.Parse(uuidStr) // Requires "github.com/google/uuid"
    if err != nil {
        return [16]byte{}, fmt.Errorf("invalid UUID string: %w", err)
    }
    var hostID [16]byte
    copy(hostID[:], uuidParsed[:]) // Copy bytes directly
    return hostID, nil
}

Code walkthrough:

  • Constants for the common NVMe opcodes.
  • NVMeCommand: the base struct representing a 64-byte NVMe command. The _ fields are Go blank identifiers used as padding for reserved regions.
  • IdentifyCommand, NVMReadWriteCommand, ConnectCommand: command-specific structs that embed NVMeCommand and document which CDWs each command uses.
  • NVMeCompletion: a 16-byte completion queue entry carrying the command ID, status, and result.
  • The status constants and GetStatusMeaning: for decoding the status field returned with each completion.
  • ConvertGoUUIDToNVMeHostID: converts a UUID string produced by the Go uuid library into the [16]byte Host ID format.

Chapter 5: Building the Client Core Logic

Now we combine the NVMe-oF connection and NVMe commands into the client's core logic.

5.1 Client Struct and Initialization

package nvmeof

import (
    "bytes"
    "context"
    "encoding/binary"
    "fmt"
    "io"
    "log"
    "sync"
    "sync/atomic"
    "time"

    "github.com/google/uuid" // go get github.com/google/uuid; used by ConvertGoUUIDToNVMeHostID
)

// TargetInfo stores information about a discovered target.
type TargetInfo struct {
    NQN    string // Subsystem NVMe Qualified Name
    Addr   string // IP:Port for the I/O connection
    TrType string // Transport type (e.g., tcp)
    TrSvcs string // Transport service identifier (e.g., the TCP port, 4420 by default)
    // ... other Discovery Log Page fields
}

// NVMeNamespace represents a connected NVMe namespace.
type NVMeNamespace struct {
    ID         uint32
    Size       uint64 // total capacity in bytes
    LBAF       uint8  // LBA format index
    BlockSize  uint32 // logical block size in bytes
    nsConn     *NVMeoFConnection // connection to the target for this namespace
    adminQueue *NVMeAdminQueue
    ioQueue    *NVMeIOQueue
    // ... other namespace information
}

// NVMeClient is the main NVMe-oF client structure.
type NVMeClient struct {
    HostNQN string
    HostID  [16]byte

    discoveryConn *NVMeoFConnection // connection to the discovery controller
    discoveryAddr string

    targets    map[string]TargetInfo     // discovered targets, keyed by NQN
    namespaces map[uint32]*NVMeNamespace // connected namespaces, keyed by NSID

    nextCID     uint32   // next Command ID
    pendingCmds sync.Map // map[uint16]chan *NVMeCompletion

    // For managing the goroutines that receive completions
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

// NewNVMeClient initializes the client.
func NewNVMeClient(hostNQN, hostUUID string) (*NVMeClient, error) {
    hostID, err := ConvertGoUUIDToNVMeHostID(hostUUID)
    if err != nil {
        return nil, fmt.Errorf("invalid host UUID: %w", err)
    }

    ctx, cancel := context.WithCancel(context.Background())

    return &NVMeClient{
        HostNQN:    hostNQN,
        HostID:     hostID,
        targets:    make(map[string]TargetInfo),
        namespaces: make(map[uint32]*NVMeNamespace),
        nextCID:    0,
        ctx:        ctx,
        cancel:     cancel,
    }, nil
}

// generateCID atomically generates and returns the next Command ID.
func (c *NVMeClient) generateCID() uint16 {
    // CID is a uint16, so mask to handle wraparound; a production client must
    // also ensure a wrapped-around CID is not still in flight.
    return uint16(atomic.AddUint32(&c.nextCID, 1) & 0xFFFF)
}

// StartReceiver launches a goroutine that receives PDUs on a connection.
// This is a simplified model; in practice each connection needs its own receiver.
func (c *NVMeClient) StartReceiver(conn *NVMeoFConnection) {
    c.wg.Add(1)
    go func() {
        defer c.wg.Done()
        for {
            select {
            case <-c.ctx.Done():
                log.Printf("Receiver for %s shutting down.", conn.remoteAddr)
                return
            default:
                pduType, payload, err := conn.ReceivePDU()
                if err != nil {
                    if err == io.EOF {
                        log.Printf("Connection %s closed by remote.", conn.remoteAddr)
                    } else {
                        log.Printf("Error receiving PDU from %s: %v", conn.remoteAddr, err)
                    }
                    // TODO: Add robust error handling, reconnection logic.
                    return
                }
                c.handleReceivedPDU(conn, pduType, payload)
            }
        }
    }()
}

// Stop shuts down the client and stops all its goroutines.
func (c *NVMeClient) Stop() {
    c.cancel()
    c.wg.Wait()
    if c.discoveryConn != nil {
        c.discoveryConn.Close()
    }
    for _, ns := range c.namespaces {
        if ns.nsConn != nil {
            ns.nsConn.Close()
        }
    }
}

// handleReceivedPDU dispatches a received PDU by type.
func (c *NVMeClient) handleReceivedPDU(conn *NVMeoFConnection, pduType byte, payload []byte) {
    switch pduType {
    case PDU_TYPE_RESPONSE:
        // This is the completion response for an NVMe command.
        comp := &NVMeCompletion{}
        reader := bytes.NewReader(payload)
        if err := binary.Read(reader, binary.LittleEndian, comp); err != nil {
            log.Printf("Failed to parse NVMe Completion: %v", err)
            return
        }

        val, ok := c.pendingCmds.Load(comp.CID)
        if !ok {
            log.Printf("Received completion for unknown CID %d", comp.CID)
            return
        }

        cmdChan := val.(chan *NVMeCompletion)
        select {
        case cmdChan <- comp:
            // Sent completion to waiting goroutine
        case <-time.After(50 * time.Millisecond): // Avoid blocking if receiver is gone
            log.Printf("Failed to send completion for CID %d to channel: channel blocked or closed", comp.CID)
        }
        c.pendingCmds.Delete(comp.CID) // Command is completed, remove from map

    case PDU_TYPE_C2H_DATA:
        // Controller-to-Host data PDU, typically for Read commands
        // This needs to be correlated with a pending Read command based on Command ID (CID)
        // For simplicity, we'll assume Read data is handled inline with the completion
        // In a real implementation, data PDUs would precede or be interleaved with RESPONSE PDUs
        // and require more complex buffer management.
        log.Printf("Received C2H_DATA PDU, length: %d. (Not fully handled in this example)", len(payload))

    default:
        log.Printf("Received unhandled PDU type: 0x%02x, length: %d", pduType, len(payload))
    }
}

Code walkthrough:

  • TargetInfo: target information obtained from the Discovery service.
  • NVMeNamespace: a connected namespace, with its attributes and connection.
  • NVMeClient: the client core, managing connections, discovered targets, connected namespaces, and asynchronous command handling.
    • HostNQN, HostID: identify the host.
    • discoveryConn: the connection to the Discovery Controller.
    • pendingCmds: a sync.Map of commands awaiting completion. The key is the CID; the value is a chan *NVMeCompletion that delivers the completion status. This is the heart of asynchronous I/O.
    • nextCID: an atomic counter generating unique Command IDs.
    • ctx, cancel, wg: for starting and gracefully stopping background goroutines.
  • NewNVMeClient: the constructor.
  • generateCID: safely produces the next Command ID.
  • StartReceiver: runs a goroutine that continuously receives PDUs from the given connection and hands them to handleReceivedPDU.
  • Stop: gracefully shuts down the client, closing all connections and background goroutines.
  • handleReceivedPDU: the central PDU dispatch logic.
    • On PDU_TYPE_RESPONSE it parses an NVMeCompletion and uses pendingCmds to find the waiting channel, delivering the completion status.
    • PDU_TYPE_C2H_DATA carries controller-to-host data, typically for Read commands. A real client must correlate this data with the pending Read request and copy it into a pre-allocated buffer; here we only log it.
5.2 The Discovery Flow

The client first needs to discover the NVMe-oF targets available on the network.

// DiscoveryLogPageEntry is one entry of the discovery log page.
// NOTE: this layout is simplified; the spec defines a 1024-byte entry with
// different offsets and a 32-byte transport service identifier field.
type DiscoveryLogPageEntry struct {
    TrType    uint8 // transport type (e.g., 3 for TCP)
    AdrFam    uint8 // address family (e.g., 1 for IPv4, 2 for IPv6)
    SubType   uint8 // subsystem type (discovery subsystem vs. NVM subsystem)
    _         uint8 // reserved
    PortID    uint16 // port ID on the target
    CntrlType uint8  // controller type
    _         [3]byte // reserved
    NQN       [256]byte // NVMe Qualified Name
    TrAddr    [256]byte // transport address (IP address)
    TrSvcsID  [256]byte // transport service identifier (e.g., port number)
    // ... other fields
}

// DiscoverTargets connects to the Discovery Controller and retrieves the target list.
func (c *NVMeClient) DiscoverTargets(discoveryAddr string) ([]TargetInfo, error) {
    c.discoveryAddr = discoveryAddr
    conn, err := NewNVMeoFConnection(discoveryAddr)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to discovery target %s: %w", discoveryAddr, err)
    }
    c.discoveryConn = conn
    c.StartReceiver(conn) // start the PDU receiver for the discovery connection

    // 1. Send a Connect command to the Discovery Controller.
    // The NVMe-oF Connect command is a Fabrics command that establishes the
    // Admin Queue (queue ID 0). Per the spec, the queue ID, queue size, and
    // keep-alive timeout are encoded into the command CDWs; we omit that
    // encoding here for simplicity.
    cid := c.generateCID()
    connectCmd := ConnectCommand{
        NVMeCommand: NVMeCommand{
            Opcode: NVME_ADMIN_OPC_CONNECT,
            CID:    cid,
            NSID:   0, // Connect targets the Admin Queue
        },
    }

    // Build the Connect Request data that follows the 64-byte command:
    // HostID (16 bytes), then a length-prefixed HostNQN and SubsystemNQN.
    // (The Connect data block defined by the spec uses fixed 256-byte NQN
    // fields at fixed offsets; the length-prefixed layout below is a lecture
    // simplification.)
    connectReqPDUData := new(bytes.Buffer)
    if err := binary.Write(connectReqPDUData, binary.LittleEndian, connectCmd.NVMeCommand); err != nil {
        return nil, fmt.Errorf("failed to serialize connect command: %w", err)
    }
    if err := binary.Write(connectReqPDUData, binary.LittleEndian, c.HostID); err != nil {
        return nil, fmt.Errorf("failed to write HostID: %w", err)
    }
    hostNQNBytes := []byte(c.HostNQN)
    if err := binary.Write(connectReqPDUData, binary.LittleEndian, uint16(len(hostNQNBytes))); err != nil {
        return nil, fmt.Errorf("failed to write HostNQN length: %w", err)
    }
    if _, err := connectReqPDUData.Write(hostNQNBytes); err != nil {
        return nil, fmt.Errorf("failed to write HostNQN: %w", err)
    }

    // For Discovery Controller, the SubsystemNQN is 'nqn.2014-08.org.nvmexpress.discovery'
    subsystemNQN := "nqn.2014-08.org.nvmexpress.discovery"
    subNQNBytes := []byte(subsystemNQN)
    if err := binary.Write(connectReqPDUData, binary.LittleEndian, uint16(len(subNQNBytes))); err != nil {
        return nil, fmt.Errorf("failed to write SubsystemNQN length: %w", err)
    }
    if _, err := connectReqPDUData.Write(subNQNBytes); err != nil {
        return nil, fmt.Errorf("failed to write SubsystemNQN: %w", err)
    }

    compChan := make(chan *NVMeCompletion, 1)
    c.pendingCmds.Store(cid, compChan)
    defer c.pendingCmds.Delete(cid)

    // A command capsule travels in a COMMAND PDU; ICREQ is reserved for
    // transport-level connection initialization.
    if err := conn.SendPDU(PDU_TYPE_COMMAND, connectReqPDUData.Bytes()); err != nil {
        return nil, fmt.Errorf("failed to send connect command to discovery: %w", err)
    }

    select {
    case comp := <-compChan:
        if comp.Status != NVME_STATUS_SUCCESS {
            return nil, fmt.Errorf("connect to discovery failed: %s", GetStatusMeaning(comp.Status))
        }
    case <-time.After(10 * time.Second):
        return nil, fmt.Errorf("connect to discovery timed out")
    }

    // 2. Request the discovery log. (A fully compliant client would issue a
    //    Get Log Page command with log page ID 0x70; we reuse Identify with a
    //    lecture-local CNS value for simplicity.)
    cid = c.generateCID()
    identifyCmd := IdentifyCommand{
        NVMeCommand: NVMeCommand{
            Opcode: NVME_ADMIN_OPC_IDENTIFY,
            CID:    cid,
            NSID:   0, // Admin command
            PRP1:   0, // would point at a data buffer if needed
            PRP2:   0,
            CDW10:  IDENTIFY_CNS_DISCOVERY_LOG, // request the discovery log page
        },
    }

    // The response data comes back either in-capsule with the RESPONSE PDU
    // or, if large, in separate C2H_DATA PDUs, so we need a buffer for it.

    identifyCmdBuf := new(bytes.Buffer)
    if err := binary.Write(identifyCmdBuf, binary.LittleEndian, identifyCmd.NVMeCommand); err != nil {
        return nil, fmt.Errorf("failed to serialize identify command: %w", err)
    }

    compChan = make(chan *NVMeCompletion, 1)
    c.pendingCmds.Store(cid, compChan)
    defer c.pendingCmds.Delete(cid)

    if err := conn.SendPDU(PDU_TYPE_COMMAND, identifyCmdBuf.Bytes()); err != nil {
        return nil, fmt.Errorf("failed to send identify discovery command: %w", err)
    }

    // Wait for the completion. The Discovery Log Page data travels either
    // in-capsule with the RESPONSE PDU or in separate C2H_DATA PDUs.
    select {
    case comp := <-compChan:
        if comp.Status != NVME_STATUS_SUCCESS {
            return nil, fmt.Errorf("identify discovery failed: %s", GetStatusMeaning(comp.Status))
        }

        // Our current handleReceivedPDU decodes only the 16-byte
        // NVMeCompletion and discards the rest of the RESPONSE payload, so
        // any in-capsule log page data never reaches us. A complete
        // implementation would:
        //   1. Extend the pendingCmds value to carry a richer result, e.g.:
        //        type CompletionResult struct {
        //            Completion *NVMeCompletion
        //            Payload    []byte // RESPONSE payload after the completion struct
        //        }
        //   2. Have handleReceivedPDU send a *CompletionResult to the waiting
        //      channel, including any in-capsule data.
        //   3. For large log pages, correlate subsequent C2H_DATA PDUs with
        //      the CID and assemble them into a caller-supplied buffer.
        // The remainder of this function, which parses DiscoveryLogPageEntry
        // records out of that payload, builds on exactly this extension.

        // A more practical approach without changing `pendingCmds` too much for now:
        // The Identify command requires a PRP entry for the data buffer.
        // The target will write the log page to this buffer.
        // Since we're using TCP, the data is transmitted via C2H_DATA PDUs.
        // We need a pre-allocated buffer and a way to signal when it's filled.

        log.Println("Discovery Log Page data retrieval mechanism needs a full implementation.")
        log.Println("For demonstration, assuming target returns a small log page in-capsule.")

        // Placeholder for actual Discovery Log Page parsing
        // In a real scenario, after receiving the completion, we'd wait for C2H_DATA PDUs
        // for the Identify command with CNS_DISCOVERY_LOG.
        // For this example, let's manually parse a dummy log page.

        // A real implementation would involve a temporary buffer for the discovery log page
        // and the `handleReceivedPDU` would fill this buffer upon receiving C2H_DATA PDUs
        // corresponding to the Identify Discovery command.
        // This is a significant complexity for a "first pass" client.

        // For simplicity, let's assume the discovery log page is returned *directly*
        // as the payload of the `RESPONSE` PDU for the Identify command. This is typical for small log pages.
        // So `handleReceivedPDU` would deliver the entire payload to the `CompletionResult`.

        var discoveryLogPageBuf []byte
        // Assume the `compChan` actually receives `CompletionResult` containing the full PDU payload
        // from which we can extract the discovery log page.
        // For this example, let's create a dummy payload for now to demonstrate parsing.
        // A real `handleReceivedPDU` would make `payload` available.

        // For this demonstration, we'll re-read the PDU from the connection, this is not ideal.
        // It highlights the need for a robust completion/data passing mechanism.
        // Let's adjust `handleReceivedPDU` to pass the full PDU payload for RESPONSE type.

        // Re-design `handleReceivedPDU` and `pendingCmds`
        // `pendingCmds` will map `CID` to `chan *PDUResponse`
        // `PDUResponse` struct will hold `NVMeCompletion` and `[]byte` payload.

        // This requires a significant refactor in the `handleReceivedPDU` and `pendingCmds` part.
        // Let's assume for this specific Discovery case, we send the Identify command,
        // and the target returns the Discovery Log in a single Response PDU,
        // and `handleReceivedPDU` is updated to pass this payload.

        // Let's refine `handleReceivedPDU` to pass the raw payload as part of the completion signal.
        // This makes `CompletionResult` essential.
        // Re-defining `pendingCmds` to `map[uint16]chan CompletionResult`.

        // --- Refactored `handleReceivedPDU` ---
        // type CompletionResult struct {
        //  Completion *NVMeCompletion
        //  Data       []byte // The data payload associated with this completion (e.g., Discovery Log)
        // }
        // c.pendingCmds  sync.Map // Map[uint16]chan CompletionResult
        // --- End Refactor ---

        // Placeholder for the refactored code:
        // After a successful identify, we expect the Discovery Log Page.
        // This data needs to be passed back to the waiting `DiscoverTargets` function.

        // For now, let's make a simplifying assumption: `Identify` commands return their data in the `Result` field
        // of the `NVMeCompletion` for *small* data like a single Discovery Log Entry (which is not true for a full log page).
        // This is a major simplification.

        // Let's use a more robust way: the `handleReceivedPDU` will pass the *full* payload of the RESPONSE PDU
        // to the waiting channel for `Identify` commands.

        // Refined `handleReceivedPDU` to pass full PDU payload for `RESPONSE` type.
        // It will extract the `NVMeCompletion` from the beginning of the payload,
        // and the rest of the payload (if any) is data.

        // Back to `DiscoverTargets`:
        // The channel `compChan` needs to receive `CompletionResult`.
        // Let's assume `handleReceivedPDU` is already updated to send `CompletionResult`.

        compResult := <-compChan // Assume compChan now sends CompletionResult
        if compResult.Completion.Status != NVME_STATUS_SUCCESS {
            return nil, fmt.Errorf("identify discovery failed: %s", GetStatusMeaning(compResult.Completion.Status))
        }

        discoveryLogPageBuf = compResult.Data // This is the payload part after NVMeCompletion

        if len(discoveryLogPageBuf) == 0 {
            return nil, fmt.Errorf("no discovery log page data received")
        }

        // 解析 Discovery Log Page。
        // 按规范,日志页以头部开始(含 GENCTR、NUMREC、RECFMT 等字段),其后是 NUMREC 条
        // 定长记录;完整实现应先从头部读出 NUMREC,再按记录数解析。
        // 这里做了简化:把整个 discoveryLogPageBuf 当作一串 DiscoveryLogPageEntry,跳过头部。
        var targets []TargetInfo

        entrySize := 768 // 与前文简化版 DiscoveryLogPageEntry 结构体一致(规范中每条记录实为 1024 字节)
        if len(discoveryLogPageBuf)%entrySize != 0 {
            log.Printf("Warning: Discovery Log Page buffer size %d is not a multiple of entry size %d", len(discoveryLogPageBuf), entrySize)
            // 仍尝试解析,可能是部分页
        }

        for i := 0; i < len(discoveryLogPageBuf); i += entrySize {
            entryBuf := discoveryLogPageBuf[i : i+entrySize]
            entry := &DiscoveryLogPageEntry{}
            reader := bytes.NewReader(entryBuf)
            if err := binary.Read(reader, binary.LittleEndian, entry); err != nil {
                log.Printf("Failed to parse Discovery Log Page Entry at offset %d: %v", i, err)
                continue
            }

            // Convert byte arrays to strings, trimming NUL padding
            nqn := string(bytes.Trim(entry.NQN[:], "\x00"))
            trAddr := string(bytes.Trim(entry.TrAddr[:], "\x00"))
            trSvcsID := string(bytes.Trim(entry.TrSvcsID[:], "\x00"))

            // Filter out the discovery controller itself
            if nqn == subsystemNQN {
                continue
            }

            // Based on TrType and AdrFam, construct the address.
            // This is a simplified mapping.
            var addr string
            switch entry.TrType {
            case 3: // TCP
                switch entry.AdrFam {
                case 1: // IPv4
                    addr = fmt.Sprintf("%s:%s", trAddr, trSvcsID)
                case 2: // IPv6
                    addr = fmt.Sprintf("[%s]:%s", trAddr, trSvcsID)
                default:
                    log.Printf("Unsupported Address Family for TCP: %d", entry.AdrFam)
                    continue
                }
            default:
                log.Printf("Unsupported Transport Type: %d", entry.TrType)
                continue
            }

            target := TargetInfo{
                NQN:    nqn,
                Addr:   addr,
                TrType: fmt.Sprintf("%d", entry.TrType), // Convert TrType byte to string
                TrSvcs: trSvcsID,
            }
            targets = append(targets, target)
            c.targets[target.NQN] = target
            log.Printf("Discovered Target: NQN=%s, Address=%s", target.NQN, target.Addr)
        }
        return targets, nil

    case <-time.After(10 * time.Second):
        return nil, fmt.Errorf("identify discovery timed out")
    }
}

handleReceivedPDU 与 pendingCmds 的重新设计:

为了处理Identify命令返回的数据(如Discovery Log Page),我们需要在handleReceivedPDU中不仅传递NVMeCompletion,还要传递原始的PDU负载。

// CompletionResult 封装了NVMe Completion和原始PDU负载
type CompletionResult struct {
    Completion *NVMeCompletion
    Data       []byte // The raw payload of the RESPONSE PDU (after completion struct)
    Err        error // Any error encountered during PDU processing
}

// NVMeClient 结构体中的 pendingCmds 变为:
// pendingCmds  sync.Map // Map[uint16]chan CompletionResult

// handleReceivedPDU 的更新版本
func (c *NVMeClient) handleReceivedPDU(conn *NVMeoFConnection, pduType byte, payload []byte) {
    switch pduType {
    case PDU_TYPE_RESPONSE:
        if len(payload) < 16 { // Minimum size for NVMeCompletion
            log.Printf("Received malformed RESPONSE PDU, payload too short: %d bytes", len(payload))
            return
        }

        comp := &NVMeCompletion{}
        reader := bytes.NewReader(payload[:16]) // First 16 bytes are NVMeCompletion
        if err := binary.Read(reader, binary.LittleEndian, comp); err != nil {
            log.Printf("Failed to parse NVMe Completion from RESPONSE PDU: %v", err)
            return
        }

        var dataPayload []byte // Data part after the NVMeCompletion
        if len(payload) > 16 {
            dataPayload = payload[16:]
        }

        val, ok := c.pendingCmds.Load(comp.CID)
        if !ok {
            log.Printf("Received completion for unknown CID %d", comp.CID)
            return
        }

        cmdChan := val.(chan CompletionResult)
        select {
        case cmdChan <- CompletionResult{Completion: comp, Data: dataPayload}:
            // Delivered the completion to the waiting goroutine
        default:
            // The channel is buffered (capacity 1), so reaching this branch means
            // the waiter has timed out and abandoned the command
            log.Printf("Failed to deliver completion for CID %d: channel full or abandoned", comp.CID)
        }
        c.pendingCmds.Delete(comp.CID) // Command is completed, remove from map

    case PDU_TYPE_C2H_DATA:
        // Controller-to-Host data PDU, typically for Read commands
        // This requires more complex state management to correlate C2H_DATA PDUs
        // with specific pending Read commands and their buffers.
        // For now, we'll just log this.
        log.Printf("Received C2H_DATA PDU, length: %d. (Requires advanced handling)", len(payload))

    default:
        log.Printf("Received unhandled PDU type: 0x%02x, length: %d", pduType, len(payload))
    }
}
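handleReceivedPDU 中"按 CID 分发完成"的核心模式,可以脱离 NVMe 协议单独演示:提交方先在 sync.Map 中注册一个容量为 1 的通道,接收方按 CID 查表并以非阻塞方式投递。下面是一个可运行的最小骨架,其中 completion、client 等类型为演示虚构,并非文中的完整实现:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// completion 是演示用的简化完成结构(非文中完整的NVMeCompletion)
type completion struct {
	CID    uint16
	Status uint16
}

type client struct {
	pending sync.Map // map[uint16]chan completion
}

// submit 在下发命令前注册CID对应的缓冲通道并返回它
func (c *client) submit(cid uint16) chan completion {
	ch := make(chan completion, 1) // 容量为1:即使等待方稍晚接收,完成也不会丢失
	c.pending.Store(cid, ch)
	return ch
}

// complete 模拟接收goroutine收到RESPONSE后按CID分发完成
func (c *client) complete(comp completion) {
	if v, ok := c.pending.Load(comp.CID); ok {
		select {
		case v.(chan completion) <- comp:
		default: // 通道已满或已被放弃,丢弃该完成(真实实现应记录日志)
		}
		c.pending.Delete(comp.CID)
	}
}

func main() {
	c := &client{}
	ch := c.submit(42)

	// 另一个goroutine扮演接收循环,稍后送达完成
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.complete(completion{CID: 42, Status: 0})
	}()

	select {
	case comp := <-ch:
		fmt.Printf("CID %d completed with status %d\n", comp.CID, comp.Status)
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
}
```

这个骨架与文中代码的对应关系:submit 对应 pendingCmds.Store 加 defer Delete,complete 对应 handleReceivedPDU 的 RESPONSE 分支。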

代码解析 (Discovery部分):

  • DiscoveryLogPageEntry:定义了Discovery Log Page中每个条目的结构。
  • DiscoverTargets
    • 连接到Discovery Controller(默认端口8009)。
    • 发送一个Connect Admin命令,用于在Discovery Controller上建立Admin Queue。这个命令的PDU类型是ICREQ
    • 等待Connect命令的完成。
    • 发送一个Identify Admin命令(CNS=IDENTIFY_CNS_DISCOVERY_LOG)来请求Discovery Log Page。
    • 等待Identify命令的完成。此时,CompletionResult中的Data字段将包含Discovery Log Page的原始字节。
    • 解析Discovery Log Page的每一项,提取目标NQN、传输地址等信息,并存储在c.targets中。

5.3 连接到特定NVMe子系统和命名空间

一旦发现目标,我们就可以连接到特定的NVMe子系统,并获取其命名空间。


// NVMeAdminQueue 管理Admin Queue的命令提交和完成
type NVMeAdminQueue struct {
    qID    uint16
    conn   *NVMeoFConnection
    client *NVMeClient
}

// NVMeIOQueue 管理I/O Queue的命令提交和完成
type NVMeIOQueue struct {
    qID    uint16
    conn   *NVMeoFConnection
    client *NVMeClient
    nsid   uint32
}

// ConnectToNamespace 连接到指定的NVMe子系统和命名空间
func (c *NVMeClient) ConnectToNamespace(targetNQN string, nsid uint32) (*NVMeNamespace, error) {
    target, ok := c.targets[targetNQN]
    if !ok {
        return nil, fmt.Errorf("target NQN %s not discovered", targetNQN)
    }

    // Connect to the target's I/O port
    conn, err := NewNVMeoFConnection(target.Addr)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to target %s at %s: %w", targetNQN, target.Addr, err)
    }
    // Each connection gets its own receiver goroutine, so completions from
    // different targets are demultiplexed to their waiters independently.
    c.StartReceiver(conn)

    // 1. 发送 Connect Admin Command 到 I/O Controller
    // This establishes an Admin Queue for this specific connection.
    cid := c.generateCID()
    connectCmd := ConnectCommand{
        NVMeCommand: NVMeCommand{
            Opcode: NVME_ADMIN_OPC_CONNECT,
            CID:    cid,
            NSID:   0, // For Admin queue
        },
        SQSIZE: CONNECT_QID_ADMIN, // Queue ID 0xFFFF for Admin Queue
        KATO:   0,                 // Keep-Alive Timeout
    }

    connectReqPDUData := new(bytes.Buffer)
    if err := binary.Write(connectReqPDUData, binary.LittleEndian, connectCmd.NVMeCommand); err != nil {
        return nil, fmt.Errorf("failed to serialize connect command: %w", err)
    }
    if err := binary.Write(connectReqPDUData, binary.LittleEndian, c.HostID); err != nil {
        return nil, fmt.Errorf("failed to write HostID: %w", err)
    }
    hostNQNBytes := []byte(c.HostNQN)
    if err := binary.Write(connectReqPDUData, binary.LittleEndian, uint16(len(hostNQNBytes))); err != nil {
        return nil, fmt.Errorf("failed to write HostNQN length: %w", err)
    }
    if _, err := connectReqPDUData.Write(hostNQNBytes); err != nil {
        return nil, fmt.Errorf("failed to write HostNQN: %w", err)
    }
    subNQNBytes := []byte(targetNQN)
    if err := binary.Write(connectReqPDUData, binary.LittleEndian, uint16(len(subNQNBytes))); err != nil {
        return nil, fmt.Errorf("failed to write SubsystemNQN length: %w", err)
    }
    if _, err := connectReqPDUData.Write(subNQNBytes); err != nil {
        return nil, fmt.Errorf("failed to write SubsystemNQN: %w", err)
    }

    compChan := make(chan CompletionResult, 1)
    c.pendingCmds.Store(cid, compChan)
    defer c.pendingCmds.Delete(cid)

    if err := conn.SendPDU(PDU_TYPE_ICREQ, connectReqPDUData.Bytes()); err != nil {
        return nil, fmt.Errorf("failed to send connect command to target %s: %w", targetNQN, err)
    }

    select {
    case compRes := <-compChan:
        if compRes.Completion.Status != NVME_STATUS_SUCCESS {
            return nil, fmt.Errorf("connect to target %s failed: %s", targetNQN, GetStatusMeaning(compRes.Completion.Status))
        }
    case <-time.After(10 * time.Second):
        return nil, fmt.Errorf("connect to target %s timed out", targetNQN)
    }
    log.Printf("Successfully connected Admin Queue to target %s.", targetNQN)

    // Now establish an I/O Queue
    ioQueueID := uint16(1 + rand.Intn(0xFFFE)) // Random I/O queue ID in [1, 0xFFFE]; 0 and the admin queue ID are excluded
    cid = c.generateCID()
    connectIOQueueCmd := ConnectCommand{
        NVMeCommand: NVMeCommand{
            Opcode: NVME_ADMIN_OPC_CONNECT,
            CID:    cid,
            NSID:   0, // For I/O Queue connection, NSID is 0
        },
        SQSIZE: ioQueueID, // This is the Queue ID
        KATO:   0,
    }

    connectIOReqPDUData := new(bytes.Buffer)
    if err := binary.Write(connectIOReqPDUData, binary.LittleEndian, connectIOQueueCmd.NVMeCommand); err != nil {
        return nil, fmt.Errorf("failed to serialize connect IO command: %w", err)
    }
    if err := binary.Write(connectIOReqPDUData, binary.LittleEndian, c.HostID); err != nil {
        return nil, fmt.Errorf("failed to write HostID for IO queue: %w", err)
    }
    if err := binary.Write(connectIOReqPDUData, binary.LittleEndian, uint16(len(hostNQNBytes))); err != nil {
        return nil, fmt.Errorf("failed to write HostNQN length for IO queue: %w", err)
    }
    if _, err := connectIOReqPDUData.Write(hostNQNBytes); err != nil {
        return nil, fmt.Errorf("failed to write HostNQN for IO queue: %w", err)
    }
    if err := binary.Write(connectIOReqPDUData, binary.LittleEndian, uint16(len(subNQNBytes))); err != nil {
        return nil, fmt.Errorf("failed to write SubsystemNQN length for IO queue: %w", err)
    }
    if _, err := connectIOReqPDUData.Write(subNQNBytes); err != nil {
        return nil, fmt.Errorf("failed to write SubsystemNQN for IO queue: %w", err)
    }

    compChan = make(chan CompletionResult, 1)
    c.pendingCmds.Store(cid, compChan)
    defer c.pendingCmds.Delete(cid)

    if err := conn.SendPDU(PDU_TYPE_ICREQ, connectIOReqPDUData.Bytes()); err != nil {
        return nil, fmt.Errorf("failed to send connect IO queue command to target %s: %w", targetNQN, err)
    }

    select {
    case compRes := <-compChan:
        if compRes.Completion.Status != NVME_STATUS_SUCCESS {
            return nil, fmt.Errorf("connect IO queue to target %s failed: %s", targetNQN, GetStatusMeaning(compRes.Completion.Status))
        }
    case <-time.After(10 * time.Second):
        return nil, fmt.Errorf("connect IO queue to target %s timed out", targetNQN)
    }
    log.Printf("Successfully connected I/O Queue %d to target %s.", ioQueueID, targetNQN)

    // 2. 发送 Identify Namespace 命令
    cid = c.generateCID()
    identifyNsCmd := IdentifyCommand{
        NVMeCommand: NVMeCommand{
            Opcode: NVME_ADMIN_OPC_IDENTIFY,
            CID:    cid,
            NSID:   nsid, // Specific namespace ID
        },
        CDW10: IDENTIFY_CNS_NAMESPACE, // Identify Namespace
    }

    identifyNsCmdBuf := new(bytes.Buffer)
    if err := binary.Write(identifyNsCmdBuf, binary.LittleEndian, identifyNsCmd.NVMeCommand); err != nil {
        return nil, fmt.Errorf("failed to serialize identify namespace command: %w", err)
    }

    compChan = make(chan CompletionResult, 1)
    c.pendingCmds.Store(cid, compChan)
    defer c.pendingCmds.Delete(cid)
