futex系统调用及示例

futex 函数详解

1. 函数介绍

futex(Fast Userspace muTEX)是 Linux 内核提供的一种高效的同步原语。可以把 futex 想象成一个”智能的红绿灯系统”——在大多数情况下,它在用户空间快速运行(就像绿灯畅通无阻),只有在发生竞争时才需要内核介入(就像遇到红灯需要交通警察协调)。

传统的互斥锁在用户空间和内核空间之间频繁切换,开销很大。而 futex 的巧妙之处在于:如果没有竞争,线程可以在用户空间直接完成同步操作;只有当出现等待情况时,才需要进入内核进行睡眠和唤醒操作。这就像排队买票,如果没人排队,你可以直接买票离开;如果人很多,就需要排队等待,这时工作人员会来维持秩序。

2. 函数原型

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

int futex(int *uaddr, int futex_op, int val,
          const struct timespec *timeout,   /* or: uint32_t val2 */
          int *uaddr2, int val3);

注意:futex 是系统调用,通常通过 syscall() 函数调用。

3. 功能

futex 提供了一种高效的用户空间同步机制,主要用于实现各种同步原语(如互斥锁、条件变量、信号量等)。它通过在用户空间和内核空间之间智能切换,实现了高性能的线程同步。

4. 参数

  • uaddr: 指向用户空间的一个整型变量(futex 字),用于存储同步状态
  • futex_op: 操作类型(见下表)
  • val: 操作参数,根据不同的操作类型有不同的含义
  • timeout: 超时时间(可选,某些操作使用)
  • uaddr2: 第二个 futex 地址(某些操作使用)
  • val3: 第三个参数(某些操作使用)

5. 常用操作类型

操作说明功能
FUTEX_WAIT等待操作如果 *uaddr == val,则睡眠等待
FUTEX_WAKE唤醒操作唤醒在 uaddr 上等待的 val 个线程
FUTEX_WAIT_BITSET带位集的等待可以指定唤醒条件
FUTEX_WAKE_BITSET带位集的唤醒按位集唤醒等待线程
FUTEX_LOCK_PI优先级继承锁用于实现互斥锁
FUTEX_UNLOCK_PI解锁优先级继承锁解除互斥锁

6. 返回值

  • 成功: 返回值根据操作类型而定
    • FUTEX_WAIT: 成功返回 0,超时返回 -1 并设置 errno 为 ETIMEDOUT
    • FUTEX_WAKE: 返回唤醒的线程数
  • 失败: 返回 -1,并设置相应的 errno 错误码

常见错误码:

  • EAGAIN: 条件不满足(如 FUTEX_WAIT 时值已改变)
  • ETIMEDOUT: 等待超时
  • EINVAL: 参数无效
  • EFAULT: 地址无效

7. 相似函数或关联函数

  • pthread_mutex_t: POSIX 互斥锁(基于 futex 实现)
  • pthread_cond_t: POSIX 条件变量(基于 futex 实现)
  • sem_t: POSIX 信号量(基于 futex 实现)
  • atomic operations: 原子操作(与 futex 配合使用)

8. 示例代码

示例1:基础用法 – 实现简单的计数器同步

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <errno.h>
#include <stdatomic.h>

// futex 系统调用封装
int futex_wait(int *addr, int val, const struct timespec *timeout) {
    return syscall(SYS_futex, addr, FUTEX_WAIT, val, timeout, NULL, 0);
}

int futex_wake(int *addr, int num) {
    return syscall(SYS_futex, addr, FUTEX_WAKE, num, NULL, NULL, 0);
}

// 全局计数器和同步变量
atomic_int counter = 0;
int futex_var = 0;  // 0: 可访问, 1: 正在访问

// 线程函数
void* worker_thread(void* arg) {
    int thread_id = *(int*)arg;
    
    for (int i = 0; i < 5; i++) {
        // 尝试获取锁
        int expected = 0;
        while (!atomic_compare_exchange_weak(&futex_var, &expected, 1)) {
            // 如果获取失败,等待
            futex_wait(&futex_var, 1, NULL);
            expected = 0;
        }
        
        // 临界区:访问共享资源
        int old_value = atomic_fetch_add(&counter, 1);
        printf("线程 %d: 计数器从 %d 增加到 %d\n", 
               thread_id, old_value, old_value + 1);
        
        // 模拟一些工作
        usleep(100000);  // 100ms
        
        // 释放锁
        atomic_store(&futex_var, 0);
        futex_wake(&futex_var, 1);  // 唤醒一个等待的线程
        
        // 非临界区工作
        usleep(50000);  // 50ms
    }
    
    return NULL;
}

int main() {
    pthread_t threads[3];
    int thread_ids[3] = {1, 2, 3};
    
    printf("=== FUTEX 基础同步示例 ===\n");
    printf("启动 3 个线程,每个线程对计数器执行 5 次操作\n\n");
    
    // 创建线程
    for (int i = 0; i < 3; i++) {
        if (pthread_create(&threads[i], NULL, worker_thread, &thread_ids[i]) != 0) {
            perror("pthread_create");
            return 1;
        }
    }
    
    // 等待所有线程完成
    for (int i = 0; i < 3; i++) {
        pthread_join(threads[i], NULL);
    }
    
    printf("\n最终计数器值: %d\n", atomic_load(&counter));
    return 0;
}

示例2:带超时的 futex 操作

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <errno.h>
#include <stdatomic.h>

int futex_wait_timeout(int *addr, int val, int timeout_ms) {
    struct timespec timeout;
    timeout.tv_sec = timeout_ms / 1000;
    timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
    
    return syscall(SYS_futex, addr, FUTEX_WAIT, val, &timeout, NULL, 0);
}

int futex_wake(int *addr, int num) {
    return syscall(SYS_futex, addr, FUTEX_WAKE, num, NULL, NULL, 0);
}

// 生产者-消费者场景
int buffer = -1;  // -1 表示空
int data_ready = 0;  // 0: 无数据, 1: 有数据

void* producer_thread(void* arg) {
    for (int i = 1; i <= 5; i++) {
        printf("生产者: 准备生产数据 %d\n", i);
        
        // 等待缓冲区为空
        while (__atomic_load_n(&data_ready, __ATOMIC_ACQUIRE) == 1) {
            printf("生产者: 缓冲区满,等待消费者...\n");
            int ret = futex_wait_timeout(&data_ready, 1, 2000);  // 2秒超时
            if (ret == -1 && errno == ETIMEDOUT) {
                printf("生产者: 等待超时,继续检查...\n");
            }
        }
        
        // 生产数据
        buffer = i;
        __atomic_store_n(&data_ready, 1, __ATOMIC_RELEASE);
        printf("生产者: 生产了数据 %d\n", i);
        
        // 唤醒消费者
        futex_wake(&data_ready, 1);
        
        sleep(1);
    }
    
    return NULL;
}

void* consumer_thread(void* arg) {
    for (int i = 0; i < 5; i++) {
        printf("消费者: 准备消费数据\n");
        
        // 等待数据就绪
        while (__atomic_load_n(&data_ready, __ATOMIC_ACQUIRE) == 0) {
            printf("消费者: 无数据,等待生产者...\n");
            int ret = futex_wait_timeout(&data_ready, 0, 3000);  // 3秒超时
            if (ret == -1 && errno == ETIMEDOUT) {
                printf("消费者: 等待超时,继续检查...\n");
            }
        }
        
        // 消费数据
        printf("消费者: 消费了数据 %d\n", buffer);
        buffer = -1;
        __atomic_store_n(&data_ready, 0, __ATOMIC_RELEASE);
        
        // 唤醒生产者
        futex_wake(&data_ready, 1);
        
        sleep(2);
    }
    
    return NULL;
}

int main() {
    pthread_t producer, consumer;
    
    printf("=== 带超时的 FUTEX 操作示例 ===\n");
    printf("生产者生产 5 个数据,消费者消费它们\n");
    printf("等待操作设置 2-3 秒超时\n\n");
    
    // 创建线程
    if (pthread_create(&producer, NULL, producer_thread, NULL) != 0 ||
        pthread_create(&consumer, NULL, consumer_thread, NULL) != 0) {
        perror("pthread_create");
        return 1;
    }
    
    // 等待线程完成
    pthread_join(producer, NULL);
    pthread_join(consumer, NULL);
    
    printf("\n生产者-消费者操作完成\n");
    return 0;
}

示例3:实现简单的互斥锁

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <errno.h>
#include <stdatomic.h>

// 简单的 futex 互斥锁实现
typedef struct {
    atomic_int state;  // 0: 未锁定, 1: 已锁定
} futex_mutex_t;

// 初始化互斥锁
void futex_mutex_init(futex_mutex_t *mutex) {
    atomic_store(&mutex->state, 0);
}

// 加锁
void futex_mutex_lock(futex_mutex_t *mutex) {
    int expected = 0;
    // 尝试原子地将状态从 0 改为 1
    while (!atomic_compare_exchange_weak(&mutex->state, &expected, 1)) {
        // 如果失败,说明锁已被占用,等待
        syscall(SYS_futex, &mutex->state, FUTEX_WAIT, 1, NULL, NULL, 0);
        expected = 0;  // 重置期望值
    }
}

// 解锁
void futex_mutex_unlock(futex_mutex_t *mutex) {
    // 原子地将状态改为 0
    atomic_store(&mutex->state, 0);
    // 唤醒一个等待的线程
    syscall(SYS_futex, &mutex->state, FUTEX_WAKE, 1, NULL, NULL, 0);
}

// 全局变量和互斥锁
int shared_counter = 0;
futex_mutex_t my_mutex;

// 工作线程函数
void* increment_thread(void* arg) {
    int thread_id = *(int*)arg;
    int operations = 100000;
    
    printf("线程 %d: 开始执行 %d 次操作\n", thread_id, operations);
    
    for (int i = 0; i < operations; i++) {
        futex_mutex_lock(&my_mutex);
        // 临界区
        shared_counter++;
        futex_mutex_unlock(&my_mutex);
    }
    
    printf("线程 %d: 完成\n", thread_id);
    return NULL;
}

int main() {
    pthread_t threads[4];
    int thread_ids[4] = {1, 2, 3, 4};
    int expected_result = 400000;
    
    printf("=== FUTEX 互斥锁实现示例 ===\n");
    printf("4 个线程,每个执行 100,000 次递增操作\n");
    printf("期望结果: %d\n\n", expected_result);
    
    // 初始化互斥锁
    futex_mutex_init(&my_mutex);
    
    // 创建线程
    for (int i = 0; i < 4; i++) {
        if (pthread_create(&threads[i], NULL, increment_thread, &thread_ids[i]) != 0) {
            perror("pthread_create");
            return 1;
        }
    }
    
    // 等待所有线程完成
    for (int i = 0; i < 4; i++) {
        pthread_join(threads[i], NULL);
    }
    
    printf("\n实际结果: %d\n", shared_counter);
    printf("结果%s正确\n", (shared_counter == expected_result) ? "" : "不");
    
    if (shared_counter != expected_result) {
        printf("可能存在竞态条件或同步问题\n");
    }
    
    return 0;
}

编译和运行说明

# 编译示例(需要链接 pthread 库)
gcc -o futex_example1 example1.c -lpthread
gcc -o futex_example2 example2.c -lpthread
gcc -o futex_example3 example3.c -lpthread

# 运行示例
./futex_example1
./futex_example2
./futex_example3

重要注意事项

  1. 原子操作配合: futex 通常与原子操作(atomic operations)配合使用,确保检查和等待操作的原子性
  2. 虚假唤醒: 与条件变量一样,futex 也可能出现虚假唤醒,需要在循环中检查条件
  3. 内存序: 使用适当的内存序(memory ordering)确保同步正确性
  4. 错误处理: 始终检查 futex 调用的返回值和 errno
  5. 移植性: futex 是 Linux 特有的特性,在其他系统上不可用
  6. 调试困难: futex 相关的 bug 很难调试,建议使用成熟的同步库

futex 的优势

  1. 高性能: 无竞争时完全在用户空间操作
  2. 低延迟: 避免不必要的内核切换
  3. 可扩展性: 支持大量并发线程
  4. 灵活性: 可以实现各种同步原语

这些示例展示了 futex 的基本使用方法,从简单的同步操作到带超时机制,再到完整的互斥锁实现,帮助你理解这个强大而高效的同步机制。

futex系统调用, futex函数详解, Linux futex示例, Fast Userspace muTEX, futex编程指南, 如何使用futex, futex性能优化, 基于futex的同步机制, futex与线程同步, 高级futex技术应用

https://blog.csdn.net/zidier215/article/details/151332503?sharetype=blogdetail&sharerId=151332503&sharerefer=PC&sharesource=zidier215&spm=1011.2480.3001.8118

此条目发表在linux文章分类目录。将固定链接加入收藏夹。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注