尊敬的各位技术同行,
欢迎来到今天的技术讲座。我们将深入探讨一个在现代数据中心中日益重要的技术:NVMe-oF(NVMe over Fabrics),并聚焦于如何利用Go语言构建一个高性能的跨网络块存储访问客户端。
在当今数据驱动的世界里,数据量呈爆炸式增长,对存储系统的性能要求也达到了前所未有的高度。传统的存储解决方案,无论是基于SAS/SATA的机械硬盘还是SSD,在性能上逐渐显露瓶颈。NVMe(Non-Volatile Memory Express)协议的出现,彻底革新了存储接口,它专为NAND闪存和下一代非易失性存储器设计,通过PCIe总线直接连接CPU,极大地降低了延迟,提升了IOPS和带宽。
然而,本地NVMe SSD的优势受限于单个物理服务器的扩展性。当我们需要在多台服务器之间共享高性能存储资源,或者构建大规模、高可用的存储集群时,本地NVMe就显得力不从心。这时,NVMe over Fabrics(NVMe-oF)应运而生。
NVMe-oF的目标是将NVMe协议的低延迟和高吞吐量优势,通过各种网络传输(如RDMA、TCP、光纤通道等)扩展到数据中心网络中,实现存储资源的解耦、池化和远程访问,同时尽可能保持接近本地NVMe的性能。这为构建软件定义存储、超融合架构以及高性能计算环境提供了坚实的基础。
我们选择Go语言来构建NVMe-oF客户端并非偶然。Go语言以其内置的并发原语(goroutines和channels)、优秀的网络编程能力、高效的垃圾回收以及接近C/C++的运行性能而闻名。这些特性使得Go成为开发高性能、高并发网络服务的理想选择,无论是处理大量I/O请求,还是管理复杂的异步操作,Go都能游刃有余。
今天的讲座,我将作为一位编程专家,带大家从零开始,理解NVMe-oF的核心概念,并逐步构建一个基于Go语言的NVMe-oF TCP传输客户端。我们将深入探讨协议细节、Go语言实现策略,以及在性能和可靠性方面的考量。
第一章:NVMe-oF核心概念与架构
在深入Go语言实现之前,我们首先需要建立对NVMe-oF的扎实理解。
1.1 NVMe协议回顾
NVMe是一种主机控制器接口规范,它定义了主机软件与PCIe SSD之间的通信协议。其核心优势包括:
- 多队列支持: 支持多达65535个I/O队列,每个队列支持65536条命令,大大增加了并行度。
- 精简的命令集: 相比SATA/SAS,NVMe的命令路径更短,减少了CPU开销和延迟。
- 寄存器映射: 将命令队列、完成队列和管理队列的指针直接映射到内存,减少了中断和上下文切换。
- SCSI替换: 旨在取代SCSI命令集,更好地适应闪存存储特性。
1.2 NVMe-oF的诞生:超越本地PCIe
NVMe-oF将NVMe命令集和数据传输从PCIe总线扩展到网络结构上,使得远程服务器能够像访问本地NVMe SSD一样访问共享的NVMe存储资源。
核心思想: 将NVMe协议帧封装到不同的网络传输协议中,实现“NVMe over Fabrics”。
1.3 NVMe-oF关键组件
一个NVMe-oF系统通常包含以下核心角色:
- NVMe-oF Host (Initiator): 发起NVMe-oF连接和I/O请求的计算节点。我们的Go客户端就扮演这个角色。
- NVMe-oF Controller (Target/Subsystem): 提供NVMe存储资源的服务器或存储设备。它接收并处理来自主机的NVMe-oF请求。
- NVMe-oF Namespace: 目标控制器向主机暴露的逻辑块设备,可以看作是一个LUN。一个目标控制器可以有多个Namespace。
- NVMe-oF Transport: 用于承载NVMe-oF协议的网络协议。常见的有:
- RDMA (Remote Direct Memory Access): 包括RoCE (RDMA over Converged Ethernet) 和 iWARP (Internet Wide Area RDMA Protocol)。提供极低的延迟和CPU卸载,通常需要专用网卡。
- TCP (Transmission Control Protocol): 利用标准的以太网和TCP/IP协议栈。兼容性好,无需专用硬件,但通常延迟和CPU开销略高于RDMA。
- Fibre Channel (FC-NVMe): 将NVMe命令映射到光纤通道协议。
- Shared Memory: 用于同一主机内的进程间通信,非网络传输。
1.4 NVMe-oF协议栈概览
NVMe-oF协议栈可以简化为以下几个层次:
| 层次 | 功能 | 对应NVMe-oF概念 |
|---|---|---|
| 应用层 | 用户发起的块存储I/O操作(读、写、冲刷等) | NVMe命令 |
| NVMe-oF协议层 | 将NVMe命令和数据封装成NVMe-oF特定的PDU(Protocol Data Unit) | NVMe-oF PDU |
| 传输层 | 负责PDU在网络上的可靠传输,处理连接、流控、错误恢复等 | TCP, RDMA, FC |
| 网络层 | IP寻址和路由 | IP |
| 数据链路层 | 帧的封装和解封装,MAC地址寻址 | Ethernet |
| 物理层 | 物理信号传输 | 光纤、铜缆 |
我们的Go客户端将主要关注NVMe-oF协议层和传输层(TCP)。
1.5 NVMe-oF连接建立流程
一个典型的NVMe-oF连接建立和I/O流程如下:
- Discovery (发现): 主机连接到目标的Discovery控制器(通常是特定端口),发送Discovery命令,获取可用的NVMe子系统(Subsystem)信息,包括其NQN(NVMe Qualified Name)、传输地址、端口等。
- Connect (连接): 主机选择一个子系统,并连接到其I/O控制器(通常是另一个端口)。在此阶段,主机和目标会协商队列数量、队列大小等参数,并建立Admin Queue和I/O Queues。
- Identify Controller/Namespace: 连接建立后,主机发送Identify命令获取控制器和Namespace的详细信息。
- I/O Operations: 主机通过I/O队列提交读写等NVM命令,目标执行这些命令并返回完成状态。
第二章:Go语言与高性能网络编程
Go语言在网络编程方面有着天然的优势,这使得它成为构建NVMe-oF客户端的理想选择。
2.1 Goroutines与Channels:并发的基石
Go语言的并发模型基于CSP(Communicating Sequential Processes),通过轻量级的goroutines和类型安全的channels实现。
- Goroutines: 协程,比线程更轻量,由Go运行时调度。一个Go程序可以轻松启动数百万个goroutines,而不会像线程那样带来巨大的开销。这对于管理大量的并发I/O请求至关重要。
- Channels: 用于goroutines之间安全地传递数据。它们提供了同步机制,避免了传统共享内存并发模型中常见的竞态条件和死锁问题。
在NVMe-oF客户端中,我们可以使用goroutine来:
- 处理独立的连接。
- 在不同的I/O队列上发送和接收命令。
- 异步地等待命令完成。
- 执行后台任务,如心跳保持、错误恢复。
2.2 net包:强大的网络抽象
Go的net包提供了丰富的网络接口,支持TCP、UDP、Unix域套接字等。
net.Dial用于建立TCP连接。net.Conn接口提供了Read和Write方法,方便进行字节流的读写。net.Listener用于构建服务器端。
这些抽象使得Go语言能够以简洁高效的方式处理底层网络通信细节,而无需过多关注操作系统级别的套接字编程。
2.3 encoding/binary包:字节序与协议解析
NVMe-oF协议定义了各种PDU和命令结构,这些结构中的字段通常需要特定的字节序(Endianness)。Go的encoding/binary包提供了方便的工具来处理字节序转换,将Go结构体与字节切片进行相互转换。
我们将大量使用binary.LittleEndian,因为NVMe协议指定使用小端字节序。
第三章:NVMe-oF TCP传输层实现
我们将首先构建NVMe-oF TCP传输层,它是整个客户端的基础。NVMe-oF TCP传输规范定义了特定的PDU格式,用于封装NVMe命令和数据。
3.1 NVMe-oF TCP PDU结构
NVMe-oF TCP传输使用统一的PDU格式,每个PDU都包含一个固定大小的头部。
| 字段名称 | 大小 (字节) | 描述 |
|---|---|---|
| PDU Type | 1 | PDU的类型,例如:ICREQ (In-Capsule Request), ICRESP (In-Capsule Response), COMMAND, RESPONSE, H2C_DATA (Host-to-Controller Data), C2H_DATA (Controller-to-Host Data) 等。 |
| Flags | 1 | PDU的特定标志位,例如:HDGST (Header Digest present), DDGST (Data Digest present)。 |
| Header Length | 2 | PDU头部的总长度,包括所有可选字段。 |
| PDU Length | 4 | 整个PDU的长度,包括头部、可选头部字段、数据(如果有的话)。 |
| PDU Specific Fields | 若干 | 根据PDU类型,包含不同的特定字段。例如,COMMAND PDU会包含NVMe命令结构体,H2C_DATA PDU会包含数据。 |
| Header Digest | 4 (可选) | CRC32C校验和,用于验证PDU头部的完整性。如果HDGST标志设置,则存在。 |
| Data Digest | 4 (可选) | CRC32C校验和,用于验证PDU数据的完整性。如果DDGST标志设置,则存在。 |
我们将定义Go结构体来表示这些PDU头部和不同类型的PDU。
package nvmeof
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"net"
"sync"
"time"
)
// 定义NVMe-oF TCP PDU类型
const (
PDU_TYPE_ICREQ byte = 0x00 // In-Capsule Request
PDU_TYPE_ICRESP byte = 0x01 // In-Capsule Response
PDU_TYPE_COMMAND byte = 0x02
PDU_TYPE_RESPONSE byte = 0x03
PDU_TYPE_H2C_DATA byte = 0x04 // Host to Controller Data
PDU_TYPE_C2H_DATA byte = 0x05 // Controller to Host Data
PDU_TYPE_TERM_REQ byte = 0x06 // Termination Request
PDU_TYPE_TERM_RSP byte = 0x07 // Termination Response
PDU_TYPE_KEEPALIVE byte = 0x08 // Keep-Alive
// ... 其他PDU类型
)
// PDU Flags
const (
PDU_FLAG_HDGST byte = 0x01 // Header Digest present
PDU_FLAG_DDGST byte = 0x02 // Data Digest present
)
// NVMe-oF TCP PDU Header 结构
type PDUHeader struct {
PDUType byte
Flags byte
HeaderLen uint16 // Total length of the PDU header, including optional fields
PDU_Length uint32 // Total length of the PDU (Header + Payload)
}
// FixedPDUHeaderLen 是PDUHeader结构体本身的固定长度
const FixedPDUHeaderLen = 8 // PDUType(1) + Flags(1) + HeaderLen(2) + PDU_Length(4)
// ReadPDUHeader 从连接中读取PDU头部
func ReadPDUHeader(conn net.Conn) (*PDUHeader, error) {
buf := make([]byte, FixedPDUHeaderLen)
_, err := io.ReadFull(conn, buf)
if err != nil {
return nil, fmt.Errorf("failed to read PDU header: %w", err)
}
header := &PDUHeader{}
reader := bytes.NewReader(buf)
if err := binary.Read(reader, binary.LittleEndian, header); err != nil {
return nil, fmt.Errorf("failed to parse PDU header: %w", err)
}
return header, nil
}
// WritePDUHeader 将PDU头部写入连接
func WritePDUHeader(conn net.Conn, header *PDUHeader) error {
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.LittleEndian, header); err != nil {
return fmt.Errorf("failed to serialize PDU header: %w", err)
}
if _, err := conn.Write(buf.Bytes()); err != nil {
return fmt.Errorf("failed to write PDU header: %w", err)
}
return nil
}
// CalculateCRC32C 计算CRC32C校验和
func CalculateCRC32C(data []byte) uint32 {
return crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli))
}
// NVMeoFConnection 结构体管理一个TCP连接
type NVMeoFConnection struct {
conn net.Conn
connMtx sync.Mutex // 保护conn的读写
remoteAddr string
digestEnabled bool // 是否启用Header/Data Digest
}
// NewNVMeoFConnection 创建一个新的NVMeoFConnection
func NewNVMeoFConnection(addr string) (*NVMeoFConnection, error) {
conn, err := net.DialTimeout("tcp", addr, 5*time.Second) // 5秒连接超时
if err != nil {
return nil, fmt.Errorf("failed to dial NVMe-oF target %s: %w", addr, err)
}
return &NVMeoFConnection{
conn: conn,
remoteAddr: addr,
digestEnabled: false, // 默认不启用,可以在连接协商时设置
}, nil
}
// Close 关闭连接
func (c *NVMeoFConnection) Close() error {
return c.conn.Close()
}
// SendPDU 发送一个完整的PDU
func (c *NVMeoFConnection) SendPDU(pduType byte, payload []byte) error {
c.connMtx.Lock()
defer c.connMtx.Unlock()
headerLen := FixedPDUHeaderLen
if c.digestEnabled {
headerLen += 4 // 加Header Digest
}
totalPDULen := uint32(headerLen + len(payload))
header := PDUHeader{
PDUType: pduType,
Flags: 0,
HeaderLen: uint16(headerLen),
PDU_Length: totalPDULen,
}
if c.digestEnabled {
header.Flags |= PDU_FLAG_HDGST
}
// 序列化Header
headerBuf := new(bytes.Buffer)
if err := binary.Write(headerBuf, binary.LittleEndian, header); err != nil {
return fmt.Errorf("failed to serialize PDU header: %w", err)
}
// 如果启用Header Digest,计算并添加
if c.digestEnabled {
digest := CalculateCRC32C(headerBuf.Bytes())
if err := binary.Write(headerBuf, binary.LittleEndian, digest); err != nil {
return fmt.Errorf("failed to write header digest: %w", err)
}
}
// 写入Header
if _, err := c.conn.Write(headerBuf.Bytes()); err != nil {
return fmt.Errorf("failed to write PDU header to connection: %w", err)
}
// 写入Payload
if len(payload) > 0 {
if _, err := c.conn.Write(payload); err != nil {
return fmt.Errorf("failed to write PDU payload to connection: %w", err)
}
}
return nil
}
// ReceivePDU 接收一个完整的PDU
func (c *NVMeoFConnection) ReceivePDU() (byte, []byte, error) {
// 注意:这里没有对ReceivePDU加锁,因为通常接收是在一个独立的goroutine中循环进行的。
// 如果需要在多个goroutine中同时接收,需要额外的同步机制,或为每个goroutine分配独立的接收通道。
header, err := ReadPDUHeader(c.conn)
if err != nil {
return 0, nil, fmt.Errorf("failed to receive PDU header: %w", err)
}
expectedHeaderLen := FixedPDUHeaderLen
if c.digestEnabled {
expectedHeaderLen += 4 // Header Digest
}
if header.HeaderLen != uint16(expectedHeaderLen) {
// 如果PDU头部的HeaderLen包含可选字段(比如Header Digest),这里需要读取并跳过
// 对于我们简化的客户端,如果协商决定不使用digest,这里应该严格检查
return 0, nil, fmt.Errorf("unexpected PDU header length: got %d, expected %d", header.HeaderLen, expectedHeaderLen)
}
// 如果有Header Digest,读取并验证
if (header.Flags&PDU_FLAG_HDGST) != 0 && c.digestEnabled {
// 实际上,为了验证Header Digest,需要先读取整个头部(包括digest字段之前的部分),计算CRC,然后与读取到的digest进行比较。
// 这里简化处理,只是跳过digest字段。
digestBuf := make([]byte, 4)
_, err := io.ReadFull(c.conn, digestBuf)
if err != nil {
return 0, nil, fmt.Errorf("failed to read header digest: %w", err)
}
// TODO: 实际应用中应该在此处验证digest
}
payloadLen := int(header.PDU_Length) - int(header.HeaderLen)
payload := make([]byte, payloadLen)
if payloadLen > 0 {
_, err := io.ReadFull(c.conn, payload)
if err != nil {
return 0, nil, fmt.Errorf("failed to read PDU payload: %w", err)
}
}
// 如果有Data Digest,读取并验证
if (header.Flags&PDU_FLAG_DDGST) != 0 && c.digestEnabled {
// 同样,这里简化处理,只是跳过digest字段。
digestBuf := make([]byte, 4)
_, err := io.ReadFull(c.conn, digestBuf)
if err != nil {
return 0, nil, fmt.Errorf("failed to read data digest: %w", err)
}
// TODO: 实际应用中应该在此处验证digest
}
return header.PDUType, payload, nil
}
代码解析:
PDUHeader结构体:定义了NVMe-oF TCP PDU的固定头部字段。FixedPDUHeaderLen:PDU头部固定部分的字节长度。ReadPDUHeader,WritePDUHeader:辅助函数,用于从net.Conn读写PDU头部,并处理小端字节序转换。CalculateCRC32C:用于计算CRC32C校验和,这是一个可选但推荐用于数据完整性的特性。NVMeoFConnection:封装了net.Conn,并提供了SendPDU和ReceivePDU方法,用于发送和接收完整的NVMe-oF TCP PDU。SendPDU:根据PDU类型和负载构造完整的PDU,包括头部和可选的Header Digest,然后发送。ReceivePDU:从连接中读取PDU头部,解析出PDU类型和总长度,然后读取并返回PDU的实际负载。在实际应用中,这里需要更严格的错误检查和Digest验证。
connMtx:保护net.Conn的并发写操作。net.Conn的Write方法是并发安全的,但为了确保PDU的完整发送不被其他写操作打断,对整个PDU的发送操作加锁是更稳妥的做法。Read通常在一个单独的goroutine中进行,因此这里ReceivePDU没有加锁。
第四章:NVMe命令与数据结构
接下来,我们需要定义NVMe命令和完成队列条目(CQE)的Go结构体。NVMe协议定义了管理员命令(用于管理和配置)和NVM命令(用于实际数据I/O)。
4.1 NVMe Command Format
所有NVMe命令都共享一个通用的结构,通常称为Command Dword 0-15 (CDW0-CDW15)。
| 字段名称 | 大小 (字节) | 描述 |
|---|---|---|
| Opcode | 1 | 命令操作码,指示命令类型(如Identify、Read、Write)。 |
| Fuse | 1 | 融合操作码,用于原子性操作。 |
| CID | 2 | Command Identifier,主机用于关联命令和完成的唯一标识符。 |
| NSID | 4 | Namespace Identifier,命令作用的Namespace。 |
| PRP Entry 1 | 8 | Physical Region Page Entry 1,数据缓冲区的物理地址,或SGL的一部分。 |
| PRP Entry 2 | 8 | Physical Region Page Entry 2,同上。 |
| CDW10-CDW15 | 24 | 命令特定的字段,例如LBA(逻辑块地址)、块数量等。 |
我们将定义一个基础的NVMeCommand结构,并为不同类型的命令定义特定的扩展结构。
// Common NVMe Command Opcodes
const (
// Admin Commands
NVME_ADMIN_OPC_IDENTIFY byte = 0x06
NVME_ADMIN_OPC_CONNECT byte = 0x14 // NVMe-oF specific
// NVM Commands
NVME_NVM_OPC_READ byte = 0x02
NVME_NVM_OPC_WRITE byte = 0x01
NVME_NVM_OPC_FLUSH byte = 0x00
// ... 更多命令
)
// NVMeCommand 通用命令结构 (64字节)
type NVMeCommand struct {
Opcode byte
Fuse byte
CID uint16 // Command Identifier
NSID uint32 // Namespace Identifier (Admin commands may use 0 or FFFFFFFFh)
_ uint64 // Reserved0 (DW2-3)
MPTR uint64 // Metadata Pointer (DW4-5)
PRP1 uint64 // Physical Region Page Entry 1 (DW6-7)
PRP2 uint64 // Physical Region Page Entry 2 (DW8-9)
CDW10 uint32 // Command DWord 10 (command specific)
CDW11 uint32 // Command DWord 11 (command specific)
CDW12 uint32 // Command DWord 12 (command specific)
CDW13 uint32 // Command DWord 13 (command specific)
CDW14 uint32 // Command DWord 14 (command specific)
CDW15 uint32 // Command DWord 15 (command specific)
}
// Admin Command: Identify (NVMe 1.4)
// This structure is for the command payload, not the PDU.
type IdentifyCommand struct {
NVMeCommand
CDW10 uint32 // CNS (Controller or Namespace Structure)
CDW11 uint32 // Controller ID (for NVM subsystems)
// Other CDWs are reserved
}
const (
IDENTIFY_CNS_CONTROLLER uint32 = 0x01 // Identify Controller
IDENTIFY_CNS_NAMESPACE uint32 = 0x00 // Identify Namespace
IDENTIFY_CNS_NAMESPACE_LIST uint32 = 0x02 // Identify Active Namespace List
IDENTIFY_CNS_DISCOVERY_LOG uint32 = 0x10 // NVMe-oF Discovery Log Page
)
// NVM Command: Read/Write
// This structure is for the command payload, not the PDU.
type NVMReadWriteCommand struct {
NVMeCommand
LBA uint64 // Logical Block Address (CDW10-11)
NLB uint16 // Number of Logical Blocks (CDW12 low 16 bits)
// ... 其他CDW字段
}
// NVMe-oF Connect Command (NVMe-oF 1.0)
// This is an Admin command used for establishing queues.
type ConnectCommand struct {
NVMeCommand
SQSIZE uint16 // Submission Queue Size (CDW10 low 16 bits)
KATO uint16 // Keep-Alive Timeout (CDW10 high 16 bits)
HostID [16]byte // UUID of the host (CDW11-14)
HostNQN [256]byte // NVMe Qualified Name of the host (variable length, usually sent after command)
SubsystemNQN [256]byte // NVMe Qualified Name of the target subsystem (variable length)
// CDW15 contains queue ID, connection type, etc.
// For simplicity, we might pass HostNQN/SubsystemNQN separately and they are part of the Connect Request PDU.
}
const (
CONNECT_QID_ADMIN uint16 = 0xFFFF // Admin Queue ID
CONNECT_QID_IO uint16 = 0x0000 // First I/O Queue ID
// ...
)
// NVMe Completion Queue Entry (16字节)
type NVMeCompletion struct {
CID uint16 // Command Identifier
SQID uint16 // Submission Queue Identifier
Status uint16 // Command Status
_ uint16 // Reserved
Result uint32 // Command specific result
_ uint64 // Reserved
}
// NVMe Status Field (bits 0-14)
const (
NVME_STATUS_SUCCESS uint16 = 0x0000
NVME_STATUS_INVALID_OPCODE uint16 = 0x0001
NVME_STATUS_INVALID_FIELD uint16 = 0x0002
// ... 更多状态码
)
// NVMe Status Type (bit 15)
const (
NVME_SCT_GENERIC uint16 = 0x0 // Generic Command Status
NVME_SCT_COMMAND uint16 = 0x1 // Command Specific Status
NVME_SCT_MEDIA uint16 = 0x2 // Media Error Status
// ...
)
func GetStatusMeaning(status uint16) string {
sct := (status >> 14) & 0x03 // Status Code Type
sc := status & 0x7FFF // Status Code
// This is a simplified mapping, real NVMe spec has many more.
switch sct {
case NVME_SCT_GENERIC:
switch sc {
case NVME_STATUS_SUCCESS: return "Success"
case NVME_STATUS_INVALID_OPCODE: return "Invalid Opcode"
case NVME_STATUS_INVALID_FIELD: return "Invalid Field"
default: return fmt.Sprintf("Generic Status Code: 0x%02x", sc)
}
case NVME_SCT_COMMAND:
return fmt.Sprintf("Command Specific Status Code: 0x%02x", sc)
case NVME_SCT_MEDIA:
return fmt.Sprintf("Media Error Status Code: 0x%02x", sc)
default:
return fmt.Sprintf("Unknown Status Type: 0x%02x, Code: 0x%02x", sct, sc)
}
}
// ConvertGoUUIDToNVMeHostID converts a Go UUID to NVMe Host ID format (little-endian byte array)
func ConvertGoUUIDToNVMeHostID(uuidStr string) ([16]byte, error) {
uuidParsed, err := uuid.Parse(uuidStr) // Requires "github.com/google/uuid"
if err != nil {
return [16]byte{}, fmt.Errorf("invalid UUID string: %w", err)
}
var hostID [16]byte
copy(hostID[:], uuidParsed[:]) // Copy bytes directly
return hostID, nil
}
代码解析:
- 定义了常用的NVMe操作码常量。
NVMeCommand:基础结构体,表示一个64字节的NVMe命令。注意其中的_字段是Go的占位符,用于填充对齐或保留字段。IdentifyCommand、NVMReadWriteCommand、ConnectCommand:特定命令的结构体,它们内嵌了NVMeCommand,并添加了命令特有的字段。NVMeCompletion:表示一个16字节的完成队列条目,包含命令ID、状态等信息。NVMe Status常量和GetStatusMeaning函数:用于解析和理解NVMe命令执行后的状态码。ConvertGoUUIDToNVMeHostID: 将Gouuid库生成的UUID字符串转换为NVMe Host ID所需的[16]byte格式。
第五章:构建NVMe-oF客户端核心逻辑
现在,我们将把NVMe-oF连接和NVMe命令结合起来,构建客户端的核心逻辑。
5.1 客户端结构体与初始化
package nvmeof
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"log"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid" // go get github.com/google/uuid
)
// TargetInfo 存储发现到的目标信息
type TargetInfo struct {
NQN string // Subsystem NVMe Qualified Name
Addr string // IP:Port for I/O connection
TrType string // Transport Type (e.g., tcp)
TrSvcs string // Transport Service ID (e.g., nqn.2014-08.org.nvmexpress.discovery)
// ... 其他Discovery Log Page字段
}
// NVMeNamespace 代表一个连接到的NVMe命名空间
type NVMeNamespace struct {
ID uint32
Size uint64 // Total capacity in bytes
LBAF uint8 // LBA Format index
BlockSize uint32 // Logical block size in bytes
nsConn *NVMeoFConnection // Connection to the target for this namespace
adminQueue *NVMeAdminQueue
ioQueue *NVMeIOQueue
// ... 其他命名空间信息
}
// NVMeClient 是NVMe-oF客户端的主结构
type NVMeClient struct {
HostNQN string
HostID [16]byte
discoveryConn *NVMeoFConnection // Connection to discovery controller
discoveryAddr string
targets map[string]TargetInfo // Discovered targets by NQN
namespaces map[uint32]*NVMeNamespace // Connected namespaces by NSID
nextCID uint32 // Next Command ID
pendingCmds sync.Map // Map[uint16]chan *NVMeCompletion
// For managing goroutines for receiving completions
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewNVMeClient 初始化客户端
func NewNVMeClient(hostNQN, hostUUID string) (*NVMeClient, error) {
hostID, err := ConvertGoUUIDToNVMeHostID(hostUUID)
if err != nil {
return nil, fmt.Errorf("invalid host UUID: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
return &NVMeClient{
HostNQN: hostNQN,
HostID: hostID,
targets: make(map[string]TargetInfo),
namespaces: make(map[uint32]*NVMeNamespace),
nextCID: 0,
ctx: ctx,
cancel: cancel,
}, nil
}
// generateCID 原子地生成并返回下一个Command ID
func (c *NVMeClient) generateCID() uint16 {
// CID是uint16,所以需要处理溢出
return uint16(atomic.AddUint32(&c.nextCID, 1) & 0xFFFF)
}
// StartReceiver 启动一个goroutine来接收所有连接的PDU
// 这是一个简化的模型,实际中每个连接可能需要一个独立的接收器
func (c *NVMeClient) StartReceiver(conn *NVMeoFConnection) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
log.Printf("Receiver for %s shutting down.", conn.remoteAddr)
return
default:
pduType, payload, err := conn.ReceivePDU()
if err != nil {
if err == io.EOF {
log.Printf("Connection %s closed by remote.", conn.remoteAddr)
} else {
log.Printf("Error receiving PDU from %s: %v", conn.remoteAddr, err)
}
// TODO: Add robust error handling, reconnection logic.
return
}
c.handleReceivedPDU(conn, pduType, payload)
}
}
}()
}
// Stop 关闭客户端,停止所有goroutines
func (c *NVMeClient) Stop() {
c.cancel()
c.wg.Wait()
if c.discoveryConn != nil {
c.discoveryConn.Close()
}
for _, ns := range c.namespaces {
if ns.nsConn != nil {
ns.nsConn.Close()
}
}
}
// handleReceivedPDU 根据PDU类型处理接收到的数据
func (c *NVMeClient) handleReceivedPDU(conn *NVMeoFConnection, pduType byte, payload []byte) {
switch pduType {
case PDU_TYPE_RESPONSE:
// 这是NVMe命令的完成响应
comp := &NVMeCompletion{}
reader := bytes.NewReader(payload)
if err := binary.Read(reader, binary.LittleEndian, comp); err != nil {
log.Printf("Failed to parse NVMe Completion: %v", err)
return
}
val, ok := c.pendingCmds.Load(comp.CID)
if !ok {
log.Printf("Received completion for unknown CID %d", comp.CID)
return
}
cmdChan := val.(chan *NVMeCompletion)
select {
case cmdChan <- comp:
// Sent completion to waiting goroutine
case <-time.After(50 * time.Millisecond): // Avoid blocking if receiver is gone
log.Printf("Failed to send completion for CID %d to channel: channel blocked or closed", comp.CID)
}
c.pendingCmds.Delete(comp.CID) // Command is completed, remove from map
case PDU_TYPE_C2H_DATA:
// Controller-to-Host data PDU, typically for Read commands
// This needs to be correlated with a pending Read command based on Command ID (CID)
// For simplicity, we'll assume Read data is handled inline with the completion
// In a real implementation, data PDUs would precede or be interleaved with RESPONSE PDUs
// and require more complex buffer management.
log.Printf("Received C2H_DATA PDU, length: %d. (Not fully handled in this example)", len(payload))
default:
log.Printf("Received unhandled PDU type: 0x%02x, length: %d", pduType, len(payload))
}
}
代码解析:
TargetInfo:存储从Discovery服务获取的目标信息。NVMeNamespace:代表一个已连接的命名空间,包含其属性和对应的连接。NVMeClient:客户端的核心结构,管理所有连接、已发现的目标、已连接的命名空间,以及命令的异步处理。HostNQN,HostID:用于标识主机。discoveryConn:与Discovery Controller的连接。pendingCmds:sync.Map用于存储等待完成的命令。键是CID,值是用于传递完成状态的chan *NVMeCompletion。这是实现异步I/O的关键。nextCID:原子计数器,用于生成唯一的Command ID。ctx,cancel,wg:用于优雅地启动和关闭后台goroutines。
NewNVMeClient:构造函数,初始化客户端。generateCID:安全地生成下一个Command ID。StartReceiver:启动一个goroutine,持续从给定的连接接收PDU,并调用handleReceivedPDU进行处理。Stop:优雅地关闭客户端,包括所有连接和后台goroutines。handleReceivedPDU:核心的PDU分发逻辑。- 当收到
PDU_TYPE_RESPONSE时,解析为NVMeCompletion,并通过pendingCmds找到对应的等待通道,将完成状态发送回去。 PDU_TYPE_C2H_DATA:这是从控制器到主机的数据PDU,通常用于Read命令。在实际中,需要将这些数据与对应的Read请求关联起来,并写入预先分配的缓冲区。为了简化,这里只打印日志。
- 当收到
5.2 Discovery 过程
客户端首先需要发现网络中可用的NVMe-oF目标。
// DiscoveryLogPageEntry 发现日志页中的一个条目
type DiscoveryLogPageEntry struct {
TrType uint8 // Transport Type (e.g., 1 for PCIe, 2 for RDMA, 3 for TCP, 4 for Fibre Channel)
AdrFam uint8 // Address Family (e.g., 1 for IPv4, 2 for IPv6, 3 for IB, 4 for FC, 5 for INTRA_HOST)
TrSvcs uint8 // Transport Service Type (e.g., 0 for NVMe-oF, 1 for Discovery)
_ uint8 // Reserved
PortID uint16 // Controller port ID (NVMe Controller ID)
CntrlType uint8 // Controller Type (e.g., 0 for NVMe, 1 for Discovery)
_ [3]byte // Reserved
NQN [256]byte // NVMe Qualified Name
TrAddr [256]byte // Transport Address (IP address or RDMA GID)
TrSvcsID [256]byte // Transport Service Identifier (e.g., port number)
// ... 其他字段
}
// DiscoverTargets 连接到Discovery Controller并获取目标列表
func (c *NVMeClient) DiscoverTargets(discoveryAddr string) ([]TargetInfo, error) {
c.discoveryAddr = discoveryAddr
conn, err := NewNVMeoFConnection(discoveryAddr)
if err != nil {
return nil, fmt.Errorf("failed to connect to discovery target %s: %w", discoveryAddr, err)
}
c.discoveryConn = conn
c.StartReceiver(conn) // 启动Discovery连接的PDU接收器
// 1. 发送 Connect Admin Command 到 Discovery Controller
// NVMe-oF Connect Command 是一个特殊的Admin命令,用于建立Admin Queue
cid := c.generateCID()
connectCmd := ConnectCommand{
NVMeCommand: NVMeCommand{
Opcode: NVME_ADMIN_OPC_CONNECT,
CID: cid,
NSID: 0, // For Admin queue
},
SQSIZE: CONNECT_QID_ADMIN, // This is actually the queue ID, not size for initial connect
KATO: 0, // Keep-Alive Timeout (0 means disabled)
}
// Host ID is part of the Connect Request PDU, not the command payload
// Host NQN is also part of the Connect Request PDU
// For simplicity, we directly embed a partial connect command and then send
// the full NVMe-oF Connect Request PDU.
// Construct NVMe-oF Connect Request PDU payload
// This is typically more complex, involving HostID, HostNQN, SubsystemNQN, etc.
// For a discovery connection, we primarily need HostNQN.
connectReqPDU := new(bytes.Buffer)
// The NVMe-oF Connect Request PDU has a specific format.
// It starts with the NVMe Command, then Connection Request Data.
// This data includes HostID, HostNQN length, HostNQN, SubsystemNQN length, SubsystemNQN.
// Simplified: just put HostNQN for now.
// Example of a simplified Connect Request PDU payload for discovery:
// A real implementation would parse the connect command structure from spec
// and then append host NQN and host ID.
// For discovery, the SubsystemNQN would be 'nqn.2014-08.org.nvmexpress.discovery'.
// Let's assume a simpler Connect PDU which directly contains the NVMeCommand
// and then additional fields. For Admin Connect, the command is just 64 bytes.
// The HostNQN is passed as part of the PDU's "Connect Request Data".
// For a proper NVMe-oF Connect PDU:
// PDU Header
// NVMeCommand (64 bytes)
// Connect Request Data: HostID (16 bytes), HostNQN length (2 bytes), HostNQN (variable), SubsystemNQN length (2 bytes), SubsystemNQN (variable)
// For simplicity in this example, we will just send the NVMeCommand as the "payload"
// and rely on the target to infer context, which is not fully spec compliant for Connect PDU.
// A proper Connect PDU should be constructed like this:
// type ConnectRequestPDU struct {
// NVMeCommand
// HostID [16]byte
// HostNQNLen uint16
// HostNQN []byte // variable
// SubNQNLen uint16
// SubNQN []byte // variable
// }
// However, for discovery, we connect to the discovery NQN, and the command itself contains basic info.
// The HostNQN is sent as a string during the connection handshake (usually the first part of the Connect PDU).
// Let's adjust to send a more compliant Connect Request PDU for the Admin Queue
connectReqPDUData := new(bytes.Buffer)
if err := binary.Write(connectReqPDUData, binary.LittleEndian, connectCmd.NVMeCommand); err != nil {
return nil, fmt.Errorf("failed to serialize connect command: %w", err)
}
if err := binary.Write(connectReqPDUData, binary.LittleEndian, c.HostID); err != nil {
return nil, fmt.Errorf("failed to write HostID: %w", err)
}
hostNQNBytes := []byte(c.HostNQN)
if err := binary.Write(connectReqPDUData, binary.LittleEndian, uint16(len(hostNQNBytes))); err != nil {
return nil, fmt.Errorf("failed to write HostNQN length: %w", err)
}
if _, err := connectReqPDUData.Write(hostNQNBytes); err != nil {
return nil, fmt.Errorf("failed to write HostNQN: %w", err)
}
// For Discovery Controller, the SubsystemNQN is 'nqn.2014-08.org.nvmexpress.discovery'
subsystemNQN := "nqn.2014-08.org.nvmexpress.discovery"
subNQNBytes := []byte(subsystemNQN)
if err := binary.Write(connectReqPDUData, binary.LittleEndian, uint16(len(subNQNBytes))); err != nil {
return nil, fmt.Errorf("failed to write SubsystemNQN length: %w", err)
}
if _, err := connectReqPDUData.Write(subNQNBytes); err != nil {
return nil, fmt.Errorf("failed to write SubsystemNQN: %w", err)
}
compChan := make(chan *NVMeCompletion, 1)
c.pendingCmds.Store(cid, compChan)
defer c.pendingCmds.Delete(cid)
if err := conn.SendPDU(PDU_TYPE_ICREQ, connectReqPDUData.Bytes()); err != nil {
return nil, fmt.Errorf("failed to send connect command to discovery: %w", err)
}
select {
case comp := <-compChan:
if comp.Status != NVME_STATUS_SUCCESS {
return nil, fmt.Errorf("connect to discovery failed: %s", GetStatusMeaning(comp.Status))
}
case <-time.After(10 * time.Second):
return nil, fmt.Errorf("connect to discovery timed out")
}
// 2. 发送 Identify Controller (CNS=0x10 for Discovery Log Page)
cid = c.generateCID()
identifyCmd := IdentifyCommand{
NVMeCommand: NVMeCommand{
Opcode: NVME_ADMIN_OPC_IDENTIFY,
CID: cid,
NSID: 0, // For Admin commands
PRP1: 0, // Will be filled with data buffer address if needed
PRP2: 0,
},
CDW10: IDENTIFY_CNS_DISCOVERY_LOG, // Request Discovery Log Page
}
// For Admin Identify command, the data is returned in the Completion PDU's data field
// or in separate C2H_DATA PDUs if large.
// For Discovery Log, the data is usually returned in-capsule (within the response PDU).
// We need a buffer to hold the response data.
identifyCmdBuf := new(bytes.Buffer)
if err := binary.Write(identifyCmdBuf, binary.LittleEndian, identifyCmd.NVMeCommand); err != nil {
return nil, fmt.Errorf("failed to serialize identify command: %w", err)
}
compChan = make(chan *NVMeCompletion, 1)
c.pendingCmds.Store(cid, compChan)
defer c.pendingCmds.Delete(cid)
if err := conn.SendPDU(PDU_TYPE_ICREQ, identifyCmdBuf.Bytes()); err != nil {
return nil, fmt.Errorf("failed to send identify discovery command: %w", err)
}
// Wait for completion (this completion will carry the Discovery Log Page data as 'Result' or payload)
select {
case comp := <-compChan:
if comp.Status != NVME_STATUS_SUCCESS {
return nil, fmt.Errorf("identify discovery failed: %s", GetStatusMeaning(comp.Status))
}
// The Discovery Log Page is returned as part of the Completion PDU's payload.
// However, our current `handleReceivedPDU` only processes `NVMeCompletion` struct.
// To get the full Discovery Log Page, we need to modify handleReceivedPDU
// to extract payload from PDU_TYPE_RESPONSE if it's an in-capsule data transfer,
// or handle PDU_TYPE_C2H_DATA for large transfers.
// For now, let's assume the payload of the RESPONSE PDU contains the Discovery Log Page.
// This is a simplification. A full implementation would pass the raw payload of the RESPONSE PDU
// along with the completion struct, or use a separate data PDU.
// To make this example work, we'd need to modify `handleReceivedPDU` to:
// 1. If PDU_TYPE_RESPONSE, and it's for an Identify command requesting data,
// extract the data from the PDU payload *after* the NVMeCompletion struct.
// 2. Store this data in a temporary buffer associated with the CID, which the waiting goroutine can access.
// Let's refine the `pendingCmds` to store more context or a more capable channel.
// For now, we simulate by assuming `comp.Result` might contain some info or we fetch data separately.
// A more robust approach would be to have a dedicated channel for data or pass the raw response payload.
// Let's assume the Discovery Log Page is returned in the `Result` field for simplicity,
// which is *not* how a large log page works. It's usually a data transfer.
// A more realistic way:
// 1. Send Identify Command.
// 2. Receive PDU_TYPE_RESPONSE with completion.
// 3. Then, receive one or more PDU_TYPE_C2H_DATA PDUs containing the log page.
// We'll need to update `handleReceivedPDU` to manage this.
// For the sake of progress, let's assume a simplified Discovery Log retrieval
// where the data (if small) might be in-capsule with the response.
// We need to fetch the actual PDU payload for the Discovery Log.
// This requires altering `handleReceivedPDU` to pass the raw PDU payload to the waiting channel.
// Re-design of pendingCmds value:
type CompletionResult struct {
Completion *NVMeCompletion
Payload []byte // The raw payload of the RESPONSE PDU (after completion struct)
}
// This requires `handleReceivedPDU` to be aware of the `CompletionResult` type.
// For now, we'll make a pragmatic assumption: Discovery Log is short and handled by one completion.
// This section will be simplified:
// We need to receive the full PDU payload that contained the completion.
// This means `handleReceivedPDU` needs to pass the `payload` to the `compChan` as well.
// Let's modify `handleReceivedPDU` and `pendingCmds` to pass `CompletionResult`.
// Re-fetch the completion channel for the Discovery Identify command
val, _ := c.pendingCmds.Load(cid)
compResultChan := val.(chan *CompletionResult)
// The `handleReceivedPDU` should have sent `CompletionResult` to this channel.
// Assuming `handleReceivedPDU` is modified to do so.
// For now, let's just create a dummy `CompletionResult` and then retry the `ReceivePDU` logic.
// This is a placeholder for a more robust data transfer mechanism.
// A more practical approach without changing `pendingCmds` too much for now:
// The Identify command requires a PRP entry for the data buffer.
// The target will write the log page to this buffer.
// Since we're using TCP, the data is transmitted via C2H_DATA PDUs.
// We need a pre-allocated buffer and a way to signal when it's filled.
log.Println("Discovery Log Page data retrieval mechanism needs a full implementation.")
log.Println("For demonstration, assuming target returns a small log page in-capsule.")
// Placeholder for actual Discovery Log Page parsing
// In a real scenario, after receiving the completion, we'd wait for C2H_DATA PDUs
// for the Identify command with CNS_DISCOVERY_LOG.
// For this example, let's manually parse a dummy log page.
// A real implementation would involve a temporary buffer for the discovery log page
// and the `handleReceivedPDU` would fill this buffer upon receiving C2H_DATA PDUs
// corresponding to the Identify Discovery command.
// This is a significant complexity for a "first pass" client.
// For simplicity, let's assume the discovery log page is returned *directly*
// as the payload of the `RESPONSE` PDU for the Identify command. This is typical for small log pages.
// So `handleReceivedPDU` would deliver the entire payload to the `CompletionResult`.
var discoveryLogPageBuf []byte
// Assume the `compChan` actually receives `CompletionResult` containing the full PDU payload
// from which we can extract the discovery log page.
// For this example, let's create a dummy payload for now to demonstrate parsing.
// A real `handleReceivedPDU` would make `payload` available.
// For this demonstration, we'll re-read the PDU from the connection, this is not ideal.
// It highlights the need for a robust completion/data passing mechanism.
// Let's adjust `handleReceivedPDU` to pass the full PDU payload for RESPONSE type.
// Re-design `handleReceivedPDU` and `pendingCmds`
// `pendingCmds` will map `CID` to `chan *PDUResponse`
// `PDUResponse` struct will hold `NVMeCompletion` and `[]byte` payload.
// This requires a significant refactor in the `handleReceivedPDU` and `pendingCmds` part.
// Let's assume for this specific Discovery case, we send the Identify command,
// and the target returns the Discovery Log in a single Response PDU,
// and `handleReceivedPDU` is updated to pass this payload.
// Let's refine `handleReceivedPDU` to pass the raw payload as part of the completion signal.
// This makes `CompletionResult` essential.
// Re-defining `pendingCmds` to `map[uint16]chan CompletionResult`.
// --- Refactored `handleReceivedPDU` ---
// type CompletionResult struct {
// Completion *NVMeCompletion
// Data []byte // The data payload associated with this completion (e.g., Discovery Log)
// }
// c.pendingCmds sync.Map // Map[uint16]chan CompletionResult
// --- End Refactor ---
// Placeholder for the refactored code:
// After a successful identify, we expect the Discovery Log Page.
// This data needs to be passed back to the waiting `DiscoverTargets` function.
// For now, let's make a simplifying assumption: `Identify` commands return their data in the `Result` field
// of the `NVMeCompletion` for *small* data like a single Discovery Log Entry (which is not true for a full log page).
// This is a major simplification.
// Let's use a more robust way: the `handleReceivedPDU` will pass the *full* payload of the RESPONSE PDU
// to the waiting channel for `Identify` commands.
// Refined `handleReceivedPDU` to pass full PDU payload for `RESPONSE` type.
// It will extract the `NVMeCompletion` from the beginning of the payload,
// and the rest of the payload (if any) is data.
// Back to `DiscoverTargets`:
// The channel `compChan` needs to receive `CompletionResult`.
// Let's assume `handleReceivedPDU` is already updated to send `CompletionResult`.
compResult := <-compChan // Assume compChan now sends CompletionResult
if compResult.Completion.Status != NVME_STATUS_SUCCESS {
return nil, fmt.Errorf("identify discovery failed: %s", GetStatusMeaning(compResult.Completion.Status))
}
discoveryLogPageBuf = compResult.Data // This is the payload part after NVMeCompletion
if len(discoveryLogPageBuf) == 0 {
return nil, fmt.Errorf("no discovery log page data received")
}
// Parse Discovery Log Page
var targets []TargetInfo
// The Discovery Log Page has a header, then a list of entries.
// Each entry is 768 bytes long (NVMe-oF 1.0 spec).
// For simplicity, let's just read the entries.
// Discovery Log Page Header:
// NumRecs: uint64 (Number of Records)
// ...
// Assuming the entire `discoveryLogPageBuf` is a sequence of `DiscoveryLogPageEntry`
// and we skip the header part for now.
// In a real scenario, we'd read `NumRecs` from the header.
entrySize := 768 // Size of a single DiscoveryLogPageEntry in bytes
if len(discoveryLogPageBuf) % entrySize != 0 {
log.Printf("Warning: Discovery Log Page buffer size %d is not a multiple of entry size %d", len(discoveryLogPageBuf), entrySize)
// Try to parse anyway, maybe partial page
}
for i := 0; i < len(discoveryLogPageBuf); i += entrySize {
entryBuf := discoveryLogPageBuf[i : i+entrySize]
entry := &DiscoveryLogPageEntry{}
reader := bytes.NewReader(entryBuf)
if err := binary.Read(reader, binary.LittleEndian, entry); err != nil {
log.Printf("Failed to parse Discovery Log Page Entry at offset %d: %v", i, err)
continue
}
// Convert byte arrays to strings, trimming nulls
nqn := string(bytes.Trim(entry.NQN[:], "x00"))
trAddr := string(bytes.Trim(entry.TrAddr[:], "x00"))
trSvcsID := string(bytes.Trim(entry.TrSvcsID[:], "x00"))
// Filter out the discovery controller itself
if nqn == subsystemNQN {
continue
}
// Based on TrType and AdrFam, construct the address.
// This is a simplified mapping.
var addr string
switch entry.TrType {
case 3: // TCP
switch entry.AdrFam {
case 1: // IPv4
addr = fmt.Sprintf("%s:%s", trAddr, trSvcsID)
case 2: // IPv6
addr = fmt.Sprintf("[%s]:%s", trAddr, trSvcsID)
default:
log.Printf("Unsupported Address Family for TCP: %d", entry.AdrFam)
continue
}
default:
log.Printf("Unsupported Transport Type: %d", entry.TrType)
continue
}
target := TargetInfo{
NQN: nqn,
Addr: addr,
TrType: fmt.Sprintf("%d", entry.TrType), // Convert TrType byte to string
TrSvcs: trSvcsID,
}
targets = append(targets, target)
c.targets[target.NQN] = target
log.Printf("Discovered Target: NQN=%s, Address=%s", target.NQN, target.Addr)
}
return targets, nil
case <-time.After(10 * time.Second):
return nil, fmt.Errorf("identify discovery timed out")
}
}
对handleReceivedPDU和pendingCmds的重新设计:
为了处理Identify命令返回的数据(如Discovery Log Page),我们需要在handleReceivedPDU中不仅传递NVMeCompletion,还要传递原始的PDU负载。
// CompletionResult 封装了NVMe Completion和原始PDU负载
type CompletionResult struct {
Completion *NVMeCompletion
Data []byte // The raw payload of the RESPONSE PDU (after completion struct)
Err error // Any error encountered during PDU processing
}
// NVMeClient 结构体中的 pendingCmds 变为:
// pendingCmds sync.Map // Map[uint16]chan CompletionResult
// handleReceivedPDU 的更新版本
func (c *NVMeClient) handleReceivedPDU(conn *NVMeoFConnection, pduType byte, payload []byte) {
switch pduType {
case PDU_TYPE_RESPONSE:
if len(payload) < 16 { // Minimum size for NVMeCompletion
log.Printf("Received malformed RESPONSE PDU, payload too short: %d bytes", len(payload))
return
}
comp := &NVMeCompletion{}
reader := bytes.NewReader(payload[:16]) // First 16 bytes are NVMeCompletion
if err := binary.Read(reader, binary.LittleEndian, comp); err != nil {
log.Printf("Failed to parse NVMe Completion from RESPONSE PDU: %v", err)
return
}
var dataPayload []byte // Data part after the NVMeCompletion
if len(payload) > 16 {
dataPayload = payload[16:]
}
val, ok := c.pendingCmds.Load(comp.CID)
if !ok {
log.Printf("Received completion for unknown CID %d", comp.CID)
return
}
cmdChan := val.(chan CompletionResult)
select {
case cmdChan <- CompletionResult{Completion: comp, Data: dataPayload}:
// Sent completion to waiting goroutine
case <-time.After(55 * time.Millisecond): // Avoid blocking
log.Printf("Failed to send completion for CID %d to channel: channel blocked or closed", comp.CID)
}
c.pendingCmds.Delete(comp.CID) // Command is completed, remove from map
case PDU_TYPE_C2H_DATA:
// Controller-to-Host data PDU, typically for Read commands
// This requires more complex state management to correlate C2H_DATA PDUs
// with specific pending Read commands and their buffers.
// For now, we'll just log this.
log.Printf("Received C2H_DATA PDU, length: %d. (Requires advanced handling)", len(payload))
default:
log.Printf("Received unhandled PDU type: 0x%02x, length: %d", pduType, len(payload))
}
}
代码解析 (Discovery部分):
DiscoveryLogPageEntry:定义了Discovery Log Page中每个条目的结构。DiscoverTargets:- 连接到Discovery Controller(默认端口8009)。
- 发送一个
ConnectAdmin命令,用于在Discovery Controller上建立Admin Queue。这个命令的PDU类型是ICREQ。 - 等待
Connect命令的完成。 - 发送一个
IdentifyAdmin命令(CNS=IDENTIFY_CNS_DISCOVERY_LOG)来请求Discovery Log Page。 - 等待
Identify命令的完成。此时,CompletionResult中的Data字段将包含Discovery Log Page的原始字节。 - 解析Discovery Log Page的每一项,提取目标NQN、传输地址等信息,并存储在
c.targets中。
5.3 连接到特定NVMe子系统和命名空间
一旦发现目标,我们就可以连接到特定的NVMe子系统,并获取其命名空间。
// NVMeAdminQueue 管理Admin Queue的命令提交和完成
type NVMeAdminQueue struct {
qID uint16
conn *NVMeoFConnection
client *NVMeClient
}
// NVMeIOQueue 管理I/O Queue的命令提交和完成
type NVMeIOQueue struct {
qID uint16
conn *NVMeoFConnection
client *NVMeClient
nsid uint32
}
// ConnectToNamespace 连接到指定的NVMe子系统和命名空间
func (c *NVMeClient) ConnectToNamespace(targetNQN string, nsid uint32) (*NVMeNamespace, error) {
target, ok := c.targets[targetNQN]
if !ok {
return nil, fmt.Errorf("target NQN %s not discovered", targetNQN)
}
// Connect to the target's I/O port
conn, err := NewNVMeoFConnection(target.Addr)
if err != nil {
return nil, fmt.Errorf("failed to connect to target %s at %s: %w", targetNQN, target.Addr, err)
}
// Note: In a real client, each target connection might get its own receiver goroutine.
// For simplicity, we assume one global receiver for all connections for now,
// which would need to be modified for multiple connections.
// Or, each connection has its own `StartReceiver` call.
c.StartReceiver(conn)
// 1. 发送 Connect Admin Command 到 I/O Controller
// This establishes an Admin Queue for this specific connection.
cid := c.generateCID()
connectCmd := ConnectCommand{
NVMeCommand: NVMeCommand{
Opcode: NVME_ADMIN_OPC_CONNECT,
CID: cid,
NSID: 0, // For Admin queue
},
SQSIZE: CONNECT_QID_ADMIN, // Queue ID 0xFFFF for Admin Queue
KATO: 0, // Keep-Alive Timeout
}
connectReqPDUData := new(bytes.Buffer)
if err := binary.Write(connectReqPDUData, binary.LittleEndian, connectCmd.NVMeCommand); err != nil {
return nil, fmt.Errorf("failed to serialize connect command: %w", err)
}
if err := binary.Write(connectReqPDUData, binary.LittleEndian, c.HostID); err != nil {
return nil, fmt.Errorf("failed to write HostID: %w", err)
}
hostNQNBytes := []byte(c.HostNQN)
if err := binary.Write(connectReqPDUData, binary.LittleEndian, uint16(len(hostNQNBytes))); err != nil {
return nil, fmt.Errorf("failed to write HostNQN length: %w", err)
}
if _, err := connectReqPDUData.Write(hostNQNBytes); err != nil {
return nil, fmt.Errorf("failed to write HostNQN: %w", err)
}
subNQNBytes := []byte(targetNQN)
if err := binary.Write(connectReqPDUData, binary.LittleEndian, uint16(len(subNQNBytes))); err != nil {
return nil, fmt.Errorf("failed to write SubsystemNQN length: %w", err)
}
if _, err := connectReqPDUData.Write(subNQNBytes); err != nil {
return nil, fmt.Errorf("failed to write SubsystemNQN: %w", err)
}
compChan := make(chan CompletionResult, 1)
c.pendingCmds.Store(cid, compChan)
defer c.pendingCmds.Delete(cid)
if err := conn.SendPDU(PDU_TYPE_ICREQ, connectReqPDUData.Bytes()); err != nil {
return nil, fmt.Errorf("failed to send connect command to target %s: %w", targetNQN, err)
}
select {
case compRes := <-compChan:
if compRes.Completion.Status != NVME_STATUS_SUCCESS {
return nil, fmt.Errorf("connect to target %s failed: %s", targetNQN, GetStatusMeaning(compRes.Completion.Status))
}
case <-time.After(10 * time.Second):
return nil, fmt.Errorf("connect to target %s timed out", targetNQN)
}
log.Printf("Successfully connected Admin Queue to target %s.", targetNQN)
// Now establish an I/O Queue
ioQueueID := uint16(rand.Intn(0xFFFE)) // Use a random ID for I/O queue (excluding 0xFFFF for admin)
cid = c.generateCID()
connectIOQueueCmd := ConnectCommand{
NVMeCommand: NVMeCommand{
Opcode: NVME_ADMIN_OPC_CONNECT,
CID: cid,
NSID: 0, // For I/O Queue connection, NSID is 0
},
SQSIZE: ioQueueID, // This is the Queue ID
KATO: 0,
}
connectIOReqPDUData := new(bytes.Buffer)
if err := binary.Write(connectIOReqPDUData, binary.LittleEndian, connectIOQueueCmd.NVMeCommand); err != nil {
return nil, fmt.Errorf("failed to serialize connect IO command: %w", err)
}
if err := binary.Write(connectIOReqPDUData, binary.LittleEndian, c.HostID); err != nil {
return nil, fmt.Errorf("failed to write HostID for IO queue: %w", err)
}
if err := binary.Write(connectIOReqPDUData, binary.LittleEndian, uint16(len(hostNQNBytes))); err != nil {
return nil, fmt.Errorf("failed to write HostNQN length for IO queue: %w", err)
}
if _, err := connectIOReqPDUData.Write(hostNQNBytes); err != nil {
return nil, fmt.Errorf("failed to write HostNQN for IO queue: %w", err)
}
if err := binary.Write(connectIOReqPDUData, binary.LittleEndian, uint16(len(subNQNBytes))); err != nil {
return nil, fmt.Errorf("failed to write SubsystemNQN length for IO queue: %w", err)
}
if _, err := connectIOReqPDUData.Write(subNQNBytes); err != nil {
return nil, fmt.Errorf("failed to write SubsystemNQN for IO queue: %w", err)
}
compChan = make(chan CompletionResult, 1)
c.pendingCmds.Store(cid, compChan)
defer c.pendingCmds.Delete(cid)
if err := conn.SendPDU(PDU_TYPE_ICREQ, connectIOReqPDUData.Bytes()); err != nil {
return nil, fmt.Errorf("failed to send connect IO queue command to target %s: %w", targetNQN, err)
}
select {
case compRes := <-compChan:
if compRes.Completion.Status != NVME_STATUS_SUCCESS {
return nil, fmt.Errorf("connect IO queue to target %s failed: %s", targetNQN, GetStatusMeaning(compRes.Completion.Status))
}
case <-time.After(10 * time.Second):
return nil, fmt.Errorf("connect IO queue to target %s timed out", targetNQN)
}
log.Printf("Successfully connected I/O Queue %d to target %s.", ioQueueID, targetNQN)
// 2. 发送 Identify Namespace 命令
cid = c.generateCID()
identifyNsCmd := IdentifyCommand{
NVMeCommand: NVMeCommand{
Opcode: NVME_ADMIN_OPC_IDENTIFY,
CID: cid,
NSID: nsid, // Specific namespace ID
},
CDW10: IDENTIFY_CNS_NAMESPACE, // Identify Namespace
}
identifyNsCmdBuf := new(bytes.Buffer)
if err := binary.Write(identifyNsCmdBuf, binary.LittleEndian, identifyNsCmd.NVMeCommand); err != nil {
return nil, fmt.Errorf("failed to serialize identify namespace command: %w", err)
}
compChan = make(chan CompletionResult, 1)
c.pendingCmds.Store(cid, compChan)
defer c.pendingCmds.Delete(cid)