20230610semaphore

Jun 10, 2023

Semaphore 信号量

作用是在多线程环境下使用的一种方案,用来保证两个或多个关键代码不被并发调用。比如在当前线程某个任务完成后,通知别的线程继续完成任务。

二值信号量: 信号值只有0和1,这和互斥量很类似,若信号量的值为0,则资源被锁住;若信号量的值为1,则资源可用。

计数信号量: 信号量的值在0到一个大于1的限制值之间,该计数表示可用的资源的个数。

信号量在创建时需要设置一个初始值,表示同时可以有几个任务可以访问该信号量保护的共享资源,初始值为1就变成互斥锁Mutex,即同时只能有一个任务可以访问信号量保护的共享资源。

本文所学来自于Ref

这里同时注意,Mac使用semaphore.h 时会报错: Function not implemented. Mac正确的做法是使用dispatch/dispatch.h

函数使用

1
2
3
4
5
6
7
8
// 初始化信号量,如果pshared=0,表示信号量是当前进程的局部信号量,否则信号量就可以在多个进程间共享
int sem_init(sem_t* sem, int pshared, unsigned int value);
// 信号量的值加1
int sem_post(sem_t* sem);
//信号量的值-1
int sem_wait(sem_t* sem);
// 销毁
int sem_destroy(sem_t* sem);

为了解决平台差异,新建my_semaphore 处理不同平台的处理方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#ifdef __APPLE__
#include <dispatch/dispatch.h>
#else
#include <semaphore.h>
#endif

struct ak_sema {
#ifdef __APPLE__
dispatch_semaphore_t sem;
#else
sem_t sem;
#endif
};
void ak_sema_init(struct ak_sema* s, uint32_t value) {
#ifdef __APPLE__
dispatch_semaphore_t* sem = &s->sem;
*sem = dispatch_semaphore_create(value);
#else
sem_init(&s->sem, 0, value);
#endif
}
void ak_sema_wait(struct ak_sema* s) {
#ifdef __APPLE__
dispatch_semaphore_wait(s->sem, DISPATCH_TIME_FOREVER);
#else
int r;
do {
r = sem_wait(&s->sem);
} while (r == -1 && errno == EINTR);
#endif
}
void ak_sema_post(struct ak_sema* s) {
#ifdef __APPLE__
dispatch_semaphore_signal(s->sem);
#else
sem_post(&s->sem);
#endif
}
void ak_sema_destroy(struct ak_sema* s) {
#ifdef __APPLE__
dispatch_release(s->sem);
#else
sem_destroy(s->sem);
#endif
}

示例1: 进行三个下载任务,但是最多选择同时执行两个(同时两个线程执行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <my_semaphore.h>
#include <pthread.h>

#define MAXNUM 2
ak_sema my_semaphore;
pthread_t a_thread, b_thread, c_thread;
int g_pthreadNum = 1;

void InputInfo(void)
{
printf("****************************************\n");
printf("*** which task you want to download? ***\n");
printf("*** you can enter [1-3],[0] is done ***\n");
printf("****************************************\n");
}
void* func1(void* arg) {
ak_sema_wait(&my_semaphore);
printf("============== Downloading Task 1 ============== \n");
sleep(5);
printf("============== Finished Task 1 ============== \n");
g_pthreadNum--;
}
void* func2(void* arg) {
ak_sema_wait(&my_semaphore);
printf("============== Downloading Task 2 ============== \n");
sleep(3);
printf("============== Finished Task 2 ============== \n");
g_pthreadNum--;
}
void* func3(void* arg) {
ak_sema_wait(&my_semaphore);
printf("============== Downloading Task 3 ============== \n");
sleep(1);
printf("============== Finished Task 3 ============== \n");
g_pthreadNum--;
}

void two_thread_concurrently_run_test() {
int ret;
int taskNum;
InputInfo();
ak_sema_init(&my_semaphore, 0);
if (ret != 0) {
printf("error, sem_init failed: %s!\n", strerror(errno));
}
while (scanf("%d", &taskNum) != EOF) {
if (taskNum == 0) {
if (g_pthreadNum <= 1) {
break;
} else {
printf("Can not quit, cause count of threads is [%d]\n", g_pthreadNum);
}
}
printf("your choose Downloading Task [%d]\n", taskNum);

if (g_pthreadNum > MAXNUM) {
printf("!!! You've reached a limit on the number of threads!!!\n");
continue;
}
switch (taskNum) {
case 1:
pthread_create(&a_thread, NULL, func1, NULL);
ak_sema_post(&my_semaphore);
g_pthreadNum++;
break;
case 2:
pthread_create(&b_thread, NULL, func2, NULL);
ak_sema_post(&my_semaphore);
g_pthreadNum++;
break;
case 3:
pthread_create(&c_thread, NULL, func3, NULL);
ak_sema_post(&my_semaphore);
g_pthreadNum++;
break;
default:
printf("!!! error task [%d] !!!\n", taskNum);
break;
}
}
ak_sema_destroy(&my_semaphore);
}

示例2: 单生产者单消费者模型

一个线程生产,另一个线程消费,线程A生产完后立马给线程B发信号,线程B收到后立即回应线程A,告诉它我收到了,发送100000次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#define log_func_one(fmt) printf("[%s]", __FUNCTION__); printf(fmt);
#define log_func(fmt, ...) printf("[%s]", __FUNCTION__); printf(fmt, __VA_ARGS__);
#define LTM (3000)

static ak_sem sem, sem_R;
static int count = 0;
static int count_R = 0;
static void* f_producer(void* arg) {
Sleep(1000);
int ret;
while (1) {
log_func_one("produce start\n");
log_func_one("produce finish, sem_post\n");

ak_sem_post(&sem);
count++;
putchar('\n');
log_func("count = %d\n", count);
if (count == LTM) {
log_func_one("break;\n");
break;
}
log_func_one("waiting for semR\n");
ak_sem_wait(&sem_R);
log_func_one("received sem_R\n");
}
}
static void* f_consumer(void* arg) {
Sleep(1000);
while (1) {
log_func_one("waiting for sem\n");
ak_sem_wait(&sem);
log_func_one("received sem\n");
count_R++;
putchar('\n');
log_func("count_R = %d\n", count_R);
if (count_R == LTM) {
log_func_one("break;\n");
break;
}
log_func_one("return start\n");
ak_sem_post(&sem_R);
}
}
static void semaphore_test() {
pthread_t consumer, producer;
int ret;

ak_sem_init(&sem, 0);
ak_sem_init(&sem_R, 0);

pthread_create(&producer, NULL, f_producer, NULL);
pthread_create(&consumer, NULL, f_consumer, NULL);

ret = pthread_join(producer, NULL);
if (ret != 0) {
log_func("error, producer join failed, ret = %d\n", ret);
}
ret = pthread_join(consumer, NULL);
if (ret != 0) {
log_func("error, consumer join failed, ret = %d\n", ret);
}
ak_sem_destroy(&sem);
ak_sem_destroy(&sem_R);
}

Ref

semaphore