C++中的Non-blocking I/O与Complection Ports:实现异步操作的极致性能

好的,下面是关于C++中Non-blocking I/O与Completion Ports的技术文章,以讲座形式呈现:

C++中的Non-blocking I/O与Completion Ports:实现异步操作的极致性能

大家好,今天我们来探讨C++中实现异步操作的两种关键技术:Non-blocking I/O 和 Completion Ports。我们将深入了解它们的原理、应用场景以及如何结合使用以达到极致性能。

一、同步 I/O 的瓶颈

在传统的同步 I/O 模型中,当一个线程发起 I/O 操作(例如,从网络读取数据或写入磁盘)时,它会一直阻塞,直到操作完成。这意味着线程在等待 I/O 完成期间无法执行其他任务。在高并发的场景下,大量的线程阻塞在 I/O 操作上会导致系统资源严重浪费,降低整体吞吐量。

举个简单的例子:

#include <iostream>
#include <fstream>
#include <chrono>
#include <thread>

void read_file(const std::string& filename) {
    std::ifstream file(filename);
    if (file.is_open()) {
        std::string line;
        while (std::getline(file, line)) {
            std::cout << "Read line: " << line << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟耗时操作
        }
        file.close();
    } else {
        std::cerr << "Unable to open file: " << filename << std::endl;
    }
}

int main() {
    read_file("large_file.txt"); //假设large_file.txt很大
    std::cout << "File reading complete." << std::endl;
    return 0;
}

在这个例子中,read_file 函数以同步方式读取文件。如果 large_file.txt 文件很大,程序会阻塞在读取操作上,直到整个文件读取完成。在多线程环境下,多个线程同时执行 read_file,会造成线程阻塞,降低程序的响应速度。

二、Non-blocking I/O 的原理与应用

Non-blocking I/O 允许线程发起 I/O 操作后立即返回,而无需等待操作完成。线程可以继续执行其他任务,并在稍后检查 I/O 操作是否完成。这大大提高了线程的利用率,从而提升了系统的并发性能。

实现 Non-blocking I/O 的关键在于将文件描述符(在 Unix 系统中)或 socket 设置为非阻塞模式。

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#endif

#include <iostream>
#include <stdexcept>

// 设置 socket 为 non-blocking 模式
bool set_non_blocking(int socket) {
#ifdef _WIN32
    unsigned long mode = 1;
    return (ioctlsocket(socket, FIONBIO, &mode) == 0);
#else
    int flags = fcntl(socket, F_GETFL, 0);
    if (flags == -1) return false;
    return (fcntl(socket, F_SETFL, flags | O_NONBLOCK) != -1);
#endif
}

// 简单的 non-blocking socket 示例
int main() {
#ifdef _WIN32
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        std::cerr << "WSAStartup failed" << std::endl;
        return 1;
    }
#endif

    int server_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (server_socket == -1) {
        std::cerr << "Socket creation failed" << std::endl;
        return 1;
    }

    sockaddr_in server_address;
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = INADDR_ANY;
    server_address.sin_port = htons(8080);

    if (bind(server_socket, (sockaddr*)&server_address, sizeof(server_address)) == -1) {
        std::cerr << "Bind failed" << std::endl;
        #ifdef _WIN32
        closesocket(server_socket);
        WSACleanup();
        #else
        close(server_socket);
        #endif
        return 1;
    }

    if (listen(server_socket, 5) == -1) {
        std::cerr << "Listen failed" << std::endl;
        #ifdef _WIN32
        closesocket(server_socket);
        WSACleanup();
        #else
        close(server_socket);
        #endif
        return 1;
    }

    if (!set_non_blocking(server_socket)) {
        std::cerr << "Failed to set non-blocking mode" << std::endl;
        #ifdef _WIN32
        closesocket(server_socket);
        WSACleanup();
        #else
        close(server_socket);
        #endif
        return 1;
    }

    std::cout << "Server listening on port 8080 (Non-blocking)" << std::endl;

    while (true) {
        sockaddr_in client_address;
        socklen_t client_address_len = sizeof(client_address);

        int client_socket = accept(server_socket, (sockaddr*)&client_address, &client_address_len);

        if (client_socket == -1) {
            #ifdef _WIN32
            if (WSAGetLastError() != WSAEWOULDBLOCK) {
                std::cerr << "Accept failed: " << WSAGetLastError() << std::endl;
                break;
            }
            #else
            if (errno != EWOULDBLOCK && errno != EAGAIN) {
                std::cerr << "Accept failed: " << errno << std::endl;
                break;
            }
            #endif

            // 没有连接请求,继续监听
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            continue;
        }

        std::cout << "New connection accepted." << std::endl;

        // 处理客户端连接... (此处省略,可以使用线程池等方式处理)
        #ifdef _WIN32
        closesocket(client_socket);
        #else
        close(client_socket);
        #endif

    }

    #ifdef _WIN32
    closesocket(server_socket);
    WSACleanup();
    #else
    close(server_socket);
    #endif

    return 0;
}

在这个例子中,set_non_blocking 函数用于将 socket 设置为非阻塞模式。当调用 accept 函数时,如果当前没有连接请求,它会立即返回,并设置 errnoEWOULDBLOCKEAGAIN (在Windows下是WSAEWOULDBLOCK)。程序可以检查 errno 的值,并根据需要继续监听或执行其他任务。

Non-blocking I/O 的优点:

  • 提高线程利用率:线程不会阻塞在 I/O 操作上,可以执行其他任务。
  • 提升并发性能:可以处理更多的并发连接。
  • 增强响应性:程序可以更快地响应用户请求。

Non-blocking I/O 的缺点:

  • 轮询开销:需要定期检查 I/O 操作是否完成,这会带来一定的 CPU 开销。
  • 代码复杂度:需要处理 I/O 操作未完成的情况,增加了代码的复杂性。

三、Completion Ports 的原理与优势

Completion Ports 是一种高效的异步 I/O 机制,主要用于 Windows 操作系统。它允许应用程序将多个 I/O 操作提交到操作系统,并在 I/O 操作完成后通过完成队列接收通知。线程可以从完成队列中获取已完成的 I/O 操作,并进行后续处理。

Completion Ports 的核心概念包括:

  • I/O Completion Port (IOCP): 一个内核对象,用于管理异步 I/O 操作的完成状态。
  • File Handle 或 Socket: 需要进行异步 I/O 操作的文件句柄或 socket。这些句柄需要与 IOCP 关联。
  • Overlapped Structure: 一个用于跟踪异步 I/O 操作的结构体。它包含一个事件句柄,当 I/O 操作完成时,操作系统会设置该事件。
  • Worker Threads: 负责从完成队列中获取已完成的 I/O 操作,并进行处理的线程。

Completion Ports 的工作流程:

  1. 创建 I/O Completion Port。
  2. 将文件句柄或 socket 与 IOCP 关联。
  3. 发起异步 I/O 操作,并提供 Overlapped 结构体。
  4. Worker 线程调用 GetQueuedCompletionStatus 函数,等待 I/O 操作完成。
  5. 当 I/O 操作完成时,操作系统将一个完成数据包放入完成队列,并唤醒一个等待的 Worker 线程。
  6. Worker 线程从完成队列中获取完成数据包,并处理 I/O 操作的结果。

以下是一个简单的 Completion Ports 示例:

#include <iostream>
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <stdexcept>

// 定义一个结构体用于传递数据
struct OverlappedData {
    OVERLAPPED overlapped;
    SOCKET socket;
    char buffer[1024];
    WSABUF wsaBuf;
};

DWORD WINAPI WorkerThread(LPVOID completionPortID) {
    HANDLE completionPort = (HANDLE)completionPortID;
    DWORD bytesTransferred;
    ULONG_PTR completionKey;
    LPOVERLAPPED lpOverlapped;

    while (true) {
        BOOL result = GetQueuedCompletionStatus(
            completionPort,
            &bytesTransferred,
            &completionKey,
            &lpOverlapped,
            INFINITE  // 等待I/O完成
        );

        if (result == FALSE) {
            if (lpOverlapped == NULL) {
                // completionPort关闭或者线程退出
                std::cerr << "GetQueuedCompletionStatus failed: " << GetLastError() << std::endl;
                break;
            }

            // I/O操作失败
            OverlappedData* data = (OverlappedData*)lpOverlapped;
            std::cerr << "I/O operation failed on socket: " << data->socket << ", Error: " << GetLastError() << std::endl;
            closesocket(data->socket);
            delete data;
            continue;
        }

        OverlappedData* data = (OverlappedData*)lpOverlapped;

        if (bytesTransferred == 0) {
            // 连接关闭
            std::cout << "Connection closed by client." << std::endl;
            closesocket(data->socket);
            delete data;
            continue;
        }

        // 处理接收到的数据
        std::cout << "Received data: " << data->buffer << std::endl;

        // 再次发起异步接收
        ZeroMemory(&data->overlapped, sizeof(OVERLAPPED));
        data->wsaBuf.len = sizeof(data->buffer);
        data->wsaBuf.buf = data->buffer;

        DWORD flags = 0;
        int result2 = WSARecv(data->socket, &data->wsaBuf, 1, NULL, &flags, &data->overlapped, NULL);
        if (result2 == SOCKET_ERROR) {
            if (WSAGetLastError() != WSA_IO_PENDING) {
                std::cerr << "WSARecv failed: " << WSAGetLastError() << std::endl;
                closesocket(data->socket);
                delete data;
                continue;
            }
        }
    }

    return 0;
}

int main() {
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        std::cerr << "WSAStartup failed" << std::endl;
        return 1;
    }

    HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (completionPort == NULL) {
        std::cerr << "CreateIoCompletionPort failed: " << GetLastError() << std::endl;
        WSACleanup();
        return 1;
    }

    // 创建工作线程
    const int numThreads = 4;  // 根据CPU核心数调整线程数
    for (int i = 0; i < numThreads; ++i) {
        HANDLE thread = CreateThread(NULL, 0, WorkerThread, completionPort, 0, NULL);
        if (thread == NULL) {
            std::cerr << "CreateThread failed: " << GetLastError() << std::endl;
            CloseHandle(completionPort);
            WSACleanup();
            return 1;
        }
        CloseHandle(thread); // 关闭线程句柄,因为主线程不需要直接管理它们
    }

    SOCKET serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (serverSocket == INVALID_SOCKET) {
        std::cerr << "socket failed: " << WSAGetLastError() << std::endl;
        CloseHandle(completionPort);
        WSACleanup();
        return 1;
    }

    sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr.s_addr = INADDR_ANY;
    serverAddr.sin_port = htons(8080);

    if (bind(serverSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
        std::cerr << "bind failed: " << WSAGetLastError() << std::endl;
        closesocket(serverSocket);
        CloseHandle(completionPort);
        WSACleanup();
        return 1;
    }

    if (listen(serverSocket, SOMAXCONN) == SOCKET_ERROR) {
        std::cerr << "listen failed: " << WSAGetLastError() << std::endl;
        closesocket(serverSocket);
        CloseHandle(completionPort);
        WSACleanup();
        return 1;
    }

    std::cout << "Server listening on port 8080..." << std::endl;

    while (true) {
        sockaddr_in clientAddr;
        int clientAddrLen = sizeof(clientAddr);
        SOCKET clientSocket = accept(serverSocket, (SOCKADDR*)&clientAddr, &clientAddrLen);
        if (clientSocket == INVALID_SOCKET) {
            std::cerr << "accept failed: " << WSAGetLastError() << std::endl;
            continue; // 继续监听
        }

        // 将socket与completion port关联
        if (CreateIoCompletionPort((HANDLE)clientSocket, completionPort, (ULONG_PTR)clientSocket, 0) == NULL) {
            std::cerr << "CreateIoCompletionPort (associate socket) failed: " << GetLastError() << std::endl;
            closesocket(clientSocket);
            continue; // 继续监听
        }

        // 为新的连接分配 OverlappedData 结构体
        OverlappedData* data = new OverlappedData;
        ZeroMemory(&data->overlapped, sizeof(OVERLAPPED));
        data->socket = clientSocket;
        data->wsaBuf.len = sizeof(data->buffer);
        data->wsaBuf.buf = data->buffer;

        // 投递第一个异步接收请求
        DWORD flags = 0;
        int result = WSARecv(clientSocket, &data->wsaBuf, 1, NULL, &flags, &data->overlapped, NULL);
        if (result == SOCKET_ERROR) {
            if (WSAGetLastError() != WSA_IO_PENDING) {
                std::cerr << "WSARecv failed: " << WSAGetLastError() << std::endl;
                closesocket(clientSocket);
                delete data;
                continue; // 继续监听
            }
        }

        std::cout << "Accepted new connection.  Waiting for data..." << std::endl;
    }

    // 关闭监听socket和completion port
    closesocket(serverSocket);
    CloseHandle(completionPort);
    WSACleanup();

    return 0;
}

在这个例子中,我们创建了一个 Completion Port,并创建了多个 Worker 线程来处理 I/O 操作的完成事件。当有新的连接到达时,我们将 socket 与 Completion Port 关联,并投递一个异步接收请求。当数据到达时,一个 Worker 线程会被唤醒,处理接收到的数据,并再次投递异步接收请求,从而实现持续的异步 I/O 操作。

Completion Ports 的优点:

  • 高效的异步 I/O: 充分利用操作系统内核的异步 I/O 能力,减少线程阻塞,提高并发性能。
  • 线程池管理: 操作系统负责管理 Worker 线程,根据负载动态调整线程数量,避免线程创建和销毁的开销。
  • 易于扩展: 可以轻松地处理大量的并发连接,适用于高并发的网络应用。

Completion Ports 的缺点:

  • 平台依赖性: Completion Ports 是 Windows 操作系统特有的技术,不具有跨平台性。
  • 代码复杂性: 需要处理 Overlapped 结构体、完成队列等复杂概念,增加了代码的复杂性。

四、Non-blocking I/O 与 Completion Ports 的选择

选择 Non-blocking I/O 还是 Completion Ports 取决于具体的应用场景和平台。

  • Non-blocking I/O: 适用于需要跨平台支持,且并发量不是特别高的应用。例如,一些简单的网络服务器或客户端应用。
  • Completion Ports: 适用于 Windows 平台下,需要处理大量并发连接,并对性能有极致要求的应用。例如,高并发的网络游戏服务器、大型网站服务器等。
特性 Non-blocking I/O Completion Ports
平台依赖性 跨平台 Windows 平台特有
并发性能 相对较低 极高
线程管理 需要手动管理线程 操作系统自动管理线程
代码复杂度 较低 较高
适用场景 跨平台、并发量不高的应用 Windows 平台、高并发、性能敏感的应用

五、结合使用 Non-blocking I/O 和 Completion Ports

在某些情况下,我们可以将 Non-blocking I/O 和 Completion Ports 结合使用,以达到更好的效果。例如,可以使用 Non-blocking I/O 来处理连接的建立和关闭,使用 Completion Ports 来处理数据的收发。

六、实际应用中的注意事项

  • 错误处理: 在使用 Non-blocking I/O 和 Completion Ports 时,需要特别注意错误处理。例如,需要检查 errno 的值,处理 I/O 操作失败的情况。
  • 资源管理: 需要合理地管理资源,例如,及时关闭 socket,释放内存,避免资源泄露。
  • 性能优化: 可以通过调整线程数量、缓冲区大小等参数来优化性能。
  • 内存对齐: 在使用OVERLAPPED结构体时,务必注意内存对齐问题,避免出现性能问题。

七、使用第三方库简化开发

为了简化 Non-blocking I/O 和 Completion Ports 的开发,可以使用一些第三方库,例如:

  • Boost.Asio: 一个跨平台的 C++ 库,提供了异步 I/O、定时器、socket 等功能。
  • libuv: 一个高性能的跨平台异步 I/O 库,Node.js 的底层库。

这些库封装了底层的 I/O 操作,提供了更高级的 API,使开发更加简单和高效。

八、一些建议

  1. 理解底层原理:深入理解Non-blocking I/O和Completion Ports的工作机制,有助于更好地解决实际问题。

  2. 善用工具:使用性能分析工具来定位性能瓶颈,并进行针对性的优化。

  3. 持续学习:异步I/O技术不断发展,保持学习的热情,掌握最新的技术动态。

使用异步 I/O 技术,可以显著提高 C++ 程序的并发性能和响应速度,特别是在高并发的网络应用中。 理解和掌握这些技术对于开发高性能的 C++ 应用至关重要。

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注