Unix网络编程(13):信号驱动式I/O
信号驱动式I/O
是指进程预先告知内核,使得当某个描述符上发生某事时,内核使用信号通知相关进程。
1. 套接字的信号驱动式I/O
针对一个套接字使用信号驱动式I/O
( $SIGIO$ ) 要求进程执行以下 $3$ 个步骤:
- 创建 $SIGIO$ 信号的信号处理函数;
- 设置该套接字的属主;
- 开启该套接字的信号驱动式
I/O
,通常通过使用 $fcntl$ 的 $F_-SETFL$ 命令打开 $O_-ASYNC$ 标志。
1.1 对于UDP
套接字的SIGIO
信号
在UDP
上使用信号驱动式I/O
是简单的。$SIGIO$ 信号在发生以下事件时产生:
- 数据报到达套接字;
- 套接字上发生异步错误。
因此当捕获对于某个UDP
套接字的 $SIGIO$ 信号时,我们调用 $recvfrom$ 或者读入到达的数据报,或者获取发生的异步错误。
1.2 对于TCP
套接字的SIGIO
信号
信号驱动式I/O
对于TCP
套接字近乎无用,因为该信号产生得过于频繁,并且信号出现并没有告诉我们发生了什么事。下列条件均导致TCP
套接字产生 $SIGIO$ 信号:
- 监听套接字上某个连接请求已经完成;
- 某个断连的请求已经发起;
- 某个断连的请求已经完成;
- 某个连接已经半关闭;
- 数据到达套接字;
- 数据已经从套接字发送走 ( 即输出缓冲区有空闲空间 );
- 发生异步错误。
我们应该只考虑对监听套接字使用 $SIGIO$ ,因为监听套接字产生 $SIGIO$ 的唯一条件是某个新连接的完成。
2. 使用SIGIO
的UDP
回射服务器程序
使用 $SIGIO$ 的UDP
回射程序可以设计为这种形式:当一个数据报到达时,$SIGIO$ 处理函数读入数据报,同时记录到达时刻,然后将它置于进程内的另一个队列中,主服务器循环从队列中取出数据并处理。相比起不使用 $SIGIO$ 而是简单使用一个服务器循环处理的程序,这种形式可以精确的获知数据报到达时间戳。
#include "unp.h"
static int sockfd;
#define QSIZE 8 // size of input queue
#define MAXDG 4096
typedef struct {
void *dg_data; // ptr to actual datagram
size_t dg_len; // length of datagram
struct sockaddr *dg_sa; // ptr to sockaddr{} w/client's address
socklen_t dg_salen; // length of sockaddr
} DG;
static DG dg[QSIZE]; // queue of datagrams to process
static long cntread[QSIZE + 1]; // diagnostic counter
static int iget; // next one for main loop to process
static int iput; // next one for signal handler to read into
static int nqueue; // # on queue for main loop to process
static socklen_t clilen; // max length of sockaddr{}
static void sig_io(int);
static void sig_hup(int);
void dg_echo(int sockfd_arg, struct sockaddr *pcliaddr, socklen_t clilen_arg) {
int i;
const int on = 1;
sigset_t zeromask, newmask, oldmask;
sockfd = sockfd_arg;
clilen = clilen_arg;
for (i = 0; i < QSIZE; i++) { // init queue of buffers
dg[i].dg_data = Malloc(MAXDG);
dg[i].dg_sa = Malloc(clilen);
dg[i].dg_salen = clilen;
}
iget = iput = nqueue = 0;
Signal(SIGHUP, sig_hup);
Signal(SIGIO, sig_io);
Fcntl(sockfd, F_SETOWN, getpid());
Ioctl(sockfd, FIOASYNC, &on);
Ioctl(sockfd, FIONBIO, &on);
Sigemptyset(&zeromask); // init three signal sets
Sigemptyset(&oldmask);
Sigemptyset(&newmask);
Sigaddset(&newmask, SIGIO); // signal we want to block
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
for (;;) {
while (nqueue == 0)
sigsuspend(&zeromask); // wait for datagram to process
// unblock SIGIO
Sigprocmask(SIG_SETMASK, &oldmask, NULL);
Sendto(sockfd, dg[iget].dg_data, dg[iget].dg_len, 0,
dg[iget].dg_sa, dg[iget].dg_salen);
if (++iget >= QSIZE)
iget = 0;
// block SIGIO
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
nqueue--;
}
}
- $SIGIO$ 信号处理函数把数据报放入一个 $DG$ 数组,我们把它作为环形缓冲区处理;
- $iget$ 指向下一个待处理的数组元素下标,$iput$ 指向下一个可存放数据的数组下标,$nqueue$ 是数组中待处理的数据报总数;
- 初始化三个信号集:$zeromask$ ( 从不改变 )、$oldmask$ ( 记录阻塞 $SIGIO$ 时原来的掩码 ) 和 $newmask$ ;
- 首次调用 $sigprocmask$ 会把进程的当前信号掩码保存到 $oldmask$ 中,然后把 $newmask$ 逻辑或到当前信号掩码,这将阻塞 $SIGIO$ 并返回当前信号掩码;
- 当 $nqueue$ 为 $0$ 时,代表没有数据到达,调用 $sigsuspend$ 。该函数会先内部保存当前信号掩码,再设置当前信号掩码为它的参数 ( $zeromask$ ),该函数总是返回 $EINTR$ 错误。因为 $zeromask$ 是一个空信号集,所以所有信号都可以被捕获,$sigsuspend$ 捕获一个信号,并且等待该信号的处理函数返回后才会返回。在返回之前,$sigsuspend$ 会恢复当前信号掩码,在本例中是 $newmask$ ;
- 调用 $sigprocmask$ 解除 $SIGIO$ 的阻塞状态,然后调用 $sendto$ 发送回复,递增 $iget$ ,表示该数据报已经被处理;
- 再次调用 $sigprocmask$ ,重新阻塞 $SIGIO$ ,最后递减 $nqueue$ 。
在上述函数中,我们在一个循环内不断地阻塞和解阻塞 $SIGIO$ ,是为了保证安全地修改和判断 $nqueue$ ,因为相应的信号处理函数也会修改 $nqueue$ 。另一种方法是让 $SIGIO$ 一直阻塞,但是这样做降低了信号处理函数的及时性,因为直到 $sigsuspend$ 被调用时信号处理函数才会被调用。
static void sig_io(int signo) {
ssize_t len;
int nread;
DG *ptr;
for (nread = 0; ; ) {
if (nqueue >= QSIZE)
err_quit("receive overflow");
ptr = &dg[iput];
ptr->dg_salen = clilen;
len = recvfrom(sockfd, ptr->dg_data, MAXDG, 0,
ptr->dg_sa, &ptr->dg_salen);
if (len < 0) {
if (errno == EWOULDBLOCK)
break; // all done, no more queued to read
else
err_sys("recvfrom error");
}
ptr->dg_len = len;
nread++;
nqueue++;
if (++iput >= QSIZE)
iput = 0;
}
cntread[nread]++; // histgram of # datagrams read per signal
}
POSIX
信号通常不排队,开启信号驱动式I/O
的描述符通常也被设置为非阻塞式。在这个前提下,我们的 $SIGIO$ 信号处理函数会一直读入,直到返回 $EWOULDBLOCK$ 错误。$nread$ 是一个计算每次信号递交读入数据报数目的诊断计数器,在信号处理函数返回前,我们会统计每次读入数据报数目,通过 $SIGHUP$ 信号处理函数输出。
static void sig_hup(int signo) {
int i;
for (i = 0; i <= QSIZE; i++)
printf("cntread[%d] = %ld\n", i, cntread[i]);
}