好的,下面是关于C++中Non-blocking I/O与Completion Ports的技术文章,以讲座形式呈现:
C++中的Non-blocking I/O与Completion Ports:实现异步操作的极致性能
大家好,今天我们来探讨C++中实现异步操作的两种关键技术:Non-blocking I/O 和 Completion Ports。我们将深入了解它们的原理、应用场景以及如何结合使用以达到极致性能。
一、同步 I/O 的瓶颈
在传统的同步 I/O 模型中,当一个线程发起 I/O 操作(例如,从网络读取数据或写入磁盘)时,它会一直阻塞,直到操作完成。这意味着线程在等待 I/O 完成期间无法执行其他任务。在高并发的场景下,大量的线程阻塞在 I/O 操作上会导致系统资源严重浪费,降低整体吞吐量。
举个简单的例子:
#include <iostream>
#include <fstream>
#include <chrono>
#include <thread>
void read_file(const std::string& filename) {
std::ifstream file(filename);
if (file.is_open()) {
std::string line;
while (std::getline(file, line)) {
std::cout << "Read line: " << line << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟耗时操作
}
file.close();
} else {
std::cerr << "Unable to open file: " << filename << std::endl;
}
}
int main() {
read_file("large_file.txt"); //假设large_file.txt很大
std::cout << "File reading complete." << std::endl;
return 0;
}
在这个例子中,read_file 函数以同步方式读取文件。如果 large_file.txt 文件很大,程序会阻塞在读取操作上,直到整个文件读取完成。在多线程环境下,多个线程同时执行 read_file,会造成线程阻塞,降低程序的响应速度。
二、Non-blocking I/O 的原理与应用
Non-blocking I/O 允许线程发起 I/O 操作后立即返回,而无需等待操作完成。线程可以继续执行其他任务,并在稍后检查 I/O 操作是否完成。这大大提高了线程的利用率,从而提升了系统的并发性能。
实现 Non-blocking I/O 的关键在于将文件描述符(在 Unix 系统中)或 socket 设置为非阻塞模式。
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#endif
#include <iostream>
#include <stdexcept>
// 设置 socket 为 non-blocking 模式
bool set_non_blocking(int socket) {
#ifdef _WIN32
unsigned long mode = 1;
return (ioctlsocket(socket, FIONBIO, &mode) == 0);
#else
int flags = fcntl(socket, F_GETFL, 0);
if (flags == -1) return false;
return (fcntl(socket, F_SETFL, flags | O_NONBLOCK) != -1);
#endif
}
// 简单的 non-blocking socket 示例
int main() {
#ifdef _WIN32
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
std::cerr << "WSAStartup failed" << std::endl;
return 1;
}
#endif
int server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (server_socket == -1) {
std::cerr << "Socket creation failed" << std::endl;
return 1;
}
sockaddr_in server_address;
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(8080);
if (bind(server_socket, (sockaddr*)&server_address, sizeof(server_address)) == -1) {
std::cerr << "Bind failed" << std::endl;
#ifdef _WIN32
closesocket(server_socket);
WSACleanup();
#else
close(server_socket);
#endif
return 1;
}
if (listen(server_socket, 5) == -1) {
std::cerr << "Listen failed" << std::endl;
#ifdef _WIN32
closesocket(server_socket);
WSACleanup();
#else
close(server_socket);
#endif
return 1;
}
if (!set_non_blocking(server_socket)) {
std::cerr << "Failed to set non-blocking mode" << std::endl;
#ifdef _WIN32
closesocket(server_socket);
WSACleanup();
#else
close(server_socket);
#endif
return 1;
}
std::cout << "Server listening on port 8080 (Non-blocking)" << std::endl;
while (true) {
sockaddr_in client_address;
socklen_t client_address_len = sizeof(client_address);
int client_socket = accept(server_socket, (sockaddr*)&client_address, &client_address_len);
if (client_socket == -1) {
#ifdef _WIN32
if (WSAGetLastError() != WSAEWOULDBLOCK) {
std::cerr << "Accept failed: " << WSAGetLastError() << std::endl;
break;
}
#else
if (errno != EWOULDBLOCK && errno != EAGAIN) {
std::cerr << "Accept failed: " << errno << std::endl;
break;
}
#endif
// 没有连接请求,继续监听
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
std::cout << "New connection accepted." << std::endl;
// 处理客户端连接... (此处省略,可以使用线程池等方式处理)
#ifdef _WIN32
closesocket(client_socket);
#else
close(client_socket);
#endif
}
#ifdef _WIN32
closesocket(server_socket);
WSACleanup();
#else
close(server_socket);
#endif
return 0;
}
在这个例子中,set_non_blocking 函数用于将 socket 设置为非阻塞模式。当调用 accept 函数时,如果当前没有连接请求,它会立即返回,并设置 errno 为 EWOULDBLOCK 或 EAGAIN (在Windows下是WSAEWOULDBLOCK)。程序可以检查 errno 的值,并根据需要继续监听或执行其他任务。
Non-blocking I/O 的优点:
- 提高线程利用率:线程不会阻塞在 I/O 操作上,可以执行其他任务。
- 提升并发性能:可以处理更多的并发连接。
- 增强响应性:程序可以更快地响应用户请求。
Non-blocking I/O 的缺点:
- 轮询开销:需要定期检查 I/O 操作是否完成,这会带来一定的 CPU 开销。
- 代码复杂度:需要处理 I/O 操作未完成的情况,增加了代码的复杂性。
三、Completion Ports 的原理与优势
Completion Ports 是一种高效的异步 I/O 机制,主要用于 Windows 操作系统。它允许应用程序将多个 I/O 操作提交到操作系统,并在 I/O 操作完成后通过完成队列接收通知。线程可以从完成队列中获取已完成的 I/O 操作,并进行后续处理。
Completion Ports 的核心概念包括:
- I/O Completion Port (IOCP): 一个内核对象,用于管理异步 I/O 操作的完成状态。
- File Handle 或 Socket: 需要进行异步 I/O 操作的文件句柄或 socket。这些句柄需要与 IOCP 关联。
- Overlapped Structure: 一个用于跟踪异步 I/O 操作的结构体。它包含一个事件句柄,当 I/O 操作完成时,操作系统会设置该事件。
- Worker Threads: 负责从完成队列中获取已完成的 I/O 操作,并进行处理的线程。
Completion Ports 的工作流程:
- 创建 I/O Completion Port。
- 将文件句柄或 socket 与 IOCP 关联。
- 发起异步 I/O 操作,并提供 Overlapped 结构体。
- Worker 线程调用
GetQueuedCompletionStatus函数,等待 I/O 操作完成。 - 当 I/O 操作完成时,操作系统将一个完成数据包放入完成队列,并唤醒一个等待的 Worker 线程。
- Worker 线程从完成队列中获取完成数据包,并处理 I/O 操作的结果。
以下是一个简单的 Completion Ports 示例:
#include <iostream>
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <stdexcept>
// 定义一个结构体用于传递数据
struct OverlappedData {
OVERLAPPED overlapped;
SOCKET socket;
char buffer[1024];
WSABUF wsaBuf;
};
DWORD WINAPI WorkerThread(LPVOID completionPortID) {
HANDLE completionPort = (HANDLE)completionPortID;
DWORD bytesTransferred;
ULONG_PTR completionKey;
LPOVERLAPPED lpOverlapped;
while (true) {
BOOL result = GetQueuedCompletionStatus(
completionPort,
&bytesTransferred,
&completionKey,
&lpOverlapped,
INFINITE // 等待I/O完成
);
if (result == FALSE) {
if (lpOverlapped == NULL) {
// completionPort关闭或者线程退出
std::cerr << "GetQueuedCompletionStatus failed: " << GetLastError() << std::endl;
break;
}
// I/O操作失败
OverlappedData* data = (OverlappedData*)lpOverlapped;
std::cerr << "I/O operation failed on socket: " << data->socket << ", Error: " << GetLastError() << std::endl;
closesocket(data->socket);
delete data;
continue;
}
OverlappedData* data = (OverlappedData*)lpOverlapped;
if (bytesTransferred == 0) {
// 连接关闭
std::cout << "Connection closed by client." << std::endl;
closesocket(data->socket);
delete data;
continue;
}
// 处理接收到的数据
std::cout << "Received data: " << data->buffer << std::endl;
// 再次发起异步接收
ZeroMemory(&data->overlapped, sizeof(OVERLAPPED));
data->wsaBuf.len = sizeof(data->buffer);
data->wsaBuf.buf = data->buffer;
DWORD flags = 0;
int result2 = WSARecv(data->socket, &data->wsaBuf, 1, NULL, &flags, &data->overlapped, NULL);
if (result2 == SOCKET_ERROR) {
if (WSAGetLastError() != WSA_IO_PENDING) {
std::cerr << "WSARecv failed: " << WSAGetLastError() << std::endl;
closesocket(data->socket);
delete data;
continue;
}
}
}
return 0;
}
int main() {
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
std::cerr << "WSAStartup failed" << std::endl;
return 1;
}
HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (completionPort == NULL) {
std::cerr << "CreateIoCompletionPort failed: " << GetLastError() << std::endl;
WSACleanup();
return 1;
}
// 创建工作线程
const int numThreads = 4; // 根据CPU核心数调整线程数
for (int i = 0; i < numThreads; ++i) {
HANDLE thread = CreateThread(NULL, 0, WorkerThread, completionPort, 0, NULL);
if (thread == NULL) {
std::cerr << "CreateThread failed: " << GetLastError() << std::endl;
CloseHandle(completionPort);
WSACleanup();
return 1;
}
CloseHandle(thread); // 关闭线程句柄,因为主线程不需要直接管理它们
}
SOCKET serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (serverSocket == INVALID_SOCKET) {
std::cerr << "socket failed: " << WSAGetLastError() << std::endl;
CloseHandle(completionPort);
WSACleanup();
return 1;
}
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_port = htons(8080);
if (bind(serverSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
std::cerr << "bind failed: " << WSAGetLastError() << std::endl;
closesocket(serverSocket);
CloseHandle(completionPort);
WSACleanup();
return 1;
}
if (listen(serverSocket, SOMAXCONN) == SOCKET_ERROR) {
std::cerr << "listen failed: " << WSAGetLastError() << std::endl;
closesocket(serverSocket);
CloseHandle(completionPort);
WSACleanup();
return 1;
}
std::cout << "Server listening on port 8080..." << std::endl;
while (true) {
sockaddr_in clientAddr;
int clientAddrLen = sizeof(clientAddr);
SOCKET clientSocket = accept(serverSocket, (SOCKADDR*)&clientAddr, &clientAddrLen);
if (clientSocket == INVALID_SOCKET) {
std::cerr << "accept failed: " << WSAGetLastError() << std::endl;
continue; // 继续监听
}
// 将socket与completion port关联
if (CreateIoCompletionPort((HANDLE)clientSocket, completionPort, (ULONG_PTR)clientSocket, 0) == NULL) {
std::cerr << "CreateIoCompletionPort (associate socket) failed: " << GetLastError() << std::endl;
closesocket(clientSocket);
continue; // 继续监听
}
// 为新的连接分配 OverlappedData 结构体
OverlappedData* data = new OverlappedData;
ZeroMemory(&data->overlapped, sizeof(OVERLAPPED));
data->socket = clientSocket;
data->wsaBuf.len = sizeof(data->buffer);
data->wsaBuf.buf = data->buffer;
// 投递第一个异步接收请求
DWORD flags = 0;
int result = WSARecv(clientSocket, &data->wsaBuf, 1, NULL, &flags, &data->overlapped, NULL);
if (result == SOCKET_ERROR) {
if (WSAGetLastError() != WSA_IO_PENDING) {
std::cerr << "WSARecv failed: " << WSAGetLastError() << std::endl;
closesocket(clientSocket);
delete data;
continue; // 继续监听
}
}
std::cout << "Accepted new connection. Waiting for data..." << std::endl;
}
// 关闭监听socket和completion port
closesocket(serverSocket);
CloseHandle(completionPort);
WSACleanup();
return 0;
}
在这个例子中,我们创建了一个 Completion Port,并创建了多个 Worker 线程来处理 I/O 操作的完成事件。当有新的连接到达时,我们将 socket 与 Completion Port 关联,并投递一个异步接收请求。当数据到达时,一个 Worker 线程会被唤醒,处理接收到的数据,并再次投递异步接收请求,从而实现持续的异步 I/O 操作。
Completion Ports 的优点:
- 高效的异步 I/O: 充分利用操作系统内核的异步 I/O 能力,减少线程阻塞,提高并发性能。
- 线程池管理: 操作系统负责管理 Worker 线程,根据负载动态调整线程数量,避免线程创建和销毁的开销。
- 易于扩展: 可以轻松地处理大量的并发连接,适用于高并发的网络应用。
Completion Ports 的缺点:
- 平台依赖性: Completion Ports 是 Windows 操作系统特有的技术,不具有跨平台性。
- 代码复杂性: 需要处理 Overlapped 结构体、完成队列等复杂概念,增加了代码的复杂性。
四、Non-blocking I/O 与 Completion Ports 的选择
选择 Non-blocking I/O 还是 Completion Ports 取决于具体的应用场景和平台。
- Non-blocking I/O: 适用于需要跨平台支持,且并发量不是特别高的应用。例如,一些简单的网络服务器或客户端应用。
- Completion Ports: 适用于 Windows 平台下,需要处理大量并发连接,并对性能有极致要求的应用。例如,高并发的网络游戏服务器、大型网站服务器等。
| 特性 | Non-blocking I/O | Completion Ports |
|---|---|---|
| 平台依赖性 | 跨平台 | Windows 平台特有 |
| 并发性能 | 相对较低 | 极高 |
| 线程管理 | 需要手动管理线程 | 操作系统自动管理线程 |
| 代码复杂度 | 较低 | 较高 |
| 适用场景 | 跨平台、并发量不高的应用 | Windows 平台、高并发、性能敏感的应用 |
五、结合使用 Non-blocking I/O 和 Completion Ports
在某些情况下,我们可以将 Non-blocking I/O 和 Completion Ports 结合使用,以达到更好的效果。例如,可以使用 Non-blocking I/O 来处理连接的建立和关闭,使用 Completion Ports 来处理数据的收发。
六、实际应用中的注意事项
- 错误处理: 在使用 Non-blocking I/O 和 Completion Ports 时,需要特别注意错误处理。例如,需要检查
errno的值,处理 I/O 操作失败的情况。 - 资源管理: 需要合理地管理资源,例如,及时关闭 socket,释放内存,避免资源泄露。
- 性能优化: 可以通过调整线程数量、缓冲区大小等参数来优化性能。
- 内存对齐: 在使用
OVERLAPPED结构体时,务必注意内存对齐问题,避免出现性能问题。
七、使用第三方库简化开发
为了简化 Non-blocking I/O 和 Completion Ports 的开发,可以使用一些第三方库,例如:
- Boost.Asio: 一个跨平台的 C++ 库,提供了异步 I/O、定时器、socket 等功能。
- libuv: 一个高性能的跨平台异步 I/O 库,Node.js 的底层库。
这些库封装了底层的 I/O 操作,提供了更高级的 API,使开发更加简单和高效。
八、一些建议
-
理解底层原理:深入理解Non-blocking I/O和Completion Ports的工作机制,有助于更好地解决实际问题。
-
善用工具:使用性能分析工具来定位性能瓶颈,并进行针对性的优化。
-
持续学习:异步I/O技术不断发展,保持学习的热情,掌握最新的技术动态。
使用异步 I/O 技术,可以显著提高 C++ 程序的并发性能和响应速度,特别是在高并发的网络应用中。 理解和掌握这些技术对于开发高性能的 C++ 应用至关重要。
更多IT精英技术系列讲座,到智猿学院