- pthread
- thread
C++ 11 加入了自带的Thread,它是对pthread进行了封装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
#include <iostream> #include <thread> #include <unistd.h>
using namespace std;
void runAction(int number) { for (int i = 0; i < 10; ++i) { cout << "runAction:" << number << endl; sleep(1); } }
int main() {
thread thread2(runAction, 100); thread2.join(); cout << "main弹栈了" << endl;
return 0; }
|
pthread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
#include <iostream> #include <pthread.h> using namespace std;
void * customPthreadTask(void * pVoid) { int * number = static_cast<int *>(pVoid); cout << "异步线程执行了:" << *number << endl;
return 0; }
int main() { int number = 9527;
pthread_t pthreadID;
pthread_create(&pthreadID, 0, customPthreadTask, &number);
return 0; }
|
pthread的三种情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
#include <iostream> #include <pthread.h> #include <unistd.h>
using namespace std;
void * runTask(void * pVoid) { int number = *static_cast<int *>(pVoid); cout << "异步线程执行了:" << number << endl;
for (int i = 0; i < 10; ++i) { cout << "run:" << i << endl; sleep(1); }
return 0; }
int main() { int number = 999;
pthread_t pthreadID; pthread_create(&pthreadID, 0, runTask, &number);
pthread_join(pthreadID, 0);
cout << "main函数即将弹栈..." << endl; return 0; }
|
分离线程和非分离线程
- 区别?
- 应用场景?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
#include <iostream> #include <pthread.h> #include <unistd.h>
using namespace std;
void * runTask(void * pVoid) { int number = *static_cast<int *>(pVoid); cout << "异步线程执行了:" << number << endl;
for (int i = 0; i < 10; ++i) { cout << "run:" << i << endl; sleep(1); } return 0; }
int main() { int number = 999;
pthread_t pthreadID; pthread_create(&pthreadID, 0, runTask, &number);
pthread_join(pthreadID, 0);
cout << "main函数即将弹栈..." << endl; return 0; }
|
线程同步
安全锁、互斥锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| #include <iostream> #include <pthread.h> #include <queue> #include <unistd.h>
using namespace std;
queue<int> queueData;
pthread_mutex_t mutex;
void * task(void * pVoid) {
pthread_mutex_lock(&mutex);
cout << "异步线程-当前线程的标记是:" << *static_cast<int *>(pVoid) << "异步线程" << endl;
if (!queueData.empty()) { printf("异步线程-获取队列的数据:%d\n", queueData.front()); queueData.pop(); } else { printf("异步线程-队列中没有数据了\n"); }
pthread_mutex_unlock(&mutex);
return 0; }
int main() { pthread_mutex_init(&mutex, NULL);
for (int i = 10001; i < 10011; ++i) { queueData.push(i); }
pthread_t pthreadIDArray[10]; for (int i = 0; i < 10; ++i) { pthread_create(&pthreadIDArray[i], 0, task, &i);
}
sleep(12);
pthread_mutex_destroy(&mutex); cout << "main函数即将弹栈..." << endl;
return 0; }
|
互斥锁mutex的简单实现
mutex一般用于为一段代码加锁,以保证这段代码的原子性(atomic)操作,即:要么不执行这段代码,要么将这段代码全部执行完毕。
例如,最简单的并发冲突问题就是一个变量自增1:
表面看这是一条语句,可是在背后的汇编中我们可以看到,指令集操作过程中会引入中间变量来保存右边的值,进而这个操作至少会被扩充为:
1 2
| int tmp = balance + 1; balance = tmp;
|
这就需要一把互斥锁(mutual exclusive, mutex)将这段代码给锁住,使其达到任何一个线程“要么全部执行上述代码,要么不执行这段代码”的效果。这个用法可以表示为:
1 2 3 4 5
| lock_t mutex; ... lock(&mutex) balance = balance + 1; unlock(&mutex);
|
那么,一个自然的问题便是,我如何实现上面的这个lock()
函数呢?
乍一看这个问题是非常复杂的,特别是考虑到它能够被适用于各种代码的各种情况。但经过各种简化,这个lock()
实现,可以通过几个test和set的组合得以实现。
例如,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| typedef struct __lock_t { int flag; } lock_t;
void init(lock_t *mutex) { mutex->flag = 0; }
void lock(lock_t *mutex) { while (mutex->flag == 1) { ; mutex->flag = 1; }
void unlock(lock_t *mutex) { mutex->flag = 0; }
|
我第一次看到这个算法的时候非常惊讶,一个本来极其复杂的问题就这么优雅地被解决了。它仅仅涉及到对条件的检验和变量的复制,然后整个问题就这么轻而易举地被攻破了。
当然,我并没能看到上述代码的“坑”,也即是必须依靠指令集级别的支持才能真正做到atomic。这同样说明了并发程序的困难,稍微不注意便会调入一个万劫不复的坑里,并且你还不知道哪里出错了。
上述极端优雅的代码,有一个隐藏的坑,那便是在lock()
函数的实现里,while循环那一段其实是可以被乱入的。
假设thread A
是第一个运行到此的线程,那么它得到的mutex->flag
就肯定是0,于是它继续跳出循环往下运行,希望通过下面的mutex->flag = 1
来持有锁,使得其它线程在检测while循环时为真,进而进入循环的等待状态。
可如果在A执行到这个赋值为1的语句之前,又有另外一个thread B运行到了这个while循环部分,由于mutex->flag
还未被赋值为1,B同样可以跳出while,从而跟A一样拿到这把锁!这就出现了冲突。
那怎么办呢?仔细后可以发现,其实关键问题就在于:
- 对mutex->flag的检测
- 对mutex->flag的赋值
这两个操作必须是不被干扰的,也就是它必须是atomic的,要么这两段代码不被执行,要么这两段代码被不中断地完整执行。
这就需要借助CPU指令集的帮助,来保证上述两条语句的atomic操作,也即是著名的TestAndSet()操作。
1 2 3 4 5
| int TestAndSet(int *ptr, int new) { int old = *ptr; *ptr = new; return old; }
|
这是一条TSL指令。TSL指令是一种需要硬件支持的方案。许多计算机,特别是那些为多处理机设计的计算机,都有一条指令叫做测试并上锁(TSL)。
test-and-set在计算机体系中是必须实现为一条原子性的机器指令,也就是说芯片设计人员设计这条硬件指令的时候必须得保证它是原子操作,否则test-and-set指令毫无价值。这一条机器指令实际上用一条指令干了三件事情,为了说清楚它干了啥才分解成类似C语言的语句进行解释,是伪代码。如果分析test-and-set干的三件事,会觉着毫无逻辑,把某个内存单元的值读出来,然后毫无来由的再将这个内存单元置为1,最后再返回原来内存单元的值,真的有点扯淡的逻辑,但是不要急,就是这么扯淡的逻辑可以很轻松解决共享资源的互斥访问。如果你明白了这是同一条原子操作的机器指令干的活,它是一口气干完的,任何中断都打不断,那么如何用这条指令实现互斥访问的就很容易理解了
作者:zyj21sap
链接:https://www.zhihu.com/question/24277938/answer/1791755655
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
CPU的指令集,并不需要支持繁复的各种atomic操作。仅仅支持上面这个函数,各种互斥加锁的情形,便都能够被涵盖。
此时,在回到我们最开始的那个优雅的lock()实现,就可以将其改造为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| typedef struct __lock_t { int flag; } lock_t;
void init(lock_t *lock) { mutex->flag = 0; }
void lock(lock_t *mutex) { while (TestAndSet(&lock_t->flag, 1) == 1) { ; }
void unlock(lock_t *lock) { lock->flag = 0; }
|
上述代码极其精巧。乍一看在lock()实现里不是还缺少一行mutex->flag = 1;么?可其实呢,它已经被整合到了TestAndSet()函数中。
这样的支持TestAndSet()的实现,便是最简单的spin lock,自旋锁或弹簧锁。之所以叫弹簧锁,那是因为在各类锁当中,弹簧锁就是最初的被投入工业使用的最简单的实现技术。
生产者消费者问题
线程同步的队列
safe_queue_too.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
#ifndef CPPCLIONPROJECT_SAFE_QUEUE_TOO_H #define CPPCLIONPROJECT_SAFE_QUEUE_TOO_H
#endif
#pragma once
#include <iostream> #include <string> #include <pthread.h> #include <string> #include <queue>
using namespace std;
template<typename T>
class SafeQueueClass { private: queue<T> queue; pthread_mutex_t mutex; pthread_cond_t cond;
public: SafeQueueClass() { pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0); } ~SafeQueueClass() { pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond); }
void add(T t) { pthread_mutex_lock(&mutex);
queue.push(t);
pthread_cond_broadcast(&cond);
cout << "add queue.push 我已经notifyAll所有等待线程了" << endl;
pthread_mutex_unlock(&mutex); }
void get(T & t) { pthread_mutex_lock(&mutex);
while (queue.empty()) { cout << "get empty 我已经乖乖等待中.." << endl; pthread_cond_wait(&cond, &mutex); }
t = queue.front(); queue.pop();
pthread_mutex_unlock(&mutex); } };
|
主程序
main.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
#pragma once
#include <iostream>
#include "safe_queue_too.h" using namespace std; SafeQueueClass<int> sq;
void * getMethod(void *) { while (true) { printf("getMethod\n");
int value; sq.get(value); printf("消费者get 得到的数据:%d\n", value);
if (-1 == value) { printf("消费者get 全部执行完毕\n"); break; } } return 0; }
void * setMethod(void *) { while (true) { printf("setMethod\n");
int value; printf("请输入你要生成的信息:\n"); cin >> value;
if (-1 == value) { sq.add(value); printf("消费者get 全部执行完毕\n"); break; }
sq.add(value); } return 0; }
int main() { pthread_t pthreadGet; pthread_create(&pthreadGet, 0, getMethod, 0);
pthread_t pthreadSet; pthread_create(&pthreadSet, 0, setMethod, 0);
pthread_join(pthreadGet, 0);
pthread_join(pthreadSet, 0);
return 0; }
|
如何选择线程库
运行环境对库的支持
JDK JVM 线程 native C++ 用的是 pthreads
Linux 基本上 还是用 pthreads
AndroidNDK 还是用 pthreads
VS 没有 pthread
mingw 没有 pthread
Cygwin 默认有 pthreads
MacOS 默认有 pthreads
网友们的看法
pthread库的导入
上面我们知道,有些运行环境并没有支持pthread库,我们就不得不手动导入了。
暂略…
参考资料
- https://www.jianshu.com/p/a5c98230a93d