回到顶部 暗色模式

Unix网络编程(15):程序设计范式

        开发一个Unix服务器程序时,我们有如下类型的进程控制可选:

        TCP客户程序则有以下范式:

1. TCP预先派生子进程服务器程序,accept无上锁保护

        这种技术的优点在于无须引入父进程执行 $fork$ 的开销就能处理新客户,缺点是父进程必须在服务器启动阶段配置预先派生子进程数量。可选的改进方案是增加一些代码,让父进程监听可用子进程数,一旦降低到某个阈值就派生额外的子进程;同样的,一旦可用子进程数增加到某个阈值就终止一些过剩的子进程。

#include "unp.h"

#define MAXN 16384  // max # bytes client can request

static int nchildren;
static pid_t *pids;

int main(int argc, char **argv) {
  int listenfd, i;
  socklen_t addrlen;
  void sig_int(int);
  pid_t child_make(int, int, int);

  if (argc == 3)
    listenfd = Tcp_listen(NULL. argv[1], &addrlen);
  else if (argc == 4)
    listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
  else
    err_quit("usage: serv02 [ <host> ] <port#> <#children>");
  nchildren = atoi(argv[argc - 1]);
  pids = Calloc(nchildren, sizeof(pid_t));

  for (i = 0; i < nchildren; i++)
    pids[i] = child_make(i, listendfd, addrlen);  // parent returns

  Signal(SIGINT, sig_int);

  for (;;)
    pause();  // everything done by children
}

void sig_int(int signo) {
  int i;

  // terminate all children
  for (i = 0; i < nchildren; i++)
    kill(pids[i], SIGTERM);
  while (wait(NULL) > 0)  // wait for all children
    continue;
  if (errno != ECHILD)
    err_sys("wait error");

  exit(0);
}

pid_t child_make(int i, int listenfd, int addrlen) {
  pid_t pid;
  void child_main(int, int, int);

  if ((pid = Fork()) > 0)
    return pid;  // parent

  child_main(i, listenfd, addrlen);  // never returns
}

void child_main(int i, int listenfd, int addrlen) {
  int connfd;
  void web_child(int);
  socklen_t clilen;
  struct sockaddr *cliaddr;

  cliaddr = Malloc(addrlen);

  printf("child %ld starting\n", (long) getpid());
  for (;;) {
    clilen = addrlen;
    connfd = Accept(listenfd, cliaddr, &clilen);
    web_child(connfd);  // process the request
    Close(connfd);
  }
}

void web_child(int sockfd) {
  int ntowrite;
  ssize_t nread;
  char line[MAXLINE], result[MAXN];

  for (;;) {
    if ((nread = Realine(sockfd, line, MAXLINE)) == 0)
      return;  // connection closed by other end

    // line from client specifies #bytes to write back
    ntowrite = atol(line);
    if ((ntowrite <= 0) || (ntowrite > MAXN))
      err_quit("client request for %d bytes", ntowrite);

    Writen(sockfd, result, ntowrite);
  }
}

        父进程在派生任何子进程之前创建监听套接字,从而每次 $fork$ 时,所有描述符也被复制。服务器进程在程序启动阶段派生 $N$ 个子进程,它们各自调用 $accept$ 并处于休眠状态。当客户连接到达时,所有子进程均被唤醒,因为它们使用同一个监听描述符,但是只有最先运行的子进程会获得连接,其余子进程会重新休眠。这就是惊群 ( $thundering$ $herd$ ) 问题,尽管只有一个子进程获得连接,所有的子进程却都被唤醒了,这样会导致性能受损。
        与这个例子相关的另一种现象是 $select$ 冲突。如果我们通过 $select$ 来 $accept$ 新客户,那么内核会唤醒所有在阻塞在 $select$ 并且等待同一个监听套接字的进程,从而产生冲突。从以上问题,我们可以得出:如果有多个进程阻塞在引用同一个实体的描述符上,最好直接阻塞在 $accept$ 等函数中,而不是 $select$ 上。

2. TCP预先派生子进程服务器程序,accept使用文件上锁保护

        允许多个进程在引用同一个监听套接字的描述上调用 $accept$ 仅适用于在内核中实现 $accept$ 的源自 $Berkeley$ 的内核。在其他内核,解决办法是让应用程序在调用 $accept$ 前后使用锁,从而在任意时刻最多只有一个进程阻塞在 $accept$ 调用上。

// 派生子进程之前初始化锁
my_lock_init("/tmp/lock.XXXXXX");  // one lock file for all children
for (i = 0; i < nchildren; i++)
  pids[i] = child_make(i, listenfd, addrlen);  // parent returns

// 子进程调用accept前后加锁
for (;;) {
  clilen = addrlen;
  my_lock_wait();
  connfd = Accept(listenfd, cliaddr, &clilen);
  my_lock_release();

  web_child(connfd);  // process request
  Close(connfd);
}
#include "unp.h"

static struct flock lock_it, unlock_it;
static int lock_fd = -1;

// fcntl() will fail if my_lock_init() not called
void my_lock_init(char *pathname) {
  char lock_file[1024];

  // must copy caller's string, incase is's a constant
  strncpy(lock_file, pathname, sizeof(lock_file));
  lock_fd = Mkstemp(lock_file);

  Unlink(lock_file);  // but lock_fd remains open

  lock_it.l_type = F_WRLCK;
  lock_it.l_whence = SEEK_SET;
  lock_it.l_start = 0;
  lock_it.l_len = 0;

  unlock_it.l_type = F_UNLCK;
  unlock_it.l_whence = SEEK_SET;
  unlock_it.l_start = 0;
  unlock_it.l_len = 0;
}

        调用者将一个路径名模版指定为 $my_-lock_-init$ 函数的参数,$mktemp$ 函数根据该模版创建一个唯一路径名。本函数随后创建一个具备该路径名的文件并立即 $unlink$ 掉,这样以后程序崩溃,这个临时文件也会消失。然而只要有一个进程打开着这个文件,这个文件就不会消失。

void my_lock_wait() {
  int rc;

  while ((rc = fcntl(lock_fd, F_SETLKW, &lock_it)) < 0) {
    if (errno == EINTR)
      continue;
    else
      err_sys("fcntl error for my_lock_wait");
  }
}

void my_lock_release() {
  if (fcntl(lock_fd, F_SETLKW, &unlock_it) < 0)
    err_sys("fcntl error for my_lock_release");
}

3. TCP预先派生子进程服务器程序,accept使用线程上锁保护

        我们有多种方法实现进程上锁,POSIX文件上锁方法可移植到所有POSIX兼容系统,不过因为涉及文件操作,可能比较耗时。在不同进程之间使用线程上锁,要求:(a)互斥锁变量必须存放在由所有进程共享的内存中;(b)必须告知线程函数库这是在不同进程之间共享的互斥锁。

#include "unpthread.h"
#include <sys/mman.h>

static pthread_mutex_t *mptr;  // actual mutex will be in shared memory

void my_lock_init(char *pathname) {
  int fd;
  pthread_mutexattr_t mattr;

  fd = Open("/dev/zero", O_RDWR, 0);

  mptr = Mmap(0, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,
              MAP_SHARED, fd, 0);
  Close(fd);

  Pthread_mutexattr_init(&mattr);
  Pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
  Pthread_mutex_init(mptr, &mattr);
}

        打开 $/dev/zero$ 然后调用 $mmap$ ,映射的字节数是一个 $pthread_-mutex_-t$ 类型的变量大小。随后关闭描述,因为该描述符已经被内存映射了。在先前的互斥锁中,我们使用 $PTHREAD_-MUTEX_-INTIALIZER$ 初始化全局或静态互斥锁变量。然而对于一个存放在共享内存区的互斥锁,必须调用一些 $pthread$ 库函数告知该函数库这是一个位于共享内存区的互斥锁,用于不同进程间上锁。我们首先为该互斥锁以默认属性初始化一个 $pthread_-mutexattr_-t$ 结构,然后赋予该结构 $PTHREAD_-PROCESS_-SHARED$ 属性 ( 该结构默认属性为 $PTHREAD_-PROCESS_-PRIVATE$ ,即只能在进程内使用 )。最后调用 $pthread_-mutex_-init$ 函数初始化共享内存区的互斥锁。

void my_lock_wait() {
  Pthread_mutex_lock(mptr);
}

void my_lock_release() {
  Pthread_mutex_unlock(mptr);
}

4. TCP预先派生子进程服务器程序,传递描述符

        这个版本是让父进程调用 $accept$ ,然后将描述符传递给子进程。这种技术会使代码变得有点复杂,因为父进程必须跟踪子进程的忙闲状态。为了实现这个机制,我们必须为每个子进程维护一个信息结构。

typedef struct {
  pid_t child_pid;  // process ID;
  int child_pipefd;  // parent's stream pipe to/from child
  int child_status;  // 0 = ready
  long child_count;  // # connections handled
} Child;

  Child *cptr;  // array of Child structures; calloc' ed

        我们在该结构中存放子进程ID、父进程中连接到子进程的字节流管道描述符、子进程状态和子进程已处理客户的计数。

pid_t child_make(int i, int listenfd, int addrlen) {
  int sockfd[2];
  pid_t pid;
  void child_main(int, int, int);

  Socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);

  if ((pid = Fork()) > 0) {
    Close(sockfd[1]);
    cptr[i].child_pid = pid;
    cptr[i].child_pipefd = sockfd[0];
    cptr[i].child_status = 0;
    return pid;  // parent
  }

  Dup2(sockfd[1], STDERR_FILENO);  // child's stream pipe to parent
  Close(sockfd[0]);
  Close(sockfd[1]);
  Close(listenfd);  // child does not need this open
  child_main(i, listenfd, addrlen);  // never returns
}

        $child_-make$ 函数在调用 $fork$ 前先创建一个字节流管道。派生出子进程后,父进程关闭其中一个描述符 $sockfd[1]$ ,子进程关闭另一个描述符 $sockfd[0]$ 。子进程还把流管道的自身拥有端 $sockfd[1]$ 复制到标准错误输出,这样每个子进程就通过读写标准错误输出和父进程通信。

#include "unp.h"
#include "child.h"

static int nchildren;

int main(int argc, char **argv) {
  int listenfd, i, navail, maxfd, nsel, connfd, rc;
  void sig_int(int);
  pid_t child_make(int, int, int);
  ssize_t n;
  fd_set rset, masterset;
  socklen_t addrlen, clilen;
  struct sockaddr *cliaddr;

  if (argc == 3)
    listenfd = Tcp_listen(NULL, argv[1], &addrlen);
  else if (argc == 4)
    listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
  else
    err_quit("usage: serv05 [ <host> ] <potr#> <#children>");

  FD_ZERO(&masterset);
  FD_SET(listenfd, &masterset);
  maxfd = listenfd;
  cliaddr = Malloc(addrlen);

  nchildren = atoi(argv[argc - 1]);
  navail = nchildren;
  cptr = Calloc(nchildren, sizeof(Child));

  // prefork all the children
  for (i = 0; i < nchildren; i++) {
    child_make(i, listenfd, addrlen);  // parent returns
    FD_SET(cptr[i].child_pipefd, &masterset);
    maxfd = max(maxfd, cptr[i].child_pipefd);
  }

  Signal(SIGINT, sig_int);

  for (;;) {
    rset = masterset;
    if (navail <= 0)
      FD_CLR(listenfd, &rset);  // turn off if no available children
    nsel = Select(maxfd + 1, &rset, NULL, NULL, NULL);

    // check for new connections
    if (FD_ISSET(listenfd, &rset)) {
      clilen = addrlen;
      connfd = Accept(listenfd, cliaddr, &clilen);

      for (i = 0; i < nchildren; i++)
        if (cptr[i].child_status == 0)
          break;  // available

      if (i == nchildren)
        err_quit("no available children");
      cptr[i].child_status = 1;  // mark child as busy
      cptr[i].child_count++;
      navail--;

      n = Write_fd(cptr[i].child_pipefd, "", 1, connfd);
      Close(connfd);

      if (--nsel == 0)
        continue;  // all done with select() results
    }

    // find any newly-available children
    for (i = 0; i < nchildren; i++) {
      if (FD_ISSET(cptr[i].child_pipefd, &rset)) {
        if ((n = Read(cptr[i].child_pipefd, &rc, 1)) == 0)
          err_quit("child %d terminated unexpectedly", i);
        cptr[i].child_status = 0;
        navail++;
        if (--nsel == 0)
          break;  // all done with select() results
      }
    }
  }
}

        $navail$ 跟踪当前可用的子进程数,如果为 $0$ ,关闭 $select$ 读描述符集中监听套接字对应的位。这种情况下,内核仍然会处理外来连接,将它们入队,直到达到 $backlog$ 。

void child_main(int i, int listenfd, int addrlen) {
  char c;
  int connfd;
  ssize_t n;
  void web_child(int);

  printf("child %ld starting\n", (long) getpid());
  for (;;) {
    if ((n = Read_fd(STDERR_FILENO, &c, 1, &connfd)) == 0)
      err_quit("read_fd returned 0");
    if (connfd < 0)
      err_quit("no descriptor from read_fd");

    web_child(connfd);  // process request
    Close(connfd);

    Write(STDERR_FILENO, "", 1);  // tell parent we're ready again
  }
}

        $child_-main$ 在处理完一个客户后,通过该子进程的字节流管道拥有端向父进程写回单个字节。父进程读入这单个字节,把子进程标记为可用,并递增 $navail$ 。当子进程意外终止,它的字节流管道拥有端会被关闭,从而 $read$ 会返回 $0$ 。父进程察觉到之后就会停止运行,不过更好的办法是登记这个错误并重新派生一个子进程。

5. TCP并发服务器程序,每个客户一个线程

#include "unpthread.h"

int main(int argc, char **argv) {
  int listenfd, connfd;
  void sig_int(int);
  void *doit(void *);
  pthread_t pid;
  socklen_t clilen, addrlen;
  struct sockaddr *cliaddr;

  if (argc == 2)
    listenfd = Tcp_listen(NULL, argv[1], &addrlen);
  else if (argc == 3)
    listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
  else
    err_quit("usage: serv06 [ <host> ] <port#>");
  cliaddr = Malloc(addrlen);

  Signal(SIGINT, sig_int);

  for (;;) {
    clilen = addrlen;
    connfd = Accept(listenfd, cliaddr, &clilen);
    Pthread_create(&tid, NULL, &doit, (void *) connfd);
  }
}

void *doit(void *arg) {
  void web_child(int);

  Pthread_detach(pthread_self());
  web_child((int) arg);
  Close((int) arg);
  return NULL;
}

6. TCP预先创建线程服务器程序,每个线程各自accept

        在支持线程的系统上,我们有理由在服务器启动阶段先创建一个线程池。本服务器的基本设计是预先创建一个线程池,并让每个线程各自调用 $accept$ ,同时使用互斥锁保证任何时刻只有一个线程在调用 $accept$ 。

typedef struct {
  pthread_t thread_tid;  // thread id
  long thread_count;  // # connections handled
} Thread;
Thread *tptr;  // array of Thread structures; calloc 'ed

int listenfd, nthreads;
socklen_t addrlen;
pthread_mutex_t mlock;

#include "unpthread.h"
#include "pthread07.h"

pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;

int main(int argc, char **argv) {
  int i;
  void sig_int(int), thread_make(int);

  if (argc == 3)
    listenfd = Tcp_listen(NULL, argv[1], &addrlen);
  else if (argc == 4)
    listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
  else
    err_quit("usage: serv07 [ <host> ] <port#> <#threads>");
  nthreads = atoi(argv[argc - 1]);
  tprt = Calloc(nthreads, sizeof(Thread));

  for (i = 0; i< nthreads; i++)
    thread_make(i);  // only main thread returns

  Signal(SIGINT, sig_int);

  for (;;)
    pause();  // everything done by threads
}

#include "unpthread.h"
#include "pthread07.h"

void thread_make(int i) {
  void *thread_main(void *);

  Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
  return;  // main thread returns
}

void *thread_main(void *arg) {
  int connfd;
  void web_child(int);
  socklen_t clilen;
  struct sockaddr *cliaddr;

  cliaddr = Malloc(addrlen);

  printf("thread %d starting\n", (int) arg);
  for (;;) {
    clilen = addrlen;
    Pthread_mutex_lock(&mlock);
    connfd = Accept(listenfd, cliaddr, &clilen);
    Pthread_mutex_unlock(&mlock);
    tprt[(int) arg].thread_count++;

    web_child(connfd);  // process request
    Close(connfd);
  }
}

7. TCP预先创建线程服务器程序,主线程统一accept

        本设计范式的问题在于主线程如何把一个已连接套接字传递给线程池中某个可用线程。这里有多个实现手段。可以像之前那样使用描述符传递,但是没有必要,因为多个线程之间描述符是共享的,接收线程只需要知道已连接套接字描述符的值即可。

typedef struct {
  pthread_t thread_tid;  // thread id
  long thread_count;  // # connections handled
} Thread;
Thread *tptr;  // array of Thread structures; calloc'ed

#define MAXNCLI 32
int clifd[MAXNCLT], iget, iput;
pthread_mutex_t clifd_mutex;
pthread_cond_t clifd_cond;

        $clifd$ 数组会存储线程已经接受的已连接套接字描述符,$iget$ 为下一个待取出描述符元素所在的下标,$iput$ 为下一个待写入描述符元素所在的下标。我们使用互斥锁和条件变量确保这个数组被安全地修改。

#include "unpthread.h"
#include "pthread08.h"

static int nthreads;
pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t clifd_cond = PTHREAD_COND_INTIALIZER;

int main(int argc, char **argv) {
  int i, listenfd, connfd;
  void sig_int(int), thread_make(int);
  socklen_t addrlen, clilen;
  struct sockaddr *cliaddr;

  if (argc == 3)
    listenfd = Tcp_listen(NULL, argv[1], &addrlen);
  else if (argc == 4)
    listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
  else
    err_quit("usage: serv08 [ <host> ] <port#> <#threads>");
  cliaddr = Malloc(addrlen);

  nthreads = atoi(argv[argc - 1]);
  tprt = Calloc(nthreads, sizeof(Thread));
  iget = iput = 0;

  // create all the threads
  for (i = 0; i < nthreads; i++)
    thread_make(i);  // only main thread returns

  Signal(SIGINT, sig_int);

  for (;;) {
    clilen = addrlen;
    connfd = Accept(listenfd, cliaddr, &clilen);

    Pthread_mutex_lock(&clifd_mutex);
    clifd[iput] = connfd;
    if (++iput == MAXNCLI)
      iput = 0;
    if (iput == iget)
      err_quit("iput = iget = %d", iput);
    Pthread_cond_signal(&clifd_cond);
    Pthread_mutex_unlock(&clifd_mutex);
  }
}

#include "unpthread.h"
#include "pthread08.h"

void thread_make(int i) {
  void *thread_main(void *);

  Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
  return;  // main thread returns
}

void *thread_main(void *arg) {
  int connfd;
  void web_child(int);

  printf("thread %d starting\n", (int) arg);
  for (;;) {
    Pthread_mutex_lock(&clifd_mutex);
    while(iget == iput)
      Pthread_cond_wait(&clifd_cond, &clifd_mutex);
    connfd = clifd[iget];  // connected socket to service
    if (++iget == MAXNCLI)
      iget = 0;
    Pthread_mutex_unlock(&clifd_mutex);
    tptr[(int) arg].thread_count++;

    web_child(connfd);  // process request
    Close(connfd);
  }
}

        这个版本的慢于之前的每个线程各自 $accept$ 的版本,因为同时需要互斥锁和条件变量。

Unix网络编程(15):程序设计范式