Unix网络编程(9):非阻塞式I/O
套接字的默认状态是阻塞的,这就意味着当发出一个不能立即完成的套接字调用时,其进程将被置于休眠状态,直到相应操作完成。可能阻塞套接字的调用分为以下四类:
- 输入:$read$ 、$readv$ 、$recv$ 、$recvfrom$ 和 $recvmsg$ 共 $5$ 个函数。如果对阻塞式套接字调用,而且套接字接收缓冲区中没有数据可读,进程将休眠。而对于非阻塞式套接字,如果输入操作不能被满足,调用将返回 $EWOULDBLOCK$ 错误;
- 输出:$write$ 、$writev$ 、$send$ 、$sendto$ 和 $sendmsg$ 共 $5$ 个函数。如果对阻塞式套接字调用,而且套接字发送缓冲区中没有空间,进程将休眠。对于非阻塞的
TCP
套接字,如果发送缓冲区中没有空间,输出函数调用将立即返回 $EWOULDBLOCK$ 错误; - 接受外来连接,即 $accept$ 函数。如果对一个非阻塞套接字调用 $accept$ 函数,而且无新连接到达,将会立即返回 $EWOULDBLOCK$ 错误;
- 发起外出连接,即用于
TCP
的 $connect$ 函数。如果对一个非阻塞套接字调用 $connect$ ,而且连接不能立即建立,那么会立即返回 $EINPROGRESS$ 错误。
1. 非阻塞读和写
#include "unp.h"
#include <time.h>
char *gf_time(void);
void str_cli(FILE *fp, int sockfd) {
int maxfdp1, val, stdineof;
ssize_t n , nwritten;
fd_set rset, wset;
char to[MAXLINE], fr[MAXLINE];
char *toiptr, *tooptr, *friptr, *froptr;
val = Fcntl(sockfd, F_GETFL, 0);
Fcntl(sockfd, F_SETFL, val | O_NONBLOCK);
val = Fcntl(STDIN_FILENO, F_GETFL, 0);
Fcntl(STDIN_FILENO, F_SETFL, val | O_NONBLOCK);
val = Fcntl(STDOUT_FILENO, F_GETFL, 0);
Fcntl(STDOUT_FILENO, F_SETFL, val | O_NONBLOCK);
toiptr = tooptr = to;
friptr = froptr = fr;
stdineof = 0;
maxfdp1 = max(max(STDIN_FILENO, STDOUT_FILENO), sockfd) + 1;
for (;;) {
FD_ZERO(&rset);
FD_ZERO(&wset);
if (stdineof == 0 && toiptr < &to[MAXLINE])
FD_SET(STDIN_FILENO, &rset); // read from stdin
if (friptr < &fr[MAXLINE])
FD_SET(sockfd, &rset); // read from socket
if (tooptr != toiptr)
FD_SET(STDOUT_FILENO, &wset); // write to socket
if (froptr != friptr)
FD_SET(STDOUT_FILENO, &wset); // write to stdout
Select(maxfdp1, &set, &wset, NULL, NULL);
if (FD_ISSET(STDIN_FILENO, &rset)) {
if ((n = read(STDIN_FILENO, toiptr, &to[MAXLINE] - toiptr)) < 0) {
if (errno != EWOULDBLOCK)
err_sys("read error on stdin");
} else if (n == 0) {
fprintf(stderr, "%s: EOF on stdin\n", gf_time());
stdineof = 1; // all done withn stdin
if (tooptr == toiptr)
Shutdown(sockfd, SHUT_WR); // send FIN
} else {
fprintf(stderr, "%s: read %d bytes from stdin\n", gf_time(), n);
toiptr += n;
FD_SET(sockfd, &wset); // try and write to socket below
}
}
if (FD_ISSET(sockfd, &rset)) {
if ((n = reada(sockfd, friptr, &fr[MAXLINE] - friptr)) < 0) {
if (errno != EWOULDBLOCK)
err_sys("read error on socket");
} else if (n == 0) {
fprintf(stderr, "%s: EOF on socket\n", gf_time());
if (stdineof)
return; // normal termination
else
err_quit("str_cli: server terminated prematurely");
} else {
fprintf(stderr, "%s: read %d bytes from socket\n", gf_time(), n);
friptr += n;
FD_SET(STDOUT_FILENO, &wset); // try and write below
}
}
if (FD_ISSET(STDOUT_FILENO, &wset) && ((n = friptr - froptr) > 0)) {
if ((nwritten = write(STDOUT_FILENO, froptr, n)) < 0) {
if (errno != EWOULDBLOCK)
err_sys("write error to stdout");
} else {
fprintf(stderr, "%s: wrote %d bytes to stdout\n", gf_time(), nwritten);
froptr += nwritten;
if (froptr == friptr)
froptr = friptr = fr; // back to beginning of buffer
}
}
if (FD_ISSET(sockfd, &wset) && ((n = toiptr - tooptr) > 0)) {
if ((nwritten = write(sockfd, tooptr, n)) < 0) {
if (errno != EWOULDBLOCK)
err_sys("write error to socket");
} else {
fprintf(stderr, "%s: wrote %d bytes to socket\n", gf_time(), nwritten);
tooptr += nwritten;
if (tooptr == toiptr) {
toiptr = tooptr = to; // back to beginning of buffer
if (stdineof)
Shutdown(sockfd, SHUT_WR); // send FIN
}
}
}
}
}
char *gf_time() {
struct timeval tv;
static char str[30];
char *ptr;
if (gettimeofday(&tv, NULL) < 0)
err_sys("gettimeofday error");
ptr = ctime(&tv.tv_sec);
strcpy(str, &ptr[11]);
// Fri Sep 13 00:00:00 1986\n\0
// 0123456789012345678901234 5
snprintf(str + 8, sizeof(str) - 8, ".%06ld", tv.tv_usec);
return str;
}
- 维护两个缓冲区:$to$ 容纳从标准输入到服务器的数据,$fr$ 容纳从服务器到标准输出的数据。其中 $toiptr$ 指针指向从标准输入读入的数据存放的起始字节,$tooptr$ 指向下一个应当向套接字写入的字节;$friptr$ 指向从从套接字读入的数据存放的起始字节,$ftoptr$ 指向下一个应当向标准输出写入的字节。一旦 $tooptr$ 移动到 $toiptr$ 这两个指针就一起恢复到缓冲区开始,$froptr$ 和 $friptr$ 同理;
- $fcntl$ 把 $3$ 个描述符都设置为非阻塞;
- 如果发生 $EWOULDBLOCK$ 错误,就忽略。通常情况下这种情况不会发生,因为 $select$ 已经告诉我们该描述符可读;
可以发现使用非阻塞I/O
比较复杂,需要进行缓冲区管理,但是确可以让代码在批量模式下运行速度提高。然而考虑到结果代码的复杂性,像上述方式那样使用非阻塞式I/O
是不值得的。我们发现,每当需要使用非阻塞式I/O
时,更简单的办法通常是把应用程序任务划分到多个进程或多个线程。
#include "unp.h"
void str_cli(FILE *fp, int sockfd) {
pid_t pid;
char sendline[MAXLINE], recvline[MAXLINE];
if ((pid = Fork()) == 0) { // child: server->stdout
while (Readline(sockfd, recvline, MAXLINE) > 0)
Fputs(recvline, stdout);
kill(getppid(), SIGTERM); // in case parent still running
exit(0);
}
// parent: stdin->server
while (Fgets(sendline, MAXLINE, fp) != NULL)
Writen(sockfd, sendline, strlen(sendline));
Shutdown(sockfd, SHUT_WR); // EOF on stdin, send FIN
pause();
return;
}
这个函数一开始就调用 $fork$ 把当前进程划分为一个父进程和一个子进程,前者负责将数据从标准输入传递给服务器,后者负责将数据从服务器传递给标准输出。父进程完成数据复制后调用 $pause$ 让自己休眠,直到捕获一个信号 ( 来自子进程的 $SIGTERM$ ) 信号,该信号的默认行为是终止进程。
2. 非阻塞connect
当在一个非阻塞的TCP
套接字上调用 $connect$ 时,$connect$ 将立即返回 $EINPROGRESS$ 错误,不过已经发起的TCP
三次握手会继续进行。我们接着使用 $select$ 检测这个连接或成功或失败的已建立条件。非阻塞的 $connect$ 有以下用途:
- 可以把三次握手叠加在其他处理上。完成一个 $connect$ 要花费一个 $RTT$ 时间,而 $RTT$ 波动范围很大,这段时间内可以执行其他工作;
- 可以同时建立多个连接;
- 通过 $select$ 等待连接建立,可以给 $select$ 指定一个时间限制,从而缩短 $connect$ 的超时时间。
使用非阻塞 $connect$ 会有以下问题:
- 如果连接到的服务器在同一个主机上,连接通常会立刻建立;
- 源自 $Berkeley$ 的实现 ( 和
POSIX
) 有关于 $select$ 和非阻塞 $connect$ 的两个规则:(a)当连接成功建立时,描述符变为可写;(b)当连接过程遇到错误时,描述符会变为既可读又可写。
#include "unp.h"
int connect_nonb(int sockfd, const struct sockaddr *saptr,
socklen_t salen, int nsec) {
int flags, n, error;
socklen_t len;
fd_set rset, wset;
struct timeval tval;
flags = Fcntl(sockfd, F_GETFL, 0);
Fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
error = 0;
if ((n = connect(sockfd, saptr, salen)) < 0)
if (errno != EINPROGRESS)
return -1;
// do whatever we want while the connect is taking place
if (n == 0)
goto done; // connect completed immediately
FD_ZERO(&rset);
FD_SET(sockfd, &rset);
wset = rset;
tval.tv_sec = nsec;
tval.tv_usec = 0;
if ((n = Select(sockfd + 1, &rset, &wset, NULL,
nsec ? &tval : NULL)) == 0) {
close(sockfd); // timeout
errno = ETIMEDOUT;
return -1;
}
if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) {
len = sizeof(error);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
return -1;
} else
err_quit("select error: sockfd not set");
done:
Fcntl(sockfd, F_SETFL, flags); // resotre file status flags
if (error) {
close(sockfd); // just in cast
errno = error;
return -1;
}
return 0;
}
如果 $select$ 返回 $0$ ,那么就代表发生超时,于是返回 $ETIMEOUT$ 错误,同时关闭套接字。如果描述符变为可读或可写,我们就调用 $getsockopt$ 取得其待处理的错误 ( 使用 $SO_-ERROR$ ) 。如果连接成功建立,该值将为 $0$ ;如果发生错误,该值就是对应的 $errno$ 。这里会有一个移植性问题,发生错误时,源自 $Berkeley$ 的实现会在变量 $error$ 中返回错误,$getsockopt$ 返回 $0$ ;然而Solaris
却让 $getsockopt$ 返回 $-1$ ,并设置 $errno$ 变量。如果 $getsockopt$ 返回的 $error$ 为非 $0$ 值,就把该值存入 $errno$ ,并返回 $-1$ 。
套接字的各种实现以及非阻塞 $connect$ 会带来移植性问题。首先,调用 $select$ 之前可能连接已经建立并且有来自对端的数据到达,这种情况下即使未发生错误,套接字也是可读可写的。其次,我们不能假设套接字的可写条件是 $select$ 返回套接字操作成功条件的唯一方法,该问题的解决方法多种多样:
- 调用 $getpeername$ 代替 $getsockopt$ 。如果 $getpeername$ 返回 $ENOTCONN$ ,就代表连接建立失败,我们必须接着以 $SO_-ERROR$ 调用 $getsockopt$ 取得待处理的错误;
- 以长度为 $0$ 的方式调用 $read$ ,如果失败,代表连接建立失败,$read$ 返回的 $errno$ 给出连接失败的原因;如果连接成功,$read$ 会返回 $0$ ;
- 再次调用 $connect$ ,如果失败,而且错误是 $EISCONN$ ,那么连接成功。
非阻塞 $connect$ 是最难移植的部分,使用该技术必须准备解决移植性问题,特别是较老的实现。避免移植性问题的一个较简单的技术是为每个连接创建一个线程。
3. 非阻塞accept
当有一个已完成的连接准备好被 $accept$ 时,$select$ 将作为可读描述符返回该连接的监听套接字。因此,如果我们使用 $select$ 在某个监听套接字上等待一个外来连接,那么就没有必要把该监听套接字设置为非阻塞。不过,这里可能存在一个问题。
#include "unp.h"
int main(int argc, char **argv) {
int sockfd;
struct linger ling;
struct sockaddr_in servaddr;
if (argc != 2)
err_quit("usage: tcpcli <IPaddress>");
sockfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
Inet_pton(AF_INET, argv[1], &servaddr.sin_addr);
Connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
ling.l_onoff = 1; // cause RST to be sent on close()
ling.l_linger = 0;
Setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Close(sockfd);
exit(0);
}
上述程序在连接建立后会发送一个RST
给服务器。假设服务器代码如下:
if (FD_ISSET(listenfd, &rset)) { // new client connection
printf("listening socket readable\n");
sleep(5);
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (struct sockaddr *) &cliaddr, &clilen);
}
上述代码模拟一个繁忙的服务器,无法在 $select$ 返回可读时立即调用 $accept$ 。通常情况下没有问题,但是客户程序会在连接建立后立即发送RST
,导致连接在调用 $accept$ 前已经中止。源自 $Berkeley$ 的实现不把这个中止的连接返回给服务器,其他实现应该返回 $ECONNABORTED$ 错误,却往往返回 $EPROTO$ 错误。考虑源自 $Berkeley$ 的实现,在这个例子中,服务器会一直阻塞在 $accept$ 调用上,直到其他客户建立一个连接。这个问题的解决办法是:
- 当使用 $select$ 监听套接字上是否有连接准备好被 $accept$ 时,总是把这个监听套接字设置为非阻塞;
- 在后续的 $accept$ 调用中忽略 $EWOULDBLOCK$ ( 源自 $Berkeley$ 实现,客户中止连接时 )、$ECONNABORTED$ (
POSIX
实现,客户中止连接时 )、$EPROTO$ (SVR4
实现,客户中止连接时 ) 和 $EINTR$ ( 如果有信号被捕获 )。