C++ `std::barrier` (C++20):可重用同步栅的原理与应用

好的,让我们开始这场关于 C++20 std::barrier 的讲座!

C++20 std::barrier:让你的线程们排排坐,吃果果!

各位观众,大家好!今天我们要聊的是 C++20 引入的一个非常实用的同步原语——std::barrier。如果你经常与多线程打交道,并且常常为线程间的同步问题抓耳挠腮,那么 std::barrier 绝对是你的救星!

什么是同步栅栏?(别想歪了,不是那种拦牛羊的)

想象一下,你组织了一场家庭运动会,有赛跑、跳远、拔河等等项目。但是,只有当所有家庭成员都到达了运动场,运动会才能正式开始。std::barrier 就扮演着类似的角色。

简单来说,std::barrier 是一个同步点,它会阻塞一组线程,直到所有线程都到达这个点。一旦所有线程都到达,std::barrier 就会释放所有线程,让它们继续执行。就像发令枪响,大家一起冲向终点!

std::barrier 的基本原理

std::barrier 的工作原理可以用一个简单的计数器来理解。当你创建一个 std::barrier 对象时,你需要指定参与同步的线程数量(也就是所谓的“参与者”)。每当一个线程到达 std::barrier,它就会调用 arrive_and_wait() 方法,这会将内部计数器减 1。当计数器变为 0 时,表示所有线程都已经到达,std::barrier 就会释放所有线程。

std::barrier 的构造函数

std::barrier 的构造函数接受两个参数:

  1. ptrs: 参与者的数量,也就是需要同步的线程数量。
  2. completion_function (可选): 一个在最后一个线程到达时执行的函数或函数对象。这个函数只会被调用一次。
#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

int main() {
  size_t num_threads = 3;

  // 定义一个 completion_function
  auto completion_function = []() {
    std::cout << "所有线程已到达栅栏!" << std::endl;
    // 可以进行一些收尾工作,比如重置状态等
  };

  // 创建一个 barrier 对象
  std::barrier my_barrier(num_threads, completion_function);

  // 创建并启动线程
  std::vector<std::thread> threads;
  for (size_t i = 0; i < num_threads; ++i) {
    threads.emplace_back([&my_barrier, i]() {
      std::cout << "线程 " << i << " 正在努力工作..." << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(i + 1)); // 模拟不同的工作时间
      std::cout << "线程 " << i << " 到达栅栏,等待其他线程..." << std::endl;
      my_barrier.arrive_and_wait(); // 到达栅栏并等待
      std::cout << "线程 " << i << " 已通过栅栏,继续执行..." << std::endl;
    });
  }

  // 等待所有线程完成
  for (auto& thread : threads) {
    thread.join();
  }

  std::cout << "所有线程已完成!" << std::endl;

  return 0;
}

在这个例子中,completion_function 是一个 lambda 表达式,它会在最后一个线程到达栅栏时打印一条消息。

arrive_and_wait() 方法

这是 std::barrier 的核心方法。当一个线程调用 arrive_and_wait() 时,它会通知 std::barrier 自己已经到达,并且会阻塞,直到所有其他线程也到达。

my_barrier.arrive_and_wait();

arrive()drop() 方法

除了 arrive_and_wait()std::barrier 还提供了 arrive()drop() 方法。

  • arrive(): 线程通知 std::barrier 自己已经到达,但不阻塞。通常与 wait() 配合使用。
  • drop(): 线程放弃参与同步。这会将参与者数量减 1。如果参与者数量变为 0,std::barrier 将会释放所有剩余的线程。
#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

int main() {
  size_t num_threads = 3;
  std::barrier my_barrier(num_threads);

  std::vector<std::thread> threads;
  for (size_t i = 0; i < num_threads; ++i) {
    threads.emplace_back([&my_barrier, i, num_threads]() {
      std::cout << "线程 " << i << " 正在努力工作..." << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(i + 1)); // 模拟不同的工作时间

      if (i == 0) {
        std::cout << "线程 " << i << " 放弃参与同步!" << std::endl;
        my_barrier.drop(); // 线程 0 放弃参与同步
      } else {
        std::cout << "线程 " << i << " 到达栅栏!" << std::endl;
        my_barrier.arrive_and_wait(); // 到达栅栏并等待
        std::cout << "线程 " << i << " 已通过栅栏,继续执行..." << std::endl;
      }
    });
  }

  for (auto& thread : threads) {
    thread.join();
  }

  std::cout << "所有线程已完成!" << std::endl;

  return 0;
}

在这个例子中,线程 0 调用了 drop() 方法,放弃了参与同步。因此,只有当线程 1 和线程 2 到达栅栏时,它们才会被释放。

wait() 方法

wait() 方法允许一个线程在调用 arrive() 之后等待所有其他线程到达。它返回一个 std::ptrdiff_t 值,指示当前线程是第几个到达的。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

int main() {
  size_t num_threads = 3;
  std::barrier my_barrier(num_threads);

  std::vector<std::thread> threads;
  for (size_t i = 0; i < num_threads; ++i) {
    threads.emplace_back([&my_barrier, i]() {
      std::cout << "线程 " << i << " 正在努力工作..." << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(i + 1)); // 模拟不同的工作时间
      std::cout << "线程 " << i << " 到达栅栏,准备等待..." << std::endl;
      auto token = my_barrier.arrive(); // 到达栅栏,但不等待
      std::cout << "线程 " << i << " 已经到达,等待其他线程..." << std::endl;
      my_barrier.wait(token); // 等待其他线程
      std::cout << "线程 " << i << " 已通过栅栏,继续执行..." << std::endl;
    });
  }

  for (auto& thread : threads) {
    thread.join();
  }

  std::cout << "所有线程已完成!" << std::endl;

  return 0;
}

std::barrier 的应用场景

std::barrier 在很多多线程应用中都能派上用场。以下是一些常见的例子:

  1. 并行算法: 在并行算法中,通常需要将数据分成多个块,由多个线程并行处理。在每个处理阶段结束时,需要所有线程都完成,才能进行下一个阶段。std::barrier 可以确保所有线程都完成了当前阶段,然后再一起进入下一个阶段。
  2. 游戏开发: 在游戏开发中,可以使用 std::barrier 来同步游戏逻辑、渲染和音频线程。例如,可以先更新所有游戏对象的逻辑,然后使用 std::barrier 等待所有逻辑线程完成,再进行渲染。
  3. 数据分析: 在数据分析中,可以使用 std::barrier 来同步数据加载、数据处理和数据存储线程。例如,可以先并行加载多个数据文件,然后使用 std::barrier 等待所有数据加载线程完成,再进行数据处理。

代码示例:并行计算数组的和

#include <iostream>
#include <thread>
#include <vector>
#include <numeric> // std::accumulate
#include <barrier>

int main() {
  const size_t array_size = 1000000;
  const size_t num_threads = 4;

  // 创建一个包含随机数的数组
  std::vector<int> data(array_size);
  std::generate(data.begin(), data.end(), []() { return rand() % 100; });

  // 计算每个线程处理的块大小
  size_t block_size = array_size / num_threads;

  // 创建一个存储每个线程计算结果的数组
  std::vector<long long> partial_sums(num_threads, 0);

  // 创建一个 barrier 对象
  std::barrier my_barrier(num_threads);

  // 创建并启动线程
  std::vector<std::thread> threads;
  for (size_t i = 0; i < num_threads; ++i) {
    threads.emplace_back([&data, &partial_sums, &my_barrier, i, block_size]() {
      // 计算当前线程处理的起始和结束位置
      size_t start = i * block_size;
      size_t end = (i == num_threads - 1) ? data.size() : start + block_size;

      // 计算当前线程的局部和
      partial_sums[i] = std::accumulate(data.begin() + start, data.begin() + end, 0LL);

      std::cout << "线程 " << i << " 计算的局部和为:" << partial_sums[i] << std::endl;

      // 等待所有线程完成局部和的计算
      my_barrier.arrive_and_wait();

      // 最后一个线程计算总和
      if (i == 0) {
        long long total_sum = std::accumulate(partial_sums.begin(), partial_sums.end(), 0LL);
        std::cout << "数组的总和为:" << total_sum << std::endl;
      }
    });
  }

  // 等待所有线程完成
  for (auto& thread : threads) {
    thread.join();
  }

  return 0;
}

在这个例子中,我们将一个大的数组分成多个块,每个线程计算一个块的局部和。然后,使用 std::barrier 等待所有线程完成局部和的计算。最后,由第一个线程将所有局部和加起来,得到数组的总和。

std::barrier vs. std::latch

C++20 还引入了另一个同步原语——std::latchstd::latch 也很像 std::barrier,但它们之间有一个关键的区别:

  • std::barrier 是可重用的: 一旦所有线程到达 std::barrier,它可以被重置,用于下一轮同步。
  • std::latch 是一次性的: 一旦 std::latch 的计数器变为 0,它就不能再被重置。

因此,如果你需要多次同步一组线程,std::barrier 是更好的选择。如果只需要同步一次,std::latch 也是一个不错的选择,它可能更轻量级。

std::barrier 的一些注意事项

  1. 死锁风险: 如果线程数量与 std::barrier 的参与者数量不匹配,可能会导致死锁。确保所有参与同步的线程都调用了 arrive_and_wait()drop()
  2. 异常处理: 如果在 arrive_and_wait() 期间抛出异常,可能会导致未定义的行为。尽量避免在 arrive_and_wait() 期间抛出异常。
  3. 性能: std::barrier 的性能取决于底层平台的实现。在某些情况下,使用其他同步原语(如条件变量)可能更高效。

总结

std::barrier 是 C++20 中一个非常强大的同步原语,它可以简化多线程程序的编写。通过使用 std::barrier,你可以更容易地同步一组线程,确保它们在特定点上同步执行。掌握 std::barrier 的使用方法,可以帮助你编写更高效、更可靠的多线程程序。

表格总结

特性 std::barrier std::latch
可重用性 可重用 一次性
主要用途 多次同步一组线程 同步一次性事件
方法 arrive_and_wait(), arrive(), drop(), wait() count_down(), wait()
适用场景 并行算法、游戏开发、数据分析等需要多次同步的场景 启动多个线程后等待它们完成初始化等一次性场景

希望今天的讲座对大家有所帮助! 感谢各位的观看!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注