Unix网络编程(20):共享内存
共享内存是可用IPC
形式中最快的。因为将内存区映射到进程的地址空间之后,进程间的数据传递就不用通过内核。
1. 共享内存操作
#include <sys/mman.h>
// 成功返回映射内存区的地址,出错返回MAP_FAILED
void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
$mmap$ 函数把一个文件或一个POSIX
共享内存区对象映射到调用进程的地址空间。使用该函数有三个目的:
- 对普通文件使用内存映射
I/O
; - 对特殊文件进行匿名内存映射;
- 使用 $shm_-open$ 提供无亲缘关系的进程间的
POSIX
共享内存区。
$addr$ 可以指定描述符 $fd$ 应该被映射到的进程内存空间的地址,也可以指定为 $NULL$ ,从而让内核自己选择。$len$ 是映射到调用进程地址空间中的字节数,从被映射字节开头第 $offset$ 个字节开始,$offset$ 可以为 $0$ 。内存映射区的保护由 $prot$ 参数提供,可选值有:
$prot$ | 说明 |
---|---|
$PROT_-READ$ | 数据可读 |
$PROT_-WRITE$ | 数据可写 |
$PROT_-EXEC$ | 数据可执行 |
$PROT_-NONE$ | 数据不可访问 |
$prot$ 参数的常见值为 $PROT_-READ$ | $PROT_-WRITE$ 。$flags$ 指定共享内存属性,可选值有:
$flag$ | 说明 |
---|---|
$MAP_-SHARED$ | 变动是共享的 |
$MAP_-PRIVATE$ | 变动是私有的 |
$MAP_-FIXED$ | 准确地解释 $addr$ |
其中 $MAP_-SHARED$ 和 $MAP_-PRIVATE$ 必须选择其中一个。如果选择 $MAP_-SHARED$ ,那么调用进程对映射区内所做的修改对其他进程可见,并且也会修改底层对象;如果选择 $MAP_-PRIVATE$ ,那么调用进程对映射区所做的修改只有当前进程可见,而且不会修改底层对象;从移植性上考虑,不应该指定 $MAP_-FIXED$ ,如果没有指定该标志并且 $addr$ 非空,则具体行为取决于实现。可移植的代码应该把 $addr$ 设置为 $NULL$ 并不指定 $MAP_-FIXED$ 。
#include <sys/mman.h>
// 成功返回0,出错返回-1
int munmap(void *addr, size_t len);
// 成功返回0,出错返回-1
int msync(void *addr, size_t len, int flags);
$munmap$ 从进程空间中删除映射区,删除之后再次访问先前映射区地址会产生 $SIGSEGV$ 信号。$msync$ 算法用于主动将映射区中的数据同步到硬盘,可选的 $flags$ 参数有:
$flag$ | 说明 |
---|---|
$MS_-ASYNC$ | 异步写 |
$MS_-SYNC$ | 同步写 |
$MS_-INVALIDATE$ | 使高速缓存中的数据失效 |
其中 $MS_-ASYNC$ 和 $MS_-SYNC$ 必须指定其中一个。
#include "unpipc.h"
struct shared {
sem_t mutex; // the mutex: a Posix memory-based semaphore
int count; // the counter
} shared;
int main(int argc, char **argv) {
int fd, i, nloop;
struct shared *ptr;
if (argc != 3)
err_quit("usage: incr2 <pathname> <#loops>");
nloop = atoi(argv[2]);
// open file, initalize to 0, map into memory
fd = Open(argv[1], O_RDWR | O_CREAT, FILE_MODE);
Write(fd, &shared, sizeof(struct shared));
ptr = Mmap(NULL, sizeof(struct shared), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
Close(fd);
// initialize semaphore that is shared between processes
Sem_init(&ptr->mutex, 1, 1);
setbuf(stdout, NULL); // stdout is unbuffered
if (Fork() == 0) { // child
for (i = 0; i < nloop; i++) {
Sem_wait(&ptr->mutex);
printf("child: %d\n", ptr->count++);
Sem_post(&ptr->mutex);
}
exit(0);
}
// parent
for (i = 0; i < nloop; i++) {
Sem_wait(&ptr->mutex);
printf("parent: %d\n", ptr->count++);
Sem_post(&ptr->mutex);
}
exit(0);
}
2. 匿名内存映射
匿名内存映射可以让我们在不用创建文件的前提下使用内存映射。
int main(int argc, char **argv) {
int i, nloop;
int *ptr;
sem_t *mutex;
if (argc != 2)
err_quit("usage: incr_map_anon <#loops>");
nloop = atoi(argv[1]);
// map into memory
ptr = Mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS, -1, 0);
// 后略...
}
$/dev/zero$ 设备文件会不断产生空字符,也可以用于实现匿名内存映射。
int main(int argc, char **argv) {
int fd, i, nloop;
int *ptr;
sem_t *mutex;
if (argc != 2)
err_quit("usage: incr_dev_zero <#loops>");
nloop = atoi(argv[1]);
// open /dev/zerp, map into memory
fd = Open("/dev/zero", O_RDWR);
ptr = Mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
Close(fd);
// 后略...
}
3. 安全访问共享内存映射区
访问共享内存映射区时可能会产生两种错误:
- 当访问的区域位于映射区以外时,返回 $SIGSEGV$ 错误;
- 当访问的区域位于映射区以内但是超出文件大小时,返回 $SIGBUS$ 错误。
#include "unpipc.h"
#define FILE "test.data"
#define SIZE 32768
int main(int argc, char **argv) {
int fd, i;
char *ptr;
// open: create or truncate; then mmap file
fd = Open(FILE, O_RDWR | O_CREAT | O_TRUNC, FILE_MODE);
ptr = Mmap(NULL, SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
for (i = 4096; i <= SIZE; i += 4096) {
printf("setting file size to %d\n", i);
Ftruncate(fd, i);
printf("ptr[%d] = %d\n", i - 1, ptr[i - 1]);
}
exit(0);
}
上述程序使用了让文件持续增长的技巧。它指定了一个大于文件大小的内存映射区,同时不断跟踪文件大小,并让文件大小随着写入增长。
#include <unistd.h>
int ftruncate(int fd, off_t len);
处理 $mmap$ 的时候,普通文件或共享内存区的大小都可以通过 $ftruncate$ 修改。但是对于普通文件和共享内存区对象文件,POSIX
对它们的处理逻辑不同:
- 对于普通文件,如果文件大小大于 $length$ ,那么文件会被缩小并且丢弃额外的数据;如果文件大小小于 $length$ ,那么文件怎么修改和大小怎么改变是未定义的。实际上,对于一个普通文件,一种可移植的增加大小的方法是先使用 $lseek$ 定位到 $length - 1$ ,然后写入 $1$ 字节数据。所幸的是,几乎所有
Unix
实现都支持使用 $ftruncate$ 扩展一个文件; - 对于共享内存区对象文件,$ftruncate$ 会把大小设置为 $length$ 字节。
普通文件指通过 $open$ 打开的文件,共享内存区对象文件则指通过 $shm_-open$ 打开的文件,稍后我们会看到 $shm_-open$ 的调用。
#include <sys/types.h>
#include <sys/stat.h>
// 成功返回0,出错返回-1
int fstat(int fd, struct stat *buf);
当打开一个已存在的共享内存区对象文件时,我们可以通过 $fstat$ 获取该文件信息。$stat$ 结构有 $12$ 或以上的成员,然而当 $fd$ 指代一个共享内存区对象时,只有四个成员有效:
struct stat {
// ...
mode_t st_mode; // mode: S_I{RW}{USR,GRP,OTH}
uid_t st_uid; // user ID of owner
gid_t st_gid; // group ID of owner
off_t st_size; // size in bytes
// ...
};
4. POSIX
共享内存区操作
#include <sys/mman.h>
// 成功返回非负描述符,出错返回-1
int shm_open(const char *name, int oflag, mode_t mode);
// 成功返回0,出错返回-1
int shm_unlink(const char *name);
POSIX
共享内存区涉及以下两个步骤要求:
- 调用 $shm_-open$ 并指定文件名,创建或打开一个共享内存区对象文件;
- 调用 $mmap$ 把这个共享内存区对象文件映射到调用进程的地址空间。
$oflag$ 参数必须含有 $ORDONLY$ 和 $O_-RDWR$ 的其中一个,还可以指定 $O_-CREAT$ 、$O_-EXCL$ 和 $O_-TRUNC$ 。$mode$ 指定权限位,在指定了 $O_-CREAT$ 的前提下使用,如果没有指定 $O_-CREAT$ ,可以设置为 $0$ 。$shm_-unlink$ 则负责删除一个共享内存区对象文件,跟其他 $unlink$ 一样,已打开的文件不会影响删除。
#include "unpipc.h"
int main(int argc, char **argv) {
int fd1, fd2, *ptr1, *ptr2;
pid_t childpid;
struct stat stat;
if (argc != 2)
err_quit("usage: test3 <name>");
shm_unlink(argv[1]);
fd1 = Shm_open(argv[1], O_RDWR | O_CREAT | O_EXCL, FILE_MODE);
Ftruncate(fd1, sizeof(int));
fd2 = Open("/etc/hostname", O_RDONLY);
Fstat(fd2, &stat);
if ((childpid = Fork()) == 0) { // child
ptr2 = Mmap(NULL, stat.st_size, PROT_READ, MAP_SHARED, fd2, 0);
ptr1 = Mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE,
MAP_SHARED, fd1, 0);
printf("child: shm ptr = %p, hostname ptr = %p\n", *ptr1);
sleep(5);
printf("shared memory integer = %d\n", *ptr1);
exit(0);
}
// parent: mmap in reverse order from child
ptr1 = Mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd1, 0);
ptr2 = Mmap(NULL, stat.st_size, PROT_READ, MAP_SHARED, fd2, 0);
printf("parent: shm ptr = %p, hostname ptr = %p\n", ptr1, ptr2);
*ptr1 = 777;
Waitpid(childpid, NULL, 0);
exit(0);
}
上面的程序演示了父子进程可以把同一个共享内存区对象映射到不同的进程地址,并且通过这不同的进程地址读出相同的值。
5. 生产者-消费者模型
我们可以通过共享内存技术实现生产者-消费者模型,通过共享内存区来容纳消息。
#include "unpipc.h"
#define MESGSIZE 256 // max #bytes per message, incl. null at end
#define NMESG 16 // max #messages
struct shmstruct { // struct stored in shared memory
sem_t mutex; // three POSIX memory-based semaphores
sem_t nempty;
sem_t nstored;
int nput; // index into msgoff[] for next put
long noverflow; // #overflows by senders
sem_t noverflowmutex; // mutex for noverflow counter
long msgoff[NMESG]; // offset in shared memory of each message
char msgdata[NMESG * MESGSIZE]; // the actual messages
};
当生产者想发送消息但是没有空间时,我们不会阻塞生产者,而是递增 $noverflow$ 计数。$msgoff$ 指出每个消息的偏移,其中 $msgoff[0] = 0$ 。
#include "cliserv2.h"
int main(int argc, char **argv) {
int fd, index, lastnoverflow, temp;
long offset;
struct shmstruct *ptr;
if (argc != 2)
err_quit("usage: server2 <name>");
// create shm, set its size, map it, close descriptor
shm_unlink(argv[1]); // OK if this fails
fd = Shm_open(argv[1], O_RDWR | O_CREAT | O_EXCL, FILE_MODE);
ptr = Mmap(NULL, sizeof(struct shmstruct), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
Ftruncate(fd, sizeof(struct shmstruct));
Close(fd);
// initialize the array of offsetss
for (index = 0; index < NMESG; index++)
ptr->msgoff[index] = index * MESGSIZE;
// initialize the semaphores in shared memory
Sem_init(&ptr->mutex, 1, 1);
Sem_init(&ptr->nempty, 1, NMESG);
Sem_init(&ptr->nstored, 1, 0);
Sem_init(&ptr->noverflowmutex, 1, 1);
// this program is the consumer
index = 0;
lastnoverflow = 0;
for (;;) {
Sem_wait(&ptr->nstored);
Sem_wait(&ptr->mutex);
offset = ptr->msgoff[index];
printf("index = %d: %s\n", index, &ptr->msgdata[offset]);
if (++index >= NMESG)
index = 0; // circular buffer
Sem_post(&ptr->mutex);
Sem_post(&ptr->nempty);
Sem_wait(&ptr->noverflowmutex);
temp = ptr->noverflow; // don't printf while mutex held
Sem_post(&ptr->noverflowmutex);
if (temp != lastnoverflow) {
printf("noverflow = %d\n", temp);
lastnoverflow = temp;
}
}
exit(0);
}
消费者程序会先删除可能存在的共享内存区对象文件,然后重新创建。接着会初始化每个消息的偏移。最后,在每次循环的末尾都会检查 $noverflow$ ,如果改变了,就输出它的最新值。
#include "cliserv2.h"
int main(int argc, char **argv) {
int fd, i, nloop, nusec;
pid_t pid;
char mesg[MESGSIZE];
long offset;
struct shmstruct *ptr;
if (argc != 4)
err_quit("usage: client2 <name> <#loops> <#usec>");
nloop = atoi(argv[2]);
nusec = atoi(argv[3]);
// open and map shared memory that server must create
fd = Shm_open(argv[1], O_RDWR, FILE_MODE);
ptr = Mmap(NULL, sizeof(struct shmstruct), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
Close(fd);
pid = getpid();
for (i = 0; i < nloop; i++) {
sleep_us(nusec);
snprintf(mesg, MESGSIZE, "pid %ld: message %d", (long) pid, i);
if (sem_trywait(&ptr->nempty) == -1) {
if (errno == EAGAIN) {
Sem_wait(&ptr->noverflowmutex);
ptr->noverflow++;
Sem_post(&ptr->noverflowmutex);
continue;
} else
err_sys("sem_trywait error");
}
Sem_wait(&ptr->mutex);
offset = ptr->msgoff[ptr->nput];
if (++(ptr->nput) >= NMESG)
ptr->nput = 0; // circular buffer
Sem_post(&ptr->mutex);
strncpy(&ptr->msgdata[offset], mesg, MESGSIZE);
Sem_post(&ptr->nstored);
}
exit(0);
}
生产者程序使用不会阻塞 $sem_-trywait$ 调用,如果调用失败 ( 即 $nempty$ 信号量不大于 $0$ ),说明映射区中没有空间,递增 $noverflow$ 。