好的,下面是一篇关于C++实现高性能WebSocket服务器的文章,重点在于利用Asio/Epoll实现高并发与低延迟:
C++高性能WebSockets服务器:利用Asio/Epoll实现高并发与低延迟
大家好!今天我们来深入探讨如何使用C++构建高性能的WebSocket服务器。WebSocket作为一种全双工通信协议,在实时应用中发挥着关键作用,比如在线游戏、实时聊天、金融数据推送等。高性能是这些应用的基本要求,因此,我们需要精心设计服务器架构。
1. WebSocket协议回顾
在深入代码之前,我们先简单回顾一下WebSocket协议的关键点:
- 握手 (Handshake): WebSocket连接建立之初,客户端和服务器会进行HTTP协议升级的握手过程。客户端发送包含
Upgrade: websocket和Connection: Upgrade头的HTTP请求,服务器验证后返回状态码101 Switching Protocols,握手成功。 - 帧 (Frames): 握手成功后,数据以帧的形式在客户端和服务器之间传输。每一帧包含帧头和数据负载。帧头包含操作码 (opcode),用于指示数据类型 (文本、二进制、控制帧等),以及掩码 (mask) 和负载长度等信息。
- 控制帧 (Control Frames): WebSocket协议定义了几种控制帧,例如Ping、Pong和Close。Ping/Pong用于检测连接是否存活,Close用于关闭连接。
- 掩码 (Masking): 从客户端发送到服务器的数据帧必须进行掩码处理,以提高安全性。服务器发送到客户端的数据帧则不需要掩码。
2. 技术选型:Asio与Epoll
为了实现高并发和低延迟,我们选择以下技术:
- Asio: 一个跨平台的C++库,提供异步I/O、定时器和其他底层操作系统功能的抽象。Asio可以与多种I/O模型一起使用,包括Epoll、kqueue和Windows的IOCP。
- Epoll: Linux内核提供的高效I/O事件通知机制。Epoll允许我们同时监视大量的文件描述符 (sockets),并在文件描述符准备好进行I/O操作时得到通知。
为什么选择Asio和Epoll?
| 技术 | 优点 | 缺点 |
|---|---|---|
| Asio | 跨平台,提供统一的异步I/O接口,简化了网络编程的复杂性。可以与多种I/O模型一起使用,如Epoll,select,iocp。 | 相对重量级,相比纯C代码,性能略有损失 (但可以通过优化弥补)。有一定的学习曲线。 |
| Epoll | 高效的I/O事件通知机制,支持百万级别的并发连接。基于事件驱动,减少了CPU的空闲时间。 | 仅限于Linux平台。需要手动管理文件描述符的事件监听和取消,代码相对复杂。 |
| Asio+Epoll | 结合了两者的优点,利用Asio的跨平台性和易用性,同时利用Epoll的高性能。Asio封装了Epoll的底层细节,使得我们可以更容易地编写高性能的网络应用程序。 | 代码略微复杂,需要理解Asio的异步编程模型和Epoll的工作原理。调试困难,特别是涉及到多线程和异步操作时。 |
3. 服务器架构设计
我们的WebSocket服务器将采用以下架构:
- 多线程事件循环 (Multi-threaded Event Loop): 主线程负责监听新的连接,并将连接分发给工作线程。每个工作线程运行一个Asio I/O上下文 (io_context) 和一个Epoll实例,负责处理分配给它的所有客户端连接。
- 连接管理器 (Connection Manager): 用于跟踪所有活动的WebSocket连接,并提供管理连接的方法,例如发送消息、关闭连接等。
- 消息处理器 (Message Handler): 负责解析WebSocket帧,处理不同的操作码 (文本、二进制、Ping、Pong、Close),并将消息传递给应用程序逻辑。
4. 代码实现
下面是一些关键代码片段,展示了如何使用Asio和Epoll实现高性能的WebSocket服务器。
4.1. 包含头文件
#include <iostream>
#include <asio.hpp>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>
#include <vector>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <random> // For generating mask
#include <iomanip> // For std::setw
#include <sstream> // For creating strings
#ifdef __linux__
#include <sys/epoll.h>
#endif
4.2. WebSocket 帧结构体
struct WebSocketFrame {
bool fin;
uint8_t opcode;
bool mask;
uint64_t payload_length;
std::array<uint8_t, 4> masking_key;
std::vector<uint8_t> payload_data;
};
4.3. WebSocketConnection 类
这个类管理单个WebSocket连接。 它处理握手、接收和发送帧,以及关闭连接。
class WebSocketConnection : public std::enable_shared_from_this<WebSocketConnection> {
public:
WebSocketConnection(asio::ip::tcp::socket socket) : socket_(std::move(socket)) {}
void start() {
do_handshake();
}
private:
void do_handshake() {
auto self = shared_from_this();
asio::async_read_until(socket_, buffer_, "rnrn",
[this, self](const asio::error_code& error, std::size_t bytes_transferred) {
if (!error) {
std::stringstream ss;
ss << std::istream(&buffer_).rdbuf();
std::string request = ss.str();
// 提取 Sec-WebSocket-Key
std::string key;
size_t pos = request.find("Sec-WebSocket-Key: ");
if (pos != std::string::npos) {
pos += std::string("Sec-WebSocket-Key: ").length();
key = request.substr(pos, request.find("rn", pos) - pos);
}
// 计算 Sec-WebSocket-Accept
std::string accept_key = calculate_accept_key(key);
// 构建握手响应
std::string response =
"HTTP/1.1 101 Switching Protocolsrn"
"Upgrade: websocketrn"
"Connection: Upgradern"
"Sec-WebSocket-Accept: " + accept_key + "rnrn";
asio::async_write(socket_, asio::buffer(response),
[this, self](const asio::error_code& error, std::size_t bytes_transferred) {
if (!error) {
std::cout << "Handshake completed successfully." << std::endl;
do_read_header(); // 握手成功后开始读取数据帧
} else {
std::cerr << "Error during handshake: " << error.message() << std::endl;
close();
}
});
} else {
std::cerr << "Error reading handshake request: " << error.message() << std::endl;
close();
}
});
}
std::string calculate_accept_key(const std::string& key) {
std::string combined_key = key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
std::vector<unsigned char> hash(SHA_DIGEST_LENGTH);
SHA1(reinterpret_cast<const unsigned char*>(combined_key.c_str()), combined_key.length(), hash.data());
std::string encoded_hash = base64_encode(hash.data(), SHA_DIGEST_LENGTH);
return encoded_hash;
}
std::string base64_encode(const unsigned char* buf, size_t size) {
BIO *bio, *b64;
BUF_MEM *bufferPtr;
b64 = BIO_new(BIO_f_base64());
bio = BIO_new(BIO_s_mem());
bio = BIO_push(b64, bio);
BIO_write(bio, buf, size);
BIO_flush(bio);
BIO_get_mem_ptr(bio, &bufferPtr);
BIO_set_close(bio, BIO_NOCLOSE);
std::string encoded(bufferPtr->data, bufferPtr->length);
BIO_free_all(bio);
// Remove newlines that OpenSSL may add
encoded.erase(std::remove(encoded.begin(), encoded.end(), 'n'), encoded.end());
return encoded;
}
void do_read_header() {
auto self = shared_from_this();
asio::async_read(socket_, asio::buffer(header_buffer_), asio::transfer_exactly(2),
[this, self](const asio::error_code& error, std::size_t bytes_transferred) {
if (!error) {
bool fin = (header_buffer_[0] & 0x80);
uint8_t opcode = (header_buffer_[0] & 0x0F);
bool mask = (header_buffer_[1] & 0x80);
uint8_t payload_len = (header_buffer_[1] & 0x7F);
WebSocketFrame frame;
frame.fin = fin;
frame.opcode = opcode;
frame.mask = mask;
if (payload_len < 126) {
frame.payload_length = payload_len;
do_read_payload(frame);
} else if (payload_len == 126) {
asio::async_read(socket_, asio::buffer(extended_payload_len_buffer_), asio::transfer_exactly(2),
[this, self, frame](const asio::error_code& error, std::size_t bytes_transferred) mutable {
if (!error) {
frame.payload_length = (static_cast<uint64_t>(extended_payload_len_buffer_[0]) << 8) | extended_payload_len_buffer_[1];
do_read_payload(frame);
} else {
std::cerr << "Error reading extended payload length: " << error.message() << std::endl;
close();
}
});
} else { // payload_len == 127
asio::async_read(socket_, asio::buffer(extended_payload_len_buffer_), asio::transfer_exactly(8),
[this, self, frame](const asio::error_code& error, std::size_t bytes_transferred) mutable {
if (!error) {
frame.payload_length = (static_cast<uint64_t>(extended_payload_len_buffer_[0]) << 56) |
(static_cast<uint64_t>(extended_payload_len_buffer_[1]) << 48) |
(static_cast<uint64_t>(extended_payload_len_buffer_[2]) << 40) |
(static_cast<uint64_t>(extended_payload_len_buffer_[3]) << 32) |
(static_cast<uint64_t>(extended_payload_len_buffer_[4]) << 24) |
(static_cast<uint64_t>(extended_payload_len_buffer_[5]) << 16) |
(static_cast<uint64_t>(extended_payload_len_buffer_[6]) << 8) |
static_cast<uint64_t>(extended_payload_len_buffer_[7]);
do_read_payload(frame);
} else {
std::cerr << "Error reading extended payload length: " << error.message() << std::endl;
close();
}
});
}
} else {
std::cerr << "Error reading header: " << error.message() << std::endl;
close();
}
});
}
void do_read_payload(WebSocketFrame frame) {
auto self = shared_from_this();
if (frame.mask) {
asio::async_read(socket_, asio::buffer(masking_key_buffer_), asio::transfer_exactly(4),
[this, self, frame](const asio::error_code& error, std::size_t bytes_transferred) mutable {
if (!error) {
frame.masking_key = masking_key_buffer_;
frame.payload_data.resize(frame.payload_length);
asio::async_read(socket_, asio::buffer(frame.payload_data), asio::transfer_exactly(frame.payload_length),
[this, self, frame](const asio::error_code& error, std::size_t bytes_transferred) {
if (!error) {
// 解码 payload
for (size_t i = 0; i < frame.payload_length; ++i) {
frame.payload_data[i] ^= frame.masking_key[i % 4];
}
process_frame(frame); // 处理完整的帧
do_read_header(); // 继续读取下一个帧
} else {
std::cerr << "Error reading masked payload: " << error.message() << std::endl;
close();
}
});
} else {
std::cerr << "Error reading masking key: " << error.message() << std::endl;
close();
}
});
} else {
frame.payload_data.resize(frame.payload_length);
asio::async_read(socket_, asio::buffer(frame.payload_data), asio::transfer_exactly(frame.payload_length),
[this, self, frame](const asio::error_code& error, std::size_t bytes_transferred) {
if (!error) {
process_frame(frame); // 处理完整的帧
do_read_header(); // 继续读取下一个帧
} else {
std::cerr << "Error reading unmasked payload: " << error.message() << std::endl;
close();
}
});
}
}
void process_frame(const WebSocketFrame& frame) {
switch (frame.opcode) {
case 0x1: // Text frame
{
std::string message(frame.payload_data.begin(), frame.payload_data.end());
std::cout << "Received text message: " << message << std::endl;
send_message(message); // Echo back the message
break;
}
case 0x2: // Binary frame
{
std::cout << "Received binary message of size: " << frame.payload_length << std::endl;
// Handle binary data
break;
}
case 0x8: // Connection close
{
std::cout << "Received close frame." << std::endl;
close();
break;
}
case 0x9: // Ping
{
std::cout << "Received ping frame." << std::endl;
send_pong();
break;
}
case 0xA: // Pong
{
std::cout << "Received pong frame." << std::endl;
// Handle pong (optional)
break;
}
default:
{
std::cerr << "Received unknown opcode: " << static_cast<int>(frame.opcode) << std::endl;
close();
break;
}
}
}
void send_message(const std::string& message) {
// 创建 WebSocket 帧 (文本帧)
WebSocketFrame frame;
frame.fin = true;
frame.opcode = 0x1; // Text frame
frame.mask = false; // Server doesn't mask
frame.payload_length = message.length();
frame.payload_data = std::vector<uint8_t>(message.begin(), message.end());
// 将帧数据写入缓冲区
std::vector<uint8_t> buffer;
buffer.push_back(0x80 | frame.opcode); // FIN + Opcode
if (frame.payload_length <= 125) {
buffer.push_back(frame.payload_length);
} else if (frame.payload_length <= 65535) {
buffer.push_back(126);
buffer.push_back((frame.payload_length >> 8) & 0xFF);
buffer.push_back(frame.payload_length & 0xFF);
} else { // frame.payload_length > 65535
buffer.push_back(127);
buffer.push_back((frame.payload_length >> 56) & 0xFF);
buffer.push_back((frame.payload_length >> 48) & 0xFF);
buffer.push_back((frame.payload_length >> 40) & 0xFF);
buffer.push_back((frame.payload_length >> 32) & 0xFF);
buffer.push_back((frame.payload_length >> 24) & 0xFF);
buffer.push_back((frame.payload_length >> 16) & 0xFF);
buffer.push_back((frame.payload_length >> 8) & 0xFF);
buffer.push_back(frame.payload_length & 0xFF);
}
buffer.insert(buffer.end(), frame.payload_data.begin(), frame.payload_data.end());
auto self = shared_from_this();
asio::async_write(socket_, asio::buffer(buffer),
[this, self](const asio::error_code& error, std::size_t bytes_transferred) {
if (error) {
std::cerr << "Error sending message: " << error.message() << std::endl;
close();
}
});
}
void send_pong() {
// 创建 WebSocket 帧 (Pong 帧)
WebSocketFrame frame;
frame.fin = true;
frame.opcode = 0xA; // Pong frame
frame.mask = false; // Server doesn't mask
frame.payload_length = 0;
// 将帧数据写入缓冲区
std::vector<uint8_t> buffer;
buffer.push_back(0x80 | frame.opcode); // FIN + Opcode
buffer.push_back(frame.payload_length);
auto self = shared_from_this();
asio::async_write(socket_, asio::buffer(buffer),
[this, self](const asio::error_code& error, std::size_t bytes_transferred) {
if (error) {
std::cerr << "Error sending pong: " << error.message() << std::endl;
close();
}
});
}
void close() {
asio::error_code ec;
socket_.shutdown(asio::ip::tcp::socket::shutdown_send, ec);
socket_.close(ec);
}
private:
asio::ip::tcp::socket socket_;
asio::streambuf buffer_;
std::array<uint8_t, 2> header_buffer_;
std::array<uint8_t, 8> extended_payload_len_buffer_;
std::array<uint8_t, 4> masking_key_buffer_;
};
4.4. WebSocketServer 类
class WebSocketServer {
public:
WebSocketServer(asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)),
io_context_(io_context) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
[this](asio::error_code ec, asio::ip::tcp::socket socket) {
if (!ec) {
std::make_shared<WebSocketConnection>(std::move(socket))->start();
} else {
std::cerr << "Accept error: " << ec.message() << std::endl;
}
do_accept(); // 循环接受新的连接
});
}
asio::ip::tcp::acceptor acceptor_;
asio::io_context& io_context_;
};
4.5. Main 函数
int main() {
try {
asio::io_context io_context;
WebSocketServer server(io_context, 8080);
std::cout << "WebSocket server listening on port 8080" << std::endl;
io_context.run(); // 运行事件循环
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}
5. 高并发优化策略
- 线程池 (Thread Pool): 使用线程池来管理工作线程,避免频繁创建和销毁线程的开销。
- I/O多路复用 (I/O Multiplexing): 使用Epoll或其他I/O多路复用技术,在一个线程中同时处理多个连接的I/O事件。
- 零拷贝 (Zero-Copy): 尽可能减少数据在内核空间和用户空间之间的复制,例如使用
sendfile()系统调用。 - 缓冲区管理 (Buffer Management): 使用对象池或预分配的缓冲区来减少内存分配和释放的开销。
- 协议优化 (Protocol Optimization): 压缩WebSocket消息,减少网络传输的数据量。
6. 错误处理与日志
- 错误码 (Error Codes): 使用Asio的
asio::error_code来处理I/O操作的错误。 - 异常处理 (Exception Handling): 使用
try-catch块来捕获和处理异常。 - 日志记录 (Logging): 使用日志库 (例如spdlog) 记录服务器的运行状态、错误信息和调试信息。
7. 安全性考虑
- 输入验证 (Input Validation): 验证客户端发送的数据,防止恶意代码注入。
- 流量控制 (Traffic Control): 限制客户端的发送速率,防止DDoS攻击。
- 身份验证 (Authentication): 对客户端进行身份验证,只允许授权用户连接。
- 加密 (Encryption): 使用TLS/SSL加密WebSocket连接,保护数据的机密性和完整性 (WSS协议)。
代码编译运行
-
安装依赖: 确保你的系统安装了Boost.Asio和OpenSSL。 在Debian/Ubuntu系统上,你可以使用以下命令安装:
sudo apt-get update sudo apt-get install libboost-all-dev libssl-dev -
编译代码: 使用支持C++11或更高版本的编译器编译代码。 例如,使用g++:
g++ -std=c++11 -o websocket_server main.cpp -lssl -lcrypto -lpthread -
运行服务器: 运行编译后的可执行文件:
./websocket_server
一些提示和说明:
- 代码的完整性: 以上代码片段只是为了演示关键概念,并非完整的可运行程序。 你需要将这些片段组合起来,并添加必要的错误处理、日志记录和配置管理代码,才能构建一个完整的WebSocket服务器。
- 错误处理: 在实际应用中,必须对所有可能出错的地方进行适当的错误处理,例如I/O操作失败、内存分配失败等。
- 性能测试: 在部署服务器之前,应该进行充分的性能测试,例如使用
wrk或ab等工具模拟大量并发连接,并监控服务器的CPU、内存和网络使用情况。 - 代码风格: 保持代码风格的一致性,例如使用统一的缩进、命名规范和注释风格,可以提高代码的可读性和可维护性。
总结:构建高性能WebSocket服务器的关键要点
选择合适的技术栈 (Asio/Epoll),精心设计服务器架构 (多线程事件循环),并采取有效的优化策略 (线程池、I/O多路复用、零拷贝、缓冲区管理、协议优化),以及关注安全性、错误处理和日志记录,才能构建出能够满足高并发和低延迟需求的WebSocket服务器。
更多IT精英技术系列讲座,到智猿学院