跳至主要內容

C++11线程间的同步通信(生产者-消费者模型)

张威大约 8 分钟c/c++c++11线程

C++11线程间的同步通信(生产者-消费者模型)

1、多线程编程两个问题

1.1、线程间的互斥

竞态条件: 多线程执行的结果出现不一致的情况,由于CPU对线程不同的调用顺序,而产生不同的运行结果。

发生竞态条件的代码段,称为临界区代码段(只有一个线程可以进来),保证临界区代码段原子操作

C++11的mutex底层实现:

使用strace ./a.out跟踪代码,使用C++11提供的mutex,Linux底层使用的也是自己的pthread_mutex互斥锁。

1.2、线程间的同步通信

  • 线程间不通信的话,每个线程受CPU的调度,没有任何执行上的顺序可言,线程1和线程2是根据CPU调度算法来的,两个线程都有可能先运行,是不确定的,线程间的运行顺序是不确定的;
  • 所以多线程程序出问题,难以复现,因为谁也不知道当时线程执行的先后顺序,我们一般可以得到每个线程的线程栈信息来分析是否发生的问题之类的
  • 我们要保证线程间的运行顺序

通信就是

  • 线程1和线程2一起运行,线程2要做的事情必须先依赖于线程1完成部分的事情,然后告诉线程2这部分东西做好了,线程2就可以继续向下执行了。
  • 或者是线程1接下来要做某些操作,这些操作需要线程2把另外一部分事情做完,然后通知一下线程1它做完了,然后线程1才能做这些操作。

2、生产者-消费者线程模型

如果直接使用queue,使用queue的push和pop操作时,会涉及线程安全问题。我们直接在queue的基础上,直接将其封装成线程安全的queue。 注意:线程函数代码和后面的main函数代码都是不会变的

//这里模拟生产者生产10个物品,消费者消费10个物品
void producer(Queue* que)//生产者线程
{
	for (int i = 1; i <= 10; ++i)
	{
		que->put(i);
		std::this_thread::sleep_for(std::chrono::milliseconds(100));//睡眠100毫秒 
	}
}
void consumer(Queue* que)//消费者线程
{
	for (int i = 1; i <= 10; ++i)
	{
		que->get();
		std::this_thread::sleep_for(std::chrono::milliseconds(100));//睡眠100毫秒 
	}
}
int main()
{
	Queue que;	//两个线程共享的队列 

	std::thread t1(producer, &que);//开启生产者线程 
	std::thread t2(consumer, &que);//开启消费者线程 

	//主线程等待两个子线程都执行完再结束。
	t1.join(); 
	t2.join();

	return 0;
}
  • 使用lock_guard,不用直接使用互斥锁的lock和unlock方法,通过栈上的对象构造和出作用域析构,来自动调用互斥锁的lock和unlock方法;
std::mutex mtx; //自定义互斥锁,做线程间的互斥操作

//生产者生产一个物品,通知消费者消费一个;消费完了,消费者再通知生产者继续生产物品
class Queue {
public:
    void put(int val) {	//生产物品
        lock_guard<std::mutex> guard(mtx);//相当于scoped_ptr
        que.push(val);
        cout << "生产者 生产:" << val << "号物品" << endl;
    }
    
    int get() {	//消费物品
        lock_guard<std::mutex> guard(mtx);	//相当于scoped_ptr
        int val = que.front();
        que.pop();
        cout << "消费者 消费" << val << "号物品" << endl;
        
        return val;
    }
    
private:
    queue<int> que;
};

封装的queue代码,为什么每次出错都是不一样的?

  • ;(需要有线程间的通信机制)

我们需要:生产者生产一个物品,通知消费者消费一个;消费完了,消费者再通知生产者继续生产物品

条件变量: 可以精确做线程间的同步通信;

信号量也可以做,但是做不到生产一个消费一个,这么精确。

注意: 因为lock_gard将拷贝构造和赋值重载都delete了实参到形参是一个的过程,传不进来

#include <iostream>
#include <thread>//多线程的头文件 
#include <mutex>//互斥锁的头文件 
#include <condition_variable>//条件变量的头文件 
#include <queue>//C++ STL所有的容器都不是线程安全
using namespace std;

std::mutex mtx;//定义互斥锁,做线程间的互斥操作
std::condition_variable cv;//定义条件变量,做线程间的同步通信操作

//生产者生产一个物品,通知消费者消费一个;消费完了,消费者再通知生产者继续生产物品
class Queue  
{
public:
	void put(int val)//生产物品
	{
		unique_lock<std::mutex> lck(mtx);//unique_ptr
		while (!que.empty())
		{
			//que不为空,生产者应该通知消费者去消费,消费者消费完了,生产者再继续生产
			//生产者线程进入#1等待状态,并且#2把mtx互斥锁释放掉
			cv.wait(lck);//传入一个互斥锁,当前线程挂起,处于等待状态,并且释放当前锁 lck.lock()  lck.unlock
		}
		que.push(val);
		/*
		notify_one:通知唤醒另外的一个线程的
		notify_all:通知唤醒其它所有线程的
		通知其它所有的线程,我生产了一个物品,你们赶紧消费吧
		其它线程得到该通知,就会从等待状态 =》 到阻塞状态 =》 但是要获取互斥锁才能继续向下执行
		*/
		cv.notify_all();
		cout << "生产者 生产:" << val << "号物品" << endl;
	}
	int get()//消费物品
	{
		//lock_guard<std::mutex> guard(mtx);//相当于scoped_ptr
		unique_lock<std::mutex> lck(mtx);//相当于unique_ptr 更安全 
		while (que.empty())
		{
			//消费者线程发现que是空的,通知生产者线程先生产物品
			//#1 挂起,进入等待状态 #2 把互斥锁mutex释放
			cv.wait(lck);
		}//如果其他线程执行notify了,当前线程就会从等待状态 =》到阻塞状态 =》但是要获取互斥锁才能继续向下执行 
		int val = que.front();
		que.pop();
		cv.notify_all();//通知其它线程我消费完了,赶紧生产吧
		cout << "消费者 消费:" << val << "号物品" << endl;
		return val;
	}
private:
	queue<int> que;
};

//这里模拟生产者生产10个物品,消费者消费10个物品
void producer(Queue* que)//生产者线程
{
	for (int i = 1; i <= 10; ++i)
	{
		que->put(i);
		std::this_thread::sleep_for(std::chrono::milliseconds(100));//睡眠100毫秒 
	}
}
void consumer(Queue* que)//消费者线程
{
	for (int i = 1; i <= 10; ++i)
	{
		que->get();
		std::this_thread::sleep_for(std::chrono::milliseconds(100));//睡眠100毫秒 
	}
}
int main()
{
	Queue que;	//两个线程共享的队列 

	std::thread t1(producer, &que);//开启生产者线程 
	std::thread t2(consumer, &que);//开启消费者线程 

	//主线程等待两个子线程都执行完再结束。
	t1.join(); 
	t2.join();

	return 0;
}

3、lock_gard和unique_lock

lock_gard和unique_lock可以看成unique_ptr和scope_ptr之间的关系,lock_gard和unique_lock做的事情是一样的,

lock_gard源码:

unique_lock源码:

image-20240427160328925
image-20240427160328925

4、流程分析

  • 首先消费者线程拿到互斥锁,生产者线程没有拿到互斥锁,生产者线程处于阻塞状态;

  • 消费者线程发现que队列是空的,进入while循环,进入的等待状态,将互斥锁释放(都是由条件变量控制的);

  • 此时生产者线程拿到互斥锁,不进入while循环,生产对象放入que队列中,notify_all通知其他线程,也就是消费者线程,由等待状态进入阻塞状态, 当生产者线程函数完之后,出了函数作用域,将mutex互斥锁释放了,消费者线程拿到锁了,阻塞状态变为运行状态,继续向下执行;

  • 消费者线程消费完之后,继续通知生产者,我消费完了。