Unix网络编程(19):信号量
信号量 ( $semaphore$ ) 是一种用于不同进程间或同一进程内的不同线程间同步的原语。POSIX 信号量不必在内核中维护,可能使用文件系统中的某个文件来标识。
1. 有名信号量操作
#include <semaphore.h>
// Returns a pointer to the semaphore on success, SEM_FAILED on error.
// The trailing mode and value arguments are supplied only when oflag
// includes O_CREAT.
sem_t *sem_open(const char *name, int oflag, ...
/* mode_t mode, unsigned int value */ );
$sem\_open$ 创建一个新的有名信号量,或者打开一个已存在的有名信号量。有名信号量既可以用于线程间同步,又可以用于进程间同步。$oflag$ 参数可以是 $0$ 、$O\_CREAT$ 或 $O\_CREAT$ | $O\_EXCL$ 。如果指定了 $O\_CREAT$ ,那么还要提供权限位 $mode$ 和信号量初始值 $value$ 。信号量初始值不能超过 $SEM\_VALUE\_MAX$ ( 这个常值至少为 $32767$ )。
#include <semaphore.h>
/* Both return 0 on success, -1 on error. */
int sem_close(sem_t *sem);
/* sem_unlink removes a named semaphore from the system, so it takes the
 * name that was passed to sem_open, not a sem_t pointer. */
int sem_unlink(const char *name);
当进程终止时,内核会对仍然打开着的有名信号量执行关闭操作。关闭一个信号量只是减少其引用计数,并不会将它从系统中删除。如果想要删除一个信号量,需要调用 $sem\_unlink$ 函数。类似于 $unlink$ ,如果在调用 $sem\_unlink$ 时该信号量的引用计数不为 $0$ ,它的名字依然会被删除,但是信号量本身的析构要等到引用计数变为 $0$ 时才会发生。
#include <semaphore.h>
/* All four return 0 on success, -1 on error. */
// Blocking decrement: waits until the value is greater than 0.
int sem_wait(sem_t *sem);
// Non-blocking variant of sem_wait.
int sem_trywait(sem_t *sem);
// Increments the semaphore value by 1.
int sem_post(sem_t *sem);
// Stores the current semaphore value in *valp.
int sem_getvalue(sem_t *sem, int *valp);
$sem\_wait$ 测试指定信号量的值,如果大于 $0$ ,就将其减 $1$ 并返回;如果不大于 $0$ ,就会阻塞直到该值大于 $0$ ,然后减 $1$ 并返回。如果在阻塞的过程中被信号中断,$sem\_wait$ 会提前返回并设置 $EINTR$ 错误。考虑到可能存在使用同一信号量的其他线程,“测试并减 $1$”这个过程是原子的。$sem\_trywait$ 是 $sem\_wait$ 的非阻塞版本,如果值不大于 $0$ ,它不会阻塞,而是立即返回并设置 $EAGAIN$ 错误。
当一个线程使用完信号量后,应该调用 $sem\_post$ 函数将信号量的值加 $1$ 。$sem\_getvalue$ 会在 $valp$ 中返回信号量的当前值。如果信号量的值不为正,那么在 $valp$ 中返回的值可能是 $0$ ,也可能是一个负数,其绝对值为阻塞在该信号量上的线程数。
2. 生产者-消费者问题
#include "unpipc.h"
/* Number of slots in the circular buffer. */
#define NBUFF 10

/* Names under which the three named semaphores are created. */
#define SEM_MUTEX "mutex"
#define SEM_NEMPTY "nempty"
#define SEM_NSTORED "nstored"

/* Item count; the producer and consumer only read it. */
int nitems;

/* State shared by the producer and the consumer. */
struct {
    int buff[NBUFF];
    sem_t *mutex;   /* passed to Sem_wait/Sem_post around buffer access */
    sem_t *nempty;  /* created with initial value NBUFF */
    sem_t *nstored; /* created with initial value 0 */
} shared;

/* Thread start routines. */
void *produce(void *arg);
void *consume(void *arg);
int main(int argc, char **argv) {
pthread_t tid_produce, tid_consume;
if (argc != 2)
err_quit("usage: prodcons1 <#items>");
nitems = atoi(argv[1]);
// create three semaphores
shared.mutex = Sem_open(SEM_MUTEX, O_CREAT | O_EXCL,
FILE_MODE, 1);
shared.nempty = Sem_open(SEM_NEMPTY, O_CREAT | O_EXCL,
FILE_MODE, NBUFF);
shared.nstored = Sem_open(SEM_NSTORED, O_CREAT | O_EXCL,
FILE_MODE, 0);
// create one producer thread and one consumer thread
Set_concurrency(2);
Pthread_create(&tid_produce, NULL, produce, NULL);
Pthread_create(&tid_consume, NULL, consume, NULL);
// wait for the two threads
Pthread_join(tid_produce, NULL);
Pthread_join(tid_consume, NULL);
// remove the semaphores
Sem_unlink(SEM_MUTEX);
Sem_unlink(SEM_NEMPTY);
Sem_unlink(SEM_NSTORED);
exit(0);
}
void *produce(void *arg) {
int i;
for (i = 0; i < nitems; i++) {
Sem_wait(shared.nempty); // wait for at least 1 empty slot
Sem_wait(shared.mutex);
shared.buff[i % NBUFF] = i; // store i into circular buffer
Sem_post(shared.mutex);
Sem_post(shared.nstored); // 1 more stored item
}
return NULL;
}
/*
 * Consumer thread: takes nitems entries out of the circular buffer and
 * prints a line for every slot whose content differs from the expected
 * value i. The argument is unused. Always returns NULL.
 */
void *consume(void *arg) {
    int i;

    for (i = 0; i < nitems; i++) {
        Sem_wait(shared.nstored); // wait for at least 1 stored item
        Sem_wait(shared.mutex);
        // The subscript is closed with ']' (the original had '[', a syntax error).
        if (shared.buff[i % NBUFF] != i)
            printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
        Sem_post(shared.mutex);
        Sem_post(shared.nempty); // 1 more empty slot
    }
    return NULL;
}
上述程序声明 $3$ 个信号量。$mutex$ 保护共享缓冲区;$nempty$ 统计共享缓冲区中的空槽数;$nstored$ 统计共享缓冲区中的元素数。
3. 匿名信号量操作
#include <semaphore.h>
/* Both return 0 on success, -1 on error. */
// shared == 0: the semaphore is shared between the threads of one process;
// otherwise it is shared between processes. value is the initial value.
int sem_init(sem_t *sem, int shared, unsigned int value);
int sem_destroy(sem_t *sem);
匿名信号量是存储在内存中的信号量。匿名信号量通过 $sem\_init$ 初始化,如果 $shared$ 为 $0$ ,那么待初始化的信号量是在同一进程的各个线程间共享的,否则该信号量是在进程间共享的。当 $shared$ 为非零值时,信号量必须存放在某种类型的共享内存区中,从而能让多个进程访问。与 $sem\_open$ 一样,$value$ 指定信号量初始值。
#include "unpipc.h"
#define NBUFF 10 // number of slots in the circular buffer

int nitems; // read-only by producer and consumer

// Data shared by the producer and the consumer.
struct {
    int buff[NBUFF];
    sem_t mutex, nempty, nstored; // semaphores, not pointers
} shared;

// Thread start routines. The producer's name is `produce`, the name the
// thread-creation call uses (the original prototype misspelled it `producer`).
void *produce(void *), *consume(void *);
/*
 * prodcons2 entry point.
 *
 * Takes the number of items as its single command-line argument,
 * initializes the three memory-based semaphores, runs one producer and one
 * consumer thread to completion, then destroys the semaphores.
 *
 * Fixes over the original listing: the second parameter is named argv
 * (it was a duplicate `argc`), and the threads are started with
 * Pthread_create (the original called Pthread_join with create's arguments).
 */
int main(int argc, char **argv) {
    pthread_t tid_produce, tid_consume;

    if (argc != 2)
        err_quit("usage: prodcons2 <#items>");
    nitems = atoi(argv[1]);

    // initialize three semaphores (second argument 0: shared between threads)
    Sem_init(&shared.mutex, 0, 1);
    Sem_init(&shared.nempty, 0, NBUFF);
    Sem_init(&shared.nstored, 0, 0);

    // create one producer thread and one consumer thread
    Set_concurrency(2);
    Pthread_create(&tid_produce, NULL, produce, NULL);
    Pthread_create(&tid_consume, NULL, consume, NULL);

    // wait for the two threads
    Pthread_join(tid_produce, NULL);
    Pthread_join(tid_consume, NULL);

    Sem_destroy(&shared.mutex);
    Sem_destroy(&shared.nempty);
    Sem_destroy(&shared.nstored);
    exit(0);
}
4. 生产者-消费者问题:多生产者和单个消费者
#include "unpipc.h"
/* Number of slots in the circular buffer. */
#define NBUFF 10
/* Upper bound on the number of producer threads. */
#define MAXNTHREADS 100

/* Item count and producer count; the threads only read them. */
int nitems;
int nproducers;

/* State shared by the producers and the consumer. */
struct {
    int buff[NBUFF];
    int nput;      /* incremented by a producer after each store */
    int nputval;   /* value written into buff[] by the next store */
    sem_t mutex;   /* memory-based semaphores, stored by value */
    sem_t nempty;
    sem_t nstored;
} shared;

/* Thread start routines. */
void *produce(void *arg);
void *consume(void *arg);
/*
 * prodcons3 entry point: several producers, one consumer.
 *
 * argv[1] is the number of items, argv[2] the number of producer threads
 * (capped at MAXNTHREADS). Each producer receives a pointer to its own
 * counter in count[]; the counters are printed after the producers are
 * joined.
 *
 * Fix over the original listing: the cap uses min() (it was misspelled
 * `mi`).
 */
int main(int argc, char **argv) {
    int i, count[MAXNTHREADS];
    pthread_t tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
        err_quit("usage: prodcons3 <#items> <#producers>");
    nitems = atoi(argv[1]);
    nproducers = min(atoi(argv[2]), MAXNTHREADS);

    // initialize three semaphores
    Sem_init(&shared.mutex, 0, 1);
    Sem_init(&shared.nempty, 0, NBUFF);
    Sem_init(&shared.nstored, 0, 0);

    // create all producers and one consumer
    Set_concurrency(nproducers + 1);
    for (i = 0; i < nproducers; i++) {
        count[i] = 0;
        Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
    Pthread_create(&tid_consume, NULL, consume, NULL);

    // wait for all producers and the consumer
    for (i = 0; i < nproducers; i++) {
        Pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    Pthread_join(tid_consume, NULL);

    Sem_destroy(&shared.mutex);
    Sem_destroy(&shared.nempty);
    Sem_destroy(&shared.nstored);
    exit(0);
}
/*
 * Producer thread (multiple-producer version).
 *
 * Repeatedly stores shared.nputval into the circular buffer until
 * shared.nput reaches nitems, then returns NULL. arg points to an int
 * that is incremented once per item this thread stored.
 *
 * Fix over the original listing: shared.nputval is now incremented after
 * each store. The original statement `shared.nputval;` had no effect, so
 * every slot received the same value and the consumer's check
 * (buff[i % NBUFF] != i) failed for every item after the first.
 */
void *produce(void *arg) {
    for (;;) {
        Sem_wait(&shared.nempty); // wait for at least 1 empty slot
        Sem_wait(&shared.mutex);
        if (shared.nput >= nitems) {
            // Nothing left to produce: give back the slot and the mutex
            // taken above before terminating.
            Sem_post(&shared.nempty);
            Sem_post(&shared.mutex);
            return NULL; // all done
        }
        shared.buff[shared.nput % NBUFF] = shared.nputval;
        shared.nput++;
        shared.nputval++;
        Sem_post(&shared.mutex);
        Sem_post(&shared.nstored); // 1 more stored item
        *((int *) arg) += 1;
    }
}
/*
 * Consumer thread (single consumer): takes nitems entries out of the
 * circular buffer and reports every slot whose content differs from the
 * expected value. The argument is unused. Always returns NULL.
 */
void *consume(void *arg) {
    int expected = 0;

    while (expected < nitems) {
        Sem_wait(&shared.nstored); /* block until something is stored */
        Sem_wait(&shared.mutex);
        if (shared.buff[expected % NBUFF] != expected) {
            printf("error: buff[%d] = %d\n", expected, shared.buff[expected % NBUFF]);
        }
        Sem_post(&shared.mutex);
        Sem_post(&shared.nempty);  /* hand the slot back */
        expected++;
    }
    return NULL;
}
相较于单生产者单消费者的情况,生产者的循环会在放置了 $nitems$ 个值后终止。
5. 生产者-消费者问题:多生产者和多消费者
#include "unpipc.h"
/* Number of slots in the circular buffer. */
#define NBUFF 10
/* Upper bound on the number of producer threads and of consumer threads. */
#define MAXNTHREADS 100

/* Item count and thread counts; the threads only read them. */
int nitems;
int nproducers;
int nconsumers;

/* State shared by the producers and the consumers. */
struct {
    int buff[NBUFF];
    int nput;      /* item number on the producer side */
    int nputval;   /* value to store in buff[] */
    int nget;      /* item number on the consumer side */
    int ngetval;   /* value fetched from buff[] */
    sem_t mutex;   /* memory-based semaphores, stored by value */
    sem_t nempty;
    sem_t nstored;
} shared;

/* Thread start routines. */
void *produce(void *arg);
void *consume(void *arg);
/*
 * prodcons4 entry point: several producers, several consumers.
 *
 * argv[1] is the number of items, argv[2] the number of producers and
 * argv[3] the number of consumers (both capped at MAXNTHREADS). Every
 * thread receives a pointer to its own counter, which is printed after
 * the thread is joined.
 *
 * Fix over the original listing: consumers are passed &conscount[i]
 * (it was misspelled `conscont`).
 */
int main(int argc, char **argv) {
    int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
    pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];

    if (argc != 4)
        err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
    nitems = atoi(argv[1]);
    nproducers = min(atoi(argv[2]), MAXNTHREADS);
    nconsumers = min(atoi(argv[3]), MAXNTHREADS);

    // initialize three semaphores
    Sem_init(&shared.mutex, 0, 1);
    Sem_init(&shared.nempty, 0, NBUFF);
    Sem_init(&shared.nstored, 0, 0);

    // create all producers and all consumers
    Set_concurrency(nproducers + nconsumers);
    for (i = 0; i < nproducers; i++) {
        prodcount[i] = 0;
        Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
    }
    for (i = 0; i < nconsumers; i++) {
        conscount[i] = 0;
        Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
    }

    // wait for all producers and all consumers
    for (i = 0; i < nproducers; i++) {
        Pthread_join(tid_produce[i], NULL);
        printf("producer count[%d] = %d\n", i, prodcount[i]);
    }
    for (i = 0; i < nconsumers; i++) {
        Pthread_join(tid_consume[i], NULL);
        printf("consumer count[%d] = %d\n", i, conscount[i]);
    }

    Sem_destroy(&shared.mutex);
    Sem_destroy(&shared.nempty);
    Sem_destroy(&shared.nstored);
    exit(0);
}
/*
 * Producer thread (multiple producers, multiple consumers).
 *
 * Repeatedly stores shared.nputval into the circular buffer until
 * shared.nput reaches nitems, then returns NULL. arg points to an int
 * that is incremented once per item this thread stored.
 *
 * Fix over the original listing: the parameter is now named `arg`; it was
 * unnamed although the body dereferences arg.
 */
void *produce(void *arg) {
    for (;;) {
        Sem_wait(&shared.nempty); // wait for at least 1 empty slot
        Sem_wait(&shared.mutex);
        if (shared.nput >= nitems) {
            Sem_post(&shared.nstored); // let consumers terminate
            Sem_post(&shared.nempty);
            Sem_post(&shared.mutex);
            return NULL; // all done
        }
        shared.buff[shared.nput % NBUFF] = shared.nputval;
        shared.nput++;
        shared.nputval++;
        Sem_post(&shared.mutex);
        Sem_post(&shared.nstored); // 1 more stored item
        *((int *) arg) += 1;
    }
}
/*
 * Consumer thread (multiple producers, multiple consumers).
 *
 * Repeatedly takes the next entry out of the circular buffer until
 * shared.nget reaches nitems, then returns NULL. Prints a line for every
 * slot whose content differs from shared.ngetval. arg points to an int
 * that is incremented once per item this thread consumed.
 *
 * Fixes over the original listing: the parameter is now named `arg` (it
 * was unnamed although the body dereferences arg), and the mutex is
 * referenced as shared.mutex (it was misspelled `shard.mutex`).
 */
void *consume(void *arg) {
    int i;

    for (;;) {
        Sem_wait(&shared.nstored); // wait for at least 1 stored item
        Sem_wait(&shared.mutex);
        if (shared.nget >= nitems) {
            // Everything has been consumed: post nstored again before
            // returning, so this wake-up is passed on rather than used up.
            Sem_post(&shared.nstored);
            Sem_post(&shared.mutex);
            return NULL; // all done
        }
        i = shared.nget % NBUFF;
        if (shared.buff[i] != shared.ngetval)
            printf("error: buff[%d] = %d\n", i, shared.buff[i]);
        shared.nget++;
        shared.ngetval++;
        Sem_post(&shared.mutex);
        Sem_post(&shared.nempty); // 1 more empty slot
        *((int *) arg) += 1;
    }
}
这里的生产者函数增添了一行,在检测到已经生产了 $nitems$ 个数据后,为了唤醒阻塞在 $nstored$ 上的线程,会在线程终止前递增 $nstored$ 信号值。同时我们修改消费者函数,每次消费前比较 $nget$ 和 $nitems$ ,确定是否完成所有消费。
6. 多缓冲区
在一些处理数据的程序中,我们可能会用到以下形式的循环:
// Copy loop: each pass reads up to BUFFSIZE bytes from fdin into buff and
// writes the n bytes that were read to fdout; it stops when read returns
// 0 or a negative value.
while ((n = read(fdin, buff, BUFFSIZE)) > 0) {
/* process the data */
write(fdout, buff, n);
}
以上循环每次从输入读入一块数据 ( 至多 $BUFFSIZE$ 字节 ),对其处理,然后将结果写到输出。我们可以考虑将这个操作分为两个线程,一个线程负责读,另一个负责写。表面上看这可以改善程序的运行,其实不然,因为我们使用的还是同一个缓冲区,写线程还是要等待读线程完成。为了解决这个问题,我们可以使用两个 ( 或更多 ) 缓冲区。同时,要注意大多数 Unix 内核检测出对一个文件的顺序读之后,不仅会读出我们需要的磁盘块的内容,还会顺便读取下一个磁盘块的内容,从而可以改善下次读取的操作时间。
#include "unpipc.h"
/* Number of buffers in the ring. */
#define NBUFF 8

/* State shared by the producer (reader) and the consumer (writer). */
struct {
    /* NBUFF buffers, each paired with the count of bytes it holds. */
    struct {
        char data[BUFFSIZE];
        ssize_t n;
    } buff[NBUFF];
    sem_t mutex;   /* memory-based semaphores, stored by value */
    sem_t nempty;
    sem_t nstored;
} shared;

/* Descriptor of the input file that is copied to standard output. */
int fd;

/* Thread start routines. */
void *produce(void *arg);
void *consume(void *arg);
/*
 * mycat2 entry point.
 *
 * Opens the file named by the single command-line argument for reading,
 * initializes the three semaphores, and runs one producer (reader) thread
 * and one consumer (writer) thread to completion.
 *
 * Fix over the original listing: the second Pthread_join call separates
 * its arguments with a comma (it had a period, `tid_consume. NULL`).
 */
int main(int argc, char **argv) {
    pthread_t tid_produce, tid_consume;

    if (argc != 2)
        err_quit("usage: mycat2 <pathname>");
    fd = Open(argv[1], O_RDONLY);

    // initialize three semaphores
    Sem_init(&shared.mutex, 0, 1);
    Sem_init(&shared.nempty, 0, NBUFF);
    Sem_init(&shared.nstored, 0, 0);

    // one producer thread, one consumer thread
    Set_concurrency(2);
    Pthread_create(&tid_produce, NULL, produce, NULL); // reader thread
    Pthread_create(&tid_consume, NULL, consume, NULL); // writer thread

    Pthread_join(tid_produce, NULL);
    Pthread_join(tid_consume, NULL);

    Sem_destroy(&shared.mutex);
    Sem_destroy(&shared.nempty);
    Sem_destroy(&shared.nstored);
    exit(0);
}
/*
 * Reader thread: fills the buffers in ring order from fd, one Read per
 * iteration, and returns NULL after a Read that yields 0 bytes. That
 * zero-length buffer is still announced through nstored before returning.
 * The argument is unused.
 */
void *produce(void *arg) {
    int slot = 0;

    for (;;) {
        Sem_wait(&shared.nempty); /* block until a buffer is free */

        Sem_wait(&shared.mutex);
        /* critical region (empty in this example) */
        Sem_post(&shared.mutex);

        shared.buff[slot].n = Read(fd, shared.buff[slot].data, BUFFSIZE);
        if (shared.buff[slot].n == 0) {
            Sem_post(&shared.nstored); /* announce the zero-length buffer */
            return NULL;
        }

        slot = (slot + 1) % NBUFF;     /* advance around the ring */
        Sem_post(&shared.nstored);     /* one more filled buffer */
    }
}
/*
 * Writer thread: writes the filled buffers to standard output in ring
 * order and returns NULL when it meets a buffer whose byte count is 0.
 * The argument is unused.
 */
void *consume(void *arg) {
    int slot = 0;

    for (;;) {
        Sem_wait(&shared.nstored); /* block until a buffer is filled */

        Sem_wait(&shared.mutex);
        /* critical region (empty in this example) */
        Sem_post(&shared.mutex);

        if (shared.buff[slot].n == 0) {
            return NULL;
        }
        Write(STDOUT_FILENO, shared.buff[slot].data, shared.buff[slot].n);

        slot = (slot + 1) % NBUFF; /* advance around the ring */
        Sem_post(&shared.nempty);  /* one more free buffer */
    }
}
本例子中由 $mutex$ 锁住的临界区是空的,如果我们的数据以链表形式而不是数组形式存储,那么这块区域可用于操作链表。
7. 信号量限制
POSIX 定义了两个信号量限制:
- $SEM\_NSEMS\_MAX$ :一个进程可同时打开着的最大信号量数 ( POSIX 要求至少为 $256$ );
- $SEM\_VALUE\_MAX$ :一个信号量的最大值 ( POSIX 要求至少为 $32767$ )。
这两个常值通常定义在 `<unistd.h>` 头文件中,也可以在运行时通过 $sysconf$ 函数获取。
#include "unpipc.h"
/*
 * Prints the values that Sysconf reports for _SC_SEM_NSEMS_MAX and
 * _SC_SEM_VALUE_MAX, then exits with status 0. The command-line
 * arguments are not used.
 */
int main(int argc, char **argv) {
    /* NOTE(review): Sysconf is assumed to return long, matching the %ld
     * conversions below — confirm against unpipc.h. */
    long nsems_max = Sysconf(_SC_SEM_NSEMS_MAX);
    long value_max = Sysconf(_SC_SEM_VALUE_MAX);

    printf("SEM_NSEMS_MAX = %ld, SEM_VALUE_MAX = %ld\n", nsems_max, value_max);
    exit(0);
}