C++中的原子操作(Atomic Operations)实现:了解锁总线与缓存锁定机制

C++ 原子操作:锁总线与缓存锁定机制

大家好,今天我们来深入探讨 C++ 中的原子操作,以及实现原子操作的关键机制:锁总线和缓存锁定。理解这些概念对于编写高效、线程安全的多线程程序至关重要。

什么是原子操作?

原子操作是指不可再分的操作。在多线程环境中,原子操作保证了操作的完整性,即操作要么完全执行,要么完全不执行。不会出现执行到一半被其他线程打断的情况,从而避免了数据竞争和不一致性。

为什么需要原子操作?

考虑一个简单的例子:一个全局变量 count,多个线程同时对其进行自增操作。如果直接使用 count++,实际上包含了三个步骤:

  1. 读取 count 的值。
  2. count 的值加 1。
  3. 将结果写回 count

在多线程环境中,这三个步骤可能会被其他线程打断,导致最终结果错误。例如:

  • 线程 A 读取 count 的值为 5。
  • 线程 B 读取 count 的值为 5。
  • 线程 A 将 count 的值加 1,得到 6,并写回。 count 现在是 6。
  • 线程 B 将 count 的值加 1,得到 6,并写回。 count 现在是 6。

正确的结果应该是 7,但由于数据竞争,最终结果是 6。

原子操作可以保证 count++ 作为一个不可分割的整体执行,避免数据竞争。

C++ 中的原子类型和操作

C++11 引入了 <atomic> 头文件,提供了原子类型和原子操作函数。常用的原子类型包括:

  • atomic_bool
  • atomic_char
  • atomic_schar
  • atomic_uchar
  • atomic_short
  • atomic_ushort
  • atomic_int
  • atomic_uint
  • atomic_long
  • atomic_ulong
  • atomic_llong
  • atomic_ullong
  • atomic<T*> (原子指针)

这些原子类型提供了各种原子操作,例如:

  • load():原子地读取值。
  • store():原子地存储值。
  • exchange():原子地交换值。
  • compare_exchange_weak()/compare_exchange_strong():原子地比较并交换值。
  • fetch_add():原子地加法。
  • fetch_sub():原子地减法。
  • fetch_and():原子地按位与。
  • fetch_or():原子地按位或。
  • fetch_xor():原子地按位异或。

代码示例:原子自增

#include <iostream>
#include <atomic>
#include <thread>
#include <vector>

std::atomic<int> count(0);

void increment() {
    for (int i = 0; i < 100000; ++i) {
        count++; // 原子自增
        // 等价于 count.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(increment);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Count: " << count << std::endl; // 期望输出 400000
    return 0;
}

在这个例子中,count 被声明为 std::atomic<int>count++ 操作实际上是原子自增,保证了在多线程环境下的正确性。

原子操作的实现机制:锁总线与缓存锁定

原子操作的底层实现依赖于硬件提供的支持,主要有两种机制:锁总线和缓存锁定。

1. 锁总线 (Bus Locking)

在早期的单核 CPU 中,多个 CPU 通过总线连接到内存。当一个 CPU 需要执行原子操作时,它可以发出一个 LOCK# 信号到总线上。这个信号会阻塞其他 CPU 对内存的访问,从而保证当前 CPU 可以独占地完成原子操作。

工作原理:

  1. CPU 发出 LOCK# 信号。
  2. 总线仲裁器阻止其他 CPU 访问内存。
  3. 当前 CPU 执行原子操作。
  4. CPU 释放 LOCK# 信号。
  5. 总线仲裁器允许其他 CPU 访问内存。

优点: 实现简单,容易理解。

缺点: 性能较低。因为锁总线会导致其他 CPU 无法访问内存,即使它们访问的是不同的内存地址,也会被阻塞,从而降低了系统的并发性。

2. 缓存锁定 (Cache Locking)

为了提高性能,现代多核 CPU 引入了缓存机制。每个 CPU 都有自己的高速缓存,可以更快地访问数据。当一个 CPU 需要执行原子操作时,如果操作的数据已经被缓存在该 CPU 的缓存行中,那么 CPU 可以使用缓存锁定来保证原子性,而不需要锁定总线。

工作原理:

  1. CPU 检查要操作的数据是否被缓存在自己的缓存行中,并且缓存行状态为 Exclusive 或 Modified (MESI 协议)。
  2. 如果数据在缓存行中,CPU 可以直接在缓存行上执行原子操作,并使用 MESI 协议保证缓存一致性。
  3. 如果数据不在缓存行中,或者缓存行状态不是 Exclusive 或 Modified,CPU 仍然需要使用锁总线。

MESI 协议:

MESI (Modified, Exclusive, Shared, Invalid) 是一种常用的缓存一致性协议,用于维护多个 CPU 缓存之间的数据一致性。每个缓存行都有一个 MESI 状态,表示该缓存行数据的有效性和与其他 CPU 缓存的共享情况。

  • Modified (M): 缓存行中的数据已被修改,并且只有当前 CPU 拥有该缓存行。数据与主内存不一致。
  • Exclusive (E): 缓存行中的数据与主内存一致,并且只有当前 CPU 拥有该缓存行。
  • Shared (S): 缓存行中的数据与主内存一致,并且多个 CPU 共享该缓存行。
  • Invalid (I): 缓存行中的数据无效。

当一个 CPU 需要执行原子操作时,它需要保证操作的数据在缓存行中,并且缓存行状态为 Exclusive 或 Modified。如果是 Shared 状态,CPU 需要先通过 MESI 协议将缓存行状态转换为 Exclusive 状态,才能执行原子操作。这个转换过程可能需要与其他 CPU 进行通信,以确保只有一个 CPU 拥有该缓存行的独占访问权。

优点: 性能较高。因为缓存锁定只需要锁定缓存行,而不需要锁定整个总线,从而减少了对其他 CPU 的影响,提高了系统的并发性。

缺点: 实现较为复杂。需要 CPU 硬件和缓存一致性协议的支持。

锁总线与缓存锁定的比较:

特性 锁总线 (Bus Locking) 缓存锁定 (Cache Locking)
锁定范围 整个总线 缓存行
性能 较低 较高
实现复杂度 简单 复杂
适用场景 数据不在缓存中 数据在缓存中

代码示例:使用 compare_exchange_weak 实现自旋锁

自旋锁是一种忙等待锁,当锁被占用时,线程会不断地循环检查锁是否释放,直到获取到锁为止。compare_exchange_weak 可以用于实现自旋锁。

#include <iostream>
#include <atomic>
#include <thread>
#include <vector>

class SpinLock {
private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT; // 初始化为未锁定状态

public:
    void lock() {
        while (flag.test_and_set(std::memory_order_acquire)); // 循环尝试获取锁
    }

    void unlock() {
        flag.clear(std::memory_order_release); // 释放锁
    }
};

SpinLock spinLock;
int shared_data = 0;

void increment() {
    for (int i = 0; i < 100000; ++i) {
        spinLock.lock();
        shared_data++;
        spinLock.unlock();
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(increment);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Shared Data: " << shared_data << std::endl; // 期望输出 400000
    return 0;
}

在这个例子中,std::atomic_flag 用于表示锁的状态。test_and_set 方法原子地检查 flag 是否被设置,如果未设置,则设置 flag 并返回 false,否则返回 trueclear 方法原子地清除 flag

理解 compare_exchange_weakcompare_exchange_strong

compare_exchange_weakcompare_exchange_strong 都是原子地比较并交换值,但它们在行为上有一些差异。

  • compare_exchange_weak: 即使比较成功,也可能虚假地失败(spurious failure),即比较的值相等,但交换操作仍然失败。这意味着你需要在一个循环中调用 compare_exchange_weak,直到成功为止。
  • compare_exchange_strong: 只有在比较的值不相等时才会失败。

在性能方面,compare_exchange_weak 通常比 compare_exchange_strong 更快,因为它允许硬件在某些情况下优化操作。但是,由于 compare_exchange_weak 可能虚假地失败,你需要在一个循环中调用它,这可能会增加代码的复杂性。

代码示例:使用 compare_exchange_weak 实现原子计数器

#include <iostream>
#include <atomic>
#include <thread>
#include <vector>

std::atomic<int> atomic_counter(0);

void increment_counter() {
    for (int i = 0; i < 100000; ++i) {
        int expected = atomic_counter.load(std::memory_order_relaxed);
        int desired = expected + 1;
        while (!atomic_counter.compare_exchange_weak(expected, desired, std::memory_order_release, std::memory_order_relaxed)) {
            desired = expected + 1; // Re-calculate desired value based on the new expected
        }
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(increment_counter);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Atomic Counter: " << atomic_counter << std::endl; // 期望输出 400000
    return 0;
}

在这个例子中,我们使用 compare_exchange_weak 来原子地增加计数器。compare_exchange_weak 接受四个参数:

  1. expected:期望的当前值。
  2. desired:期望的新值。
  3. success_memorder:如果比较成功,使用的内存顺序。
  4. failure_memorder:如果比较失败,使用的内存顺序。

如果 atomic_counter 的当前值等于 expected,则将 atomic_counter 的值设置为 desired,并返回 true。否则,将 expected 的值更新为 atomic_counter 的当前值,并返回 false

由于 compare_exchange_weak 可能虚假地失败,我们需要在一个循环中调用它,直到成功为止。

内存顺序 (Memory Ordering)

在多线程编程中,内存顺序是指 CPU 执行内存访问操作的顺序。不同的内存顺序可能会影响程序的正确性和性能。C++11 提供了六种内存顺序:

  • memory_order_relaxed
  • memory_order_consume
  • memory_order_acquire
  • memory_order_release
  • memory_order_acq_rel
  • memory_order_seq_cst

各种内存顺序的含义:

内存顺序 描述
memory_order_relaxed 对该原子操作没有施加任何同步约束。只保证原子性,不保证不同线程之间操作的顺序。
memory_order_consume 如果线程 A 的原子读取操作使用 memory_order_consume,并且线程 B 的原子写入操作释放了该值,那么线程 A 可以安全地使用读取到的值,以及依赖于该值的其他值。 但依赖关系需要小心处理。
memory_order_acquire 如果线程 A 的原子读取操作使用 memory_order_acquire,并且线程 B 的原子写入操作释放了该值,那么线程 A 可以看到线程 B 在释放之前的所有写入操作。
memory_order_release 如果线程 A 的原子写入操作使用 memory_order_release,并且线程 B 的原子读取操作获取了该值,那么线程 B 可以看到线程 A 在释放之后的所有写入操作。
memory_order_acq_rel 同时具有 memory_order_acquirememory_order_release 的特性。通常用于原子读-修改-写操作,例如 fetch_add
memory_order_seq_cst 默认的内存顺序。提供最强的同步保证,保证所有线程看到的操作顺序都是一致的。但性能最低。

选择合适的内存顺序:

选择合适的内存顺序需要在正确性和性能之间进行权衡。

  • 如果只需要保证原子性,而不需要保证不同线程之间操作的顺序,可以使用 memory_order_relaxed
  • 如果需要保证线程之间的同步,可以使用 memory_order_acquirememory_order_releasememory_order_acq_rel
  • 如果需要保证所有线程看到的操作顺序都是一致的,可以使用 memory_order_seq_cst

代码示例:使用不同的内存顺序

#include <iostream>
#include <atomic>
#include <thread>

std::atomic<bool> data_ready(false);
int data = 0;

void producer() {
    data = 42;
    data_ready.store(true, std::memory_order_release); // 释放
}

void consumer() {
    while (!data_ready.load(std::memory_order_acquire)); // 获取
    std::cout << "Data: " << data << std::endl; // 保证 consumer 看到 data 的写入
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);

    t1.join();
    t2.join();

    return 0;
}

在这个例子中,producer 线程将 data 的值设置为 42,然后使用 memory_order_release 释放 data_readyconsumer 线程使用 memory_order_acquire 获取 data_ready,并读取 data 的值。通过使用 memory_order_releasememory_order_acquire,我们可以保证 consumer 线程可以看到 producer 线程在释放 data_ready 之前对 data 的写入。

注意事项

  • 原子操作并非万能的。虽然原子操作可以避免数据竞争,但它们并不能解决所有并发问题。例如,死锁仍然可能发生。
  • 过度使用原子操作可能会降低性能。在某些情况下,使用锁可能更有效。
  • 理解内存顺序对于编写正确的并发程序至关重要。

小结

今天我们学习了 C++ 中原子操作的概念、实现机制(锁总线和缓存锁定)以及内存顺序。 理解这些概念对于编写高效、线程安全的多线程程序至关重要。掌握原子操作能让我们在多线程编程中更加得心应手,编写出更健壮、更高效的程序。

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注