Linux多线程(3)

线程安全的考虑关乎互斥锁和条件变量

消费者与生产者模型

当我们去超市买东西时,我们都认为我们是消费者,那些生产商品的都被认为是生产者。那么超市就被当作交易市场。

这对比的我们的软件开发过程中:代码的某个模块负责生产数据,但是生产出来的数据不得不交给另一模块来对其进行处理,在这之间我们必须要有一个类似上述超市的东西来存储数据,这就抽象成了生产者与消费者模型

其中,产生数据的模块,就形象地称为生产者处理数据的模块,就形象的称为消费者。生产者和消费者之间的中介就叫做缓冲区

消费者与生产者模型的理解

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中取,阻塞队列就相当于一个缓冲区,平衡了它们二者之间的处理能力。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

参考博客:https://blog.csdn.net/xiaochendefendoushi/article/details/81160260

模型实现

消费者与生产者的关系

生产者与生产者之间应该具有互斥关系

消费者与消费者之间应该具有互斥关系

生产者与消费者之间应该有同步+互斥的关系

可以总的来说:一个场所,两个角色,三种关系

消费者与生产者模型的优点

解耦和,支持忙闲不均,支持并发

消费者与生产者模型的代码Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
class BlockQueue
{
public:
BlockQueue(int cap = 10):_capacity(cap){
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond_productor, NULL);
pthread_cond_init(&_cond_consumer, NULL);
}
~BlockQueue(){
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_productor);
pthread_cond_destroy(&_cond_consumer);
}
bool QueuePush(int data) {
QueueLock(); //加锁
while (QueueIsFull()) { //队列满了
ProductorWait(); //生产者休眠
}
_queue.push(data);
ConsumerWakeUp(); //唤醒消费者
QueueUnLock(); //解锁
return true;
}
bool QueuePop(int *data) {
QueueLock(); //加锁
while (QueueIsEmpty()) { //队列空
ConsumerWait(); //消费者休眠
}
*data = _queue.front();
_queue.pop();
ProductorWakeUp(); //唤醒生产者
QueueUnLock(); //解锁
return true;
}
private:
void QueueLock() {
pthread_mutex_lock(&_mutex);
}
void QueueUnLock() {
pthread_mutex_unlock(&_mutex);
}
void ProductorWait(){
pthread_cond_wait(&_cond_productor, &_mutex);
}
void ProductorWakeUp() {
pthread_cond_signal(&_cond_productor);
}
void ConsumerWait(){
pthread_cond_wait(&_cond_consumer, &_mutex);
}
void ConsumerWakeUp(){
pthread_cond_signal(&_cond_consumer);
}
bool QueueIsFull(){
return (_queue.size() == _capacity);
}
bool QueueIsEmpty(){
return _queue.empty();
}
private:
std::queue<int> _queue;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _cond_productor;
pthread_cond_t _cond_consumer;
};

void* thr_consumer (void *arg)
{
BlockQueue *q = (BlockQueue*)arg;
while(1) {
int data;
sleep(1);
q->QueuePop(&data);
std::cout<<"consumer get data:"<<data<<std::endl;
}
return NULL;
}
void* thr_productor(void *arg)
{
int i = 0;
BlockQueue *q = (BlockQueue*)arg;
while(1) {
sleep(1);
std::cout<<"productor put data:"<<i<<std::endl;
q->QueuePush(i++);
}
return NULL;
}
int main (int argc, char *argv[])
{
pthread_t ctid[4], ptid[4];
BlockQueue q;
int ret;

for(int i = 0; i < 4; i++) {
ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&q);
if (ret != 0) {
std::cout<<"pthread create error\n";
}
}
for(int i = 0; i < 4; i++) {
ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&q);
if (ret != 0) {
std::cout<<"pthread create error\n";
}
}
for(int i = 0; i < 4; i++) {
pthread_join(ctid[i], NULL);
}
for(int i = 0; i < 4; i++) {
pthread_join(ptid[i], NULL);
}
return 0;
}

该代码实现了消费者与生产者之间互相处理数据的过程。

当我在线程函数中添加了sleep函数,这样可以看到,在线程中生产者往队列中生产了数据之后,消费者从队列中取出数据然后处理数据。在类中处理数据的过程用锁将其保护。互相不会干扰到

线程1

信号量

信号量的定义

百度百科定义:是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。

信号量的理解

信号量=计数器+等待队列+等待+唤醒

功能:实现线程/进程间的互斥与同步。

计数器就是判断的条件:当计数只用0/1的时候那么就可以实现互斥了

等待队列+等待+唤醒这是实现同步的基本功能

信号量的接口

信号量的原语可以理解为是PV操作。P:阻塞;V:唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int sem_init(sem_t *sem, int pshared, unsigned int value);
// sem: 信号量变量
// pshared:选项标志-决定信号量用于进程间还是线程间同步互斥
// 0 线程间
// !0 进程间
// value: 信号量初始计数
int sem_destroy(sem_t *sem);
// sem: 信号量变量
int sem_wait(sem_t *sem);
//通过计数判断是否有资源可操作 (计数-1 + 等待)
//int sem_wait(sem_t *sem);
// 若计数<=0;则阻塞
//int sem_trywait(sem_t *sem);
// 若计数<=0;则报错返回
//int sem_timedwait(sem_t*sem, struct timespec*abs_timeout);
// 若计数<=0;则限时阻塞,超时则报错返回

信号量实现线程安全的环形队列Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>

class RingQueue
{
public:
RingQueue(int cap = 10)
:_queue(10),
_capacity(cap),
_write_step(0),
_read_step(0){
sem_init(&_sem_data, 0, 0);
sem_init(&_sem_idle, 0, cap);
sem_init(&_sem_lock, 0, 1);
}
~RingQueue(){
sem_destroy(&_sem_data);
sem_destroy(&_sem_idle);
sem_destroy(&_sem_lock);
}
bool QueuePush(int data){
ProductorWait();
QueueLock();
_queue[_write_step] = data;
_write_step = (_write_step + 1) % _capacity;
QueueUnLock();
ConsumerWakeUp();
return true;
}
bool QueuePop(int *data){
ConsumerWait();
QueueLock();
*data = _queue[_read_step];
_read_step = (_read_step + 1) % _capacity;
QueueUnLock();
ProductorWakeUp();
return true;
}
private:
void QueueLock(){
sem_wait(&_sem_lock);
}
void QueueUnLock(){
sem_post(&_sem_lock);
}
void ProductorWait(){
sem_wait(&_sem_idle);
}
void ProductorWakeUp(){
sem_post(&_sem_idle);
}
void ConsumerWait(){
sem_wait(&_sem_data);
}
void ConsumerWakeUp(){
sem_post(&_sem_data);
}
private:
std::vector<int> _queue;
int _capacity;
int _write_step;
int _read_step;
sem_t _sem_data;//数据资源计数
sem_t _sem_idle;//空闲空间计数
sem_t _sem_lock;//实现互斥
};
void *thr_consumer(void *arg)
{
RingQueue *q = (RingQueue*)arg;
while(1) {
int data;
q->QueuePop(&data);
std::cout<<"consumer thread get data:"<<data<<std::endl;
}
return NULL;
}
void *thr_productor(void *arg)
{
RingQueue *q = (RingQueue*)arg;
int i = 0;
while(1) {
q->QueuePush(i);
std::cout<<"productor thread put data:"<<i<<std::endl;
i++;
}
return NULL;
}
int main (int argc, char *argv[])
{
pthread_t ctid[4], ptid[4];
int ret, i;
RingQueue q;

for (i = 0; i < 4; i++) {
ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&q);
if (ret != 0) {
std::cout<<"thread create error"<<endl;
return -1;
}
}
for (i = 0; i < 4; i++) {
ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&q);
if (ret != 0) {
std::cout<<"thread create error"<<endl;
return -1;
}
}
for (i = 0; i < 4; i++) {
pthread_join(ctid[i], NULL);
}
for (i = 0; i < 4; i++) {
pthread_join(ptid[i], NULL);
}
return 0;
}

线程2

在封装的加锁与解锁的操作中,有sem_wait()sem_post()操作,这其实就是计数的+1和-1的操作。当wait时,进行了阻塞,此时另一方可以进行操作。

线程3

因为信号量的初始计数为10,所以空闲空间计数最开始生产者生产了10个数据,当capacity满了之后,生产者等待,消费者唤醒。此时消费者开始读取数据,读取也是读取0个开始,到10个时将不再读取数据,此时消费者等待,生产者唤醒。

信号量与条件变量的区别

信号量拥有资源计数的功能,临界资源是否能够操作,通过自身计数判断。sem_post和sem_wait

条件变量是搭配互斥锁一起使用的

信号量还可以实现互斥,计数仅为0/1

线程池

线程池概念

一堆固定数量/有最大数量限制的线程+任务队列

可以用于并发处理任务请求

线程池特性

避免大量频繁的线程创建销毁的时间成本,当大量的创建线程时,主线程创建完成之后要等待子进程的退出,子进程退出之后,主进程才能退出。当有了线程池之后,线程池避免峰值压力带来瞬间大量线程被创建资源耗尽,程序崩溃的危险

线程池的实现

线程数量固定的线程池,需要定义最大线程和当前线程数量

自主实现线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#include <iostream>
#include <queue>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>

typedef bool (*task_callback)(int data);
bool deal_data(int data)
{
srand(time(NULL));
int n = rand()%5;
printf("thread:%p----deal data:%d---sleep %d sec\n", pthread_self(), data, n);
sleep(n);
return true;
}

class Task
{
public:
Task(){
}
~Task(){
}
public:
void SetTask(int data, task_callback handle){
_data = data;
_handle = handle;
}
bool Run(){
return _handle(_data);
}
private:
int _data;//任务处理要处理的数据
task_callback _handle;//任务的处理方法
};


#define MAX_THR 5
#define MAX_QUE 10
class ThreadPool
{
public:
ThreadPool(){
}
~ThreadPool(){
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_con);
pthread_cond_destroy(&_cond_pro);
}
bool ThreadInit(int max_thr = MAX_THR, int max_que = MAX_QUE) {
//实现变量初始化和线程创建
_quit_flag = false;
_max_thr = max_thr;
_cur_thr = max_thr;
_capacity = max_que;
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond_con, NULL);
pthread_cond_init(&_cond_pro, NULL);
int ret;
pthread_t tid;
for (int i = 0; i < _max_thr; i++) {
ret=pthread_create(&tid, NULL, thr_start, (void*)this);
if (ret != 0) {
printf("thread create error\n");
return false;
}
pthread_detach(tid);
}
return true;
}
bool PushTask(Task &task){
//向任务队列中添加任务
if (_quit_flag == true){
return false;
}
QueueLock();
while (QueueIsFull()){
ProWait();
}
_queue.push(task);
ConWakeUp();
QueueUnLock();
return true;
}
void ThreadQuit() {
//线程池退出
if (_quit_flag != true) {
_quit_flag = true;
}
while(_cur_thr > 0) {
ConWakeUpAll();
sleep(1);
}
return;
}
private:
void QueueLock(){
pthread_mutex_lock(&_mutex);
}
void QueueUnLock(){
pthread_mutex_unlock(&_mutex);
}
void ConWait(){
//进入ConWait表示消费者这时候没有数据待处理,则解锁退出
if (_quit_flag == true) {
pthread_mutex_unlock(&_mutex);
printf("thread:%p exit\n", pthread_self());
_cur_thr--;
pthread_exit(NULL);
}
pthread_cond_wait(&_cond_con, &_mutex);
}
void ConWakeUp(){
pthread_cond_signal(&_cond_con);
}
void ConWakeUpAll(){
printf("wake up all\n");
pthread_cond_broadcast(&_cond_con);
}
void ProWait(){
pthread_cond_wait(&_cond_pro, &_mutex);
}
void ProWakeUp(){
pthread_cond_signal(&_cond_pro);
}
bool QueueIsFull(){
return (_capacity == _queue.size());
}
bool QueueIsEmpty(){
return _queue.empty();
}
void PopTask(Task *task){
*task = _queue.front();
_queue.pop();
}
static void *thr_start(void *arg){
ThreadPool *pool = (ThreadPool*)arg;
while(1){
pool->QueueLock();
while(pool->QueueIsEmpty()){
pool->ConWait();
}
Task task;
pool->PopTask(&task);
pool->ProWakeUp();
pool->QueueUnLock();
//任务处理要放到解锁之外,否则任务处理时间过程导致其它线程阻塞
task.Run();
}
return NULL;
}
private:
int _max_thr;//线程池中最大线程数量
int _cur_thr;//线程池中当前线程数量
int _quit_flag;//线程池中线程退出标志
int _capacity;//队列的最大节点数目
std::queue<Task> _queue;//任务队列
pthread_mutex_t _mutex;//
pthread_cond_t _cond_pro;//生产者
pthread_cond_t _cond_con;//消费者
};

int main (int argc, char *argv[])
{
ThreadPool pool;

pool.ThreadInit();
Task task[10];
for (int i = 0; i < 10; i++) {
task[i].SetTask(i, deal_data);
pool.PushTask(task[i]);
}
pool.ThreadQuit();
return 0;
}

线程池的工作可以看成,安排多个线程进行工作,此时有一个任务队列,队列中的容量是有限的。

我们还是看成

1、生产者安排生产任务,加上锁,生产完后,消费者唤醒,此时解开锁。

2、消费者被唤醒之后处理数据,每处理一次数据后,任务队列数-1。

3、任务处理要放到解锁之外,否则任务处理时间过程导致其它线程阻塞

运行截图

线程4

单例模式

单例模式是设计模式中的一种:一个对象只能被实例化一次(资源只被加载一次)

单例模式的类型

饿汉模式

程序初始化时实例化完毕,优点是不需要再初始化,不需要加载资源,因此运行速度快,流畅

缺点是:加载时间耗时比较长

懒汉模式

程序资源使用的时候再进行加载,对象使用的时候再进行实例化,初始化加载速度快,但是运行流畅度不够

但是需要注意线程安全问题

C++的总结中,会再次提到这个问题。

多线程个人总结

关于多线程的部分总结终于结束了。跨度比较大,因为很多时候不太能理解是怎么运行的。

尤其是到了手撕模型的时候,最关键的点在于线程安全我们必须要考虑,因为线程如果再运行中丢失,那么数据有可能也会丢失或者泄漏,这带来的损失也是巨大的。

模型也多从消费者和生产者做起。其实各个特性都总结完之后,会发现,互斥锁+条件变量可以在多个地方用到,就因为要为了保护线程安全。所以这是个重点。线程池关键的在于,数据的处理过程。当数据处理时要与锁分开,否则数据处理时,如果被加锁了,那么其他进程此时在外等待,那么此时就造成了阻塞,时间上大大加长了。这就又没有开启多个线程的意义了。

queue类的使用也是一知半解,所以掌握起来比较不容易上手。C++有了一定的理解之后可以更好的去实现Demo

-------------The End-------------