Unix网络编程(19):信号量

        信号量 ( $semaphore$ ) 是一种用于不同进程间或一个进程内的不同线程间同步的原语。POSIX信号量不必在内核中维护,可能使用文件系统中的某个文件来标识。

1. 有名信号量操作

#include <semaphore.h>

// 成功返回信号量指针,出错返回SEM_FAILED
sem_t *sem_open(const char *name, int oflag, ...
  /* mode_t mode, unsigned int value */ );

        $sem\_open$ 创建一个新的有名信号量或者打开一个已存在的有名信号量。有名信号量既可以用于线程间同步,又可以用于进程间同步。$oflag$ 参数可以是 $0$ 、$O\_CREAT$ 或 $O\_CREAT$ | $O\_EXCL$ 。如果指定了 $O\_CREAT$ ,那么也要指定权限位 $mode$ 和信号量初始值 $value$ 。信号量初始值不能超过 $SEM\_VALUE\_MAX$ ( 这个常值至少为 $32767$ )。

#include <semaphore.h>

/* 成功返回0,出错返回-1 */

int sem_close(sem_t *sem);

/* sem_unlink removes a named semaphore by its NAME (the same string that was
 * passed to sem_open), not by its sem_t pointer. */
int sem_unlink(const char *name);

        当进程终止时,内核会对仍然打开着的有名信号量执行关闭操作。关闭一个信号量只是减少其引用计数,并不会将其从系统中删除。如果想要删除一个信号量,需要调用 $sem\_unlink$ 函数。类似于 $unlink$ ,如果在调用 $sem\_unlink$ 时该信号量的引用计数不为 $0$ ,其名字依然会被立即删除,但是信号量本身的析构会等到引用计数为 $0$ 时才会发生。

#include <semaphore.h>

/* 成功返回0,出错返回-1 */

int sem_wait(sem_t *sem);

int sem_trywait(sem_t *sem);

int sem_post(sem_t *sem);

int sem_getvalue(sem_t *sem, int *valp);

        $sem\_wait$ 测试指定信号量的值,如果大于 $0$ ,就将其减 $1$ 并返回;如果不大于 $0$ ,就会阻塞直到该值大于 $0$ ,然后减 $1$ 并返回。如果在阻塞的过程中被信号中断,$sem\_wait$ 会返回 $-1$ 并设置 $EINTR$ 错误。考虑到可能存在使用同一信号量的其他线程,测试并减 $1$ 这个过程是原子的。$sem\_trywait$ 是 $sem\_wait$ 的非阻塞版本,如果值不大于 $0$ ,它会立即返回 $-1$ 并设置 $EAGAIN$ 错误。
        当一个线程使用完信号量后,应该调用 $sem\_post$ 函数将信号量的值加 $1$ 。$sem\_getvalue$ 会在 $valp$ 中返回信号量的当前值。如果信号量的值不是正数,那么 $valp$ 中返回的值可能是 $0$ ,也可能是一个负数,其绝对值为阻塞在该信号量上的线程数。

2. 生产者-消费者问题

#include "unpipc.h"

#define NBUFF 10
// Names of the three POSIX named semaphores. They start with '/' because
// POSIX leaves the effect of sem_open() implementation-defined for names
// that do not begin with a slash.
#define SEM_MUTEX "/mutex"
#define SEM_NEMPTY "/nempty"
#define SEM_NSTORED "/nstored"

// Number of items to pass through the buffer. Written once by main() from
// argv[1] before the threads start; the threads only read it.
int nitems;  // read-only by producer and consumer
struct {  // data shared by producer and consumer
  int buff[NBUFF];  // circular buffer, indexed with (item number % NBUFF)
  // Handles returned by Sem_open() in main():
  //   mutex   - created with value 1, taken around every buffer access
  //   nempty  - created with value NBUFF, counts free slots
  //   nstored - created with value 0, counts filled slots
  sem_t *mutex, *nempty, *nstored;
} shared;

void *produce(void *);
void *consume(void *);

/*
 * prodcons1 <#items>
 *
 * Runs one producer thread and one consumer thread that pass <#items>
 * integers through shared.buff, coordinated by three named semaphores.
 * The capitalised calls are presumably the error-checking wrappers from
 * unpipc.h (not visible here).
 */
int main(int argc, char **argv) {
  pthread_t tid_consume, tid_produce;

  if (argc != 2) {
    err_quit("usage: prodcons1 <#items>");
  }
  nitems = atoi(argv[1]);

  // Named semaphores: mutex starts at 1, nempty at NBUFF (every slot is
  // free), nstored at 0 (nothing stored yet). O_EXCL asks for creation to
  // be refused when the name already exists.
  shared.mutex = Sem_open(SEM_MUTEX, O_CREAT | O_EXCL, FILE_MODE, 1);
  shared.nempty = Sem_open(SEM_NEMPTY, O_CREAT | O_EXCL, FILE_MODE, NBUFF);
  shared.nstored = Sem_open(SEM_NSTORED, O_CREAT | O_EXCL, FILE_MODE, 0);

  // Start both worker threads.
  Set_concurrency(2);
  Pthread_create(&tid_produce, NULL, produce, NULL);
  Pthread_create(&tid_consume, NULL, consume, NULL);

  // Block until both workers have finished.
  Pthread_join(tid_produce, NULL);
  Pthread_join(tid_consume, NULL);

  // Remove the semaphore names from the system.
  Sem_unlink(SEM_MUTEX);
  Sem_unlink(SEM_NEMPTY);
  Sem_unlink(SEM_NSTORED);
  exit(0);
}

void *produce(void *arg) {
  int i;

  for (i = 0; i < nitems; i++) {
    Sem_wait(shared.nempty);  // wait for at least 1 empty slot
    Sem_wait(shared.mutex);
    shared.buff[i % NBUFF] = i;  // store i into circular buffer
    Sem_post(shared.mutex);
    Sem_post(shared.nstored);  // 1 more stored item
  }
  return NULL;
}

/*
 * Consumer thread body: fetches nitems entries from the circular buffer and
 * checks that entry i holds the value i, printing a line for every mismatch.
 * The thread argument is unused; always returns NULL.
 */
void *consume(void *arg) {
  int i;

  for (i = 0; i < nitems; i++) {
    Sem_wait(shared.nstored);  // wait for at least 1 stored item
    Sem_wait(shared.mutex);
    // Fixed: the subscript was closed with '[' instead of ']'.
    if (shared.buff[i % NBUFF] != i)
      printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
    Sem_post(shared.mutex);
    Sem_post(shared.nempty);  // 1 more empty slot
  }
  return NULL;
}

        上述程序声明 $3$ 个信号量。$mutex$ 保护共享缓冲区;$nempty$ 统计共享缓冲区中的空槽数;$nstored$ 统计共享缓冲区中的元素数。

3. 匿名信号量操作

#include <semaphore.h>

/* 成功返回0,出错返回-1 */

int sem_init(sem_t *sem, int shared, unsigned int value);

int sem_destroy(sem_t *sem);

        匿名信号量 ( 也称基于内存的信号量 ) 是存储在内存中的信号量。匿名信号量通过 $sem\_init$ 初始化,如果 $shared$ 为 $0$ ,那么待初始化的信号量是在同一进程的各个线程间共享的,否则该信号量是在进程间共享的。当 $shared$ 为非零值时,信号量必须存放在某种类型的共享内存区中,从而能让多个进程访问。与 $sem\_open$ 一样,$value$ 指定信号量初始值。

#include "unpipc.h"

#define NBUFF 10

// Number of items to pass through the buffer. Written once by main() from
// argv[1]; the threads only read it.
int nitems;  // read-only by producer and consumer
struct {
  int buff[NBUFF];  // buffer of NBUFF int slots
  // The semaphore objects themselves live in this struct and are set up
  // with Sem_init() in main(): mutex = 1, nempty = NBUFF, nstored = 0.
  sem_t mutex, nempty, nstored;  // semaphores, not pointers
} shared;

// Thread entry points (the prototype previously misspelled produce as
// "producer", which did not match the name used in main).
void *produce(void *), *consume(void *);

/*
 * prodcons2 <#items>
 *
 * Same producer/consumer program, but using memory-based semaphores that
 * are set up with Sem_init() and torn down with Sem_destroy().
 */
int main(int argc, char **argv) {  // fixed: second parameter was also named argc
  pthread_t tid_produce, tid_consume;

  if (argc != 2)
    err_quit("usage: prodcons2 <#items>");
  nitems = atoi(argv[1]);

  // initialize three semaphores (second argument 0: shared between the
  // threads of this process only)
  Sem_init(&shared.mutex, 0, 1);
  Sem_init(&shared.nempty, 0, NBUFF);
  Sem_init(&shared.nstored, 0, 0);

  // create one producer thread and one consumer thread
  // (fixed: these two calls used Pthread_join with Pthread_create's
  // arguments, so no thread was ever started)
  Set_concurrency(2);
  Pthread_create(&tid_produce, NULL, produce, NULL);
  Pthread_create(&tid_consume, NULL, consume, NULL);

  // wait for the two threads
  Pthread_join(tid_produce, NULL);
  Pthread_join(tid_consume, NULL);

  Sem_destroy(&shared.mutex);
  Sem_destroy(&shared.nempty);
  Sem_destroy(&shared.nstored);
  exit(0);
}

4. 生产者-消费者问题:多生产者和单个消费者

#include "unpipc.h"

#define NBUFF 10
#define MAXNTHREADS 100

// nitems: total number of items to produce; nproducers: number of producer
// threads. Both are set once in main() from the command line.
int nitems, nproducers;  // read-only by producer and consumer

struct {  // data shared by producers and consumer
  int buff[NBUFF];  // circular buffer, indexed with (nput % NBUFF)
  int nput;  // number of items stored so far, i.e. the next item number
  int nputval;  // next value to store into buff[]
  sem_t mutex, nempty, nstored;  // semaphores, not pointers
} shared;

void *produce(void *), *consume(void *);

/*
 * prodcons3 <#items> <#producers>
 *
 * Starts up to MAXNTHREADS producer threads and a single consumer thread.
 * Each producer gets a pointer to its own counter in count[], which is
 * printed after that producer has been joined.
 */
int main(int argc, char **argv) {
  int i, count[MAXNTHREADS];
  pthread_t tid_produce[MAXNTHREADS], tid_consume;

  if (argc != 3)
    err_quit("usage: prodcons3 <#items> <#producers>");
  nitems = atoi(argv[1]);
  // Clamp to the size of count[] / tid_produce[] (fixed: "mi" -> "min").
  nproducers = min(atoi(argv[2]), MAXNTHREADS);

  // initialize three semaphores
  Sem_init(&shared.mutex, 0, 1);
  Sem_init(&shared.nempty, 0, NBUFF);
  Sem_init(&shared.nstored, 0, 0);

  // create all producers and one consumer
  Set_concurrency(nproducers + 1);
  for (i = 0; i < nproducers; i++) {
    count[i] = 0;
    Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
  }
  Pthread_create(&tid_consume, NULL, consume, NULL);

  // wait for all producers and the consumer
  for (i = 0; i < nproducers; i++) {
    Pthread_join(tid_produce[i], NULL);
    printf("count[%d] = %d\n", i, count[i]);
  }
  Pthread_join(tid_consume, NULL);

  Sem_destroy(&shared.mutex);
  Sem_destroy(&shared.nempty);
  Sem_destroy(&shared.nstored);
  exit(0);
}

/*
 * Producer thread body (several may run concurrently).
 *
 * arg points to this thread's int counter, which is incremented once per
 * item this thread stored. Returns NULL once shared.nput has reached nitems.
 */
void *produce(void *arg) {
  for (;;) {
    Sem_wait(&shared.nempty);  // wait for at least 1 empty slot
    Sem_wait(&shared.mutex);

    if (shared.nput >= nitems) {
      // Nothing left to produce: give back the empty slot that was just
      // taken, release the mutex and stop.
      Sem_post(&shared.nempty);
      Sem_post(&shared.mutex);
      return NULL;  // all done
    }

    shared.buff[shared.nput % NBUFF] = shared.nputval;
    shared.nput++;
    // Fixed: this was the no-op statement "shared.nputval;", so every slot
    // received the same value and the consumer's buff[i] == i check failed.
    shared.nputval++;

    Sem_post(&shared.mutex);
    Sem_post(&shared.nstored);  // 1 more stored item
    *((int *) arg) += 1;
  }
}

void *consume(void *arg) {
  int i;

  for (i = 0; i < nitems; i++) {
    Sem_wait(&shared.nstored);  // wait for at least 1 stored item
    Sem_wait(&shared.mutex);

    if (shared.buff[i % NBUFF] != i)
      printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);

    Sem_post(&shared.mutex);
    Sem_post(&shared.nempty);  // 1 more empty slot
  }
  return NULL;
}

        相较于单生产者单消费者的情况,生产者的循环会在放置了 $nitems$ 个值后终止。

5. 生产者-消费者问题:多生产者和多消费者

#include "unpipc.h"

#define NBUFF 10
#define MAXNTHREADS 100

// Total item count and thread counts; set once in main() from the command
// line and only read by the threads.
int nitems, nproducers, nconsumers;  // read-only

struct {  // data shared by producers and consumers
  int buff[NBUFF];  // circular buffer
  int nput;  // number of items stored so far; next store goes to buff[nput % NBUFF]
  int nputval;  // next value to store in buff[]
  int nget;  // number of items fetched so far; next fetch reads buff[nget % NBUFF]
  int ngetval;  // value the next fetched entry is expected to hold
  sem_t mutex, nempty, nstored;  // semaphores
} shared;

void *produce(void *), *consume(void *);

/*
 * prodcons4 <#items> <#producers> <#consumers>
 *
 * Starts up to MAXNTHREADS producer threads and up to MAXNTHREADS consumer
 * threads. Every thread gets a pointer to its own counter (prodcount[] /
 * conscount[]), which is printed after that thread has been joined.
 */
int main(int argc, char **argv) {
  int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
  pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];

  if (argc != 4)
    err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
  nitems = atoi(argv[1]);
  nproducers = min(atoi(argv[2]), MAXNTHREADS);
  nconsumers = min(atoi(argv[3]), MAXNTHREADS);

  // initialize three semaphores
  Sem_init(&shared.mutex, 0, 1);
  Sem_init(&shared.nempty, 0, NBUFF);
  Sem_init(&shared.nstored, 0, 0);

  // create all producers and all consumers
  Set_concurrency(nproducers + nconsumers);
  for (i = 0; i < nproducers; i++) {
    prodcount[i] = 0;
    Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
  }
  for (i = 0; i < nconsumers; i++) {
    conscount[i] = 0;
    // Fixed: the argument referred to the undeclared name "conscont".
    Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
  }

  // wait for all producers and all consumers
  for (i = 0; i < nproducers; i++) {
    Pthread_join(tid_produce[i], NULL);
    printf("producer count[%d] = %d\n", i, prodcount[i]);
  }
  for (i = 0; i < nconsumers; i++) {
    Pthread_join(tid_consume[i], NULL);
    printf("consumer count[%d] = %d\n", i, conscount[i]);
  }

  Sem_destroy(&shared.mutex);
  Sem_destroy(&shared.nempty);
  Sem_destroy(&shared.nstored);
  exit(0);
}

/*
 * Producer thread body (several may run concurrently).
 *
 * arg points to this thread's int counter, which is incremented once per
 * item this thread stored. Returns NULL once shared.nput has reached nitems.
 */
void *produce(void *arg) {  // fixed: the parameter had no name although the body uses arg
  for (;;) {
    Sem_wait(&shared.nempty);  // wait for at least 1 empty slot
    Sem_wait(&shared.mutex);

    if (shared.nput >= nitems) {
      // All items have been produced. Post nstored once more so that a
      // consumer blocked on it wakes up and can notice that it is done,
      // then give back the empty slot and the mutex.
      Sem_post(&shared.nstored);  // let consumers terminate
      Sem_post(&shared.nempty);
      Sem_post(&shared.mutex);
      return NULL;  // all done
    }

    shared.buff[shared.nput % NBUFF] = shared.nputval;
    shared.nput++;
    shared.nputval++;

    Sem_post(&shared.mutex);
    Sem_post(&shared.nstored);  // 1 more stored item
    *((int *) arg) += 1;
  }
}

/*
 * Consumer thread body (several may run concurrently).
 *
 * arg points to this thread's int counter, which is incremented once per
 * item this thread fetched. Returns NULL once shared.nget has reached nitems.
 */
void *consume(void *arg) {  // fixed: the parameter had no name although the body uses arg
  int i;

  for (;;) {
    Sem_wait(&shared.nstored);  // wait for at least 1 stored item
    Sem_wait(&shared.mutex);    // fixed: was "shard.mutex"

    if (shared.nget >= nitems) {
      // Everything has been consumed: pass the wake-up on to the next
      // consumer waiting on nstored, release the mutex and stop.
      Sem_post(&shared.nstored);
      Sem_post(&shared.mutex);
      return NULL;  // all done
    }
    i = shared.nget % NBUFF;
    if (shared.buff[i] != shared.ngetval)
      printf("error: buff[%d] = %d\n", i, shared.buff[i]);
    shared.nget++;
    shared.ngetval++;

    Sem_post(&shared.mutex);
    Sem_post(&shared.nempty);  // 1 more empty slot
    *((int *) arg) += 1;
  }
}

        这里的生产者函数增添了一行,在检测到已经生产了 $nitems$ 个数据后,为了唤醒阻塞在 $nstored$ 上的线程,会在线程终止前递增 $nstored$ 信号值。同时我们修改消费者函数,每次消费前比较 $nget$ 和 $nitems$ ,确定是否完成所有消费。

6. 多缓冲区

        在一些处理数据的程序中,我们可能会用到以下形式的循环:

while ((n = read(fdin, buff, BUFFSIZE)) > 0) {
  /* process the data */
  write(fdout, buff, n);
}

        以上循环每次读入一块数据 ( 最多 $BUFFSIZE$ 字节 ),对其处理,然后将结果写到输出。我们可以考虑将这个操作分为两个线程,一个线程负责读,另一个负责写。表面上看这可以改善程序的运行,其实不然,因为我们使用的还是同一个缓冲区,读线程和写线程仍然要互相等待,无法同时工作。为了解决这个问题,我们可以使用两个 ( 或更多 ) 缓冲区。同时,要注意大多数Unix内核检测出对一个文件的顺序读之后,不仅会读出我们需要的磁盘块的内容,还会顺便预读下一个磁盘块的内容,从而可以缩短下次读取操作的时间。

#include "unpipc.h"

#define NBUFF 8

struct {  // data shared by producer and consumer
  struct {
    char data[BUFFSIZE];  // payload bytes filled in by Read() in produce()
    ssize_t n;  // number of valid bytes in data; 0 marks end of input
  } buff [NBUFF];  // NBUFF of these buffers/counts, used as a ring
  sem_t mutex, nempty, nstored;  // set up with Sem_init() in main(): 1, NBUFF, 0
} shared;

int fd;  // input file to copy to stdout; opened read-only in main()
void *produce(void *), *consume(void *);

/*
 * mycat2 <pathname>
 *
 * Copies the named file to standard output using a reader thread (produce)
 * and a writer thread (consume) that share NBUFF buffers.
 */
int main(int argc, char **argv) {
  pthread_t tid_produce, tid_consume;

  if (argc != 2)
    err_quit("usage: mycat2 <pathname>");

  fd = Open(argv[1], O_RDONLY);

  // initialize three semaphores
  Sem_init(&shared.mutex, 0, 1);
  Sem_init(&shared.nempty, 0, NBUFF);
  Sem_init(&shared.nstored, 0, 0);

  // one producer thread, one consumer thread
  Set_concurrency(2);
  Pthread_create(&tid_produce, NULL, produce, NULL);  // reader thread
  Pthread_create(&tid_consume, NULL, consume, NULL);  // writer thread

  Pthread_join(tid_produce, NULL);
  Pthread_join(tid_consume, NULL);  // fixed: '.' was typed instead of ','

  Sem_destroy(&shared.mutex);
  Sem_destroy(&shared.nempty);
  Sem_destroy(&shared.nstored);
  exit(0);
}

/*
 * Reader thread body: fills the shared buffers in ring order with data read
 * from fd. A buffer whose byte count is 0 is still handed to the consumer
 * so that it sees the end of input; after that the thread returns NULL.
 * The thread argument is unused.
 */
void *produce(void *arg) {
  int slot = 0;

  for (;;) {
    Sem_wait(&shared.nempty);  // block until a buffer is free
    Sem_wait(&shared.mutex);
    // critical region (nothing to protect in the array-based version)
    Sem_post(&shared.mutex);

    shared.buff[slot].n = Read(fd, shared.buff[slot].data, BUFFSIZE);
    if (shared.buff[slot].n == 0) {
      Sem_post(&shared.nstored);  // publish the end-of-input marker
      return NULL;
    }
    slot = (slot + 1 >= NBUFF) ? 0 : slot + 1;  // advance around the ring

    Sem_post(&shared.nstored);  // one more filled buffer
  }
}

/*
 * Writer thread body: takes the shared buffers in ring order and writes
 * their contents to standard output. Returns NULL when it meets a buffer
 * whose byte count is 0. The thread argument is unused.
 */
void *consume(void *arg) {
  int slot = 0;

  for (;;) {
    Sem_wait(&shared.nstored);  // block until a buffer has been filled
    Sem_wait(&shared.mutex);
    // critical region (nothing to protect in the array-based version)
    Sem_post(&shared.mutex);

    if (shared.buff[slot].n == 0) {
      return NULL;  // end-of-input marker from the reader
    }
    Write(STDOUT_FILENO, shared.buff[slot].data, shared.buff[slot].n);
    slot = (slot + 1 >= NBUFF) ? 0 : slot + 1;  // advance around the ring

    Sem_post(&shared.nempty);  // hand the buffer back to the reader
  }
}

        本例子中由 $mutex$ 锁住的临界区是空的,如果我们的数据以链表形式而不是数组形式存储,那么这块区域可用于操作链表。

7. 信号量限制

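        POSIX定义了两个信号量限制:$SEM\_NSEMS\_MAX$ ( 一个进程可同时打开的最大信号量数,POSIX要求至少为 $256$ ) 和 $SEM\_VALUE\_MAX$ ( 一个信号量的最大值,POSIX要求至少为 $32767$ )。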

        这两个常值通常定义在 <$unistd.h$> 头文件中。

#include "unpipc.h"

/*
 * Prints the run-time values of the two semaphore limits as reported by
 * Sysconf(). Command-line arguments are ignored.
 */
int main(int argc, char **argv) {
  long nsems_max = Sysconf(_SC_SEM_NSEMS_MAX);
  long value_max = Sysconf(_SC_SEM_VALUE_MAX);

  printf("SEM_NSEMS_MAX = %ld, SEM_VALUE_MAX = %ld\n",
    nsems_max, value_max);
  exit(0);
}

Unix网络编程(19):信号量