Unix网络编程(18):锁
1. 互斥锁上锁和解锁
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_mutex_lock(pthread_mutex_t *mptr);
int pthread_mutex_trylock(pthread_mutex_t *mptr);
int pthread_mutex_unlock(pthread_mutex_t *mptr);
POSIX
互斥锁被声明为 $pthread_-mutex_-t$ 类型的变量。如果是静态分配的,需要初始化为常值 $PTHREAD_-MUTEX_-INITIALIZER$ ;如果是动态分配的,或者分配在共享内存区中,需要通过 $pthread_-mutex_-init$ 函数初始化。$pthread_-mutex_-lock$ 是阻塞版本的上锁,会一直阻塞直到该互斥锁解锁;$pthread_-mutex_-trylock$ 是非阻塞版本的上锁,如果该互斥锁已经上锁,会返回 $EBUSY$ 错误。
2. 生产者消费者问题
生产者-消费者 ( $producer-consumer$ ) 问题,也称为有界缓冲区 ( $bounded$ $buffer$ ) 问题。一个或多个生产者创建一个个数据条目,这些条目由一个或多个消费者处理。数据条目在生产者和消费者之间使用某种方式的IPC
传递。对于使用管道或消息队列进行通信的生产者和消费者,内核会进行隐式同步。然而对于使用共享内存区进行通信的生产者和消费者,需要进行显式同步。我们使用互斥锁进行显式同步。
#include "unpipc.h"
#define MAXNITEMS 1000000
#define MAXNTHREADS 100
int nitems; // read-only by producer and consumer
struct {
pthread_mutex_t mutex;
int buff[MAXNITEMS];
int nput;
int nval;
} shared = {
PTHREAD_MUTEX_INITIALIZER
};
void *produce(void *);
void *consume(void *);
int main(int argc, char **argv) {
int i, nthreads, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3)
err_quit("usage: prodcons2 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);
Set_concurrency(nthreads);
// start all the producer threads
for (i = 0; i < nthreads; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
// wait for all the producer threads
for (i = 0; i < nthreads; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
// start, then wait for the consumer thread
Pthread_create(&tid_consume, NULL, consume, NULL);
Pthread_join(tid_consume, NULL);
exit(0);
}
$set_-concurrency$ 告知系统我们希望并发运行的线程数。
void *produce(void *arg) {
for (;;) {
Pthread_mutex_lock(&shared.mutex);
if (shared.nput >= nitems) {
Pthread_mutex_unlock(&shared.mutex);
return NULL; // array is full, we're done
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
Pthread_mutex_unlock(&shared.mutex);
*((int *) arg) += 1;
}
}
void *consume(void *arg) {
int i;
for (i = 0; i < nitems; i++) {
if (shared.buff[i] != i)
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
return NULL;
}
3. 上锁与等待
我们修改程序,让消费者线程在所有生产者启动后启动。
int main(int argc, char **argv) {
int i, nthreads, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3)
err_quit("usage: prodcons3 <#item> <#nthreads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);
// create all producers and one consumer
Set_concurrency(nthreads + 1);
for (i = 0; i < nthreads; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
Pthread_create(&tid_consume, NULL, consume, NULL);
// wait for all producers and the consumer
for (i = 0; i < nthreads; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
Pthread_join(tid_consume, NULL);
exit(0);
}
void consume_wait(int i) {
for (;;) {
Pthread_mutex_lock(&shared.mutex);
if (i < shared.nput) {
Pthread_mutex_unlock(&shared.mutex);
return; // an item is ready
}
Pthread_mutex_unlock(&shared.mutex);
}
}
void *consume(void *arg) {
int i;
for (i = 0; i < nitems; i++) {
consume_wait(i);
if (shared.buff[i] != i) {
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
}
return NULL;
}
当数组中对应的位置缺少数据,从而消费者不能消费时,我们采用了轮询的方式等待生产者生产。
4. 条件变量的唤醒和等待
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);
int pthread_cond_signal(pthread_cond_t *cptr);
条件变量是类型为 $pthread_-cond_-t$ 的变量。每个条件变量总是与一个互斥锁关联。
#include "unpipc.h"
#define MAXNITEMS 1000000
#define MAXNTHREADS 100
// globals shared by threads
int nitems; // read-only by producer and consumer
int buff[MAXNITEMS];
struct {
pthread_mutex_t mutex;
int nput; // next index to store
int nval; // next value to store
} put = {
PTHREAD_MUTEX_INITIALIZER
};
struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int nready; // number ready for consumer
} nready = {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_COND_INITIALIZER
};
void *produce(void *arg) {
for (;;) {
Pthread_mutex_lock(&put.mutex);
if (put.nput >= nitems) {
Pthread_mutex_unlock(&put.mutex);
return NULL; // array is full, we're done
}
buff[put.nput] = put.nval;
put.nput++;
put.nval++;
Pthread_mutex_unlock(&put.mutex);
Pthread_mutex_lock(&nready.mutex);
if (nready.nready == 0)
Pthread_cond_signal(&nready.cond);
nready.nready++;
Pthread_mutex_unlock(&nready.mutex);
*((int *) arg) += 1;
}
}
void *consume(void *arg) {
int i;
for (i = 0; i < nitems; i++) {
Pthread_mutex_lock(&nready.mutex);
while (nready.nready == 0)
Pthread_cond_wait(&nready.cond, &nready.mutex);
nready.nready--;
Pthread_mutex_unlock(&nready.mutex);
if (buff[i] != i)
printf("buff[%d] = %d\n", i, buff[i]);
}
return NULL;
}
以上程序可能存在上锁冲突:
Pthread_mutex_lock(&nready.mutex);
while (nready.nready == 0)
Pthread_cond_wait(&nready.cond, &nready.mutex);
nready.nready--;
Pthread_mutex_unlock(&nready.mutex);
在等待和唤醒的时候都需要先获取互斥锁。然而由于其他线程唤醒的时候也持有锁,所以可能会让被唤醒的线程因为唤醒后无法获取锁,从而重新进入阻塞状态。为了避免这种冲突,可以修改代码:
int dosignal;
Pthread_mutex_lock(&nready.mutex);
dosignal = (nready.nready == 0);
nready.nready++;
Pthread_mutex_unlock(&nready.mutex);
if (dosignal)
Pthread_cond_signal(&nready.cond);
POSIX
允许线程在没有持有与该条件变量相关联的互斥锁的前提下调用 $pthread_-cond_-signal$ ,但是对于需要可预见的调度行为,在调用前必须获取互斥锁。
5. 条件变量的定时等待和广播
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_cond_broadcast(pthread_cond_t *cptr);
int pthread_cond_timedwait(pthread_cond_t *cptr, pthread_mutex_t *mptr,
const struct timespec *abstime);
$pthread_-cond_-broadcast$ 唤醒在条件变量上等待的所有线程。 $pthread_-cond_-timedwait$ 允许线程设置一个阻塞时间上限,$abstime$ 是绝对时间而不是时间差。
6. 互斥锁和条件变量属性
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_mutex_init(pthread_mutex_t *mptr, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mptr);
int pthread_cond_init(pthread_cond_t *cptr, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cptr);
静态分配时通常使用 $PTHREAD_-MUTEX_-INITIALIZER$ 和 $PTHREAD_-COND_-INITIALIZER$ 初始化,这种方式初始化的互斥锁和条件变量具有默认属性。动态分配或者在共享内存区中的需要使用以上函数进行初始化和销毁。
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
int pthread_condattr_init(pthread_condattr_t *attr);
int pthread_mutexattr_destroy(pthread_condattr_t *attr);
互斥锁和条件变量的属性类型分别为 $pthread_-mutexattr_-t$ 和 $pthread_-condattr_-t$ ,由以上函数初始化和销毁。一旦某个互斥锁或条件变量的属性对象已经被初始化,就可以通过调用不同函数启用或禁止特定属性。
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_mutexattr_getpshared(const pthread_mutexattr_t *attr, int *valptr);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int value);
int pthread_condattr_getpshared(const pthread_condattr_t *attr, int *valptr);
int pthread_condattr_setpshared(pthread_condattr_t *attr, int value);
两个 $get$ 函数的 $valptr$ 返回属性当前值,$set$ 函数则根据 $value$ 设置该值。$value$ 可以是 $PTHREAD_-PROCESS_-PRIVATE$ 或 $PTHREAD_-PROCESS_-SHARED$ ,后者也称为进程间共享属性。当在多个进程间共享互斥锁时,进程终止时系统是不会释放所有持有的锁的。而对于线程,如果在持有互斥锁时被其他线程取消,可以通过安装取消时调用的函数来清理。
7. 读写锁
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_rwlock_rdlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_unlock(pthread_rwlock_t *rwptr);
$pthread_-rwlock_-tryrdlock$ 和 $pthread_-rwlock_-trywrlock$ 尝试以非阻塞方式获取锁,如果失败,返回 $EBUSY$ 错误。
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_rwlock_init(pthread_rwlock_t *rwptr,
const pthread_rwlockattr_t *attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwptr);
如果某个读写锁是静态分配的,可以通过 $PTHREAD_-RWLOCK_-INITIALIZER$ 初始化。
#include <pthread.h>
/* 成功返回0,出错返回非负错误码 */
int pthread_rwlockattr_init(pthread_rwlockattr_t *attr);
int pthread_rwlockattr_destroy(pthread_rwlockattr_t *attr);
int pthread_rwlockattr_getpshared(const pthread_rwlockattr_t *attr, int *valptr);
int pthread_rwlockattr_setpshared(pthread_rwlockattr *attr, int value);
$value$ 的值为 $PTHREAD_-PROCESS_-PRIVATE$ 或者 $PTHREAD_-PROCESS_-SHARED$ 。
8. 线程取消
#include <pthread.h>
// 成功返回0,出错返回非负错误码
int pthread_cancel(pthread_t pid);
void pthread_cleanup_push(void (*function)(void *), void *arg);
void pthread_cleanup_pop(int execute);
通过 $pthread_-cancel$ ,可以取消一个持有锁的线程。线程可以通过 $pthread_-cleanup_-push$ 和 $pthread_-cleanup_-pop$ 安装和删除清理处理函数。清理处理函数会在线程被取消或者线程自愿终止时被调用。$pthread_-cleanup_-pop$ 总是删除栈顶的函数,并且当 $execute$ 不为 $0$ 时执行该函数。
9. 记录上锁
POSIX
记录上锁定义了一个特殊的字节范围以指定整个文件。它的起始偏移为 $0$ ,长度也为 $0$ 。粒度 ( $granularity$ ) 标识能够被锁住的对象的大小。对记录上锁来说,粒度就是单个字节。通常情况下,粒度越小,意味着允许同时使用的用户数量就越多。
#include <fcntl.h>
// 返回值取决于cmd,出错返回-1
int fcntl(int fd, int cmd, ... /* struct flock *arg */ );
struct flock {
short l_type; // F_RDLCK, F_WRLCK, F_UNLCK
short l_whence; // SEEK_SET, SEEK_CUR, SEEK_END
off_t l_start; // relative starting offset in bytes
off_t l_len; // #bytes; 0 means until end-of-file
pid_t l_pid; // PID returned by F_GETLK
};
- $F_-SETLK$ :获取 ( $l_-type$ 为 $F_-RDLCK$ 或 $F_-WRLCK$ ) 或释放 ( $l_-type$ 为 $F_-UNLCK$ ) 由 $arg$ 指向的 $flock$ 结构的锁。如果当前进程无法获取锁,返回 $EACCES$ 或 $EAGAIN$ 错误;
- $F_-SETLKW$ :与上个命令类似,但是如果无法获取锁不会立即返回,而是阻塞直到获取成功;
- $F_-GETLK$ :检查由 $arg$ 指向的锁是否已有其他进程持有,如果没有就将 $l_-type$ 置为 $F_-UNLCK$ ;否则,这个锁的信息将由 $arg$ 返回,其中包括持有锁的进程的
PID
。
一个进程可以对某个文件的特定字节范围多次发出 $F_-SETLK$ 或 $F_-SETLKW$ ,每次成功与否取决于其他进程是否锁住该字节范围和锁的类型。对于同一个进程,后执行的 $F_-SETLK$ 或 $F_-SETLKW$ 会覆盖先执行的针对统一字节范围的同样两个命令。一个进程对文件上锁不会影响其他进程访问这个文件,也就是说,一个进程锁住一个字节范围,另一个进程还是能访问对应范围内的数据。如果同一个进程对于一个已持有锁的字节范围调用 $F_-GETLK$ ,会返回 $F_-UNLCK$ ,因为当前进程已经上锁。
字节偏移是作为一个相对偏移成员 ( $l_-start$ 和 $l_-len$ ) 伴随其解释 ( $l_-whence$ ) 指定的。$l_-whence$ 可以是:
- $SEEK_-SET$ :$l_-start$ 相对于文件开头;
- $SEEK_-CUR$ :$l_-start$ 相对于当前字节偏移;
- $SEEK_-END$ :$l_-start$ 相对于文件结束。
锁住整个文件的方式有两种:
- 指定 $l_-whence$ 为 $SEEK_-SET$ ,$l_-start$ 和 $l_-len$ 为 $0$ ;
- 使用 $lseek$ 把读写指针定位到文件开始,然后指定 $l_-whence$ 为 $SEEK_-CUR$ ,$l_-start$ 和 $l_-len$ 为 $0$ 。
$fcntl$ 记录上锁既可以用于读也可以用于写。对于一个字节,它只能存在一种锁。一个字节可以有多个读锁,但只能有一个写锁。当一个描述符没有以写的模式打开时不能上写锁,同样的,没有以读模式打开的描述符也不能上读锁。对于一个打开着某个文件的给定进程来说,当它关闭该文件的所有描述符或它本身终止时,与该文件关联的锁都会被删除。记录锁与进程ID
相关,不能通过 $fork$ 继承。
#include "unpipc.h"
void my_lock(int fd) {
struct flock lock;
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
lock.l_start = 0;
lock.l_len = 0; // write lock entire file
Fcntl(fd, F_SETLKW, &lock);
}
void my_unlock(int fd) {
struct flock lock;
lock.l_type = F_UNLCK;
lock.l_whence = SEEK_SET;
lock.l_start = 0;
lock.l_len = 0; // unlock entire file
Fcntl(fd, F_SETLK, &lock);
}
上锁和解锁的过程有些复杂,可以通过宏简化。
#include "unpipc.h"
#define read_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLK, F_RDLCK, offset, whence, len)
#define readw_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLKW, F_RDLCK, offset, whence, len)
#define write_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLK, F_WRLCK, offset, whence, len)
#define writew_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLKW, F_WRLCK, offset, whence, len)
#define un_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLK, F_UNLCK, offset, whence, len)
#define is_read_lockable(fd, offset, whence, len) \
!lock_test(fd, F_RDLCK, offset, whence, len)
#define is_write_lockable(fd, offset, whence, len) \
!lock_test(fd, F_WRLCK, offset, whence, len)
int lock_reg(int fd, int cmd, int type, off_t offset, int whence, off_t len) {
struct flock lock;
lock.l_type = type; // F_RDLCK, F_WRLCK, F_UNLCK
lock.l_start = offset; // byte offset, relative to l_whence
lock.l_whence = whence; // SEEK_SET, SEEK_CUR, SEEK_END
lock.l_len = len; // #bytes (0 means to EOF)
return fcntl(fd, cmd, &lock); // -1 upon error
}
pid_t lock_test(int fd, int type, off_t offset, int whence, off_t len) {
struct flock lock;
lock.l_type = type; // F_RDLCK or F_WRLCK
lock.l_start = offset; // byte offset, relative to l_whence
lock.l_whence = whence; // SEEK_SET, SEEK_CUR, SEEK_END
lock.l_len = len; // #bytes (0 means to EOF)
if (fcntl(fd, F_GETLK, &lock) == -1)
return -1; // unexpected error
if (lock.l_type == F_UNLCK)
return 0; // false, region not locked by another proc
return lock.l_pid; // true, return positive PID of lock owner
}
记录上锁的一个常见用途是确保某个程序 ( 例如守护进程 ) 在任何时刻只有一个副本在运行。
#include "unpipc.h"
#define PATH_PIDFILE "pidfile"
int main(int argc, char **argv) {
int pidfd;
char line[MAXLINE];
// open the PID file, create if nonexistent
pidfd = Open(PATH_PIDFILE, O_RDWR | O_CREAT, FILE_MODE);
// try to write lock the entire file
if (write_lock(pidfd, 0, SEEK_SET, 0) < 0) {
if (errno = EACCES || errno = EAGAIN)
err_quit("unable to lock %s, is %s already running?",
PATH_PIDFILE, argv[0]);
else
err_sys("unable to lock %s", PATH_PIDFILE);
}
// write my PID, leave file open to hold the write lock
snprintf(line, sizeof(line), "%ld\n", (long) getpid());
Ftruncate(pidfd, 0);
Write(pidfd, line, strlen(line));
// then do whatever the daemon does
pause();
}
10. 文件锁
POSIX
保证如果以 $O_-CREAT$ 和 $O_-EXCL$ 标志调用 $open$ 函数,那么一旦该文件存在,函数就返回错误。而且考虑到其他进程的存在,检查该文件是否存在和创建文件必须是原子的。从而,我们可以把这种文件当锁使用。释放这样的锁只需要调用 $unlink$ 。
#include "unpipc.h"
#define LOCKFILE "/tmp/seqno.lock"
void my_lock(int fd) {
int tempfd;
while ((tempfd = open(LOCKFILE, O_RDWR | O_CREAT | O_EXCL, FILE_MODE)) < 0) {
if (errno != EEXIST)
err_sys("open error for lock file");
// someone else has the lock, loop around adn try again
}
Close(tempfd); // opened the file, we have the lock
}
void my_lock(int fd) {
Unlink(LOCKFILE); // release lock by removing file
}
这种技巧存在三个问题:
- 如果当前持有锁的进程没有释放就终止,那么文件不会被删除。对于这个问题,可以通过检查文件最近访问时间,如果超过一定时间没访问就认为锁已经释放,但是这种方法不完美。另一种方法是在文件内写入进程
ID
,然后检查该进程ID
是否在运行。这个方法同样存在问题,因为进程ID
在经过一段时间后会重用; - 如果另外某个进程已经上锁,那么当前进程只能不断轮询;
- 调用 $open$ 和 $unlink$ 会涉及文件系统的访问,增加开销。
Unix
文件系统的另外两个技巧也可以用于上锁:
- 如果新链接的文件名存在,那么 $link$ 会失败。可以先创建一个临时文件,路径名包括当前进程
ID
信息,然后将该临时文件 $link$ 到待建立锁的文件上。如果创建成功,说明获取了锁,只需要在不使用时 $unlink$ 锁文件即可。如果返回 $EEXIST$ 错误,说明锁已经被占用。这种技巧要求临时文件路径名和锁文件路径名必须位于同一个文件系统中,因为大多版本的Unix
不允许不同文件系统之间的硬链接; - 如果以 $O_-TRUNC$ 模式打开的文件存在,但是当前进程没有写权限,$open$ 会返回错误。我们可以指定 $O_-CREAT$ 、 $O_-WRONLY$ 和 $O_-TRUNC$ 并设置 $mode$ 为 $0$ ( 即新文件不打开任何权限位 ) 的前提下调用 $open$ 。如果成功,我们就拥有了锁,只需要在不使用时 $unlink$ 锁文件即可。如果返回 $EACCES$ ,那么线程必须重新尝试。这种技巧要求当前进程不具备超级用户权限。
不管是哪种方式的文件上锁,都存在着一些问题。所以最好的方式还是使用 $fcntl$ 的记录上锁。