Unix网络编程(17):消息队列

        消息队列可认为是一个消息链表,有足够写权限的线程可往队列中放置消息,有足够读权限的线程可从队列中取走消息。每个消息都是一个记录,它由发送者赋予一个优先级。与管道和FIFO不同,在某个进程往一个队列写入消息前,并不需要另外某个进程在该队列上等待消息到达。

1. mq_openmq_closemq_unlink函数

#include <mqueue.h>

// 成功返回消息队列描述符,出错返回-1
mqd_t mq_open(const char *name, int oflag, ...
  /* mode_t mode, struct aq_attr *attr */ );

// 成功返回0,出错返回-1
int mq_close(mqd_t mqdes);

// 成功返回0,出错返回-1
int mq_unlink(const char *name);

        $oflag$ 参数是 $O_-RDONLY$ 、$O_-WRONLY$ 或 $O_-RDWR$ 之一,并且可以按位或 $O_-CREAT$ 、$O_-EXCL$ 或 $O_-NONBLOCK$ 。当实际操作是创建一个新队列时 ( 指定 $O_-CREAT$ 且请求的消息队列不存在 ),需要 $mode$ 和 $attr$ 参数。$mq_-close$ 调用类似于 $close$ 调用,调用之后消息队列不会从系统中删除。当一个进程终止时,它的所有打开着的消息队列会被关闭。如果要从系统中删除一个消息队列,则需要调用 $mq_-unlink$ ,类似于 $unlink$ ,当一个消息队列的打开计数大于 $0$ 时,不会立即删除,而是直到该消息队列真正被关闭时才会删除。POSIX消息队列至少具备随内核的持续性,这意味着即使当前没有进程打开着消息队列,其中的消息也将一直存在,直到被删除。

#include "unpipc.h"

int main(int argc, char **argv) {
  int c, flags;
  mqd_t mqd;

  flags = O_RDWR | O_CREAT;
  while ((c = Getopt(argc, argv, "e")) != -1) {
    switch(c) {
    case 'e':
      flags |= O_EXCL;
      break;
    }
  }
  if (optind != argc - 1)
    err_quit("usage: mqcreate [ -e ] <name>");

  mql = Mq_open(argv[optind], flags, FILE_MODE, NULL);

  Mq_close(mqd);
  exit(0);
}

        允许有一个排他选项 $-e$ 创建消息队列。通过 $getopt$ 获取这个选项,$getopt$ 会在 $optind$ 中存放待处理的下一个参数的下标。

2. mq_getattrmq_setattr函数

#include <mqueue.h>

// 成功返回0,出错返回-1
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

// 成功返回0,出错返回-1
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oattr);

struct mq_attr {
  long mq_flags;  // message queue flag: 0, O_NONBLOCK
  long mq_maxmsg;  // max number of messages allowed on queue
  long mq_msgsize;  // max size of a message (in bytes)
  long mq_curmsgs;  // number of messages currently on queue
};

        $mq_-attr$ 结构指针可以作为 $mq_-open$ 的第四个参数传递,从而允许我们在该函数的实际操作是创建一个新队列时,为它指定 $mq_-maxmsg$ 和 $mq_-msgsize$ 属性,其他两个属性会被忽略。$mq_-setattr$ 为所指队列设置属性,但是只使用 $mq_-flags$ ,用于设置或清除非阻塞标志。另外,如果 $oattr$ 非空,当前队列的先前属性会返回到 $oattr$ 中。

3. mq_sendmq_receive函数

#include <mqueue.h>

// 成功返回0,出错返回-1
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

// 成功返回消息字节数,出错返回-1
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);

        这两个函数分别用于往消息队列放置一个消息和从消息队列中取出一个消息。每个消息都对应一个优先级 $prio$ ,它是一个小于 $MQ_-PRIO_-MAX$ 的无符号整数,POSIX要求这个上限最小为 $32$ 。$mq_-receive$ 总是会取出队列中优先级最高的最早发送的消息,并且该优先级会和消息长度一起返回。$mq_-receive$ 的 $len$ 不能小于消息队列中的消息最大大小,如果小于,会返回 $EMSGSIZE$ 错误。如果应用不必使用优先级不同的消息,可以统一将优先级指定为 $0$ ,并将 $mq_-receive$ 的最后一个参数指定为 $NULL$ 。

4. 消息队列限制

        消息队列存在两个限制,都是在创建队列时指定的:

        消息队列的实现定义了另外两个限制:

#include "unpipc.h"

int main(int argc, char **argc) {
  printf("MQ_OPEN_MAX = %ld, MQ_PRIO_MAX = %ld\n",
    Sysconf(_SC_MQ_OPEN_MAX), Sysconf(_SC_MQ_PRIO_MAX));
  exit(0);
}

5. mq_notify函数

#include <mqueue.h>

// 成功返回0,出错返回-1
int mq_notify(mqd_t mqdes, const struct sigevent *notification);

        POSIX消息队列允许异步事件通知 ( $asynchronous$ $event$ $notification$ ),用于告知何时有一个消息被放置到空的消息队列中。这种通知有两种方式可以选择:

        $mq_-notify$ 为指定的消息队列建立或删除异步事件通知。

#include <signal.h>

union sigval {
  int sival_int;  // integer value
  void *sival_ptr;  // pointer value
};

struct sigevent {
  int sigev_notify;  // SIGEV_{NONE, SIGNAL, THREAD}
  int sigev_signo;  // signal number if SIGEV_SIGNAL
  union sigval sigev_value;  /* passed to signal handler or thread
                                following two if SIGEV_THREAD */
  void (*sigev_notifiy_function)(union sigval);
  pthread_attr_t *sigev_notify_attributes;
};
  1. 如果 $notification$ 非空,那么当前进程希望在有一个消息到达前得到通知;
  2. 如果 $notification$ 为空,而且当前进程已被注册接收指定队列的通知,那么撤销先前的注册;
  3. 任意时刻只有一个进程可以被注册接收某个指定队列的通知;
  4. 当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的 $mq_-receive$ 调用的前提下,通知才会发出。即,$mq_-receive$ 优于注册通知;
  5. 当通知被发送给注册进程时,注册即被撤销。如果想要再次接收通知,需要再次注册。
#include "unpipc.h"

mqd_t mqd;
void *buff;
struct mq_attr attr;
struct sigevent sigev;

static void sig_usr1(int);

int main(int argc, char **argv) {
  if (argc != 2)
    err_quit("usage: mqnotifysig1 <name>");

  // open queue, get attributes, allocate read buffer
  mqd = Mq_open(argv[1], O_RDONLY);
  Mq_getattr(mqd, &attr);
  buff = Malloc(attr.mq_msgsize);

  // establish signal handler, enable notification
  Signal(SIGUSR1, sig_usr1);
  sigev.sigev_notify = SIGEV_SIGNAL;
  sigev.sigev_signo = SIGUSR1;
  Mq_notify(mqd, &sigev);

  for (;;)
    pause();  // signal hadler does everything
  exit(0);
}

static vodi sig_usr1(int signo) {
  ssize_t n;

  Mq_notify(mqd, &sigev);  // reregister first
  n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);
  printf("SIGUSR1 received, read %ld bytes\n", (long) n);
  return;
}

5.1 异步信号安全函数

        上述程序的问题是它在信号处理程序中调用 $mq_-notify$ 、$mq_-receive$ 和 $printf$ 。这些函数实际上都不可以在信号处理程序中调用。POSIX使用异步信号安全 ( $async-signal-safe$ ) 这个术语描述可以在信号处理函数中调用的函数。

$$ \begin{array}{|c|c|c|c|} \hline access & fpathconf & rename & sysconf\\ aio_-return & fstat & rmdir & tcdrian \\ aio_-suspend & fsync & sem_-post & tcflow \\ alarm & getegid & setgit & tcflush \\ cfgetispeed & geteuid & setpgid & tcgetattr \\ cfgetospeed & getpgrp & sigaction & tcsetattr \\ chdir & getpid & sigaddset & tcsetpgrp \\ chmod & getppid & sigdelset & time \\ chown & getuid & sigemptyset & timer_-getoverrun \\ clock_-gettime & kill & sigfillset & timer_-gettime \\ close & link & sigismember & timer_-settime \\ creat & lseek & signal & times \\ dup & mkdir & sigpause & umask \\ dup2 & mkfifo & sigpending & uname \\ execle & open & sigprocmask & unlink \\ execve & pathconf & sigqueue & utim \\ _-exit & pause & sigset & wait \\ fcntl & pipe & sigsuspend & waitpid \\ fdatasync & raise & sleep & write \\ fork & read & stat \\ \hline \end{array} $$

        没有列在上表中的函数不能在信号处理函数中调用。注意所有标准I/O函数和 $pthread_-XXX$ 函数都没有列在其中。IPC相关函数只有 $sem_-post$ 、$read$ 和 $write$ 在其中。
        避免从信号处理程序中掉哟该任何函数的办法之一是让处理函数仅仅设置一个全局标志,再由某个线程检查该标志以确定何时收到消息。

#include "unpipc.h"

volatile sig_atomic_t mqflag; // set nonzero by signal handler
static void sig_usr1(int);

int main(int argc, char **argv) {
  mqd_t mqd;
  void *buff;
  ssize_t n;
  sigset_t zeromask, newmask, oldmask;
  struct mq_attr attr;
  struct sigevent sigev;

  if (argc != 2)
    err_quit("usage: mqnotifysig2 <name>");

  // open queue, get attributes, allocate read buffer
  mqd = Mq_open(argv[1], O_RDONLY);
  Mq_getattr(mqd, &attr);
  buff = Malloc(attr.mq_msgsize);

  Sigemptyset(&zeromask);  // no signals block
  Sigemptyset(&newmask);
  Sigemptyset(&oldmask);
  Sigaddset(&newmask, SIGUSR1);

  // establish signal handler, enable notification
  Signal(SIGUSR1, sig_usr1);
  sigev.sigev_notify = SIGEV_SIGNAL;
  sigev.sigev_signo = SIGUSR1;
  Mq_notify(mqd, &sigev);

  for (;;) {
    Sigprocmask(SIG_BLOCK, &newmask, &oldmask);  // block SIGUSR1
    while (mqflag == 0)
      sigsuspend(&zeromask);
    mqflag = 0;  // reset flag

    Mq_notify(mqd, &sigev);  // reregister first
    n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);
    printf("read %ld bytes\n", (long) n);
    Sigprocmask(SIG_UNBLOCK, &newmask, NULL);  // unblock SIGUSR1
  }
  exit(0);
}

static void sig_usr1(int signo) {
  mqflag = 1;
  return;
}

5.2 非阻塞的信号通知

        上面的程序还存在一个问题,那就是Unix的信号是不排队的,如果有两个消息到达,我们的程序只会处理一个。解决办法是使用非阻塞模式读取消息。

int main(int argc, char **argv) {
  mqd_t mqd;
  void *buff;
  ssize_t n;
  sigset_t zeromask, newmask, oldmask;
  struct mq_attr attr;
  struct sigevent sigev;

  if (argc != 2)
    err_quit("usage: mqnotifysig3 <name>");

  // open queue, get attributes, allocate read buffer
  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
  Mq_getattr(mqd, &attr);
  buff = Malloc(attr.mq_msgsize);

  Sigemptyset(&zeromask);  // no signals blocked
  Sigemptyset(&newmask);
  Sigemptyset(&oldmask);
  Sigaddset(&newmask, SIGUSR1);

  // establish signal handler, enable notification
  Signal(SIGUSR1, sig_usr1);
  sigev.sigev_notify = SIGEV_SIGNAL;
  sigev.sigev_signo = SIGUSR1;
  Mq_notify(mqd, &sigev);

  for (;;) {
    Sigprocmask(SIG_BLOCK, &newmask, &oldmask);  // block SIGUSR1
    while (mqflag == 0)
      sigsuspend(&zeromask);
    mqflag = 0;  // reset flag

    Mq_notify(mqd, &sigev);  // reregister first
    while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
      printf("read %ld bytes\n", (long) n);
    }
    if (errno != EAGAIN)
      err_sys("mq_receive error");
    Sigprocmask(SIG_UNBLOCK, &newmask, NULL);  // unblock SIGUSR1
  }
  exit(0);
}

5.3 sigwait函数

        上述程序尽管正确,但是效率还可以更高。因为我们的程序是通过 $sigsuspend$ 阻塞的,直到 $mqflag$ 非零,更高效的办法是只等待 $SIGUSR1$ 而不是任意一个信号。

#include <signal.h>

// 成功返回0,出错返回非负错误码
int sigwait(const sigset_t *set, int *sig);

        $sigwait$ 会阻塞等待信号集 $set$ 中的某个信号返回,返回时设置 $sig$ ,标识产生的信号。

#include "unpipc.h"

int main(int argc, char **argv) {
  int signo;
  mqd_t mqd;
  void *buff;
  ssize_t n;
  sigset_t newmask;
  struct mq_attr attr;
  struct sigevent sigev;

  if (argc != 2)
    err_quit("usage: mqnotifysig4 <name>");

  // open queue, get attributes, allocate read buffer
  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK;
  Mq_getattr(mqd, &attr);
  buff = Malloc(attr.mq_msgsize);

  Sigemptyset(&newmask);
  Sigaddset(&newmask, SIGUSR1);
  Sigprocmask(SIG_BLOCK, &newmask, NULL);  // block SIGUSR1

  // establish signal handler, enable notification
  sigev.sigev_notify = SIGEV_SIGNAL;
  sigev.sigev_signo = SIGUSR1;
  Mq_notify(mqd, &sigev);

  for (;;) {
    Sigwait(&newmask, &signo);
    if (signo == SIGUSR1) {
      Mq_notify(mqd, &sigev);  // reregister first
      while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
        printf("read %ld bytes\n", (long) n);
      }
      if (errno != EAGAIN)
        err_sys("mq_receive error");
    }
  }
  exit(0);
}

5.4 select函数

        消息队列描述符 $mqd_-t$ 不能用于 $select$ 或 $poll$ 。然而我们可以通过管道结合 $mq_-notify$ 使用。

#include "unpipc.h"

int pipefd[2];
static void sig_usr1(int);

int main(int argc, char **argv) {
  int nfds;
  char c;
  fd_set rset;
  mqd_t mqd;
  void *buff;
  ssize_t n;
  struct mq_attr attr;
  struct sigevent sigev;

  if (argc != 2)
    err_quit("usage: mqnotifysig5 <name>");

  // open queue, get attributes, allocate read buffer
  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
  Mq_getattr(mqd, &attr);
  buff = Malloc(attr.mq_msgsize);

  Pipe(pipefd);

  // establish signal handler, enable notification
  Signal(SIGUSR1, sig_usr1);
  sigev.sigev_notify = SIGEV_SIGNAL;
  sigev.sigev_signo = SIGUSR1;
  Mq_notify(mqd, &sigev);

  FD_ZERO(&rset);
  for (;;) {
    FD_SET(pipefd[0], &rset);
    nfds = Select(pipefd[0] + 1, &rset, NULL, NULL, NULL);

    if (FD_ISSET(pipefd[0], &rset)) {
      Read(pipefd[0], &c, 1);
      Mq_notify(mqd, &sigev);  // reregister first
      while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
        printf("read %ld bytes\n", (long) n);
      }
      if (errno != EAGAIN)
        err_sys("mq_receive error");
    }
  }
  exit(0);
}

static void sig_usr1(int signo) {
  Write(pipefd[1], "", 1);  // one byte of 0
  return;
}

5.5 线程

        异步事件通知的另一种方式是把 $sigev_-notify$ 指定为 $SIGEV_-THREAD$ ,这会创建一个新线程。线程属性由 $sigev_-notify_-attributes$ 指定,调用 $sigev_-notify_-function$ 函数,参数为 $sigev_-value$ 。

#include "unpipc.h"

mqd_t mqd;
struct mq_attr attr;
struct sigevent sigev;

static void notify_thread(union sigval);  // our thread function

int main(int argc, char **argv) {
  if (argc != 2)
    err_quit("usage: mqnotifythread1 <name>");

  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
  Mq_getattr(mqd, &attr);

  sigev.sigev_notify = SIGEV_THREAD;
  sigev.sigev_value.sival_ptr = NULL;
  sigev.sigev_notify_function = notify_thread;
  sigev.sigev_notify_attributes = NULL;
  Mq_notify(mqd, &sigev);

  for (;;)
    pause();  // each new thread does everything
  exit(0);
}

static void notify_thread(union sigval arg) {
  ssize_t n;
  void *buff;

  printf("notify_thread started\n");
  buff = Malloc(attr.mq_msgsize);
  Mq_notify(mqd, &sigev);  // reregister

  while ((m = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
    printf("read %ld bytes\n", (long) n);
  }
  if (errno != EAGAIN)
    err_sys("mq_receive error");

  free(buf);
  pthread_exit(NULL);
}

6. POSIX实时信号

        信号可划分为两组:

  1. 值在 $SIGRTMIN$ 和 $SIGTRMAX$ 之间 ( 包括两者 ) 的实时信号,POSIX要求至少提供 $RTSIG_-MAX$ 种实时信号,最小为 $8$ ;
  2. 其他信号,比如 $SIGALRM$ 、$SIGINT$ 和 $SIGKILL$ 等。

        接收某个信号的进程的 $sigaction$ 调用中是否制定了新的 $SA_-SIGINFO$ 标志会带来差异。

信号 $sigaction$ 调用
指定 $SA_-SIGINFO$ 未指定 $SA_-SIGINFO$
$SIGRTMIN$ $\sim$ $SIGRTMAX$ 已指定实时行为 未指定实时行为
其他信号 未指定实时行为 未指定实时行为

        未指定实时行为意味着有些实现可能提供实时行为,有些不提供。如果需要实时行为,我们必须使用 $SIGRTMIN$ $\sim$ $SIGRTMAX$ 之间的信号,并且在调用 $sigaction$ 时指定 $SA_-SIGINFO$ 。实时行为意味着:

void func(int signo, siginfo_t *info, void *context);

typedef struct {
  int si_signo;  // same value as signo argument
  int si_code;  // SI_{USER, QUEUE, TIMER, ASYNCIO, MEGEQ}
  union sigval si_value;  // integer or pointer value from sender
} siginfo_t;

        $context$ 参数所指的内容依赖于具体实现。

        如果信号由其他事件产生,$si_-code$ 的值就会被设置成不同于上述的值。而 $siginfo_-t$ 结构的 $si_-value$ 成员只有 $si_-code$ 是上述所列的值之一时才启用。

#include "unpipc.h"

static volatile siginfo_t arrival[10];
static volatile int nsig;

static void sig_rt(int, siginfo_t *, void *);

int main(int argc, char **argv) {
  int i, j;
  pid_t pid;
  sigset_t newset;
  union sigval val;

  printf("SIGRTMIN = %d, SIGRTMAX = %d\n", (int) SIGRTMIN, (int) SIGRTMAX);

  if ((pid = Fork()) == 0) {
    // child: block three realtime signals
    Sigemptyset(&newset);
    Sigaddset(&newset, SIGRTMAX);
    Sigaddset(&newset, SIGRTMAX - 1);
    Sigaddset(&newset, SIGRTMAX - 2);
    Sigprocmask(SIG_BLOCK, &newset, NULL);

    // establish signal handler with SA_SIGINFO set
    Signal_rt(SIGRTMAX, sig_rt, &newset);
    Signal_rt(SIGRTMAX - 1, sig_rt, &newset);
    Signal_rt(SIGRTMAX - 2, sig_rt, &newset);

    sleep(6);  // let parent send all the signals

    Sigprocmask(SIG_UNBLOCK, &newset, NULL);  // unblock
    sleep(3);  // let all queued signals be delivered

    for (i = 0; i < nsig; i++) {
      printf("received signal #%d, code = %d, ival = %d\n",
        arrival[i].si_signo, arrival[i].si_code,
        arrival[i].si_value.sival_int);
    }
    exit(0);
  }

  // parent sends nine signals to child
  sleep(3);  // let child block add signals
  for (i = SIGRTMAX; i >= SIGRTMAX - 2; i--) {
    for (j = 0; j <= 2; j++) {
      val.sival_int = j;
      Sigqueue(pid, i, val);
      printf("sent signal %d, val = %d\n", i, j);
    }
  }
  exit(0);
}

static void sig_rt(int signo, siginfo_t *info, void *context) {
  arrival[nsig++] = *info;  // save info for child to print
}

        首先输出最小和最大实时信号值,查看系统支持的实时信号数量。然后派生一个子进程,阻塞我们需要的实时信号。接着子进程调用 $signal_-rt$ 函数建立信号处理程序,父进程则等待 $6$ 秒后发送信号,最后子进程输出接收到的所有信号。

6.1 sinal_rt函数

#include "unpipc.h"

typedef void Sigfunc_rt(int, siginfo_t *, void *);

Sigfunc_rt *signal_rt(int signo, Sigfunc_rt *func, sigset_t *mask) {
  struct sigaction act, oact;

  act.sa_sigaction = func;  // must store function addr here
  act.sa_mask = *mask;  // signals to block
  act.sa_flags = SA_SIGINFO;  // must specify this for realtime
  if (signo == SIGALRM) {
#ifdef SA_INTERRUPT
    act.sa_flags |= SA_INTERRUPT;  // SunOS 4.x
#endif
  } else {
#ifdef SA_RESTART
    act.sa_flags |= SA_RESTART;  // SVR4, 44BSD
#endif
  }
  if (sigaction(signo, &act, &oact) < 0)
    return (Sigfunc_rt *) SIG_ERR;
  return oact.sa_sigaction;
}

        加入实时信号支持后,$sigaction$ 发生了变化,添加了 $sa_-sigaction$ 成员。

struct sigaction {
  void (*sa_handler)();  // SIG_DFL, SIG_IGN or add of signal handler
  sigset_t sa_mask;  // additional signals to block
  int sa_flags;  // signal options: SA_xxx
  void (*sa_sigaction)(int, siginfo_t, void *);  // addr of signal handler if SA_SIGINFO set
};

Unix网络编程(17):消息队列