1. pthread
  2. thread

C++ 11 加入了自带的Thread,它是对pthread进行了封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// TODO C++ 11 后出现的 自带 Thread

#include <iostream>
#include <thread> // C++ 11 后出现的 自带 Thread 不是我们的重头戏,只是过一下
#include <unistd.h>

using namespace std;

// 异步线程 子线程
void runAction(int number) { // 相当于 Java的 run函数一样
for (int i = 0; i < 10; ++i) {
cout << "runAction:" << number << endl;
sleep(1);
}
}

// main函数的线程
int main() {
// TODO 方式一 main只等3秒钟,各种玩各种的,老死不相往来
/*thread thread1(runAction, 100);

// sleep(3); // 我只等你三秒钟
cout << "main弹栈了" << endl;*/

// TODO 方式二 我等你执行完成后,我再执行
thread thread2(runAction, 100);
thread2.join(); // 我等runAction执行完成后,我再继续执行下面代码..
cout << "main弹栈了" << endl;
/*
runAction:100
runAction:100
runAction:100
runAction:100
runAction:100
runAction:100
runAction:100
runAction:100
runAction:100
runAction:100
main弹栈了
*/

return 0;
}

pthread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// TODO pthreads 我们必须掌握的内容 重头戏

// TODO pthreads 最简单的案例

#include <iostream>
#include <pthread.h> // Cygwin 有 pthreads支持
using namespace std;

// void *(*)(void *)
void * customPthreadTask(void * pVoid) { // 异步线程 相当于Java的Thread.run函数一样
// C++转换static_cast 转换指针操作的
int * number = static_cast<int *>(pVoid); // pVoid==number int的地址,所以我用int*接收,很合理
cout << "异步线程执行了:" << *number << endl;

return 0; // 坑 坑 坑,必须返回,否则有错误,不好查询
}

int main() {
int number = 9527;

/**
int pthread_create (pthread_t *, // 参数一:线程ID
const pthread_attr_t *, // 参数二:线程属性
void *(*)(void *), // 参数三:函数指针的规则
void *); // 参数四:给函数指针传递的内容,void * 可以传递任何内容
*/
pthread_t pthreadID; // 线程ID,每个线程都需要有的线程ID

pthread_create(&pthreadID, 0, customPthreadTask, &number);

return 0;
}

pthread的三种情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// TODO pthread 的 三种情况分析 的
// 第一种情况,main函数只要结束,不等异步线程,全部技术
// 第二种情况,我们开发者,千万不要让 main函数睡眠的方式,去等待异步线程
// 第三种情况,main函数一直等待 异步线程,只有异步线程执行完成后,我在执行 join后面的代码
#include <iostream>
#include <pthread.h> // Derry Cygwin 有 pthreads支持
#include <unistd.h>

using namespace std;

// void *(*)(void *)
void * runTask(void * pVoid) { // 异步线程 子线程
int number = *static_cast<int *>(pVoid);
cout << "异步线程执行了:" << number << endl;

for (int i = 0; i < 10; ++i) {
cout << "run:" << i << endl;
sleep(1);
}

return 0;
}

int main() {
int number = 999;

pthread_t pthreadID;
pthread_create(&pthreadID, 0, runTask, &number);

// 为了演示第二种情况
// sleep(3); // main函数只 异步线程三秒钟

// 异步线程在执行的过程中,我们的main函数 相当于 阻塞在这里不动了,只有异步线程执行完成后,我才开始执行join后面的代码
pthread_join(pthreadID, 0);

cout << "main函数即将弹栈..." << endl;
return 0;
}

分离线程和非分离线程

  1. 区别?
  2. 应用场景?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// TODO 分离线程  非分离线程  理论知识

// C++ 服务器开发,线程是非常要去高
// C++ 分离线程 和 非分离线程 区别,应用场景?
/**
* 答:分离线程: 各个线程都是自己运行自己的,老死不相往来,例如:main函数结束,全部结束,不会等待异步线程 【多线程情况下场景】
* 非分离线程: 线程有协作的能力,例如:main函数线程会等待 异步线程执行完成后,我再执行 后面main函数的代码【协作,顺序执行 场景】
*/

#include <iostream>
#include <pthread.h> // Derry Cygwin 有 pthreads支持
#include <unistd.h>

using namespace std;

// void *(*)(void *)
void * runTask(void * pVoid) { // 异步线程 子线程
int number = *static_cast<int *>(pVoid);
cout << "异步线程执行了:" << number << endl;

for (int i = 0; i < 10; ++i) {
cout << "run:" << i << endl;
sleep(1);
}
return 0;
}

int main() {
int number = 999;

pthread_t pthreadID; // Cygwin允许有野指针
pthread_create(&pthreadID, 0, runTask, &number);

pthread_join(pthreadID, 0);

cout << "main函数即将弹栈..." << endl;
return 0;
}

线程同步

安全锁、互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// TODO  C++ 互斥锁 == Java版本(synchronize) 多线程操作的安全  持有内置锁
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h> // sleep(秒)

using namespace std;

queue<int> queueData; // 定义一个全局的队列,用于 存储/获取

pthread_mutex_t mutex; // 定义一个互斥锁,注意:(Cygwin平台 此互斥锁,不能有野指针,坑)

// void *(*)(void *)
void * task(void * pVoid) {

/*synchronize(锁) {
// code
}*/

pthread_mutex_lock(&mutex); // 锁住

cout << "异步线程-当前线程的标记是:" << *static_cast<int *>(pVoid) << "异步线程" << endl;

if (!queueData.empty()) { // 有元素
printf("异步线程-获取队列的数据:%d\n", queueData.front());
queueData.pop(); // 把数据弹出去,删除的意思
} else { // 没有元素
printf("异步线程-队列中没有数据了\n");
}

// sleep(0.2);

pthread_mutex_unlock(&mutex); // 解锁

return 0;
}

int main()
{
// 初始化 互斥锁
pthread_mutex_init(&mutex, NULL);

// 给队列 初始化数据 手动增加数据进去
for (int i = 10001; i < 10011; ++i) {
queueData.push(i);
}

// 一次性定义10个线程
pthread_t pthreadIDArray[10];
for (int i = 0; i < 10; ++i) {
pthread_create(&pthreadIDArray[i], 0, task, &i);

// 不能使用 join,如果使用(就变成顺序的方式,就没有多线程的意义了,所以不能写join)
// pthread_join(pthreadIDArray[i], 0);
}

// main函数等 异步线程
sleep(12);

// 销毁 互斥锁
pthread_mutex_destroy(&mutex);
cout << "main函数即将弹栈..." << endl;

// 每次运行 效果都不同:1,8,9,10,3,2,5,8
// 每次运行 效果都是错乱

return 0;
}

互斥锁mutex的简单实现

mutex一般用于为一段代码加锁,以保证这段代码的原子性(atomic)操作,即:要么不执行这段代码,要么将这段代码全部执行完毕。

例如,最简单的并发冲突问题就是一个变量自增1:

1
balance = balance + 1;

表面看这是一条语句,可是在背后的汇编中我们可以看到,指令集操作过程中会引入中间变量来保存右边的值,进而这个操作至少会被扩充为:

1
2
int tmp = balance + 1;
balance = tmp;

这就需要一把互斥锁(mutual exclusive, mutex)将这段代码给锁住,使其达到任何一个线程“要么全部执行上述代码,要么不执行这段代码”的效果。这个用法可以表示为:

1
2
3
4
5
lock_t mutex;
...
lock(&mutex)
balance = balance + 1;
unlock(&mutex);

那么,一个自然的问题便是,我如何实现上面的这个lock()函数呢?

乍一看这个问题是非常复杂的,特别是考虑到它能够被适用于各种代码的各种情况。但经过各种简化,这个lock()实现,可以通过几个test和set的组合得以实现。

例如,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct __lock_t { int flag; } lock_t;

void init(lock_t *mutex) {
// 0: lock is available
// 1: lock is held
mutex->flag = 0;
}

void lock(lock_t *mutex) {
while (mutex->flag == 1) { // Test the flag.
; // Wait the lock
mutex->flag = 1; // Set the lock, i.e. start to hold lock
}

void unlock(lock_t *mutex) {
mutex->flag = 0;
}

我第一次看到这个算法的时候非常惊讶,一个本来极其复杂的问题就这么优雅地被解决了。它仅仅涉及到对条件的检验和变量的复制,然后整个问题就这么轻而易举地被攻破了。

当然,我并没能看到上述代码的“坑”,也即是必须依靠指令集级别的支持才能真正做到atomic。这同样说明了并发程序的困难,稍微不注意便会调入一个万劫不复的坑里,并且你还不知道哪里出错了。

上述极端优雅的代码,有一个隐藏的坑,那便是在lock()函数的实现里,while循环那一段其实是可以被乱入的。

假设thread A是第一个运行到此的线程,那么它得到的mutex->flag就肯定是0,于是它继续跳出循环往下运行,希望通过下面的mutex->flag = 1来持有锁,使得其它线程在检测while循环时为真,进而进入循环的等待状态。

可如果在A执行到这个赋值为1的语句之前,又有另外一个thread B运行到了这个while循环部分,由于mutex->flag还未被赋值为1,B同样可以跳出while,从而跟A一样拿到这把锁!这就出现了冲突。

那怎么办呢?仔细后可以发现,其实关键问题就在于:

  • 对mutex->flag的检测
  • 对mutex->flag的赋值

这两个操作必须是不被干扰的,也就是它必须是atomic的,要么这两段代码不被执行,要么这两段代码被不中断地完整执行。

这就需要借助CPU指令集的帮助,来保证上述两条语句的atomic操作,也即是著名的TestAndSet()操作。

1
2
3
4
5
int TestAndSet(int *ptr, int new) {
int old = *ptr;
*ptr = new;
return old;
}

这是一条TSL指令。TSL指令是一种需要硬件支持的方案。许多计算机,特别是那些为多处理机设计的计算机,都有一条指令叫做测试并上锁(TSL)。

test-and-set在计算机体系中是必须实现为一条原子性的机器指令,也就是说芯片设计人员设计这条硬件指令的时候必须得保证它是原子操作,否则test-and-set指令毫无价值。这一条机器指令实际上用一条指令干了三件事情,为了说清楚它干了啥才分解成类似C语言的语句进行解释,是伪代码。如果分析test-and-set干的三件事,会觉着毫无逻辑,把某个内存单元的值读出来,然后毫无来由的再将这个内存单元置为1,最后再返回原来内存单元的值,真的有点扯淡的逻辑,但是不要急,就是这么扯淡的逻辑可以很轻松解决共享资源的互斥访问。如果你明白了这是同一条原子操作的机器指令干的活,它是一口气干完的,任何中断都打不断,那么如何用这条指令实现互斥访问的就很容易理解了
作者:zyj21sap
链接:https://www.zhihu.com/question/24277938/answer/1791755655
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

CPU的指令集,并不需要支持繁复的各种atomic操作。仅仅支持上面这个函数,各种互斥加锁的情形,便都能够被涵盖。

此时,在回到我们最开始的那个优雅的lock()实现,就可以将其改造为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct __lock_t { int flag; } lock_t;

void init(lock_t *lock) {
// 0: lock is available
// 1: lock is held
mutex->flag = 0;
}

void lock(lock_t *mutex) {
while (TestAndSet(&lock_t->flag, 1) == 1) {
;
}

void unlock(lock_t *lock) {
lock->flag = 0;
}

上述代码极其精巧。乍一看在lock()实现里不是还缺少一行mutex->flag = 1;么?可其实呢,它已经被整合到了TestAndSet()函数中。

这样的支持TestAndSet()的实现,便是最简单的spin lock,自旋锁或弹簧锁。之所以叫弹簧锁,那是因为在各类锁当中,弹簧锁就是最初的被投入工业使用的最简单的实现技术。

生产者消费者问题

线程同步的队列

safe_queue_too.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 生产者 消费者 工具类

#ifndef CPPCLIONPROJECT_SAFE_QUEUE_TOO_H
#define CPPCLIONPROJECT_SAFE_QUEUE_TOO_H

#endif //CPPCLIONPROJECT_SAFE_QUEUE_TOO_H

#pragma once // 防止重复写 include 的控制

#include <iostream>
#include <string>
#include <pthread.h>
#include <string>
#include <queue>

using namespace std;

// 定义模版函数 int double float == Java的泛型
template<typename T>

class SafeQueueClass {
private:
queue<T> queue; // 定义队列
pthread_mutex_t mutex; // 定义互斥锁(不允许有野指针)
pthread_cond_t cond; // 条件变量,为了实现 等待 读取 等功能 (不允许有野指针)

public:
SafeQueueClass() {
// 初始化 互斥锁
pthread_mutex_init(&mutex, 0);

// 初始化 条件变量
pthread_cond_init(&cond, 0);
}
~SafeQueueClass() {
// 回收 互斥锁
pthread_mutex_destroy(&mutex);

// 回收 条件变量
pthread_cond_destroy(&cond);
}

// TODO 加入到队列中(进行生成)
void add(T t) {
// 为了安全 加锁
pthread_mutex_lock(&mutex);

queue.push(t); // 把数据加入到队列中

// 告诉消费者,我已经生产好了
// pthread_cond_signal(&cond) // Java notify 单个的
pthread_cond_broadcast(&cond); // Java notifyAll 所有的的

cout << "add queue.push 我已经notifyAll所有等待线程了" << endl;

// 解锁
pthread_mutex_unlock(&mutex);
}

// TODO 从队列中获取(进行消费) 外面的人消费 你可以直接返回,你也可以采用引用
void get(T & t) {
// 为了安全 加锁
pthread_mutex_lock(&mutex);

while (queue.empty()) {
cout << "get empty 我已经乖乖等待中.." << endl;
pthread_cond_wait(&cond, &mutex); // 相当于 Java的 wait 等待了[有可能被系统唤醒]
}

// 证明被唤醒了
t = queue.front(); // 得到 队列中的元素数据 仅此而已
queue.pop(); // 删除元素

// 解锁
pthread_mutex_unlock(&mutex);
}
};

主程序
main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// TODO C++ 条件变量+互斥锁 == Java版本的(notify 与 wait 操作)

#pragma once

#include <iostream>

#include "safe_queue_too.h"
using namespace std;
SafeQueueClass<int> sq;

// TODO 模拟演示 消费者
void * getMethod(void *) {
while (true) {
printf("getMethod\n");

int value;
sq.get(value);
printf("消费者get 得到的数据:%d\n", value);

// 你只要传入 -1 就结束当前循环
if (-1 == value) {
printf("消费者get 全部执行完毕\n");
break;
}
}
return 0;
}

// TODO 模拟演示 生产者
void * setMethod(void *) {
while (true) {
printf("setMethod\n");

int value;
printf("请输入你要生成的信息:\n");
cin >> value;

// 你只要传入 -1 就结束当前循环
if (-1 == value) {
sq.add(value); // 为了让消费者 可以结束循环
printf("消费者get 全部执行完毕\n");
break;
}

sq.add(value);
}
return 0;
}

int main() {
pthread_t pthreadGet;
pthread_create(&pthreadGet, 0, getMethod, 0);
// pthread_join(pthreadGet, 0); 不能这样写,否则,下面的代码,可能无法有机会执行

pthread_t pthreadSet;
pthread_create(&pthreadSet, 0, setMethod, 0);


pthread_join(pthreadGet, 0);

pthread_join(pthreadSet, 0);


return 0;
}

如何选择线程库

运行环境对库的支持

JDK JVM 线程 native C++ 用的是 pthreads
Linux 基本上 还是用 pthreads
AndroidNDK 还是用 pthreads
VS 没有 pthread
mingw 没有 pthread
Cygwin 默认有 pthreads
MacOS 默认有 pthreads

网友们的看法

pthread库的导入

上面我们知道,有些运行环境并没有支持pthread库,我们就不得不手动导入了。

暂略…

参考资料

  1. https://www.jianshu.com/p/a5c98230a93d