C++ `std::latch` 和 `std::barrier` (C++20):实现复杂的并发同步模式

哈喽,各位好!今天我们来聊聊C++20里两位并发界的新秀:std::latchstd::barrier。这两位可不是什么泛泛之辈,它们能帮你实现一些相当复杂的并发同步模式,让你的多线程程序不再像一团乱麻,而是井井有条。

Part 1: 为什么我们需要 std::latchstd::barrier?

在并发编程的世界里,线程之间的同步一直是个让人头疼的问题。传统的 std::mutexstd::condition_variable 等工具虽然强大,但用起来就像开着坦克去菜市场,有点大材小用,而且容易出错。

比如,你想让多个线程都完成初始化之后,再一起开始执行核心任务,或者你想让多个线程在一个计算循环的每个阶段都同步一下。用传统的工具也能实现,但代码会变得非常复杂,而且容易出现死锁、活锁等问题。

std::latchstd::barrier 的出现,就是为了解决这些问题。它们提供了一种更简单、更安全的方式来实现特定的同步模式。可以把它们想象成线程世界的门卫,负责控制线程的进出。

Part 2: std::latch: 一次性倒计时门卫

std::latch 就像一个一次性的倒计时门卫。它有一个初始计数器,当计数器减到零时,所有等待的线程都会被释放。一旦计数器归零,std::latch 就失效了,不能再次使用。

2.1 std::latch 的基本用法

#include <iostream>
#include <thread>
#include <latch>
#include <vector>

int main() {
  std::latch start_latch(3); // 初始化计数器为3

  auto worker = [&](int id) {
    std::cout << "线程 " << id << " 准备就绪...n";
    start_latch.count_down(); // 计数器减1
    start_latch.wait();       // 等待计数器归零
    std::cout << "线程 " << id << " 开始执行...n";
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < 3; ++i) {
    threads.emplace_back(worker, i);
  }

  // 等待所有线程准备就绪
  std::cout << "主线程等待所有工作线程准备就绪...n";

  for (auto& t : threads) {
    t.join();
  }

  std::cout << "所有线程执行完毕!n";
  return 0;
}

在这个例子中,我们创建了一个 std::latch,初始计数器为3。每个工作线程在准备就绪后,调用 count_down() 方法将计数器减1,然后调用 wait() 方法等待计数器归零。主线程什么都不做,只是等待所有工作线程完成。只有当所有工作线程都调用了 count_down() 方法后,计数器才会归零,所有等待的线程才会被释放,开始执行核心任务。

2.2 std::latch 的成员函数

  • latch(int count): 构造函数,初始化计数器为 count
  • count_down(): 计数器减1。
  • wait(): 等待计数器归零。如果计数器已经为零,则立即返回。
  • try_wait(): 非阻塞地检查计数器是否为零。如果为零,则返回 true,否则返回 false
  • count_down_and_wait(): 计数器减1,然后等待计数器归零。这个方法可以简化一些代码,避免重复调用 count_down()wait()

2.3 std::latch 的应用场景

  • 线程初始化同步: 确保所有线程都完成初始化后,再开始执行核心任务。就像上面那个例子。
  • 测试框架: 在单元测试中,可以使用 std::latch 来等待所有测试用例都执行完毕。
  • 资源加载: 等待所有资源加载完毕后,再启动应用程序。

Part 3: std::barrier: 可重复使用的同步点

std::barrier 就像一个可重复使用的同步点。它也有一个计数器,但与 std::latch 不同的是,当所有等待的线程都被释放后,std::barrier 会自动重置计数器,可以再次使用。

3.1 std::barrier 的基本用法

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

int main() {
  std::barrier barrier(3); // 初始化计数器为3,并设置完成函数

  auto worker = [&](int id) {
    for (int i = 0; i < 3; ++i) {
      std::cout << "线程 " << id << " 在第 " << i + 1 << " 轮计算前...n";
      barrier.arrive_and_wait(); // 等待所有线程到达同步点
      std::cout << "线程 " << id << " 在第 " << i + 1 << " 轮计算后...n";
    }
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < 3; ++i) {
    threads.emplace_back(worker, i);
  }

  for (auto& t : threads) {
    t.join();
  }

  std::cout << "所有线程执行完毕!n";
  return 0;
}

在这个例子中,我们创建了一个 std::barrier,初始计数器为3。每个工作线程在一个循环中执行计算,并在每次循环开始前调用 arrive_and_wait() 方法等待所有线程到达同步点。当所有线程都到达同步点后,std::barrier 会释放所有等待的线程,并自动重置计数器,以便下一次循环使用。

3.2 std::barrier 的成员函数

  • barrier(int count): 构造函数,初始化计数器为 count
  • barrier(int count, CompletionFunction completion): 构造函数,初始化计数器为 count,并设置完成函数。完成函数会在所有线程到达同步点后,但在释放线程之前执行。
  • arrive_and_wait(): 到达同步点并等待。
  • arrive(): 到达同步点,但不等待。这个方法会减少计数器,并返回一个 arrival_token
  • wait(arrival_token token): 等待与指定 arrival_token 关联的同步点。通常与 arrive() 配合使用。
  • arrive_and_drop(): 到达同步点,并将计数器减1。如果计数器变为零,则不重置计数器,相当于线程退出了同步。
  • get_parties(): 返回参与同步的线程数量(初始计数器值)。

3.3 std::barrier 的完成函数

std::barrier 允许你设置一个完成函数,这个函数会在所有线程到达同步点后,但在释放线程之前执行。完成函数可以用来执行一些需要在所有线程都到达同步点后才能执行的操作,例如更新共享状态、进行统计等。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

int main() {
  int shared_data = 0;
  auto completion_function = [&]() {
    shared_data++;
    std::cout << "完成函数执行,shared_data = " << shared_data << "n";
    return false; // 返回 false 表示继续使用 barrier
  };

  std::barrier barrier(3, completion_function);

  auto worker = [&](int id) {
    for (int i = 0; i < 3; ++i) {
      std::cout << "线程 " << id << " 在第 " << i + 1 << " 轮计算前...n";
      barrier.arrive_and_wait();
      std::cout << "线程 " << id << " 在第 " << i + 1 << " 轮计算后...n";
    }
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < 3; ++i) {
    threads.emplace_back(worker, i);
  }

  for (auto& t : threads) {
    t.join();
  }

  std::cout << "所有线程执行完毕!n";
  return 0;
}

在这个例子中,我们设置了一个完成函数,它会在每次所有线程到达同步点后,将 shared_data 的值加1。完成函数返回 false 表示继续使用 barrier,如果返回 true,则所有等待的线程会被抛出一个 std::broken_promise 异常,并且 barrier 进入 broken 状态。

3.4 std::barrier 的应用场景

  • 迭代计算: 在迭代计算中,可以使用 std::barrier 来同步每个迭代步骤。
  • 并行算法: 在并行算法中,可以使用 std::barrier 来同步不同的并行阶段。
  • 游戏开发: 在游戏开发中,可以使用 std::barrier 来同步不同的游戏逻辑,例如物理引擎、AI、渲染等。

Part 4: std::latch vs std::barrier:选择哪个门卫?

特性 std::latch std::barrier
用途 一次性同步 可重复使用的同步
计数器重置 不重置 自动重置
完成函数 不支持 支持
适用场景 线程初始化、一次性事件同步 迭代计算、并行算法、多阶段同步
broken状态 不支持 支持,当completion function 返回true 或抛出异常时进入
线程退出同步 线程无法退出同步 支持,使用 arrive_and_drop()

简单来说:

  • 如果你只需要同步一次,就用 std::latch
  • 如果你需要多次同步,就用 std::barrier

Part 5: 高级用法和注意事项

5.1 arrive()wait(arrival_token)

arrive() 方法不会阻塞线程,它只是将计数器减1,并返回一个 arrival_token。你可以使用 wait(arrival_token) 方法来等待与指定 arrival_token 关联的同步点。这种方式可以让你在到达同步点后,先执行一些不需要同步的操作,然后再等待其他线程到达同步点。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

int main() {
  std::barrier barrier(3);

  auto worker = [&](int id) {
    std::cout << "线程 " << id << " 到达同步点前...n";
    auto token = barrier.arrive();
    std::cout << "线程 " << id << " 到达同步点后,等待前...n";
    // 执行一些不需要同步的操作
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * id));
    barrier.wait(token);
    std::cout << "线程 " << id << " 完成等待...n";
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < 3; ++i) {
    threads.emplace_back(worker, i);
  }

  for (auto& t : threads) {
    t.join();
  }

  std::cout << "所有线程执行完毕!n";
  return 0;
}

5.2 arrive_and_drop()

arrive_and_drop() 方法允许线程退出同步。当线程调用 arrive_and_drop() 方法后,计数器会减1,但如果计数器变为零,则不会重置计数器,这意味着该线程不再参与后续的同步。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

int main() {
  std::barrier barrier(3);

  auto worker = [&](int id) {
    for (int i = 0; i < 3; ++i) {
      std::cout << "线程 " << id << " 在第 " << i + 1 << " 轮计算前...n";
      if (i == 1 && id == 0) {
          std::cout << "线程 " << id << " 退出同步 n";
          barrier.arrive_and_drop();
          return;
      }
      barrier.arrive_and_wait();
      std::cout << "线程 " << id << " 在第 " << i + 1 << " 轮计算后...n";
    }
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < 3; ++i) {
    threads.emplace_back(worker, i);
  }

  for (auto& t : threads) {
    t.join();
  }

  std::cout << "所有线程执行完毕!n";
  return 0;
}

5.3 异常处理

在使用 std::barrier 时,需要注意异常处理。如果在完成函数中抛出异常,或者完成函数返回 true,则所有等待的线程会被抛出一个 std::broken_promise 异常。你需要捕获这个异常,并进行适当的处理。

5.4 性能考量

std::latchstd::barrier 的性能通常比传统的 std::mutexstd::condition_variable 更好,因为它们是专门为特定的同步模式设计的。但是,在高并发的情况下,仍然需要注意性能问题。尽量避免在同步点执行耗时的操作,并合理调整线程数量,以获得最佳的性能。

Part 6: 总结

std::latchstd::barrier 是C++20中非常有用的并发工具,它们可以帮助你更简单、更安全地实现复杂的并发同步模式。选择哪个工具取决于你的具体需求。

  • std::latch 适用于一次性同步,例如线程初始化。
  • std::barrier 适用于多次同步,例如迭代计算。

希望今天的讲解能够帮助你更好地理解和使用这两个强大的工具,让你的多线程程序更加健壮和高效!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注