各位编程爱好者、系统架构师,以及所有致力于构建高性能网络服务的同仁们,大家好!
今天,我们齐聚一堂,共同探讨一个在高性能网络框架中至关重要的议题:如何高效地管理海量的定时任务。在现代网络服务中,无论是连接超时检测、心跳机制、延迟重试、还是各种业务逻辑的定时触发,定时器都扮演着核心角色。尤其是在处理万级甚至更高并发量的场景下,传统定时器实现的性能瓶颈会迅速显现,成为系统扩展的阿喀琉斯之踵。
我们将深入剖析一种业界广泛采用,并被证明极为高效的算法——时间轮(Time Wheel),来解决这一挑战。本次讲座将以C++语言为实现载体,从原理、设计到具体代码实践,一步步揭示时间轮如何在O(1)级别的复杂度下,支撑起百万级并发定时任务的稳定运行。
一、高性能网络框架中定时器的核心需求与挑战
在高性能网络框架(如基于Reactor/Proactor模式的事件驱动框架)中,定时器并不仅仅是“一段时间后执行某个函数”这么简单。它面临着一系列严苛的需求和挑战:
- 大规模并发支持: 框架可能同时管理数万、数十万甚至数百万个活跃连接或逻辑实体,每个都可能需要一个或多个定时器。这意味着定时器实现必须能够高效地存储和管理海量的任务。
- 高精度与低延迟: 许多场景要求定时器能够以毫秒级的精度触发,并且从任务到期到实际执行的延迟要尽可能小。
- 高性能操作:
- 添加(Add): 快速注册新的定时任务。
- 取消(Cancel): 快速撤销尚未到期的定时任务(例如,连接关闭时需要取消其所有相关定时器)。
- 触发(Tick): 核心操作,周期性地检查并执行到期任务,这个操作必须极其高效,以避免阻塞主事件循环。
- 内存效率: 大规模定时任务意味着如果每个任务的元数据占用过多内存,将导致巨大的内存开销。
- 线程安全与事件循环: 定时器通常与事件循环(Event Loop)紧密集成,在单线程环境下运行以避免复杂的锁竞争,但也要考虑如何安全地从其他线程添加或取消任务。
- 易用性: 提供简洁明了的API,方便框架用户快速集成。
传统的定时器实现方式,如基于排序链表或小顶堆(std::priority_queue),在面对大规模并发时,会暴露出明显的性能短板。
1.1 传统定时器实现的局限性
我们先回顾一下常见的两种传统定时器实现及其性能特点:
1.1.1 排序链表 (Sorted Linked List)
这种方法将所有定时任务按照其到期时间顺序存储在一个链表中。
- 添加任务: 需要遍历链表找到合适的插入位置,平均时间复杂度为O(N),最坏情况下也是O(N)。
- 取消任务: 同样需要遍历链表找到并移除任务,平均时间复杂度为O(N)。
- 触发任务: 只需要检查链表头部的任务是否到期,O(1);如果到期则移除并执行,然后检查下一个。
优点: 实现简单,内存开销相对较小(每个节点只存储任务信息和指针)。
缺点: 随着任务数量N的增加,添加和取消操作的性能急剧下降,无法应对高并发场景。
1.1.2 小顶堆 (Min-Heap / std::priority_queue)
使用小顶堆存储定时任务,堆顶元素是到期时间最早的任务。
- 添加任务: 将新任务插入堆中,并进行堆调整,时间复杂度为O(log N)。
- 取消任务:
std::priority_queue不直接支持按ID取消任意任务。如果需要取消,通常需要找到该任务在堆中的位置,然后进行删除和堆调整,这通常是O(N)的遍历,或者通过标记任务为“已取消”并在弹出时跳过,但这样会导致堆中存在大量无效任务,浪费内存和CPU。如果自定义堆,并维护一个映射关系,可以将查找优化到O(1),删除和堆调整仍是O(log N)。 - 触发任务: 检查堆顶任务是否到期,如果到期则弹出堆顶并执行,然后重新调整堆,时间复杂度为O(log N)。
优点: 添加和触发操作的性能相对稳定,对数时间复杂度优于排序链表的线性复杂度。
缺点:
- 取消任意任务比较复杂或效率低下。
- 堆的内存局部性可能不如链表。
- 频繁的堆调整在大规模任务下仍然会带来显著的CPU开销。
为了更直观地对比,我们用一个表格来总结:
| 定时器实现方式 | 添加任务复杂度 | 取消任务复杂度 | 触发任务复杂度 (单个到期任务) | 适用场景 |
|---|---|---|---|---|
| 排序链表 | O(N) | O(N) | O(1) | 任务量小,不常取消 |
| 小顶堆 | O(log N) | O(N) 或 O(log N) (需额外实现) | O(log N) | 任务量适中,取消不频繁 |
| 时间轮 | O(1) | O(1) | O(K) (K为当前槽位任务数) | 任务量大,高并发,频繁取消 |
从上表可以看出,排序链表和小顶堆在“万级并发”的场景下,性能都难以令人满意。时间轮算法的O(1)特性,使其在面对高并发时展现出无可比拟的优势。
二、时间轮算法的核心原理
时间轮算法,顾名思义,就像一个时钟。它将时间划分为一系列离散的“槽位”(slots),每个槽位代表一个固定的时间间隔。所有在某个特定时间间隔内到期的定时任务,都会被放置到对应的槽位中。
2.1 单层时间轮 (Single-Level Time Wheel)
想象一个由N个槽位组成的圆环,每个槽位可以看作一个链表或一个桶。有一个指针(或索引) current_slot_index 周期性地向前移动,每移动一次,就代表“时间流逝”了一个单位(称为“tick”)。
基本工作流程:
- 初始化: 创建一个固定大小的数组(例如
std::vector<std::list<TimerEntry*>>),数组的每个元素就是一个槽位,槽位中存储着指向TimerEntry的指针列表。同时初始化current_slot_index为0。 - 添加定时任务:
- 假设时间轮的每个槽位代表1毫秒,总共有256个槽位。
- 如果要添加一个在
delay_ms后到期的任务,其到期时间expire_time_ms = current_time_ms + delay_ms。 - 计算任务应该放置的槽位:
slot_index = (expire_time_ms / tick_interval_ms) % slot_count。 - 将任务插入到对应槽位链表的末尾。
- 同时,为了支持快速取消,我们还需要一个哈希表(
std::unordered_map<TimerID, TimerEntry*>)来存储TimerID到TimerEntry对象的映射。
- 时间轮转动(Tick):
- 每经过一个
tick_interval_ms,时间轮的current_slot_index就会向前移动一位。 - 当
current_slot_index移动到下一个槽位时,它会检查并处理当前槽位(即current_slot_index指向的槽位)中的所有定时任务。 - 对于槽位中的每个任务:
- 如果任务的
expire_time_ms已经达到或超过current_time_ms,则执行任务回调,并从槽位中移除该任务。 - 如果任务的
expire_time_ms尚未达到,说明这个任务需要多转几轮时间轮才能到期(即“圈数”未到),则继续保留在当前槽位。
- 如果任务的
current_slot_index到达数组末尾后,会回到0,完成一轮循环。
- 每经过一个
单层时间轮的局限:
单层时间轮对于短时间的、密集的定时任务非常高效。但是,如果需要支持非常长的定时任务(例如,一个小时),而时间轮的槽位又很小(例如,每个槽位1毫秒,总共256个槽位),那么一个小时的任务需要时间轮转动很多很多圈才能到期。这意味着在 tick() 操作时,你需要频繁地检查任务的圈数,这增加了每次 tick 的计算量。为了解决这个问题,我们引入了多层时间轮。
2.2 多层时间轮 (Multi-Level Time Wheel)
多层时间轮类似于钟表的时、分、秒针。它由多个单层时间轮组成,每个轮子代表不同的时间粒度。例如:
- 最底层轮子 (Wheel 0): 精度最高,槽位间隔最小(如1毫秒),负责处理短期的定时任务。
- 第二层轮子 (Wheel 1): 槽位间隔是底层轮子总容量,负责处理中期的定时任务。
- 第三层轮子 (Wheel 2): 槽位间隔是第二层轮子总容量,负责处理更长期的定时任务。
工作流程:
- 初始化: 创建N个时间轮,每个轮子有自己的槽位数量和槽位间隔。
- 添加定时任务:
- 根据任务的
delay_ms,计算它应该被放置在哪一层时间轮的哪个槽位。 - 例如,一个任务在65秒后到期。如果最底层轮子只能处理256毫秒的任务,那么它显然不能放在最底层。它会被放置在更高层的时间轮上。
- 核心思想是,任务总是被放置在能够容纳其到期时间,且精度足够高的最顶层时间轮。
- 根据任务的
- 时间轮转动与级联 (Cascading):
tick()操作总是从最底层的时间轮开始。- 当最底层时间轮的
current_slot_index完成一轮循环(即从最后一个槽位回到0)时,它会触发其上一层时间轮的current_slot_index前进一格。 - 当一个高层时间轮的指针前进时,它会将其当前槽位中的所有任务“级联”(cascade)到下一层时间轮。这些任务在被级联到下一层时,需要重新计算它们在下一层中的槽位位置。
- 这个过程就像钟表的秒针走完一圈,分钟针就走一格;分钟针走完一圈,时针就走一格。
举例说明级联:
假设我们有两层时间轮:
- W0 (秒轮): 槽位大小1毫秒,256个槽位,总容量256毫秒。
- W1 (分钟轮): 槽位大小256毫秒,64个槽位,总容量 256 * 64 = 16384毫秒 (~16.3秒)。
一个任务要在 10000ms 后到期。
10000ms远超W0的容量256ms。10000ms在W1的容量16384ms之内。- 所以,任务会被添加到
W1。计算其在W1中的槽位:slot_idx = (current_time_ms + 10000ms) / 256ms % 64。
当 W0 的指针转动 256 圈,即 256ms 后,W1 的指针会前进一格。假设 W1 的指针当前指向 slot X。W1 会将 slot X 中的所有任务级联到 W0。这些任务现在相对于 W0 来说,它们的到期时间更近了,可以在 W0 中找到合适的槽位。
这样,只有当任务即将到期时,它才会被移动到精度更高的底层时间轮,大大减少了高层时间轮的tick操作的开销。
三、C++ 实现:数据结构设计
在C++中实现时间轮,我们需要考虑几个关键的数据结构。
3.1 TimerEntry:定时任务的抽象
每个定时任务都需要一个结构体来保存其信息。
#pragma once
#include <functional>
#include <chrono>
#include <memory> // For std::shared_ptr or custom memory management
// 使用 long long 作为 TimerID,保证唯一性且足够大
using TimerID = long long;
using TimerCallback = std::function<void()>;
struct TimerEntry {
TimerID id; // 定时器唯一ID
long long expire_ms; // 任务到期时间戳 (毫秒)
TimerCallback callback; // 任务回调函数
bool cancelled; // 任务是否已被取消的标记
int rounds; // 对于单层轮,表示还需要转几圈才到期
// 对于多层轮,通常不需要这个字段,因为任务会级联
// 为了支持O(1)删除,需要存储其在 std::list 中的迭代器
// 注意:这里的迭代器是一个占位符,实际使用时需要特定类型
// std::list<std::shared_ptr<TimerEntry>>::iterator list_iterator;
// 或者当使用 TimerEntry* 存储时:
void* list_iterator_ptr; // 存储 std::list<TimerEntry*>::iterator 的原始指针,需小心转换
TimerEntry() : id(0), expire_ms(0), cancelled(false), rounds(0), list_iterator_ptr(nullptr) {}
// 构造函数
TimerEntry(TimerID _id, long long _expire_ms, TimerCallback _cb)
: id(_id), expire_ms(_expire_ms), callback(std::move(_cb)), cancelled(false), rounds(0), list_iterator_ptr(nullptr) {}
// 禁用拷贝构造和赋值,因为迭代器等内部状态需要特殊处理
// 或者如果 TimerEntry 不直接拥有迭代器,而是由外部管理,则可以允许
TimerEntry(const TimerEntry&) = delete;
TimerEntry& operator=(const TimerEntry&) = delete;
// 允许移动构造和赋值
TimerEntry(TimerEntry&&) = default;
TimerEntry& operator=(TimerEntry&&) = default;
};
TimerEntry 字段说明:
id: 唯一标识一个定时器。用于取消操作。expire_ms: 任务的绝对到期时间,使用毫秒时间戳。建议使用std::chrono::steady_clock来获取单调递增的时间,避免系统时间调整带来的问题。callback: 任务到期后需要执行的回调函数,std::function<void()>提供了灵活的函数封装能力。cancelled: 一个布尔标记,用于实现“惰性取消”。当一个任务被取消时,我们不立即从槽位链表中删除它,而是标记为cancelled=true。等到tick操作遍历到它时,发现已取消,便跳过执行并删除。这避免了在cancel操作中遍历链表的高昂开销。rounds: 主要用于单层时间轮,表示任务还需要时间轮转动多少圈才能到期。在多层时间轮中,这个概念被级联机制取代。list_iterator_ptr: 关键优化。 为了实现O(1)的取消操作,当TimerEntry被添加到某个槽位的std::list中时,我们需要将该TimerEntry在std::list中的迭代器存储起来。之后,如果需要取消这个任务,可以直接通过这个迭代器O(1)地从std::list中移除。由于std::list::iterator是一个模板类型,直接存储有点麻烦,这里我们用void*占位,实际实现中需要更严谨的方式(例如,在TimerEntry外部维护映射,或使用特殊设计的intrusive_list)。
3.2 TimerWheel:单个时间轮的实现
一个 TimerWheel 类代表一个单独的时间轮层。
#pragma once
#include <vector>
#include <list>
#include <unordered_map>
#include <atomic>
#include <memory> // For std::shared_ptr or custom memory management
#include "TimerEntry.h" // 包含 TimerEntry 定义
// TimerWheel 的前向声明,用于多层时间轮中的级联
class MultiLevelTimeWheel;
class TimerWheel {
public:
TimerWheel(int slots_num, long long tick_interval_ms, MultiLevelTimeWheel* parent_wheel = nullptr);
~TimerWheel();
// 添加一个定时任务到当前时间轮
// 返回 TimerEntry*,用于存储到外部的映射中
std::shared_ptr<TimerEntry> add_timer(long long expire_ms, TimerCallback cb);
// 从当前时间轮中移除一个定时任务
// 实际是惰性取消:标记为cancelled,并在tick时处理
bool cancel_timer(TimerID id);
// 驱动时间轮转动一个tick
// 返回到期并执行的任务数量
int tick(long long current_ms);
// 级联当前槽位中的任务到下一层时间轮
void cascade_timers();
// 获取当前时间轮的容量 (毫秒)
long long get_capacity_ms() const { return static_cast<long long>(slots_num_) * tick_interval_ms_; }
// 获取当前时间轮的槽位数
int get_slots_num() const { return slots_num_; }
// 获取当前时间轮的tick间隔
long long get_tick_interval_ms() const { return tick_interval_ms_; }
// 查找 TimerEntry*
std::shared_ptr<TimerEntry> find_timer(TimerID id);
private:
int slots_num_; // 槽位总数
long long tick_interval_ms_; // 每个槽位代表的时间间隔 (毫秒)
std::vector<std::list<std::shared_ptr<TimerEntry>>> buckets_; // 存储定时任务的槽位
int current_slot_index_; // 当前时间轮指针指向的槽位
MultiLevelTimeWheel* parent_wheel_; // 指向顶层时间轮的指针,用于级联
// 或者可以设计为直接级联到下一个 TimerWheel
// 映射 TimerID 到 TimerEntry,用于O(1)查找和取消
// 注意:这里存储的是 shared_ptr,确保 TimerEntry 生命周期被管理
std::unordered_map<TimerID, std::shared_ptr<TimerEntry>> timer_entries_map_;
static std::atomic<TimerID> next_timer_id_; // 用于生成唯一 TimerID
TimerID generate_timer_id() { return next_timer_id_.fetch_add(1, std::memory_order_relaxed); }
// 辅助函数,计算任务应放入哪个槽位
int get_slot_index(long long expire_ms) const;
};
TimerWheel 字段说明:
slots_num_: 时间轮的槽位数量,例如256。tick_interval_ms_: 每个槽位代表的时间长度,例如1毫秒。buckets_: 核心数据结构,std::vector存储着std::list<std::shared_ptr<TimerEntry>>。每个std::list就是一个槽位,存储着到期时间落在这个槽位范围内的定时任务。使用std::shared_ptr<TimerEntry>简化内存管理,但也可以使用自定义对象池和裸指针来进一步优化性能和内存。current_slot_index_: 当前时间轮的指针,指向正在处理的槽位。parent_wheel_: 在多层时间轮中,用于向上级联或向下传递。这里暂时设计为指向MultiLevelTimeWheel,实际可以更灵活。timer_entries_map_:std::unordered_map<TimerID, std::shared_ptr<TimerEntry>>。这个映射至关重要,它允许我们通过TimerIDO(1)地查找TimerEntry对象,从而实现O(1)的取消操作(通过标记cancelled字段)。next_timer_id_: 静态原子变量,用于生成全局唯一的定时器ID。
3.3 MultiLevelTimeWheel:多层时间轮的协调器
MultiLevelTimeWheel 类将多个 TimerWheel 实例组织起来,实现级联机制。
#pragma once
#include <vector>
#include <chrono>
#include <memory>
#include "TimerWheel.h"
#include "TimerEntry.h"
// 静态成员初始化
std::atomic<TimerID> TimerWheel::next_timer_id_ = 1;
class MultiLevelTimeWheel {
public:
// 构造函数:初始化多层时间轮
// wheel_configs: vector of {slots_num, tick_interval_ms} for each wheel
MultiLevelTimeWheel(const std::vector<std::pair<int, long long>>& wheel_configs);
~MultiLevelTimeWheel();
// 添加定时任务
TimerID add_timer(long long delay_ms, TimerCallback cb);
// 取消定时任务
bool cancel_timer(TimerID id);
// 驱动所有时间轮转动,并处理到期任务
// 建议每隔 tick_interval_ms_lowest (最底层时间轮的tick间隔) 调用一次
int tick();
// 获取下一个即将到期的任务的剩余时间 (ms)
// 这是一个优化,可以用于epoll_wait/select的timeout参数
long long get_next_timeout_ms() const;
private:
std::vector<std::unique_ptr<TimerWheel>> wheels_; // 存储各层时间轮
long long last_tick_ms_; // 上次tick操作的时间戳 (毫秒)
long long lowest_level_tick_interval_ms_; // 最底层时间轮的tick间隔
// 内部方法,根据 expire_ms 决定将任务放置在哪一层时间轮
void add_timer_internal(std::shared_ptr<TimerEntry> entry);
// 用于 TimerWheel 内部调用的级联接口
friend class TimerWheel;
void cascade_to_next_wheel(std::shared_ptr<TimerEntry> entry, int current_wheel_idx);
};
MultiLevelTimeWheel 字段说明:
wheels_:std::vector<std::unique_ptr<TimerWheel>>存储所有时间轮层。unique_ptr确保了TimerWheel对象的生命周期由MultiLevelTimeWheel管理。last_tick_ms_: 记录上一次tick()操作时的时间戳,用于计算本次tick应该前进多少个最底层轮子的tick。lowest_level_tick_interval_ms_: 最底层时间轮的tick_interval_ms,这决定了MultiLevelTimeWheel::tick()应该被调用的最小频率。
四、C++ 实现:核心操作逻辑
接下来,我们详细实现这些类的关键方法。
4.1 获取当前时间
在所有时间相关的操作中,获取一个稳定、单调递增的时间戳至关重要。std::chrono::steady_clock 是理想的选择。
long long get_current_steady_ms() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()
).count();
}
4.2 TimerWheel 构造与辅助函数
// TimerWheel.cpp
#include "TimerWheel.h"
#include <iostream> // For debugging
// 静态成员初始化 (在 TimerWheel.cpp 中定义)
std::atomic<TimerID> TimerWheel::next_timer_id_ = 1;
TimerWheel::TimerWheel(int slots_num, long long tick_interval_ms, MultiLevelTimeWheel* parent_wheel)
: slots_num_(slots_num),
tick_interval_ms_(tick_interval_ms),
buckets_(slots_num), // 初始化槽位 vector
current_slot_index_(0),
parent_wheel_(parent_wheel) {
// 确保槽位数量和tick间隔是正数
if (slots_num_ <= 0 || tick_interval_ms_ <= 0) {
throw std::invalid_argument("slots_num and tick_interval_ms must be positive.");
}
}
TimerWheel::~TimerWheel() {
// 清理所有桶中的 TimerEntry
for (auto& bucket : buckets_) {
bucket.clear(); // shared_ptr 会自动释放 TimerEntry
}
timer_entries_map_.clear();
}
int TimerWheel::get_slot_index(long long expire_ms) const {
// 根据到期时间戳计算槽位索引
// 注意这里除以 tick_interval_ms_ 是为了将绝对时间戳转换为相对槽位时间
// 然后取模得到在当前轮中的索引
return (expire_ms / tick_interval_ms_) % slots_num_;
}
std::shared_ptr<TimerEntry> TimerWheel::find_timer(TimerID id) {
auto it = timer_entries_map_.find(id);
if (it != timer_entries_map_.end()) {
return it->second;
}
return nullptr;
}
4.3 TimerWheel::add_timer (内部使用,由 MultiLevelTimeWheel 调用)
这个 add_timer 方法是供 MultiLevelTimeWheel 内部调用,负责将已经创建好的 TimerEntry 放入当前时间轮的正确槽位。
// TimerWheel.cpp
std::shared_ptr<TimerEntry> TimerWheel::add_timer(long long expire_ms, TimerCallback cb) {
TimerID id = generate_timer_id();
auto entry = std::make_shared<TimerEntry>(id, expire_ms, std::move(cb));
// 计算任务应该放置的槽位
int slot_idx = get_slot_index(expire_ms);
// 将 TimerEntry 插入到对应槽位的链表
buckets_[slot_idx].push_back(entry);
// 存储 TimerEntry 在链表中的迭代器(这里我们暂时不直接存储迭代器)
// 如果需要O(1)的实际删除,TimerEntry 需要包含 std::list<std::shared_ptr<TimerEntry>>::iterator 成员
// 并在这里设置它,但为了简化代码和避免交叉依赖,我们依赖惰性取消。
// 将 TimerID 和 TimerEntry 映射存储起来,用于取消操作
timer_entries_map_[id] = entry;
return entry; // 返回 TimerEntry 方便上层进行管理
}
// 实际的多层时间轮的add_timer会是这样的:
// MultiLevelTimeWheel::add_timer(long long delay_ms, TimerCallback cb) {
// long long current_ms = get_current_steady_ms();
// long long expire_ms = current_ms + delay_ms;
// TimerID id = generate_timer_id(); // 注意这里 TimerID 应该由 MultiLevelTimeWheel 统一生成
// auto entry = std::make_shared<TimerEntry>(id, expire_ms, std::move(cb));
// add_timer_internal(entry); // 调用内部方法将任务放置到合适的轮子
// return id;
// }
4.4 TimerWheel::cancel_timer (惰性取消)
// TimerWheel.cpp
bool TimerWheel::cancel_timer(TimerID id) {
auto it = timer_entries_map_.find(id);
if (it != timer_entries_map_.end()) {
it->second->cancelled = true; // 标记为已取消
// 实际从 map 中移除,但 TimerEntry 还在 list 中
timer_entries_map_.erase(it); // 从 map 中移除,表示不再管理
return true;
}
return false;
}
4.5 TimerWheel::tick (时间轮转动与任务处理)
这是时间轮的核心逻辑。它负责推进 current_slot_index_,并处理当前槽位中的任务。
// TimerWheel.cpp
int TimerWheel::tick(long long current_ms) {
// 先处理当前槽位
std::list<std::shared_ptr<TimerEntry>>& current_bucket = buckets_[current_slot_index_];
auto it = current_bucket.begin();
int executed_count = 0;
while (it != current_bucket.end()) {
std::shared_ptr<TimerEntry> entry = *it;
if (entry->cancelled) {
// 任务已取消,直接从链表中移除
it = current_bucket.erase(it);
// 注意:因为我们已经从 timer_entries_map_ 中移除了,这里不需要再操作 map
// 并且 shared_ptr 会自动管理 TimerEntry 的生命周期
continue;
}
// 检查任务是否到期
// 对于单层时间轮,需要检查圈数
// 对于多层时间轮,如果到达这一层,说明到期时间在当前轮的容量内
// 并且如果到期时间 <= current_ms,则执行
if (entry->expire_ms <= current_ms) {
// 任务到期,执行回调
if (entry->callback) {
entry->callback();
executed_count++;
}
// 从链表中移除已执行的任务
it = current_bucket.erase(it);
// 从 map 中移除 (如果之前没有取消,这里才移除)
timer_entries_map_.erase(entry->id);
} else {
// 任务未到期,或者需要级联到下一层
// 对于多层时间轮,这里是处理级联逻辑的地方
// 如果是最高层轮子,且任务未到期,则保留
// 如果不是最高层,且任务未到期,则级联
// ----------------------------------------------------
// 在 MultiLevelTimeWheel 的 tick 逻辑中,会调用 cascade_timers
// 这里假设这个 tick 方法只处理本轮到期的任务,不处理级联
// 级联由 MultiLevelTimeWheel 协调
// ----------------------------------------------------
++it; // 移动到下一个任务
}
}
// 移动到下一个槽位
current_slot_index_ = (current_slot_index_ + 1) % slots_num_;
return executed_count;
}
4.6 MultiLevelTimeWheel 构造函数
// MultiLevelTimeWheel.cpp
#include "MultiLevelTimeWheel.h"
#include <algorithm> // For std::min
MultiLevelTimeWheel::MultiLevelTimeWheel(const std::vector<std::pair<int, long long>>& wheel_configs)
: last_tick_ms_(get_current_steady_ms()),
lowest_level_tick_interval_ms_(0) { // Initialized later
if (wheel_configs.empty()) {
throw std::invalid_argument("Wheel configurations cannot be empty.");
}
long long prev_wheel_capacity = 1; // 虚拟的上一层轮子容量,用于计算第一层轮子的tick_interval
for (size_t i = 0; i < wheel_configs.size(); ++i) {
int slots_num = wheel_configs[i].first;
long long tick_interval_ms = wheel_configs[i].second;
// 验证配置的合理性
if (slots_num <= 0 || tick_interval_ms <= 0) {
throw std::invalid_argument("Invalid wheel config: slots_num and tick_interval_ms must be positive.");
}
// 通常,高层轮子的tick_interval_ms应该是低层轮子的总容量
// 但是为了更灵活的配置,这里允许直接指定
// 如果我们遵循严格的级联模式:
// 第0层轮子: tick_interval_ms_0
// 第1层轮子: tick_interval_ms_1 = slots_num_0 * tick_interval_ms_0
// ...
// 这里的 tick_interval_ms 是指当前轮子每个槽位的实际时间粒度
// 如果是第一层轮子 (i=0),则其 tick_interval_ms 由配置指定
// 如果是更高层轮子 (i>0),其 tick_interval_ms 应该是前一层轮子的总容量
// 为了简化,我们直接使用配置传入的 tick_interval_ms,但用户需要确保其逻辑正确性
wheels_.push_back(std::make_unique<TimerWheel>(slots_num, tick_interval_ms, this));
prev_wheel_capacity = wheels_.back()->get_capacity_ms(); // 更新上一层轮子的容量
if (i == 0) {
lowest_level_tick_interval_ms_ = tick_interval_ms;
}
}
if (lowest_level_tick_interval_ms_ == 0) { // 防止配置为空
throw std::runtime_error("Lowest level tick interval not set.");
}
}
MultiLevelTimeWheel::~MultiLevelTimeWheel() {
wheels_.clear(); // unique_ptr 会自动清理
}
示例 wheel_configs 配置:
// 假设我们需要支持最长约48天的定时任务
// Wheel 0 (最底层): 256 slots, 1ms/slot = 256ms 容量
// Wheel 1: 64 slots, 256ms/slot = 16384ms (~16秒) 容量
// Wheel 2: 64 slots, 16384ms/slot = 1048576ms (~17分钟) 容量
// Wheel 3: 64 slots, 1048576ms/slot = 67108864ms (~18小时) 容量
// Wheel 4 (最高层): 64 slots, 67108864ms/slot = 4294967296ms (~48天) 容量
std::vector<std::pair<int, long long>> configs = {
{256, 1}, // Wheel 0: 1ms per slot, 256 slots => 256ms capacity
{64, 256}, // Wheel 1: 256ms per slot, 64 slots => 16384ms capacity (256 * 64)
{64, 16384}, // Wheel 2: 16384ms per slot, 64 slots => 1048576ms capacity (16384 * 64)
{64, 1048576}, // Wheel 3: 1048576ms per slot, 64 slots => 67108864ms capacity (1048576 * 64)
{64, 67108864} // Wheel 4: 67108864ms per slot, 64 slots => 4294967296ms capacity (67108864 * 64)
};
MultiLevelTimeWheel my_timer_wheel(configs);
注意: 这里的 tick_interval_ms 配置需要用户自己计算好。例如,Wheel 1 的 tick_interval_ms 应该等于 Wheel 0 的总容量。这个设计是灵活的,但也要求使用者理解其含义。
4.7 MultiLevelTimeWheel::add_timer
// MultiLevelTimeWheel.cpp
TimerID MultiLevelTimeWheel::add_timer(long long delay_ms, TimerCallback cb) {
long long current_ms = get_current_steady_ms();
long long expire_ms = current_ms + delay_ms;
TimerID id = TimerWheel::next_timer_id_.fetch_add(1, std::memory_order_relaxed); // 由 TimerWheel 统一生成 ID
auto entry = std::make_shared<TimerEntry>(id, expire_ms, std::move(cb));
add_timer_internal(entry);
return id;
}
void MultiLevelTimeWheel::add_timer_internal(std::shared_ptr<TimerEntry> entry) {
long long delay_ms_remaining = entry->expire_ms - get_current_steady_ms(); // 相对延迟
// 找到合适的轮子来放置任务
for (int i = 0; i < wheels_.size(); ++i) {
long long wheel_capacity = wheels_[i]->get_capacity_ms();
long long wheel_tick_interval = wheels_[i]->get_tick_interval_ms();
// 如果任务的剩余延迟小于等于当前轮子的总容量
// 并且任务的到期时间 (相对于当前轮子的 tick 粒度) 能够被当前轮子处理
// (即,如果放在当前轮子,它会在一轮内到期,或者它距离最近的 tick 足够远,不会被立即处理)
if (delay_ms_remaining < wheel_capacity || i == wheels_.size() - 1) { // 最后一层强制放入
// 计算任务在当前轮子中需要转的圈数 (rounds)
// 这里我们不需要显式存储 rounds,级联机制会处理
// 只需要确保 expire_ms 落在当前轮子的一个槽位内
// 实际插入到对应轮子
int slot_idx = (entry->expire_ms / wheel_tick_interval) % wheels_[i]->get_slots_num();
wheels_[i]->buckets_[slot_idx].push_back(entry);
wheels_[i]->timer_entries_map_[entry->id] = entry; // 维护映射
return;
}
}
// 如果走到这里,说明任务延迟超出了所有轮子的最大容量,通常是逻辑错误
// 或者可以考虑抛出异常或记录日志
// std::cerr << "Error: Timer delay_ms " << delay_ms_remaining << " exceeds max wheel capacity." << std::endl;
}
4.8 MultiLevelTimeWheel::cancel_timer
// MultiLevelTimeWheel.cpp
bool MultiLevelTimeWheel::cancel_timer(TimerID id) {
// 遍历所有轮子查找并取消
for (const auto& wheel : wheels_) {
// 先尝试在当前轮子的 map 中找到 TimerEntry
auto entry_ptr = wheel->find_timer(id);
if (entry_ptr) {
entry_ptr->cancelled = true; // 标记为取消
wheel->timer_entries_map_.erase(id); // 从 map 中移除
return true;
}
}
return false; // 未找到该ID的定时器
}
4.9 MultiLevelTimeWheel::tick 与级联
这是最复杂的部分,需要协调所有时间轮的转动和任务的级联。
// MultiLevelTimeWheel.cpp
int MultiLevelTimeWheel::tick() {
long long current_ms = get_current_steady_ms();
int executed_tasks_count = 0;
// 计算自上次 tick 以来,最底层时间轮应该转动多少个槽位
long long elapsed_ms = current_ms - last_tick_ms_;
if (elapsed_ms < lowest_level_tick_interval_ms_) {
// 时间还没到下一个 tick 周期,不做任何处理
return 0;
}
// 计算应该转动多少个最底层轮子的 tick
long long ticks_to_advance = elapsed_ms / lowest_level_tick_interval_ms_;
last_tick_ms_ = current_ms; // 更新上次 tick 时间
for (long long i = 0; i < ticks_to_advance; ++i) {
// 从最底层轮子开始驱动
// 依次调用每个轮子的 tick 方法
// 注意:这里的 current_ms 应该是每次 tick 时的准确时间,但为了简化,用当前的 current_ms
// 实际应用中,如果 tick_to_advance 很大,需要更精细的时间步进
executed_tasks_count += wheels_[0]->tick(current_ms);
// 检查最底层轮子是否完成一轮,如果完成,则触发上一层轮子的级联
// 这里需要判断 current_slot_index_ 是否刚刚从 slots_num - 1 变为 0
// 但为了简化,我们让每个轮子在 tick 内部处理其 current_slot_index_ 的更新
// 并在 MultiLevelTimeWheel 中根据轮子容量判断是否需要级联
// 级联逻辑:
// 遍历所有轮子,从第0层开始
for (int wheel_idx = 0; wheel_idx < wheels_.size(); ++wheel_idx) {
auto& current_wheel = wheels_[wheel_idx];
// 只有当前轮子的指针刚好从最后一个槽位回到0时,才触发上一层轮子的级联
// 这里的判断条件需要与 TimerWheel 内部的 current_slot_index_ 更新逻辑匹配
// 一个更简单的级联方式是:当一个轮子的 tick() 被调用了其 slots_num_ 次后,才触发上一层
// 但是,这和每次 tick 都检查当前槽位是否需要级联是不同的
// 采用经典的级联方式:当一个轮子的指针到达它的“零点”时,其上一层轮子前进一格
// 简化级联:
// 假设我们每推进一个最低层轮子的 tick,就检查所有轮子是否需要级联
// 当某个轮子的 current_slot_index_ == 0 时,说明它完成了一轮,需要级联其上一层
// 实际更准确的级联是在 TimerWheel 内部判断
// 为了简化,我们假设 TimerWheel::tick 仅处理当前槽位到期任务
// 级联则通过一个单独的 cascade_timers 方法实现,当 MultiLevelTimeWheel 发现
// 某个轮子的 tick 已经走完了一圈时,就调用其 cascade_timers
// 重新设计级联触发条件:
// 当一个轮子的当前槽位索引为0,且它不是最底层轮子时,说明它已经走完一圈,需要触发级联
// 但这不够准确,因为只有当它的指针从 N-1 移动到 0 时才应该级联
// 更准确的方法是在 TimerWheel::tick() 中返回一个标志,指示是否完成了“一圈”
// 进一步简化:我们不让 TimerWheel 内部管理级联,而是 MultiLevelTimeWheel 统一管理
// 当 MultiLevelTimeWheel 的 tick 循环到足够多的次数,使得一个高层轮子应该前进时,
// 就调用高层轮子的 cascade_timers 方法。
// 级联的核心思想是:
// 当一个轮子 (比如 W_i) 走完一圈时,其所有槽位都被处理过一次
// 此时,W_i+1 (上一层轮子) 的指针应该前进一格。
// W_i+1 当前槽位里的任务,应该被重新放置到 W_i。
//
// 所以,我们可以在 MultiLevelTimeWheel 的 tick 循环中,
// 维护每个轮子的“已转动次数”。当某个轮子的“已转动次数”达到其 slots_num 时,
// 就将该轮子的上层轮子指针前进一格,并执行级联。
// 这里采用更直接的级联方式:
// 当最底层轮子推进时,如果它完成了整整一圈,就尝试级联上一层。
//
// 这是一个更直接的级联实现,它在每个 tick 步进后,检查所有轮子是否需要级联
// 如果一个轮子 current_slot_index_ 刚刚变为 0,意味着它完成了一圈,需要触发其上层轮子
// 当然,这需要知道之前是否为 N-1,或者计算每个轮子应该前进多少步。
//
// 最简单的级联方案是:
// 在 `tick()` 循环中,每当一个 `TimerWheel` 完成一个完整的周期(即 `current_slot_index_` 从 `slots_num_-1` 变为 `0`),
// 就将其上一级 `TimerWheel` 的 `current_slot_index_` 推进一格,并触发该上一级 `TimerWheel` 的级联操作。
// 我们在 TimerWheel 中提供一个 cascade_timers() 方法,由 MultiLevelTimeWheel 在适当时候调用。
// 为了正确实现级联,我们需要知道每个轮子当前应该前进多少步。
// 维护每个轮子已经转动的“虚拟 tick”次数
static std::vector<long long> wheel_current_ticks(wheels_.size(), 0);
// 每次最底层轮子 tick,所有轮子的虚拟 tick 都会增加
// 对于最底层轮子 (idx = 0)
wheel_current_ticks[0]++;
// 对于其他轮子 (idx > 0)
for (int k = 1; k < wheels_.size(); ++k) {
// 如果当前轮子(W_k-1)的转动次数达到了其槽位总数,则上一层轮子(W_k)需要前进一格
if (wheel_current_ticks[k-1] >= wheels_[k-1]->get_slots_num()) {
wheel_current_ticks[k-1] -= wheels_[k-1]->get_slots_num(); // 归零,准备下一轮
wheel_current_ticks[k]++; // 上层轮子前进一格
// 触发级联:将 W_k 的 current_slot_index_ 指向的槽位中的所有任务级联到 W_k-1
// 实际是 W_k 的 current_slot_index_ 指向的槽位中的任务被重新计算位置,放到 W_k-1 中
cascade_to_next_wheel(wheel_idx, current_ms); // 这里的 current_ms 可能是近似值
}
}
}
}
return executed_tasks_count;
}
// 级联辅助函数,将指定轮子某个槽位的任务级联到下一层轮子
void MultiLevelTimeWheel::cascade_to_next_wheel(int wheel_idx, long long current_ms) {
if (wheel_idx == 0) { // 最底层轮子不需要级联
return;
}
auto& current_wheel = wheels_[wheel_idx];
auto& target_wheel = wheels_[wheel_idx - 1];
int current_slot_to_cascade = current_wheel->current_slot_index_; // 应该级联的槽位
// 将 current_wheel 当前槽位的所有任务移动到 target_wheel
std::list<std::shared_ptr<TimerEntry>> tasks_to_cascade;
tasks_to_cascade.splice(tasks_to_cascade.end(), current_wheel->buckets_[current_slot_to_cascade]);
for (auto& entry : tasks_to_cascade) {
if (entry->cancelled) {
// 已取消的任务直接丢弃
continue;
}
// 从当前轮子的 map 中移除 (因为任务不再由当前轮子管理)
current_wheel->timer_entries_map_.erase(entry->id);
// 重新计算任务在下一层轮子中的位置并添加
// 注意:这里需要确保 entry->expire_ms 仍然是绝对时间戳
int target_slot_idx = (entry->expire_ms / target_wheel->get_tick_interval_ms()) % target_wheel->get_slots_num();
target_wheel->buckets_[target_slot_idx].push_back(entry);
target_wheel->timer_entries_map_[entry->id] = entry; // 添加到目标轮子的 map
}
}
关于 MultiLevelTimeWheel::tick 和级联的更正与优化:
上述 tick 函数中的级联逻辑略显复杂,且 wheel_current_ticks 静态变量在多实例或多线程场景下会有问题。更健壮的级联应该这样做:
MultiLevelTimeWheel::tick()每次被调用时,计算自上次调用以来流逝了多少个 最底层轮子 的tick周期。- 在一个循环中,每次推进一个最底层轮子的
tick。 - 在每次推进
wheels_[0]的tick()之后,检查wheels_[0]是否刚好转完一圈(即current_slot_index_从slots_num - 1变为0)。 - 如果
wheels_[0]转完一圈,则尝试级联wheels_[1]的当前槽位到wheels_[0]。 - 如果
wheels_[1]在级联后,也恰好转完一圈(即current_slot_index_从slots_num - 1变为0),则尝试级联wheels_[2]的当前槽位到wheels_[1],以此类推。
这是一种递归或迭代的级联方式。
// MultiLevelTimeWheel.cpp (Revised tick and cascade logic)
// 辅助函数:将一个轮子的任务级联到它的下一层轮子
void MultiLevelTimeWheel::cascade_wheel(int wheel_idx) {
if (wheel_idx == 0) { // 最底层轮子不级联
return;
}
auto& current_wheel = wheels_[wheel_idx];
auto& lower_wheel = wheels_[wheel_idx - 1];
// 获取当前轮子需要级联的槽位(其 current_slot_index_ 指向的槽位)
int slot_to_cascade_idx = current_wheel->current_slot_index_;
// 将该槽位的所有任务移动到一个临时列表
std::list<std::shared_ptr<TimerEntry>> temp_tasks;
temp_tasks.splice(temp_tasks.end(), current_wheel->buckets_[slot_to_cascade_idx]);
for (auto& entry : temp_tasks) {
if (entry->cancelled) { // 已取消的任务直接丢弃
continue;
}
// 从当前轮子的 map 中移除
current_wheel->timer_entries_map_.erase(entry->id);
// 重新计算任务在下一层轮子中的位置并添加
int target_slot_idx = (entry->expire_ms / lower_wheel->get_tick_interval_ms()) % lower_wheel->get_slots_num();
lower_wheel->buckets_[target_slot_idx].push_back(entry);
lower_wheel->timer_entries_map_[entry->id] = entry; // 添加到目标轮子的 map
}
}
int MultiLevelTimeWheel::tick() {
long long current_ms = get_current_steady_ms();
int executed_tasks_count = 0;
long long elapsed_ticks = (current_ms - last_tick_ms_) / lowest_level_tick_interval_ms_;
last_tick_ms_ += elapsed_ticks * lowest_level_tick_interval_ms_; // 更新到精确的下一个 tick 时间点
for (long long i = 0; i < elapsed_ticks; ++i) {
// 1. 推进最底层轮子 (wheels_[0]) 的指针,并处理到期任务
executed_tasks_count += wheels_[0]->tick(current_ms); // current_ms 是最新的时间
// 2. 检查并级联:从最底层开始,如果一个轮子转了一圈,就将其上层轮子的任务级联下来
// 同时,上层轮子的指针也需要前进
for (int wheel_idx = 0; wheel_idx < wheels_.size(); ++wheel_idx) {
auto& current_wheel = wheels_[wheel_idx];
// 假设 TimerWheel::tick 内部已经将 current_slot_index_ 更新到下一个槽位
// 如果 current_slot_index_ 变为 0,意味着它完成了一圈
if (current_wheel->current_slot_index_ == 0 && wheel_idx < wheels_.size() - 1) {
// 当前轮子转了一圈,需要级联其上层轮子的任务,并推进上层轮子的指针
cascade_wheel(wheel_idx + 1); // 级联上一层轮子的任务到当前轮子
wheels_[wheel_idx + 1]->current_slot_index_ =
(wheels_[wheel_idx + 1]->current_slot_index_ + 1) % wheels_[wheel_idx + 1]->get_slots_num();
} else {
// 如果当前轮子没有转完一圈,或者已经是最高层轮子,则停止向上级联
break;
}
}
}
return executed_tasks_count;
}
重要提示:
TimerWheel::tick()的current_slot_index_更新应该在处理完当前槽位后。MultiLevelTimeWheel::tick()循环中的current_ms应该随着每次“虚拟 tick”的推进而调整,或者直接使用循环外部的current_ms,因为我们处理的是累计的elapsed_ticks。这里使用外部current_ms简化。- 级联逻辑
cascade_wheel将一个轮子当前槽位的所有任务移动到其下一层轮子。MultiLevelTimeWheel::tick负责判断何时触发这个级联。
4.10 MultiLevelTimeWheel::get_next_timeout_ms
这个方法对于与事件循环(如 epoll_wait)集成非常有用,可以计算出 epoll_wait 需要等待的最短时间。
// MultiLevelTimeWheel.cpp
long long MultiLevelTimeWheel::get_next_timeout_ms() const {
long long current_ms = get_current_steady_ms();
long long min_timeout = -1; // -1 表示没有定时器
// 遍历所有轮子,找到最早到期的任务
// 理论上,只需要检查最底层轮子的当前槽位和接下来的几个槽位
// 但为了严谨,这里遍历所有轮子
// 实际优化:最早的定时器一定在最底层轮子中,或者即将被级联到最底层轮子中。
// 所以只需要看最底层轮子。
const auto& lowest_wheel = wheels_[0];
const auto& current_bucket = lowest_wheel->buckets_[lowest_wheel->current_slot_index_];
// 检查当前槽位中的任务
for (const auto& entry : current_bucket) {
if (!entry->cancelled && entry->expire_ms > current_ms) {
long long remaining = entry->expire_ms - current_ms;
if (min_timeout == -1 || remaining < min_timeout) {
min_timeout = remaining;
}
}
}
// 考虑下一个槽位,如果当前槽位为空且下一个槽位有任务,那么下一个槽位的任务将是下一个到期的
// 这是一个近似值,因为高层轮子也可能级联下来
// 如果当前槽位没有任务,并且 lowest_wheel->current_slot_index_ + 1 处的槽位有任务
// 那么最早的到期时间可能就是 lowest_wheel->current_slot_index_ + 1 处的任务
// 对于严格的时间轮,下一个 tick 就会处理当前槽位之后的所有任务
// 所以,min_timeout 不会超过 lowest_level_tick_interval_ms_
// 因此,直接返回 lowest_level_tick_interval_ms_ 是一个安全的等待时间
// 或者返回 0 如果有任务已经到期但未处理(这不应该发生在 tick 之后)
// 为了保守,可以直接返回 lowest_level_tick_interval_ms_
if (min_timeout == -1) { // 如果当前槽位没有未取消的未来任务
// 考虑最底层轮子的下一个 tick 周期
return lowest_level_tick_interval_ms_;
}
return std::min(min_timeout, lowest_level_tick_interval_ms_);
}
五、性能分析与集成
5.1 时间轮的性能特性
| 操作 | 复杂度 | 备注 |
|---|---|---|
| 添加任务 | O(1) | 计算槽位,将任务添加到 std::list 中,将映射添加到 std::unordered_map。 |
| 取消任务 | O(1) | 从 std::unordered_map 中查找 TimerEntry,标记 cancelled=true,从 map 中移除。 |
| 驱动 (Tick) | O(K) (K为当前处理槽位中的任务数量) | 每个 tick 只处理一个槽位中的任务,平均 K = N / (总槽位数)。 |
| 内存占用 | O(N) | N个 TimerEntry 对象 + std::vector + std::list 节点 + std::unordered_map。 |
为什么时间轮高效?
- O(1) 添加/取消: 这是其最大的优势。无论是添加还是取消,都避免了对整个任务集合的遍历或复杂调整。
- 局部化处理: 每次
tick只关注当前槽位,而不是整个定时器集合。当任务数量N非常大时,K(当前槽位的任务数)通常远小于N。 - 级联机制: 有效地将长周期任务“存储”在高层轮子中,只有在任务即将到期时才将其级联到低层轮子,避免了对大量不活跃任务的频繁检查。
5.2 内存优化
TimerEntry对象池: 频繁的std::make_shared和shared_ptr的引用计数操作会带来开销。为TimerEntry实现一个对象池(Memory Pool)可以显著减少内存分配/回收的开销和碎片。在池中预分配TimerEntry对象,使用时从池中取用,用完后归还。- 精简
TimerEntry: 确保TimerEntry结构体尽可能小,减少缓存未命中。 std::listvs. intrusive list:std::list每个节点额外存储了前向和后向指针。如果对内存极致追求,可以考虑实现 intrusive list,让TimerEntry自身包含链表节点指针。但这会增加实现的复杂度。
5.3 线程安全与事件循环集成
- 单线程模型: 推荐将
MultiLevelTimeWheel实例绑定到主事件循环线程。所有add_timer,cancel_timer,tick操作都在该线程中执行。这样可以避免复杂的互斥锁,简化设计并提高性能。 - 跨线程操作: 如果其他线程需要添加或取消定时器,不应直接调用
MultiLevelTimeWheel的方法。正确的做法是:- 将定时器请求封装成消息。
- 通过线程安全的队列(如
std::queue+std::mutex+std::condition_variable或无锁队列)将消息发送给事件循环线程。 - 事件循环线程在每次
tick之前或之后,检查并处理这些消息,将实际的add_timer或cancel_timer调用转发给MultiLevelTimeWheel。
- 非阻塞回调:
TimerCallback必须是非阻塞的。如果回调函数需要执行耗时操作,应将其提交给一个单独的线程池去异步执行,而不是在事件循环线程中阻塞。
5.4 示例集成到事件循环
// 伪代码:事件循环主程序
void event_loop_thread_main() {
// 初始化时间轮,例如:
std::vector<std::pair<int, long long>> configs = {
{256, 1}, {64, 256}, {64, 16384}, {64, 1048576}, {64, 67108864}
};
MultiLevelTimeWheel timer_wheel(configs);
// 假设有一个 epoll 实例
int epoll_fd = epoll_create1(0);
// 循环处理事件
while (true) {
// 1. 获取下一个定时器到期时间,作为 epoll_wait 的超时参数
long long timeout_ms = timer_wheel.get_next_timeout_ms();
if (timeout_ms < 0) timeout_ms = -1; // -1 表示无限等待,如果没有任何定时器
// 2. 调用 epoll_wait 等待I/O事件或定时器到期
// 这里需要将 timeout_ms 转换为 int,注意溢出和单位
int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, static_cast<int>(timeout_ms));
// 3. 处理I/O事件
for (int i = 0; i < num_events; ++i) {
// ... 处理网络连接、数据收发等
}
// 4. 驱动时间轮,处理到期任务
timer_wheel.tick();
// 5. 处理来自其他线程的定时器请求 (如果存在)
// process_pending_timer_requests_from_queue();
}
}
// 示例:如何使用定时器
void on_connection_established(Connection* conn) {
// 10秒后检查连接是否活跃
TimerID heartbeat_timer_id = timer_wheel.add_timer(10000, [conn]() {
if (!conn->is_active()) {
conn->close();
} else {
// 重新添加心跳定时器
conn->reset_heartbeat_timer(); // 内部会再次调用 timer_wheel.add_timer
}
});
conn->set_heartbeat_timer_id(heartbeat_timer_id);
}
void on_connection_closed(Connection* conn) {
// 取消所有与该连接相关的定时器
timer_wheel.cancel_timer(conn->get_heartbeat_timer_id());
// ... 其他清理工作
}
六、结论
时间轮算法,尤其是多层时间轮设计,为高性能C++网络框架中的万级并发定时任务管理提供了一种卓越的解决方案。它通过巧妙地将时间离散化和任务分层存储,实现了接近O(1)的定时任务添加、取消和触发效率。结合C++的强大性能和精细控制能力,以及对内存、线程安全和事件循环集成的深入考量,我们可以构建出既高效又健壮的定时器系统,为您的网络服务提供稳定可靠的基石。