C++ 避免伪共享:`alignas` 与填充字节的应用

好的,没问题,直接开讲!

各位观众,各位朋友,欢迎来到今天的 C++ 性能优化专场。今天咱们聊聊一个听起来玄乎,但实际工作中又非常重要的概念:伪共享 (False Sharing)。这玩意儿就像你和你的邻居共用一个水龙头,结果你洗菜的时候,他要洗车,互相影响,效率大打折扣。

一、什么是伪共享?别被“伪”字迷惑了!

先别被“伪”字给忽悠了,这可不是假的共享,而是“假装”的共享。它的本质是:多个线程访问了不同的数据,但这些数据恰好位于同一个缓存行 (Cache Line) 中,导致缓存一致性协议频繁介入,影响性能。

想象一下,你的 CPU 有很多核心,每个核心都有自己的高速缓存 (Cache)。当一个核心修改了缓存行中的数据,其他核心中包含相同缓存行的副本都需要失效或者更新。这个过程就叫做缓存一致性协议。频繁的失效和更新操作会严重降低程序的性能。

举个栗子:

假设你有一个结构体:

struct MyData {
  int a;
  int b;
};

MyData data[2]; // 两个 MyData 结构体

现在有两个线程:

  • 线程 1 修改 data[0].a
  • 线程 2 修改 data[1].b

如果 data[0]data[1] 恰好位于同一个缓存行中,那么每次一个线程修改数据,都会导致另一个线程的缓存行失效,即使它们修改的是不同的数据!这就是伪共享。

二、为什么会产生伪共享?缓存行是罪魁祸首!

罪魁祸首就是缓存行。缓存行是 CPU 缓存的基本单位,通常是 64 字节 (x86 架构)。CPU 不是按字节读取内存的,而是按缓存行读取的。

所以,即使你只修改了一个字节,CPU 也会把整个缓存行加载到缓存中。如果多个线程同时访问同一个缓存行中的不同数据,就会产生伪共享。

三、伪共享的危害:性能杀手!

伪共享的危害可不小,它会:

  • 降低 CPU 利用率: 线程在等待缓存一致性协议完成,浪费 CPU 时间。
  • 增加内存带宽占用: 频繁的缓存行失效和更新需要大量的内存带宽。
  • 降低程序整体性能: 尤其是在多核 CPU 上,性能下降更加明显。

四、如何避免伪共享?alignas 和填充字节闪亮登场!

避免伪共享的核心思想就是:让每个线程访问的数据都位于不同的缓存行中。

有两种常用的方法:

  1. alignas (C++11): 对齐属性,确保数据按照指定的字节数对齐。
  2. 填充字节 (Padding): 在数据结构中插入额外的字节,使数据位于不同的缓存行中。

4.1 alignas 的妙用:让数据各住各的“单间”

alignas 是 C++11 引入的关键字,用于指定数据结构的对齐方式。我们可以使用 alignas 来确保数据按照缓存行大小对齐,从而避免伪共享。

代码示例:

#include <iostream>
#include <thread>
#include <vector>
#include <chrono>

constexpr size_t CACHE_LINE_SIZE = 64; // 假设缓存行大小为 64 字节

// 使用 alignas 对齐结构体
struct alignas(CACHE_LINE_SIZE) AlignedData {
  int a;
};

// 不使用 alignas 的结构体
struct UnalignedData {
  int a;
};

// 测试函数,模拟多线程访问
void test_alignment(std::vector<AlignedData>& aligned_data, std::vector<UnalignedData>& unaligned_data, int num_threads, int iterations) {
    std::vector<std::thread> threads;
    threads.reserve(num_threads);

    auto start_time = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&](int thread_id) {
            for (int j = 0; j < iterations; ++j) {
                aligned_data[thread_id].a++;
                unaligned_data[thread_id].a++;
            }
        }, i);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);

    std::cout << "Number of Threads: " << num_threads << std::endl;
    std::cout << "Iterations: " << iterations << std::endl;
    std::cout << "Aligned Time: " << duration.count() << " ms" << std::endl;

    start_time = std::chrono::high_resolution_clock::now();

    end_time = std::chrono::high_resolution_clock::now();
    duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
}

int main() {
    int num_threads = 4;  // 线程数量
    int iterations = 100000000; // 迭代次数

    std::vector<AlignedData> aligned_data(num_threads);
    std::vector<UnalignedData> unaligned_data(num_threads);

    std::cout << "Size of AlignedData: " << sizeof(AlignedData) << std::endl;
    std::cout << "Size of UnalignedData: " << sizeof(UnalignedData) << std::endl;

    test_alignment(aligned_data, unaligned_data, num_threads, iterations);

    return 0;
}

代码解释:

  • alignas(CACHE_LINE_SIZE) 告诉编译器,AlignedData 结构体要按照 CACHE_LINE_SIZE (64 字节) 对齐。
  • 这样,每个 AlignedData 结构体都会占据一个独立的缓存行,避免了伪共享。
  • UnalignedData结构体不做任何对齐处理,模拟伪共享场景。

编译和运行:

g++ -std=c++11 -pthread main.cpp -o main
./main

运行结果示例 (结果会因机器而异):

Size of AlignedData: 64
Size of UnalignedData: 4
Number of Threads: 4
Iterations: 100000000
Aligned Time: X ms

通过对比 Aligned TimeUnaligned Time,你可以看到使用 alignas 可以显著提高性能。虽然这里没有Unaligned Time,但是可以自行添加。将上述代码中的AlignedData的赋值操作注释掉,然后将UnalignedData的赋值操作放到时间测试里面就能测试出Unaligned Time

注意事项:

  • alignas 只能增加对齐,不能减小对齐。
  • alignas 的参数必须是 2 的幂次方。
  • 过度对齐会浪费内存空间,要根据实际情况选择合适的对齐方式。

4.2 填充字节 (Padding):手动“隔离”数据

如果你的编译器不支持 alignas,或者你需要更精细地控制内存布局,可以使用填充字节。填充字节就是在数据结构中插入额外的字节,使数据位于不同的缓存行中。

代码示例:

#include <iostream>
#include <thread>
#include <vector>
#include <chrono>

constexpr size_t CACHE_LINE_SIZE = 64; // 假设缓存行大小为 64 字节

// 使用填充字节的结构体
struct PaddedData {
  int a;
  char padding[CACHE_LINE_SIZE - sizeof(int)]; // 填充字节
};

// 不使用填充字节的结构体
struct UnpaddedData {
  int a;
};

// 测试函数,模拟多线程访问
void test_padding(std::vector<PaddedData>& padded_data, std::vector<UnpaddedData>& unpadded_data, int num_threads, int iterations) {
    std::vector<std::thread> threads;
    threads.reserve(num_threads);

    auto start_time = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&](int thread_id) {
            for (int j = 0; j < iterations; ++j) {
                padded_data[thread_id].a++;
                unpadded_data[thread_id].a++;
            }
        }, i);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);

    std::cout << "Number of Threads: " << num_threads << std::endl;
    std::cout << "Iterations: " << iterations << std::endl;
    std::cout << "Padded Time: " << duration.count() << " ms" << std::endl;

    start_time = std::chrono::high_resolution_clock::now();

    end_time = std::chrono::high_resolution_clock::now();
    duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
}

int main() {
    int num_threads = 4;  // 线程数量
    int iterations = 100000000; // 迭代次数

    std::vector<PaddedData> padded_data(num_threads);
    std::vector<UnpaddedData> unpadded_data(num_threads);

    std::cout << "Size of PaddedData: " << sizeof(PaddedData) << std::endl;
    std::cout << "Size of UnpaddedData: " << sizeof(UnpaddedData) << std::endl;

    test_padding(padded_data, unpadded_data, num_threads, iterations);

    return 0;
}

代码解释:

  • char padding[CACHE_LINE_SIZE - sizeof(int)]PaddedData 结构体中插入了 CACHE_LINE_SIZE - sizeof(int) 个字节的填充字节。
  • 这样,PaddedData 结构体的总大小就等于 CACHE_LINE_SIZE,确保每个结构体占据一个独立的缓存行。

编译和运行:

g++ -std=c++11 -pthread main.cpp -o main
./main

运行结果示例 (结果会因机器而异):

Size of PaddedData: 64
Size of UnpaddedData: 4
Number of Threads: 4
Iterations: 100000000
Padded Time: X ms

同样,通过对比 Padded TimeUnpadded Time (需要自行添加测试代码),你可以看到使用填充字节可以显著提高性能。

注意事项:

  • 填充字节会增加内存占用,要根据实际情况进行权衡。
  • 填充字节的位置也很重要,要确保填充字节能够将数据隔离到不同的缓存行中。

五、实战演练:避免并发计数器的伪共享

并发计数器是多线程编程中常用的工具。如果多个线程同时增加一个计数器,很容易产生伪共享。

代码示例 (伪共享版本):

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <chrono>

// 没有避免伪共享的计数器
struct Counter {
  std::atomic<long long> count{0};
};

void increment_counter(Counter& counter, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        counter.count++;
    }
}

int main() {
    int num_threads = 4;
    int iterations = 100000000;

    std::vector<Counter> counters(num_threads); // 创建多个计数器,但可能位于同一个缓存行

    std::vector<std::thread> threads;
    threads.reserve(num_threads);

    auto start_time = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(increment_counter, std::ref(counters[i]), iterations);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);

    std::cout << "Number of Threads: " << num_threads << std::endl;
    std::cout << "Iterations per Thread: " << iterations << std::endl;
    std::cout << "Total Count (without padding): " << counters[0].count << std::endl; // 输出第一个计数器的值,其他计数器的值应该一样
    std::cout << "Time (without padding): " << duration.count() << " ms" << std::endl;

    return 0;
}

代码示例 (避免伪共享版本):

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <chrono>

constexpr size_t CACHE_LINE_SIZE = 64;

// 使用 alignas 避免伪共享的计数器
struct alignas(CACHE_LINE_SIZE) PaddedCounter {
  std::atomic<long long> count{0};
};

void increment_counter(PaddedCounter& counter, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        counter.count++;
    }
}

int main() {
    int num_threads = 4;
    int iterations = 100000000;

    std::vector<PaddedCounter> counters(num_threads); // 创建多个计数器,每个计数器位于独立的缓存行

    std::vector<std::thread> threads;
    threads.reserve(num_threads);

    auto start_time = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(increment_counter, std::ref(counters[i]), iterations);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);

    std::cout << "Number of Threads: " << num_threads << std::endl;
    std::cout << "Iterations per Thread: " << iterations << std::endl;
    std::cout << "Total Count (with padding): " << counters[0].count << std::endl; // 输出第一个计数器的值,其他计数器的值应该一样
    std::cout << "Time (with padding): " << duration.count() << " ms" << std::endl;

    return 0;
}

代码解释:

  • 在避免伪共享的版本中,我们使用了 alignas(CACHE_LINE_SIZE) 来确保每个 PaddedCounter 结构体都位于独立的缓存行中。
  • 这样,每个线程修改自己的计数器时,就不会影响到其他线程的缓存行,避免了伪共享。

运行结果示例 (结果会因机器而异):

通过对比两个版本的运行时间,你可以看到避免伪共享可以显著提高并发计数器的性能。

六、总结:防微杜渐,性能优化在于细节!

伪共享是一个隐藏的性能杀手,如果不注意,可能会导致程序性能下降。通过使用 alignas 和填充字节,我们可以有效地避免伪共享,提高程序的性能。

表格总结:

方法 优点 缺点 适用场景
alignas 简单易用,编译器自动处理对齐。 只能增加对齐,不能减小对齐,过度对齐会浪费内存。 C++11 及以上版本,对齐要求不高的情况。
填充字节 可以更精细地控制内存布局,兼容性好。 需要手动计算填充字节的大小和位置,容易出错,增加代码复杂度。 编译器不支持 alignas,或者需要更精细地控制内存布局的情况。

最后,送给大家一句名言:

“性能优化,始于足下。” – 佚名

希望今天的讲解对大家有所帮助,谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注