Unix网络编程(17):消息队列
消息队列可认为是一个消息链表,有足够写权限的线程可往队列中放置消息,有足够读权限的线程可从队列中取走消息。每个消息都是一个记录,它由发送者赋予一个优先级。与管道和FIFO
不同,在某个进程往一个队列写入消息前,并不需要另外某个进程在该队列上等待消息到达。
1. mq_open
、mq_close
和mq_unlink
函数
#include <mqueue.h>
// 成功返回消息队列描述符,出错返回-1
mqd_t mq_open(const char *name, int oflag, ...
/* mode_t mode, struct aq_attr *attr */ );
// 成功返回0,出错返回-1
int mq_close(mqd_t mqdes);
// 成功返回0,出错返回-1
int mq_unlink(const char *name);
$oflag$ 参数是 $O_-RDONLY$ 、$O_-WRONLY$ 或 $O_-RDWR$ 之一,并且可以按位或 $O_-CREAT$ 、$O_-EXCL$ 或 $O_-NONBLOCK$ 。当实际操作是创建一个新队列时 ( 指定 $O_-CREAT$ 且请求的消息队列不存在 ),需要 $mode$ 和 $attr$ 参数。$mq_-close$ 调用类似于 $close$ 调用,调用之后消息队列不会从系统中删除。当一个进程终止时,它的所有打开着的消息队列会被关闭。如果要从系统中删除一个消息队列,则需要调用 $mq_-unlink$ ,类似于 $unlink$ ,当一个消息队列的打开计数大于 $0$ 时,不会立即删除,而是直到该消息队列真正被关闭时才会删除。POSIX
消息队列至少具备随内核的持续性,这意味着即使当前没有进程打开着消息队列,其中的消息也将一直存在,直到被删除。
#include "unpipc.h"
int main(int argc, char **argv) {
int c, flags;
mqd_t mqd;
flags = O_RDWR | O_CREAT;
while ((c = Getopt(argc, argv, "e")) != -1) {
switch(c) {
case 'e':
flags |= O_EXCL;
break;
}
}
if (optind != argc - 1)
err_quit("usage: mqcreate [ -e ] <name>");
mql = Mq_open(argv[optind], flags, FILE_MODE, NULL);
Mq_close(mqd);
exit(0);
}
允许有一个排他选项 $-e$ 创建消息队列。通过 $getopt$ 获取这个选项,$getopt$ 会在 $optind$ 中存放待处理的下一个参数的下标。
2. mq_getattr
和mq_setattr
函数
#include <mqueue.h>
// 成功返回0,出错返回-1
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
// 成功返回0,出错返回-1
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oattr);
struct mq_attr {
long mq_flags; // message queue flag: 0, O_NONBLOCK
long mq_maxmsg; // max number of messages allowed on queue
long mq_msgsize; // max size of a message (in bytes)
long mq_curmsgs; // number of messages currently on queue
};
$mq_-attr$ 结构指针可以作为 $mq_-open$ 的第四个参数传递,从而允许我们在该函数的实际操作是创建一个新队列时,为它指定 $mq_-maxmsg$ 和 $mq_-msgsize$ 属性,其他两个属性会被忽略。$mq_-setattr$ 为所指队列设置属性,但是只使用 $mq_-flags$ ,用于设置或清除非阻塞标志。另外,如果 $oattr$ 非空,当前队列的先前属性会返回到 $oattr$ 中。
3. mq_send
和mq_receive
函数
#include <mqueue.h>
// 成功返回0,出错返回-1
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
// 成功返回消息字节数,出错返回-1
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);
这两个函数分别用于往消息队列放置一个消息和从消息队列中取出一个消息。每个消息都对应一个优先级 $prio$ ,它是一个小于 $MQ_-PRIO_-MAX$ 的无符号整数,POSIX
要求这个上限最小为 $32$ 。$mq_-receive$ 总是会取出队列中优先级最高的最早发送的消息,并且该优先级会和消息长度一起返回。$mq_-receive$ 的 $len$ 不能小于消息队列中的消息最大大小,如果小于,会返回 $EMSGSIZE$ 错误。如果应用不必使用优先级不同的消息,可以统一将优先级指定为 $0$ ,并将 $mq_-receive$ 的最后一个参数指定为 $NULL$ 。
4. 消息队列限制
消息队列存在两个限制,都是在创建队列时指定的:
- $mq_-mqxmsg$ :队列中的最大消息数;
- $mq_-msgsize$ :给定消息的最大字节数。
消息队列的实现定义了另外两个限制:
- $MQ_-OPEN_-MAX$ :一个进程能够同时拥有的打开着的消息队列的最大数目 (
POSIX
要求至少为 $8$ ); - $MQ_-PRIO_-MAX$ :任意消息的最大优先级加 $1$ (
POSIX
要求至少为 $32$ )。
#include "unpipc.h"
int main(int argc, char **argc) {
printf("MQ_OPEN_MAX = %ld, MQ_PRIO_MAX = %ld\n",
Sysconf(_SC_MQ_OPEN_MAX), Sysconf(_SC_MQ_PRIO_MAX));
exit(0);
}
5. mq_notify
函数
#include <mqueue.h>
// 成功返回0,出错返回-1
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
POSIX
消息队列允许异步事件通知 ( $asynchronous$ $event$ $notification$ ),用于告知何时有一个消息被放置到空的消息队列中。这种通知有两种方式可以选择:
- 产生一个信号;
- 创建一个线程来执行一个指定的函数。
$mq_-notify$ 为指定的消息队列建立或删除异步事件通知。
#include <signal.h>
union sigval {
int sival_int; // integer value
void *sival_ptr; // pointer value
};
struct sigevent {
int sigev_notify; // SIGEV_{NONE, SIGNAL, THREAD}
int sigev_signo; // signal number if SIGEV_SIGNAL
union sigval sigev_value; /* passed to signal handler or thread
following two if SIGEV_THREAD */
void (*sigev_notifiy_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
};
- 如果 $notification$ 非空,那么当前进程希望在有一个消息到达前得到通知;
- 如果 $notification$ 为空,而且当前进程已被注册接收指定队列的通知,那么撤销先前的注册;
- 任意时刻只有一个进程可以被注册接收某个指定队列的通知;
- 当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的 $mq_-receive$ 调用的前提下,通知才会发出。即,$mq_-receive$ 优于注册通知;
- 当通知被发送给注册进程时,注册即被撤销。如果想要再次接收通知,需要再次注册。
#include "unpipc.h"
mqd_t mqd;
void *buff;
struct mq_attr attr;
struct sigevent sigev;
static void sig_usr1(int);
int main(int argc, char **argv) {
if (argc != 2)
err_quit("usage: mqnotifysig1 <name>");
// open queue, get attributes, allocate read buffer
mqd = Mq_open(argv[1], O_RDONLY);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);
// establish signal handler, enable notification
Signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);
for (;;)
pause(); // signal hadler does everything
exit(0);
}
static vodi sig_usr1(int signo) {
ssize_t n;
Mq_notify(mqd, &sigev); // reregister first
n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);
printf("SIGUSR1 received, read %ld bytes\n", (long) n);
return;
}
5.1 异步信号安全函数
上述程序的问题是它在信号处理程序中调用 $mq_-notify$ 、$mq_-receive$ 和 $printf$ 。这些函数实际上都不可以在信号处理程序中调用。POSIX
使用异步信号安全 ( $async-signal-safe$ ) 这个术语描述可以在信号处理函数中调用的函数。
$$ \begin{array}{|c|c|c|c|} \hline access & fpathconf & rename & sysconf\\ aio_-return & fstat & rmdir & tcdrian \\ aio_-suspend & fsync & sem_-post & tcflow \\ alarm & getegid & setgit & tcflush \\ cfgetispeed & geteuid & setpgid & tcgetattr \\ cfgetospeed & getpgrp & sigaction & tcsetattr \\ chdir & getpid & sigaddset & tcsetpgrp \\ chmod & getppid & sigdelset & time \\ chown & getuid & sigemptyset & timer_-getoverrun \\ clock_-gettime & kill & sigfillset & timer_-gettime \\ close & link & sigismember & timer_-settime \\ creat & lseek & signal & times \\ dup & mkdir & sigpause & umask \\ dup2 & mkfifo & sigpending & uname \\ execle & open & sigprocmask & unlink \\ execve & pathconf & sigqueue & utim \\ _-exit & pause & sigset & wait \\ fcntl & pipe & sigsuspend & waitpid \\ fdatasync & raise & sleep & write \\ fork & read & stat \\ \hline \end{array} $$
没有列在上表中的函数不能在信号处理函数中调用。注意所有标准I/O
函数和 $pthread_-XXX$ 函数都没有列在其中。IPC
相关函数只有 $sem_-post$ 、$read$ 和 $write$ 在其中。
避免从信号处理程序中掉哟该任何函数的办法之一是让处理函数仅仅设置一个全局标志,再由某个线程检查该标志以确定何时收到消息。
#include "unpipc.h"
volatile sig_atomic_t mqflag; // set nonzero by signal handler
static void sig_usr1(int);
int main(int argc, char **argv) {
mqd_t mqd;
void *buff;
ssize_t n;
sigset_t zeromask, newmask, oldmask;
struct mq_attr attr;
struct sigevent sigev;
if (argc != 2)
err_quit("usage: mqnotifysig2 <name>");
// open queue, get attributes, allocate read buffer
mqd = Mq_open(argv[1], O_RDONLY);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);
Sigemptyset(&zeromask); // no signals block
Sigemptyset(&newmask);
Sigemptyset(&oldmask);
Sigaddset(&newmask, SIGUSR1);
// establish signal handler, enable notification
Signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);
for (;;) {
Sigprocmask(SIG_BLOCK, &newmask, &oldmask); // block SIGUSR1
while (mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0; // reset flag
Mq_notify(mqd, &sigev); // reregister first
n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);
printf("read %ld bytes\n", (long) n);
Sigprocmask(SIG_UNBLOCK, &newmask, NULL); // unblock SIGUSR1
}
exit(0);
}
static void sig_usr1(int signo) {
mqflag = 1;
return;
}
5.2 非阻塞的信号通知
上面的程序还存在一个问题,那就是Unix
的信号是不排队的,如果有两个消息到达,我们的程序只会处理一个。解决办法是使用非阻塞模式读取消息。
int main(int argc, char **argv) {
mqd_t mqd;
void *buff;
ssize_t n;
sigset_t zeromask, newmask, oldmask;
struct mq_attr attr;
struct sigevent sigev;
if (argc != 2)
err_quit("usage: mqnotifysig3 <name>");
// open queue, get attributes, allocate read buffer
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);
Sigemptyset(&zeromask); // no signals blocked
Sigemptyset(&newmask);
Sigemptyset(&oldmask);
Sigaddset(&newmask, SIGUSR1);
// establish signal handler, enable notification
Signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);
for (;;) {
Sigprocmask(SIG_BLOCK, &newmask, &oldmask); // block SIGUSR1
while (mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0; // reset flag
Mq_notify(mqd, &sigev); // reregister first
while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
Sigprocmask(SIG_UNBLOCK, &newmask, NULL); // unblock SIGUSR1
}
exit(0);
}
5.3 sigwait
函数
上述程序尽管正确,但是效率还可以更高。因为我们的程序是通过 $sigsuspend$ 阻塞的,直到 $mqflag$ 非零,更高效的办法是只等待 $SIGUSR1$ 而不是任意一个信号。
#include <signal.h>
// 成功返回0,出错返回非负错误码
int sigwait(const sigset_t *set, int *sig);
$sigwait$ 会阻塞等待信号集 $set$ 中的某个信号返回,返回时设置 $sig$ ,标识产生的信号。
#include "unpipc.h"
int main(int argc, char **argv) {
int signo;
mqd_t mqd;
void *buff;
ssize_t n;
sigset_t newmask;
struct mq_attr attr;
struct sigevent sigev;
if (argc != 2)
err_quit("usage: mqnotifysig4 <name>");
// open queue, get attributes, allocate read buffer
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK;
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);
Sigemptyset(&newmask);
Sigaddset(&newmask, SIGUSR1);
Sigprocmask(SIG_BLOCK, &newmask, NULL); // block SIGUSR1
// establish signal handler, enable notification
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);
for (;;) {
Sigwait(&newmask, &signo);
if (signo == SIGUSR1) {
Mq_notify(mqd, &sigev); // reregister first
while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
}
}
exit(0);
}
5.4 select
函数
消息队列描述符 $mqd_-t$ 不能用于 $select$ 或 $poll$ 。然而我们可以通过管道结合 $mq_-notify$ 使用。
#include "unpipc.h"
int pipefd[2];
static void sig_usr1(int);
int main(int argc, char **argv) {
int nfds;
char c;
fd_set rset;
mqd_t mqd;
void *buff;
ssize_t n;
struct mq_attr attr;
struct sigevent sigev;
if (argc != 2)
err_quit("usage: mqnotifysig5 <name>");
// open queue, get attributes, allocate read buffer
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);
Pipe(pipefd);
// establish signal handler, enable notification
Signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);
FD_ZERO(&rset);
for (;;) {
FD_SET(pipefd[0], &rset);
nfds = Select(pipefd[0] + 1, &rset, NULL, NULL, NULL);
if (FD_ISSET(pipefd[0], &rset)) {
Read(pipefd[0], &c, 1);
Mq_notify(mqd, &sigev); // reregister first
while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
}
}
exit(0);
}
static void sig_usr1(int signo) {
Write(pipefd[1], "", 1); // one byte of 0
return;
}
5.5 线程
异步事件通知的另一种方式是把 $sigev_-notify$ 指定为 $SIGEV_-THREAD$ ,这会创建一个新线程。线程属性由 $sigev_-notify_-attributes$ 指定,调用 $sigev_-notify_-function$ 函数,参数为 $sigev_-value$ 。
#include "unpipc.h"
mqd_t mqd;
struct mq_attr attr;
struct sigevent sigev;
static void notify_thread(union sigval); // our thread function
int main(int argc, char **argv) {
if (argc != 2)
err_quit("usage: mqnotifythread1 <name>");
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_notify_attributes = NULL;
Mq_notify(mqd, &sigev);
for (;;)
pause(); // each new thread does everything
exit(0);
}
static void notify_thread(union sigval arg) {
ssize_t n;
void *buff;
printf("notify_thread started\n");
buff = Malloc(attr.mq_msgsize);
Mq_notify(mqd, &sigev); // reregister
while ((m = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
free(buf);
pthread_exit(NULL);
}
6. POSIX
实时信号
信号可划分为两组:
- 值在 $SIGRTMIN$ 和 $SIGTRMAX$ 之间 ( 包括两者 ) 的实时信号,
POSIX
要求至少提供 $RTSIG_-MAX$ 种实时信号,最小为 $8$ ; - 其他信号,比如 $SIGALRM$ 、$SIGINT$ 和 $SIGKILL$ 等。
接收某个信号的进程的 $sigaction$ 调用中是否制定了新的 $SA_-SIGINFO$ 标志会带来差异。
信号 | $sigaction$ 调用 | |
---|---|---|
指定 $SA_-SIGINFO$ | 未指定 $SA_-SIGINFO$ | |
$SIGRTMIN$ $\sim$ $SIGRTMAX$ | 已指定实时行为 | 未指定实时行为 |
其他信号 | 未指定实时行为 | 未指定实时行为 |
未指定实时行为意味着有些实现可能提供实时行为,有些不提供。如果需要实时行为,我们必须使用 $SIGRTMIN$ $\sim$ $SIGRTMAX$ 之间的信号,并且在调用 $sigaction$ 时指定 $SA_-SIGINFO$ 。实时行为意味着:
- 信号是排队的;
- 当有多个解阻塞信号排队时,值较小的信号优于值较大信号递交;
- 某个非实时信号递交时,传递给它的信号处理程序的唯一参数是信号值。而实时信号可以传递更多信息。
- 一些新信号定义成使用实时信号工作。例如,$sigqueue$ 函数代替 $kill$ 函数向某个进程发送信号,该新函数允许发送者随所发送信号传递一个 $sigval$ 。
void func(int signo, siginfo_t *info, void *context);
typedef struct {
int si_signo; // same value as signo argument
int si_code; // SI_{USER, QUEUE, TIMER, ASYNCIO, MEGEQ}
union sigval si_value; // integer or pointer value from sender
} siginfo_t;
$context$ 参数所指的内容依赖于具体实现。
- $SI_-ASYNCIO$ :信号由某个异步
I/O
请求产生,即POSIX
的 $aio_-XXX$ 函数; - $SI_-MESGQ$ :信号在有一个消息被放置到某个空消息队列时产生;
- $SI_-QUEUE$ :信号由 $timer_-settime$ 函数设置的定时器产生;
- $SI_-USER$ :信号由 $kill$ 函数产生。
如果信号由其他事件产生,$si_-code$ 的值就会被设置成不同于上述的值。而 $siginfo_-t$ 结构的 $si_-value$ 成员只有 $si_-code$ 是上述所列的值之一时才启用。
#include "unpipc.h"
static volatile siginfo_t arrival[10];
static volatile int nsig;
static void sig_rt(int, siginfo_t *, void *);
int main(int argc, char **argv) {
int i, j;
pid_t pid;
sigset_t newset;
union sigval val;
printf("SIGRTMIN = %d, SIGRTMAX = %d\n", (int) SIGRTMIN, (int) SIGRTMAX);
if ((pid = Fork()) == 0) {
// child: block three realtime signals
Sigemptyset(&newset);
Sigaddset(&newset, SIGRTMAX);
Sigaddset(&newset, SIGRTMAX - 1);
Sigaddset(&newset, SIGRTMAX - 2);
Sigprocmask(SIG_BLOCK, &newset, NULL);
// establish signal handler with SA_SIGINFO set
Signal_rt(SIGRTMAX, sig_rt, &newset);
Signal_rt(SIGRTMAX - 1, sig_rt, &newset);
Signal_rt(SIGRTMAX - 2, sig_rt, &newset);
sleep(6); // let parent send all the signals
Sigprocmask(SIG_UNBLOCK, &newset, NULL); // unblock
sleep(3); // let all queued signals be delivered
for (i = 0; i < nsig; i++) {
printf("received signal #%d, code = %d, ival = %d\n",
arrival[i].si_signo, arrival[i].si_code,
arrival[i].si_value.sival_int);
}
exit(0);
}
// parent sends nine signals to child
sleep(3); // let child block add signals
for (i = SIGRTMAX; i >= SIGRTMAX - 2; i--) {
for (j = 0; j <= 2; j++) {
val.sival_int = j;
Sigqueue(pid, i, val);
printf("sent signal %d, val = %d\n", i, j);
}
}
exit(0);
}
static void sig_rt(int signo, siginfo_t *info, void *context) {
arrival[nsig++] = *info; // save info for child to print
}
首先输出最小和最大实时信号值,查看系统支持的实时信号数量。然后派生一个子进程,阻塞我们需要的实时信号。接着子进程调用 $signal_-rt$ 函数建立信号处理程序,父进程则等待 $6$ 秒后发送信号,最后子进程输出接收到的所有信号。
6.1 sinal_rt
函数
#include "unpipc.h"
typedef void Sigfunc_rt(int, siginfo_t *, void *);
Sigfunc_rt *signal_rt(int signo, Sigfunc_rt *func, sigset_t *mask) {
struct sigaction act, oact;
act.sa_sigaction = func; // must store function addr here
act.sa_mask = *mask; // signals to block
act.sa_flags = SA_SIGINFO; // must specify this for realtime
if (signo == SIGALRM) {
#ifdef SA_INTERRUPT
act.sa_flags |= SA_INTERRUPT; // SunOS 4.x
#endif
} else {
#ifdef SA_RESTART
act.sa_flags |= SA_RESTART; // SVR4, 44BSD
#endif
}
if (sigaction(signo, &act, &oact) < 0)
return (Sigfunc_rt *) SIG_ERR;
return oact.sa_sigaction;
}
加入实时信号支持后,$sigaction$ 发生了变化,添加了 $sa_-sigaction$ 成员。
struct sigaction {
void (*sa_handler)(); // SIG_DFL, SIG_IGN or add of signal handler
sigset_t sa_mask; // additional signals to block
int sa_flags; // signal options: SA_xxx
void (*sa_sigaction)(int, siginfo_t, void *); // addr of signal handler if SA_SIGINFO set
};
- 如果 $sa_-flags$ 成员设置了 $SA_-SIGINFO$ 标志,那么 $sa_-sigaction$ 成员会指定信号处理函数地址;
- 如果 $sa_-flags$ 成员没有设置 $SA_-SIGINFO$ 标志,那么 $sa_-handler$ 成员会指定信号处理函数地址;
- 给某个信号指定默认行为或忽略信号,应该把 $sa_-handler$ 设置为 $SIG_-DFL$ 或 $SIG_-IGN$ 并不设置 $SA_-SIGINFO$ 标志。