Unix网络编程(9):非阻塞式I/O

        套接字的默认状态是阻塞的,这就意味着当发出一个不能立即完成的套接字调用时,其进程将被置于休眠状态,直到相应操作完成。可能阻塞套接字的调用分为以下四类:

  1. 输入:$read$ 、$readv$ 、$recv$ 、$recvfrom$ 和 $recvmsg$ 共 $5$ 个函数。如果对阻塞式套接字调用,而且套接字接收缓冲区中没有数据可读,进程将休眠。而对于非阻塞式套接字,如果输入操作不能被满足,调用将返回 $EWOULDBLOCK$ 错误;
  2. 输出:$write$ 、$writev$ 、$send$ 、$sendto$ 和 $sendmsg$ 共 $5$ 个函数。如果对阻塞式套接字调用,而且套接字发送缓冲区中没有空间,进程将休眠。对于非阻塞的TCP套接字,如果发送缓冲区中没有空间,输出函数调用将立即返回 $EWOULDBLOCK$ 错误;
  3. 接受外来连接,即 $accept$ 函数。如果对一个非阻塞套接字调用 $accept$ 函数,而且无新连接到达,将会立即返回 $EWOULDBLOCK$ 错误;
  4. 发起外出连接,即用于TCP的 $connect$ 函数。如果对一个非阻塞套接字调用 $connect$ ,而且连接不能立即建立,那么会立即返回 $EINPROGRESS$ 错误。

1. 非阻塞读和写

#include "unp.h"
#include <time.h>

char *gf_time(void);

void str_cli(FILE *fp, int sockfd) {
  int maxfdp1, val, stdineof;
  ssize_t n , nwritten;
  fd_set rset, wset;
  char to[MAXLINE], fr[MAXLINE];
  char *toiptr, *tooptr, *friptr, *froptr;

  val = Fcntl(sockfd, F_GETFL, 0);
  Fcntl(sockfd, F_SETFL, val | O_NONBLOCK);

  val = Fcntl(STDIN_FILENO, F_GETFL, 0);
  Fcntl(STDIN_FILENO, F_SETFL, val | O_NONBLOCK);

  val = Fcntl(STDOUT_FILENO, F_GETFL, 0);
  Fcntl(STDOUT_FILENO, F_SETFL, val | O_NONBLOCK);

  toiptr = tooptr = to;
  friptr = froptr = fr;
  stdineof = 0;

  maxfdp1 = max(max(STDIN_FILENO, STDOUT_FILENO), sockfd) + 1;
  for (;;) {
    FD_ZERO(&rset);
    FD_ZERO(&wset);
    if (stdineof == 0 && toiptr < &to[MAXLINE])
      FD_SET(STDIN_FILENO, &rset);  // read from stdin
    if (friptr < &fr[MAXLINE])
      FD_SET(sockfd, &rset);  // read from socket
    if (tooptr != toiptr)
      FD_SET(STDOUT_FILENO, &wset);  // write to socket
    if (froptr != friptr)
      FD_SET(STDOUT_FILENO, &wset);  // write to stdout

    Select(maxfdp1, &set, &wset, NULL, NULL);

    if (FD_ISSET(STDIN_FILENO, &rset)) {
      if ((n = read(STDIN_FILENO, toiptr, &to[MAXLINE] - toiptr)) < 0) {
        if (errno != EWOULDBLOCK)
          err_sys("read error on stdin");
      } else if (n == 0) {
        fprintf(stderr, "%s: EOF on stdin\n", gf_time());
        stdineof = 1;  // all done withn stdin
        if (tooptr == toiptr)
          Shutdown(sockfd, SHUT_WR);  // send FIN
      } else {
        fprintf(stderr, "%s: read %d bytes from stdin\n", gf_time(), n);
        toiptr += n;
        FD_SET(sockfd, &wset);  // try and write to socket below
      }
    }

    if (FD_ISSET(sockfd, &rset)) {
      if ((n = reada(sockfd, friptr, &fr[MAXLINE] - friptr)) < 0) {
        if (errno != EWOULDBLOCK)
          err_sys("read error on socket");
      } else if (n == 0) {
        fprintf(stderr, "%s: EOF on socket\n", gf_time());
        if (stdineof)
          return;  // normal termination
        else
          err_quit("str_cli: server terminated prematurely");
      } else {
        fprintf(stderr, "%s: read %d bytes from socket\n", gf_time(), n);
        friptr += n;
        FD_SET(STDOUT_FILENO, &wset);  // try and write below
      }
    }

    if (FD_ISSET(STDOUT_FILENO, &wset) && ((n = friptr - froptr) > 0)) {
      if ((nwritten = write(STDOUT_FILENO, froptr, n)) < 0) {
        if (errno != EWOULDBLOCK)
          err_sys("write error to stdout");
      } else {
        fprintf(stderr, "%s: wrote %d bytes to stdout\n", gf_time(), nwritten);
        froptr += nwritten;
        if (froptr == friptr)
          froptr = friptr = fr;  // back to beginning of buffer
      }
    }

    if (FD_ISSET(sockfd, &wset) && ((n = toiptr - tooptr) > 0)) {
      if ((nwritten = write(sockfd, tooptr, n)) < 0) {
        if (errno != EWOULDBLOCK)
          err_sys("write error to socket");
      } else {
        fprintf(stderr, "%s: wrote %d bytes to socket\n", gf_time(), nwritten);
        tooptr += nwritten;
        if (tooptr == toiptr) {
          toiptr = tooptr = to;  // back to beginning of buffer
          if (stdineof)
            Shutdown(sockfd, SHUT_WR);  // send FIN
        }
      }
    }
  }
}

char *gf_time() {
  struct timeval tv;
  static char str[30];
  char *ptr;

  if (gettimeofday(&tv, NULL) < 0)
    err_sys("gettimeofday error");

  ptr = ctime(&tv.tv_sec);
  strcpy(str, &ptr[11]);
  // Fri Sep 13 00:00:00 1986\n\0
  // 0123456789012345678901234 5
  snprintf(str + 8, sizeof(str) - 8, ".%06ld", tv.tv_usec);

  return str;
}

        可以发现使用非阻塞I/O比较复杂,需要进行缓冲区管理,但是确可以让代码在批量模式下运行速度提高。然而考虑到结果代码的复杂性,像上述方式那样使用非阻塞式I/O是不值得的。我们发现,每当需要使用非阻塞式I/O时,更简单的办法通常是把应用程序任务划分到多个进程或多个线程。

#include "unp.h"

void str_cli(FILE *fp, int sockfd) {
  pid_t pid;
  char sendline[MAXLINE], recvline[MAXLINE];

  if ((pid = Fork()) == 0) {  // child: server->stdout
    while (Readline(sockfd, recvline, MAXLINE) > 0)
      Fputs(recvline, stdout);
    kill(getppid(), SIGTERM);  // in case parent still running
    exit(0);
  }

  // parent: stdin->server
  while (Fgets(sendline, MAXLINE, fp) != NULL)
    Writen(sockfd, sendline, strlen(sendline));

  Shutdown(sockfd, SHUT_WR);  // EOF on stdin, send FIN
  pause();
  return;
}

        这个函数一开始就调用 $fork$ 把当前进程划分为一个父进程和一个子进程,前者负责将数据从标准输入传递给服务器,后者负责将数据从服务器传递给标准输出。父进程完成数据复制后调用 $pause$ 让自己休眠,直到捕获一个信号 ( 来自子进程的 $SIGTERM$ ) 信号,该信号的默认行为是终止进程。

2. 非阻塞connect

        当在一个非阻塞的TCP套接字上调用 $connect$ 时,$connect$ 将立即返回 $EINPROGRESS$ 错误,不过已经发起的TCP三次握手会继续进行。我们接着使用 $select$ 检测这个连接或成功或失败的已建立条件。非阻塞的 $connect$ 有以下用途:

  1. 可以把三次握手叠加在其他处理上。完成一个 $connect$ 要花费一个 $RTT$ 时间,而 $RTT$ 波动范围很大,这段时间内可以执行其他工作;
  2. 可以同时建立多个连接;
  3. 通过 $select$ 等待连接建立,可以给 $select$ 指定一个时间限制,从而缩短 $connect$ 的超时时间。

        使用非阻塞 $connect$ 会有以下问题:

#include "unp.h"

int connect_nonb(int sockfd, const struct sockaddr *saptr,
  socklen_t salen, int nsec) {
  int flags, n, error;
  socklen_t len;
  fd_set rset, wset;
  struct timeval tval;

  flags = Fcntl(sockfd, F_GETFL, 0);
  Fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

  error = 0;
  if ((n = connect(sockfd, saptr, salen)) < 0)
    if (errno != EINPROGRESS)
      return -1;

  // do whatever we want while the connect is taking place

  if (n == 0)
    goto done;  // connect completed immediately

  FD_ZERO(&rset);
  FD_SET(sockfd, &rset);
  wset = rset;
  tval.tv_sec = nsec;
  tval.tv_usec = 0;

  if ((n = Select(sockfd + 1, &rset, &wset, NULL,
    nsec ? &tval : NULL)) == 0) {
    close(sockfd);  // timeout
    errno = ETIMEDOUT;
    return -1;
  }

  if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) {
    len = sizeof(error);
    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
      return -1;
  } else
    err_quit("select error: sockfd not set");

done:
  Fcntl(sockfd, F_SETFL, flags);  // resotre file status flags
  if (error) {
    close(sockfd);  // just in cast
    errno = error;
    return -1;
  }

  return 0;
}

        如果 $select$ 返回 $0$ ,那么就代表发生超时,于是返回 $ETIMEOUT$ 错误,同时关闭套接字。如果描述符变为可读或可写,我们就调用 $getsockopt$ 取得其待处理的错误 ( 使用 $SO_-ERROR$ ) 。如果连接成功建立,该值将为 $0$ ;如果发生错误,该值就是对应的 $errno$ 。这里会有一个移植性问题,发生错误时,源自 $Berkeley$ 的实现会在变量 $error$ 中返回错误,$getsockopt$ 返回 $0$ ;然而Solaris却让 $getsockopt$ 返回 $-1$ ,并设置 $errno$ 变量。如果 $getsockopt$ 返回的 $error$ 为非 $0$ 值,就把该值存入 $errno$ ,并返回 $-1$ 。
        套接字的各种实现以及非阻塞 $connect$ 会带来移植性问题。首先,调用 $select$ 之前可能连接已经建立并且有来自对端的数据到达,这种情况下即使未发生错误,套接字也是可读可写的。其次,我们不能假设套接字的可写条件是 $select$ 返回套接字操作成功条件的唯一方法,该问题的解决方法多种多样:

  1. 调用 $getpeername$ 代替 $getsockopt$ 。如果 $getpeername$ 返回 $ENOTCONN$ ,就代表连接建立失败,我们必须接着以 $SO_-ERROR$ 调用 $getsockopt$ 取得待处理的错误;
  2. 以长度为 $0$ 的方式调用 $read$ ,如果失败,代表连接建立失败,$read$ 返回的 $errno$ 给出连接失败的原因;如果连接成功,$read$ 会返回 $0$ ;
  3. 再次调用 $connect$ ,如果失败,而且错误是 $EISCONN$ ,那么连接成功。

        非阻塞 $connect$ 是最难移植的部分,使用该技术必须准备解决移植性问题,特别是较老的实现。避免移植性问题的一个较简单的技术是为每个连接创建一个线程。

3. 非阻塞accept

        当有一个已完成的连接准备好被 $accept$ 时,$select$ 将作为可读描述符返回该连接的监听套接字。因此,如果我们使用 $select$ 在某个监听套接字上等待一个外来连接,那么就没有必要把该监听套接字设置为非阻塞。不过,这里可能存在一个问题。

#include "unp.h"

int main(int argc, char **argv) {
  int sockfd;
  struct linger ling;
  struct sockaddr_in servaddr;

  if (argc != 2)
    err_quit("usage: tcpcli <IPaddress>");

  sockfd = Socket(AF_INET, SOCK_STREAM, 0);
  bzero(&servaddr, sizeof(servaddr));
  servaddr.sin_family = AF_INET;
  servaddr.sin_port = htons(SERV_PORT);
  Inet_pton(AF_INET, argv[1], &servaddr.sin_addr);

  Connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr));

  ling.l_onoff = 1;  // cause RST to be sent on close()
  ling.l_linger = 0;
  Setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
  Close(sockfd);

  exit(0);
}

        上述程序在连接建立后会发送一个RST给服务器。假设服务器代码如下:

if (FD_ISSET(listenfd, &rset)) {  // new client connection
  printf("listening socket readable\n");
  sleep(5);
  clilen = sizeof(cliaddr);
  connfd = Accept(listenfd, (struct sockaddr *) &cliaddr, &clilen);
}

        上述代码模拟一个繁忙的服务器,无法在 $select$ 返回可读时立即调用 $accept$ 。通常情况下没有问题,但是客户程序会在连接建立后立即发送RST,导致连接在调用 $accept$ 前已经中止。源自 $Berkeley$ 的实现不把这个中止的连接返回给服务器,其他实现应该返回 $ECONNABORTED$ 错误,却往往返回 $EPROTO$ 错误。考虑源自 $Berkeley$ 的实现,在这个例子中,服务器会一直阻塞在 $accept$ 调用上,直到其他客户建立一个连接。这个问题的解决办法是:

  1. 当使用 $select$ 监听套接字上是否有连接准备好被 $accept$ 时,总是把这个监听套接字设置为非阻塞;
  2. 在后续的 $accept$ 调用中忽略 $EWOULDBLOCK$ ( 源自 $Berkeley$ 实现,客户中止连接时 )、$ECONNABORTED$ ( POSIX实现,客户中止连接时 )、$EPROTO$ ( SVR4实现,客户中止连接时 ) 和 $EINTR$ ( 如果有信号被捕获 )。

Unix网络编程(9):非阻塞式I/O