set_robust_list 函数详解
set_robust_list
是Linux系统调用,用于设置进程的健壮互斥锁(robust mutex)列表。当持有健壮互斥锁的进程异常终止时,内核会自动释放这些锁,防止死锁的发生。这个机制对于构建高可靠性多线程应用程序非常重要。
1. 函数介绍
set_robust_list
是Linux系统调用,用于设置进程的健壮互斥锁(robust mutex)列表。当持有健壮互斥锁的进程异常终止时,内核会自动释放这些锁,防止死锁的发生。这个机制对于构建高可靠性多线程应用程序非常重要。
2. 函数原型
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
long set_robust_list(struct robust_list_head *head, size_t len);
3. 功能
set_robust_list
允许进程注册一个健壮互斥锁列表,当进程异常退出时(如被信号终止),内核会自动遍历这个列表并释放所有未释放的互斥锁,确保其他等待这些锁的线程不会永久阻塞。
4. 参数
- *struct robust_list_head head: 指向健壮列表头的指针
- size_t len: 列表头结构的大小(通常为sizeof(struct robust_list_head))
5. 返回值
- 成功: 返回0
- 失败: 返回-1,并设置errno
6. 相似函数,或关联函数
- get_robust_list: 获取当前健壮列表
- pthread_mutexattr_setrobust: 设置pthread互斥锁的健壮属性
- pthread_mutex_consistent: 标记互斥锁状态为一致
7. 示例代码
示例1:基础set_robust_list使用
#include <linux/futex.h>
#include <sys/syscall.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
/**
* 健壮列表头结构
*/
struct robust_list_head {
struct robust_list *list;
long futex_offset;
unsigned long list_op_pending;
};
/**
* 健壮列表节点结构
*/
struct robust_list {
struct robust_list *next;
};
/**
* 调用set_robust_list系统调用
*/
static inline long sys_set_robust_list(struct robust_list_head *head, size_t len) {
return syscall(SYS_set_robust_list, head, len);
}
/**
* 调用get_robust_list系统调用
*/
static inline long sys_get_robust_list(int pid, struct robust_list_head **head, size_t *len) {
return syscall(SYS_get_robust_list, pid, head, len);
}
/**
* 演示基础set_robust_list使用方法
*/
int demo_set_robust_list_basic() {
struct robust_list_head head;
struct robust_list_head *get_head;
size_t len;
long result;
printf("=== 基础set_robust_list使用示例 ===\n");
// 初始化健壮列表头
head.list = NULL;
head.futex_offset = 0;
head.list_op_pending = 0;
printf("1. 设置健壮列表:\n");
result = sys_set_robust_list(&head, sizeof(head));
if (result == 0) {
printf(" 成功设置健壮列表\n");
} else {
printf(" 设置健壮列表失败: %s\n", strerror(errno));
if (errno == ENOSYS) {
printf(" 系统不支持健壮列表功能\n");
return 0;
}
return -1;
}
printf("2. 获取当前健壮列表:\n");
result = sys_get_robust_list(0, &get_head, &len);
if (result == 0) {
printf(" 成功获取健壮列表\n");
printf(" 列表地址: %p\n", (void*)get_head);
printf(" 列表大小: %zu 字节\n", len);
} else {
printf(" 获取健壮列表失败: %s\n", strerror(errno));
}
printf("3. 验证列表设置:\n");
if ((void*)get_head == (void*)&head && len == sizeof(head)) {
printf(" ✓ 健壮列表设置正确\n");
} else {
printf(" ✗ 健壮列表设置可能有问题\n");
}
return 0;
}
int main() {
return demo_set_robust_list_basic();
}
示例2:pthread健壮互斥锁演示
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <sys/syscall.h>
/**
* 共享资源结构
*/
typedef struct {
pthread_mutex_t mutex;
int counter;
int active_threads;
} shared_resource_t;
/**
* 线程数据结构
*/
typedef struct {
int thread_id;
shared_resource_t *resource;
int terminate_signal;
} thread_data_t;
/**
* 初始化健壮互斥锁
*/
int init_robust_mutex(pthread_mutex_t *mutex) {
pthread_mutexattr_t attr;
int result;
// 初始化互斥锁属性
result = pthread_mutexattr_init(&attr);
if (result != 0) {
printf("初始化互斥锁属性失败: %s\n", strerror(result));
return -1;
}
// 设置互斥锁为健壮的
result = pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
if (result != 0) {
printf("设置健壮属性失败: %s\n", strerror(result));
pthread_mutexattr_destroy(&attr);
return -1;
}
// 初始化互斥锁
result = pthread_mutex_init(mutex, &attr);
if (result != 0) {
printf("初始化互斥锁失败: %s\n", strerror(result));
pthread_mutexattr_destroy(&attr);
return -1;
}
pthread_mutexattr_destroy(&attr);
return 0;
}
/**
* 线程函数
*/
void* worker_thread(void *arg) {
thread_data_t *data = (thread_data_t*)arg;
int result;
printf("工作线程 %d 启动\n", data->thread_id);
// 模拟工作循环
for (int i = 0; i < 10 && !data->terminate_signal; i++) {
// 获取互斥锁
result = pthread_mutex_lock(&data->resource->mutex);
if (result == EOWNERDEAD) {
printf("线程 %d: 检测到锁持有者异常终止\n", data->thread_id);
// 标记互斥锁状态为一致
result = pthread_mutex_consistent(&data->resource->mutex);
if (result != 0) {
printf("线程 %d: 恢复互斥锁一致性失败: %s\n",
data->thread_id, strerror(result));
pthread_mutex_unlock(&data->resource->mutex);
break;
}
printf("线程 %d: 成功恢复互斥锁一致性\n", data->thread_id);
} else if (result != 0) {
printf("线程 %d: 获取互斥锁失败: %s\n", data->thread_id, strerror(result));
break;
}
// 访问共享资源
data->resource->counter++;
printf("线程 %d: 计数器 = %d\n", data->thread_id, data->resource->counter);
// 模拟工作处理时间
usleep(100000); // 100ms
// 释放互斥锁
pthread_mutex_unlock(&data->resource->mutex);
// 短暂休息
usleep(50000); // 50ms
}
// 减少活跃线程计数
pthread_mutex_lock(&data->resource->mutex);
data->resource->active_threads--;
pthread_mutex_unlock(&data->resource->mutex);
printf("工作线程 %d 结束\n", data->thread_id);
return NULL;
}
/**
* 演示pthread健壮互斥锁
*/
int demo_pthread_robust_mutex() {
shared_resource_t resource = {0};
pthread_t threads[3];
thread_data_t thread_data[3];
int result;
printf("=== pthread健壮互斥锁演示 ===\n");
// 初始化健壮互斥锁
if (init_robust_mutex(&resource.mutex) != 0) {
return -1;
}
printf("成功初始化健壮互斥锁\n");
// 初始化共享资源
resource.counter = 0;
resource.active_threads = 3;
// 创建工作线程
printf("创建3个工作线程...\n");
for (int i = 0; i < 3; i++) {
thread_data[i].thread_id = i + 1;
thread_data[i].resource = &resource;
thread_data[i].terminate_signal = 0;
result = pthread_create(&threads[i], NULL, worker_thread, &thread_data[i]);
if (result != 0) {
printf("创建线程 %d 失败: %s\n", i + 1, strerror(result));
return -1;
}
}
// 让线程运行一段时间
printf("让线程运行5秒...\n");
sleep(5);
// 强制终止一个线程来演示健壮性
printf("强制终止线程1来演示健壮性...\n");
thread_data[0].terminate_signal = 1;
pthread_kill(threads[0], SIGKILL);
// 等待其他线程完成
printf("等待其他线程完成...\n");
for (int i = 1; i < 3; i++) {
pthread_join(threads[i], NULL);
}
// 显示最终结果
printf("\n最终结果:\n");
printf(" 计数器值: %d\n", resource.counter);
printf(" 活跃线程: %d\n", resource.active_threads);
// 清理互斥锁
pthread_mutex_destroy(&resource.mutex);
return 0;
}
int main() {
return demo_pthread_robust_mutex();
}
示例3:健壮列表管理
#include <linux/futex.h>
#include <sys/syscall.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdatomic.h>
/**
* 自定义健壮列表节点
*/
typedef struct custom_robust_node {
struct custom_robust_node *next;
pthread_mutex_t *mutex;
int node_id;
} custom_robust_node_t;
/**
* 自定义健壮列表管理器
*/
typedef struct {
struct robust_list_head head;
custom_robust_node_t *nodes;
int node_count;
int max_nodes;
} robust_list_manager_t;
/**
* 系统调用封装
*/
static inline long sys_set_robust_list(struct robust_list_head *head, size_t len) {
return syscall(SYS_set_robust_list, head, len);
}
static inline long sys_get_robust_list(int pid, struct robust_list_head **head, size_t *len) {
return syscall(SYS_get_robust_list, pid, head, len);
}
/**
* 初始化健壮列表管理器
*/
int robust_list_init(robust_list_manager_t *manager, int max_nodes) {
// 初始化列表头
manager->head.list = NULL;
manager->head.futex_offset = 0;
manager->head.list_op_pending = 0;
// 分配节点数组
manager->nodes = calloc(max_nodes, sizeof(custom_robust_node_t));
if (!manager->nodes) {
perror("分配节点数组失败");
return -1;
}
manager->node_count = 0;
manager->max_nodes = max_nodes;
// 设置健壮列表
long result = sys_set_robust_list(&manager->head, sizeof(manager->head));
if (result != 0) {
printf("设置健壮列表失败: %s\n", strerror(errno));
free(manager->nodes);
return -1;
}
printf("健壮列表管理器初始化成功,最大节点数: %d\n", max_nodes);
return 0;
}
/**
* 添加互斥锁到健壮列表
*/
int robust_list_add_mutex(robust_list_manager_t *manager, pthread_mutex_t *mutex, int node_id) {
if (manager->node_count >= manager->max_nodes) {
printf("节点数已达上限\n");
return -1;
}
int index = manager->node_count++;
manager->nodes[index].mutex = mutex;
manager->nodes[index].node_id = node_id;
// 连接节点到列表
if (index > 0) {
manager->nodes[index-1].next = (struct robust_list*)&manager->nodes[index];
}
// 更新列表头
manager->head.list = (struct robust_list*)&manager->nodes[0];
printf("添加互斥锁到健壮列表,节点ID: %d\n", node_id);
return 0;
}
/**
* 演示健壮列表管理
*/
int demo_robust_list_management() {
robust_list_manager_t manager;
pthread_mutex_t mutex1, mutex2, mutex3;
pthread_mutexattr_t attr;
int result;
printf("=== 健壮列表管理演示 ===\n");
// 初始化管理器
if (robust_list_init(&manager, 10) != 0) {
return -1;
}
// 初始化互斥锁属性
result = pthread_mutexattr_init(&attr);
if (result != 0) {
printf("初始化互斥锁属性失败: %s\n", strerror(result));
return -1;
}
result = pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
if (result != 0) {
printf("设置健壮属性失败: %s\n", strerror(result));
pthread_mutexattr_destroy(&attr);
return -1;
}
// 创建多个健壮互斥锁
printf("创建健壮互斥锁...\n");
result = pthread_mutex_init(&mutex1, &attr);
if (result != 0) {
printf("初始化互斥锁1失败: %s\n", strerror(result));
pthread_mutexattr_destroy(&attr);
return -1;
}
result = pthread_mutex_init(&mutex2, &attr);
if (result != 0) {
printf("初始化互斥锁2失败: %s\n", strerror(result));
pthread_mutex_destroy(&mutex1);
pthread_mutexattr_destroy(&attr);
return -1;
}
result = pthread_mutex_init(&mutex3, &attr);
if (result != 0) {
printf("初始化互斥锁3失败: %s\n", strerror(result));
pthread_mutex_destroy(&mutex1);
pthread_mutex_destroy(&mutex2);
pthread_mutexattr_destroy(&attr);
return -1;
}
pthread_mutexattr_destroy(&attr);
// 将互斥锁添加到健壮列表
robust_list_add_mutex(&manager, &mutex1, 1);
robust_list_add_mutex(&manager, &mutex2, 2);
robust_list_add_mutex(&manager, &mutex3, 3);
// 测试互斥锁操作
printf("测试互斥锁操作...\n");
for (int i = 0; i < 3; i++) {
result = pthread_mutex_lock(&mutex1);
if (result == 0) {
printf("成功获取互斥锁1\n");
usleep(100000);
pthread_mutex_unlock(&mutex1);
printf("成功释放互斥锁1\n");
} else if (result == EOWNERDEAD) {
printf("检测到锁持有者异常终止,恢复一致性...\n");
pthread_mutex_consistent(&mutex1);
pthread_mutex_unlock(&mutex1);
} else {
printf("获取互斥锁1失败: %s\n", strerror(result));
}
}
// 清理资源
pthread_mutex_destroy(&mutex1);
pthread_mutex_destroy(&mutex2);
pthread_mutex_destroy(&mutex3);
free(manager.nodes);
printf("健壮列表管理演示完成\n");
return 0;
}
int main() {
return demo_robust_list_management();
}
示例4:异常终止处理演示
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <sys/wait.h>
/**
* 共享资源结构
*/
typedef struct {
pthread_mutex_t mutex;
int shared_data;
volatile int writer_active;
} shared_data_t;
/**
* 初始化健壮互斥锁
*/
int init_robust_mutex(pthread_mutex_t *mutex) {
pthread_mutexattr_t attr;
int result;
result = pthread_mutexattr_init(&attr);
if (result != 0) {
return -1;
}
result = pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
if (result != 0) {
pthread_mutexattr_destroy(&attr);
return -1;
}
result = pthread_mutex_init(mutex, &attr);
pthread_mutexattr_destroy(&attr);
return result;
}
/**
* 写者线程函数
*/
void* writer_thread(void *arg) {
shared_data_t *data = (shared_data_t*)arg;
int result;
printf("写者线程启动 (PID: %d)\n", getpid());
// 获取互斥锁
result = pthread_mutex_lock(&data->mutex);
if (result != 0) {
printf("写者: 获取互斥锁失败: %s\n", strerror(result));
return NULL;
}
printf("写者: 成功获取互斥锁\n");
data->writer_active = 1;
// 模拟长时间写操作
printf("写者: 开始写操作...\n");
for (int i = 0; i < 10; i++) {
data->shared_data = i + 1;
printf("写者: 写入数据 %d\n", data->shared_data);
sleep(1);
}
// 正常释放互斥锁
data->writer_active = 0;
pthread_mutex_unlock(&data->mutex);
printf("写者: 释放互斥锁\n");
return NULL;
}
/**
* 读者线程函数
*/
void* reader_thread(void *arg) {
shared_data_t *data = (shared_data_t*)arg;
int result;
printf("读者线程启动 (PID: %d)\n", getpid());
// 等待写者开始工作
sleep(2);
// 尝试获取互斥锁
printf("读者: 尝试获取互斥锁...\n");
result = pthread_mutex_lock(&data->mutex);
if (result == EOWNERDEAD) {
printf("读者: 检测到写者异常终止!\n");
printf("读者: 正在恢复互斥锁一致性...\n");
// 恢复互斥锁一致性
result = pthread_mutex_consistent(&data->mutex);
if (result == 0) {
printf("读者: 成功恢复互斥锁一致性\n");
printf("读者: 当前共享数据: %d\n", data->shared_data);
} else {
printf("读者: 恢复一致性失败: %s\n", strerror(result));
}
} else if (result == 0) {
printf("读者: 成功获取互斥锁\n");
printf("读者: 读取数据: %d\n", data->shared_data);
pthread_mutex_unlock(&data->mutex);
} else {
printf("读者: 获取互斥锁失败: %s\n", strerror(result));
}
return NULL;
}
/**
* 演示异常终止处理
*/
int demo_abnormal_termination() {
shared_data_t data = {0};
pthread_t writer_tid, reader_tid;
int result;
printf("=== 异常终止处理演示 ===\n");
// 初始化健壮互斥锁
if (init_robust_mutex(&data.mutex) != 0) {
printf("初始化健壮互斥锁失败\n");
return -1;
}
printf("成功初始化健壮互斥锁\n");
// 创建写者进程
pid_t writer_pid = fork();
if (writer_pid == 0) {
// 写者进程
printf("写者进程启动\n");
// 获取互斥锁
result = pthread_mutex_lock(&data.mutex);
if (result != 0) {
printf("写者进程: 获取互斥锁失败: %s\n", strerror(result));
exit(1);
}
printf("写者进程: 成功获取互斥锁\n");
data.writer_active = 1;
// 模拟工作一段时间后异常终止
printf("写者进程: 开始工作,5秒后模拟异常终止...\n");
sleep(5);
printf("写者进程: 模拟异常终止(不释放锁)\n");
// 注意:这里不调用pthread_mutex_unlock,模拟异常终止
exit(1); // 强制退出,不释放锁
} else if (writer_pid > 0) {
// 父进程(读者)
printf("父进程(读者)启动\n");
// 等待写者进程开始工作
sleep(2);
// 创建读者线程
result = pthread_create(&reader_tid, NULL, reader_thread, &data);
if (result != 0) {
printf("创建读者线程失败: %s\n", strerror(result));
kill(writer_pid, SIGKILL);
pthread_mutex_destroy(&data.mutex);
return -1;
}
// 等待写者进程异常终止
int status;
waitpid(writer_pid, &status, 0);
printf("写者进程已终止 (状态: %d)\n", status);
// 等待读者线程完成
pthread_join(reader_tid, NULL);
// 清理资源
pthread_mutex_destroy(&data.mutex);
printf("异常终止处理演示完成\n");
} else {
perror("fork 失败");
pthread_mutex_destroy(&data.mutex);
return -1;
}
return 0;
}
int main() {
return demo_abnormal_termination();
}
示例5:健壮性测试工具
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
/**
* 健壮性测试配置
*/
typedef struct {
int num_threads;
int test_duration;
int crash_probability; // 0-100,表示异常终止概率
int use_robust_mutex;
} robustness_test_config_t;
/**
* 测试统计信息
*/
typedef struct {
atomic_int lock_attempts;
atomic_int lock_success;
atomic_int lock_failures;
atomic_int owner_dead_detected;
atomic_int consistency_restored;
atomic_int normal_terminations;
atomic_int abnormal_terminations;
} test_stats_t;
/**
* 测试线程数据
*/
typedef struct {
int thread_id;
pthread_mutex_t *mutex;
test_stats_t *stats;
robustness_test_config_t *config;
volatile int *terminate_flag;
} test_thread_data_t;
/**
* 初始化健壮互斥锁
*/
int init_test_mutex(pthread_mutex_t *mutex, int robust) {
pthread_mutexattr_t attr;
int result;
result = pthread_mutexattr_init(&attr);
if (result != 0) return result;
if (robust) {
result = pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
if (result != 0) {
pthread_mutexattr_destroy(&attr);
return result;
}
}
result = pthread_mutex_init(mutex, &attr);
pthread_mutexattr_destroy(&attr);
return result;
}
/**
* 测试线程函数
*/
void* test_thread(void *arg) {
test_thread_data_t *data = (test_thread_data_t*)arg;
int result;
printf("测试线程 %d 启动\n", data->thread_id);
srand(time(NULL) + data->thread_id);
while (!(*data->terminate_flag)) {
// 尝试获取互斥锁
atomic_fetch_add(&data->stats->lock_attempts, 1);
result = pthread_mutex_lock(data->mutex);
if (result == 0) {
atomic_fetch_add(&data->stats->lock_success, 1);
// 模拟持有锁的工作
usleep(10000 + (rand() % 50000)); // 10-60ms
// 随机决定是否异常终止(模拟崩溃)
if (data->config->crash_probability > 0 &&
rand() % 100 < data->config->crash_probability) {
printf("线程 %d: 模拟异常终止\n", data->thread_id);
atomic_fetch_add(&data->stats->abnormal_terminations, 1);
exit(1); // 模拟崩溃,不释放锁
}
// 正常释放锁
pthread_mutex_unlock(data->mutex);
atomic_fetch_add(&data->stats->normal_terminations, 1);
} else if (result == EOWNERDEAD) {
atomic_fetch_add(&data->stats->owner_dead_detected, 1);
printf("线程 %d: 检测到锁持有者异常终止\n", data->thread_id);
// 恢复一致性
result = pthread_mutex_consistent(data->mutex);
if (result == 0) {
atomic_fetch_add(&data->stats->consistency_restored, 1);
printf("线程 %d: 成功恢复锁一致性\n", data->thread_id);
pthread_mutex_unlock(data->mutex);
} else {
atomic_fetch_add(&data->stats->lock_failures, 1);
printf("线程 %d: 恢复一致性失败: %s\n", data->thread_id, strerror(result));
}
} else {
atomic_fetch_add(&data->stats->lock_failures, 1);
printf("线程 %d: 获取锁失败: %s\n", data->thread_id, strerror(result));
}
// 短暂休息
usleep(10000 + (rand() % 20000)); // 10-30ms
}
printf("测试线程 %d 结束\n", data->thread_id);
return NULL;
}
/**
* 显示测试统计
*/
void show_test_statistics(test_stats_t *stats) {
printf("\n=== 测试统计结果 ===\n");
printf("锁尝试次数: %d\n", atomic_load(&stats->lock_attempts));
printf("成功获取锁: %d\n", atomic_load(&stats->lock_success));
printf("获取锁失败: %d\n", atomic_load(&stats->lock_failures));
printf("检测到异常终止: %d\n", atomic_load(&stats->owner_dead_detected));
printf("恢复一致性: %d\n", atomic_load(&stats->consistency_restored));
printf("正常终止: %d\n", atomic_load(&stats->normal_terminations));
printf("异常终止: %d\n", atomic_load(&stats->abnormal_terminations));
if (atomic_load(&stats->lock_attempts) > 0) {
double success_rate = (double)atomic_load(&stats->lock_success) /
atomic_load(&stats->lock_attempts) * 100;
printf("锁获取成功率: %.2f%%\n", success_rate);
}
}
/**
* 演示健壮性测试
*/
int demo_robustness_testing() {
pthread_mutex_t mutex;
pthread_t *threads;
test_thread_data_t *thread_data;
test_stats_t stats = {0};
volatile int terminate_flag = 0;
robustness_test_config_t config = {
.num_threads = 5,
.test_duration = 30, // 30秒
.crash_probability = 5, // 5%概率异常终止
.use_robust_mutex = 1
};
printf("=== 健壮性测试工具 ===\n");
printf("测试配置:\n");
printf(" 线程数: %d\n", config.num_threads);
printf(" 测试时长: %d 秒\n", config.test_duration);
printf(" 异常终止概率: %d%%\n", config.crash_probability);
printf(" 使用健壮互斥锁: %s\n", config.use_robust_mutex ? "是" : "否");
// 初始化互斥锁
if (init_test_mutex(&mutex, config.use_robust_mutex) != 0) {
printf("初始化互斥锁失败\n");
return -1;
}
// 分配线程数组
threads = malloc(config.num_threads * sizeof(pthread_t));
thread_data = malloc(config.num_threads * sizeof(test_thread_data_t));
if (!threads || !thread_data) {
printf("分配内存失败\n");
pthread_mutex_destroy(&mutex);
free(threads);
free(thread_data);
return -1;
}
// 创建测试线程
printf("创建 %d 个测试线程...\n", config.num_threads);
for (int i = 0; i < config.num_threads; i++) {
thread_data[i].thread_id = i + 1;
thread_data[i].mutex = &mutex;
thread_data[i].stats = &stats;
thread_data[i].config = &config;
thread_data[i].terminate_flag = &terminate_flag;
int result = pthread_create(&threads[i], NULL, test_thread, &thread_data[i]);
if (result != 0) {
printf("创建线程 %d 失败: %s\n", i + 1, strerror(result));
// 清理已创建的线程
terminate_flag = 1;
for (int j = 0; j < i; j++) {
pthread_join(threads[j], NULL);
}
pthread_mutex_destroy(&mutex);
free(threads);
free(thread_data);
return -1;
}
}
// 运行测试
printf("开始测试,持续 %d 秒...\n", config.test_duration);
sleep(config.test_duration);
// 停止测试
printf("停止测试...\n");
terminate_flag = 1;
// 等待所有线程结束
for (int i = 0; i < config.num_threads; i++) {
pthread_join(threads[i], NULL);
}
// 显示统计结果
show_test_statistics(&stats);
// 清理资源
pthread_mutex_destroy(&mutex);
free(threads);
free(thread_data);
return 0;
}
int main() {
return demo_robustness_testing();
}
set_robust_list 使用注意事项
系统要求:
- 内核版本: 需要Linux 2.6.17或更高版本
- 架构支持: 支持所有主流架构
- 编译环境: 需要正确的系统头文件
功能限制:
- FUTEX支持: 需要内核支持FUTEX同步原语
- pthread支持: 需要支持健壮互斥锁的pthread实现
- 权限要求: 通常不需要特殊权限
错误处理:
- ENOSYS: 系统不支持健壮列表功能
- EINVAL: 参数无效
- EFAULT: 指针参数无效
- ENOMEM: 内存不足
性能考虑:
- 开销: 健壮互斥锁比普通互斥锁有额外开销
- 列表维护: 需要维护健壮列表结构
- 异常处理: 异常终止时需要额外处理时间
最佳实践:
- 选择性使用: 只在需要健壮性的场景使用
- 正确处理: 妥善处理EOWNERDEAD错误
- 及时清理: 正常情况下及时释放互斥锁
- 测试验证: 充分测试异常场景下的行为
健壮互斥锁工作原理
1. 正常操作流程:
线程A获取锁 -> 执行临界区 -> 释放锁 -> 线程B获取锁
2. 异常终止处理:
线程A获取锁 -> 异常终止(未释放锁) -> 内核检测 -> 自动释放锁 -> 线程B获取锁时得到EOWNERDEAD -> 调用pthread_mutex_consistent -> 恢复正常使用
3. 关键数据结构:
// 健壮列表头
struct robust_list_head {
struct robust_list *list; // 列表指针
long futex_offset; // futex偏移量
unsigned long list_op_pending; // 待处理操作
};
// 列表节点
struct robust_list {
struct robust_list *next; // 下一个节点
};
常见使用场景
1. 服务器应用:
// 为关键资源使用健壮互斥锁,防止服务器进程崩溃导致死锁
pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
2. 数据库系统:
// 保护数据库连接和事务资源,确保异常终止时资源能被正确释放
3. 实时系统:
// 在实时应用中确保关键资源不会因为进程异常而永久锁定
错误处理指南
EOWNERDEAD处理:
int result = pthread_mutex_lock(&mutex);
if (result == EOWNERDEAD) {
// 检测到锁持有者异常终止
result = pthread_mutex_consistent(&mutex);
if (result == 0) {
// 成功恢复一致性,可以正常使用锁
// 注意:锁保护的数据可能处于不一致状态
} else {
// 恢复失败,应该释放锁并处理错误
pthread_mutex_unlock(&mutex);
}
}
总结
set_robust_list
和相关的健壮互斥锁机制提供了:
- 故障恢复: 进程异常终止时自动释放互斥锁
- 死锁预防: 防止因进程崩溃导致的永久阻塞
- 系统稳定性: 提高多线程应用的可靠性
- 透明处理: 对应用层提供相对透明的健壮性保证
通过合理使用健壮互斥锁,可以构建更加可靠的多线程应用程序,特别是在需要长时间运行和高可用性的系统中表现出色。在实际应用中,需要注意性能开销和正确的错误处理。