BPF系统调用及示例

这次我们介绍 bpf 函数,它是 Linux 内核中 **Berkeley Packet Filter **(BPF) 子系统的用户态接口。


1. 函数介绍

bpf 是一个功能极其强大的 Linux 系统调用(内核版本 >= 3.18,但许多高级特性需要更新的内核),它提供了一种在内核空间安全、高效地运行用户定义程序的机制。

你可以把 BPF 想象成一个内核里的虚拟机

  • 你(用户态程序)可以编写一段用BPF 指令集编写的“小程序”(eBPF 程序)。
  • 你将这段程序加载到内核中。
  • 内核会验证这段程序的安全性(确保它不会导致死循环、不会访问非法内存等)。
  • 如果验证通过,内核会即时编译 (JIT) 这段程序为机器码,并将其附加到特定的内核钩子(hook points)上。
  • 当内核执行到这些钩子时(例如,收到网络包、进行系统调用、跟踪函数调用),就会执行你加载的 BPF 程序。
  • BPF 程序可以进行过滤修改收集信息(遥测)、路由等操作。

主要用途:

  • 网络编程: 高性能数据包过滤(tcpdump)、流量整形、负载均衡、XDP(eXpress Data Path)超高速网络处理。
  • 系统监控和追踪: 跟踪内核函数、用户态函数、系统调用,收集性能指标(如 perf)、调试信息。
  • 安全: 实施安全策略、沙箱、审计。
  • 性能分析: 无侵入式地分析应用程序和内核性能瓶颈。

2. 函数原型

#include <linux/bpf.h> // 必需,包含 BPF 相关常量和结构体

long bpf(int cmd, union bpf_attr *attr, unsigned int size);

3. 功能

  • 统一接口bpf 系统调用是操作 eBPF 子系统的统一入口点。几乎所有与 eBPF 相关的操作(创建、加载、附加、查询、删除等)都通过这个单一的系统调用来完成。
  • 多用途: 根据 cmd 参数的不同,bpf 可以执行完全不同的操作。

4. 参数

  • int cmd: 指定要执行的具体 BPF 操作。这是一个枚举值(定义在 <linux/bpf.h> 中)。常见的命令包括:
    • BPF_MAP_CREATE: 创建一个 BPF 映射(Map)。映射是 BPF 程序和用户态程序之间共享数据的高效机制。
    • BPF_PROG_LOAD: 将一个 BPF 程序加载到内核中。
    • BPF_OBJ_PIN / BPF_OBJ_GET: 将 BPF 对象(程序或映射)固定到文件系统路径或从路径获取。
    • BPF_PROG_ATTACH / BPF_PROG_DETACH: 将已加载的 BPF 程序附加到或从特定的挂钩点(如 cgroup、网络设备)分离。
    • BPF_PROG_RUN / BPF_PROG_TEST_RUN: (测试)运行 BPF 程序。
    • BPF_MAP_LOOKUP_ELEM / BPF_MAP_UPDATE_ELEM / BPF_MAP_DELETE_ELEM: 对 BPF 映射进行查找、更新、删除元素操作。
    • BPF_PROG_GET_NEXT_ID / BPF_PROG_GET_FD_BY_ID: 枚举和通过 ID 获取 BPF 程序。
    • BPF_MAP_GET_NEXT_ID / BPF_MAP_GET_FD_BY_ID: 枚举和通过 ID 获取 BPF 映射。
    • … 还有很多其他命令 …
  • union bpf_attr *attr: 这是一个指向 union bpf_attr 结构体的指针。这个联合体包含了执行 cmd 指定操作所需的所有可能参数。根据 cmd 的不同,bpf 系统调用会从这个联合体中读取或写入特定的成员。
    • 例如,对于 BPF_MAP_CREATE,它会读取 attr->map_typeattr->key_sizeattr->value_sizeattr->max_entries 等成员。
    • 对于 BPF_PROG_LOAD,它会读取 attr->prog_typeattr->insn_cntattr->insnsattr->license 等成员。
  • unsigned int size: 指定 attr 指向的 union bpf_attr 结构体的大小(以字节为单位)。内核使用这个大小来进行兼容性检查和内存访问边界控制。

5. union bpf_attr 结构体

union bpf_attr 是一个巨大的联合体,包含了所有 BPF 操作可能需要的参数。它的定义非常庞大,这里只列举几个关键成员以说明其结构:

union bpf_attr {
    struct { /* anonymous struct for BPF_MAP_CREATE */
        __u32   map_type;    // 映射类型 (BPF_MAP_TYPE_*)
        __u32   key_size;    // 键大小
        __u32   value_size;  // 值大小
        __u32   max_entries; // 最大元素个数
        __u32   map_flags;   // 标志位
        __u32   inner_map_fd; // 用于 array/hash of maps
        __u32   numa_node;   // NUMA 节点
        char    map_name[BPF_OBJ_NAME_LEN]; // 映射名称
        __u32   map_ifindex; // 网络接口索引
        // ... 更多字段 ...
    }; // BPF_MAP_CREATE 使用这些字段

    struct { /* anonymous struct for BPF_PROG_LOAD */
        __u32   prog_type;     // 程序类型 (BPF_PROG_TYPE_*)
        __u32   insn_cnt;      // 指令数量
        __aligned_u64 insns;   // 指向指令数组的用户态指针
        __aligned_u64 license;  // 指向许可证字符串的用户态指针 ("GPL")
        __u32   log_level;     // 日志级别
        __u32   log_size;      // 日志缓冲区大小
        __aligned_u64 log_buf; // 指向日志缓冲区的用户态指针
        __u32   kern_version;  // 内核版本 (用于追踪程序)
        __u32   prog_flags;   // 程序标志
        char    prog_name[BPF_OBJ_NAME_LEN]; // 程序名称
        __u32   prog_ifindex;  // 网络接口索引
        // ... 更多字段 ...
    }; // BPF_PROG_LOAD 使用这些字段

    // ... 还有很多其他匿名结构体,对应不同的 cmd ...
};

6. 返回值

  • 成功时: 返回值取决于具体的 cmd
    • 对于 BPF_MAP_CREATEBPF_PROG_LOAD 等创建操作:通常返回一个新的文件描述符(fd),用于引用新创建的 BPF 映射或程序。
    • 对于 BPF_MAP_LOOKUP_ELEM 等查询操作:可能返回 0 表示成功。
    • 对于 BPF_PROG_ATTACH 等操作:可能返回 0 表示成功。
  • 失败时: 返回 -1,并设置全局变量 errno 来指示具体的错误原因(例如 EINVAL 参数无效,EACCES 权限不足,ENOMEM 内存不足,E2BIG 程序太大或映射太大,EPERM 操作不被允许等)。

7. 相似函数,或关联函数

  • libbpf: 一个 C 库,提供了对 bpf 系统调用的高级封装,简化了 eBPF 程序的加载、映射操作和附加过程。这是编写 eBPF 应用程序的推荐方式。
  • bpftool: 一个命令行工具,用于检查、调试和操作 eBPF 程序和映射。它本身就是 bpf 系统调用的使用者。
  • LLVM/Clang: 用于将 C 语言编写的 eBPF 程序编译成 BPF 字节码。
  • perf: 可以与 eBPF 结合使用进行性能分析。
  • bcc / bpftrace: 更高级别的工具和库,进一步简化了 eBPF 的使用,允许用 Python 或特定领域语言编写脚本。

8. 示例代码

重要提示: 直接使用 bpf 系统调用编写 eBPF 程序非常复杂,涉及大量的底层细节、内存管理和联合体操作。下面的示例将展示一个极其简化的、概念性的 C 代码,旨在说明 bpf 系统调用的调用方式和参数结构。实际的 eBPF 开发通常使用 libbpf 库。

示例 1:概念性地使用 bpf 系统调用

这个例子展示了如何直接调用 bpf 系统调用(通过 syscall)来创建一个简单的 BPF 映射。

// bpf_conceptual.c
// 注意:这是一个非常简化的概念性示例,不包含实际的 eBPF 程序加载。
// 实际使用需要 libbpf 或 LLVM/Clang 工具链。
#define _GNU_SOURCE
#include <linux/bpf.h> // 包含 BPF 相关定义
#include <sys/syscall.h> // syscall
#include <unistd.h>     // close
#include <stdio.h>      // perror, printf
#include <stdlib.h>     // exit
#include <string.h>     // memset
#include <errno.h>      // errno

// 简化包装 syscall
static inline long sys_bpf(int cmd, union bpf_attr *attr, unsigned int size) {
    return syscall(__NR_bpf, cmd, attr, size);
}

int main() {
    union bpf_attr attr;
    int map_fd;

    printf("Using bpf syscall directly to create a map...\n");

    // 1. 清零 attr 联合体
    memset(&attr, 0, sizeof(attr));

    // 2. 填充 BPF_MAP_CREATE 所需的参数
    attr.map_type = BPF_MAP_TYPE_ARRAY; // 创建一个数组类型的映射
    attr.key_size = sizeof(int);        // 键是 int 类型 (4 bytes)
    attr.value_size = sizeof(long long); // 值是 long long 类型 (8 bytes)
    attr.max_entries = 10;              // 数组大小为 10
    // attr.map_flags = 0;              // 可以设置标志,这里用默认值
    snprintf(attr.map_name, sizeof(attr.map_name), "my_array_map"); // 设置映射名称

    printf("Creating BPF_MAP_TYPE_ARRAY with:\n");
    printf("  map_type: %u (BPF_MAP_TYPE_ARRAY)\n", attr.map_type);
    printf("  key_size: %u bytes\n", attr.key_size);
    printf("  value_size: %u bytes\n", attr.value_size);
    printf("  max_entries: %u\n", attr.max_entries);
    printf("  map_name: %s\n", attr.map_name);

    // 3. 调用 bpf 系统调用 (BPF_MAP_CREATE)
    printf("Calling bpf(BPF_MAP_CREATE, ...)\n");
    map_fd = sys_bpf(BPF_MAP_CREATE, &attr, sizeof(attr));
    if (map_fd < 0) {
        perror("bpf BPF_MAP_CREATE failed");
        if (errno == EPERM) {
            printf("Permission denied. You might need to run this as root or adjust capabilities.\n");
            printf("Try: sudo ./bpf_conceptual\n");
        }
        exit(EXIT_FAILURE);
    }

    printf("BPF map created successfully. File descriptor: %d\n", map_fd);

    // 4. (概念性) 使用 map_fd 进行后续操作
    // 例如,使用 BPF_MAP_UPDATE_ELEM 更新元素
    // union bpf_attr update_attr;
    // memset(&update_attr, 0, sizeof(update_attr));
    // update_attr.map_fd = map_fd;
    // int key = 5;
    // long long value = 1234567890LL;
    // update_attr.key = (unsigned long)&key;
    // update_attr.value = (unsigned long)&value;
    // update_attr.flags = BPF_ANY; // 如果存在则更新,否则创建
    // if (sys_bpf(BPF_MAP_UPDATE_ELEM, &update_attr, sizeof(update_attr)) == -1) {
    //     perror("bpf BPF_MAP_UPDATE_ELEM failed");
    // } else {
    //     printf("Successfully updated element at key %d to value %lld\n", key, value);
    // }

    // 5. 关闭映射文件描述符
    printf("Closing BPF map file descriptor...\n");
    if (close(map_fd) == -1) {
        perror("close BPF map fd failed");
    } else {
        printf("BPF map file descriptor closed.\n");
    }

    printf("Conceptual bpf syscall example completed.\n");
    return 0;
}

**代码解释 **(概念性):

  1. 定义 sys_bpf 包装 syscall(__NR_bpf, ...),因为 glibc 可能没有直接包装 bpf
  2. 声明 union bpf_attr attr 用于传递参数。
  3. 清零 attr 联合体,这是一个好习惯,确保未使用的字段为 0。
  4. 填充 attr:
    • map_type = BPF_MAP_TYPE_ARRAY: 指定创建数组映射。
    • key_size = sizeof(int): 键是 4 字节整数。
    • value_size = sizeof(long long): 值是 8 字节长整数。
    • max_entries = 10: 数组包含 10 个元素。
    • snprintf(attr.map_name, ...): 设置映射的名称。
  5. 调用 sys_bpf:
    • cmd = BPF_MAP_CREATE: 指定创建映射操作。
    • &attr: 指向填充好的参数联合体。
    • sizeof(attr): 联合体的大小。
  6. 检查返回值:
    • 如果返回值 map_fd 是一个非负整数,表示成功,这个 map_fd 是新创建映射的文件描述符。
    • 如果返回 -1,检查 errnoEPERM 表示权限不足,通常需要 root 权限。
  7. 打印成功信息和返回的文件描述符。
  8. 概念性操作: 注释掉了使用 BPF_MAP_UPDATE_ELEM 命令更新映射元素的代码。
  9. 使用 close(map_fd) 关闭映射文件描述符,释放资源。

示例 2:使用 libbpf 创建和使用 BPF 映射 (推荐方式)

这个例子展示了使用 libbpf 库(现代推荐方式)来创建和操作 BPF 映射。

// bpf_libbpf_example.c
// 编译: gcc -o bpf_libbpf_example bpf_libbpf_example.c -lbpf
// 注意:需要安装 libbpf-dev 包

/*
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <bpf/libbpf.h> // libbpf 库
#include <bpf/bpf.h>    // bpf_map_update_elem, bpf_map_lookup_elem 等辅助函数
#include <stdio.h>      // printf, perror
#include <stdlib.h>     // exit
#include <unistd.h>     // close (如果需要)

int main() {
    int map_fd = -1;
    int err;
    int key = 5;
    long long value = 9876543210LL;
    long long lookup_value;

    printf("Using libbpf to create and manipulate a BPF map...\n");

    // 1. 使用 libbpf 创建 BPF 映射
    struct bpf_map *map = bpf_map__new(BPF_MAP_TYPE_ARRAY, sizeof(int), sizeof(long long), 10, 0, "my_libbpf_array_map");
    if (!map) {
        fprintf(stderr, "Failed to create BPF map using libbpf.\n");
        exit(EXIT_FAILURE);
    }

    // 2. 获取映射的文件描述符
    map_fd = bpf_map__fd(map);
    if (map_fd < 0) {
        fprintf(stderr, "Failed to get map file descriptor.\n");
        bpf_map__destroy(map); // 清理
        exit(EXIT_FAILURE);
    }

    printf("BPF map created using libbpf. File descriptor: %d\n", map_fd);

    // 3. 使用 libbpf 辅助函数更新映射元素
    printf("Updating element at key %d with value %lld...\n", key, value);
    err = bpf_map_update_elem(map_fd, &key, &value, BPF_ANY);
    if (err) {
        perror("bpf_map_update_elem failed");
        bpf_map__destroy(map);
        exit(EXIT_FAILURE);
    }
    printf("Element updated successfully.\n");

    // 4. 使用 libbpf 辅助函数查找映射元素
    printf("Looking up element at key %d...\n", key);
    err = bpf_map_lookup_elem(map_fd, &key, &lookup_value);
    if (err) {
        perror("bpf_map_lookup_elem failed");
        bpf_map__destroy(map);
        exit(EXIT_FAILURE);
    }
    printf("Found element at key %d with value %lld.\n", key, lookup_value);

    // 5. 清理资源
    printf("Destroying BPF map...\n");
    bpf_map__destroy(map); // 这会关闭 fd 并释放资源
    printf("BPF map destroyed.\n");

    printf("libbpf example completed.\n");
    return 0;
}
*/
// 由于 libbpf 依赖和编译可能较为复杂,此处提供伪代码框架。
// 实际使用请参考 libbpf 文档和示例。

**代码解释 **(概念性/伪代码):

  1. 包含 libbpf 库的头文件。
  2. 创建映射:
    • 调用 libbpf 提供的高级函数 bpf_map__new 来创建映射。
    • 这比直接使用 bpf 系统调用简单得多,库会处理联合体的填充和系统调用。
  3. 获取文件描述符:
    • 调用 bpf_map__fd 获取映射的文件描述符,用于后续操作。
  4. 操作映射:
    • 使用 libbpf 提供的辅助函数 bpf_map_update_elem 和 bpf_map_lookup_elem 来更新和查找映射中的元素。
    • 这些函数内部会调用 bpf 系统调用(如 BPF_MAP_UPDATE_ELEM)。
  5. 清理:
    • 调用 bpf_map__destroy 来销毁映射并释放所有相关资源(包括关闭文件描述符)。

重要提示与注意事项:

  1. 内核版本: eBPF 是一个快速发展的领域,新特性和功能不断加入。确保你的 Linux 内核版本足够新以支持你需要的功能。
  2. 权限: 使用 bpf 系统调用通常需要特殊权限,如 CAP_SYS_ADMIN 或 CAP_BPF(较新内核)。在生产环境中,应遵循最小权限原则。
  3. libbpf 是推荐方式: 直接使用 bpf 系统调用非常复杂且容易出错。libbpf 库极大地简化了开发流程,提供了更好的可移植性和错误处理。
  4. 程序加载: 加载 eBPF 程序(BPF_PROG_LOAD)比创建映射复杂得多,需要预先编译好的 BPF 字节码,并处理验证、日志等。
  5. 安全性: eBPF 程序在加载到内核前会经过严格的验证器(verifier)检查,确保其安全性(无无限循环、无非法内存访问等)。这是 eBPF 能够安全运行在内核中的关键。
  6. 性能: eBPF 程序在内核中运行,并且通常会被 JIT 编译成高效的机器码,性能非常高。
  7. 调试bpftool 和 bpf_trace_printk 是调试 eBPF 程序的常用工具。

总结:

bpf 系统调用是 Linux eBPF 子系统的核心接口,它提供了一种强大、安全且高效的方式让用户态程序在内核中执行自定义逻辑。虽然直接使用它非常底层和复杂,但通过 libbpf 等高级库,开发者可以更轻松地利用 eBPF 的强大功能来构建网络、安全、监控和性能分析等领域的前沿应用。理解其基本概念和工作原理对于现代 Linux 系统程序员来说至关重要。

发表在 linux文章 | 留下评论

linux定时器管理 timer_* 函数详解

1. 函数介绍

timer_ 函数系列*是Linux系统中用于定时器管理的一组函数,它们提供了精确的时间控制和定时功能。可以把timer_*函数想象成一个”精密时钟系统”,允许你设置各种类型的定时器,包括一次性定时器、周期性定时器、以及高精度定时器。

这些函数基于POSIX定时器标准,提供了比传统alarm()函数更强大和灵活的功能。timer_*函数可以:

  • 创建和管理多个定时器
  • 设置一次性或周期性定时
  • 指定定时器到期时的行为(发送信号或执行回调)
  • 精确控制定时器的时间间隔
  • 查询和修改定时器状态

使用场景:

  • 网络服务器中的超时控制
  • 实时系统中的周期性任务
  • 游戏开发中的帧率控制
  • 系统监控和定时检查
  • 多媒体应用中的同步控制

2. 函数原型

#include <time.h>
#include <signal.h>

// 创建定时器
int timer_create(clockid_t clockid, struct sigevent *sevp, timer_t *timerid);

// 启动/修改定时器
int timer_settime(timer_t timerid, int flags,
                  const struct itimerspec *new_value,
                  struct itimerspec *old_value);

// 获取定时器时间
int timer_gettime(timer_t timerid, struct itimerspec *curr_value);

// 获取定时器超时次数
int timer_getoverrun(timer_t timerid);

// 删除定时器
int timer_delete(timer_t timerid);

3. 功能

timer_*函数系列提供了完整的定时器管理功能:

  • timer_create: 创建一个新的定时器,指定时钟源和到期通知方式
  • timer_settime: 启动、停止或修改定时器的定时参数
  • timer_gettime: 查询定时器的当前状态和剩余时间
  • timer_getoverrun: 获取定时器的超限运行次数(当定时器到期但未被处理时)
  • timer_delete: 删除和清理定时器资源

4. 参数

timer_create参数:

  • clockid: 时钟类型
    • 类型:clockid_t
    • 含义:指定定时器使用的时钟源
    • 常用值:
      • CLOCK_REALTIME:系统实时时间
      • CLOCK_MONOTONIC:单调时钟(不会受系统时间调整影响)
      • CLOCK_PROCESS_CPUTIME_ID:进程CPU时间
      • CLOCK_THREAD_CPUTIME_ID:线程CPU时间
  • sevp: 信号事件结构
    • 类型:struct sigevent*
    • 含义:指定定时器到期时的通知方式
    • NULL表示使用默认的SIGALRM信号
  • timerid: 定时器ID
    • 类型:timer_t*
    • 含义:返回创建的定时器标识符

timer_settime参数:

  • timerid: 定时器ID
    • 类型:timer_t
    • 含义:要操作的定时器标识符
  • flags: 操作标志
    • 类型:int
    • 含义:控制定时器行为的标志
    • 常用值:
      • 0:相对时间
      • TIMER_ABSTIME:绝对时间
  • new_value: 新的定时器设置
    • 类型:const struct itimerspec*
    • 含义:指定定时器的新参数
  • old_value: 旧的定时器设置
    • 类型:struct itimerspec*
    • 含义:返回定时器之前的设置(可为NULL)

5. 返回值

  • 成功: 返回0
  • 失败: 返回-1,并设置errno错误码
    • EINVAL:参数无效
    • ENOMEM:内存不足
    • EPERM:权限不足
    • EAGAIN:资源暂时不可用

6. 相似函数或关联函数

  • alarm(): 传统的一次性定时器函数
  • setitimer(): 更灵活的间隔定时器函数
  • sleep()/usleep(): 简单的延迟函数
  • clock_gettime(): 获取时钟时间
  • nanosleep(): 高精度睡眠函数
  • signal()/sigaction(): 信号处理函数

7. 示例代码

示例1:基础timer使用 – 简单定时器

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <signal.h>
#include <string.h>
#include <errno.h>

// 定时器信号处理函数
void timer_handler(int sig, siginfo_t *si, void *uc) {
    timer_t *tidp = si->si_value.sival_ptr;
    printf("[%ld] 定时器到期! 定时器ID: %p\n", time(NULL), (void*)*tidp);
}

int main() {
    timer_t timerid;
    struct sigevent sev;
    struct itimerspec its;
    struct sigaction sa;
    
    printf("=== 基础定时器示例 ===\n");
    printf("当前时间: %ld\n", time(NULL));
    
    // 设置信号处理函数
    sa.sa_flags = SA_SIGINFO;
    sa.sa_sigaction = timer_handler;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGRTMIN, &sa, NULL) == -1) {
        perror("sigaction");
        exit(EXIT_FAILURE);
    }
    
    // 创建定时器
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = SIGRTMIN;
    sev.sigev_value.sival_ptr = &timerid;
    if (timer_create(CLOCK_REALTIME, &sev, &timerid) == -1) {
        perror("timer_create");
        exit(EXIT_FAILURE);
    }
    
    printf("定时器创建成功,ID: %p\n", (void*)timerid);
    
    // 设置定时器:5秒后开始,每2秒触发一次
    its.it_value.tv_sec = 5;     // 初始延迟5秒
    its.it_value.tv_nsec = 0;
    its.it_interval.tv_sec = 2;  // 周期间隔2秒
    its.it_interval.tv_nsec = 0;
    
    if (timer_settime(timerid, 0, &its, NULL) == -1) {
        perror("timer_settime");
        exit(EXIT_FAILURE);
    }
    
    printf("定时器已启动:5秒后首次触发,之后每2秒触发一次\n");
    printf("程序将运行20秒...\n\n");
    
    // 等待定时器触发
    sleep(20);
    
    // 停止定时器
    its.it_value.tv_sec = 0;
    its.it_value.tv_nsec = 0;
    its.it_interval.tv_sec = 0;
    its.it_interval.tv_nsec = 0;
    
    if (timer_settime(timerid, 0, &its, NULL) == -1) {
        perror("timer_settime");
        exit(EXIT_FAILURE);
    }
    
    printf("\n定时器已停止\n");
    
    // 删除定时器
    if (timer_delete(timerid) == -1) {
        perror("timer_delete");
        exit(EXIT_FAILURE);
    }
    
    printf("定时器已删除\n");
    
    return 0;
}

示例2:多种定时器类型和时钟源

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <signal.h>
#include <string.h>

#define NUM_TIMERS 4

timer_t timers[NUM_TIMERS];
int timer_counts[NUM_TIMERS] = {0};

// 定时器信号处理函数
void timer_handler(int sig, siginfo_t *si, void *uc) {
    int timer_index = si->si_value.sival_int;
    
    timer_counts[timer_index]++;
    printf("[%ld] 定时器 %d 到期第 %d 次\n", 
           time(NULL), timer_index, timer_counts[timer_index]);
    
    // 演示timer_gettime
    struct itimerspec curr_value;
    if (timer_gettime(timers[timer_index], &curr_value) == 0) {
        printf("  剩余时间: %ld.%09ld 秒\n", 
               curr_value.it_value.tv_sec, curr_value.it_value.tv_nsec);
    }
    
    // 演示timer_getoverrun
    int overrun = timer_getoverrun(timers[timer_index]);
    if (overrun > 0) {
        printf("  超限运行: %d 次\n", overrun);
    }
}

int main() {
    struct sigaction sa;
    struct sigevent sev;
    struct itimerspec its;
    int i;
    
    printf("=== 多种定时器类型示例 ===\n");
    printf("当前时间: %ld\n\n", time(NULL));
    
    // 设置信号处理函数
    sa.sa_flags = SA_SIGINFO;
    sa.sa_sigaction = timer_handler;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGRTMIN, &sa, NULL) == -1) {
        perror("sigaction");
        exit(EXIT_FAILURE);
    }
    
    // 创建不同类型的定时器
    for (i = 0; i < NUM_TIMERS; i++) {
        // 设置信号事件
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = SIGRTMIN;
        sev.sigev_value.sival_int = i;
        
        clockid_t clock_type;
        const char* clock_name;
        
        switch(i) {
            case 0:
                clock_type = CLOCK_REALTIME;
                clock_name = "CLOCK_REALTIME";
                break;
            case 1:
                clock_type = CLOCK_MONOTONIC;
                clock_name = "CLOCK_MONOTONIC";
                break;
            case 2:
                clock_type = CLOCK_PROCESS_CPUTIME_ID;
                clock_name = "CLOCK_PROCESS_CPUTIME_ID";
                break;
            case 3:
                clock_type = CLOCK_THREAD_CPUTIME_ID;
                clock_name = "CLOCK_THREAD_CPUTIME_ID";
                break;
            default:
                clock_type = CLOCK_REALTIME;
                clock_name = "DEFAULT";
                break;
        }
        
        // 创建定时器
        if (timer_create(clock_type, &sev, &timers[i]) == -1) {
            printf("创建定时器 %d (%s) 失败: %s\n", i, clock_name, strerror(errno));
            continue;
        }
        
        printf("定时器 %d (%s) 创建成功\n", i, clock_name);
        
        // 设置不同的定时参数
        switch(i) {
            case 0: // 一次性定时器
                its.it_value.tv_sec = 3 + i;     // 3秒后触发
                its.it_value.tv_nsec = 0;
                its.it_interval.tv_sec = 0;      // 不重复
                its.it_interval.tv_nsec = 0;
                printf("  设置为一次性定时器,%d秒后触发\n", 3 + i);
                break;
                
            case 1: // 短周期定时器
                its.it_value.tv_sec = 2;         // 2秒后首次触发
                its.it_value.tv_nsec = 0;
                its.it_interval.tv_sec = 1;      // 每1秒重复
                its.it_interval.tv_nsec = 0;
                printf("  设置为周期性定时器,2秒后开始,每1秒触发\n");
                break;
                
            case 2: // 长周期定时器
                its.it_value.tv_sec = 5;         // 5秒后首次触发
                its.it_value.tv_nsec = 500000000; // 500毫秒
                its.it_interval.tv_sec = 3;      // 每3秒重复
                its.it_interval.tv_nsec = 0;
                printf("  设置为周期性定时器,5.5秒后开始,每3秒触发\n");
                break;
                
            case 3: // 快速定时器
                its.it_value.tv_sec = 1;         // 1秒后首次触发
                its.it_value.tv_nsec = 0;
                its.it_interval.tv_sec = 0;      // 每0.5秒重复
                its.it_interval.tv_nsec = 500000000; // 500毫秒
                printf("  设置为周期性定时器,1秒后开始,每0.5秒触发\n");
                break;
        }
        
        // 启动定时器
        if (timer_settime(timers[i], 0, &its, NULL) == -1) {
            perror("timer_settime");
            exit(EXIT_FAILURE);
        }
    }
    
    printf("\n所有定时器已启动,程序运行20秒...\n\n");
    
    // 运行一段时间观察定时器效果
    sleep(20);
    
    // 停止所有定时器
    printf("\n=== 停止所有定时器 ===\n");
    its.it_value.tv_sec = 0;
    its.it_value.tv_nsec = 0;
    its.it_interval.tv_sec = 0;
    its.it_interval.tv_nsec = 0;
    
    for (i = 0; i < NUM_TIMERS; i++) {
        if (timer_settime(timers[i], 0, &its, NULL) == -1) {
            printf("停止定时器 %d 失败\n", i);
        } else {
            printf("定时器 %d 已停止\n", i);
        }
    }
    
    // 删除所有定时器
    printf("\n=== 删除所有定时器 ===\n");
    for (i = 0; i < NUM_TIMERS; i++) {
        if (timer_delete(timers[i]) == -1) {
            printf("删除定时器 %d 失败\n", i);
        } else {
            printf("定时器 %d 已删除\n", i);
        }
    }
    
    // 显示统计信息
    printf("\n=== 定时器触发统计 ===\n");
    for (i = 0; i < NUM_TIMERS; i++) {
        printf("定时器 %d 触发次数: %d\n", i, timer_counts[i]);
    }
    
    return 0;
}

示例3:高精度定时器和绝对时间定时

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <signal.h>
#include <string.h>

timer_t timerid;
int trigger_count = 0;

// 定时器信号处理函数
void timer_handler(int sig, siginfo_t *si, void *uc) {
    trigger_count++;
    
    struct timespec current_time;
    clock_gettime(CLOCK_REALTIME, &current_time);
    
    printf("[%ld.%09ld] 定时器第 %d 次触发\n", 
           current_time.tv_sec, current_time.tv_nsec, trigger_count);
}

// 获取当前时间字符串
void print_current_time() {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    printf("当前时间: %ld.%09ld\n", ts.tv_sec, ts.tv_nsec);
}

int main() {
    struct sigaction sa;
    struct sigevent sev;
    struct itimerspec its;
    struct timespec current_time;
    
    printf("=== 高精度定时器和绝对时间示例 ===\n");
    print_current_time();
    printf("\n");
    
    // 设置信号处理函数
    sa.sa_flags = SA_SIGINFO;
    sa.sa_sigaction = timer_handler;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGRTMIN, &sa, NULL) == -1) {
        perror("sigaction");
        exit(EXIT_FAILURE);
    }
    
    // 创建定时器
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = SIGRTMIN;
    sev.sigev_value.sival_ptr = &timerid;
    
    if (timer_create(CLOCK_REALTIME, &sev, &timerid) == -1) {
        perror("timer_create");
        exit(EXIT_FAILURE);
    }
    
    printf("高精度定时器创建成功\n");
    
    // 示例1: 相对时间定时器(高精度)
    printf("\n--- 相对时间定时器 ---\n");
    print_current_time();
    
    // 设置1.5秒的延迟定时器
    its.it_value.tv_sec = 1;
    its.it_value.tv_nsec = 500000000;  // 500毫秒
    its.it_interval.tv_sec = 0;
    its.it_interval.tv_nsec = 0;
    
    if (timer_settime(timerid, 0, &its, NULL) == -1) {
        perror("timer_settime");
        exit(EXIT_FAILURE);
    }
    
    printf("设置1.5秒延迟定时器...\n");
    sleep(3);  // 等待定时器触发
    
    // 示例2: 周期性高精度定时器
    printf("\n--- 周期性高精度定时器 ---\n");
    print_current_time();
    
    // 设置周期性定时器:0.1秒后开始,每0.2秒触发
    its.it_value.tv_sec = 0;
    its.it_value.tv_nsec = 100000000;   // 100毫秒
    its.it_interval.tv_sec = 0;
    its.it_interval.tv_nsec = 200000000; // 200毫秒
    
    if (timer_settime(timerid, 0, &its, NULL) == -1) {
        perror("timer_settime");
        exit(EXIT_FAILURE);
    }
    
    printf("设置周期性定时器:100毫秒后开始,每200毫秒触发\n");
    printf("运行5秒观察效果...\n\n");
    
    sleep(5);
    
    // 停止定时器
    its.it_value.tv_sec = 0;
    its.it_value.tv_nsec = 0;
    its.it_interval.tv_sec = 0;
    its.it_interval.tv_nsec = 0;
    
    if (timer_settime(timerid, 0, &its, NULL) == -1) {
        perror("timer_settime");
        exit(EXIT_FAILURE);
    }
    
    printf("\n周期性定时器已停止\n");
    printf("触发次数: %d\n", trigger_count);
    
    // 示例3: 绝对时间定时器
    printf("\n--- 绝对时间定时器 ---\n");
    trigger_count = 0;
    
    // 获取当前时间并设置5秒后的绝对时间
    if (clock_gettime(CLOCK_REALTIME, &current_time) == -1) {
        perror("clock_gettime");
        exit(EXIT_FAILURE);
    }
    
    print_current_time();
    
    // 设置绝对时间:当前时间+3秒
    struct itimerspec abs_its;
    abs_its.it_value.tv_sec = current_time.tv_sec + 3;
    abs_its.it_value.tv_nsec = current_time.tv_nsec;
    abs_its.it_interval.tv_sec = 0;
    abs_its.it_interval.tv_nsec = 0;
    
    printf("设置绝对时间定时器:3秒后触发\n");
    
    if (timer_settime(timerid, TIMER_ABSTIME, &abs_its, NULL) == -1) {
        perror("timer_settime (绝对时间)");
        exit(EXIT_FAILURE);
    }
    
    sleep(5);  // 等待定时器触发
    
    // 示例4: 定时器状态查询
    printf("\n--- 定时器状态查询 ---\n");
    
    // 重新设置一个周期性定时器用于测试
    its.it_value.tv_sec = 1;
    its.it_value.tv_nsec = 0;
    its.it_interval.tv_sec = 2;
    its.it_interval.tv_nsec = 0;
    
    if (timer_settime(timerid, 0, &its, NULL) == -1) {
        perror("timer_settime");
        exit(EXIT_FAILURE);
    }
    
    printf("设置新的周期性定时器:1秒后开始,每2秒触发\n");
    
    // 查询定时器状态
    sleep(1);  // 等待首次触发后
    
    struct itimerspec query_its;
    if (timer_gettime(timerid, &query_its) == 0) {
        printf("定时器状态查询结果:\n");
        printf("  到期间隔: %ld.%09ld 秒\n", 
               query_its.it_interval.tv_sec, query_its.it_interval.tv_nsec);
        printf("  剩余时间: %ld.%09ld 秒\n", 
               query_its.it_value.tv_sec, query_its.it_value.tv_nsec);
    }
    
    // 演示timer_getoverrun
    printf("定时器超限运行测试(快速触发):\n");
    sleep(6);
    
    int overrun = timer_getoverrun(timerid);
    printf("超限运行次数: %d\n", overrun);
    
    // 清理
    its.it_value.tv_sec = 0;
    its.it_value.tv_nsec = 0;
    its.it_interval.tv_sec = 0;
    its.it_interval.tv_nsec = 0;
    timer_settime(timerid, 0, &its, NULL);
    timer_delete(timerid);
    
    printf("\n所有测试完成\n");
    
    return 0;
}

示例4:定时器在实际应用中的使用

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <signal.h>
#include <string.h>
#include <pthread.h>

#define MAX_TASKS 10
#define HEARTBEAT_INTERVAL 5

// 任务结构体
typedef struct {
    int id;
    char name[50];
    int interval_seconds;
    time_t last_run;
    int run_count;
} task_t;

task_t tasks[MAX_TASKS];
int task_count = 0;
timer_t heartbeat_timer;
timer_t task_timers[MAX_TASKS];
pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

// 添加任务
int add_task(int id, const char* name, int interval) {
    if (task_count >= MAX_TASKS) {
        return -1;
    }
    
    tasks[task_count].id = id;
    strncpy(tasks[task_count].name, name, sizeof(tasks[task_count].name) - 1);
    tasks[task_count].interval_seconds = interval;
    tasks[task_count].last_run = 0;
    tasks[task_count].run_count = 0;
    
    return task_count++;
}

// 任务执行函数
void execute_task(int task_index) {
    pthread_mutex_lock(&task_mutex);
    
    time_t current_time = time(NULL);
    tasks[task_index].last_run = current_time;
    tasks[task_index].run_count++;
    
    printf("[%ld] 执行任务 %d (%s): 第 %d 次执行\n", 
           current_time, 
           tasks[task_index].id, 
           tasks[task_index].name, 
           tasks[task_index].run_count);
    
    pthread_mutex_unlock(&task_mutex);
}

// 任务定时器处理函数
void task_timer_handler(int sig, siginfo_t *si, void *uc) {
    int task_index = si->si_value.sival_int;
    execute_task(task_index);
}

// 心跳定时器处理函数
void heartbeat_handler(int sig, siginfo_t *si, void *uc) {
    static int heartbeat_count = 0;
    heartbeat_count++;
    
    printf("[%ld] 系统心跳 #%d\n", time(NULL), heartbeat_count);
    
    // 显示所有任务状态
    pthread_mutex_lock(&task_mutex);
    printf("  任务状态:\n");
    for (int i = 0; i < task_count; i++) {
        printf("    %s: 执行%d次, 最后执行: %s", 
               tasks[i].name, tasks[i].run_count,
               tasks[i].last_run ? ctime(&tasks[i].last_run) : "从未执行\n");
        if (tasks[i].last_run) {
            // 移除ctime返回的换行符
            char* newline = strchr(ctime(&tasks[i].last_run), '\n');
            if (newline) *newline = '\0';
            printf("%s\n", ctime(&tasks[i].last_run));
        }
    }
    pthread_mutex_unlock(&task_mutex);
}

// 初始化定时器系统
int init_timer_system() {
    struct sigaction sa;
    
    // 设置任务定时器信号处理
    sa.sa_flags = SA_SIGINFO;
    sa.sa_sigaction = task_timer_handler;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGRTMIN, &sa, NULL) == -1) {
        perror("sigaction task");
        return -1;
    }
    
    // 设置心跳定时器信号处理
    sa.sa_sigaction = heartbeat_handler;
    if (sigaction(SIGRTMIN + 1, &sa, NULL) == -1) {
        perror("sigaction heartbeat");
        return -1;
    }
    
    return 0;
}

// 启动任务定时器
int start_task_timer(int task_index) {
    struct sigevent sev;
    struct itimerspec its;
    
    // 创建定时器
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = SIGRTMIN;
    sev.sigev_value.sival_int = task_index;
    
    if (timer_create(CLOCK_REALTIME, &sev, &task_timers[task_index]) == -1) {
        perror("timer_create task");
        return -1;
    }
    
    // 设置定时器
    its.it_value.tv_sec = tasks[task_index].interval_seconds;
    its.it_value.tv_nsec = 0;
    its.it_interval.tv_sec = tasks[task_index].interval_seconds;
    its.it_interval.tv_nsec = 0;
    
    if (timer_settime(task_timers[task_index], 0, &its, NULL) == -1) {
        perror("timer_settime task");
        return -1;
    }
    
    printf("任务定时器 %s 已启动,间隔 %d 秒\n", 
           tasks[task_index].name, tasks[task_index].interval_seconds);
    
    return 0;
}

// 启动心跳定时器
int start_heartbeat_timer() {
    struct sigevent sev;
    struct itimerspec its;
    
    // 创建心跳定时器
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = SIGRTMIN + 1;
    sev.sigev_value.sival_int = 0;
    
    if (timer_create(CLOCK_REALTIME, &sev, &heartbeat_timer) == -1) {
        perror("timer_create heartbeat");
        return -1;
    }
    
    // 设置心跳定时器(每5秒触发一次)
    its.it_value.tv_sec = HEARTBEAT_INTERVAL;
    its.it_value.tv_nsec = 0;
    its.it_interval.tv_sec = HEARTBEAT_INTERVAL;
    its.it_interval.tv_nsec = 0;
    
    if (timer_settime(heartbeat_timer, 0, &its, NULL) == -1) {
        perror("timer_settime heartbeat");
        return -1;
    }
    
    printf("心跳定时器已启动,间隔 %d 秒\n", HEARTBEAT_INTERVAL);
    
    return 0;
}

int main() {
    printf("=== 实际应用中的定时器系统 ===\n");
    printf("启动时间: %s", ctime(&(time_t){time(NULL)}));
    
    // 初始化定时器系统
    if (init_timer_system() == -1) {
        fprintf(stderr, "初始化定时器系统失败\n");
        exit(EXIT_FAILURE);
    }
    
    // 添加一些测试任务
    add_task(1, "数据备份", 10);
    add_task(2, "日志清理", 15);
    add_task(3, "状态检查", 5);
    add_task(4, "性能监控", 3);
    
    printf("已添加 %d 个任务\n", task_count);
    
    // 启动所有任务定时器
    for (int i = 0; i < task_count; i++) {
        if (start_task_timer(i) == -1) {
            fprintf(stderr, "启动任务定时器 %d 失败\n", i);
        }
    }
    
    // 启动心跳定时器
    if (start_heartbeat_timer() == -1) {
        fprintf(stderr, "启动心跳定时器失败\n");
    }
    
    printf("\n定时器系统运行中...\n");
    printf("程序将运行60秒,按Ctrl+C退出\n\n");
    
    // 运行主循环
    for (int i = 0; i < 60; i++) {
        sleep(1);
        
        // 每10秒显示一次统计信息
        if ((i + 1) % 10 == 0) {
            printf("[%ld] === 运行统计 ===\n", time(NULL));
            pthread_mutex_lock(&task_mutex);
            for (int j = 0; j < task_count; j++) {
                printf("  %s: 执行 %d 次\n", 
                       tasks[j].name, tasks[j].run_count);
            }
            pthread_mutex_unlock(&task_mutex);
            printf("==================\n\n");
        }
    }
    
    // 清理定时器
    printf("清理定时器...\n");
    
    // 停止并删除心跳定时器
    struct itimerspec stop_its = {{0, 0}, {0, 0}};
    timer_settime(heartbeat_timer, 0, &stop_its, NULL);
    timer_delete(heartbeat_timer);
    
    // 停止并删除所有任务定时器
    for (int i = 0; i < task_count; i++) {
        timer_settime(task_timers[i], 0, &stop_its, NULL);
        timer_delete(task_timers[i]);
    }
    
    printf("定时器系统已停止\n");
    
    // 显示最终统计
    printf("\n=== 最终统计 ===\n");
    pthread_mutex_lock(&task_mutex);
    for (int i = 0; i < task_count; i++) {
        printf("%s: 执行 %d 次\n", tasks[i].name, tasks[i].run_count);
    }
    pthread_mutex_unlock(&task_mutex);
    
    return 0;
}

编译和运行

# 编译示例1
gcc -o timer_example1 timer_example1.c -lrt
./timer_example1

# 编译示例2
gcc -o timer_example2 timer_example2.c -lrt
./timer_example2

# 编译示例3
gcc -o timer_example3 timer_example3.c -lrt
./timer_example3

# 编译示例4
gcc -o timer_example4 timer_example4.c -lrt -lpthread
./timer_example4

重要注意事项

  1. 链接库: 使用timer_*函数需要链接实时库(-lrt)
  2. 信号处理: 定时器通常通过信号通知,需要注意信号安全
  3. 精度限制: 实际精度受系统调度和负载影响
  4. 资源管理: 必须正确删除定时器以避免资源泄漏
  5. 线程安全: 在多线程环境中使用时需要适当的同步
  6. 错误处理: 所有timer_*函数都可能失败,必须检查返回值
  7. 时钟选择: 不同时钟源适用于不同的应用场景

通过这些示例,你可以理解timer_*函数在定时控制方面的强大功能,它们为Linux应用程序提供了精确、灵活的时间管理能力。

发表在 linux文章 | 留下评论

shmget等共享内存系统调用及示例

这次我们介绍 shmgetshmatshmdt, 和 shmctl 这一组函数,它们构成了 System V 共享内存 (System V Shared Memory) IPC(进程间通信)机制的核心部分。

注意: 虽然 System V IPC 是历史悠久且广泛支持的标准,但在现代 Linux 编程中,POSIX 共享内存 (shm_openmmap) 和 POSIX 消息队列 通常被认为是更现代、更可移植的选择。不过,理解 System V IPC 仍然很重要,因为它在许多遗留系统和特定场景中仍在使用。


1. 函数介绍

这四个函数共同工作,用于创建、访问、连接、分离和控制 System V 共享内存段

  • shmget (Shared Memory Get): 创建一个新的共享内存段,或者获取一个已存在的共享内存段的标识符 (ID)。这个 ID 是后续操作该共享内存段的关键。
  • shmat (Shared Memory Attach): 将一个由 shmget 获取的共享内存段连接(或附加)到调用进程的虚拟地址空间中。连接成功后,进程就可以像访问普通内存一样访问这块共享内存。
  • shmdt (Shared Memory Detach): 将一个 previously attached 的共享内存段从调用进程的地址空间中分离(或去附加)。分离后,进程不能再通过之前返回的地址访问该共享内存段。
  • shmctl (Shared Memory Control): 对共享内存段执行控制操作,如获取其状态信息 (IPC_STAT)、设置其权限 (IPC_SET) 或销毁 (IPC_RMID) 该共享内存段。

你可以把共享内存想象成一个公共的“白板”:

  1. shmget: 申请或找到一个特定的白板(通过 ID 标识)。
  2. shmat: 把这个白板挂到你(进程)的墙上,这样你就能在上面写字或看别人写的字了。
  3. shmdt: 把白板从你墙上取下来,你不能再访问它了(但白板本身还在,别人可能还在用)。
  4. shmctl: 检查白板的状态(谁在用,什么时候创建的),修改谁能用它,或者直接把白板撕掉(销毁)。

2. 函数原型

#include <sys/types.h>  // 通常需要
#include <sys/ipc.h>    // 必需,包含 IPC_* 常量
#include <sys/shm.h>    // 必需,包含 shm* 函数和 shmid_ds 结构

// 获取共享内存段标识符
int shmget(key_t key, size_t size, int shmflg);

// 连接共享内存段到进程地址空间
void *shmat(int shmid, const void *shmaddr, int shmflg);

// 从进程地址空间分离共享内存段
int shmdt(const void *shmaddr);

// 控制共享内存段
int shmctl(int shmid, int cmd, struct shmid_ds *buf);

3. 功能

  • shmget: 根据一个键 (key) 创建或获取一个共享内存段,并返回其唯一标识符 (shmid)
  • shmat: 将由 shmid 标识的共享内存段映射到调用进程的虚拟内存中,并返回映射后的虚拟地址
  • shmdt: 将由 shmaddr 指定的共享内存段从调用进程的地址空间中断开连接。
  • shmctl: 根据 cmd 命令对由 shmid 标识的共享内存段执行各种控制操作。

4. 参数详解

shmget

  • key_t key: 一个键值,用于标识一个全局唯一的共享内存段。
    • 特殊键IPC_PRIVATE (通常定义为 0) 是一个特殊键,它总是创建一个新的、唯一的共享内存段。
    • 生成键: 通常使用 ftok 函数根据一个路径名和一个项目 ID 来生成一个唯一的 key_t 值。key_t ftok(const char *pathname, int proj_id);
  • size_t size: 请求的共享内存段的大小(以字节为单位)。
    • 如果是创建新段(IPC_CREAT 被设置且该键尚不存在),则 size 指定新段的大小。
    • 如果是获取已存在的段,则 size 可以为 0,或者必须小于或等于已存在段的大小。
  • int shmflg: 指定创建标志和权限。
    • 创建标志:
      • IPC_CREAT: 如果指定的 key 不存在,则创建一个新的共享内存段。
      • IPC_EXCL: 与 IPC_CREAT 一起使用时,如果 key 已经存在,则 shmget 调用失败。这可以用来确保创建的是一个全新的段。
    • 权限: 低 9 位用于指定访问权限,格式与文件权限相同(例如 0666 表示所有者、组、其他用户都可读写)。实际权限还会受到进程 umask 的影响。

shmat

  • int shmid: 由 shmget 返回的共享内存段标识符。
  • const void *shmaddr: 指定共享内存段应连接到进程地址空间的期望地址
    • NULL (推荐): 让内核选择一个合适的地址。这是最常用也是最安全的方式。
    • 非 NULL: 指定一个具体地址。这需要非常小心,因为可能导致地址冲突或对齐问题。通常需要设置 shmflg 中的 SHM_RND 标志来指示地址可以被调整。
  • int shmflg: 控制连接行为的标志。
    • SHM_RND: 如果 shmaddr 非 NULL,则将连接地址向下舍入到 SHMLBA(共享内存低端边界)的整数倍。
    • SHM_RDONLY: 将共享内存段连接为只读。如果未设置,则连接为可读可写。

shmdt

  • const void *shmaddr: 由之前成功的 shmat 调用返回的连接地址

shmctl

  • int shmid: 由 shmget 返回的共享内存段标识符。
  • int cmd: 指定要执行的控制命令。
    • IPC_STAT: 将共享内存段的当前状态信息复制到 buf 指向的 struct shmid_ds 结构中。
    • IPC_SET: 根据 buf 指向的 struct shmid_ds 结构中的 shm_perm 成员来设置共享内存段的权限所有者
    • IPC_RMID立即销毁共享内存段。只有当所有进程都已将其分离(shmdt)后,内存才会真正被释放。如果仍有进程 attached,销毁操作会被标记,待所有进程 detach 后才执行。
  • struct shmid_ds *buf: 一个指向 struct shmid_ds 结构的指针,用于传递或接收共享内存段的状态信息。
    struct shmid_ds 包含了许多关于共享内存段的元数据,例如:struct shmid_ds { struct ipc_perm shm_perm; // 操作权限 size_t shm_segsz; // 段大小 (字节) time_t shm_atime; // 最后 attach 时间 time_t shm_dtime; // 最后 detach 时间 time_t shm_ctime; // 最后 change 时间 pid_t shm_cpid; // 创建者 PID pid_t shm_lpid; // 最后操作者 PID shmatt_t shm_nattch; // 当前连接的进程数 // ... 可能还有其他字段 ... };

5. 返回值

  • shmget:
    • 成功: 返回一个正整数,即共享内存段的标识符 (shmid)。
    • 失败: 返回 -1,并设置 errno
  • shmat:
    • 成功: 返回共享内存段连接到进程地址空间的虚拟地址
    • 失败: 返回 (void *) -1 (即 MAP_FAILED,与 mmap 相同),并设置 errno
  • shmdt:
    • 成功: 返回 0。
    • 失败: 返回 -1,并设置 errno
  • shmctl:
    • 成功: 对于 IPC_RMIDIPC_SET 返回 0;对于 IPC_STAT 返回 0 并填充 buf
    • 失败: 返回 -1,并设置 errno

6. 相似函数,或关联函数

  • POSIX 共享内存shm_openshm_unlinkmmapmunmap。这是更现代、更推荐的共享内存方式。
  • System V 消息队列msggetmsgsndmsgrcvmsgctl
  • System V 信号量semgetsemopsemctl
  • ftok: 用于生成 shmget 所需的 key_t 键值。
  • mmap / munmap: 另一种实现共享内存的方式(通过映射同一文件或使用 MAP_SHARED)。

7. 示例代码

示例 1:父子进程通过 System V 共享内存通信

这个经典的例子演示了如何使用 System V 共享内存在父子进程之间传递数据。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define SHM_SIZE 1024 // 共享内存段大小

int main() {
    key_t key;
    int shmid;
    char *data;
    pid_t pid;

    // 1. 生成一个唯一的 key (使用 ftok)
    // 注意:确保 "/tmp" 存在且可访问
    key = ftok("/tmp", 'R'); // 'R' 是项目 ID
    if (key == -1) {
        perror("ftok");
        exit(EXIT_FAILURE);
    }
    printf("Generated key: %d\n", (int)key);

    // 2. 创建共享内存段 (如果不存在则创建)
    shmid = shmget(key, SHM_SIZE, 0666 | IPC_CREAT);
    if (shmid == -1) {
        perror("shmget");
        exit(EXIT_FAILURE);
    }
    printf("Shared memory segment created/retrieved with ID: %d\n", shmid);

    // 3. fork 创建子进程
    pid = fork();
    if (pid == -1) {
        perror("fork");
        // 尝试清理已创建的共享内存
        shmctl(shmid, IPC_RMID, NULL);
        exit(EXIT_FAILURE);
    }

    if (pid == 0) {
        // --- 子进程 ---
        printf("Child process (PID: %d) started.\n", getpid());

        // 4a. 子进程连接共享内存
        data = (char *)shmat(shmid, (void *)0, 0);
        if (data == (char *)(-1)) {
            perror("shmat in child");
            _exit(EXIT_FAILURE);
        }
        printf("Child: Shared memory attached at address: %p\n", (void *)data);

        // 5a. 子进程读取数据
        printf("Child: Reading from shared memory: %s\n", data);

        // 6a. 子进程修改数据
        strncpy(data, "Hello from CHILD process!", SHM_SIZE - 1);
        data[SHM_SIZE - 1] = '\0'; // 确保字符串结束
        printf("Child: Written to shared memory.\n");

        // 7a. 子进程分离共享内存
        if (shmdt(data) == -1) {
            perror("shmdt in child");
            _exit(EXIT_FAILURE);
        }
        printf("Child: Shared memory detached.\n");

        _exit(EXIT_SUCCESS);

    } else {
        // --- 父进程 ---
        printf("Parent process (PID: %d) started.\n", getpid());

        // 4b. 父进程连接共享内存
        data = (char *)shmat(shmid, (void *)0, 0);
        if (data == (char *)(-1)) {
            perror("shmat in parent");
            // 清理
            shmctl(shmid, IPC_RMID, NULL);
            exit(EXIT_FAILURE);
        }
        printf("Parent: Shared memory attached at address: %p\n", (void *)data);

        // 5b. 父进程写入初始数据
        strncpy(data, "Hello from PARENT process!", SHM_SIZE - 1);
        data[SHM_SIZE - 1] = '\0';
        printf("Parent: Written initial data to shared memory.\n");

        // 等待子进程完成
        int status;
        waitpid(pid, &status, 0);
        if (WIFEXITED(status)) {
            printf("Parent: Child exited with status %d.\n", WEXITSTATUS(status));
        } else {
            printf("Parent: Child did not exit normally.\n");
        }

        // 6b. 父进程读取子进程修改后的数据
        printf("Parent: Reading modified data from shared memory: %s\n", data);

        // 7b. 父进程分离共享内存
        if (shmdt(data) == -1) {
            perror("shmdt in parent");
            // 仍然尝试清理
        }
        printf("Parent: Shared memory detached.\n");

        // 8. 父进程销毁共享内存段
        // 只有当所有进程都 detach 后,IPC_RMID 才会真正释放内存
        if (shmctl(shmid, IPC_RMID, NULL) == -1) {
            perror("shmctl IPC_RMID");
            exit(EXIT_FAILURE);
        }
        printf("Parent: Shared memory segment destroyed.\n");
    }

    return 0;
}

代码解释:

  1. 使用 ftok("/tmp", 'R') 生成一个唯一的 key_t 键。/tmp 是一个通常存在的目录,'R' 是项目 ID(0-255)。
  2. 调用 shmget(key, SHM_SIZE, 0666 | IPC_CREAT) 创建或获取共享内存段。0666 设置了读写权限。
  3. 调用 fork() 创建子进程。
  4. 父子进程:
    • 都调用 shmat(shmid, NULL, 0) 将共享内存段连接到自己的地址空间。NULL 让内核选择地址。
    • 检查 shmat 的返回值是否为 (char *)-1
  5. 父进程:
    • 先向共享内存写入初始数据。
    • 调用 waitpid 等待子进程结束。
    • 子进程结束后,读取子进程写入的数据。
    • 调用 shmdt 分离共享内存。
    • 调用 shmctl(shmid, IPC_RMID, NULL) 销毁共享内存段。因为此时子进程已经 detach,所以内存会被立即释放。
  6. 子进程:
    • 读取父进程写入的初始数据。
    • 向共享内存写入自己的数据。
    • 调用 shmdt 分离共享内存。
    • 使用 _exit 退出。

示例 2:检查共享内存段状态

这个例子演示了如何使用 shmctl 的 IPC_STAT 命令来获取共享内存段的详细信息。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

int main() {
    key_t key;
    int shmid;
    struct shmid_ds shmid_struct;

    // 1. 生成 key
    key = ftok(".", 'S'); // 使用当前目录
    if (key == -1) {
        perror("ftok");
        exit(EXIT_FAILURE);
    }

    // 2. 创建共享内存段
    shmid = shmget(key, 2048, 0666 | IPC_CREAT);
    if (shmid == -1) {
        perror("shmget");
        exit(EXIT_FAILURE);
    }
    printf("Shared memory segment created with ID: %d\n", shmid);

    // 3. 获取并打印共享内存段状态
    if (shmctl(shmid, IPC_STAT, &shmid_struct) == -1) {
        perror("shmctl IPC_STAT");
        shmctl(shmid, IPC_RMID, NULL); // 清理
        exit(EXIT_FAILURE);
    }

    printf("\n--- Shared Memory Segment Status ---\n");
    printf("Key: %d\n", (int)shmid_struct.shm_perm.__key); // 注意:成员名可能因系统而异
    printf("ID: %d\n", shmid);
    printf("Size: %zu bytes\n", shmid_struct.shm_segsz);
    printf("Creator UID: %d\n", shmid_struct.shm_perm.uid);
    printf("Creator GID: %d\n", shmid_struct.shm_perm.gid);
    printf("Permissions: %o\n", shmid_struct.shm_perm.mode & 0777);
    printf("Current number of attached processes: %lu\n", (unsigned long)shmid_struct.shm_nattch);
    // 注意:时间字段可能需要 #define _GNU_SOURCE 和正确的包含
    // printf("Last attach time: %s", ctime(&shmid_struct.shm_atime));
    // printf("Last detach time: %s", ctime(&shmid_struct.shm_dtime));
    // printf("Last change time: %s", ctime(&shmid_struct.shm_ctime));
    printf("Creator PID: %d\n", shmid_struct.shm_cpid);
    printf("Last operator PID: %d\n", shmid_struct.shm_lpid);
    printf("------------------------------------\n");

    // 4. 简单使用共享内存 (连接、写入、分离)
    char *data = (char *)shmat(shmid, NULL, 0);
    if (data != (char *)-1) {
        snprintf(data, 100, "Data written by process %d", getpid());
        printf("Written to shared memory: %s\n", data);
        shmdt(data);
    } else {
        perror("shmat for usage");
    }

    // 5. 再次检查状态 (连接数应该变为 1 然后又变回 0)
    // 这里简化处理,实际连接和分离是瞬间的
    if (shmctl(shmid, IPC_STAT, &shmid_struct) == -1) {
        perror("shmctl IPC_STAT 2");
    } else {
        printf("Current number of attached processes (after usage): %lu\n", (unsigned long)shmid_struct.shm_nattch);
    }

    // 6. 销毁共享内存段
    if (shmctl(shmid, IPC_RMID, NULL) == -1) {
        perror("shmctl IPC_RMID");
        exit(EXIT_FAILURE);
    }
    printf("Shared memory segment destroyed.\n");

    return 0;
}

代码解释:

  1. 使用 ftok 生成键,并用 shmget 创建一个共享内存段。
  2. 定义一个 struct shmid_ds 类型的变量 shmid_struct
  3. 调用 shmctl(shmid, IPC_STAT, &shmid_struct) 获取共享内存段的状态信息,并填充到 shmid_struct 中。
  4. 打印 shmid_struct 中的各种字段,如大小、权限、创建者 UID/GID、连接进程数等。
  5. 简单地连接、使用(写入数据)、分离共享内存段。
  6. 再次调用 shmctl IPC_STAT 查看状态变化(主要是 shm_nattch)。
  7. 最后调用 shmctl(shmid, IPC_RMID, NULL) 销毁共享内存段。

重要提示与注意事项:

  1. 清理: 使用 System V IPC 资源(共享内存、消息队列、信号量)后,务必调用相应的 ctl 函数(如 shmctl)并使用 IPC_RMID 命令进行销毁。否则,这些资源会一直存在于系统中,直到系统重启或手动使用 ipcrm 命令删除。
  2. ftok 的可靠性ftok 生成的键依赖于文件的 inode 和 mtime。如果文件被删除后重新创建,即使路径名相同,生成的键也可能不同。确保用作 ftok 参数的文件是稳定存在的。
  3. 错误处理: 始终检查这些函数的返回值,并进行适当的错误处理。
  4. 权限: 共享内存段的权限模型与文件系统类似,但检查是在 shmgetshmat 等调用时进行的。
  5. 与 mmap 的比较: System V 共享内存是内核管理的 IPC 对象,而通过 mmap 和 MAP_SHARED 实现的共享内存更像是一种内存映射文件的方式。POSIX 共享内存 (shm_open) 则结合了两者的优点,提供了命名的、基于文件描述符的共享内存机制。

总结:

shmgetshmatshmdtshmctl 这一组函数提供了 System V 共享内存 IPC 机制。虽然在现代编程中可能不如 POSIX 共享内存流行,但理解它们对于维护遗留代码和在特定系统环境中工作仍然至关重要。掌握它们的用法和生命周期管理是进行 Linux 进程间通信编程的基础之一。

发表在 linux文章 | 留下评论

Linux 调度器函数详解

1. 概述

sched_* 函数族是 Linux 系统中用于进程调度控制的一系列系统调用。可以把调度器想象成”CPU 时间片的分配管理员”——它决定哪个进程什么时候获得 CPU 时间,就像交通警察决定哪辆车什么时候可以通过路口一样。

这些函数提供了对进程调度策略、优先级、CPU 亲和性等方面的精细控制,是实现高性能、实时应用的重要工具。

2. sched_* 函数列表

2.1 基础调度函数

  • sched_yield: 让出当前 CPU 时间片
  • sched_getscheduler: 获取进程调度策略
  • sched_setscheduler: 设置进程调度策略
  • sched_getparam: 获取进程调度参数
  • sched_setparam: 设置进程调度参数

2.2 CPU 亲和性函数

  • sched_getaffinity: 获取进程 CPU 亲和性
  • sched_setaffinity: 设置进程 CPU 亲和性

2.3 优先级函数

  • sched_get_priority_min: 获取指定策略的最小优先级
  • sched_get_priority_max: 获取指定策略的最大优先级
  • sched_rr_get_interval: 获取轮转调度的时间片间隔

2.4 CPU 信息函数

  • sched_getcpu: 获取当前 CPU 编号

3. 调度策略详解

3.1 调度策略类型

策略说明
SCHED_OTHER0默认分时调度策略(CFS)
SCHED_FIFO1先进先出实时调度策略
SCHED_RR2轮转实时调度策略
SCHED_BATCH3批处理调度策略
SCHED_IDLE5空闲调度策略
SCHED_DEADLINE6截止时间调度策略

3.2 调度策略特点

#include <sched.h>
#include <stdio.h>

void show_scheduling_policies() {
    printf("=== 调度策略特点 ===\n");
    printf("SCHED_OTHER (默认):\n");
    printf("  • 用于普通进程\n");
    printf("  • 完全公平调度器 (CFS)\n");
    printf("  • 动态优先级调整\n");
    printf("  • 适合交互式应用\n\n");
    
    printf("SCHED_FIFO (实时 FIFO):\n");
    printf("  • 实时调度策略\n");
    printf("  • 高优先级进程一直运行直到阻塞或主动让出\n");
    printf("  • 不会时间片轮转\n");
    printf("  • 需要 root 权限\n\n");
    
    printf("SCHED_RR (实时轮转):\n");
    printf("  • 实时调度策略\n");
    printf("  • 时间片轮转调度\n");
    printf("  • 相同优先级进程轮流执行\n");
    printf("  • 需要 root 权限\n\n");
    
    printf("SCHED_BATCH (批处理):\n");
    printf("  • 用于批处理任务\n");
    printf("  • 减少唤醒频率\n");
    printf("  • 适合长时间运行的非交互任务\n\n");
    
    printf("SCHED_IDLE (空闲):\n");
    printf("  • 用于极低优先级任务\n");
    printf("  • 只在系统空闲时运行\n");
    printf("  • 不影响其他进程\n\n");
    
    printf("SCHED_DEADLINE (截止时间):\n");
    printf("  • 基于截止时间的调度\n");
    printf("  • 确保任务按时完成\n");
    printf("  • 需要特殊配置\n");
    printf("  • Linux 3.14+\n\n");
}

4. 函数详细介绍

4.1 sched_yield – 让出 CPU 时间片

函数原型

#include <sched.h>
int sched_yield(void);

功能

让出当前进程的 CPU 时间片,允许其他同优先级的进程运行。

参数

无参数

返回值

  • 成功: 返回 0
  • 失败: 返回 -1(实际上很少失败)

示例代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#include <time.h>
#include <errno.h>

// 消耗 CPU 的函数
void consume_cpu(int seconds) {
    time_t start = time(NULL);
    volatile long sum = 0;
    
    while (time(NULL) - start < seconds) {
        for (long i = 0; i < 1000000; i++) {
            sum += i;
        }
    }
}

int main() {
    printf("=== sched_yield 示例 ===\n\n");
    
    // 获取当前进程 ID 和 CPU 信息
    printf("进程 ID: %d\n", getpid());
    printf("父进程 ID: %d\n", getppid());
    
    // 获取当前 CPU 编号(如果有支持)
    int current_cpu = sched_getcpu();
    if (current_cpu != -1) {
        printf("当前 CPU: %d\n", current_cpu);
    } else {
        printf("无法获取当前 CPU 信息\n");
    }
    
    printf("\n1. 不使用 sched_yield 的 CPU 消耗:\n");
    printf("   开始消耗 CPU 时间...\n");
    
    time_t start_time = time(NULL);
    consume_cpu(3);  // 消耗 3 秒 CPU 时间
    time_t end_time = time(NULL);
    
    printf("   消耗完成,用时 %ld 秒\n", end_time - start_time);
    
    printf("\n2. 使用 sched_yield 的 CPU 消耗:\n");
    printf("   开始消耗 CPU 时间并让出时间片...\n");
    
    start_time = time(NULL);
    time_t yield_start = time(NULL);
    
    while (time(NULL) - yield_start < 3) {
        // 消耗一些 CPU 时间
        volatile long sum = 0;
        for (long i = 0; i < 500000; i++) {
            sum += i;
        }
        
        // 让出 CPU 时间片
        if (sched_yield() == -1) {
            perror("sched_yield 失败");
        }
    }
    
    end_time = time(NULL);
    printf("   消耗完成,用时 %ld 秒\n", end_time - start_time);
    
    printf("\n3. sched_yield 的实际效果:\n");
    printf("   • 允许其他同优先级进程运行\n");
    printf("   • 改善系统响应性\n");
    printf("   • 减少饥饿现象\n");
    printf("   • 适合协作式多任务\n");
    
    printf("\n=== sched_yield 使用场景 ===\n");
    printf("1. 长时间运行的循环\n");
    printf("2. 忙等待循环\n");
    printf("3. 协作式多任务\n");
    printf("4. 实时应用中的主动让出\n");
    printf("5. 负载均衡\n");
    
    printf("\n=== 注意事项 ===\n");
    printf("1. 不保证立即切换到其他进程\n");
    printf("2. 只影响同优先级进程\n");
    printf("3. 过度使用可能影响性能\n");
    printf("4. 不是强制调度切换\n");
    printf("5. 应该谨慎使用\n");
    
    return 0;
}

4.2 sched_getscheduler/sched_setscheduler – 调度策略控制

函数原型

#include <sched.h>

int sched_getscheduler(pid_t pid);
int sched_setscheduler(pid_t pid, int policy, const struct sched_param *param);

sched_param 结构体

struct sched_param {
    int sched_priority;  // 调度优先级
    // 对于 SCHED_DEADLINE,还有额外字段
};

功能

  • sched_getscheduler: 获取指定进程的调度策略
  • sched_setscheduler: 设置指定进程的调度策略和参数

参数

  • pid: 进程 ID(0 表示当前进程)
  • policy: 调度策略
  • param: 指向调度参数的指针

返回值

  • sched_getscheduler: 成功返回调度策略,失败返回 -1
  • sched_setscheduler: 成功返回 0,失败返回 -1

示例代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#include <errno.h>
#include <string.h>
#include <sys/resource.h>

// 将调度策略转换为字符串
const char* policy_to_string(int policy) {
    switch (policy) {
        case SCHED_OTHER: return "SCHED_OTHER (默认)";
        case SCHED_FIFO:  return "SCHED_FIFO (实时FIFO)";
        case SCHED_RR:    return "SCHED_RR (实时轮转)";
        case SCHED_BATCH: return "SCHED_BATCH (批处理)";
        case SCHED_IDLE:  return "SCHED_IDLE (空闲)";
        case SCHED_DEADLINE: return "SCHED_DEADLINE (截止时间)";
        default:          return "未知策略";
    }
}

// 显示进程调度信息
void show_process_scheduling_info(pid_t pid, const char *description) {
    printf("=== %s ===\n", description);
    
    if (pid == 0) {
        printf("进程: 当前进程 (PID: %d)\n", getpid());
    } else {
        printf("进程: PID %d\n", pid);
    }
    
    // 获取调度策略
    int policy = sched_getscheduler(pid);
    if (policy == -1) {
        perror("获取调度策略失败");
        return;
    }
    
    printf("调度策略: %s\n", policy_to_string(policy));
    
    // 获取调度参数
    struct sched_param param;
    if (sched_getparam(pid, &param) == 0) {
        printf("调度优先级: %d\n", param.sched_priority);
        
        // 显示优先级范围
        int min_priority = sched_get_priority_min(policy);
        int max_priority = sched_get_priority_max(policy);
        
        if (min_priority != -1 && max_priority != -1) {
            printf("优先级范围: %d - %d\n", min_priority, max_priority);
            
            if (param.sched_priority < min_priority || param.sched_priority > max_priority) {
                printf("⚠ 当前优先级超出范围\n");
            }
        }
    } else {
        perror("获取调度参数失败");
    }
    
    // 获取进程优先级
    errno = 0;
    int nice_value = getpriority(PRIO_PROCESS, pid);
    if (errno == 0) {
        printf("Nice 值: %d\n", nice_value);
    }
    
    printf("\n");
}

// 设置调度策略
int set_process_scheduling_policy(pid_t pid, int policy, int priority) {
    struct sched_param param;
    param.sched_priority = priority;
    
    printf("设置进程调度策略:\n");
    printf("  进程 ID: %d\n", pid ? pid : getpid());
    printf("  调度策略: %s\n", policy_to_string(policy));
    printf("  调度优先级: %d\n", priority);
    
    if (sched_setscheduler(pid, policy, &param) == 0) {
        printf("✓ 调度策略设置成功\n");
        return 0;
    } else {
        switch (errno) {
            case EPERM:
                printf("✗ 权限不足: 需要 root 权限设置实时策略\n");
                break;
            case EINVAL:
                printf("✗ 参数无效: 策略或优先级无效\n");
                break;
            case ESRCH:
                printf("✗ 进程不存在\n");
                break;
            default:
                printf("✗ 设置失败: %s\n", strerror(errno));
                break;
        }
        return -1;
    }
}

int main() {
    printf("=== sched_getscheduler/sched_setscheduler 示例 ===\n\n");
    
    // 显示当前用户信息
    printf("用户信息:\n");
    printf("  UID: %d\n", getuid());
    printf("  EUID: %d\n", geteuid());
    printf("  GID: %d\n", getgid());
    printf("  EGID: %d\n", getegid());
    printf("\n");
    
    // 显示初始调度信息
    show_process_scheduling_info(0, "初始调度信息");
    
    // 显示各种调度策略的优先级范围
    printf("=== 各种调度策略的优先级范围 ===\n");
    
    int policies[] = {SCHED_OTHER, SCHED_FIFO, SCHED_RR, SCHED_BATCH, SCHED_IDLE};
    int num_policies = sizeof(policies) / sizeof(policies[0]);
    
    for (int i = 0; i < num_policies; i++) {
        int min_priority = sched_get_priority_min(policies[i]);
        int max_priority = sched_get_priority_max(policies[i]);
        
        printf("%-20s: 最小优先级 %3d, 最大优先级 %3d\n", 
               policy_to_string(policies[i]), min_priority, max_priority);
    }
    printf("\n");
    
    // 演示调度策略设置(需要 root 权限)
    printf("=== 调度策略设置演示 ===\n");
    
    // 1. 尝试设置 SCHED_FIFO(需要 root 权限)
    printf("1. 尝试设置 SCHED_FIFO 策略:\n");
    if (set_process_scheduling_policy(0, SCHED_FIFO, 10) == 0) {
        show_process_scheduling_info(0, "设置 SCHED_FIFO 后");
    } else {
        printf("  说明: SCHED_FIFO 需要 root 权限\n");
    }
    
    // 2. 尝试设置 SCHED_RR(需要 root 权限)
    printf("\n2. 尝试设置 SCHED_RR 策略:\n");
    if (set_process_scheduling_policy(0, SCHED_RR, 15) == 0) {
        show_process_scheduling_info(0, "设置 SCHED_RR 后");
    } else {
        printf("  说明: SCHED_RR 需要 root 权限\n");
    }
    
    // 3. 尝试设置 SCHED_BATCH
    printf("\n3. 尝试设置 SCHED_BATCH 策略:\n");
    if (set_process_scheduling_policy(0, SCHED_BATCH, 0) == 0) {
        show_process_scheduling_info(0, "设置 SCHED_BATCH 后");
    } else {
        printf("  说明: 可能需要适当权限\n");
    }
    
    // 4. 恢复默认策略
    printf("\n4. 恢复默认 SCHED_OTHER 策略:\n");
    struct sched_param default_param = {0};
    if (sched_setscheduler(0, SCHED_OTHER, &default_param) == 0) {
        printf("✓ 成功恢复默认调度策略\n");
        show_process_scheduling_info(0, "恢复默认策略后");
    } else {
        printf("✗ 恢复默认策略失败: %s\n", strerror(errno));
    }
    
    printf("\n=== 调度策略使用建议 ===\n");
    printf("选择原则:\n");
    printf("1. 普通应用: 使用 SCHED_OTHER(默认)\n");
    printf("2. 实时应用: 使用 SCHED_FIFO 或 SCHED_RR\n");
    printf("3. 批处理任务: 使用 SCHED_BATCH\n");
    printf("4. 后台任务: 使用 SCHED_IDLE\n");
    printf("5. 截止时间任务: 使用 SCHED_DEADLINE\n");
    printf("\n");
    
    printf("权限要求:\n");
    printf("1. SCHED_OTHER/SCHED_BATCH/SCHED_IDLE: 普通权限\n");
    printf("2. SCHED_FIFO/SCHED_RR: 需要 root 权限\n");
    printf("3. SCHED_DEADLINE: 需要特殊配置\n");
    printf("\n");
    
    printf("性能影响:\n");
    printf("1. 实时策略: 更高优先级,更低延迟\n");
    printf("2. 批处理策略: 更低唤醒频率,更好吞吐\n");
    printf("3. 空闲策略: 不影响其他进程\n");
    printf("4. 默认策略: 平衡性能和公平性\n");
    
    return 0;
}

4.3 sched_getaffinity/sched_setaffinity – CPU 亲和性控制

函数原型

#define _GNU_SOURCE
#include <sched.h>

int sched_getaffinity(pid_t pid, size_t cpusetsize, cpu_set_t *mask);
int sched_setaffinity(pid_t pid, size_t cpusetsize, const cpu_set_t *mask);

CPU 集合操作宏

void CPU_ZERO(cpu_set_t *set);              // 清空 CPU 集合
void CPU_SET(int cpu, cpu_set_t *set);      // 设置 CPU
void CPU_CLR(int cpu, cpu_set_t *set);      // 清除 CPU
int  CPU_ISSET(int cpu, cpu_set_t *set);    // 检查 CPU 是否设置
int  CPU_COUNT(cpu_set_t *set);            // 计算设置的 CPU 数量

功能

  • sched_getaffinity: 获取进程的 CPU 亲和性掩码
  • sched_setaffinity: 设置进程的 CPU 亲和性掩码

参数

  • pid: 进程 ID(0 表示当前进程)
  • cpusetsize: CPU 集合的大小
  • mask: 指向 CPU 集合的指针

返回值

  • 成功: 返回 0
  • 失败: 返回 -1,并设置相应的 errno

示例代码

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#include <errno.h>
#include <string.h>

// 显示 CPU 集合信息
void show_cpu_set_info(const cpu_set_t *cpu_set, const char *description) {
    printf("=== %s ===\n", description);
    
    int cpu_count = CPU_COUNT((cpu_set_t*)cpu_set);
    printf("CPU 数量: %d\n", cpu_count);
    
    printf("CPU 列表: ");
    int printed = 0;
    for (int i = 0; i < CPU_SETSIZE; i++) {
        if (CPU_ISSET(i, (cpu_set_t*)cpu_set)) {
            if (printed > 0) printf(", ");
            printf("%d", i);
            printed++;
        }
    }
    if (printed == 0) {
        printf("无");
    }
    printf("\n\n");
}

// 获取系统 CPU 信息
void show_system_cpu_info() {
    printf("=== 系统 CPU 信息 ===\n");
    
    // 获取在线 CPU 数量
    int online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
    printf("在线 CPU 数量: %d\n", online_cpus);
    
    // 获取配置的 CPU 数量
    int conf_cpus = sysconf(_SC_NPROCESSORS_CONF);
    printf("配置 CPU 数量: %d\n", conf_cpus);
    
    // 显示当前进程的 CPU 亲和性
    cpu_set_t current_set;
    CPU_ZERO(&current_set);
    
    if (sched_getaffinity(0, sizeof(current_set), &current_set) == 0) {
        show_cpu_set_info(&current_set, "当前进程 CPU 亲和性");
    } else {
        perror("获取 CPU 亲和性失败");
    }
}

// 设置 CPU 亲和性
int set_process_cpu_affinity(pid_t pid, const int *cpu_list, int cpu_count) {
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    
    printf("设置 CPU 亲和性:\n");
    printf("  进程 ID: %d\n", pid ? pid : getpid());
    printf("  CPU 列表: ");
    
    for (int i = 0; i < cpu_count; i++) {
        if (i > 0) printf(", ");
        printf("%d", cpu_list[i]);
        CPU_SET(cpu_list[i], &cpu_set);
    }
    printf("\n");
    
    if (sched_setaffinity(pid, sizeof(cpu_set), &cpu_set) == 0) {
        printf("✓ CPU 亲和性设置成功\n");
        return 0;
    } else {
        printf("✗ CPU 亲和性设置失败: %s\n", strerror(errno));
        return -1;
    }
}

// 创建 CPU 绑定的测试线程
void* cpu_bound_worker(void *arg) {
    int worker_id = *(int*)arg;
    
    printf("工作线程 %d 启动\n", worker_id);
    
    // 获取线程的 CPU 信息
    int current_cpu = sched_getcpu();
    if (current_cpu != -1) {
        printf("  工作线程 %d 运行在 CPU %d 上\n", worker_id, current_cpu);
    }
    
    // 执行一些 CPU 密集型任务
    volatile long sum = 0;
    for (long i = 0; i < 10000000; i++) {
        sum += i;
        
        // 偶尔检查 CPU 变化
        if (i % 1000000 == 0) {
            int new_cpu = sched_getcpu();
            if (new_cpu != current_cpu && new_cpu != -1) {
                printf("  工作线程 %d 从 CPU %d 切换到 CPU %d\n", 
                       worker_id, current_cpu, new_cpu);
                current_cpu = new_cpu;
            }
        }
    }
    
    printf("工作线程 %d 完成\n", worker_id);
    return NULL;
}

int main() {
    printf("=== sched_getaffinity/sched_setaffinity 示例 ===\n\n");
    
    // 显示系统信息
    show_system_cpu_info();
    
    // 1. 获取当前进程的 CPU 亲和性
    printf("1. 获取当前进程 CPU 亲和性:\n");
    cpu_set_t initial_set;
    CPU_ZERO(&initial_set);
    
    if (sched_getaffinity(0, sizeof(initial_set), &initial_set) == 0) {
        show_cpu_set_info(&initial_set, "初始 CPU 亲和性");
    } else {
        perror("获取初始 CPU 亲和性失败");
    }
    
    // 2. 设置 CPU 亲和性(绑定到特定 CPU)
    printf("2. 设置 CPU 亲和性:\n");
    
    // 获取系统 CPU 数量
    int online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
    printf("系统在线 CPU 数量: %d\n", online_cpus);
    
    if (online_cpus > 1) {
        // 绑定到第一个 CPU
        int single_cpu[] = {0};
        if (set_process_cpu_affinity(0, single_cpu, 1) == 0) {
            cpu_set_t new_set;
            CPU_ZERO(&new_set);
            
            if (sched_getaffinity(0, sizeof(new_set), &new_set) == 0) {
                show_cpu_set_info(&new_set, "设置后 CPU 亲和性");
            }
        }
        
        // 绑定到前两个 CPU
        printf("绑定到前两个 CPU:\n");
        int two_cpus[] = {0, 1};
        if (set_process_cpu_affinity(0, two_cpus, 2) == 0) {
            cpu_set_t new_set;
            CPU_ZERO(&new_set);
            
            if (sched_getaffinity(0, sizeof(new_set), &new_set) == 0) {
                show_cpu_set_info(&new_set, "绑定到前两个 CPU 后");
            }
        }
    } else {
        printf("系统只有一个 CPU,跳过 CPU 绑定测试\n");
    }
    
    // 3. 恢复初始 CPU 亲和性
    printf("3. 恢复初始 CPU 亲和性:\n");
    if (sched_setaffinity(0, sizeof(initial_set), &initial_set) == 0) {
        printf("✓ 成功恢复初始 CPU 亲和性\n");
        
        cpu_set_t restored_set;
        CPU_ZERO(&restored_set);
        
        if (sched_getaffinity(0, sizeof(restored_set), &restored_set) == 0) {
            show_cpu_set_info(&restored_set, "恢复后 CPU 亲和性");
        }
    } else {
        printf("✗ 恢复初始 CPU 亲和性失败: %s\n", strerror(errno));
    }
    
    // 4. 显示 CPU 亲和性的好处
    printf("=== CPU 亲和性的好处 ===\n");
    printf("性能优化:\n");
    printf("1. 减少 CPU 缓存失效\n");
    printf("2. 提高缓存命中率\n");
    printf("3. 降低上下文切换开销\n");
    printf("4. 改善 NUMA 访问模式\n");
    printf("5. 提高多核应用性能\n");
    printf("\n");
    
    printf("应用场景:\n");
    printf("1. 高性能计算应用\n");
    printf("2. 实时系统\n");
    printf("3. 数据库服务器\n");
    printf("4. 游戏服务器\n");
    printf("5. 科学计算\n");
    printf("6. 音视频处理\n");
    printf("\n");
    
    printf("注意事项:\n");
    printf("1. 过度限制可能影响负载均衡\n");
    printf("2. NUMA 架构需要考虑内存亲和性\n");
    printf("3. 应该根据应用特点合理设置\n");
    printf("4. 避免与其他进程争抢 CPU\n");
    printf("5. 监控系统整体性能\n");
    
    return 0;
}

4.4 sched_get_priority_min/sched_get_priority_max – 优先级范围查询

函数原型

#include <sched.h>

int sched_get_priority_min(int policy);
int sched_get_priority_max(int policy);

功能

获取指定调度策略的最小和最大优先级值。

参数

  • policy: 调度策略

返回值

  • 成功: 返回对应的优先级值
  • 失败: 返回 -1,并设置相应的 errno

示例代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#include <errno.h>
#include <string.h>

// 显示调度策略优先级范围
void show_policy_priority_range(int policy, const char *policy_name) {
    int min_priority = sched_get_priority_min(policy);
    int max_priority = sched_get_priority_max(policy);
    
    printf("%-20s: ", policy_name);
    
    if (min_priority == -1 || max_priority == -1) {
        printf("不支持 (%s)\n", strerror(errno));
    } else {
        printf("最小优先级 %3d, 最大优先级 %3d", min_priority, max_priority);
        
        if (min_priority == max_priority) {
            printf(" (固定优先级)");
        } else if (min_priority < max_priority) {
            printf(" (动态优先级范围)");
        }
        printf("\n");
    }
}

// 显示所有调度策略的优先级范围
void show_all_policy_ranges() {
    printf("=== 所有调度策略优先级范围 ===\n");
    
    struct {
        int policy;
        const char *name;
    } policies[] = {
        {SCHED_OTHER, "SCHED_OTHER"},
        {SCHED_FIFO, "SCHED_FIFO"},
        {SCHED_RR, "SCHED_RR"},
        {SCHED_BATCH, "SCHED_BATCH"},
        {SCHED_IDLE, "SCHED_IDLE"},
        {SCHED_DEADLINE, "SCHED_DEADLINE"},
        {-1, NULL}
    };
    
    for (int i = 0; policies[i].name; i++) {
        show_policy_priority_range(policies[i].policy, policies[i].name);
    }
    
    printf("\n");
}

// 优先级验证函数
int validate_priority_for_policy(int policy, int priority) {
    int min_priority = sched_get_priority_min(policy);
    int max_priority = sched_get_priority_max(policy);
    
    if (min_priority == -1 || max_priority == -1) {
        return -1;  // 策略不支持
    }
    
    if (priority >= min_priority && priority <= max_priority) {
        return 0;  // 优先级有效
    } else {
        return 1;  // 优先级无效
    }
}

// 显示优先级验证结果
void show_priority_validation(int policy, int priority) {
    const char *policy_name = NULL;
    switch (policy) {
        case SCHED_OTHER: policy_name = "SCHED_OTHER"; break;
        case SCHED_FIFO: policy_name = "SCHED_FIFO"; break;
        case SCHED_RR: policy_name = "SCHED_RR"; break;
        case SCHED_BATCH: policy_name = "SCHED_BATCH"; break;
        case SCHED_IDLE: policy_name = "SCHED_IDLE"; break;
        case SCHED_DEADLINE: policy_name = "SCHED_DEADLINE"; break;
        default: policy_name = "未知策略"; break;
    }
    
    printf("验证 %s 策略优先级 %d: ", policy_name, priority);
    
    int result = validate_priority_for_policy(policy, priority);
    switch (result) {
        case -1:
            printf("策略不支持\n");
            break;
        case 0:
            printf("✓ 有效\n");
            break;
        case 1:
            printf("✗ 无效\n");
            break;
    }
}

int main() {
    printf("=== sched_get_priority_min/max 示例 ===\n\n");
    
    // 显示所有策略的优先级范围
    show_all_policy_ranges();
    
    // 显示当前进程信息
    printf("当前进程信息:\n");
    printf("  进程 ID: %d\n", getpid());
    printf("  父进程 ID: %d\n", getppid());
    printf("  用户 ID: %d\n", getuid());
    printf("  组 ID: %d\n", getgid());
    
    // 获取当前进程调度策略
    int current_policy = sched_getscheduler(0);
    if (current_policy != -1) {
        printf("  当前调度策略: %d ", current_policy);
        switch (current_policy) {
            case SCHED_OTHER: printf("(SCHED_OTHER)\n"); break;
            case SCHED_FIFO: printf("(SCHED_FIFO)\n"); break;
            case SCHED_RR: printf("(SCHED_RR)\n"); break;
            case SCHED_BATCH: printf("(SCHED_BATCH)\n"); break;
            case SCHED_IDLE: printf("(SCHED_IDLE)\n"); break;
            case SCHED_DEADLINE: printf("(SCHED_DEADLINE)\n"); break;
            default: printf("(未知)\n"); break;
        }
        
        // 获取当前进程优先级
        struct sched_param current_param;
        if (sched_getparam(0, &current_param) == 0) {
            printf("  当前优先级: %d\n", current_param.sched_priority);
            
            // 验证当前优先级
            show_priority_validation(current_policy, current_param.sched_priority);
        }
    } else {
        perror("获取当前调度策略失败");
    }
    
    printf("\n");
    
    // 验证不同策略的优先级
    printf("=== 优先级验证示例 ===\n");
    
    // SCHED_OTHER 策略
    printf("SCHED_OTHER 策略:\n");
    show_priority_validation(SCHED_OTHER, 0);      // 有效
    show_priority_validation(SCHED_OTHER, -1);     // 无效
    show_priority_validation(SCHED_OTHER, 10);      // 无效
    
    // SCHED_FIFO 策略
    printf("\nSCHED_FIFO 策略:\n");
    show_priority_validation(SCHED_FIFO, 10);       // 可能有效
    show_priority_validation(SCHED_FIFO, 50);      // 可能有效
    show_priority_validation(SCHED_FIFO, 99);       // 可能有效
    show_priority_validation(SCHED_FIFO, 100);      // 可能无效
    
    // SCHED_RR 策略
    printf("\nSCHED_RR 策略:\n");
    show_priority_validation(SCHED_RR, 10);        // 可能有效
    show_priority_validation(SCHED_RR, 50);         // 可能有效
    show_priority_validation(SCHED_RR, 99);         // 可能有效
    show_priority_validation(SCHED_RR, 100);        // 可能无效
    
    // SCHED_BATCH 策略
    printf("\nSCHED_BATCH 策略:\n");
    show_priority_validation(SCHED_BATCH, 0);      // 有效
    show_priority_validation(SCHED_BATCH, -1);     // 无效
    show_priority_validation(SCHED_BATCH, 10);      // 无效
    
    // SCHED_IDLE 策略
    printf("\nSCHED_IDLE 策略:\n");
    show_priority_validation(SCHED_IDLE, 0);       // 有效
    show_priority_validation(SCHED_IDLE, -1);      // 无效
    show_priority_validation(SCHED_IDLE, 10);      // 无效
    
    printf("\n=== 优先级使用建议 ===\n");
    printf("优先级设置原则:\n");
    printf("1. 了解策略的优先级范围\n");
    printf("2. 合理分配优先级值\n");
    printf("3. 避免优先级反转\n");
    printf("4. 考虑系统整体平衡\n");
    printf("5. 验证优先级的有效性\n");
    printf("\n");
    
    printf("实时策略优先级:\n");
    printf("1. SCHED_FIFO/SCHED_RR: 1-99 (通常)\n");
    printf("2. 数值越大优先级越高\n");
    printf("3. 需要 root 权限\n");
    printf("4. 谨慎使用高优先级\n");
    printf("5. 避免独占 CPU\n");
    printf("\n");
    
    printf("普通策略优先级:\n");
    printf("1. SCHED_OTHER: 通常为 0\n");
    printf("2. 通过 nice 值调整\n");
    printf("3. 使用动态优先级\n");
    printf("4. 平衡系统负载\n");
    printf("5. 适合交互式应用\n");
    
    return 0;
}

4.5 sched_rr_get_interval – 轮转调度时间片

函数原型

#include <sched.h>

int sched_rr_get_interval(pid_t pid, struct timespec *tp);

功能

获取 SCHED_RR 策略的时间片长度。

参数

  • pid: 进程 ID(0 表示当前进程)
  • tp: 指向 timespec 结构体的指针,用于存储时间片长度

返回值

  • 成功: 返回 0
  • 失败: 返回 -1,并设置相应的 errno

示例代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#include <errno.h>
#include <string.h>
#include <time.h>

// 显示时间片信息
void show_timeslice_info(const struct timespec *interval, const char *description) {
    printf("=== %s ===\n", description);
    
    if (interval->tv_sec == 0 && interval->tv_nsec == 0) {
        printf("时间片长度: 未设置\n");
    } else {
        printf("时间片长度: %ld.%09ld 秒\n", 
               (long)interval->tv_sec, (long)interval->tv_nsec);
        printf("时间片长度: %.3f 毫秒\n", 
               (double)interval->tv_sec * 1000 + (double)interval->tv_nsec / 1000000);
        printf("时间片长度: %.3f 微秒\n", 
               (double)interval->tv_sec * 1000000 + (double)interval->tv_nsec / 1000);
        printf("时间片长度: %ld 纳秒\n", 
               (long)interval->tv_sec * 1000000000 + (long)interval->tv_nsec);
    }
    
    printf("\n");
}

// 获取 RR 调度时间片
int get_rr_timeslice(pid_t pid) {
    struct timespec interval;
    
    printf("获取 SCHED_RR 时间片长度:\n");
    printf("  进程 ID: %d\n", pid ? pid : getpid());
    
    if (sched_rr_get_interval(pid, &interval) == 0) {
        show_timeslice_info(&interval, "RR 调度时间片");
        return 0;
    } else {
        switch (errno) {
            case EINVAL:
                printf("✗ 进程不是 SCHED_RR 策略\n");
                break;
            case ESRCH:
                printf("✗ 进程不存在\n");
                break;
            default:
                printf("✗ 获取时间片失败: %s\n", strerror(errno));
                break;
        }
        return -1;
    }
}

// 比较不同策略的时间片
void compare_policy_intervals() {
    printf("=== 不同策略时间片比较 ===\n");
    
    // 获取当前进程时间片(假设是 SCHED_OTHER)
    struct timespec current_interval;
    if (sched_rr_get_interval(0, &current_interval) == 0) {
        printf("当前进程时间片: %ld.%09ld 秒\n", 
               (long)current_interval.tv_sec, (long)current_interval.tv_nsec);
    } else {
        printf("当前进程时间片: 无法获取 (%s)\n", strerror(errno));
    }
    
    // 显示系统时间片配置
    printf("\n系统时间片配置:\n");
    
    // 读取内核参数
    FILE *fp = fopen("/proc/sys/kernel/sched_rr_timeslice_ms", "r");
    if (fp) {
        char buffer[64];
        if (fgets(buffer, sizeof(buffer), fp)) {
            printf("  RR 时间片 (ms): %s", buffer);
        }
        fclose(fp);
    } else {
        printf("  RR 时间片: 无法读取系统配置\n");
    }
    
    // 显示其他相关配置
    printf("其他相关配置:\n");
    system("cat /proc/sys/kernel/sched_child_runs_first 2>/dev/null || echo '  无法读取 sched_child_runs_first'");
    system("cat /proc/sys/kernel/sched_autogroup_enabled 2>/dev/null || echo '  无法读取 sched_autogroup_enabled'");
    
    printf("\n");
}

int main() {
    printf("=== sched_rr_get_interval 示例 ===\n\n");
    
    // 显示系统信息
    printf("系统信息:\n");
    printf("  内核版本: ");
    system("uname -r | tr -d '\\n'");
    printf("\n");
    printf("  CPU 架构: ");
    system("uname -m | tr -d '\\n'");
    printf("\n");
    printf("  进程 ID: %d\n", getpid());
    printf("  父进程 ID: %d\n", getppid());
    printf("\n");
    
    // 1. 获取当前进程时间片
    printf("1. 获取当前进程时间片:\n");
    get_rr_timeslice(0);
    
    // 2. 尝试获取不存在进程的时间片
    printf("2. 尝试获取不存在进程的时间片:\n");
    if (sched_rr_get_interval(999999, &(struct timespec){0}) == -1) {
        if (errno == ESRCH) {
            printf("✓ 正确处理不存在进程: ESRCH\n");
        } else {
            printf("✗ 意外错误: %s\n", strerror(errno));
        }
    }
    
    // 3. 尝试获取非 RR 策略进程的时间片
    printf("3. 尝试获取非 RR 策略进程的时间片:\n");
    if (sched_rr_get_interval(0, &(struct timespec){0}) == -1) {
        if (errno == EINVAL) {
            printf("✓ 正确处理非 RR 策略进程: EINVAL\n");
            printf("  说明: 当前进程使用默认 SCHED_OTHER 策略\n");
        } else {
            printf("✗ 其他错误: %s\n", strerror(errno));
        }
    }
    
    // 4. 显示系统时间片配置
    compare_policy_intervals();
    
    // 5. 显示时间片对性能的影响
    printf("=== 时间片对性能的影响 ===\n");
    printf("时间片长度的影响:\n");
    printf("1. 较短时间片:\n");
    printf("   • 更好的响应性\n");
    printf("   • 更多上下文切换\n");
    printf("   • 更低吞吐量\n");
    printf("   • 更高延迟\n");
    printf("\n");
    
    printf("2. 较长时间片:\n");
    printf("   • 更低响应性\n");
    printf("   • 更少上下文切换\n");
    printf("   • 更高吞吐量\n");
    printf("   • 更低延迟\n");
    printf("\n");
    
    printf("3. 默认时间片:\n");
    printf("   • 平衡响应性和吞吐量\n");
    printf("   • 通常为 10-100ms\n");
    printf("   • 适应大多数应用\n");
    printf("   • 可配置调整\n");
    printf("\n");
    
    printf("应用场景:\n");
    printf("1. 实时系统: 需要短时间片保证响应性\n");
    printf("2. 批处理系统: 需要长时间片提高吞吐量\n");
    printf("3. 交互式应用: 需要适中时间片平衡各方面\n");
    printf("4. 服务器应用: 根据负载动态调整\n");
    printf("5. 嵌入式系统: 根据硬件特性优化\n");
    printf("\n");
    
    printf("配置建议:\n");
    printf("1. 监控上下文切换频率\n");
    printf("2. 调整时间片适应应用特点\n");
    printf("3. 考虑系统整体负载\n");
    printf("4. 测试不同配置的性能\n");
    printf("5. 避免极端配置影响系统稳定性\n");
    
    return 0;
}

4.6 sched_getcpu – 获取当前 CPU 编号

函数原型

#define _GNU_SOURCE
#include <sched.h>

int sched_getcpu(void);

功能

获取调用线程当前运行的 CPU 编号。

参数

无参数

返回值

  • 成功: 返回 CPU 编号(从 0 开始)
  • 失败: 返回 -1,并设置相应的 errno

示例代码

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/syscall.h>

// 获取 CPU 信息的辅助函数
void show_cpu_info() {
    printf("=== CPU 信息 ===\n");
    
    // 获取当前 CPU 编号
    int current_cpu = sched_getcpu();
    if (current_cpu != -1) {
        printf("当前 CPU 编号: %d\n", current_cpu);
    } else {
        printf("无法获取当前 CPU 编号: %s\n", strerror(errno));
    }
    
    // 获取系统 CPU 数量
    long online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
    long conf_cpus = sysconf(_SC_NPROCESSORS_CONF);
    
    printf("在线 CPU 数量: %ld\n", online_cpus);
    printf("配置 CPU 数量: %ld\n", conf_cpus);
    
    // 显示 CPU 信息
    printf("CPU 详细信息:\n");
    system("lscpu | head -10 2>/dev/null || echo '无法获取 CPU 详细信息'");
    
    printf("\n");
}

// 线程函数,显示线程的 CPU 信息
void* thread_cpu_info(void* arg) {
    int thread_id = *(int*)arg;
    int initial_cpu, current_cpu;
    
    // 获取初始 CPU
    initial_cpu = sched_getcpu();
    
    printf("线程 %d:\n", thread_id);
    printf("  初始 CPU: %d\n", initial_cpu);
    
    // 执行一些工作并检查 CPU 变化
    volatile long sum = 0;
    for (long i = 0; i < 10000000; i++) {
        sum += i;
        
        // 偶尔检查 CPU 变化
        if (i % 1000000 == 0) {
            current_cpu = sched_getcpu();
            if (current_cpu != initial_cpu && current_cpu != -1) {
                printf("  线程 %d 从 CPU %d 切换到 CPU %d\n", 
                       thread_id, initial_cpu, current_cpu);
                initial_cpu = current_cpu;
            }
        }
    }
    
    // 最终 CPU
    current_cpu = sched_getcpu();
    printf("  最终 CPU: %d\n", current_cpu);
    printf("  线程 %d 完成\n", thread_id);
    
    return NULL;
}

// CPU 绑定测试
void test_cpu_binding() {
    printf("=== CPU 绑定测试 ===\n");
    
    // 获取系统 CPU 数量
    long online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
    if (online_cpus <= 1) {
        printf("系统只有一个 CPU,跳过绑定测试\n");
        return;
    }
    
    // 创建多个线程
    pthread_t threads[4];
    int thread_ids[4] = {1, 2, 3, 4};
    
    printf("创建 4 个线程进行 CPU 绑定测试:\n");
    
    for (int i = 0; i < 4; i++) {
        if (pthread_create(&threads[i], NULL, thread_cpu_info, &thread_ids[i]) != 0) {
            perror("创建线程失败");
            return;
        }
    }
    
    // 等待所有线程完成
    for (int i = 0; i < 4; i++) {
        pthread_join(threads[i], NULL);
    }
    
    printf("CPU 绑定测试完成\n\n");
}

int main() {
    printf("=== sched_getcpu 示例 ===\n\n");
    
    // 显示系统信息
    show_cpu_info();
    
    // 显示当前进程信息
    printf("进程信息:\n");
    printf("  进程 ID: %d\n", getpid());
    printf("  父进程 ID: %d\n", getppid());
    printf("  线程 ID: %ld\n", syscall(SYS_gettid));
    
    // 获取当前 CPU
    int current_cpu = sched_getcpu();
    if (current_cpu != -1) {
        printf("  当前运行 CPU: %d\n", current_cpu);
    } else {
        printf("  无法获取 CPU 信息: %s\n", strerror(errno));
    }
    
    printf("\n");
    
    // 连续获取 CPU 信息观察变化
    printf("连续获取 CPU 信息 (执行密集计算):\n");
    volatile long sum = 0;
    int initial_cpu = sched_getcpu();
    
    printf("  初始 CPU: %d\n", initial_cpu);
    
    for (long i = 0; i < 50000000; i++) {
        sum += i;
        
        // 每隔一定次数检查 CPU
        if (i % 10000000 == 0) {
            int current_cpu = sched_getcpu();
            if (current_cpu != -1) {
                printf("  计算 %ld 次后 CPU: %d\n", i, current_cpu);
            }
        }
    }
    
    int final_cpu = sched_getcpu();
    printf("  最终 CPU: %d\n", final_cpu);
    
    if (initial_cpu != final_cpu && initial_cpu != -1 && final_cpu != -1) {
        printf("  ✓ CPU 在计算过程中发生了切换\n");
    } else {
        printf("  CPU 在计算过程中保持不变\n");
    }
    
    printf("\n");
    
    // CPU 绑定测试
    test_cpu_binding();
    
    // 显示 CPU 调度信息
    printf("=== CPU 调度信息 ===\n");
    printf("CPU 调度相关概念:\n");
    printf("1. CPU 亲和性: 进程可以运行的 CPU 集合\n");
    printf("2. 负载均衡: 系统在 CPU 间分配负载\n");
    printf("3. 上下文切换: 进程在 CPU 间的切换\n");
    printf("4. 缓存局部性: 数据在 CPU 缓存中的位置\n");
    printf("5. NUMA 拓扑: 非统一内存访问架构\n");
    printf("\n");
    
    printf("sched_getcpu 的用途:\n");
    printf("1. 性能分析: 监控线程 CPU 使用情况\n");
    printf("2. 负载均衡: 了解 CPU 分配情况\n");
    printf("3. 调试工具: 分析程序执行行为\n");
    printf("4. 实时系统: 监控实时性约束\n");
    printf("5. 优化建议: 识别性能瓶颈\n");
    printf("\n");
    
    printf("使用建议:\n");
    printf("1. 结合 CPU 亲和性使用\n");
    printf("2. 监控 CPU 切换频率\n");
    printf("3. 分析热点 CPU\n");
    printf("4. 优化缓存局部性\n");
    printf("5. 避免过度绑定 CPU\n");
    
    return 0;
}

3. 编译和运行说明

# 编译所有示例程序
gcc -o pread_example pread_example.c
gcc -o affinity_example affinity_example.c
gcc -o priority_example priority_example.c
gcc -o rr_interval_example rr_interval_example.c
gcc -o getcpu_example getcpu_example.c -lpthread

# 运行示例程序
./pread_example
./affinity_example
./priority_example
./rr_interval_example
./getcpu_example

# 需要 root 权限的测试
sudo ./priority_example
sudo ./rr_interval_example

4. 系统要求检查

# 检查内核版本
uname -r

# 检查系统调用支持
grep -E "(pread|pwrite)" /usr/include/asm/unistd_64.h

# 检查 CPU 信息
lscpu

# 检查调度器信息
cat /proc/sched_debug 2>/dev/null || echo "无法读取调度器调试信息"

# 检查实时调度支持
grep -i realtime /boot/config-$(uname -r)

# 检查 NUMA 支持
numactl --hardware 2>/dev/null || echo "系统不支持 NUMA"

5. 重要注意事项

5.1 权限要求

  • 普通用户: 可以使用 pread/pwrite 等基本 I/O 操作
  • root 用户: 需要设置实时调度策略 (SCHED_FIFO/SCHED_RR)
  • CAP_SYS_NICE: 某些操作需要此能力

5.2 错误处理

// 安全的系统调用封装
int safe_pread(int fd, void *buf, size_t count, off_t offset) {
    if (fd < 0 || !buf || count == 0) {
        errno = EINVAL;
        return -1;
    }
    
    ssize_t result;
    do {
        result = pread(fd, buf, count, offset);
    } while (result == -1 && errno == EINTR);
    
    return result;
}

// 安全的 CPU 亲和性设置
int safe_sched_setaffinity(pid_t pid, const cpu_set_t *mask) {
    if (!mask) {
        errno = EINVAL;
        return -1;
    }
    
    return sched_setaffinity(pid, sizeof(cpu_set_t), mask);
}

5.3 性能考虑

// 性能优化建议
void performance_optimization_tips() {
    printf("性能优化建议:\n");
    printf("1. 批量操作: 使用 preadv/pwritev 减少系统调用次数\n");
    printf("2. 缓冲区大小: 合理设置缓冲区大小避免频繁调用\n");
    printf("3. CPU 亲和性: 合理绑定 CPU 提高缓存命中率\n");
    printf("4. 调度策略: 根据应用特点选择合适的调度策略\n");
    printf("5. 优先级设置: 避免过度使用高优先级影响系统稳定性\n");
    printf("6. 时间片调整: 根据应用需求调整时间片长度\n");
    printf("7. 资源限制: 合理设置进程资源限制\n");
    printf("8. 内存管理: 避免内存碎片和频繁分配\n");
}

5.4 最佳实践

// 完整的 I/O 操作最佳实践
typedef struct {
    int fd;
    size_t buffer_size;
    int use_positioned_io;
    int use_scatter_gather;
    cpu_set_t cpu_affinity;
    int has_cpu_affinity;
} io_context_t;

// 初始化 I/O 上下文
int init_io_context(io_context_t *ctx, const char *filename) {
    ctx->fd = open(filename, O_RDWR | O_CREAT, 0644);
    if (ctx->fd == -1) {
        return -1;
    }
    
    ctx->buffer_size = 4096;
    ctx->use_positioned_io = 1;
    ctx->use_scatter_gather = 0;
    ctx->has_cpu_affinity = 0;
    
    return 0;
}

// 清理 I/O 上下文
void cleanup_io_context(io_context_t *ctx) {
    if (ctx->fd != -1) {
        close(ctx->fd);
        ctx->fd = -1;
    }
}

6. 实际应用场景

6.1 数据库系统

// 数据库页 I/O 操作
int db_page_io(int fd, off_t page_offset, void *page_data, size_t page_size) {
    // 使用 pread/pwrite 进行页级别的随机访问
    ssize_t bytes_read = pread(fd, page_data, page_size, page_offset);
    return (bytes_read == (ssize_t)page_size) ? 0 : -1;
}

6.2 实时系统

// 实时应用调度设置
int setup_realtime_scheduling(int priority) {
    struct sched_param param;
    param.sched_priority = priority;
    
    // 设置实时调度策略
    if (sched_setscheduler(0, SCHED_FIFO, &param) == -1) {
        return -1;
    }
    
    // 设置 CPU 亲和性
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    CPU_SET(0, &cpu_set);  // 绑定到 CPU 0
    
    return sched_setaffinity(0, sizeof(cpu_set), &cpu_set);
}

6.3 网络服务器

// 网络服务器 I/O 处理
int handle_network_io(int client_fd, struct iovec *iov, int iovcnt) {
    // 使用 preadv/pwritev 处理网络数据包
    return pwritev(client_fd, iov, iovcnt, 0);
}

7. 总结

7.1 核心概念回顾

Linux 调度器函数族为进程调度控制提供了精细化的管理能力:

  1. sched_yield: 让出当前 CPU 时间片,允许同优先级进程运行
  2. sched_getscheduler/sched_setscheduler: 获取和设置进程调度策略
  3. sched_getaffinity/sched_setaffinity: 获取和设置 CPU 亲和性
  4. sched_get_priority_min/sched_get_priority_max: 获取调度策略优先级范围
  5. sched_rr_get_interval: 获取轮转调度时间片长度
  6. sched_getcpu: 获取当前运行的 CPU 编号
  7. prlimit64: 获取和设置进程资源限制

7.2 调度策略详解

五种主要调度策略

  • SCHED_OTHER: 默认分时调度(CFS),适合普通应用
  • SCHED_FIFO: 实时先进先出,高优先级进程持续运行
  • SCHED_RR: 实时轮转调度,相同优先级进程时间片轮转
  • SCHED_BATCH: 批处理优化,减少上下文切换
  • SCHED_IDLE: 空闲任务,只在系统空闲时运行

7.3 性能优化要点

调度策略选择

  • 普通应用:使用 SCHED_OTHER(默认)
  • 实时系统:使用 SCHED_FIFO 或 SCHED_RR
  • 批处理任务:使用 SCHED_BATCH
  • 后台任务:使用 SCHED_IDLE

CPU 亲和性优化

  • 减少 CPU 缓存失效
  • 提高缓存命中率
  • 降低上下文切换开销
  • 改善 NUMA 访问模式

7.4 安全和权限管理

权限要求

  • 普通用户:可使用 SCHED_OTHER/SCHED_BATCH/SCHED_IDLE
  • root 用户:可使用所有调度策略
  • CAP_SYS_NICE:允许修改调度策略和优先级
  • CAP_SYS_ADMIN:允许使用 prlimit64 设置资源限制

安全考虑

// 权限检查示例
int check_scheduling_permissions() {
    if (geteuid() == 0) {
        return 1;  // root 权限
    }
    
    // 检查 CAP_SYS_NICE 能力
    // 使用 libcap-ng 库进行能力检查
    return 0;  // 普通权限
}

7.5 实际应用场景

适用场景

  1. 实时系统:音视频处理、工业控制(SCHED_FIFO/SCHED_RR)
  2. 高性能计算:科学计算、数据分析(CPU 亲和性绑定)
  3. 服务器应用:Web 服务、数据库(合理的调度策略)
  4. 系统监控:性能分析、资源管理(sched_getcpu)
  5. 容器技术:资源限制、进程隔离(prlimit64)

7.6 最佳实践

调度策略设置

// 安全的调度策略设置
int safe_set_scheduler(pid_t pid, int policy, int priority) {
    struct sched_param param;
    
    // 验证参数
    if (priority < sched_get_priority_min(policy) || 
        priority > sched_get_priority_max(policy)) {
        errno = EINVAL;
        return -1;
    }
    
    param.sched_priority = priority;
    
    // 设置调度策略
    int result = sched_setscheduler(pid, policy, &param);
    
    if (result == -1) {
        switch (errno) {
            case EPERM:
                fprintf(stderr, "权限不足,需要适当权限\n");
                break;
            case EINVAL:
                fprintf(stderr, "无效的策略或优先级\n");
                break;
            case ESRCH:
                fprintf(stderr, "进程不存在\n");
                break;
        }
    }
    
    return result;
}

CPU 亲和性管理

// 智能 CPU 绑定
int smart_cpu_binding(pid_t pid, int cpu_count) {
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    
    // 根据系统 CPU 数量智能绑定
    int available_cpus = sysconf(_SC_NPROCESSORS_ONLN);
    int bind_count = (cpu_count < available_cpus) ? cpu_count : available_cpus;
    
    for (int i = 0; i < bind_count; i++) {
        CPU_SET(i, &cpu_set);
    }
    
    return sched_setaffinity(pid, sizeof(cpu_set), &cpu_set);
}

资源限制控制

// 安全的资源限制设置
int safe_set_resource_limit(int resource, rlim_t soft_limit, rlim_t hard_limit) {
    struct rlimit64 limit;
    limit.rlim_cur = soft_limit;
    limit.rlim_max = hard_limit;
    
    int result = prlimit64(0, resource, &limit, NULL);
    
    if (result == -1) {
        switch (errno) {
            case EPERM:
                fprintf(stderr, "权限不足,需要 CAP_SYS_RESOURCE 能力\n");
                break;
            case EINVAL:
                fprintf(stderr, "无效的资源类型或限制值\n");
                break;
        }
    }
    
    return result;
}

7.7 学习建议

掌握路径

  1. 入门阶段:理解基本调度概念和 sched_yield 使用
  2. 进阶阶段:掌握调度策略和 CPU 亲和性
  3. 高级阶段:精通资源限制和性能优化
  4. 专家阶段:实现复杂的调度控制系统

实践要点

  • 从简单示例开始逐步复杂化
  • 重点关注权限管理和错误处理
  • 实际项目中验证调度效果
  • 持续关注实时系统发展

这些调度器函数是 Linux 系统编程的重要组成部分,正确掌握和使用它们对于开发高性能、实时性要求高的应用程序至关重要。通过系统的学习和实践,开发者可以充分发挥 Linux 调度系统的强大功能。

发表在 linux文章 | 留下评论

mq_unlink系统调用及示例

mq_unlink函数详解

1. 函数介绍

mq_unlink函数是Linux系统中用于删除POSIX消息队列的函数。可以把mq_unlink想象成一个”消息队列删除器”,它能够从系统中移除指定名称的消息队列。

POSIX消息队列是进程间通信(IPC)的一种机制,允许不同进程通过队列发送和接收消息。mq_unlink的作用类似于文件系统的unlink函数,它删除消息队列的名称,但不会立即销毁队列本身。只有当所有打开该队列的进程都关闭了队列描述符后,队列才会被真正销毁。

使用场景:

  • 进程间通信系统的清理
  • 服务器程序的资源管理
  • 系统维护和清理脚本
  • 消息队列生命周期管理

2. 函数原型

#include <mqueue.h>

int mq_unlink(const char *name);

3. 功能

mq_unlink函数的主要功能是删除指定名称的POSIX消息队列。它从系统中移除队列的名称,使得后续无法通过该名称打开队列,但已打开的队列描述符仍然有效。

4. 参数

  • name: 消息队列名称
    • 类型:const char*
    • 含义:要删除的消息队列名称
    • 名称必须以’/’开头,如”/my_queue”

5. 返回值

  • 成功: 返回0
  • 失败: 返回-1,并设置errno错误码
    • EACCES:权限不足
    • ENOENT:指定名称的消息队列不存在
    • EINVAL:名称无效

6. 相似函数或关联函数

  • mq_open(): 打开或创建消息队列
  • mq_close(): 关闭消息队列描述符
  • mq_send(): 发送消息
  • mq_receive(): 接收消息
  • mq_getattr(): 获取队列属性
  • mq_setattr(): 设置队列属性
  • unlink(): 删除文件

7. 示例代码

示例1:基础mq_unlink使用 – 简单队列删除

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>

// 创建消息队列
mqd_t create_message_queue(const char* name) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 10,
        .mq_msgsize = 256,
        .mq_curmsgs = 0
    };
    
    mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建消息队列失败");
        return -1;
    }
    
    printf("创建消息队列: %s (描述符: %d)\n", name, (int)mq);
    return mq;
}

// 显示消息队列属性
void show_queue_attributes(mqd_t mq, const char* name) {
    struct mq_attr attr;
    if (mq_getattr(mq, &attr) == -1) {
        perror("获取队列属性失败");
        return;
    }
    
    printf("队列 %s 属性:\n", name);
    printf("  最大消息数: %ld\n", attr.mq_maxmsg);
    printf("  最大消息大小: %ld\n", attr.mq_msgsize);
    printf("  当前消息数: %ld\n", attr.mq_curmsgs);
    printf("  标志: %ld\n", attr.mq_flags);
}

int main() {
    printf("=== 基础mq_unlink使用示例 ===\n");
    
    const char* queue_name = "/test_queue";
    
    // 创建消息队列
    printf("1. 创建消息队列:\n");
    mqd_t mq = create_message_queue(queue_name);
    if (mq == -1) {
        exit(EXIT_FAILURE);
    }
    
    show_queue_attributes(mq, queue_name);
    
    // 发送一些测试消息
    printf("\n2. 发送测试消息:\n");
    const char* messages[] = {
        "第一条测试消息",
        "第二条测试消息",
        "第三条测试消息"
    };
    
    for (int i = 0; i < 3; i++) {
        if (mq_send(mq, messages[i], strlen(messages[i]), 0) == -1) {
            perror("发送消息失败");
        } else {
            printf("发送消息: %s\n", messages[i]);
        }
    }
    
    show_queue_attributes(mq, queue_name);
    
    // 使用mq_unlink删除队列名称
    printf("\n3. 使用mq_unlink删除队列名称:\n");
    if (mq_unlink(queue_name) == 0) {
        printf("✓ 成功删除队列名称: %s\n", queue_name);
        printf("注意: 队列本身仍然存在,因为还有打开的描述符\n");
    } else {
        printf("✗ 删除队列名称失败: %s\n", strerror(errno));
    }
    
    // 验证队列名称已被删除
    printf("\n4. 验证队列名称删除效果:\n");
    mqd_t mq2 = mq_open(queue_name, O_RDONLY);
    if (mq2 == -1) {
        printf("✓ 无法通过名称重新打开队列 (预期行为): %s\n", strerror(errno));
    } else {
        printf("✗ 仍然可以通过名称打开队列\n");
        mq_close(mq2);
    }
    
    // 原有描述符仍然可以使用
    printf("\n5. 原有描述符仍然有效:\n");
    char buffer[256];
    ssize_t bytes_received;
    unsigned int priority;
    
    while ((bytes_received = mq_receive(mq, buffer, sizeof(buffer), &priority)) > 0) {
        buffer[bytes_received] = '\0';
        printf("接收到消息: %s (优先级: %u)\n", buffer, priority);
    }
    
    // 关闭队列描述符(此时队列才会被真正销毁)
    printf("\n6. 关闭队列描述符:\n");
    if (mq_close(mq) == 0) {
        printf("✓ 队列描述符已关闭,队列被真正销毁\n");
    } else {
        perror("关闭队列描述符失败");
    }
    
    printf("\n=== 基础mq_unlink演示完成 ===\n");
    
    return 0;
}

示例2:多个进程共享队列的删除管理

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <string.h>
#include <errno.h>
#include <time.h>

#define MAX_MESSAGES 5
#define MESSAGE_SIZE 256

// 生产者进程
void producer_process(const char* queue_name, int producer_id) {
    printf("生产者 %d 启动\n", producer_id);
    
    // 打开已存在的队列
    mqd_t mq = mq_open(queue_name, O_WRONLY);
    if (mq == (mqd_t)-1) {
        perror("生产者打开队列失败");
        exit(EXIT_FAILURE);
    }
    
    srand(time(NULL) + producer_id);
    
    // 发送消息
    for (int i = 0; i < MAX_MESSAGES; i++) {
        char message[MESSAGE_SIZE];
        snprintf(message, sizeof(message), "生产者%d的消息%d", producer_id, i + 1);
        
        // 随机优先级
        unsigned int priority = rand() % 10;
        
        if (mq_send(mq, message, strlen(message), priority) == -1) {
            perror("发送消息失败");
        } else {
            printf("生产者 %d 发送: %s (优先级: %u)\n", producer_id, message, priority);
        }
        
        sleep(1);  // 模拟处理时间
    }
    
    printf("生产者 %d 完成\n", producer_id);
    mq_close(mq);
}

// 消费者进程
void consumer_process(const char* queue_name, int consumer_id) {
    printf("消费者 %d 启动\n", consumer_id);
    
    // 打开已存在的队列
    mqd_t mq = mq_open(queue_name, O_RDONLY);
    if (mq == (mqd_t)-1) {
        perror("消费者打开队列失败");
        exit(EXIT_FAILURE);
    }
    
    // 接收消息
    char buffer[MESSAGE_SIZE];
    ssize_t bytes_received;
    unsigned int priority;
    int message_count = 0;
    
    while (message_count < MAX_MESSAGES * 2) {  // 期望接收所有生产者的消息
        bytes_received = mq_receive(mq, buffer, sizeof(buffer), &priority);
        if (bytes_received > 0) {
            buffer[bytes_received] = '\0';
            printf("消费者 %d 接收: %s (优先级: %u)\n", consumer_id, buffer, priority);
            message_count++;
        } else if (errno == EAGAIN) {
            // 非阻塞模式下没有消息
            printf("消费者 %d: 暂无消息\n", consumer_id);
            sleep(1);
        } else {
            perror("接收消息失败");
            break;
        }
    }
    
    printf("消费者 %d 完成,接收 %d 条消息\n", consumer_id, message_count);
    mq_close(mq);
}

// 管理进程
void manager_process(const char* queue_name) {
    printf("管理进程启动\n");
    
    // 创建消息队列
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 20,
        .mq_msgsize = MESSAGE_SIZE,
        .mq_curmsgs = 0
    };
    
    mqd_t mq = mq_open(queue_name, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("管理进程创建队列失败");
        exit(EXIT_FAILURE);
    }
    
    printf("管理进程创建队列: %s\n", queue_name);
    
    // 启动生产者和消费者进程
    pid_t producers[2], consumers[2];
    
    // 启动生产者
    for (int i = 0; i < 2; i++) {
        producers[i] = fork();
        if (producers[i] == 0) {
            producer_process(queue_name, i + 1);
            exit(EXIT_SUCCESS);
        }
    }
    
    // 启动消费者
    for (int i = 0; i < 2; i++) {
        consumers[i] = fork();
        if (consumers[i] == 0) {
            consumer_process(queue_name, i + 1);
            exit(EXIT_SUCCESS);
        }
    }
    
    // 等待生产者完成
    printf("管理进程等待生产者完成...\n");
    for (int i = 0; i < 2; i++) {
        waitpid(producers[i], NULL, 0);
    }
    
    printf("所有生产者已完成\n");
    
    // 模拟一段时间让消费者处理完消息
    sleep(3);
    
    // 删除队列名称(但队列仍存在,因为消费者还在使用)
    printf("管理进程删除队列名称...\n");
    if (mq_unlink(queue_name) == 0) {
        printf("✓ 队列名称已删除,但队列仍存在(消费者仍在使用)\n");
    } else {
        printf("✗ 删除队列名称失败: %s\n", strerror(errno));
    }
    
    // 等待消费者完成
    printf("管理进程等待消费者完成...\n");
    for (int i = 0; i < 2; i++) {
        waitpid(consumers[i], NULL, 0);
    }
    
    printf("所有消费者已完成\n");
    
    // 现在队列才会被真正销毁(所有描述符都已关闭)
    printf("队列已被真正销毁\n");
    mq_close(mq);
    
    printf("管理进程完成\n");
}

int main() {
    printf("=== 多进程共享队列删除管理示例 ===\n");
    
    const char* queue_name = "/shared_queue";
    
    // 启动管理进程
    pid_t manager = fork();
    if (manager == 0) {
        manager_process(queue_name);
        exit(EXIT_SUCCESS);
    }
    
    // 父进程等待管理进程完成
    waitpid(manager, NULL, 0);
    
    // 验证队列是否已被删除
    printf("\n验证队列删除效果:\n");
    mqd_t mq = mq_open(queue_name, O_RDONLY);
    if (mq == -1) {
        printf("✓ 队列已成功删除: %s\n", strerror(errno));
    } else {
        printf("✗ 队列仍然存在\n");
        mq_close(mq);
    }
    
    printf("\n=== 多进程队列管理演示完成 ===\n");
    
    return 0;
}
发表在 linux文章 | 留下评论

mq_timedsend系统调用及示例

mq_timedsend函数详解

1. 函数介绍

mq_timedsend函数是Linux系统中用于在指定时间内发送消息到POSIX消息队列的函数。它是mq_send函数的增强版本,支持超时控制。可以把mq_timedsend想象成一个”限时消息发送器”,它能够在指定的时间内尝试发送消息,如果超时则返回错误。

这个函数特别适用于需要控制发送等待时间的场景,比如实时系统或需要避免无限期阻塞的应用程序。

使用场景:

  • 实时系统的消息发送
  • 避免无限期阻塞的发送操作
  • 超时控制的网络应用
  • 高可用性系统中的消息处理

2. 函数原型

#include <mqueue.h>
#include <time.h>

int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 
                 unsigned int msg_prio, const struct timespec *abs_timeout);

3. 功能

mq_timedsend函数的主要功能是在指定的绝对超时时间内发送消息到消息队列。如果队列已满且在超时时间内无法发送,则返回错误。

4. 参数

  • mqdes: 消息队列描述符
    • 类型:mqd_t
    • 含义:已打开的消息队列描述符
  • msg_ptr: 消息内容指针
    • 类型:const char*
    • 含义:指向要发送的消息内容
  • msg_len: 消息长度
    • 类型:size_t
    • 含义:消息内容的字节数
  • msg_prio: 消息优先级
    • 类型:unsigned int
    • 含义:消息的优先级(0-32767)
  • abs_timeout: 绝对超时时间
    • 类型:const struct timespec*
    • 含义:绝对超时时间(基于CLOCK_REALTIME)

5. 返回值

  • 成功: 返回0
  • 失败: 返回-1,并设置errno错误码
    • EAGAIN:超时时间内无法发送消息
    • EBADF:无效的消息队列描述符
    • EINTR:被信号中断
    • EINVAL:参数无效
    • EMSGSIZE:消息大小超过队列限制
    • ETIMEDOUT:超时

6. 相似函数或关联函数

  • mq_send(): 发送消息(阻塞)
  • mq_receive(): 接收消息
  • mq_timedreceive(): 限时接收消息
  • clock_gettime(): 获取当前时间
  • pthread_cond_timedwait(): 限时条件等待

7. 示例代码

示例1:基础mq_timedsend使用 – 超时控制发送

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>

// 创建带限制的消息队列
mqd_t create_limited_queue(const char* name) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 2,      // 很小的队列容量
        .mq_msgsize = 128,
        .mq_curmsgs = 0
    };
    
    mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建消息队列失败");
        return -1;
    }
    
    printf("创建限制队列: %s (容量: %ld)\n", name, attr.mq_maxmsg);
    return mq;
}

// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {
    if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
        perror("获取当前时间失败");
        return -1;
    }
    
    abs_timeout->tv_sec += seconds;
    return 0;
}

int main() {
    printf("=== 基础mq_timedsend使用示例 ===\n");
    
    const char* queue_name = "/timed_queue";
    
    // 创建限制队列
    mqd_t mq = create_limited_queue(queue_name);
    if (mq == -1) {
        exit(EXIT_FAILURE);
    }
    
    // 填满队列
    printf("1. 填满队列:\n");
    for (int i = 0; i < 2; i++) {
        char message[64];
        snprintf(message, sizeof(message), "填充消息 %d", i + 1);
        
        if (mq_send(mq, message, strlen(message), 0) == -1) {
            perror("发送填充消息失败");
        } else {
            printf("发送: %s\n", message);
        }
    }
    
    // 显示队列状态
    struct mq_attr attr;
    if (mq_getattr(mq, &attr) == 0) {
        printf("队列当前消息数: %ld/%ld\n", attr.mq_curmsgs, attr.mq_maxmsg);
    }
    
    // 演示mq_timedsend超时
    printf("\n2. 演示mq_timedsend超时:\n");
    
    struct timespec abs_timeout;
    if (calculate_absolute_timeout(&abs_timeout, 3) == -1) {  // 3秒超时
        mq_close(mq);
        mq_unlink(queue_name);
        exit(EXIT_FAILURE);
    }
    
    char test_message[] = "超时测试消息";
    printf("尝试发送消息(队列已满,3秒超时):\n");
    
    clock_t start_time = clock();
    int result = mq_timedsend(mq, test_message, strlen(test_message), 0, &abs_timeout);
    clock_t end_time = clock();
    
    double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
    
    if (result == 0) {
        printf("✓ 消息发送成功\n");
    } else {
        if (errno == ETIMEDOUT) {
            printf("✗ 发送超时 (耗时: %.2f 秒)\n", elapsed_time);
        } else if (errno == EAGAIN) {
            printf("✗ 队列满,无法发送: %s\n", strerror(errno));
        } else {
            printf("✗ 发送失败: %s\n", strerror(errno));
        }
    }
    
    // 演示成功的mq_timedsend
    printf("\n3. 演示成功的mq_timedsend:\n");
    
    // 先接收一条消息,为发送腾出空间
    char buffer[128];
    ssize_t bytes_received = mq_receive(mq, buffer, sizeof(buffer), NULL);
    if (bytes_received > 0) {
        buffer[bytes_received] = '\0';
        printf("接收消息为发送腾出空间: %s\n", buffer);
    }
    
    // 现在队列有空间了
    if (calculate_absolute_timeout(&abs_timeout, 5) == 0) {  // 5秒超时
        char success_message[] = "成功发送的消息";
        printf("发送消息(队列有空间):\n");
        
        if (mq_timedsend(mq, success_message, strlen(success_message), 5, &abs_timeout) == 0) {
            printf("✓ 消息发送成功 (优先级: 5)\n");
        } else {
            printf("✗ 发送失败: %s\n", strerror(errno));
        }
    }
    
    // 演示不同超时时间的效果
    printf("\n4. 不同超时时间演示:\n");
    
    // 立即超时(过去的时间)
    struct timespec past_time = {0, 0};
    char immediate_message[] = "立即超时消息";
    printf("使用过去时间作为超时(立即返回):\n");
    if (mq_timedsend(mq, immediate_message, strlen(immediate_message), 0, &past_time) == -1) {
        if (errno == ETIMEDOUT) {
            printf("✓ 立即超时 (预期行为)\n");
        } else {
            printf("✗ 其他错误: %s\n", strerror(errno));
        }
    }
    
    // 长时间超时
    if (calculate_absolute_timeout(&abs_timeout, 10) == 0) {  // 10秒超时
        char long_timeout_message[] = "长超时消息";
        printf("使用长超时时间:\n");
        if (mq_timedsend(mq, long_timeout_message, strlen(long_timeout_message), 1, &abs_timeout) == 0) {
            printf("✓ 长超时发送成功\n");
        } else {
            printf("✗ 长超时发送失败: %s\n", strerror(errno));
        }
    }
    
    // 清理资源
    printf("\n5. 清理资源:\n");
    mq_close(mq);
    mq_unlink(queue_name);
    printf("队列已清理\n");
    
    printf("\n=== 基础mq_timedsend演示完成 ===\n");
    
    return 0;
}

示例2:实时系统中的超时消息发送

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

#define MAX_MESSAGES 10
#define MESSAGE_SIZE 256

volatile sig_atomic_t stop_flag = 0;

// 信号处理函数
void signal_handler(int sig) {
    printf("收到信号 %d,准备停止...\n", sig);
    stop_flag = 1;
}

// 创建实时消息队列
mqd_t create_realtime_queue(const char* name) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 5,
        .mq_msgsize = MESSAGE_SIZE,
        .mq_curmsgs = 0
    };
    
    mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建实时队列失败");
        return -1;
    }
    
    printf("创建实时队列: %s\n", name);
    return mq;
}

// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {
    if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
        perror("获取当前时间失败");
        return -1;
    }
    
    // 转换毫秒到秒和纳秒
    long seconds = milliseconds / 1000;
    long nanoseconds = (milliseconds % 1000) * 1000000;
    
    abs_timeout->tv_sec += seconds;
    abs_timeout->tv_nsec += nanoseconds;
    
    // 处理纳秒溢出
    if (abs_timeout->tv_nsec >= 1000000000) {
        abs_timeout->tv_sec++;
        abs_timeout->tv_nsec -= 1000000000;
    }
    
    return 0;
}

// 实时消息发送器
void realtime_message_sender(mqd_t mq, const char* sender_name) {
    printf("实时发送器 %s 启动\n", sender_name);
    
    srand(time(NULL));
    
    int message_count = 0;
    while (!stop_flag && message_count < MAX_MESSAGES) {
        char message[MESSAGE_SIZE];
        snprintf(message, sizeof(message), "%s: 实时消息 %d", sender_name, message_count + 1);
        
        // 随机超时时间(10-100毫秒)
        int timeout_ms = 10 + rand() % 91;
        
        struct timespec abs_timeout;
        if (calculate_relative_timeout(&abs_timeout, timeout_ms) == -1) {
            continue;
        }
        
        // 使用mq_timedsend发送消息
        unsigned int priority = rand() % 10;
        int result = mq_timedsend(mq, message, strlen(message), priority, &abs_timeout);
        
        if (result == 0) {
            printf("[%s] 发送成功: %s (优先级: %u, 超时: %dms)\n", 
                   sender_name, message, priority, timeout_ms);
        } else {
            if (errno == ETIMEDOUT) {
                printf("[%s] 发送超时: %s (超时: %dms)\n", 
                       sender_name, message, timeout_ms);
            } else if (errno == EAGAIN) {
                printf("[%s] 队列满,发送失败: %s\n", sender_name, message);
            } else {
                printf("[%s] 发送错误: %s (%s)\n", 
                       sender_name, message, strerror(errno));
            }
        }
        
        message_count++;
        usleep(500000);  // 0.5秒间隔
    }
    
    printf("实时发送器 %s 完成\n", sender_name);
}

// 消息接收器
void message_receiver(mqd_t mq, const char* receiver_name) {
    printf("消息接收器 %s 启动\n", receiver_name);
    
    char buffer[MESSAGE_SIZE];
    ssize_t bytes_received;
    unsigned int priority;
    int received_count = 0;
    
    while (!stop_flag && received_count < MAX_MESSAGES * 2) {
        struct timespec abs_timeout;
        if (calculate_relative_timeout(&abs_timeout, 2000) == -1) {  // 2秒超时
            continue;
        }
        
        bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
        
        if (bytes_received > 0) {
            buffer[bytes_received] = '\0';
            printf("[%s] 接收: %s (优先级: %u)\n", receiver_name, buffer, priority);
            received_count++;
        } else if (errno == ETIMEDOUT) {
            printf("[%s] 接收超时\n", receiver_name);
        } else if (errno == EAGAIN) {
            printf("[%s] 暂无消息\n", receiver_name);
            usleep(100000);  // 0.1秒后重试
        } else {
            printf("[%s] 接收错误: %s\n", receiver_name, strerror(errno));
            break;
        }
    }
    
    printf("消息接收器 %s 完成,接收 %d 条消息\n", receiver_name, received_count);
}

int main() {
    printf("=== 实时系统超时消息发送示例 ===\n");
    
    const char* queue_name = "/realtime_queue";
    
    // 设置信号处理
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    
    // 创建实时队列
    mqd_t mq = create_realtime_queue(queue_name);
    if (mq == -1) {
        exit(EXIT_FAILURE);
    }
    
    // 启动发送器和接收器
    pid_t sender1 = fork();
    if (sender1 == 0) {
        realtime_message_sender(mq, "发送器1");
        exit(EXIT_SUCCESS);
    }
    
    pid_t sender2 = fork();
    if (sender2 == 0) {
        realtime_message_sender(mq, "发送器2");
        exit(EXIT_SUCCESS);
    }
    
    pid_t receiver = fork();
    if (receiver == 0) {
        message_receiver(mq, "接收器");
        exit(EXIT_SUCCESS);
    }
    
    // 主进程等待一段时间后发送停止信号
    printf("系统运行中... 按Ctrl+C停止或等待30秒\n");
    
    int elapsed = 0;
    while (elapsed < 30 && !stop_flag) {
        sleep(1);
        elapsed++;
        
        // 定期显示队列状态
        if (elapsed % 5 == 0) {
            struct mq_attr attr;
            if (mq_getattr(mq, &attr) == 0) {
                printf("队列状态: %ld/%ld 消息\n", attr.mq_curmsgs, attr.mq_maxmsg);
            }
        }
    }
    
    // 发送停止信号
    stop_flag = 1;
    printf("发送停止信号...\n");
    
    // 等待所有子进程完成
    waitpid(sender1, NULL, 0);
    waitpid(sender2, NULL, 0);
    waitpid(receiver, NULL, 0);
    
    // 清理资源
    mq_close(mq);
    mq_unlink(queue_name);
    printf("系统已停止,资源已清理\n");
    
    printf("\n=== 实时系统演示完成 ===\n");
    
    return 0;
}
发表在 linux文章 | 留下评论

mq_timedreceive系统调用及示例

mq_timedreceive函数详解

1. 函数介绍

mq_timedreceive函数是Linux系统中用于在指定时间内从POSIX消息队列接收消息的函数。它是mq_receive函数的增强版本,支持超时控制。可以把mq_timedreceive想象成一个”限时消息接收器”,它能够在指定的时间内尝试接收消息,如果超时则返回错误。

这个函数特别适用于需要控制接收等待时间的场景,比如实时系统、服务器应用或需要避免无限期阻塞的程序。

使用场景:

  • 实时系统的消息接收
  • 服务器程序的请求处理
  • 避免无限期阻塞的接收操作
  • 超时控制的网络应用
  • 高可用性系统中的消息处理

2. 函数原型

#include <mqueue.h>
#include <time.h>

ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 
                        unsigned int *msg_prio, const struct timespec *abs_timeout);

3. 功能

mq_timedreceive函数的主要功能是在指定的绝对超时时间内从消息队列接收消息。如果队列为空且在超时时间内没有消息到达,则返回错误。

4. 参数

  • mqdes: 消息队列描述符
    • 类型:mqd_t
    • 含义:已打开的消息队列描述符
  • msg_ptr: 消息缓冲区指针
    • 类型:char*
    • 含义:指向存储接收消息的缓冲区
  • msg_len: 缓冲区大小
    • 类型:size_t
    • 含义:消息缓冲区的大小(字节数)
  • msg_prio: 消息优先级指针
    • 类型:unsigned int*
    • 含义:指向存储消息优先级的变量(可为NULL)
  • abs_timeout: 绝对超时时间
    • 类型:const struct timespec*
    • 含义:绝对超时时间(基于CLOCK_REALTIME)

5. 返回值

  • 成功: 返回接收到的消息字节数
  • 失败: 返回-1,并设置errno错误码
    • EAGAIN:超时时间内没有消息可接收
    • EBADF:无效的消息队列描述符
    • EINTR:被信号中断
    • EINVAL:参数无效
    • EMSGSIZE:缓冲区太小
    • ETIMEDOUT:超时

6. 相似函数或关联函数

  • mq_receive(): 接收消息(阻塞)
  • mq_send(): 发送消息
  • mq_timedsend(): 限时发送消息
  • clock_gettime(): 获取当前时间
  • pthread_cond_timedwait(): 限时条件等待

7. 示例代码

示例1:基础mq_timedreceive使用 – 超时控制接收

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>

// 创建消息队列
mqd_t create_test_queue(const char* name) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 10,
        .mq_msgsize = 256,
        .mq_curmsgs = 0
    };
    
    mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建消息队列失败");
        return -1;
    }
    
    printf("创建测试队列: %s\n", name);
    return mq;
}

// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {
    if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
        perror("获取当前时间失败");
        return -1;
    }
    
    abs_timeout->tv_sec += seconds;
    return 0;
}

int main() {
    printf("=== 基础mq_timedreceive使用示例 ===\n");
    
    const char* queue_name = "/receive_test_queue";
    
    // 创建测试队列
    mqd_t mq = create_test_queue(queue_name);
    if (mq == -1) {
        exit(EXIT_FAILURE);
    }
    
    // 演示mq_timedreceive超时(空队列)
    printf("1. 演示空队列超时接收:\n");
    
    struct timespec abs_timeout;
    if (calculate_absolute_timeout(&abs_timeout, 3) == -1) {  // 3秒超时
        mq_close(mq);
        mq_unlink(queue_name);
        exit(EXIT_FAILURE);
    }
    
    char buffer[256];
    unsigned int priority;
    
    printf("从空队列接收消息(3秒超时):\n");
    clock_t start_time = clock();
    ssize_t result = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
    clock_t end_time = clock();
    
    double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
    
    if (result == -1) {
        if (errno == ETIMEDOUT) {
            printf("✗ 接收超时 (耗时: %.2f 秒)\n", elapsed_time);
        } else if (errno == EAGAIN) {
            printf("✗ 暂无消息: %s\n", strerror(errno));
        } else {
            printf("✗ 接收失败: %s\n", strerror(errno));
        }
    } else {
        buffer[result] = '\0';
        printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);
    }
    
    // 发送一些测试消息
    printf("\n2. 发送测试消息:\n");
    const char* messages[] = {
        "第一条测试消息",
        "第二条测试消息",
        "第三条测试消息"
    };
    
    for (int i = 0; i < 3; i++) {
        unsigned int priorities[] = {1, 5, 3};
        if (mq_send(mq, messages[i], strlen(messages[i]), priorities[i]) == -1) {
            perror("发送消息失败");
        } else {
            printf("发送: %s (优先级: %u)\n", messages[i], priorities[i]);
        }
    }
    
    // 演示成功的mq_timedreceive
    printf("\n3. 演示成功接收消息:\n");
    
    if (calculate_absolute_timeout(&abs_timeout, 5) == 0) {  // 5秒超时
        printf("接收消息(队列有消息):\n");
        
        // 接收所有消息
        while (1) {
            ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
            if (bytes_received > 0) {
                buffer[bytes_received] = '\0';
                printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);
            } else {
                if (errno == ETIMEDOUT || errno == EAGAIN) {
                    printf("无更多消息可接收\n");
                    break;
                } else {
                    printf("接收失败: %s\n", strerror(errno));
                    break;
                }
            }
        }
    }
    
    // 演示缓冲区大小处理
    printf("\n4. 缓冲区大小处理演示:\n");
    
    // 发送一条长消息
    char long_message[200];
    memset(long_message, 'A', sizeof(long_message) - 1);
    long_message[sizeof(long_message) - 1] = '\0';
    
    if (mq_send(mq, long_message, strlen(long_message), 0) == 0) {
        printf("发送长消息成功\n");
        
        // 使用过小的缓冲区接收
        char small_buffer[50];
        if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {
            ssize_t bytes_received = mq_timedreceive(mq, small_buffer, sizeof(small_buffer), NULL, &abs_timeout);
            if (bytes_received == -1) {
                if (errno == EMSGSIZE) {
                    printf("✗ 缓冲区太小 (预期错误)\n");
                } else {
                    printf("✗ 其他错误: %s\n", strerror(errno));
                }
            } else {
                small_buffer[bytes_received] = '\0';
                printf("✓ 接收到截断消息: %s (%zd 字节)\n", small_buffer, bytes_received);
            }
        }
        
        // 使用足够大的缓冲区接收
        char large_buffer[256];
        if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {
            ssize_t bytes_received = mq_timedreceive(mq, large_buffer, sizeof(large_buffer), NULL, &abs_timeout);
            if (bytes_received > 0) {
                large_buffer[bytes_received] = '\0';
                printf("✓ 接收到完整消息 (%zd 字节)\n", bytes_received);
            }
        }
    }
    
    // 演示优先级接收
    printf("\n5. 优先级接收演示:\n");
    
    // 发送不同优先级的消息
    struct {
        const char* message;
        unsigned int priority;
    } priority_messages[] = {
        {"低优先级消息", 1},
        {"中优先级消息", 5},
        {"高优先级消息", 10},
        {"最高优先级消息", 15}
    };
    
    for (int i = 0; i < 4; i++) {
        if (mq_send(mq, priority_messages[i].message, strlen(priority_messages[i].message), 
                   priority_messages[i].priority) == 0) {
            printf("发送: %s (优先级: %u)\n", priority_messages[i].message, priority_messages[i].priority);
        }
    }
    
    // 接收消息(应该按优先级顺序接收)
    printf("按优先级顺序接收消息:\n");
    if (calculate_absolute_timeout(&abs_timeout, 3) == 0) {
        for (int i = 0; i < 4; i++) {
            ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
            if (bytes_received > 0) {
                buffer[bytes_received] = '\0';
                printf("接收: %s (优先级: %u)\n", buffer, priority);
            }
        }
    }
    
    // 清理资源
    printf("\n6. 清理资源:\n");
    mq_close(mq);
    mq_unlink(queue_name);
    printf("队列已清理\n");
    
    printf("\n=== 基础mq_timedreceive演示完成 ===\n");
    
    return 0;
}

示例2:服务器应用中的超时消息处理

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

#define MAX_REQUESTS 100
#define REQUEST_SIZE 512

volatile sig_atomic_t server_running = 1;

// 服务器请求结构
typedef struct {
    char client_id[32];
    char request_data[256];
    time_t timestamp;
    int request_id;
} server_request_t;

// 服务器响应结构
typedef struct {
    int request_id;
    char response_data[256];
    int status;
    time_t timestamp;
} server_response_t;

// 信号处理函数
void signal_handler(int sig) {
    printf("服务器收到停止信号 %d\n", sig);
    server_running = 0;
}

// 创建服务器队列
mqd_t create_server_queues(const char* request_queue, const char* response_queue) {
    struct mq_attr request_attr = {
        .mq_flags = 0,
        .mq_maxmsg = 20,
        .mq_msgsize = sizeof(server_request_t),
        .mq_curmsgs = 0
    };
    
    struct mq_attr response_attr = {
        .mq_flags = 0,
        .mq_maxmsg = 20,
        .mq_msgsize = sizeof(server_response_t),
        .mq_curmsgs = 0
    };
    
    // 创建请求队列
    mqd_t req_mq = mq_open(request_queue, O_CREAT | O_RDONLY, 0644, &request_attr);
    if (req_mq == (mqd_t)-1) {
        perror("创建请求队列失败");
        return -1;
    }
    
    // 创建响应队列
    mqd_t resp_mq = mq_open(response_queue, O_CREAT | O_WRONLY, 0644, &response_attr);
    if (resp_mq == (mqd_t)-1) {
        perror("创建响应队列失败");
        mq_close(req_mq);
        return -1;
    }
    
    printf("服务器队列创建成功:\n");
    printf("  请求队列: %s\n", request_queue);
    printf("  响应队列: %s\n", response_queue);
    
    return req_mq;  // 返回请求队列描述符
}

// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {
    if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
        perror("获取当前时间失败");
        return -1;
    }
    
    long seconds = milliseconds / 1000;
    long nanoseconds = (milliseconds % 1000) * 1000000;
    
    abs_timeout->tv_sec += seconds;
    abs_timeout->tv_nsec += nanoseconds;
    
    if (abs_timeout->tv_nsec >= 1000000000) {
        abs_timeout->tv_sec++;
        abs_timeout->tv_nsec -= 1000000000;
    }
    
    return 0;
}

// 处理服务器请求
void process_server_requests(mqd_t req_mq, mqd_t resp_mq) {
    printf("服务器开始处理请求...\n");
    
    int processed_requests = 0;
    time_t last_status_time = time(NULL);
    
    while (server_running) {
        struct timespec abs_timeout;
        if (calculate_relative_timeout(&abs_timeout, 1000) == -1) {  // 1秒超时
            continue;
        }
        
        server_request_t request;
        unsigned int priority;
        
        ssize_t bytes_received = mq_timedreceive(req_mq, (char*)&request, sizeof(request), &priority, &abs_timeout);
        
        if (bytes_received > 0) {
            // 处理请求
            printf("处理请求 #%d 来自客户端 %s\n", request.request_id, request.client_id);
            
            // 模拟处理时间
            usleep(100000);  // 0.1秒
            
            // 构造响应
            server_response_t response;
            response.request_id = request.request_id;
            snprintf(response.response_data, sizeof(response.response_data), 
                     "请求 #%d 已处理完成", request.request_id);
            response.status = 200;
            response.timestamp = time(NULL);
            
            // 发送响应
            if (mq_send(resp_mq, (char*)&response, sizeof(response), priority) == 0) {
                printf("响应已发送: 请求 #%d\n", request.request_id);
                processed_requests++;
            } else {
                printf("发送响应失败: %s\n", strerror(errno));
            }
        } else if (errno == ETIMEDOUT || errno == EAGAIN) {
            // 超时或无消息,继续循环
        } else {
            printf("接收请求失败: %s\n", strerror(errno));
            if (errno != EINTR) {
                break;
            }
        }
        
        // 定期显示状态
        time_t current_time = time(NULL);
        if (current_time - last_status_time >= 5) {
            printf("服务器状态: 已处理 %d 个请求\n", processed_requests);
            last_status_time = current_time;
        }
    }
    
    printf("服务器停止,总共处理 %d 个请求\n", processed_requests);
}

// 客户端模拟器
void client_simulator(const char* request_queue, const char* response_queue, int client_id) {
    printf("客户端 %d 启动\n", client_id);
    
    // 打开队列
    mqd_t req_mq = mq_open(request_queue, O_WRONLY);
    mqd_t resp_mq = mq_open(response_queue, O_RDONLY);
    
    if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {
        perror("客户端打开队列失败");
        if (req_mq != (mqd_t)-1) mq_close(req_mq);
        if (resp_mq != (mqd_t)-1) mq_close(resp_mq);
        exit(EXIT_FAILURE);
    }
    
    srand(time(NULL) + client_id);
    
    // 发送请求
    for (int i = 0; i < 5; i++) {
        server_request_t request;
        snprintf(request.client_id, sizeof(request.client_id), "Client_%d", client_id);
        snprintf(request.request_data, sizeof(request.request_data), "请求数据_%d", i + 1);
        request.timestamp = time(NULL);
        request.request_id = client_id * 100 + i + 1;
        
        unsigned int priority = rand() % 10;
        
        if (mq_send(req_mq, (char*)&request, sizeof(request), priority) == 0) {
            printf("客户端 %d 发送请求 #%d\n", client_id, request.request_id);
        } else {
            printf("客户端 %d 发送请求失败: %s\n", client_id, strerror(errno));
            continue;
        }
        
        // 等待响应
        struct timespec abs_timeout;
        if (calculate_relative_timeout(&abs_timeout, 3000) == 0) {  // 3秒超时
            server_response_t response;
            ssize_t bytes_received = mq_timedreceive(resp_mq, (char*)&response, sizeof(response), NULL, &abs_timeout);
            
            if (bytes_received > 0) {
                printf("客户端 %d 收到响应: %s (状态: %d)\n", 
                       client_id, response.response_data, response.status);
            } else if (errno == ETIMEDOUT) {
                printf("客户端 %d 等待响应超时\n", client_id);
            } else {
                printf("客户端 %d 接收响应失败: %s\n", client_id, strerror(errno));
            }
        }
        
        sleep(1);  // 客户端间隔
    }
    
    mq_close(req_mq);
    mq_close(resp_mq);
    printf("客户端 %d 完成\n", client_id);
}

int main() {
    printf("=== 服务器应用超时消息处理示例 ===\n");
    
    const char* request_queue = "/server_requests";
    const char* response_queue = "/server_responses";
    
    // 设置信号处理
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    
    // 启动服务器进程
    pid_t server_pid = fork();
    if (server_pid == 0) {
        // 服务器进程
        mqd_t req_mq = mq_open(request_queue, O_RDONLY);
        mqd_t resp_mq = mq_open(response_queue, O_WRONLY);
        
        if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {
            perror("服务器打开队列失败");
            exit(EXIT_FAILURE);
        }
        
        process_server_requests(req_mq, resp_mq);
        
        mq_close(req_mq);
        mq_close(resp_mq);
        exit(EXIT_SUCCESS);
    }
    
    // 等待服务器启动
    sleep(1);
    
    // 启动多个客户端进程
    pid_t clients[3];
    for (int i = 0; i < 3; i++) {
        clients[i] = fork();
        if (clients[i] == 0) {
            client_simulator(request_queue, response_queue, i + 1);
            exit(EXIT_SUCCESS);
        }
    }
    
    // 等待客户端完成
    for (int i = 0; i < 3; i++) {
        waitpid(clients[i], NULL, 0);
    }
    
    // 停止服务器
    server_running = 0;
    sleep(2);
    waitpid(server_pid, NULL, 0);
    
    // 清理队列
    mq_unlink(request_queue);
    mq_unlink(response_queue);
    
    printf("\n=== 服务器应用演示完成 ===\n");
    
    return 0;
}

编译和运行

# 编译示例(需要链接实时库)
gcc -o mq_unlink_example1 mq_unlink_example1.c -lrt
gcc -o mq_timedsend_example1 mq_timedsend_example1.c -lrt
gcc -o mq_timedsend_example2 mq_timedsend_example2.c -lrt
gcc -o mq_timedreceive_example1 mq_timedreceive_example1.c -lrt
gcc -o mq_timedreceive_example2 mq_timedreceive_example2.c -lrt

# 运行示例
./mq_unlink_example1
./mq_timedsend_example1
./mq_timedsend_example2
./mq_timedreceive_example1
./mq_timedreceive_example2

重要注意事项

  1. 权限要求: 需要适当的文件系统权限来创建和访问消息队列
  2. 名称规范: 消息队列名称必须以’/’开头
  3. 超时时间: 使用绝对时间而非相对时间
  4. 资源管理: 必须正确关闭队列描述符和删除队列
  5. 错误处理: 必须检查返回值并处理各种错误情况
  6. 线程安全: 消息队列操作是线程安全的
  7. 系统限制: 受系统消息队列数量和大小限制

最佳实践

  1. 资源清理: 及时关闭队列描述符和删除不需要的队列
  2. 超时设置: 合理设置超时时间以避免无限期等待
  3. 错误处理: 完善的错误处理和恢复机制
  4. 优先级使用: 合理使用消息优先级
  5. 缓冲区管理: 确保缓冲区大小足够
  6. 信号处理: 正确处理信号中断
  7. 性能监控: 监控队列性能和系统资源使用

通过这些示例,你可以理解POSIX消息队列相关函数在进程间通信方面的强大功能,它们为Linux系统提供了高效、可靠的IPC机制,特别适用于实时系统、服务器应用和分布式系统。

发表在 linux文章 | 留下评论

POSIX 消息队列 (mq_*) 函数详解

POSIX 消息队列 (mq_*) 函数详解

1. 函数介绍

POSIX 消息队列是一组用于进程间通信(IPC)的函数,提供了一种可靠的、基于消息的通信机制。可以把消息队列想象成”邮局系统”——发送者将消息放入邮箱(队列),接收者从邮箱中取出消息,就像现实中的邮政服务一样。

与传统的 System V 消息队列相比,POSIX 消息队列具有更好的可移植性和更简洁的 API。它们支持优先级消息、持久化、以及通过文件系统路径名进行命名。

2. 核心函数原型

#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>

// 核心函数
mqd_t mq_open(const char *name, int oflag, ...);
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
int mq_notify(mqd_t mqdes, const struct sigevent *notification);

3. 功能

POSIX 消息队列提供以下功能:

  • 创建和打开消息队列
  • 发送和接收消息
  • 设置和获取队列属性
  • 异步通知机制
  • 持久化支持
  • 优先级消息支持

4. 核心结构体

struct mq_attr

struct mq_attr {
    long mq_flags;   /* 消息队列标志 */
    long mq_maxmsg;  /* 最大消息数 */
    long mq_msgsize; /* 最大消息大小 */
    long mq_curmsgs; /* 当前消息数 */
};

struct sigevent (用于通知)

struct sigevent {
    int          sigev_notify;            /* 通知类型 */
    int          sigev_signo;             /* 信号编号 */
    union sigval sigev_value;             /* 传递给处理函数的数据 */
    void       (*sigev_notify_function)(union sigval);  /* 线程函数 */
    pthread_attr_t *sigev_notify_attributes;           /* 线程属性 */
};

5. 消息队列名称

  • 名称必须以 ‘/’ 开头
  • 长度限制为 NAME_MAX (通常 255 字符)
  • 示例:“/my_queue”, “/app/messages”

6. 打开标志 (oflag)

标志说明
O_RDONLY只读打开
O_WRONLY只写打开
O_RDWR读写打开
O_CREAT不存在时创建
O_EXCL与 O_CREAT 一起使用,如果存在则失败
O_NONBLOCK非阻塞模式

7. 返回值

  • mq_open: 成功返回消息队列描述符,失败返回 (mqd_t)-1
  • 其他函数: 成功返回 0,失败返回 -1

8. 相关函数

  • pthread: 多线程支持
  • signal: 信号处理
  • fcntl: 文件控制
  • unlink: 删除文件

9. 示例代码

示例1:基础用法 – 简单的消息发送和接收

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>

#define QUEUE_NAME "/example_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 10

int main() {
    mqd_t mq;
    struct mq_attr attr;
    char send_buffer[MAX_MSG_SIZE];
    char recv_buffer[MAX_MSG_SIZE];
    ssize_t bytes_read;
    unsigned int priority;
    
    printf("=== POSIX 消息队列基础示例 ===\n\n");
    
    // 设置消息队列属性
    attr.mq_flags = 0;
    attr.mq_maxmsg = MAX_MSGS;
    attr.mq_msgsize = MAX_MSG_SIZE;
    attr.mq_curmsgs = 0;
    
    // 创建并打开消息队列
    printf("创建消息队列: %s\n", QUEUE_NAME);
    mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        exit(1);
    }
    printf("✓ 消息队列创建成功\n\n");
    
    // 获取并显示队列属性
    printf("消息队列属性:\n");
    if (mq_getattr(mq, &attr) == 0) {
        printf("  最大消息数: %ld\n", attr.mq_maxmsg);
        printf("  最大消息大小: %ld 字节\n", attr.mq_msgsize);
        printf("  当前消息数: %ld\n", attr.mq_curmsgs);
        printf("  标志: %ld\n", attr.mq_flags);
    }
    printf("\n");
    
    // 发送消息
    printf("发送消息:\n");
    const char* messages[] = {
        "第一条消息: Hello, World!",
        "第二条消息: 欢迎使用 POSIX 消息队列",
        "第三条消息: 这是优先级消息",
        "第四条消息: 最后一条测试消息"
    };
    int priorities[] = {0, 0, 10, 0};  // 优先级 (数值越大优先级越高)
    
    for (int i = 0; i < 4; i++) {
        if (mq_send(mq, messages[i], strlen(messages[i]) + 1, priorities[i]) == 0) {
            printf("  ✓ 发送消息 %d (优先级 %d): %s\n", i + 1, priorities[i], messages[i]);
        } else {
            perror("  ✗ mq_send 失败");
        }
    }
    
    // 显示发送后队列状态
    if (mq_getattr(mq, &attr) == 0) {
        printf("\n发送后队列状态: %ld 条消息\n", attr.mq_curmsgs);
    }
    
    // 接收消息
    printf("\n接收消息 (按优先级顺序):\n");
    for (int i = 0; i < 4; i++) {
        bytes_read = mq_receive(mq, recv_buffer, MAX_MSG_SIZE, &priority);
        if (bytes_read != -1) {
            printf("  ✓ 接收消息 %d (优先级 %d, 长度 %zd): %s\n", 
                   i + 1, priority, bytes_read, recv_buffer);
        } else {
            if (errno == EAGAIN) {
                printf("  ⚠ 队列为空\n");
                break;
            } else {
                perror("  ✗ mq_receive 失败");
                break;
            }
        }
    }
    
    // 显示接收后队列状态
    if (mq_getattr(mq, &attr) == 0) {
        printf("\n接收后队列状态: %ld 条消息\n", attr.mq_curmsgs);
    }
    
    // 关闭消息队列
    if (mq_close(mq) == 0) {
        printf("✓ 消息队列关闭成功\n");
    } else {
        perror("✗ mq_close 失败");
    }
    
    // 删除消息队列
    if (mq_unlink(QUEUE_NAME) == 0) {
        printf("✓ 消息队列删除成功\n");
    } else {
        perror("✗ mq_unlink 失败");
    }
    
    printf("\n=== 消息队列特点 ===\n");
    printf("1. 支持优先级消息 (数值越大优先级越高)\n");
    printf("2. 消息大小可配置\n");
    printf("3. 消息数量有限制\n");
    printf("4. 支持持久化 (直到显式删除)\n");
    printf("5. 可通过文件系统路径访问\n");
    
    return 0;
}

示例2:生产者-消费者模型

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>

#define QUEUE_NAME "/producer_consumer_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 20
#define NUM_MESSAGES 10

// 全局变量
mqd_t mq;
int producer_count = 0;
int consumer_count = 0;
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;

// 生产者线程函数
void* producer_thread(void* arg) {
    int producer_id = *(int*)arg;
    char message[MAX_MSG_SIZE];
    time_t now;
    
    printf("生产者 %d 启动\n", producer_id);
    
    for (int i = 0; i < NUM_MESSAGES; i++) {
        // 构造消息
        time(&now);
        snprintf(message, sizeof(message), 
                "P%d-MSG%d-TIME:%s", producer_id, i + 1, ctime(&now));
        
        // 发送消息 (交替使用不同优先级)
        unsigned int priority = (i % 3 == 0) ? 5 : 1;  // 每第3条高优先级
        
        if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {
            pthread_mutex_lock(&count_mutex);
            producer_count++;
            pthread_mutex_unlock(&count_mutex);
            
            printf("生产者 %d 发送消息: %s (优先级 %u)\n", 
                   producer_id, message, priority);
        } else {
            perror("生产者发送失败");
        }
        
        // 随机延迟
        usleep((rand() % 100 + 1) * 1000);  // 1-100ms
    }
    
    printf("生产者 %d 完成\n", producer_id);
    return NULL;
}

// 消费者线程函数
void* consumer_thread(void* arg) {
    int consumer_id = *(int*)arg;
    char message[MAX_MSG_SIZE];
    ssize_t bytes_read;
    unsigned int priority;
    
    printf("消费者 %d 启动\n", consumer_id);
    
    while (1) {
        // 接收消息
        bytes_read = mq_receive(mq, message, MAX_MSG_SIZE, &priority);
        if (bytes_read != -1) {
            pthread_mutex_lock(&count_mutex);
            consumer_count++;
            int current_count = consumer_count;
            pthread_mutex_unlock(&count_mutex);
            
            printf("消费者 %d 接收消息 %d (优先级 %u): %s", 
                   consumer_id, current_count, priority, message);
            
            // 检查是否接收完所有消息
            if (current_count >= NUM_MESSAGES * 2) {  // 2个生产者
                break;
            }
        } else {
            if (errno == EAGAIN) {
                // 非阻塞模式下队列为空
                usleep(10000);  // 10ms
                continue;
            } else {
                perror("消费者接收失败");
                break;
            }
        }
        
        // 随机延迟
        usleep((rand() % 50 + 1) * 1000);  // 1-50ms
    }
    
    printf("消费者 %d 完成\n", consumer_id);
    return NULL;
}

int main() {
    pthread_t producers[2];
    pthread_t consumers[3];
    int producer_ids[2] = {1, 2};
    int consumer_ids[3] = {1, 2, 3};
    struct mq_attr attr;
    
    printf("=== 生产者-消费者消息队列示例 ===\n\n");
    
    // 初始化随机数种子
    srand(time(NULL) + getpid());
    
    // 设置消息队列属性
    attr.mq_flags = 0;  // 阻塞模式
    attr.mq_maxmsg = MAX_MSGS;
    attr.mq_msgsize = MAX_MSG_SIZE;
    attr.mq_curmsgs = 0;
    
    // 创建消息队列
    printf("创建消息队列: %s\n", QUEUE_NAME);
    mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        exit(1);
    }
    printf("✓ 消息队列创建成功\n\n");
    
    // 创建生产者线程
    printf("创建生产者线程...\n");
    for (int i = 0; i < 2; i++) {
        if (pthread_create(&producers[i], NULL, producer_thread, &producer_ids[i]) != 0) {
            perror("创建生产者线程失败");
            exit(1);
        }
    }
    
    // 创建消费者线程
    printf("创建消费者线程...\n");
    for (int i = 0; i < 3; i++) {
        if (pthread_create(&consumers[i], NULL, consumer_thread, &consumer_ids[i]) != 0) {
            perror("创建消费者线程失败");
            exit(1);
        }
    }
    
    // 等待生产者完成
    printf("等待生产者完成...\n");
    for (int i = 0; i < 2; i++) {
        pthread_join(producers[i], NULL);
    }
    
    // 等待消费者完成
    printf("等待消费者完成...\n");
    for (int i = 0; i < 3; i++) {
        pthread_join(consumers[i], NULL);
    }
    
    // 显示统计信息
    printf("\n=== 统计信息 ===\n");
    printf("生产消息数: %d\n", producer_count);
    printf("消费消息数: %d\n", consumer_count);
    
    // 显示最终队列状态
    if (mq_getattr(mq, &attr) == 0) {
        printf("队列中剩余消息: %ld\n", attr.mq_curmsgs);
    }
    
    // 清理资源
    if (mq_close(mq) == 0) {
        printf("✓ 消息队列关闭成功\n");
    }
    if (mq_unlink(QUEUE_NAME) == 0) {
        printf("✓ 消息队列删除成功\n");
    }
    
    printf("\n=== 生产者-消费者模型特点 ===\n");
    printf("1. 解耦: 生产者和消费者独立运行\n");
    printf("2. 异步: 生产和消费可以不同步进行\n");
    printf("3. 缓冲: 消息队列提供缓冲作用\n");
    printf("4. 负载均衡: 多个消费者可以并行处理\n");
    printf("5. 可靠性: 消息持久化存储\n");
    
    return 0;
}

示例3:完整的消息队列管理系统

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#include <getopt.h>
#include <signal.h>
#include <time.h>

// 配置结构体
struct mq_config {
    char *queue_name;
    int max_messages;
    int max_message_size;
    int create_queue;
    int delete_queue;
    int show_info;
    int send_message;
    int receive_message;
    int list_queues;
    int priority;
    char *message_content;
    int non_blocking;
    int verbose;
};

// 全局变量
volatile sig_atomic_t running = 1;

// 信号处理函数
void signal_handler(int sig) {
    printf("\n收到信号 %d,准备退出...\n", sig);
    running = 0;
}

// 设置信号处理
void setup_signal_handlers() {
    struct sigaction sa;
    sa.sa_handler = signal_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    
    sigaction(SIGINT, &sa, NULL);   // Ctrl+C
    sigaction(SIGTERM, &sa, NULL);  // 终止信号
}

// 显示消息队列信息
void show_queue_info(mqd_t mq) {
    struct mq_attr attr;
    
    if (mq_getattr(mq, &attr) == 0) {
        printf("消息队列属性:\n");
        printf("  最大消息数: %ld\n", attr.mq_maxmsg);
        printf("  最大消息大小: %ld 字节\n", attr.mq_msgsize);
        printf("  当前消息数: %ld\n", attr.mq_curmsgs);
        printf("  标志: %s\n", (attr.mq_flags & O_NONBLOCK) ? "非阻塞" : "阻塞");
    } else {
        perror("获取队列属性失败");
    }
}

// 列出所有消息队列
void list_all_queues() {
    printf("=== 系统消息队列列表 ===\n");
    printf("注意: POSIX 消息队列通常在 /dev/mqueue/ 目录下\n");
    
    // 尝试列出 /dev/mqueue/ 目录
    if (access("/dev/mqueue", F_OK) == 0) {
        printf("系统消息队列目录存在\n");
        system("ls -la /dev/mqueue/ 2>/dev/null || echo '无法访问 /dev/mqueue/'");
    } else {
        printf("系统消息队列目录不存在\n");
    }
    printf("\n");
}

// 发送消息
int send_message_to_queue(mqd_t mq, const char *message, int priority, int non_blocking) {
    struct mq_attr attr;
    
    // 检查消息大小
    if (mq_getattr(mq, &attr) == 0) {
        if (strlen(message) + 1 > (size_t)attr.mq_msgsize) {
            fprintf(stderr, "错误: 消息大小 (%zu) 超过队列限制 (%ld)\n", 
                    strlen(message) + 1, attr.mq_msgsize);
            return -1;
        }
    }
    
    // 发送消息
    if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {
        printf("✓ 消息发送成功 (优先级 %d): %s\n", priority, message);
        return 0;
    } else {
        if (errno == EAGAIN && non_blocking) {
            printf("⚠ 队列已满,非阻塞模式下发送失败\n");
        } else {
            perror("✗ 消息发送失败");
        }
        return -1;
    }
}

// 接收消息
int receive_message_from_queue(mqd_t mq, int non_blocking) {
    char *buffer;
    struct mq_attr attr;
    ssize_t bytes_read;
    unsigned int priority;
    
    // 获取队列属性以确定缓冲区大小
    if (mq_getattr(mq, &attr) != 0) {
        perror("获取队列属性失败");
        return -1;
    }
    
    buffer = malloc(attr.mq_msgsize);
    if (!buffer) {
        perror("内存分配失败");
        return -1;
    }
    
    // 接收消息
    bytes_read = mq_receive(mq, buffer, attr.mq_msgsize, &priority);
    if (bytes_read != -1) {
        printf("✓ 消息接收成功 (优先级 %u, 长度 %zd): %s", 
               priority, bytes_read, buffer);
        free(buffer);
        return 0;
    } else {
        if (errno == EAGAIN && non_blocking) {
            printf("⚠ 队列为空,非阻塞模式下接收失败\n");
        } else {
            perror("✗ 消息接收失败");
        }
        free(buffer);
        return -1;
    }
}

// 创建消息队列
mqd_t create_message_queue(const char *name, int max_msgs, int max_size, int non_blocking) {
    struct mq_attr attr;
    int flags = O_CREAT | O_RDWR;
    
    if (non_blocking) {
        flags |= O_NONBLOCK;
    }
    
    attr.mq_flags = non_blocking ? O_NONBLOCK : 0;
    attr.mq_maxmsg = max_msgs;
    attr.mq_msgsize = max_size;
    attr.mq_curmsgs = 0;
    
    mqd_t mq = mq_open(name, flags, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建消息队列失败");
        return (mqd_t)-1;
    }
    
    printf("✓ 消息队列创建成功: %s\n", name);
    return mq;
}

// 打开现有消息队列
mqd_t open_existing_queue(const char *name, int non_blocking) {
    int flags = O_RDWR;
    
    if (non_blocking) {
        flags |= O_NONBLOCK;
    }
    
    mqd_t mq = mq_open(name, flags);
    if (mq == (mqd_t)-1) {
        perror("打开消息队列失败");
        return (mqd_t)-1;
    }
    
    printf("✓ 消息队列打开成功: %s\n", name);
    return mq;
}

// 显示帮助信息
void show_help(const char *program_name) {
    printf("用法: %s [选项]\n", program_name);
    printf("\n选项:\n");
    printf("  -n, --name=NAME        消息队列名称 (以 / 开头)\n");
    printf("  -c, --create           创建消息队列\n");
    printf("  -d, --delete           删除消息队列\n");
    printf("  -i, --info             显示队列信息\n");
    printf("  -l, --list             列出所有队列\n");
    printf("  -s, --send=MESSAGE     发送消息\n");
    printf("  -r, --receive          接收消息\n");
    printf("  -p, --priority=NUM     消息优先级 (默认 0)\n");
    printf("  -m, --max-msgs=NUM     最大消息数 (创建时使用)\n");
    printf("  -z, --max-size=NUM     最大消息大小 (创建时使用)\n");
    printf("  -b, --non-blocking     非阻塞模式\n");
    printf("  -v, --verbose          详细输出\n");
    printf("  -h, --help             显示此帮助信息\n");
    printf("\n示例:\n");
    printf("  %s -n /myqueue -c -m 10 -z 256     # 创建队列\n", program_name);
    printf("  %s -n /myqueue -s \"Hello World\"   # 发送消息\n", program_name);
    printf("  %s -n /myqueue -r                  # 接收消息\n", program_name);
    printf("  %s -n /myqueue -i                  # 显示队列信息\n", program_name);
    printf("  %s -n /myqueue -d                  # 删除队列\n", program_name);
    printf("  %s -l                              # 列出所有队列\n", program_name);
}

int main(int argc, char *argv[]) {
    struct mq_config config = {
        .queue_name = NULL,
        .max_messages = 10,
        .max_message_size = 256,
        .create_queue = 0,
        .delete_queue = 0,
        .show_info = 0,
        .send_message = 0,
        .receive_message = 0,
        .list_queues = 0,
        .priority = 0,
        .message_content = NULL,
        .non_blocking = 0,
        .verbose = 0
    };
    
    printf("=== POSIX 消息队列管理系统 ===\n\n");
    
    // 解析命令行参数
    static struct option long_options[] = {
        {"name",        required_argument, 0, 'n'},
        {"create",      no_argument,       0, 'c'},
        {"delete",      no_argument,       0, 'd'},
        {"info",        no_argument,       0, 'i'},
        {"list",        no_argument,       0, 'l'},
        {"send",        required_argument, 0, 's'},
        {"receive",     no_argument,       0, 'r'},
        {"priority",    required_argument, 0, 'p'},
        {"max-msgs",    required_argument, 0, 'm'},
        {"max-size",    required_argument, 0, 'z'},
        {"non-blocking", no_argument,      0, 'b'},
        {"verbose",     no_argument,       0, 'v'},
        {"help",        no_argument,       0, 'h'},
        {0, 0, 0, 0}
    };
    
    int opt;
    while ((opt = getopt_long(argc, argv, "n:cdils:rp:m:z:bvh", long_options, NULL)) != -1) {
        switch (opt) {
            case 'n':
                config.queue_name = optarg;
                break;
            case 'c':
                config.create_queue = 1;
                break;
            case 'd':
                config.delete_queue = 1;
                break;
            case 'i':
                config.show_info = 1;
                break;
            case 'l':
                config.list_queues = 1;
                break;
            case 's':
                config.send_message = 1;
                config.message_content = optarg;
                break;
            case 'r':
                config.receive_message = 1;
                break;
            case 'p':
                config.priority = atoi(optarg);
                break;
            case 'm':
                config.max_messages = atoi(optarg);
                break;
            case 'z':
                config.max_message_size = atoi(optarg);
                break;
            case 'b':
                config.non_blocking = 1;
                break;
            case 'v':
                config.verbose = 1;
                break;
            case 'h':
                show_help(argv[0]);
                return 0;
            default:
                fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);
                return 1;
        }
    }
    
    // 设置信号处理
    setup_signal_handlers();
    
    // 显示系统信息
    if (config.verbose) {
        printf("系统信息:\n");
        printf("  当前用户 UID: %d\n", getuid());
        printf("  当前进程 PID: %d\n", getpid());
        printf("  消息队列支持: ");
        system("ls /dev/mqueue/ >/dev/null 2>&1 && echo '是' || echo '否'");
        printf("\n");
    }
    
    // 列出所有队列
    if (config.list_queues) {
        list_all_queues();
        if (!config.queue_name && !config.create_queue && !config.delete_queue &&
            !config.show_info && !config.send_message && !config.receive_message) {
            return 0;
        }
    }
    
    // 如果没有指定队列名称且需要操作队列
    if (!config.queue_name && (config.create_queue || config.delete_queue || 
                              config.show_info || config.send_message || 
                              config.receive_message)) {
        fprintf(stderr, "错误: 需要指定消息队列名称\n");
        fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);
        return 1;
    }
    
    // 处理队列操作
    mqd_t mq = (mqd_t)-1;
    
    if (config.create_queue) {
        mq = create_message_queue(config.queue_name, config.max_messages, 
                                 config.max_message_size, config.non_blocking);
        if (mq == (mqd_t)-1) {
            return 1;
        }
        
        if (config.show_info) {
            show_queue_info(mq);
        }
    } else if (config.queue_name) {
        // 打开现有队列
        mq = open_existing_queue(config.queue_name, config.non_blocking);
        if (mq == (mqd_t)-1) {
            return 1;
        }
    }
    
    // 显示队列信息
    if (config.show_info && mq != (mqd_t)-1) {
        show_queue_info(mq);
    }
    
    // 发送消息
    if (config.send_message && config.message_content && mq != (mqd_t)-1) {
        send_message_to_queue(mq, config.message_content, 
                             config.priority, config.non_blocking);
    }
    
    // 接收消息
    if (config.receive_message && mq != (mqd_t)-1) {
        if (config.non_blocking) {
            receive_message_from_queue(mq, config.non_blocking);
        } else {
            printf("等待接收消息 (按 Ctrl+C 退出)...\n");
            while (running) {
                if (receive_message_from_queue(mq, config.non_blocking) == -1) {
                    if (errno != EAGAIN) {
                        break;
                    }
                }
                if (!config.non_blocking) {
                    sleep(1);  // 阻塞模式下定期检查
                }
            }
        }
    }
    
    // 删除队列
    if (config.delete_queue && config.queue_name) {
        if (mq_unlink(config.queue_name) == 0) {
            printf("✓ 消息队列删除成功: %s\n", config.queue_name);
        } else {
            perror("✗ 消息队列删除失败");
        }
    }
    
    // 关闭队列
    if (mq != (mqd_t)-1) {
        if (mq_close(mq) == 0) {
            if (config.verbose) {
                printf("✓ 消息队列关闭成功\n");
            }
        } else {
            perror("✗ 消息队列关闭失败");
        }
    }
    
    // 显示使用建议
    printf("\n=== POSIX 消息队列使用建议 ===\n");
    printf("适用场景:\n");
    printf("1. 进程间通信 (IPC)\n");
    printf("2. 生产者-消费者模式\n");
    printf("3. 异步消息处理\n");
    printf("4. 系统服务通信\n");
    printf("5. 微服务架构\n");
    printf("\n");
    printf("优势:\n");
    printf("1. 可靠性: 消息持久化存储\n");
    printf("2. 优先级: 支持消息优先级\n");
    printf("3. 可移植: POSIX 标准\n");
    printf("4. 灵活性: 支持阻塞和非阻塞模式\n");
    printf("5. 安全性: 通过文件系统权限控制\n");
    printf("\n");
    printf("注意事项:\n");
    printf("1. 需要链接实时库: -lrt\n");
    printf("2. 队列名称必须以 / 开头\n");
    printf("3. 消息大小和数量有限制\n");
    printf("4. 需要适当权限才能创建/删除队列\n");
    printf("5. 应该及时关闭和清理队列资源\n");
    
    return 0;
}

编译和运行说明

# 编译示例程序(需要链接实时库)
gcc -o mq_example1 example1.c -lrt
gcc -o mq_example2 example2.c -lrt -lpthread
gcc -o mq_example3 example3.c -lrt -lpthread

# 运行示例
./mq_example1
./mq_example2
./mq_example3 --help

# 基本操作示例
./mq_example3 -n /test_queue -c -m 5 -z 128
./mq_example3 -n /test_queue -s "Hello, Message Queue!"
./mq_example3 -n /test_queue -r
./mq_example3 -n /test_queue -i
./mq_example3 -n /test_queue -d

# 列出所有队列
./mq_example3 -l

系统要求检查

# 检查系统支持
ls /dev/mqueue/ 2>/dev/null || echo "消息队列目录不存在"

# 检查内核配置
grep -i mq /boot/config-$(uname -r)

# 检查库支持
ldd --version

# 查看系统限制
ulimit -a | grep -i msg
cat /proc/sys/fs/mqueue/

重要注意事项

  1. 编译要求: 需要链接实时库 -lrt
  2. 权限要求: 创建/删除队列通常需要适当权限
  3. 名称规范: 队列名称必须以 ‘/’ 开头
  4. 资源限制: 受系统消息队列限制约束
  5. 清理责任: 应该及时关闭和删除队列
  6. 线程安全: 消息队列描述符在多线程间共享是安全的

实际应用场景

  1. 微服务通信: 服务间异步消息传递
  2. 日志系统: 异步日志记录
  3. 任务队列: 后台任务处理
  4. 事件驱动: 事件通知和处理
  5. 数据流: 实时数据处理管道
  6. 系统监控: 状态变更通知

最佳实践

// 安全的消息队列操作函数
mqd_t safe_mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr) {
    mqd_t mq = mq_open(name, oflag, mode, attr);
    if (mq == (mqd_t)-1) {
        switch (errno) {
            case EACCES:
                fprintf(stderr, "权限不足访问队列: %s\n", name);
                break;
            case EEXIST:
                fprintf(stderr, "队列已存在: %s\n", name);
                break;
            case ENOENT:
                fprintf(stderr, "队列不存在: %s\n", name);
                break;
            case EINVAL:
                fprintf(stderr, "无效的队列名称或参数: %s\n", name);
                break;
            default:
                perror("mq_open 失败");
                break;
        }
    }
    return mq;
}

// 可靠的消息发送函数
int reliable_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 
                     unsigned msg_prio, int timeout_seconds) {
    struct timespec timeout;
    int result;
    
    if (timeout_seconds > 0) {
        clock_gettime(CLOCK_REALTIME, &timeout);
        timeout.tv_sec += timeout_seconds;
        result = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &timeout);
    } else {
        result = mq_send(mqdes, msg_ptr, msg_len, msg_prio);
    }
    
    return result;
}

// 带重试的消息接收函数
ssize_t retry_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 
                         unsigned *msg_prio, int max_retries) {
    ssize_t result;
    int retries = 0;
    
    while (retries < max_retries) {
        result = mq_receive(mqdes, msg_ptr, msg_len, msg_prio);
        if (result != -1) {
            return result;  // 成功接收
        }
        
        if (errno == EAGAIN) {
            retries++;
            usleep(100000);  // 100ms 延迟后重试
        } else {
            break;  // 其他错误,不再重试
        }
    }
    
    return result;
}

这些示例展示了 POSIX 消息队列的各种使用方法,从基础的消息发送接收到完整的管理系统,帮助你全面掌握 Linux 系统中的消息队列机制。

发表在 linux文章 | 留下评论

io_uring 系统调用及示例

io_uring 系统调用详解

1. 函数介绍

io_uring 是Linux 5.1引入的高性能异步I/O框架,提供了一种现代化的异步I/O接口。相比传统的AIO(异步I/O),io_uring具有更好的性能、更低的系统调用开销和更丰富的功能。它使用共享内存环形缓冲区实现用户空间和内核空间的高效通信。

2. 函数原型

#include <liburing.h>
#include <linux/io_uring.h>

// 初始化io_uring实例
int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);

// 销毁io_uring实例
int io_uring_queue_exit(struct io_uring *ring);

// 提交I/O请求
int io_uring_submit(struct io_uring *ring);

// 等待I/O完成事件
int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);

// 标记完成事件已处理
void io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe);

// 准备I/O操作
void io_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, __u64 offset);
void io_uring_prep_write(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, __u64 offset);
void io_uring_prep_openat(struct io_uring_sqe *sqe, int dfd, const char *path, int flags, mode_t mode);
void io_uring_prep_close(struct io_uring_sqe *sqe, int fd);
void io_uring_prep_fsync(struct io_uring_sqe *sqe, int fd, unsigned fsync_flags);

3. 功能

io_uring 提供了完整的异步I/O解决方案,支持文件I/O、网络I/O、文件操作等多种操作类型。它通过共享内存环形缓冲区实现零拷贝的数据传输,显著提高了I/O性能。

4. 参数说明

io_uring_queue_init参数:

  • unsigned entries: 环形缓冲区大小(必须是2的幂)
  • *struct io_uring ring: io_uring实例指针
  • unsigned flags: 初始化标志

io_uring_queue_exit参数:

  • *struct io_uring ring: 要销毁的io_uring实例

io_uring_submit参数:

  • *struct io_uring ring: io_uring实例

io_uring_wait_cqe参数:

  • *struct io_uring ring: io_uring实例
  • **struct io_uring_cqe cqe_ptr: 完成事件指针

5. 返回值

  • 成功: 返回非负值或0
  • 失败: 返回负的错误码

6. 相似函数,或关联函数

  • io_setup/io_destroy: 传统的AIO接口
  • io_submit/io_cancel: 传统的AIO提交和取消
  • io_getevents: 传统的AIO事件获取
  • aio_read/aio_write: POSIX AIO接口
  • epoll_wait/poll: 传统的I/O多路复用

7. 示例代码

示例1:基础io_uring使用

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>

/**
 * 演示基础io_uring使用方法
 */
int demo_io_uring_basic() {
    struct io_uring ring;
    int ret;
    
    printf("=== 基础io_uring使用示例 ===\n");
    
    // 初始化io_uring实例
    printf("1. 初始化io_uring实例:\n");
    ret = io_uring_queue_init(32, &ring, 0);
    if (ret < 0) {
        printf("  初始化失败: %s\n", strerror(-ret));
        return -1;
    }
    printf("  ✓ io_uring实例初始化成功\n");
    printf("  环形缓冲区大小: 32\n");
    
    // 显示io_uring信息
    printf("  io_uring信息:\n");
    printf("    提交队列大小: %u\n", ring.sq.ring_sz);
    printf("    完成队列大小: %u\n", ring.cq.ring_sz);
    printf("    特性标志: 0x%x\n", ring.features);
    
    // 创建测试文件
    printf("\n2. 创建测试文件:\n");
    const char *test_filename = "io_uring_test.txt";
    int test_fd = open(test_filename, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    if (test_fd == -1) {
        perror("  创建测试文件失败");
        io_uring_queue_exit(&ring);
        return -1;
    }
    printf("  ✓ 测试文件创建成功: %s\n", test_filename);
    
    // 准备测试数据
    const char *test_data = "Hello from io_uring! This is a test message.\n";
    size_t data_size = strlen(test_data);
    
    // 使用io_uring写入数据
    printf("\n3. 使用io_uring写入数据:\n");
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        printf("  获取SQE失败\n");
        close(test_fd);
        unlink(test_filename);
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    // 准备写入操作
    io_uring_prep_write(sqe, test_fd, test_data, data_size, 0);
    sqe->user_data = 1;  // 设置用户数据
    
    printf("  准备写入操作:\n");
    printf("    文件描述符: %d\n", test_fd);
    printf("    数据大小: %zu 字节\n", data_size);
    printf("    偏移量: 0\n");
    
    // 提交操作
    ret = io_uring_submit(&ring);
    if (ret <= 0) {
        printf("  提交操作失败: %s\n", strerror(-ret));
        close(test_fd);
        unlink(test_filename);
        io_uring_queue_exit(&ring);
        return -1;
    }
    printf("  ✓ 操作提交成功,提交了 %d 个请求\n", ret);
    
    // 等待完成
    printf("\n4. 等待I/O操作完成:\n");
    struct io_uring_cqe *cqe;
    ret = io_uring_wait_cqe(&ring, &cqe);
    if (ret < 0) {
        printf("  等待完成事件失败: %s\n", strerror(-ret));
        close(test_fd);
        unlink(test_filename);
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    printf("  ✓ I/O操作完成\n");
    printf("    用户数据: %llu\n", (unsigned long long)cqe->user_data);
    printf("    结果: %d 字节\n", cqe->res);
    printf("    标志: 0x%x\n", cqe->flags);
    
    // 标记完成事件已处理
    io_uring_cqe_seen(&ring, cqe);
    
    // 读取写入的数据验证
    printf("\n5. 验证写入结果:\n");
    close(test_fd);
    
    test_fd = open(test_filename, O_RDONLY);
    if (test_fd != -1) {
        char read_buffer[256];
        ssize_t bytes_read = read(test_fd, read_buffer, sizeof(read_buffer) - 1);
        if (bytes_read > 0) {
            read_buffer[bytes_read] = '\0';
            printf("  读取到的数据 (%zd 字节):\n", bytes_read);
            printf("  %s", read_buffer);
        }
        close(test_fd);
    }
    
    // 清理资源
    printf("\n6. 清理资源:\n");
    unlink(test_filename);
    io_uring_queue_exit(&ring);
    printf("  ✓ 资源清理完成\n");
    
    return 0;
}

int main() {
    return demo_io_uring_basic();
}

示例2:批量I/O操作

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>

/**
 * 批量I/O操作演示
 */
int demo_batch_io_operations() {
    struct io_uring ring;
    const int batch_size = 8;
    const int file_count = 5;
    int ret;
    
    printf("=== 批量I/O操作演示 ===\n");
    
    // 初始化io_uring
    printf("1. 初始化io_uring:\n");
    ret = io_uring_queue_init(64, &ring, 0);
    if (ret < 0) {
        printf("  初始化失败: %s\n", strerror(-ret));
        return -1;
    }
    printf("  ✓ io_uring初始化成功\n");
    
    // 创建测试文件
    printf("\n2. 创建测试文件:\n");
    int file_fds[file_count];
    char filenames[file_count][32];
    
    for (int i = 0; i < file_count; i++) {
        snprintf(filenames[i], sizeof(filenames[i]), "batch_test_%d.txt", i);
        file_fds[i] = open(filenames[i], O_CREAT | O_WRONLY | O_TRUNC, 0644);
        if (file_fds[i] == -1) {
            perror("  创建文件失败");
            // 清理已创建的文件
            for (int j = 0; j < i; j++) {
                close(file_fds[j]);
                unlink(filenames[j]);
            }
            io_uring_queue_exit(&ring);
            return -1;
        }
        printf("  创建文件 %d: %s\n", i, filenames[i]);
    }
    
    // 准备批量写入操作
    printf("\n3. 准备批量写入操作:\n");
    char *test_data[file_count];
    int submitted_ops = 0;
    
    for (int i = 0; i < file_count; i++) {
        // 分配测试数据
        test_data[i] = malloc(256);
        if (!test_data[i]) {
            perror("  分配测试数据失败");
            // 清理资源
            for (int j = 0; j <= i; j++) {
                if (test_data[j]) free(test_data[j]);
                if (j < file_count) {
                    close(file_fds[j]);
                    unlink(filenames[j]);
                }
            }
            io_uring_queue_exit(&ring);
            return -1;
        }
        
        snprintf(test_data[i], 256, 
                "Batch write test data for file %d. Operation count: %d\n", i, i + 1);
        
        // 准备多个写入操作
        for (int j = 0; j < batch_size && submitted_ops < 32; j++) {
            struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
            if (!sqe) {
                printf("  获取SQE失败\n");
                break;
            }
            
            // 准备写入操作
            io_uring_prep_write(sqe, file_fds[i], test_data[i], strlen(test_data[i]), 
                              j * 256);
            sqe->user_data = (i * batch_size + j);  // 唯一标识符
            
            submitted_ops++;
        }
        
        printf("  为文件 %d 准备了 %d 个写入操作\n", i, batch_size);
    }
    
    printf("\n4. 批量提交I/O操作:\n");
    printf("  总共准备了 %d 个I/O操作\n", submitted_ops);
    
    ret = io_uring_submit(&ring);
    if (ret <= 0) {
        printf("  提交操作失败: %s\n", strerror(-ret));
    } else {
        printf("  ✓ 成功提交 %d 个I/O操作\n", ret);
    }
    
    // 等待所有操作完成
    printf("\n5. 等待所有操作完成:\n");
    int completed_ops = 0;
    
    while (completed_ops < submitted_ops) {
        struct io_uring_cqe *cqe;
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            printf("  等待完成事件失败: %s\n", strerror(-ret));
            break;
        }
        
        completed_ops++;
        printf("  操作 %d 完成: 写入 %d 字节\n", 
               (int)cqe->user_data, cqe->res);
        
        // 标记完成事件已处理
        io_uring_cqe_seen(&ring, cqe);
    }
    
    printf("  总共完成了 %d 个I/O操作\n", completed_ops);
    
    // 验证写入结果
    printf("\n6. 验证写入结果:\n");
    for (int i = 0; i < file_count; i++) {
        close(file_fds[i]);
        
        // 重新打开文件读取
        int read_fd = open(filenames[i], O_RDONLY);
        if (read_fd != -1) {
            struct stat st;
            if (fstat(read_fd, &st) == 0) {
                printf("  文件 %s: 大小 %ld 字节\n", filenames[i], st.st_size);
            }
            close(read_fd);
        }
    }
    
    // 清理资源
    printf("\n7. 清理资源:\n");
    for (int i = 0; i < file_count; i++) {
        free(test_data[i]);
        unlink(filenames[i]);
    }
    
    io_uring_queue_exit(&ring);
    printf("  ✓ 所有资源清理完成\n");
    
    return 0;
}

int main() {
    return demo_batch_io_operations();
}

示例3:文件操作演示

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>

/**
 * 文件操作结构
 */
typedef struct {
    int fd;
    char filename[64];
    off_t file_size;
    int operation_type;  // 0:read, 1:write, 2:open, 3:close, 4:fsync
} file_operation_t;

/**
 * 演示文件操作
 */
int demo_file_operations() {
    struct io_uring ring;
    int ret;
    
    printf("=== 文件操作演示 ===\n");
    
    // 初始化io_uring
    printf("1. 初始化io_uring:\n");
    ret = io_uring_queue_init(16, &ring, 0);
    if (ret < 0) {
        printf("  初始化失败: %s\n", strerror(-ret));
        return -1;
    }
    printf("  ✓ io_uring初始化成功\n");
    
    // 演示异步文件打开
    printf("\n2. 异步文件打开操作:\n");
    const char *test_filename = "async_file_test.txt";
    
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        printf("  获取SQE失败\n");
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    // 准备打开文件操作
    io_uring_prep_openat(sqe, AT_FDCWD, test_filename, 
                        O_CREAT | O_RDWR | O_TRUNC, 0644);
    sqe->user_data = 1;
    
    printf("  准备打开文件: %s\n", test_filename);
    
    ret = io_uring_submit(&ring);
    if (ret <= 0) {
        printf("  提交打开操作失败: %s\n", strerror(-ret));
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    // 等待打开完成
    struct io_uring_cqe *cqe;
    ret = io_uring_wait_cqe(&ring, &cqe);
    if (ret < 0) {
        printf("  等待打开完成失败: %s\n", strerror(-ret));
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    int file_fd = cqe->res;
    printf("  ✓ 文件打开成功,文件描述符: %d\n", file_fd);
    io_uring_cqe_seen(&ring, cqe);
    
    // 演示异步写入操作
    printf("\n3. 异步写入操作:\n");
    const char *write_data = "This is asynchronous write test data.\nMultiple lines of test content.\nEnd of test data.\n";
    size_t write_size = strlen(write_data);
    
    sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        printf("  获取SQE失败\n");
        close(file_fd);
        unlink(test_filename);
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    io_uring_prep_write(sqe, file_fd, write_data, write_size, 0);
    sqe->user_data = 2;
    
    printf("  准备写入数据: %zu 字节\n", write_size);
    
    ret = io_uring_submit(&ring);
    if (ret <= 0) {
        printf("  提交写入操作失败: %s\n", strerror(-ret));
    } else {
        printf("  ✓ 写入操作提交成功\n");
    }
    
    // 等待写入完成
    ret = io_uring_wait_cqe(&ring, &cqe);
    if (ret < 0) {
        printf("  等待写入完成失败: %s\n", strerror(-ret));
    } else {
        printf("  ✓ 写入完成: %d 字节\n", cqe->res);
        io_uring_cqe_seen(&ring, cqe);
    }
    
    // 演示异步fsync操作
    printf("\n4. 异步fsync操作:\n");
    sqe = io_uring_get_sqe(&ring);
    if (sqe) {
        io_uring_prep_fsync(sqe, file_fd, 0);
        sqe->user_data = 3;
        
        printf("  准备fsync操作\n");
        
        ret = io_uring_submit(&ring);
        if (ret > 0) {
            ret = io_uring_wait_cqe(&ring, &cqe);
            if (ret == 0) {
                printf("  ✓ fsync完成\n");
                io_uring_cqe_seen(&ring, cqe);
            }
        }
    }
    
    // 演示异步读取操作
    printf("\n5. 异步读取操作:\n");
    char read_buffer[256];
    
    sqe = io_uring_get_sqe(&ring);
    if (sqe) {
        io_uring_prep_read(sqe, file_fd, read_buffer, sizeof(read_buffer) - 1, 0);
        sqe->user_data = 4;
        
        printf("  准备读取操作\n");
        
        ret = io_uring_submit(&ring);
        if (ret > 0) {
            ret = io_uring_wait_cqe(&ring, &cqe);
            if (ret == 0) {
                if (cqe->res > 0) {
                    read_buffer[cqe->res] = '\0';
                    printf("  ✓ 读取完成: %d 字节\n", cqe->res);
                    printf("  读取内容:\n%s", read_buffer);
                } else {
                    printf("  读取失败或文件为空\n");
                }
                io_uring_cqe_seen(&ring, cqe);
            }
        }
    }
    
    // 演示异步关闭操作
    printf("\n6. 异步关闭操作:\n");
    sqe = io_uring_get_sqe(&ring);
    if (sqe) {
        io_uring_prep_close(sqe, file_fd);
        sqe->user_data = 5;
        
        printf("  准备关闭文件\n");
        
        ret = io_uring_submit(&ring);
        if (ret > 0) {
            ret = io_uring_wait_cqe(&ring, &cqe);
            if (ret == 0) {
                printf("  ✓ 文件关闭完成\n");
                io_uring_cqe_seen(&ring, cqe);
            }
        }
    }
    
    // 清理资源
    printf("\n7. 清理资源:\n");
    unlink(test_filename);
    io_uring_queue_exit(&ring);
    printf("  ✓ 资源清理完成\n");
    
    return 0;
}

int main() {
    return demo_file_operations();
}

示例4:网络I/O演示

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

/**
 * 演示网络I/O操作
 */
int demo_network_io() {
    struct io_uring ring;
    int ret;
    
    printf("=== 网络I/O操作演示 ===\n");
    
    // 初始化io_uring
    printf("1. 初始化io_uring:\n");
    ret = io_uring_queue_init(32, &ring, 0);
    if (ret < 0) {
        printf("  初始化失败: %s\n", strerror(-ret));
        return -1;
    }
    printf("  ✓ io_uring初始化成功\n");
    
    // 演示异步socket创建
    printf("\n2. 异步socket创建:\n");
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        printf("  获取SQE失败\n");
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    // 注意:io_uring的网络I/O支持需要较新的内核版本
    printf("  注意:网络I/O操作需要Linux 5.5+内核支持\n");
    printf("  在本演示中,我们将展示准备网络操作的方法\n");
    
    // 演示网络操作准备(伪代码)
    printf("\n3. 网络操作准备示例:\n");
    printf("  // 创建TCP socket\n");
    printf("  sqe = io_uring_get_sqe(&ring);\n");
    printf("  io_uring_prep_socket(sqe, AF_INET, SOCK_STREAM, 0, 0);\n");
    printf("  sqe->user_data = 1;\n");
    printf("\n");
    
    printf("  // 连接服务器\n");
    printf("  struct sockaddr_in addr;\n");
    printf("  memset(&addr, 0, sizeof(addr));\n");
    printf("  addr.sin_family = AF_INET;\n");
    printf("  addr.sin_port = htons(80);\n");
    printf("  addr.sin_addr.s_addr = inet_addr(\"127.0.0.1\");\n");
    printf("  io_uring_prep_connect(sqe, sockfd, &addr, sizeof(addr));\n");
    printf("\n");
    
    printf("  // 发送数据\n");
    printf("  const char *data = \"GET / HTTP/1.1\\r\\n\\r\\n\";\n");
    printf("  io_uring_prep_send(sqe, sockfd, data, strlen(data), 0);\n");
    printf("\n");
    
    printf("  // 接收数据\n");
    printf("  char buffer[1024];\n");
    printf("  io_uring_prep_recv(sqe, sockfd, buffer, sizeof(buffer), 0);\n");
    
    // 显示网络I/O优势
    printf("\n=== 网络I/O优势 ===\n");
    printf("1. 高性能:\n");
    printf("   ✓ 零拷贝数据传输\n");
    printf("   ✓ 减少系统调用开销\n");
    printf("   ✓ 提高并发处理能力\n");
    
    printf("\n2. 低延迟:\n");
    printf("   ✓ 快速事件通知\n");
    printf("   ✓ 减少上下文切换\n");
    printf("   ✓ 优化内存访问模式\n");
    
    printf("\n3. 可扩展性:\n");
    printf("   ✓ 支持大量并发连接\n");
    printf("   ✓ 高效的事件处理\n");
    printf("   ✓ 灵活的缓冲区管理\n");
    
    // 清理资源
    printf("\n4. 清理资源:\n");
    io_uring_queue_exit(&ring);
    printf("  ✓ io_uring资源清理完成\n");
    
    return 0;
}

int main() {
    return demo_network_io();
}

示例5:性能对比测试

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/time.h>
#include <time.h>

/**
 * 性能测试结果结构
 */
typedef struct {
    const char *test_name;
    long long execution_time_us;
    int operation_count;
    double throughput_ops;
    double average_latency_us;
} performance_result_t;

/**
 * 获取当前时间(微秒)
 */
long long get_current_time_us() {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000000LL + tv.tv_usec;
}

/**
 * 传统同步I/O性能测试
 */
int test_sync_io_performance(performance_result_t *result) {
    const int operation_count = 1000;
    const size_t buffer_size = 4096;
    char *buffer = malloc(buffer_size);
    long long start_time, end_time;
    
    if (!buffer) {
        return -1;
    }
    
    printf("执行同步I/O性能测试...\n");
    
    // 创建测试文件
    const char *filename = "sync_test.dat";
    int fd = open(filename, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    if (fd == -1) {
        free(buffer);
        return -1;
    }
    
    start_time = get_current_time_us();
    
    // 执行同步写入操作
    for (int i = 0; i < operation_count; i++) {
        // 填充测试数据
        for (size_t j = 0; j < buffer_size; j++) {
            buffer[j] = 'A' + (i + j) % 26;
        }
        
        ssize_t written = write(fd, buffer, buffer_size);
        if (written != (ssize_t)buffer_size) {
            printf("写入失败\n");
            close(fd);
            unlink(filename);
            free(buffer);
            return -1;
        }
    }
    
    end_time = get_current_time_us();
    
    close(fd);
    unlink(filename);
    free(buffer);
    
    result->execution_time_us = end_time - start_time;
    result->operation_count = operation_count;
    result->throughput_ops = (double)operation_count / (result->execution_time_us / 1000000.0);
    result->average_latency_us = (double)result->execution_time_us / operation_count;
    
    printf("同步I/O测试完成\n");
    return 0;
}

/**
 * io_uring异步I/O性能测试
 */
int test_io_uring_performance(performance_result_t *result) {
    struct io_uring ring;
    const int operation_count = 1000;
    const size_t buffer_size = 4096;
    char **buffers;
    long long start_time, end_time;
    int ret;
    
    printf("执行io_uring异步I/O性能测试...\n");
    
    // 初始化io_uring
    ret = io_uring_queue_init(256, &ring, 0);
    if (ret < 0) {
        printf("io_uring初始化失败: %s\n", strerror(-ret));
        return -1;
    }
    
    // 分配缓冲区
    buffers = malloc(operation_count * sizeof(char*));
    if (!buffers) {
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    for (int i = 0; i < operation_count; i++) {
        buffers[i] = malloc(buffer_size);
        if (!buffers[i]) {
            // 清理已分配的缓冲区
            for (int j = 0; j < i; j++) {
                free(buffers[j]);
            }
            free(buffers);
            io_uring_queue_exit(&ring);
            return -1;
        }
        
        // 填充测试数据
        for (size_t j = 0; j < buffer_size; j++) {
            buffers[i][j] = 'A' + (i + j) % 26;
        }
    }
    
    // 创建测试文件
    const char *filename = "async_test.dat";
    int fd = open(filename, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    if (fd == -1) {
        perror("创建测试文件失败");
        // 清理缓冲区
        for (int i = 0; i < operation_count; i++) {
            free(buffers[i]);
        }
        free(buffers);
        io_uring_queue_exit(&ring);
        return -1;
    }
    
    start_time = get_current_time_us();
    
    // 提交异步写入操作
    int submitted = 0;
    for (int i = 0; i < operation_count; i++) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (!sqe) {
            printf("获取SQE失败\n");
            break;
        }
        
        io_uring_prep_write(sqe, fd, buffers[i], buffer_size, i * buffer_size);
        sqe->user_data = i;
        
        submitted++;
        
        // 定期提交操作
        if (submitted % 32 == 0 || i == operation_count - 1) {
            ret = io_uring_submit(&ring);
            if (ret < 0) {
                printf("提交操作失败: %s\n", strerror(-ret));
                break;
            }
        }
    }
    
    printf("提交了 %d 个异步操作\n", submitted);
    
    // 等待所有操作完成
    int completed = 0;
    while (completed < submitted) {
        struct io_uring_cqe *cqe;
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            printf("等待完成事件失败: %s\n", strerror(-ret));
            break;
        }
        
        completed++;
        io_uring_cqe_seen(&ring, cqe);
    }
    
    end_time = get_current_time_us();
    
    printf("完成了 %d 个异步操作\n", completed);
    
    // 清理资源
    close(fd);
    unlink(filename);
    
    for (int i = 0; i < operation_count; i++) {
        free(buffers[i]);
    }
    free(buffers);
    io_uring_queue_exit(&ring);
    
    result->execution_time_us = end_time - start_time;
    result->operation_count = completed;
    result->throughput_ops = (double)completed / (result->execution_time_us / 1000000.0);
    result->average_latency_us = (double)result->execution_time_us / completed;
    
    printf("io_uring异步I/O测试完成\n");
    return 0;
}

/**
 * 演示性能对比测试
 */
int demo_performance_comparison() {
    performance_result_t sync_result = {0};
    performance_result_t async_result = {0};
    
    printf("=== io_uring vs 同步I/O 性能对比 ===\n");
    
    // 设置测试结果名称
    sync_result.test_name = "同步I/O";
    async_result.test_name = "io_uring异步I/O";
    
    // 执行同步I/O测试
    printf("1. 执行同步I/O测试:\n");
    if (test_sync_io_performance(&sync_result) != 0) {
        printf("  同步I/O测试失败\n");
        return -1;
    }
    
    printf("  测试完成\n");
    
    // 执行io_uring测试
    printf("\n2. 执行io_uring异步I/O测试:\n");
    if (test_io_uring_performance(&async_result) != 0) {
        printf("  io_uring测试失败\n");
        return -1;
    }
    
    printf("  测试完成\n");
    
    // 显示测试结果
    printf("\n=== 性能测试结果 ===\n");
    printf("%-20s %-15s %-15s %-15s %-15s\n",
           "测试类型", "操作次数", "耗时(μs)", "吞吐量(ops/s)", "平均延迟(μs)");
    printf("%-20s %-15s %-15s %-15s %-15s\n",
           "--------", "--------", "--------", "------------", "------------");
    
    printf("%-20s %-15d %-15lld %-15.0f %-15.2f\n",
           sync_result.test_name,
           sync_result.operation_count,
           sync_result.execution_time_us,
           sync_result.throughput_ops,
           sync_result.average_latency_us);
    
    printf("%-20s %-15d %-15lld %-15.0f %-15.2f\n",
           async_result.test_name,
           async_result.operation_count,
           async_result.execution_time_us,
           async_result.throughput_ops,
           async_result.average_latency_us);
    
    // 性能对比分析
    printf("\n=== 性能对比分析 ===\n");
    if (sync_result.execution_time_us > 0 && async_result.execution_time_us > 0) {
        double time_improvement = (double)sync_result.execution_time_us / async_result.execution_time_us;
        double throughput_improvement = async_result.throughput_ops / sync_result.throughput_ops;
        double latency_reduction = (sync_result.average_latency_us - async_result.average_latency_us) / 
                                  sync_result.average_latency_us * 100;
        
        printf("执行时间对比: %.2f 倍提升\n", time_improvement);
        printf("吞吐量对比: %.2f 倍提升\n", throughput_improvement);
        printf("平均延迟减少: %.1f%%\n", latency_reduction);
    }
    
    // 显示优势分析
    printf("\n=== 优势分析 ===\n");
    printf("1. io_uring优势:\n");
    printf("   ✓ 零拷贝数据传输\n");
    printf("   ✓ 减少系统调用次数\n");
    printf("   ✓ 提高I/O并发性能\n");
    printf("   ✓ 更好的CPU利用率\n");
    
    printf("\n2. 适用场景:\n");
    printf("   ✓ 高并发网络服务器\n");
    printf("   ✓ 大文件传输应用\n");
    printf("   ✓ 实时数据处理\n");
    printf("   ✓ 数据库存储引擎\n");
    
    printf("\n3. 性能优化建议:\n");
    printf("   ✓ 合理设置环形缓冲区大小\n");
    printf("   ✓ 批量提交I/O操作\n");
    printf("   ✓ 使用适当的等待策略\n");
    printf("   ✓ 监控系统资源使用\n");
    
    return 0;
}

int main() {
    return demo_performance_comparison();
}

io_uring 使用注意事项

系统要求:

  1. 内核版本: 需要Linux 5.1或更高版本
  2. 架构支持: 支持所有主流架构
  3. 编译要求: 需要liburing库支持

初始化选项:

  1. IORING_SETUP_IOPOLL: 启用I/O轮询模式
  2. IORING_SETUP_SQPOLL: 启用提交队列轮询
  3. IORING_SETUP_SQ_AFF: 设置提交队列CPU亲和性
  4. IORING_SETUP_CQSIZE: 设置完成队列大小

错误处理:

  1. 负返回值: 表示错误码
  2. errno设置: 传统错误码机制
  3. 完成事件: 通过cqe->res返回结果

性能考虑:

  1. 缓冲区大小: 合理设置环形缓冲区大小
  2. 批量操作: 批量提交提高效率
  3. 内存管理: 避免频繁的内存分配
  4. CPU亲和性: 考虑CPU绑定优化

安全考虑:

  1. 权限检查: 确保有足够的权限
  2. 资源限制: 避免消耗过多系统资源
  3. 输入验证: 验证所有输入参数
  4. 错误恢复: 妥善处理各种错误情况

最佳实践:

  1. 环境检查: 使用前检查内核支持
  2. 参数验证: 验证所有输入参数
  3. 错误处理: 妥善处理各种错误
  4. 资源管理: 及时释放分配的资源
  5. 性能监控: 监控性能指标并优化

io_uring vs 传统AIO对比

传统AIO限制:

// 传统AIO接口
#include <linux/aio_abi.h>
int io_setup(unsigned nr_events, aio_context_t *ctxp);
int io_destroy(aio_context_t ctx);
int io_submit(aio_context_t ctx, long nr, struct iocb *ios[]);
int io_cancel(aio_context_t ctx, struct iocb *iocb, struct io_event *result);
int io_getevents(aio_context_t ctx, long min_nr, long nr, 
                 struct io_event *events, struct timespec *timeout);

io_uring优势:

// io_uring接口
#include <liburing.h>
int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);
int io_uring_queue_exit(struct io_uring *ring);
int io_uring_submit(struct io_uring *ring);
int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);

性能对比数据

系统调用开销:

  • 传统AIO: 每次操作需要多个系统调用
  • io_uring: 批量操作减少系统调用次数

内存拷贝:

  • 传统AIO: 需要多次内存拷贝
  • io_uring: 零拷贝数据传输

并发性能:

  • 传统AIO: 并发性能有限
  • io_uring: 高并发性能优异

常见使用场景

1. 网络服务器:

// 高性能网络服务器
struct io_uring ring;
io_uring_queue_init(4096, &ring, 0);

// 批量处理网络请求
for (int i = 0; i < connections; i++) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    io_uring_prep_recv(sqe, conn_fd[i], buffer[i], buffer_size, 0);
}

io_uring_submit(&ring);

2. 存储系统:

// 高性能存储系统
struct io_uring ring;
io_uring_queue_init(8192, &ring, IORING_SETUP_IOPOLL);

// 批量存储操作
for (int i = 0; i < io_count; i++) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    io_uring_prep_write(sqe, fd, data[i], size[i], offset[i]);
}

io_uring_submit(&ring);

3. 数据库引擎:

// 数据库存储引擎
struct io_uring ring;
io_uring_queue_init(2048, &ring, 0);

// 并发数据页读写
for (int i = 0; i < pages; i++) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    io_uring_prep_read(sqe, fd, page_buffer[i], page_size, page_offset[i]);
}

io_uring_submit(&ring);

总结

io_uring 是Linux系统中先进的异步I/O框架,提供了:

  1. 高性能: 显著优于传统AIO的性能
  2. 易用性: 简化的API设计
  3. 灵活性: 丰富的操作类型支持
  4. 可扩展性: 支持大规模并发操作

通过合理使用io_uring,可以构建高性能的I/O密集型应用。在实际应用中,需要注意内核版本要求、错误处理和性能优化等关键问题。

发表在 linux文章 | 留下评论

io_submit系统调用及示例

io_submit系统调用及示例

1. 函数介绍

在使用 io_setup 创建了异步 I/O 上下文之后,下一步就是向这个上下文提交实际的 I/O 请求。

io_submit 系统调用的作用就是将一个或多个异步 I/O 请求提交到指定的异步 I/O 上下文中。每个请求都由一个 struct iocb(I/O Control Block)结构体描述,该结构体包含了操作类型(读/写/同步)、文件描述符、缓冲区地址、读写字节数、文件偏移量等所有必需的信息。

提交后,内核会接管这些请求,并在后台(可能使用专门的线程或机制)执行这些 I/O 操作。调用 io_submit 的进程可以立即继续执行,无需等待 I/O 完成。

简单来说,io_submit 就是把写好的“异步任务清单”(iocb 结构体)交给之前创建的“任务管理器”(io_context_t),让它开始执行这些任务。

2. 函数原型

// 需要定义宏来启用 AIO 相关定义
#define _GNU_SOURCE
#include <linux/aio_abi.h> // 包含 iocb 等定义
#include <sys/syscall.h>   // 包含系统调用号
#include <unistd.h>        // 包含 syscall 函数

// io_submit 系统调用的实际接口
long syscall(SYS_io_submit, io_context_t ctx_id, long nr, struct iocb **iocbpp);

注意:这也是一个底层系统调用,通常需要通过 syscall() 函数调用。

3. 功能

将 nr 个异步 I/O 请求(由 iocbpp 指向的数组描述)提交到由 ctx_id 标识的异步 I/O 上下文中。内核会尝试立即开始处理这些请求。

4. 参数

  • ctx_id:
    • io_context_t 类型。
    • 由 io_setup 返回的、有效的异步 I/O 上下文的标识符。
  • nr:
    • long 类型。
    • 指定要提交的异步 I/O 请求数量。这个值应该与 iocbpp 数组的大小相对应。
  • iocbpp:
    • struct iocb ** 类型。
    • 一个指针数组,数组中的每个元素都指向一个 struct iocb 结构体。struct iocb 描述了一个单独的异步 I/O 请求。
    • 数组的大小至少为 nr

5. struct iocb 结构体 (关键部分)

这是描述单个异步 I/O 请求的核心结构体。

// 简化版,实际定义在 linux/aio_abi.h
struct iocb {
    __u64 aio_data;          // 用户定义的数据,用于匹配请求和完成事件
    __u32 aio_key, aio_reserved1;
    __u16 aio_lio_opcode;    // 操作类型 (IOCB_CMD_PREAD, IOCB_CMD_PWRITE, ...)
    __s16 aio_reqprio;       // 请求优先级 (通常为 0)
    __u32 aio_fildes;        // 文件描述符
    __u64 aio_buf;           // 用户空间缓冲区地址
    __u64 aio_nbytes;        // 传输字节数
    __s64 aio_offset;        // 文件偏移量
    // ... 其他字段用于高级功能
};

关键字段

  • aio_lio_opcode: 指定操作类型。
    • IOCB_CMD_PREAD: 异步预读(指定偏移量的读取)。
    • IOCB_CMD_PWRITE: 异步预写(指定偏移量的写入)。
    • IOCB_CMD_FSYNC: 异步文件数据和元数据同步。
    • IOCB_CMD_FDSYNC: 异步文件数据同步。
  • aio_fildes: 进行 I/O 操作的目标文件描述符。
  • aio_buf: 用户空间缓冲区的地址(读取时存放数据,写入时提供数据)。
  • aio_nbytes: 要传输(读取或写入)的字节数。
  • aio_offset: 文件中的偏移量(类似 pread/pwrite)。
  • aio_data: 用户自定义数据。当这个请求完成后,对应的完成事件 (io_event) 会包含这个值,方便程序识别是哪个请求完成了。

6. 返回值

  • 成功: 返回实际成功提交的请求数(一个非负整数,可能小于或等于 nr)。
  • 失败: 返回 -1,并设置 errno。如果返回一个 0 到 nr 之间的正数 m,则表示只有数组中前 m 个请求被成功提交,后面的提交失败了。

7. 错误码 (errno)

  • EAGAIN: 资源暂时不可用,例如内核的提交队列已满。
  • EBADFctx_id 无效,或者 iocbpp 中某个 iocb 的 aio_fildes 是无效的文件描述符。
  • EINVALctx_id 无效,或者 iocbpp 中某个 iocb 的参数无效(例如 aio_lio_opcode 未知,或 nr 为负数)。
  • ENOMEM: 内存不足。

8. 相似函数或关联函数

  • io_setup: 创建异步 I/O 上下文,是 io_submit 的前置步骤。
  • io_getevents: 用于获取已提交请求的完成状态(事件)。
  • io_cancel: 尝试取消一个已提交但尚未完成的 I/O 请求。
  • struct iocb: 描述单个异步 I/O 请求的结构体。

9. 示例代码

下面的示例演示如何使用 io_setup 创建上下文,然后使用 io_submit 提交异步写入请求。

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>
#include <linux/aio_abi.h>
#include <sys/syscall.h>

// 封装 io_setup 系统调用
static inline int my_io_setup(unsigned nr_events, io_context_t *ctxp) {
    return syscall(__NR_io_setup, nr_events, ctxp);
}

// 封装 io_destroy 系统调用
static inline int my_io_destroy(io_context_t ctx) {
    return syscall(__NR_io_destroy, ctx);
}

// 封装 io_submit 系统调用
static inline int my_io_submit(io_context_t ctx, long nr, struct iocb **iocbpp) {
    return syscall(__NR_io_submit, ctx, nr, iocbpp);
}

// 辅助函数:初始化一个异步写入的 iocb 结构
void prep_pwrite(struct iocb *iocb, int fd, const void *buf, size_t count, __u64 offset) {
    memset(iocb, 0, sizeof(*iocb)); // 清零结构体
    iocb->aio_lio_opcode = IOCB_CMD_PWRITE; // 设置操作类型为异步写
    iocb->aio_fildes = fd;                 // 设置文件描述符
    iocb->aio_buf = (__u64)(unsigned long)buf; // 设置缓冲区地址
    iocb->aio_nbytes = count;              // 设置写入字节数
    iocb->aio_offset = offset;             // 设置文件偏移量
    iocb->aio_data = (__u64)(unsigned long)buf; // 设置用户数据 (这里用 buf 地址)
}

int main() {
    const char *filename = "io_submit_test_file.txt";
    const int num_writes = 3;
    const size_t chunk_size = 1024;
    int fd;
    io_context_t ctx = 0; // 必须初始化为 0
    struct iocb iocbs[num_writes];
    struct iocb *iocb_ptrs[num_writes];
    char buffers[num_writes][chunk_size];
    int ret, i;

    printf("--- Demonstrating io_submit ---\n");

    // 1. 初始化要写入的数据
    for (i = 0; i < num_writes; ++i) {
        memset(buffers[i], 'A' + i, chunk_size); // Fill with 'A', 'B', 'C'
    }

    // 2. 创建并打开文件
    fd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, 0644);
    if (fd == -1) {
        perror("open");
        exit(EXIT_FAILURE);
    }
    printf("1. Opened/created file '%s' (fd=%d)\n", filename, fd);

    // 3. 初始化异步 I/O 上下文
    ret = my_io_setup(num_writes, &ctx);
    if (ret < 0) {
        perror("io_setup");
        close(fd);
        exit(EXIT_FAILURE);
    }
    printf("2. Initialized AIO context (ctx_id=%llu)\n", (unsigned long long)ctx);

    // 4. 准备 I/O 请求 (iocb)
    printf("3. Preparing %d asynchronous write requests...\n", num_writes);
    for (i = 0; i < num_writes; ++i) {
        prep_pwrite(&iocbs[i], fd, buffers[i], chunk_size, i * chunk_size);
        iocb_ptrs[i] = &iocbs[i];
        printf("   Prepared write %d: offset=%zu, size=%zu, data='%c'...\n",
               i+1, i * chunk_size, chunk_size, 'A' + i);
    }

    // 5. 提交 I/O 请求
    printf("4. Submitting %d write requests using io_submit...\n", num_writes);
    ret = my_io_submit(ctx, num_writes, iocb_ptrs);
    if (ret != num_writes) {
        fprintf(stderr, "   io_submit failed: submitted %d requests, expected %d\n", ret, num_writes);
        if (ret < 0) {
            perror("   io_submit error");
        } else {
            printf("   Only the first %d requests were submitted successfully.\n", ret);
        }
        // 清理并退出
        my_io_destroy(ctx);
        close(fd);
        unlink(filename);
        exit(EXIT_FAILURE);
    }

    printf("   io_submit succeeded. All %d requests submitted.\n", ret);

    // 6. 注意:此时写入操作可能仍在进行中,我们需要用 io_getevents 来等待完成
    // 这个例子只演示提交,不等待完成。
    printf("5. Note: io_submit returned immediately. The writes are happening in the background.\n");
    printf("   To get the results, you need to call io_getevents().\n");

    // 7. 清理资源 (在真实程序中,你应该在 io_getevents 确认完成后再关闭文件)
    printf("6. Cleaning up resources...\n");
    my_io_destroy(ctx);
    printf("   Destroyed AIO context.\n");
    close(fd);
    printf("   Closed file descriptor.\n");
    unlink(filename); // 删除测试文件
    printf("   Deleted test file '%s'.\n", filename);

    printf("\n--- Summary ---\n");
    printf("1. io_submit(ctx_id, nr, iocb_ptrs) submits 'nr' AIO requests to the context 'ctx_id'.\n");
    printf("2. Each request is described by an 'iocb' struct, pointed to by elements in 'iocb_ptrs'.\n");
    printf("3. It returns the number of requests successfully submitted (may be < nr on partial failure).\n");
    printf("4. It returns immediately; the I/O happens asynchronously in the background.\n");
    printf("5. Use io_getevents() afterwards to check for completion and get results.\n");

    return 0;
}

10. 编译和运行

# 假设代码保存在 io_submit_example.c 中
gcc -o io_submit_example io_submit_example.c

# 运行程序
./io_submit_example

11. 预期输出

--- Demonstrating io_submit ---
1. Opened/created file 'io_submit_test_file.txt' (fd=3)
2. Initialized AIO context (ctx_id=123456789)
3. Preparing 3 asynchronous write requests...
   Prepared write 1: offset=0, size=1024, data='A'...
   Prepared write 2: offset=1024, size=1024, data='B'...
   Prepared write 3: offset=2048, size=1024, data='C'...
4. Submitting 3 write requests using io_submit...
   io_submit succeeded. All 3 requests submitted.
5. Note: io_submit returned immediately. The writes are happening in the background.
   To get the results, you need to call io_getevents().
6. Cleaning up resources...
   Destroyed AIO context.
   Closed file descriptor.
   Deleted test file 'io_submit_test_file.txt'.

--- Summary ---
1. io_submit(ctx_id, nr, iocb_ptrs) submits 'nr' AIO requests to the context 'ctx_id'.
2. Each request is described by an 'iocb' struct, pointed to by elements in 'iocb_ptrs'.
3. It returns the number of requests successfully submitted (may be < nr on partial failure).
4. It returns immediately; the I/O happens asynchronously in the background.
5. Use io_getevents() afterwards to check for completion and get resu
发表在 linux文章 | 留下评论