C++ 多线程同步condition_variable的用法

一 介绍

多线程并发有两大需求:互斥和同步(等待-触发)。互斥是因为线程间存在共享数据,等待-触发是因为线程间存在依赖关系。互斥最常见,一般通过互斥锁unique_lock(mutex)形式实现。同步,由于线程间需要按照预定的先后次序顺序进行,就要用到condition_variable和condition_variable_any。互斥锁条件变量中也要用到,互斥锁是条件变量的应用前提,必须先学会基础的unique_lock(mutex)互斥锁应用。

condition_variable和condition_variable_any基础上,就可以实现事件(event)模式了,事件模式要比轮询模式效率高,在优秀的多线程框架和应用中事件模式是首选。

二 condition_variable的使用

condition_variable是一个能够阻止调用线程直到被通知恢复的对象。condition_variable的成员方法除了构造和析构函数外,区分了两大类:wait(阻塞)类和notify(唤醒)类。本文中wait代表阻塞之意,notify代表唤醒之意,后面加函数或小括号才是指某个具体的函数。wait类函数有wait(),wait_for(),wait_until()三个函数,notify类函数有:notify_one()和notify_all()两个函数。

wait 阻塞函数
wait
阻塞等待直到被notified唤醒
wait_for
阻塞等待直到超时唤醒或被notified唤醒
wait_until
阻塞等待直到截止时间或被notified唤醒

Notify 唤醒函数
notify_one
随机唤醒一个被wait阻塞的线程
notify_all
唤醒全部被wait阻塞的线程

当调用线程的一个wait函数时,它使用unique_lock(通过mutex)来锁定线程。该线程保持阻塞状态,直到被另一个调用同一条件变量对象的notify函数的线程唤醒。
condition_variable类型的对象总是使用unique_lock<mutex>来等待:有关其他锁类型的替代方法,请参考第四点condition_variable_any,condition_variable_any支持其他类型的锁。

1 wait函数,有两种函数形式

void wait(unique_lock<mutex>&lck); //当前线程的执行会被阻塞,直到收到 notify 为止。
void wait(unique_lock<mutex>&lck,Predicate pred); //当前线程仅在pred=false时阻塞;如果pred=true时,不阻塞。为了避免虚假唤醒,所以有pred来看确认是否真的被唤醒

当前线程调用wait(unique_lock<mutex>&lck)函数时,线程被阻塞,直到收到notify唤醒通知。注意:在调用wait(unique_lock<mutex>&lck)函数时,函数会自动调用lck.unlock()释放mutex,从而允许其他锁定同一mutex的某一线程继续后续执行。这是condition_variable学习的重点,不理解这点,则不能理解condition_variable的使用。
阻塞函数一旦收到唤醒通知(由其他线程明确notify),该函数将取消阻塞并调用lck.lock(),使lck锁定mutex,线程继续独占式执行后续工作。
通常,在另一个线程中调用成员notify_one或成员notify_all来通知阻塞函数唤醒。但某些实现可能会产生虚假的唤醒调用,而不调用这些函数中的任何一个。因此,官方建议使用该功能的用户应确保满足其恢复条件。虚假唤醒的情形,作者目前还没有遇到。

2 wait_for函数,有两种函数形式

wait_for(unique_lock<mutex>&lck, const chrono::duration<Rep,Period>& rel_time);
wait_for(unique_lock<mutex>&lck,const chrono::duration<Rep,Period>& rel_time, Predicate pred);

当前线程调用wait_for(lck,rel_time)函数在rel_time期间被阻止,或者直到被notify唤醒(如果notify先发生)。在阻塞线程时,wait_for(lck,rel_time)函数会自动调用lck.unlock(),允许其他锁定的线程继续。
阻塞线程一旦收到唤醒通知(由其他线程明确notify)或者超时唤醒,wait_for(lck,rel_time)函数将取消阻塞并调用lck.lock(),使lck锁定mutex,线程继续独占式执行后续工作。
wait_for(lck,rel_time,pred)wait_for(lck,rel_time)基础上增加了pred参数,当前线程仅在pred=false时阻塞;如果pred=true时,不阻塞。为了避免虚假唤醒,所以有pred来看确认是否真的被唤醒

3 wait_until函数,有两种函数形式

wait_until(unique_lock<mutex>&lck,const chrono::time_point<Clock,Duration>& abs_time);
wait_until(unique_lock<mutex>&lck,const chrono::time_point<Clock,Duration>& abs_time,Predicatepred);

与wait_for 类似,wait_until(lck,abs_time)可以指定一个截止时间点,在当前线程收到notify通知或者到达指定的截止时间点 abs_time超时之前,该线程都会处于阻塞状态。
阻塞线程一旦收到唤醒通知(由其他线程明确notify)或者超时唤醒,wait_until (lck, abs_time)函数将取消阻塞并调用lck.lock(),使lck锁定mutex,线程继续独占式执行后续工作。
wait_until(lck,abs_time,pred)wait_until(lck,abs_time)基础上增加了pred参数,当前线程仅在pred=false时阻塞;如果pred=true时,不阻塞。

4 notify_one

void notify_one() noexcept;

notify_one()唤醒相同condition_variable阻塞的一个线程。如果有多个线程,则未指定选择哪个线程,唤醒的线程是随机的。如果没有线程在等待,则函数不执行任何操作。

5 notify_all

void notify_all() noexcept;

notify_all()唤醒相同condition_variable阻塞的所有线程。如果没有线程在等待,则函数不执行任何操作。

三 condition_variable应用示例

1 condition_variable::wait(with predicate)示例

//condition_variable::wait (with predicate)
#include <iostream>           // std::cout
#include <thread>             // std::thread, std::this_thread::yield
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;

int cargo = 0;
bool shipment_available() {return cargo!=0;}

void consume (int n) {
  for (int i=0; i<n; ++i) {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck,shipment_available);//shipment_available为 false时阻塞;为true时,不阻塞。
    // consume:
    std::cout << cargo << '\n';
    cargo=0;
  }
}

int main ()
{
  std::thread consumer_thread (consume,10);

  // produce 10 items when needed:
  for (int i=0; i<10; ++i) {
    while (shipment_available()) {
      std::this_thread::yield(); //为true时,则将本时间片让渡给其他线程,主线程继续执行while循环。为false时,退出循环,执行下面代码。
    }
    std::unique_lock<std::mutex> lck(mtx);
    cargo = i+1;
    cv.notify_one();
  }

  consumer_thread.join();

  return 0;
}

看懂这段代码的前提条件:n-2次提醒
当前线程调用wait(unique_lock<mutex>&lck)函数时,线程被阻塞,直到收到notify唤醒通知。注意:在调用wait(unique_lock<mutex>&lck)函数时,函数会自动调用lck.unlock()释放mutex,从而允许其他锁定同一mutex的某一线程继续后续执行。这是condition_variable学习的重点,不理解这点,则不能理解condition_variable的使用。
阻塞函数一旦收到唤醒通知(由其他线程明确notify),该函数将取消阻塞并调用lck.lock(),使lck锁定mutex,线程继续独占式执行后续工作。

程序输出:

1
2
3
4
5
6
7
8
9
10

2 condition_variable::wait_for示例

//condition_variable::wait_for example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <chrono>             // std::chrono::seconds
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status

std::condition_variable cv;

int value;

void read_value() {
  std::cin >> value;
  cv.notify_one();
}

int main ()
{
  std::cout << "Please, enter an integer (I'll be printing dots): \n";
  std::thread th (read_value);

  std::mutex mtx;
  std::unique_lock<std::mutex> lck(mtx);
  while (cv.wait_for(lck,std::chrono::seconds(1)) == std::cv_status::timeout) { //每超时一秒则打印一个'.',并且超时后会继续循环,直到被notify唤醒才退出循环
    std::cout << '.' << std::endl;
  }
  std::cout << "You entered: " << value << '\n';

  th.join();

  return 0;
}

看懂这段代码的前提条件:n-1次提醒
当前线程调用wait_for(lck,rel_time)函数在rel_time期间被阻止,或者直到被notify唤醒(如果notify先发生)。在阻塞线程时,wait_for(lck,rel_time)函数会自动调用lck.unlock(),允许其他锁定的线程继续。
阻塞线程一旦收到唤醒通知(由其他线程明确notify)或者超时唤醒,wait_for(lck,rel_time)函数将取消阻塞并调用lck.lock(),使lck锁定mutex,线程继续独占式执行后续工作。
程序可能的输出(程序运行后,等待输入时间越长,打印的’.’越多):

Please, enter an integer (I'll be priniting dots):
.
.
7
You entered: 7

3 condition_variable::wait(without predicate)示例

//condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (!ready) cv.wait(lck);
  // ...
  std::cout << "thread " << id << '\n';
}

void go() {
  std::unique_lock<std::mutex> lck(mtx);
  ready = true;
  cv.notify_all();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_id,i);

  std::cout << "10 threads ready to race...\n";
  go();                       // go!

  for (auto& th : threads) th.join();

  return 0;
}

看懂这段代码的前提条件:n次提醒
当前线程调用wait(unique_lock<mutex>&lck)函数时,线程被阻塞,直到收到notify唤醒通知。注意:在调用wait(unique_lock<mutex>&lck)函数时,函数会自动调用lck.unlock()释放mutex,从而允许其他锁定同一mutex的某一线程继续后续执行。这是condition_variable学习的重点,不理解这点,则不能理解condition_variable的使用。
阻塞函数一旦收到唤醒通知(由其他线程明确notify),该函数将取消阻塞并调用lck.lock(),使lck锁定mutex,线程继续独占式执行后续工作。
可能的输出(线程执行顺序是变化的):

10 threads ready to race...
thread 2
thread 0
thread 9
thread 4
thread 6
thread 8
thread 7
thread 5
thread 3
thread 1

四 扩展condition_variable_any用法

condition_variable_any用法与condition_variable基本相同,只是它的等待函数可以采用任何可锁定类型(mutex 类型,例如std::mutex)直接作为参数,condition_variable对象只能采用unique_lock<mutex>。除此之外,它们的用法是相同的。

三、1的例子用condition_variable_any实现

// condition_variable_any::wait (with predicate)
#include <iostream>           // std::cout
#include <thread>             // std::thread, std::this_thread::yield
#include <mutex>              // std::mutex
#include <condition_variable> // std::condition_variable_any

std::mutex mtx;
std::condition_variable_any cv;

int cargo = 0;
bool shipment_available() {return cargo!=0;}

void consume (int n) {
  for (int i=0; i<n; ++i) {
    mtx.lock();
    cv.wait(mtx,shipment_available);
    // consume:
    std::cout << cargo << '\n';
    cargo=0;
    mtx.unlock();
  }
}

int main ()
{
  std::thread consumer_thread (consume,10);

  // produce 10 items when needed:
  for (int i=0; i<10; ++i) {
    while (shipment_available()) {
      std::this_thread::yield();
    }
    mtx.lock();
    cargo = i+1;
    cv.notify_one();
    mtx.unlock();
  }

  consumer_thread.join();

  return 0;
}

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

Contents
滚动至顶部