C++ 定时器设计:在高性能网络框架中实现万级并发定时任务的时间轮算法

各位编程爱好者、系统架构师,以及所有致力于构建高性能网络服务的同仁们,大家好!

今天,我们齐聚一堂,共同探讨一个在高性能网络框架中至关重要的议题:如何高效地管理海量的定时任务。在现代网络服务中,无论是连接超时检测、心跳机制、延迟重试、还是各种业务逻辑的定时触发,定时器都扮演着核心角色。尤其是在处理万级甚至更高并发量的场景下,传统定时器实现的性能瓶颈会迅速显现,成为系统扩展的阿喀琉斯之踵。

我们将深入剖析一种业界广泛采用,并被证明极为高效的算法——时间轮(Time Wheel),来解决这一挑战。本次讲座将以C++语言为实现载体,从原理、设计到具体代码实践,一步步揭示时间轮如何在O(1)级别的复杂度下,支撑起百万级并发定时任务的稳定运行。

一、高性能网络框架中定时器的核心需求与挑战

在高性能网络框架(如基于Reactor/Proactor模式的事件驱动框架)中,定时器并不仅仅是“一段时间后执行某个函数”这么简单。它面临着一系列严苛的需求和挑战:

  1. 大规模并发支持: 框架可能同时管理数万、数十万甚至数百万个活跃连接或逻辑实体,每个都可能需要一个或多个定时器。这意味着定时器实现必须能够高效地存储和管理海量的任务。
  2. 高精度与低延迟: 许多场景要求定时器能够以毫秒级的精度触发,并且从任务到期到实际执行的延迟要尽可能小。
  3. 高性能操作:
    • 添加(Add): 快速注册新的定时任务。
    • 取消(Cancel): 快速撤销尚未到期的定时任务(例如,连接关闭时需要取消其所有相关定时器)。
    • 触发(Tick): 核心操作,周期性地检查并执行到期任务,这个操作必须极其高效,以避免阻塞主事件循环。
  4. 内存效率: 大规模定时任务意味着如果每个任务的元数据占用过多内存,将导致巨大的内存开销。
  5. 线程安全与事件循环: 定时器通常与事件循环(Event Loop)紧密集成,在单线程环境下运行以避免复杂的锁竞争,但也要考虑如何安全地从其他线程添加或取消任务。
  6. 易用性: 提供简洁明了的API,方便框架用户快速集成。

传统的定时器实现方式,如基于排序链表或小顶堆(std::priority_queue),在面对大规模并发时,会暴露出明显的性能短板。

1.1 传统定时器实现的局限性

我们先回顾一下常见的两种传统定时器实现及其性能特点:

1.1.1 排序链表 (Sorted Linked List)

这种方法将所有定时任务按照其到期时间顺序存储在一个链表中。

  • 添加任务: 需要遍历链表找到合适的插入位置,平均时间复杂度为O(N),最坏情况下也是O(N)。
  • 取消任务: 同样需要遍历链表找到并移除任务,平均时间复杂度为O(N)。
  • 触发任务: 只需要检查链表头部的任务是否到期,O(1);如果到期则移除并执行,然后检查下一个。

优点: 实现简单,内存开销相对较小(每个节点只存储任务信息和指针)。
缺点: 随着任务数量N的增加,添加和取消操作的性能急剧下降,无法应对高并发场景。

1.1.2 小顶堆 (Min-Heap / std::priority_queue)

使用小顶堆存储定时任务,堆顶元素是到期时间最早的任务。

  • 添加任务: 将新任务插入堆中,并进行堆调整,时间复杂度为O(log N)。
  • 取消任务: std::priority_queue不直接支持按ID取消任意任务。如果需要取消,通常需要找到该任务在堆中的位置,然后进行删除和堆调整,这通常是O(N)的遍历,或者通过标记任务为“已取消”并在弹出时跳过,但这样会导致堆中存在大量无效任务,浪费内存和CPU。如果自定义堆,并维护一个映射关系,可以将查找优化到O(1),删除和堆调整仍是O(log N)。
  • 触发任务: 检查堆顶任务是否到期,如果到期则弹出堆顶并执行,然后重新调整堆,时间复杂度为O(log N)。

优点: 添加和触发操作的性能相对稳定,对数时间复杂度优于排序链表的线性复杂度。
缺点:

  • 取消任意任务比较复杂或效率低下。
  • 堆的内存局部性可能不如链表。
  • 频繁的堆调整在大规模任务下仍然会带来显著的CPU开销。

为了更直观地对比,我们用一个表格来总结:

定时器实现方式 添加任务复杂度 取消任务复杂度 触发任务复杂度 (单个到期任务) 适用场景
排序链表 O(N) O(N) O(1) 任务量小,不常取消
小顶堆 O(log N) O(N) 或 O(log N) (需额外实现) O(log N) 任务量适中,取消不频繁
时间轮 O(1) O(1) O(K) (K为当前槽位任务数) 任务量大,高并发,频繁取消

从上表可以看出,排序链表和小顶堆在“万级并发”的场景下,性能都难以令人满意。时间轮算法的O(1)特性,使其在面对高并发时展现出无可比拟的优势。

二、时间轮算法的核心原理

时间轮算法,顾名思义,就像一个时钟。它将时间划分为一系列离散的“槽位”(slots),每个槽位代表一个固定的时间间隔。所有在某个特定时间间隔内到期的定时任务,都会被放置到对应的槽位中。

2.1 单层时间轮 (Single-Level Time Wheel)

想象一个由N个槽位组成的圆环,每个槽位可以看作一个链表或一个桶。有一个指针(或索引) current_slot_index 周期性地向前移动,每移动一次,就代表“时间流逝”了一个单位(称为“tick”)。

基本工作流程:

  1. 初始化: 创建一个固定大小的数组(例如 std::vector<std::list<TimerEntry*>>),数组的每个元素就是一个槽位,槽位中存储着指向 TimerEntry 的指针列表。同时初始化 current_slot_index 为0。
  2. 添加定时任务:
    • 假设时间轮的每个槽位代表1毫秒,总共有256个槽位。
    • 如果要添加一个在 delay_ms 后到期的任务,其到期时间 expire_time_ms = current_time_ms + delay_ms
    • 计算任务应该放置的槽位:slot_index = (expire_time_ms / tick_interval_ms) % slot_count
    • 将任务插入到对应槽位链表的末尾。
    • 同时,为了支持快速取消,我们还需要一个哈希表(std::unordered_map<TimerID, TimerEntry*>)来存储 TimerIDTimerEntry 对象的映射。
  3. 时间轮转动(Tick):
    • 每经过一个 tick_interval_ms,时间轮的 current_slot_index 就会向前移动一位。
    • current_slot_index 移动到下一个槽位时,它会检查并处理当前槽位(即 current_slot_index 指向的槽位)中的所有定时任务。
    • 对于槽位中的每个任务:
      • 如果任务的 expire_time_ms 已经达到或超过 current_time_ms,则执行任务回调,并从槽位中移除该任务。
      • 如果任务的 expire_time_ms 尚未达到,说明这个任务需要多转几轮时间轮才能到期(即“圈数”未到),则继续保留在当前槽位。
    • current_slot_index 到达数组末尾后,会回到0,完成一轮循环。

单层时间轮的局限:

单层时间轮对于短时间的、密集的定时任务非常高效。但是,如果需要支持非常长的定时任务(例如,一个小时),而时间轮的槽位又很小(例如,每个槽位1毫秒,总共256个槽位),那么一个小时的任务需要时间轮转动很多很多圈才能到期。这意味着在 tick() 操作时,你需要频繁地检查任务的圈数,这增加了每次 tick 的计算量。为了解决这个问题,我们引入了多层时间轮。

2.2 多层时间轮 (Multi-Level Time Wheel)

多层时间轮类似于钟表的时、分、秒针。它由多个单层时间轮组成,每个轮子代表不同的时间粒度。例如:

  • 最底层轮子 (Wheel 0): 精度最高,槽位间隔最小(如1毫秒),负责处理短期的定时任务。
  • 第二层轮子 (Wheel 1): 槽位间隔是底层轮子总容量,负责处理中期的定时任务。
  • 第三层轮子 (Wheel 2): 槽位间隔是第二层轮子总容量,负责处理更长期的定时任务。

工作流程:

  1. 初始化: 创建N个时间轮,每个轮子有自己的槽位数量和槽位间隔。
  2. 添加定时任务:
    • 根据任务的 delay_ms,计算它应该被放置在哪一层时间轮的哪个槽位。
    • 例如,一个任务在65秒后到期。如果最底层轮子只能处理256毫秒的任务,那么它显然不能放在最底层。它会被放置在更高层的时间轮上。
    • 核心思想是,任务总是被放置在能够容纳其到期时间,且精度足够高的最顶层时间轮。
  3. 时间轮转动与级联 (Cascading):
    • tick() 操作总是从最底层的时间轮开始。
    • 当最底层时间轮的 current_slot_index 完成一轮循环(即从最后一个槽位回到0)时,它会触发其上一层时间轮的 current_slot_index 前进一格。
    • 当一个高层时间轮的指针前进时,它会将其当前槽位中的所有任务“级联”(cascade)到下一层时间轮。这些任务在被级联到下一层时,需要重新计算它们在下一层中的槽位位置。
    • 这个过程就像钟表的秒针走完一圈,分钟针就走一格;分钟针走完一圈,时针就走一格。

举例说明级联:

假设我们有两层时间轮:

  • W0 (秒轮): 槽位大小1毫秒,256个槽位,总容量256毫秒。
  • W1 (分钟轮): 槽位大小256毫秒,64个槽位,总容量 256 * 64 = 16384毫秒 (~16.3秒)。

一个任务要在 10000ms 后到期。

  • 10000ms 远超 W0 的容量 256ms
  • 10000msW1 的容量 16384ms 之内。
  • 所以,任务会被添加到 W1。计算其在 W1 中的槽位:slot_idx = (current_time_ms + 10000ms) / 256ms % 64

W0 的指针转动 256 圈,即 256ms 后,W1 的指针会前进一格。假设 W1 的指针当前指向 slot XW1 会将 slot X 中的所有任务级联到 W0。这些任务现在相对于 W0 来说,它们的到期时间更近了,可以在 W0 中找到合适的槽位。

这样,只有当任务即将到期时,它才会被移动到精度更高的底层时间轮,大大减少了高层时间轮的tick操作的开销。

三、C++ 实现:数据结构设计

在C++中实现时间轮,我们需要考虑几个关键的数据结构。

3.1 TimerEntry:定时任务的抽象

每个定时任务都需要一个结构体来保存其信息。

#pragma once

#include <functional>
#include <chrono>
#include <memory> // For std::shared_ptr or custom memory management

// 使用 long long 作为 TimerID,保证唯一性且足够大
using TimerID = long long;
using TimerCallback = std::function<void()>;

struct TimerEntry {
    TimerID id;                             // 定时器唯一ID
    long long expire_ms;                    // 任务到期时间戳 (毫秒)
    TimerCallback callback;                 // 任务回调函数
    bool cancelled;                         // 任务是否已被取消的标记
    int rounds;                             // 对于单层轮,表示还需要转几圈才到期
                                            // 对于多层轮,通常不需要这个字段,因为任务会级联

    // 为了支持O(1)删除,需要存储其在 std::list 中的迭代器
    // 注意:这里的迭代器是一个占位符,实际使用时需要特定类型
    // std::list<std::shared_ptr<TimerEntry>>::iterator list_iterator; 
    // 或者当使用 TimerEntry* 存储时:
    void* list_iterator_ptr; // 存储 std::list<TimerEntry*>::iterator 的原始指针,需小心转换

    TimerEntry() : id(0), expire_ms(0), cancelled(false), rounds(0), list_iterator_ptr(nullptr) {}

    // 构造函数
    TimerEntry(TimerID _id, long long _expire_ms, TimerCallback _cb)
        : id(_id), expire_ms(_expire_ms), callback(std::move(_cb)), cancelled(false), rounds(0), list_iterator_ptr(nullptr) {}

    // 禁用拷贝构造和赋值,因为迭代器等内部状态需要特殊处理
    // 或者如果 TimerEntry 不直接拥有迭代器,而是由外部管理,则可以允许
    TimerEntry(const TimerEntry&) = delete;
    TimerEntry& operator=(const TimerEntry&) = delete;

    // 允许移动构造和赋值
    TimerEntry(TimerEntry&&) = default;
    TimerEntry& operator=(TimerEntry&&) = default;
};

TimerEntry 字段说明:

  • id: 唯一标识一个定时器。用于取消操作。
  • expire_ms: 任务的绝对到期时间,使用毫秒时间戳。建议使用 std::chrono::steady_clock 来获取单调递增的时间,避免系统时间调整带来的问题。
  • callback: 任务到期后需要执行的回调函数,std::function<void()> 提供了灵活的函数封装能力。
  • cancelled: 一个布尔标记,用于实现“惰性取消”。当一个任务被取消时,我们不立即从槽位链表中删除它,而是标记为 cancelled=true。等到 tick 操作遍历到它时,发现已取消,便跳过执行并删除。这避免了在 cancel 操作中遍历链表的高昂开销。
  • rounds: 主要用于单层时间轮,表示任务还需要时间轮转动多少圈才能到期。在多层时间轮中,这个概念被级联机制取代。
  • list_iterator_ptr: 关键优化。 为了实现O(1)的取消操作,当 TimerEntry 被添加到某个槽位的 std::list 中时,我们需要将该 TimerEntrystd::list 中的迭代器存储起来。之后,如果需要取消这个任务,可以直接通过这个迭代器O(1)地从 std::list 中移除。由于 std::list::iterator 是一个模板类型,直接存储有点麻烦,这里我们用 void* 占位,实际实现中需要更严谨的方式(例如,在 TimerEntry 外部维护映射,或使用特殊设计的 intrusive_list)。

3.2 TimerWheel:单个时间轮的实现

一个 TimerWheel 类代表一个单独的时间轮层。

#pragma once

#include <vector>
#include <list>
#include <unordered_map>
#include <atomic>
#include <memory> // For std::shared_ptr or custom memory management
#include "TimerEntry.h" // 包含 TimerEntry 定义

// TimerWheel 的前向声明,用于多层时间轮中的级联
class MultiLevelTimeWheel;

class TimerWheel {
public:
    TimerWheel(int slots_num, long long tick_interval_ms, MultiLevelTimeWheel* parent_wheel = nullptr);
    ~TimerWheel();

    // 添加一个定时任务到当前时间轮
    // 返回 TimerEntry*,用于存储到外部的映射中
    std::shared_ptr<TimerEntry> add_timer(long long expire_ms, TimerCallback cb);

    // 从当前时间轮中移除一个定时任务
    // 实际是惰性取消:标记为cancelled,并在tick时处理
    bool cancel_timer(TimerID id);

    // 驱动时间轮转动一个tick
    // 返回到期并执行的任务数量
    int tick(long long current_ms);

    // 级联当前槽位中的任务到下一层时间轮
    void cascade_timers();

    // 获取当前时间轮的容量 (毫秒)
    long long get_capacity_ms() const { return static_cast<long long>(slots_num_) * tick_interval_ms_; }

    // 获取当前时间轮的槽位数
    int get_slots_num() const { return slots_num_; }

    // 获取当前时间轮的tick间隔
    long long get_tick_interval_ms() const { return tick_interval_ms_; }

    // 查找 TimerEntry*
    std::shared_ptr<TimerEntry> find_timer(TimerID id);

private:
    int slots_num_;                                 // 槽位总数
    long long tick_interval_ms_;                    // 每个槽位代表的时间间隔 (毫秒)
    std::vector<std::list<std::shared_ptr<TimerEntry>>> buckets_; // 存储定时任务的槽位
    int current_slot_index_;                        // 当前时间轮指针指向的槽位

    MultiLevelTimeWheel* parent_wheel_;             // 指向顶层时间轮的指针,用于级联
                                                    // 或者可以设计为直接级联到下一个 TimerWheel

    // 映射 TimerID 到 TimerEntry,用于O(1)查找和取消
    // 注意:这里存储的是 shared_ptr,确保 TimerEntry 生命周期被管理
    std::unordered_map<TimerID, std::shared_ptr<TimerEntry>> timer_entries_map_;

    static std::atomic<TimerID> next_timer_id_; // 用于生成唯一 TimerID
    TimerID generate_timer_id() { return next_timer_id_.fetch_add(1, std::memory_order_relaxed); }

    // 辅助函数,计算任务应放入哪个槽位
    int get_slot_index(long long expire_ms) const;
};

TimerWheel 字段说明:

  • slots_num_: 时间轮的槽位数量,例如256。
  • tick_interval_ms_: 每个槽位代表的时间长度,例如1毫秒。
  • buckets_: 核心数据结构,std::vector 存储着 std::list<std::shared_ptr<TimerEntry>>。每个 std::list 就是一个槽位,存储着到期时间落在这个槽位范围内的定时任务。使用 std::shared_ptr<TimerEntry> 简化内存管理,但也可以使用自定义对象池和裸指针来进一步优化性能和内存。
  • current_slot_index_: 当前时间轮的指针,指向正在处理的槽位。
  • parent_wheel_: 在多层时间轮中,用于向上级联或向下传递。这里暂时设计为指向 MultiLevelTimeWheel,实际可以更灵活。
  • timer_entries_map_: std::unordered_map<TimerID, std::shared_ptr<TimerEntry>>。这个映射至关重要,它允许我们通过 TimerID O(1)地查找 TimerEntry 对象,从而实现O(1)的取消操作(通过标记 cancelled 字段)。
  • next_timer_id_: 静态原子变量,用于生成全局唯一的定时器ID。

3.3 MultiLevelTimeWheel:多层时间轮的协调器

MultiLevelTimeWheel 类将多个 TimerWheel 实例组织起来,实现级联机制。

#pragma once

#include <vector>
#include <chrono>
#include <memory>
#include "TimerWheel.h"
#include "TimerEntry.h"

// 静态成员初始化
std::atomic<TimerID> TimerWheel::next_timer_id_ = 1;

class MultiLevelTimeWheel {
public:
    // 构造函数:初始化多层时间轮
    // wheel_configs: vector of {slots_num, tick_interval_ms} for each wheel
    MultiLevelTimeWheel(const std::vector<std::pair<int, long long>>& wheel_configs);
    ~MultiLevelTimeWheel();

    // 添加定时任务
    TimerID add_timer(long long delay_ms, TimerCallback cb);

    // 取消定时任务
    bool cancel_timer(TimerID id);

    // 驱动所有时间轮转动,并处理到期任务
    // 建议每隔 tick_interval_ms_lowest (最底层时间轮的tick间隔) 调用一次
    int tick();

    // 获取下一个即将到期的任务的剩余时间 (ms)
    // 这是一个优化,可以用于epoll_wait/select的timeout参数
    long long get_next_timeout_ms() const;

private:
    std::vector<std::unique_ptr<TimerWheel>> wheels_; // 存储各层时间轮
    long long last_tick_ms_;                          // 上次tick操作的时间戳 (毫秒)
    long long lowest_level_tick_interval_ms_;         // 最底层时间轮的tick间隔

    // 内部方法,根据 expire_ms 决定将任务放置在哪一层时间轮
    void add_timer_internal(std::shared_ptr<TimerEntry> entry);

    // 用于 TimerWheel 内部调用的级联接口
    friend class TimerWheel;
    void cascade_to_next_wheel(std::shared_ptr<TimerEntry> entry, int current_wheel_idx);
};

MultiLevelTimeWheel 字段说明:

  • wheels_: std::vector<std::unique_ptr<TimerWheel>> 存储所有时间轮层。unique_ptr 确保了 TimerWheel 对象的生命周期由 MultiLevelTimeWheel 管理。
  • last_tick_ms_: 记录上一次 tick() 操作时的时间戳,用于计算本次 tick 应该前进多少个最底层轮子的 tick
  • lowest_level_tick_interval_ms_: 最底层时间轮的 tick_interval_ms,这决定了 MultiLevelTimeWheel::tick() 应该被调用的最小频率。

四、C++ 实现:核心操作逻辑

接下来,我们详细实现这些类的关键方法。

4.1 获取当前时间

在所有时间相关的操作中,获取一个稳定、单调递增的时间戳至关重要。std::chrono::steady_clock 是理想的选择。

long long get_current_steady_ms() {
    return std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now().time_since_epoch()
    ).count();
}

4.2 TimerWheel 构造与辅助函数

// TimerWheel.cpp
#include "TimerWheel.h"
#include <iostream> // For debugging

// 静态成员初始化 (在 TimerWheel.cpp 中定义)
std::atomic<TimerID> TimerWheel::next_timer_id_ = 1;

TimerWheel::TimerWheel(int slots_num, long long tick_interval_ms, MultiLevelTimeWheel* parent_wheel)
    : slots_num_(slots_num),
      tick_interval_ms_(tick_interval_ms),
      buckets_(slots_num), // 初始化槽位 vector
      current_slot_index_(0),
      parent_wheel_(parent_wheel) {
    // 确保槽位数量和tick间隔是正数
    if (slots_num_ <= 0 || tick_interval_ms_ <= 0) {
        throw std::invalid_argument("slots_num and tick_interval_ms must be positive.");
    }
}

TimerWheel::~TimerWheel() {
    // 清理所有桶中的 TimerEntry
    for (auto& bucket : buckets_) {
        bucket.clear(); // shared_ptr 会自动释放 TimerEntry
    }
    timer_entries_map_.clear();
}

int TimerWheel::get_slot_index(long long expire_ms) const {
    // 根据到期时间戳计算槽位索引
    // 注意这里除以 tick_interval_ms_ 是为了将绝对时间戳转换为相对槽位时间
    // 然后取模得到在当前轮中的索引
    return (expire_ms / tick_interval_ms_) % slots_num_;
}

std::shared_ptr<TimerEntry> TimerWheel::find_timer(TimerID id) {
    auto it = timer_entries_map_.find(id);
    if (it != timer_entries_map_.end()) {
        return it->second;
    }
    return nullptr;
}

4.3 TimerWheel::add_timer (内部使用,由 MultiLevelTimeWheel 调用)

这个 add_timer 方法是供 MultiLevelTimeWheel 内部调用,负责将已经创建好的 TimerEntry 放入当前时间轮的正确槽位。

// TimerWheel.cpp
std::shared_ptr<TimerEntry> TimerWheel::add_timer(long long expire_ms, TimerCallback cb) {
    TimerID id = generate_timer_id();
    auto entry = std::make_shared<TimerEntry>(id, expire_ms, std::move(cb));

    // 计算任务应该放置的槽位
    int slot_idx = get_slot_index(expire_ms);

    // 将 TimerEntry 插入到对应槽位的链表
    buckets_[slot_idx].push_back(entry);

    // 存储 TimerEntry 在链表中的迭代器(这里我们暂时不直接存储迭代器)
    // 如果需要O(1)的实际删除,TimerEntry 需要包含 std::list<std::shared_ptr<TimerEntry>>::iterator 成员
    // 并在这里设置它,但为了简化代码和避免交叉依赖,我们依赖惰性取消。

    // 将 TimerID 和 TimerEntry 映射存储起来,用于取消操作
    timer_entries_map_[id] = entry;

    return entry; // 返回 TimerEntry 方便上层进行管理
}

// 实际的多层时间轮的add_timer会是这样的:
// MultiLevelTimeWheel::add_timer(long long delay_ms, TimerCallback cb) {
//     long long current_ms = get_current_steady_ms();
//     long long expire_ms = current_ms + delay_ms;
//     TimerID id = generate_timer_id(); // 注意这里 TimerID 应该由 MultiLevelTimeWheel 统一生成
//     auto entry = std::make_shared<TimerEntry>(id, expire_ms, std::move(cb));
//     add_timer_internal(entry); // 调用内部方法将任务放置到合适的轮子
//     return id;
// }

4.4 TimerWheel::cancel_timer (惰性取消)

// TimerWheel.cpp
bool TimerWheel::cancel_timer(TimerID id) {
    auto it = timer_entries_map_.find(id);
    if (it != timer_entries_map_.end()) {
        it->second->cancelled = true; // 标记为已取消
        // 实际从 map 中移除,但 TimerEntry 还在 list 中
        timer_entries_map_.erase(it); // 从 map 中移除,表示不再管理
        return true;
    }
    return false;
}

4.5 TimerWheel::tick (时间轮转动与任务处理)

这是时间轮的核心逻辑。它负责推进 current_slot_index_,并处理当前槽位中的任务。

// TimerWheel.cpp
int TimerWheel::tick(long long current_ms) {
    // 先处理当前槽位
    std::list<std::shared_ptr<TimerEntry>>& current_bucket = buckets_[current_slot_index_];
    auto it = current_bucket.begin();
    int executed_count = 0;

    while (it != current_bucket.end()) {
        std::shared_ptr<TimerEntry> entry = *it;

        if (entry->cancelled) {
            // 任务已取消,直接从链表中移除
            it = current_bucket.erase(it);
            // 注意:因为我们已经从 timer_entries_map_ 中移除了,这里不需要再操作 map
            // 并且 shared_ptr 会自动管理 TimerEntry 的生命周期
            continue;
        }

        // 检查任务是否到期
        // 对于单层时间轮,需要检查圈数
        // 对于多层时间轮,如果到达这一层,说明到期时间在当前轮的容量内
        // 并且如果到期时间 <= current_ms,则执行
        if (entry->expire_ms <= current_ms) {
            // 任务到期,执行回调
            if (entry->callback) {
                entry->callback();
                executed_count++;
            }
            // 从链表中移除已执行的任务
            it = current_bucket.erase(it);
            // 从 map 中移除 (如果之前没有取消,这里才移除)
            timer_entries_map_.erase(entry->id);
        } else {
            // 任务未到期,或者需要级联到下一层
            // 对于多层时间轮,这里是处理级联逻辑的地方
            // 如果是最高层轮子,且任务未到期,则保留
            // 如果不是最高层,且任务未到期,则级联
            // ----------------------------------------------------
            // 在 MultiLevelTimeWheel 的 tick 逻辑中,会调用 cascade_timers
            // 这里假设这个 tick 方法只处理本轮到期的任务,不处理级联
            // 级联由 MultiLevelTimeWheel 协调
            // ----------------------------------------------------
            ++it; // 移动到下一个任务
        }
    }

    // 移动到下一个槽位
    current_slot_index_ = (current_slot_index_ + 1) % slots_num_;
    return executed_count;
}

4.6 MultiLevelTimeWheel 构造函数

// MultiLevelTimeWheel.cpp
#include "MultiLevelTimeWheel.h"
#include <algorithm> // For std::min

MultiLevelTimeWheel::MultiLevelTimeWheel(const std::vector<std::pair<int, long long>>& wheel_configs)
    : last_tick_ms_(get_current_steady_ms()),
      lowest_level_tick_interval_ms_(0) { // Initialized later

    if (wheel_configs.empty()) {
        throw std::invalid_argument("Wheel configurations cannot be empty.");
    }

    long long prev_wheel_capacity = 1; // 虚拟的上一层轮子容量,用于计算第一层轮子的tick_interval

    for (size_t i = 0; i < wheel_configs.size(); ++i) {
        int slots_num = wheel_configs[i].first;
        long long tick_interval_ms = wheel_configs[i].second;

        // 验证配置的合理性
        if (slots_num <= 0 || tick_interval_ms <= 0) {
            throw std::invalid_argument("Invalid wheel config: slots_num and tick_interval_ms must be positive.");
        }

        // 通常,高层轮子的tick_interval_ms应该是低层轮子的总容量
        // 但是为了更灵活的配置,这里允许直接指定
        // 如果我们遵循严格的级联模式:
        // 第0层轮子: tick_interval_ms_0
        // 第1层轮子: tick_interval_ms_1 = slots_num_0 * tick_interval_ms_0
        // ...
        // 这里的 tick_interval_ms 是指当前轮子每个槽位的实际时间粒度
        // 如果是第一层轮子 (i=0),则其 tick_interval_ms 由配置指定
        // 如果是更高层轮子 (i>0),其 tick_interval_ms 应该是前一层轮子的总容量
        // 为了简化,我们直接使用配置传入的 tick_interval_ms,但用户需要确保其逻辑正确性

        wheels_.push_back(std::make_unique<TimerWheel>(slots_num, tick_interval_ms, this));
        prev_wheel_capacity = wheels_.back()->get_capacity_ms(); // 更新上一层轮子的容量

        if (i == 0) {
            lowest_level_tick_interval_ms_ = tick_interval_ms;
        }
    }

    if (lowest_level_tick_interval_ms_ == 0) { // 防止配置为空
        throw std::runtime_error("Lowest level tick interval not set.");
    }
}

MultiLevelTimeWheel::~MultiLevelTimeWheel() {
    wheels_.clear(); // unique_ptr 会自动清理
}

示例 wheel_configs 配置:

// 假设我们需要支持最长约48天的定时任务
// Wheel 0 (最底层): 256 slots, 1ms/slot = 256ms 容量
// Wheel 1: 64 slots, 256ms/slot = 16384ms (~16秒) 容量
// Wheel 2: 64 slots, 16384ms/slot = 1048576ms (~17分钟) 容量
// Wheel 3: 64 slots, 1048576ms/slot = 67108864ms (~18小时) 容量
// Wheel 4 (最高层): 64 slots, 67108864ms/slot = 4294967296ms (~48天) 容量

std::vector<std::pair<int, long long>> configs = {
    {256, 1},       // Wheel 0: 1ms per slot, 256 slots => 256ms capacity
    {64, 256},      // Wheel 1: 256ms per slot, 64 slots => 16384ms capacity (256 * 64)
    {64, 16384},    // Wheel 2: 16384ms per slot, 64 slots => 1048576ms capacity (16384 * 64)
    {64, 1048576},  // Wheel 3: 1048576ms per slot, 64 slots => 67108864ms capacity (1048576 * 64)
    {64, 67108864}  // Wheel 4: 67108864ms per slot, 64 slots => 4294967296ms capacity (67108864 * 64)
};
MultiLevelTimeWheel my_timer_wheel(configs);

注意: 这里的 tick_interval_ms 配置需要用户自己计算好。例如,Wheel 1tick_interval_ms 应该等于 Wheel 0 的总容量。这个设计是灵活的,但也要求使用者理解其含义。

4.7 MultiLevelTimeWheel::add_timer

// MultiLevelTimeWheel.cpp
TimerID MultiLevelTimeWheel::add_timer(long long delay_ms, TimerCallback cb) {
    long long current_ms = get_current_steady_ms();
    long long expire_ms = current_ms + delay_ms;

    TimerID id = TimerWheel::next_timer_id_.fetch_add(1, std::memory_order_relaxed); // 由 TimerWheel 统一生成 ID

    auto entry = std::make_shared<TimerEntry>(id, expire_ms, std::move(cb));
    add_timer_internal(entry);
    return id;
}

void MultiLevelTimeWheel::add_timer_internal(std::shared_ptr<TimerEntry> entry) {
    long long delay_ms_remaining = entry->expire_ms - get_current_steady_ms(); // 相对延迟

    // 找到合适的轮子来放置任务
    for (int i = 0; i < wheels_.size(); ++i) {
        long long wheel_capacity = wheels_[i]->get_capacity_ms();
        long long wheel_tick_interval = wheels_[i]->get_tick_interval_ms();

        // 如果任务的剩余延迟小于等于当前轮子的总容量
        // 并且任务的到期时间 (相对于当前轮子的 tick 粒度) 能够被当前轮子处理
        // (即,如果放在当前轮子,它会在一轮内到期,或者它距离最近的 tick 足够远,不会被立即处理)
        if (delay_ms_remaining < wheel_capacity || i == wheels_.size() - 1) { // 最后一层强制放入
            // 计算任务在当前轮子中需要转的圈数 (rounds)
            // 这里我们不需要显式存储 rounds,级联机制会处理
            // 只需要确保 expire_ms 落在当前轮子的一个槽位内

            // 实际插入到对应轮子
            int slot_idx = (entry->expire_ms / wheel_tick_interval) % wheels_[i]->get_slots_num();
            wheels_[i]->buckets_[slot_idx].push_back(entry);
            wheels_[i]->timer_entries_map_[entry->id] = entry; // 维护映射
            return;
        }
    }
    // 如果走到这里,说明任务延迟超出了所有轮子的最大容量,通常是逻辑错误
    // 或者可以考虑抛出异常或记录日志
    // std::cerr << "Error: Timer delay_ms " << delay_ms_remaining << " exceeds max wheel capacity." << std::endl;
}

4.8 MultiLevelTimeWheel::cancel_timer

// MultiLevelTimeWheel.cpp
bool MultiLevelTimeWheel::cancel_timer(TimerID id) {
    // 遍历所有轮子查找并取消
    for (const auto& wheel : wheels_) {
        // 先尝试在当前轮子的 map 中找到 TimerEntry
        auto entry_ptr = wheel->find_timer(id);
        if (entry_ptr) {
            entry_ptr->cancelled = true; // 标记为取消
            wheel->timer_entries_map_.erase(id); // 从 map 中移除
            return true;
        }
    }
    return false; // 未找到该ID的定时器
}

4.9 MultiLevelTimeWheel::tick 与级联

这是最复杂的部分,需要协调所有时间轮的转动和任务的级联。

// MultiLevelTimeWheel.cpp
int MultiLevelTimeWheel::tick() {
    long long current_ms = get_current_steady_ms();
    int executed_tasks_count = 0;

    // 计算自上次 tick 以来,最底层时间轮应该转动多少个槽位
    long long elapsed_ms = current_ms - last_tick_ms_;
    if (elapsed_ms < lowest_level_tick_interval_ms_) {
        // 时间还没到下一个 tick 周期,不做任何处理
        return 0;
    }

    // 计算应该转动多少个最底层轮子的 tick
    long long ticks_to_advance = elapsed_ms / lowest_level_tick_interval_ms_;
    last_tick_ms_ = current_ms; // 更新上次 tick 时间

    for (long long i = 0; i < ticks_to_advance; ++i) {
        // 从最底层轮子开始驱动
        // 依次调用每个轮子的 tick 方法
        // 注意:这里的 current_ms 应该是每次 tick 时的准确时间,但为了简化,用当前的 current_ms
        // 实际应用中,如果 tick_to_advance 很大,需要更精细的时间步进
        executed_tasks_count += wheels_[0]->tick(current_ms);

        // 检查最底层轮子是否完成一轮,如果完成,则触发上一层轮子的级联
        // 这里需要判断 current_slot_index_ 是否刚刚从 slots_num - 1 变为 0
        // 但为了简化,我们让每个轮子在 tick 内部处理其 current_slot_index_ 的更新
        // 并在 MultiLevelTimeWheel 中根据轮子容量判断是否需要级联

        // 级联逻辑:
        // 遍历所有轮子,从第0层开始
        for (int wheel_idx = 0; wheel_idx < wheels_.size(); ++wheel_idx) {
            auto& current_wheel = wheels_[wheel_idx];
            // 只有当前轮子的指针刚好从最后一个槽位回到0时,才触发上一层轮子的级联
            // 这里的判断条件需要与 TimerWheel 内部的 current_slot_index_ 更新逻辑匹配
            // 一个更简单的级联方式是:当一个轮子的 tick() 被调用了其 slots_num_ 次后,才触发上一层
            // 但是,这和每次 tick 都检查当前槽位是否需要级联是不同的
            // 采用经典的级联方式:当一个轮子的指针到达它的“零点”时,其上一层轮子前进一格

            // 简化级联:
            // 假设我们每推进一个最低层轮子的 tick,就检查所有轮子是否需要级联
            // 当某个轮子的 current_slot_index_ == 0 时,说明它完成了一轮,需要级联其上一层
            // 实际更准确的级联是在 TimerWheel 内部判断

            // 为了简化,我们假设 TimerWheel::tick 仅处理当前槽位到期任务
            // 级联则通过一个单独的 cascade_timers 方法实现,当 MultiLevelTimeWheel 发现
            // 某个轮子的 tick 已经走完了一圈时,就调用其 cascade_timers

            // 重新设计级联触发条件:
            // 当一个轮子的当前槽位索引为0,且它不是最底层轮子时,说明它已经走完一圈,需要触发级联
            // 但这不够准确,因为只有当它的指针从 N-1 移动到 0 时才应该级联
            // 更准确的方法是在 TimerWheel::tick() 中返回一个标志,指示是否完成了“一圈”
            // 进一步简化:我们不让 TimerWheel 内部管理级联,而是 MultiLevelTimeWheel 统一管理
            // 当 MultiLevelTimeWheel 的 tick 循环到足够多的次数,使得一个高层轮子应该前进时,
            // 就调用高层轮子的 cascade_timers 方法。

            // 级联的核心思想是:
            // 当一个轮子 (比如 W_i) 走完一圈时,其所有槽位都被处理过一次
            // 此时,W_i+1 (上一层轮子) 的指针应该前进一格。
            // W_i+1 当前槽位里的任务,应该被重新放置到 W_i。
            //
            // 所以,我们可以在 MultiLevelTimeWheel 的 tick 循环中,
            // 维护每个轮子的“已转动次数”。当某个轮子的“已转动次数”达到其 slots_num 时,
            // 就将该轮子的上层轮子指针前进一格,并执行级联。

            // 这里采用更直接的级联方式:
            // 当最底层轮子推进时,如果它完成了整整一圈,就尝试级联上一层。
            //
            // 这是一个更直接的级联实现,它在每个 tick 步进后,检查所有轮子是否需要级联
            // 如果一个轮子 current_slot_index_ 刚刚变为 0,意味着它完成了一圈,需要触发其上层轮子
            // 当然,这需要知道之前是否为 N-1,或者计算每个轮子应该前进多少步。
            //
            // 最简单的级联方案是:
            // 在 `tick()` 循环中,每当一个 `TimerWheel` 完成一个完整的周期(即 `current_slot_index_` 从 `slots_num_-1` 变为 `0`),
            // 就将其上一级 `TimerWheel` 的 `current_slot_index_` 推进一格,并触发该上一级 `TimerWheel` 的级联操作。

            // 我们在 TimerWheel 中提供一个 cascade_timers() 方法,由 MultiLevelTimeWheel 在适当时候调用。
            // 为了正确实现级联,我们需要知道每个轮子当前应该前进多少步。

            // 维护每个轮子已经转动的“虚拟 tick”次数
            static std::vector<long long> wheel_current_ticks(wheels_.size(), 0);

            // 每次最底层轮子 tick,所有轮子的虚拟 tick 都会增加
            // 对于最底层轮子 (idx = 0)
            wheel_current_ticks[0]++;
            // 对于其他轮子 (idx > 0)
            for (int k = 1; k < wheels_.size(); ++k) {
                // 如果当前轮子(W_k-1)的转动次数达到了其槽位总数,则上一层轮子(W_k)需要前进一格
                if (wheel_current_ticks[k-1] >= wheels_[k-1]->get_slots_num()) {
                    wheel_current_ticks[k-1] -= wheels_[k-1]->get_slots_num(); // 归零,准备下一轮
                    wheel_current_ticks[k]++; // 上层轮子前进一格

                    // 触发级联:将 W_k 的 current_slot_index_ 指向的槽位中的所有任务级联到 W_k-1
                    // 实际是 W_k 的 current_slot_index_ 指向的槽位中的任务被重新计算位置,放到 W_k-1 中
                    cascade_to_next_wheel(wheel_idx, current_ms); // 这里的 current_ms 可能是近似值
                }
            }
        }
    }
    return executed_tasks_count;
}

// 级联辅助函数,将指定轮子某个槽位的任务级联到下一层轮子
void MultiLevelTimeWheel::cascade_to_next_wheel(int wheel_idx, long long current_ms) {
    if (wheel_idx == 0) { // 最底层轮子不需要级联
        return;
    }

    auto& current_wheel = wheels_[wheel_idx];
    auto& target_wheel = wheels_[wheel_idx - 1];

    int current_slot_to_cascade = current_wheel->current_slot_index_; // 应该级联的槽位

    // 将 current_wheel 当前槽位的所有任务移动到 target_wheel
    std::list<std::shared_ptr<TimerEntry>> tasks_to_cascade;
    tasks_to_cascade.splice(tasks_to_cascade.end(), current_wheel->buckets_[current_slot_to_cascade]);

    for (auto& entry : tasks_to_cascade) {
        if (entry->cancelled) {
            // 已取消的任务直接丢弃
            continue;
        }
        // 从当前轮子的 map 中移除 (因为任务不再由当前轮子管理)
        current_wheel->timer_entries_map_.erase(entry->id);

        // 重新计算任务在下一层轮子中的位置并添加
        // 注意:这里需要确保 entry->expire_ms 仍然是绝对时间戳
        int target_slot_idx = (entry->expire_ms / target_wheel->get_tick_interval_ms()) % target_wheel->get_slots_num();
        target_wheel->buckets_[target_slot_idx].push_back(entry);
        target_wheel->timer_entries_map_[entry->id] = entry; // 添加到目标轮子的 map
    }
}

关于 MultiLevelTimeWheel::tick 和级联的更正与优化:

上述 tick 函数中的级联逻辑略显复杂,且 wheel_current_ticks 静态变量在多实例或多线程场景下会有问题。更健壮的级联应该这样做:

  1. MultiLevelTimeWheel::tick() 每次被调用时,计算自上次调用以来流逝了多少个 最底层轮子tick 周期。
  2. 在一个循环中,每次推进一个最底层轮子的 tick
  3. 在每次推进 wheels_[0]tick() 之后,检查 wheels_[0] 是否刚好转完一圈(即 current_slot_index_slots_num - 1 变为 0)。
  4. 如果 wheels_[0] 转完一圈,则尝试级联 wheels_[1] 的当前槽位到 wheels_[0]
  5. 如果 wheels_[1] 在级联后,也恰好转完一圈(即 current_slot_index_slots_num - 1 变为 0),则尝试级联 wheels_[2] 的当前槽位到 wheels_[1],以此类推。

这是一种递归或迭代的级联方式。

// MultiLevelTimeWheel.cpp (Revised tick and cascade logic)

// 辅助函数:将一个轮子的任务级联到它的下一层轮子
void MultiLevelTimeWheel::cascade_wheel(int wheel_idx) {
    if (wheel_idx == 0) { // 最底层轮子不级联
        return;
    }

    auto& current_wheel = wheels_[wheel_idx];
    auto& lower_wheel = wheels_[wheel_idx - 1];

    // 获取当前轮子需要级联的槽位(其 current_slot_index_ 指向的槽位)
    int slot_to_cascade_idx = current_wheel->current_slot_index_;

    // 将该槽位的所有任务移动到一个临时列表
    std::list<std::shared_ptr<TimerEntry>> temp_tasks;
    temp_tasks.splice(temp_tasks.end(), current_wheel->buckets_[slot_to_cascade_idx]);

    for (auto& entry : temp_tasks) {
        if (entry->cancelled) { // 已取消的任务直接丢弃
            continue;
        }
        // 从当前轮子的 map 中移除
        current_wheel->timer_entries_map_.erase(entry->id);

        // 重新计算任务在下一层轮子中的位置并添加
        int target_slot_idx = (entry->expire_ms / lower_wheel->get_tick_interval_ms()) % lower_wheel->get_slots_num();
        lower_wheel->buckets_[target_slot_idx].push_back(entry);
        lower_wheel->timer_entries_map_[entry->id] = entry; // 添加到目标轮子的 map
    }
}

int MultiLevelTimeWheel::tick() {
    long long current_ms = get_current_steady_ms();
    int executed_tasks_count = 0;

    long long elapsed_ticks = (current_ms - last_tick_ms_) / lowest_level_tick_interval_ms_;
    last_tick_ms_ += elapsed_ticks * lowest_level_tick_interval_ms_; // 更新到精确的下一个 tick 时间点

    for (long long i = 0; i < elapsed_ticks; ++i) {
        // 1. 推进最底层轮子 (wheels_[0]) 的指针,并处理到期任务
        executed_tasks_count += wheels_[0]->tick(current_ms); // current_ms 是最新的时间

        // 2. 检查并级联:从最底层开始,如果一个轮子转了一圈,就将其上层轮子的任务级联下来
        //    同时,上层轮子的指针也需要前进
        for (int wheel_idx = 0; wheel_idx < wheels_.size(); ++wheel_idx) {
            auto& current_wheel = wheels_[wheel_idx];

            // 假设 TimerWheel::tick 内部已经将 current_slot_index_ 更新到下一个槽位
            // 如果 current_slot_index_ 变为 0,意味着它完成了一圈
            if (current_wheel->current_slot_index_ == 0 && wheel_idx < wheels_.size() - 1) {
                // 当前轮子转了一圈,需要级联其上层轮子的任务,并推进上层轮子的指针
                cascade_wheel(wheel_idx + 1); // 级联上一层轮子的任务到当前轮子
                wheels_[wheel_idx + 1]->current_slot_index_ = 
                    (wheels_[wheel_idx + 1]->current_slot_index_ + 1) % wheels_[wheel_idx + 1]->get_slots_num();
            } else {
                // 如果当前轮子没有转完一圈,或者已经是最高层轮子,则停止向上级联
                break; 
            }
        }
    }
    return executed_tasks_count;
}

重要提示:

  • TimerWheel::tick()current_slot_index_ 更新应该在处理完当前槽位后。
  • MultiLevelTimeWheel::tick() 循环中的 current_ms 应该随着每次“虚拟 tick”的推进而调整,或者直接使用循环外部的 current_ms,因为我们处理的是累计的 elapsed_ticks。这里使用外部 current_ms 简化。
  • 级联逻辑 cascade_wheel 将一个轮子当前槽位的所有任务移动到其下一层轮子。MultiLevelTimeWheel::tick 负责判断何时触发这个级联。

4.10 MultiLevelTimeWheel::get_next_timeout_ms

这个方法对于与事件循环(如 epoll_wait)集成非常有用,可以计算出 epoll_wait 需要等待的最短时间。

// MultiLevelTimeWheel.cpp
long long MultiLevelTimeWheel::get_next_timeout_ms() const {
    long long current_ms = get_current_steady_ms();
    long long min_timeout = -1; // -1 表示没有定时器

    // 遍历所有轮子,找到最早到期的任务
    // 理论上,只需要检查最底层轮子的当前槽位和接下来的几个槽位
    // 但为了严谨,这里遍历所有轮子

    // 实际优化:最早的定时器一定在最底层轮子中,或者即将被级联到最底层轮子中。
    // 所以只需要看最底层轮子。
    const auto& lowest_wheel = wheels_[0];
    const auto& current_bucket = lowest_wheel->buckets_[lowest_wheel->current_slot_index_];

    // 检查当前槽位中的任务
    for (const auto& entry : current_bucket) {
        if (!entry->cancelled && entry->expire_ms > current_ms) {
            long long remaining = entry->expire_ms - current_ms;
            if (min_timeout == -1 || remaining < min_timeout) {
                min_timeout = remaining;
            }
        }
    }

    // 考虑下一个槽位,如果当前槽位为空且下一个槽位有任务,那么下一个槽位的任务将是下一个到期的
    // 这是一个近似值,因为高层轮子也可能级联下来
    // 如果当前槽位没有任务,并且 lowest_wheel->current_slot_index_ + 1 处的槽位有任务
    // 那么最早的到期时间可能就是 lowest_wheel->current_slot_index_ + 1 处的任务

    // 对于严格的时间轮,下一个 tick 就会处理当前槽位之后的所有任务
    // 所以,min_timeout 不会超过 lowest_level_tick_interval_ms_
    // 因此,直接返回 lowest_level_tick_interval_ms_ 是一个安全的等待时间
    // 或者返回 0 如果有任务已经到期但未处理(这不应该发生在 tick 之后)
    // 为了保守,可以直接返回 lowest_level_tick_interval_ms_

    if (min_timeout == -1) { // 如果当前槽位没有未取消的未来任务
        // 考虑最底层轮子的下一个 tick 周期
        return lowest_level_tick_interval_ms_; 
    }
    return std::min(min_timeout, lowest_level_tick_interval_ms_);
}

五、性能分析与集成

5.1 时间轮的性能特性

操作 复杂度 备注
添加任务 O(1) 计算槽位,将任务添加到 std::list 中,将映射添加到 std::unordered_map
取消任务 O(1) std::unordered_map 中查找 TimerEntry,标记 cancelled=true,从 map 中移除。
驱动 (Tick) O(K) (K为当前处理槽位中的任务数量) 每个 tick 只处理一个槽位中的任务,平均 K = N / (总槽位数)。
内存占用 O(N) N个 TimerEntry 对象 + std::vector + std::list 节点 + std::unordered_map

为什么时间轮高效?

  • O(1) 添加/取消: 这是其最大的优势。无论是添加还是取消,都避免了对整个任务集合的遍历或复杂调整。
  • 局部化处理: 每次 tick 只关注当前槽位,而不是整个定时器集合。当任务数量N非常大时,K(当前槽位的任务数)通常远小于N。
  • 级联机制: 有效地将长周期任务“存储”在高层轮子中,只有在任务即将到期时才将其级联到低层轮子,避免了对大量不活跃任务的频繁检查。

5.2 内存优化

  • TimerEntry 对象池: 频繁的 std::make_sharedshared_ptr 的引用计数操作会带来开销。为 TimerEntry 实现一个对象池(Memory Pool)可以显著减少内存分配/回收的开销和碎片。在池中预分配 TimerEntry 对象,使用时从池中取用,用完后归还。
  • 精简 TimerEntry 确保 TimerEntry 结构体尽可能小,减少缓存未命中。
  • std::list vs. intrusive list: std::list 每个节点额外存储了前向和后向指针。如果对内存极致追求,可以考虑实现 intrusive list,让 TimerEntry 自身包含链表节点指针。但这会增加实现的复杂度。

5.3 线程安全与事件循环集成

  • 单线程模型: 推荐将 MultiLevelTimeWheel 实例绑定到主事件循环线程。所有 add_timer, cancel_timer, tick 操作都在该线程中执行。这样可以避免复杂的互斥锁,简化设计并提高性能。
  • 跨线程操作: 如果其他线程需要添加或取消定时器,不应直接调用 MultiLevelTimeWheel 的方法。正确的做法是:
    1. 将定时器请求封装成消息。
    2. 通过线程安全的队列(如 std::queue + std::mutex + std::condition_variable 或无锁队列)将消息发送给事件循环线程。
    3. 事件循环线程在每次 tick 之前或之后,检查并处理这些消息,将实际的 add_timercancel_timer 调用转发给 MultiLevelTimeWheel
  • 非阻塞回调: TimerCallback 必须是非阻塞的。如果回调函数需要执行耗时操作,应将其提交给一个单独的线程池去异步执行,而不是在事件循环线程中阻塞。

5.4 示例集成到事件循环

// 伪代码:事件循环主程序
void event_loop_thread_main() {
    // 初始化时间轮,例如:
    std::vector<std::pair<int, long long>> configs = {
        {256, 1}, {64, 256}, {64, 16384}, {64, 1048576}, {64, 67108864}
    };
    MultiLevelTimeWheel timer_wheel(configs);

    // 假设有一个 epoll 实例
    int epoll_fd = epoll_create1(0);

    // 循环处理事件
    while (true) {
        // 1. 获取下一个定时器到期时间,作为 epoll_wait 的超时参数
        long long timeout_ms = timer_wheel.get_next_timeout_ms();
        if (timeout_ms < 0) timeout_ms = -1; // -1 表示无限等待,如果没有任何定时器

        // 2. 调用 epoll_wait 等待I/O事件或定时器到期
        // 这里需要将 timeout_ms 转换为 int,注意溢出和单位
        int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, static_cast<int>(timeout_ms));

        // 3. 处理I/O事件
        for (int i = 0; i < num_events; ++i) {
            // ... 处理网络连接、数据收发等
        }

        // 4. 驱动时间轮,处理到期任务
        timer_wheel.tick();

        // 5. 处理来自其他线程的定时器请求 (如果存在)
        // process_pending_timer_requests_from_queue();
    }
}

// 示例:如何使用定时器
void on_connection_established(Connection* conn) {
    // 10秒后检查连接是否活跃
    TimerID heartbeat_timer_id = timer_wheel.add_timer(10000, [conn]() {
        if (!conn->is_active()) {
            conn->close();
        } else {
            // 重新添加心跳定时器
            conn->reset_heartbeat_timer(); // 内部会再次调用 timer_wheel.add_timer
        }
    });
    conn->set_heartbeat_timer_id(heartbeat_timer_id);
}

void on_connection_closed(Connection* conn) {
    // 取消所有与该连接相关的定时器
    timer_wheel.cancel_timer(conn->get_heartbeat_timer_id());
    // ... 其他清理工作
}

六、结论

时间轮算法,尤其是多层时间轮设计,为高性能C++网络框架中的万级并发定时任务管理提供了一种卓越的解决方案。它通过巧妙地将时间离散化和任务分层存储,实现了接近O(1)的定时任务添加、取消和触发效率。结合C++的强大性能和精细控制能力,以及对内存、线程安全和事件循环集成的深入考量,我们可以构建出既高效又健壮的定时器系统,为您的网络服务提供稳定可靠的基石。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注