20230610pthread

Jun 10, 2023

pthread

在C++开发中,原生的线程库主要有两个,一个是C++11提供的<thread.h>(Mac/Linux)类,另一个是Linux 下的<pthread.h>(Mac/Windows/Linux)。

std::thread 是C++11中的新特性,将多线程程序的编写提升到了语言层面,是的编写的多线程程序的可移植性大大提供。std::thread更加简便易用,而pthread功能更加强大。

std::thread 使用请参考这篇内容, 下面我们来介绍pthread 的使用。

本文所学来自于Ref

pthread 的创建和管理

int(*pthread_create)(pthread_t* thread, pthread_attr_t* attr, void*(*start_routine)(void*), void* arg)

若创建成功,返回0。

  • thread 是线程标识符
  • attr 指定线程的属性,可以用NULL表示默认属性
  • start_routine 指定线程开始运行的函数
  • arg 是start_routine 所需要的参数,是一个无类型指针

结束线程方法:

1
2
void pthread_exit(void* retval);
int pthread_cancel(pthread_t thread);

一个简单的多线程实现

多线程打印:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#define NUM_THREADS 5

struct Simple_s {
long i;
};
void* PrintHello(void* obj) {
usleep(30);
long tid;
tid = ((Simple_s*)obj)->i;
printf("Hello World! It's me, thread #%ld!\n", tid);
free(obj);
pthread_exit(NULL);
}
static void simple_test() {
pthread_t threads[NUM_THREADS];
int rc;
long t;
for (t = 0; t < NUM_THREADS; t++) {
Simple_s* s = (Simple_s*)malloc(sizeof(Simple_s));
s->i = t;
rc = pthread_create(&threads[t], NULL, PrintHello, s);
if (rc) {
printf("ERROR: return code from pthread_create() is %d\n", rc);
exit(-1);
}
printf("In main thread: created thread %ld\n", s->i);
}
puts("End created pthread\n");
pthread_exit(NULL);
}

输出的结果为:

1
2
3
4
5
6
7
8
9
10
11
12
In main thread: created thread 0
In main thread: created thread 1
In main thread: created thread 2
Hello World! It's me, thread #0!
In main thread: created thread 3
Hello World! It's me, thread #1!
Hello World! It's me, thread #3!
In main thread: created thread 4
Hello World! It's me, thread #2!
End created pthread

Hello World! It's me, thread #4!

当前线程调用pthread_exit()来等待其他线程的结束。(如果不适用这个函数的话,可能main()函数结束了也有线程没有执行完毕!)

对象的阻塞(Joining and Detaching Threads)

阻塞是线程之间同步的一种方法。

int(*pthread_join)(pthread_t threadid, void** value_ptr)

  • pthread_join 函数会让调用他的线程等待threadid 线程运行结束之后再运行。

  • value_ptr 存放了其他线程的返回值。

一个可以被join的线程,仅仅可以被别的一个线程join,如果同时有多个线程尝试join同一个线程时,最终结果是未知的。另外,线程不能join自己。

上面提到过,创建一个线程时,要赋予它一定的属性,这其中就包括joinable or detachable 的属性,只有被声明成joinable的线程,可以被其他线程join。

线程问题除了死锁问题之外,另一个是僵尸线程(zombie thread)。他是一种已经退出来的joinable 的线程,但是等待其他线程调用pthread_join 来join他,以收集他的退出信息(exit status)。如果没有其他线程调用pthread_join 来join他的话,他占用的一些系统资源不会被释放,比如堆栈。如果main()函数需要长时间运行,并且创建大量joinable 的线程,就有可能出现堆栈不足的error。

对于那些不需要join的线程,最好利用pthread_detach,这样他运行结束后,资源就会及时自动释放。

总而言之,创建的没一个线程都应该使用pthread_join 或者pthread_detach 其中一个,以防止僵尸线程的出现。

相关函数:

1
2
3
pthread_detach(threadid)
pthread_attr_setdetachstate(attr, detachstate)
pthread_attr_getdetachstate(attr, detachstate)

显式地设置一个线程为joinable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void *BusyWork(void* t) {
int i;
long tid;
double result = 0.0;
tid = *(long*)t;
printf("Thread %ld starting...\n", tid);
for (i = 0; i < 1000000; i++) {
result = result + sin(i) * tan(i);
}
printf("Tread %ld done. Result = %e\n", tid, result);
pthread_exit(t);
}
int main() {
pthread_t thread[NUM_THREADS];
pthread_attr_t attr;
int rc;
long t;
void* status;

pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (t = 0; t < NUM_THREADS; t++) {
printf("Main: creating thread %ld\n", t);
long* i = (long*)malloc(sizeof(long));
*i = t;
rc = pthread_create(&thread[t], &attr, BusyWork, i);
if (rc) {
printf("ERROR, return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
pthread_attr_destroy(&attr);

for (t = 0; t < NUM_THREADS; t++) {
rc = pthread_join(thread[t], &status);
if (rc) {
printf("ERROR; return code from pthread_join() is %d\n", rc);
exit(-1);
}
printf("Main: completed join with thread %ld having a status of %ld\n", t, *(long*)status);
free(status);
}
printf("Main: program completed. Exiting.\n");
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Main: creating thread 0
Main: creating thread 1
Thread 0 starting...
Main: creating thread 2
Thread 1 starting...
Main: creating thread 3
Thread 2 starting...
Thread 3 starting...
Main: creating thread 4
Thread 4 starting...
Tread 3 done. Result = -3.153838e+06
Tread 4 done. Result = -3.153838e+06
Tread 0 done. Result = -3.153838e+06
Main: completed join with thread 0 having a status of 0
Tread 1 done. Result = -3.153838e+06
Main: completed join with thread 1 having a status of 1
Tread 2 done. Result = -3.153838e+06
Main: completed join with thread 2 having a status of 2
Main: completed join with thread 3 having a status of 3
Main: completed join with thread 4 having a status of 4
Main: program completed. Exiting.

堆栈管理(Stack Management)

相关函数:

1
2
3
4
pthread_attr_getstacksize (attr, stacksize)
pthread_attr_setstacksize (attr, stacksize)
pthread_attr_getstackaddr (attr, stackaddr)
pthread_attr_setstackaddr (attr, stackaddr)

堆栈管理重要点在于一个进程内存在数量众多的线程,每个线程默认会占有一定的堆栈大小。这时要防止内存溢出问题,通过调整每个线程堆栈大小合理配置。

互斥锁 Mutex

Mutex 常常被用来保护那些可以被多个线程访问的共享资源,比如可以防止多个线程同时更新同一个数据时出现混乱。

相关函数:

1
2
3
4
5
6
7
pthread_mutex_init(mutex, attr)
pthread_mutex_destroy(pthread_mutex_t* mutex)
pthread_mutexattr_init(attr)
pthread_mutexattr_destroy(attr)
pthread_mutex_lock(pthread_mutex_t* mutex)
pthread_mutex_trylock(pthread_mutex_t* mutex)
pthread_mutex_unlock(pthread_mutex_t* mutex)

示例

下面是一个利用多线程进行向量点乘的程序,其中需要对dotstr.sum 这个共同读写的数据进行保护:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
typedef struct {
double* a;
double* b;
double sum;
int veclen;
} DOTDATA;
#define NUMTHRDS 4
#define VECLEN 100000
DOTDATA dotstr;
pthread_t callThd[NUMTHRDS];
pthread_mutex_t mutexsum;
void* dotprod(void* arg) {
int i, start, end, len;
long offset;
double mysum, *x, *y;
offset = (long)arg;

len = dotstr.veclen;
start = offset * len;
end = start + len;
x = dotstr.a;
y = dotstr.b;

mysum = 0;
for (i = start; i < end; i++) {
mysum += (x[i] * y[i]);
}
pthread_mutex_lock(&mutexsum);
dotstr.sum += mysum;
printf("Thread %ld did %d to %d: mysum=%f global sum=%f\n", offset, start, end, mysum, dotstr.sum);
pthread_mutex_unlock(&mutexsum);
pthread_exit((void*)0);
}
void mutex_test() {
long i;
double *a, *b;
void* status;
pthread_attr_t attr;

a = (double*)malloc(NUMTHRDS * VECLEN * sizeof(double));
b = (double*)malloc(NUMTHRDS * VECLEN * sizeof(double));

for (i = 0; i < VECLEN * NUMTHRDS; i++) {
a[i] = 1;
b[i] = a[i];
}
dotstr.veclen = VECLEN;
dotstr.a = a;
dotstr.b = b;
dotstr.sum = 0;

pthread_mutex_init(&mutexsum, NULL);

pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

for (i = 0; i < NUMTHRDS; i++) {
pthread_create(&callThd[i], &attr, dotprod, (void*)i);
}
pthread_attr_destroy(&attr);

for (i = 0; i < NUMTHRDS; i++) {
pthread_join(callThd[i], &status);
}
printf("Sum = %f \n", dotstr.sum);
free(a);
free(b);
pthread_mutex_destroy(&mutexsum);
}

输出结果:

1
2
3
4
5
Thread 3 did 300000 to 400000: mysum=100000.000000 global sum=100000.000000
Thread 1 did 100000 to 200000: mysum=100000.000000 global sum=200000.000000
Thread 0 did 0 to 100000: mysum=100000.000000 global sum=300000.000000
Thread 2 did 200000 to 300000: mysum=100000.000000 global sum=400000.000000
Sum = 400000.000000

条件变量Condition Variable

互斥锁只有两种状态,这限制了他的用途,条件变量允许线程在阻塞的时候等待另一个线程发送的信号,当收到信号,阻塞的线程就被唤醒并试图锁定预支相关的互斥锁,条件变量要和互斥锁结合使用。

相关函数:

1
2
3
pthread_cond_wait(condition, mutex)
pthread_cond_signal(condition)
pthread_cond_broadcast(condition)

pthread_cond_wait() 会阻塞调用他的线程,直到收到某一信号。这个函数需要再mutex已经被锁之后进行调用,并且当线程被阻塞时,会自动解锁mutex。信号收到后,线程被唤醒,这时mutex又会被这个线程锁定。

pthread_cond_signal() 函数结束时,必须解锁mutex,以供pthread_cond_wait() 锁定mutex。

示例

下面是一个例子,三个线程共同访问count 变量,thread2 和thread3 竞争地对其进行加1的操作,thread1等count 达到12的时候,一次性加125. 然后thread2 和thread3再去竞争count 的控制权,直到完成自己的对count加10次的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#define NUM_THREADS 3
#define TCOUNT 10
#define COUNT_LIMIT 12

int count = 0;
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;

void* inc_count(void* t) {
int i;
long my_id = (long)t;

for (i = 0; i < TCOUNT; i++) {
pthread_mutex_lock(&count_mutex);
count++;

if (count == COUNT_LIMIT) {
printf("inc_count(): thread %ld, count = %d Threshold reached.", my_id, count);
pthread_cond_signal(&count_threshold_cv);
printf("Just sent signal.\n");
}
printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id, count);
pthread_mutex_unlock(&count_mutex);

usleep(100);
}
pthread_exit(NULL);
}
void* watch_count(void* t) {
long my_id = (long)t;
printf("Starting watch_cont(): thread %ld\n", my_id);

pthread_mutex_lock(&count_mutex);
while (count < COUNT_LIMIT) {
printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id, count);
pthread_cond_wait(&count_threshold_cv, &count_mutex);
printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id,
count);
printf("watch_count(): thread %ld Updating the value of count...\n", my_id, count);
count += 125;
printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
}
printf("watch_count(): thread %ld Unlocking mutex.\n", my_id);
pthread_mutex_unlock(&count_mutex);
pthread_exit(NULL);
}
int main() {
int i;
long t1 = 1, t2 = 2, t3 = 3;
pthread_t threads[NUM_THREADS];
pthread_attr_t attr;

pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init(&count_threshold_cv, NULL);

pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threads[0], &attr, watch_count, (void*)t1);
pthread_create(&threads[1], &attr, inc_count, (void*)t2);
pthread_create(&threads[2], &attr, inc_count, (void*)t3);

for (i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
printf("Main(): Waited and joined with %d threads. Final value of count = %d. Done."
"n", NUM_THREADS, count);

pthread_attr_destroy(&attr);
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&count_threshold_cv);
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Starting watch_cont(): thread 1
inc_count(): thread 2, count = 1, unlocking mutex
inc_count(): thread 3, count = 2, unlocking mutex
watch_count(): thread 1 Count= 2. Going into wait...
inc_count(): thread 2, count = 3, unlocking mutex
inc_count(): thread 3, count = 4, unlocking mutex
inc_count(): thread 2, count = 5, unlocking mutex
inc_count(): thread 3, count = 6, unlocking mutex
inc_count(): thread 2, count = 7, unlocking mutex
inc_count(): thread 3, count = 8, unlocking mutex
inc_count(): thread 2, count = 9, unlocking mutex
inc_count(): thread 3, count = 10, unlocking mutex
inc_count(): thread 2, count = 11, unlocking mutex
inc_count(): thread 3, count = 12 Threshold reached.Just sent signal.
inc_count(): thread 3, count = 12, unlocking mutex
watch_count(): thread 1 Condition signal received. Count= 12
watch_count(): thread 1 Updating the value of count...
watch_count(): thread 1 count now = 137.
watch_count(): thread 1 Unlocking mutex.
inc_count(): thread 2, count = 138, unlocking mutex
inc_count(): thread 3, count = 139, unlocking mutex
inc_count(): thread 2, count = 140, unlocking mutex
inc_count(): thread 3, count = 141, unlocking mutex
inc_count(): thread 2, count = 142, unlocking mutex
inc_count(): thread 3, count = 143, unlocking mutex
inc_count(): thread 2, count = 144, unlocking mutex
inc_count(): thread 3, count = 145, unlocking mutex
Main(): Waited and joined with 3 threads. Final value of count = 145. Done.

Ref

pthread