Unix网络编程(14):线程

        在传统的Unix模型中,当一个进程需要另一个实体来完成某事时,它就 $fork$ 一个子进程并让子进程去处理。尽管这种范式多年来一直应用得挺好,但是 $fork$ 调用却存在一些问题:

        线程有助于解决这两个问题,它的创建可能比进程快 $10 \sim 100$ 倍,而且同一进程内的线程共享相同的全局内存。然而,线程的引入也会带来同步 ( $synchronization$ ) 问题。同一进程内的所有线程除了共享全局变量之外还共享:

        每个线程也有各自的:

1. 创建和终止

1.1 pthread_create函数

#include <pthread.h>

// 成功返回0,出错返回非负错误码
int pthread_create(pthread_t *tid, const pthread_attr_t *attr,
  void *(*func)(void *), void *arg);

        当一个程序由 $exec$ 启动时,称为初始线程 ( $initial$ $thread$ ) 或主线程 ( $main$ $thread$ ) 的单个线程就创建了,其余线程则由 $pthread_-create$ 函数创建。一个进程内每个线程都由线程ID标识,数据类型为 $pthread_-t$ 。如果新线程成功创建,ID会通过 $tid$ 返回。每个线程都有属性:优先级、初始栈大小、是否应该成为守护线程等,可以通过 $attr$ 参数指定。通常情况下我们采取默认配置,这时会把 $attr$ 设置为 $NULL$ 。最后两个参数是线程执行的函数以及参数,线程通过调用这个函数开始执行,然后显式终止 ( $pthread_-exit$ ) 或者隐式终止 ( 函数返回 ) 。如果我们需要传递多个参数,需要把它们打包成一个结构,通过结构指针传递。

1.2 pthread_join函数

#include <pthread.h>

// 成功返回0,出错返回非负错误码
int pthread_join(pthread_t *tid, void **status);

        $pthread_-join$ 等待一个给定线程的终止,$tid$ 指定线程ID。$pthread_-join$ 不能像 $waitpid$ 那样等待任意一个线程终止。如果 $status$ 非空,来自所等待线程的返回值将存入 $status$ 指向的位置。

1.3 pthread_self函数

#include <pthread.h>

// 返回调用线程的线程ID
pthread_t pthread_self(void);

        每个线程都有一个在所属进程内标识自身的ID,由 $pthread_-create$ 返回。每个线程通过 $pthread_-self$ 可以获取自身ID

1.4 pthread_detach函数

#include <pthread.h>

// 成功返回0,出错返回非负错误码
int pthread_detach(pthread_t tid);

        一个线程或者是可汇合的 ( $joinable$ ),或者是脱离的 ( $datached$ ),默认情况下是可汇合的。当一个可汇合的线程终止时,它的线程ID和退出状态将留存到另一个线程对它调用 $pthread_-join$ 。脱离的线程像守护进程,终止时所有资源都被释放,不能通过 $pthread_-join$ 等待终止。$pthread_-detach$ 函数负责将指定线程变为脱离状态。

1.5 pthread_exit函数

#include <pthread.h>

void pthread_exit(void *status);

        $pthread_-exit$ 让一个线程终止。如果线程可汇合,它的线程ID和退出状态将保持到调用进程内某个其他线程对它调用 $pthread_-join$ 。

2. 使用线程的TCP回射服务器程序

#include "unpthread.h"

static void *doit(void *);  // each thread executes this function

int main(int argc, char **argv) {
  int listenfd, connfd;
  pthread_t tid;
  socklen_t addrlen, len;
  struct sockaddr *cliaddr;

  if (argc == 2)
    listenfd = Tcp_listen(NULL, argv[1], &addrlen);
  else if (argc == 3)
    listenfd == Tcp_listen(argv[1], argv[2], &addrlen);
  else
    err_quit("usage: tcpserv01 [ <host> ] <service or port>");

  cliaddr = Malloc(addrlen);
  for (;;) {
    len = addrlen;
    connfd = Accept(listenfd, cliaddr, &len);
    Pthread_create(&tid, NULL, &doit, (void *) connfd);
  }
}

stativ void *doit(void *arg) {
  Pthread_detach(pthrea_self());
  str_echo((int) arg);  // same function as before
  Close((int) arg);  // done with connected socket
  return NULL;
}

        $accept$ 返回后,不是使用 $fork$ 而是使用 $pthread_-create$ ,传递给 $doit$ 函数的是已连接套接字描述符 $connfd$ 。在 $doit$ 函数中,线程先让自身脱离,因为主线程不会等待其创建的线程。函数返回后,我们就 $close$ 已连接套接字。因为线程共享描述符,所以需要主动 $close$ 已连接描述符;使用 $fork$ 时,由于创建的是进程,进程会在退出时关闭所有打开的描述符,所以使用 $fork$ 时不必手动关闭已连接描述符。还要注意,线程不会 $close$ 监听套接字,因为线程间描述符都是共享的。
        在 $doit$ 函数中,我们简单地把 $connfd$ 传递给新线程。从ANSI C的角度来看是可以的,ANSI C保证我们能够把一个整数指针类型强制转为 $void$ $\star$ 并重新转回来。但是要注意的是,主线程中只有一个 $connfd$ 变量,每次调用 $accept$ 都会产生一个新值。为了解决这个问题,我们修改程序:

#include "unpthread.h"

static void *doit(void *);  // each thread executes this function

int main(int argc, char **argv) {
  int listenfd, connfd;
  pthread_t tid;
  socklen_t addrlen, len;
  struct sockaddr *cliaddr;

  if (argc == 2)
    listenfd = Tcp_listen(NULL, argv[1], &addrlen);
  else if (argc == 3)
    listenfd == Tcp_listen(argv[1], argv[2], &addrlen);
  else
    err_quit("usage: tcpserv01 [ <host> ] <service or port>");

  cliaddr = Malloc(addrlen);
  for (;;) {
    len = addrlen;
    iptr = Malloc(sizeof(int));
    *iptr = Accept(listenfd, cliaddr, &len);
    Pthread_create(&tid, NULL, &doit, iptr);
  }
}

stativ void *doit(void *arg) {
  int connfd;

  connfd = *((int *) arg);
  free(arg);

  Pthread_detach(pthrea_self());
  str_echo((int) arg);  // same function as before
  Close((int) arg);  // done with connected socket
  return NULL;
}

        $malloc$ 和 $free$ 是不可重入的。换句话说,在主线程正处于这两个函数之一的内部处理期间,从某个信号处理函数中调用这两个函数之一有可能导致不可预知的后果。

2.1 线程安全函数

不必线程安全的版本 必须线程安全的版本 备注
$asctime$ $asctime_-r$
$ctermid$ 仅当参数非空时才是线程安全的
$ctime$ $ctime_-r$
$getc_-unlocked$
$getchar_-unlocked$
$getgrid$ $getgrid_-r$
$getgrnam$ $getgrnam_-r$
$getlogin$ $getlogin_-r$
$getpwnam$ $getpwnam_-r$
$getpwuid$ $getpwuid_-r$
$gmtime$ $gmtime_-r$
$localtime$ $localtime_-r$
$putc_-unlocked$
$putchar_-unlocked$
$rand$ $rand_-r$
$readdir$ $readdir_-r$
$strtok$ $strtok_-r$
$tmpnam$ 仅当参数非空时才是线程安全的
$ttyname$ $ttyname_-r$
$gethostXXX$
$getnetXXX$
$getprotoXXX$
$getservXXX$
$inet_-ntoa$

        $ctermid$ 和 $tmpnam$ 的线程安全条件是:调用者为返回结果预先分配空间,并把指向该空间的指针作为参数传递给函数。

3. 线程特定数据

        把一个未线程化的程序转换成使用线程的版本时,有时会碰到因其中有函数使用静态变量而引起的错误。这个错误是在将现有的函数转换成在线程环境中运行时常碰到的问题,并有多个解决办法。

        使用线程特定数据是使得现有函数变为线程安全的一个常用技巧。每个系统支持有限数量的线程特定数据元素,POSIX要求这个限制不少于 $128$ ( 每个进程 )。系统 ( 可能是线程函数库 ) 为每个进程维护一个我们称之为 $Key$ 结构的结构体数组。$Key$ 结构的标志指示这个数组元素是否正在被使用,所有的标志初始值都是不被使用。当一个线程调用 $pthread_-key_-create$ 创建一个新的线程特定数据元素时,系统搜索其 $Key$ 结构数组找出第一个不在使用的元素,返回给调用线程该元素的索引。除了进程范围的 $Key$ 结构数组之外,系统还在进程内维护关于每个线程的多条信息,我们称为 $Pthread$ 结构,其部分内容是名为 $pkey$ 的一个 $128$ 个元素的指针数组。$pkey$ 数组的所有元素都被初始化为空指针,这 $128$ 个指针和进程内 $Key$ 结构数组的 $128$ 个键关联。
        我们调用 $pthread_-key_-create$ 创建一个键,系统会告诉我们这个键的索引。每个线程随后为该键存储一个值 ( 指针 ),这个指针通常又是每个线程通过调用 $malloc$ 获得的。线程特定数据中容易混淆的地方之一是:该指针是键-值对中的值,但是真正的线程特定数据可以是该指针指向的任意内容。

#include <pthread.h>

// 成功返回0,出错返回非负错误码
int pthread_key_create(pthread_key_t *keyptr, void (*destructor) (void *value));

// 成功返回0,出错返回非负错误码
int pthread_once(pthread_once_t *onceptr, void (*init) (void));

// 返回指向线程特定数据的指针,可能为空
void *pthread_getspecific(pthread_key_t key);

// 成功返回0,出错返回非负错误码
int pthread_setspecific(pthread_key_t key, const void *value);

        $pthread_-once$ 使用 $onceptr$ 参数指向的变量中的值,确保 $init$ 参数所指的函数在进程内只被调用一次。在进程范围内对于一个给定的键,$pthread_-key_-create$ 只能被调用一次,所创建的键通过 $keyptr$ 返回,如果 $destructor$ 不为空,它所指的函数将由为该键存放过某个值的每个线程在终止时调用。

#include "unpthread.h"

static pthread_key_t rl_key;
static pthread_once_t rl_once = PTHREAD_ONCE_INIT;  // 0

static void readline_desctructor(void *ptr) {
  free(ptr);
}

static void readline_once(void) {
  Pthread_key_create(&rl_key, readline_destructor);
}

typedef struct {
  int rl_cntl;  // initialize to 0
  char *rl_bufptr;  // initialize to rl_buf
  char rl_buf[MAXLINE];
} Rline;

static ssize_t my_read(Rline *tsd, int fd, char *ptr) {
  if (tsd->rl_cnt <= 0) {
again:
    if ((tsd->rl_cnt = read(fd, tsd->rl_buf, MAXLINE)) < 0) {
      if (errno == EINTR)
        goto again;
      return -1;
    } else if (tsd->rl_cnt == 0)
      return 0;
    tsd->rl_bufptr = tsd->rl_buf;
  }

  tsd->rl_cnt--;
  *ptr = *tsd->rl_bufptr++;
  return 1;
}

ssize_t readline(int fd, void *vptr, size_t maxlen) {
  size_t n, rc;
  char c, *ptr;
  Rline *tsd;

  Pthread_once(&rl_once, readline_once);
  if ((tsd = pthread_getspecific(rl_key)) == NULL) {
    tsd = Calloc(1, sizeof(Rline));  // init to 0
    Pthread_setspecific(rl_key, tsd);
  }
  ptr = vptr;
  for (n = 1; n < maxlen; n++) {
    if ((rc = my_read(tsd, fd, &c)) == 1) {
      *ptr++ = c;
      if (c == '\n')
        break;
    } else if (rc == 0) {
      *ptr = 0;
      return n - 1;  // EOF, n - 1 bytes read
    } else
      return -1;  // error, errno set by read()
  }
  *ptr = 0;
  return n;
}

        在这个程序中,我们通过线程特定数据的方式解决了多线程中共享静态变量带来的问题。我们为每个线程都创建一个 $Rline$ 结构,存储读取的数据。虽然这个程序也存在共享变量 $rl_-key$ ,但是该变量只会被 $readline_-once$ 调用,而且在调用前通过 $pthread_-once$ 保证了其线程安全性。

4. 互斥锁

#include <pthread.h>

// 成功返回0,出错返回非负错误码
int pthread_mutex_lock(pthread_mutex_t *mptr);

// 成功返回0,出错返回非负错误码
int pthread_mutext_unlock(pthread_mutex_t *mptr);

        在多线程环境下,使一个共享变量被安全地修改的办法是使用互斥锁 ( $mutex$ ) 保护这个共享变量。如果试图上锁一个已经被另外某个线程锁住的互斥锁,线程将阻塞,直到互斥锁被解锁。如果某个互斥锁变量是静态分配的,我们就必须把它初始化为常值 $PTHREAD_-MUTEXT_-INITIALIZER$ 。如果我们在共享内存区中分配一个互斥锁,那么必须通过调用 $pthread_-mutex_-init$ 函数在运行时初始化。

#include "unpthread.h"

#define NLOOP 5000

int counter;  // incremented by threads
pthread_mutext_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *doit(void *);

int main(int argc, char **argv) {
  pthread_t tidA, tidB;

  Pthread_create(&tidA, NULL, &doit, NULL);
  Pthread_create(&tidB, NULL, &doit, NULL);

  // wait for both threads to terminate
  Pthread_join(tidA, NULL);
  Pthread_join(tidB, NULL);

  exit(0);
}

void *doit(void *vptr) {
  int i, val;

  /*
  * Each thread fetches, prints and increments the counter NLOOP times.
  * The value of the counter should increase monotonically.
  */
  for (i = 0; i< NLOOP; i++) {
    Pthread_mutex_lock(&counter_mutex);

    val = counter;
    printf("%d: %d\n", pthread_self(), val + 1);
    counter = val + 1;

    Pthread_mutex_unlock(&counter_mutex);
  }

  return NULL;
}

5. 条件变量

        互斥锁适用于防止同时访问某个共享变量,但是我们需要另外某种在等待某个条件发生时能让我们进入休眠状态,然后在有事件发生时通知我们的东西。

#include <pthread.h>

// 成功返回0,出错返回非负错误码
int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);

// 成功返回0,出错返回非负错误码
int pthread_cond_signal(pthread_cond_t *cptr);

        与互斥锁一样,使用条件变量时,如果是静态分配的,需要初始化为 $PTHREAD_-COND_-INITIALIZER$ 。条件变量与互斥锁配合使用:

Pthread_mutex_lock(&mutex);
n--;
Pthread_cond_signal(&ndone_cond);
Pthread_mutex_unlock(&mutex);

// 或者

Pthread_mutex_lock(&mutex);
while (n)
  Pthread_cond_wait(&cond, &mutex);
Pthread_mutex_unlock(&mutex);

        可以看到我们总是先上锁再使用条件变量,因为条件变量本身并不会上锁,为了在休眠或者唤醒的时候保证变量的安全性,我们需要配合互斥锁使用条件变量。

#include <pthread.h>

// 成功返回0,出错返回非负错误码
int pthread_cond_broadcast(pthread_cond_t *cptr);

// 成功返回0,出错返回非负错误码
int pthread_cond_timedwait(pthread_cond_t *cptr, pthread_mutex_t *mptr,
  const struct timespec *abstime);

        $pthread_-cond_-broadcast$ 可以唤醒在条件上等待的所有线程。$pthread_-cond_-timedwait$ 允许线程设置一个阻塞时间,指定该函数必须返回时刻的系统时间,如果发生超时,就设置 $ETIME$ 错误。$abstime$ 是一个绝对时间,而不是时间增量。通常通过 $gettimeofday$ 获取当前时间后加上时间增量的形式创建。

Unix网络编程(14):线程