pselect系统调用及示例

Linux 高级 I/O 多路复用系统调用详解

相关文章:select系统调用及示例 Linux I/O 多路复用机制对比分析poll/ppoll/epoll/select pselect系统调用及示例

1. pselect 函数详解

1. 函数介绍

pselect 是 Linux 系统中用于同时监视多个文件描述符的系统调用。可以把 pselect 想象成”智能的交通指挥官”——它能够同时观察多个”道路”(文件描述符),当某条道路有”车辆”(数据)到达时,立即通知你进行处理。

与传统的 select 相比,pselect 提供了更好的信号处理机制,避免了信号处理函数中的竞态条件问题。

2. 函数原型

#define _POSIX_C_SOURCE 200112L
#include <sys/select.h>

int pselect(int nfds, fd_set *readfds, fd_set *writefds,
            fd_set *exceptfds, const struct timespec *timeout,
            const sigset_t *sigmask);

3. 功能

pselect 函数用于同时监视多个文件描述符,等待其中任何一个变为就绪状态(可读、可写或有异常)。它能够在等待期间临时设置信号屏蔽集,提供更好的信号处理安全性。

4. 参数

  • nfds: 要监视的最大文件描述符值加1
  • readfds: 指向要监视可读事件的文件描述符集合的指针
  • writefds: 指向要监视可写事件的文件描述符集合的指针
  • exceptfds: 指向要监视异常事件的文件描述符集合的指针
  • timeout: 指向超时时间的指针(NULL 表示无限等待)
  • sigmask: 指向信号屏蔽集的指针(NULL 表示不改变信号屏蔽)

5. timespec 结构体

struct timespec {
    time_t tv_sec;   /* 秒数 */
    long   tv_nsec;  /* 纳秒数 */
};

6. 返回值

  • 成功: 返回准备就绪的文件描述符数量(0 表示超时)
  • 失败: 返回 -1,并设置相应的 errno 错误码

7. 常见错误码

  • EBADF: 一个或多个文件描述符无效
  • EINTR: 被未屏蔽的信号中断
  • EINVAL: 参数无效
  • ENOMEM: 内存不足

8. 相似函数或关联函数

  • select: 传统的文件描述符监视函数
  • poll: 更现代的文件描述符监视函数
  • ppoll: 带信号屏蔽的 poll
  • epoll_wait: epoll 接口的等待函数
  • read/write: 文件读写操作
  • signal/sigaction: 信号处理函数

9. 示例代码

示例1:基础用法 – 监视标准输入和定时器

#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>

// 信号处理标志
volatile sig_atomic_t signal_received = 0;

// 信号处理函数
void signal_handler(int sig) {
    signal_received = sig;
    printf("\n收到信号 %d\n", sig);
}

int main() {
    fd_set read_fds;
    struct timespec timeout;
    sigset_t sigmask;
    int ready;
    
    printf("=== pselect 基础示例 ===\n\n");
    
    // 设置信号处理
    struct sigaction sa;
    sa.sa_handler = signal_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    
    if (sigaction(SIGINT, &sa, NULL) == -1) {
        perror("sigaction");
        return 1;
    }
    
    if (sigaction(SIGTERM, &sa, NULL) == -1) {
        perror("sigaction");
        return 1;
    }
    
    // 初始化文件描述符集合
    FD_ZERO(&read_fds);
    FD_SET(STDIN_FILENO, &read_fds);
    
    // 设置超时时间 (5 秒)
    timeout.tv_sec = 5;
    timeout.tv_nsec = 0;
    
    // 设置信号屏蔽集 (不屏蔽任何信号)
    sigemptyset(&sigmask);
    
    printf("监视设置:\n");
    printf("  监视文件描述符: %d (标准输入)\n", STDIN_FILENO);
    printf("  超时时间: %ld 秒\n", (long)timeout.tv_sec);
    printf("  按 Enter 键或等待超时...\n");
    printf("  按 Ctrl+C 发送信号\n\n");
    
    // 使用 pselect 监视文件描述符
    ready = pselect(STDIN_FILENO + 1, &read_fds, NULL, NULL, &timeout, &sigmask);
    
    if (ready == -1) {
        if (errno == EINTR) {
            printf("pselect 被信号中断\n");
            if (signal_received) {
                printf("收到信号: %d\n", signal_received);
            }
        } else {
            perror("pselect 失败");
        }
    } else if (ready == 0) {
        printf("超时: 没有文件描述符准备就绪\n");
    } else {
        printf("准备就绪的文件描述符数量: %d\n", ready);
        
        if (FD_ISSET(STDIN_FILENO, &read_fds)) {
            printf("标准输入有数据可读\n");
            
            // 读取输入
            char buffer[256];
            ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
            if (bytes_read > 0) {
                buffer[bytes_read] = '\0';
                printf("读取到: %s", buffer);
            }
        }
    }
    
    printf("\n=== pselect 特点 ===\n");
    printf("1. 原子操作: 等待和信号屏蔽是原子的\n");
    printf("2. 精确超时: 支持纳秒级超时\n");
    printf("3. 信号安全: 避免信号处理中的竞态条件\n");
    printf("4. 灵活屏蔽: 可以精确控制信号屏蔽\n");
    printf("5. 高效监视: 同时监视多个文件描述符\n");
    
    return 0;
}

示例2:多文件描述符监视

#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <fcntl.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>

// 创建临时文件用于测试
int create_test_file(const char *filename) {
    int fd = open(filename, O_CREAT | O_RDWR | O_TRUNC, 0644);
    if (fd == -1) {
        perror("创建测试文件失败");
        return -1;
    }
    
    const char *content = "这是测试文件的内容\n用于演示 pselect 功能\n";
    write(fd, content, strlen(content));
    lseek(fd, 0, SEEK_SET);
    
    printf("创建测试文件: %s\n", filename);
    return fd;
}

// 显示文件描述符集合状态
void show_fd_set_status(fd_set *fds, int max_fd, const char *description) {
    printf("%s:\n", description);
    for (int i = 0; i <= max_fd; i++) {
        if (FD_ISSET(i, fds)) {
            printf("  fd %d: 就绪\n", i);
        }
    }
    printf("\n");
}

int main() {
    fd_set read_fds, master_fds;
    struct timespec timeout;
    sigset_t sigmask;
    int test_fd1, test_fd2;
    int max_fd;
    int ready;
    
    printf("=== pselect 多文件描述符监视示例 ===\n\n");
    
    // 创建测试文件
    test_fd1 = create_test_file("test1.txt");
    test_fd2 = create_test_file("test2.txt");
    
    if (test_fd1 == -1 || test_fd2 == -1) {
        if (test_fd1 != -1) close(test_fd1);
        if (test_fd2 != -1) close(test_fd2);
        unlink("test1.txt");
        unlink("test2.txt");
        return 1;
    }
    
    // 初始化文件描述符集合
    FD_ZERO(&master_fds);
    FD_SET(STDIN_FILENO, &master_fds);
    FD_SET(test_fd1, &master_fds);
    FD_SET(test_fd2, &master_fds);
    
    // 找到最大的文件描述符
    max_fd = STDIN_FILENO;
    if (test_fd1 > max_fd) max_fd = test_fd1;
    if (test_fd2 > max_fd) max_fd = test_fd2;
    
    // 设置超时时间 (3 秒)
    timeout.tv_sec = 3;
    timeout.tv_nsec = 0;
    
    // 设置信号屏蔽集
    sigemptyset(&sigmask);
    
    printf("监视设置:\n");
    printf("  最大文件描述符: %d\n", max_fd);
    printf("  监视的文件描述符: %d(标准输入), %d(文件1), %d(文件2)\n", 
           STDIN_FILENO, test_fd1, test_fd2);
    printf("  超时时间: %ld 秒\n", (long)timeout.tv_sec);
    printf("\n");
    
    // 主循环
    printf("开始监视... (按 Ctrl+C 退出)\n");
    while (1) {
        // 复制主集合到工作集合
        read_fds = master_fds;
        
        printf("等待事件...\n");
        ready = pselect(max_fd + 1, &read_fds, NULL, NULL, &timeout, &sigmask);
        
        if (ready == -1) {
            if (errno == EINTR) {
                printf("pselect 被信号中断\n");
                break;
            } else {
                perror("pselect 失败");
                break;
            }
        } else if (ready == 0) {
            printf("超时: 没有文件描述符准备就绪\n");
        } else {
            printf("准备就绪的文件描述符数量: %d\n", ready);
            
            // 处理标准输入
            if (FD_ISSET(STDIN_FILENO, &read_fds)) {
                printf("标准输入有数据:\n");
                char buffer[256];
                ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
                if (bytes_read > 0) {
                    buffer[bytes_read] = '\0';
                    printf("  读取到: %s", buffer);
                    
                    if (strncmp(buffer, "quit", 4) == 0) {
                        printf("收到退出命令\n");
                        break;
                    }
                }
            }
            
            // 处理测试文件1
            if (FD_ISSET(test_fd1, &read_fds)) {
                printf("测试文件1有数据可读\n");
                lseek(test_fd1, 0, SEEK_SET);
                char buffer[128];
                ssize_t bytes_read = read(test_fd1, buffer, sizeof(buffer) - 1);
                if (bytes_read > 0) {
                    buffer[bytes_read] = '\0';
                    printf("  文件1内容: %s", buffer);
                }
            }
            
            // 处理测试文件2
            if (FD_ISSET(test_fd2, &read_fds)) {
                printf("测试文件2有数据可读\n");
                lseek(test_fd2, 0, SEEK_SET);
                char buffer[128];
                ssize_t bytes_read = read(test_fd2, buffer, sizeof(buffer) - 1);
                if (bytes_read > 0) {
                    buffer[bytes_read] = '\0';
                    printf("  文件2内容: %s", buffer);
                }
            }
            
            printf("\n");
        }
    }
    
    // 清理资源
    printf("清理资源...\n");
    close(test_fd1);
    close(test_fd2);
    unlink("test1.txt");
    unlink("test2.txt");
    
    printf("程序正常退出\n");
    return 0;
}

示例3:完整的事件驱动服务器模拟

#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>

// 服务器配置结构体
struct server_config {
    int max_clients;
    int timeout_seconds;
    int verbose;
    int use_signals;
};

// 客户端信息结构体
struct client_info {
    int fd;
    int connected;
    time_t connect_time;
    char buffer[1024];
    size_t buffer_pos;
};

// 服务器状态结构体
struct server_state {
    int running;
    int client_count;
    struct client_info *clients;
    int max_clients;
};

// 全局变量
volatile sig_atomic_t server_terminate = 0;

// 信号处理函数
void signal_handler(int sig) {
    server_terminate = 1;
    printf("\n收到终止信号 %d\n", sig);
}

// 初始化服务器状态
int init_server_state(struct server_state *state, int max_clients) {
    state->running = 1;
    state->client_count = 0;
    state->max_clients = max_clients;
    
    state->clients = calloc(max_clients, sizeof(struct client_info));
    if (!state->clients) {
        perror("分配客户端数组失败");
        return -1;
    }
    
    return 0;
}

// 添加客户端
int add_client(struct server_state *state, int client_fd) {
    if (state->client_count >= state->max_clients) {
        fprintf(stderr, "客户端数量已达上限: %d\n", state->max_clients);
        return -1;
    }
    
    for (int i = 0; i < state->max_clients; i++) {
        if (!state->clients[i].connected) {
            state->clients[i].fd = client_fd;
            state->clients[i].connected = 1;
            state->clients[i].connect_time = time(NULL);
            state->clients[i].buffer_pos = 0;
            state->client_count++;
            
            printf("添加客户端 %d (fd: %d),当前客户端数: %d\n", 
                   i, client_fd, state->client_count);
            return i;
        }
    }
    
    fprintf(stderr, "找不到空闲的客户端槽位\n");
    return -1;
}

// 移除客户端
void remove_client(struct server_state *state, int client_index) {
    if (client_index >= 0 && client_index < state->max_clients && 
        state->clients[client_index].connected) {
        
        close(state->clients[client_index].fd);
        memset(&state->clients[client_index], 0, sizeof(struct client_info));
        state->client_count--;
        
        printf("移除客户端 %d,剩余客户端数: %d\n", 
               client_index, state->client_count);
    }
}

// 显示服务器状态
void show_server_status(const struct server_state *state) {
    printf("=== 服务器状态 ===\n");
    printf("运行状态: %s\n", state->running ? "运行中" : "已停止");
    printf("客户端数量: %d/%d\n", state->client_count, state->max_clients);
    printf("当前时间: %s", ctime(&(time_t){time(NULL)}));
    
    if (state->client_count > 0) {
        printf("连接的客户端:\n");
        for (int i = 0; i < state->max_clients; i++) {
            if (state->clients[i].connected) {
                printf("  [%d] fd=%d 连接时间: %s", 
                       i, state->clients[i].fd, 
                       ctime(&state->clients[i].connect_time));
            }
        }
    }
    printf("\n");
}

int main(int argc, char *argv[]) {
    struct server_config config = {
        .max_clients = 10,
        .timeout_seconds = 30,
        .verbose = 0,
        .use_signals = 0
    };
    
    struct server_state server_state_struct;
    fd_set read_fds;
    struct timespec timeout;
    sigset_t sigmask;
    int max_fd = STDIN_FILENO;
    int ready;
    
    printf("=== pselect 事件驱动服务器模拟器 ===\n\n");
    
    // 解析命令行参数
    static struct option long_options[] = {
        {"clients", required_argument, 0, 'c'},
        {"timeout", required_argument, 0, 't'},
        {"verbose", no_argument,       0, 'v'},
        {"signals", no_argument,       0, 's'},
        {"help",    no_argument,       0, 'h'},
        {0, 0, 0, 0}
    };
    
    int opt;
    while ((opt = getopt_long(argc, argv, "c:t:vsh", long_options, NULL)) != -1) {
        switch (opt) {
            case 'c':
                config.max_clients = atoi(optarg);
                if (config.max_clients <= 0) config.max_clients = 10;
                break;
            case 't':
                config.timeout_seconds = atoi(optarg);
                if (config.timeout_seconds <= 0) config.timeout_seconds = 30;
                break;
            case 'v':
                config.verbose = 1;
                break;
            case 's':
                config.use_signals = 1;
                break;
            case 'h':
                printf("用法: %s [选项]\n", argv[0]);
                printf("选项:\n");
                printf("  -c, --clients=NUM      最大客户端数 (默认 10)\n");
                printf("  -t, --timeout=SECONDS   超时时间 (默认 30 秒)\n");
                printf("  -v, --verbose           详细输出\n");
                printf("  -s, --signals           启用信号处理\n");
                printf("  -h, --help              显示此帮助信息\n");
                return 0;
            default:
                fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);
                return 1;
        }
    }
    
    // 设置信号处理
    if (config.use_signals) {
        struct sigaction sa;
        sa.sa_handler = signal_handler;
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        
        sigaction(SIGINT, &sa, NULL);
        sigaction(SIGTERM, &sa, NULL);
        printf("✓ 信号处理已启用\n");
    }
    
    // 初始化服务器状态
    if (init_server_state(&server_state_struct, config.max_clients) == -1) {
        return 1;
    }
    
    printf("服务器配置:\n");
    printf("  最大客户端数: %d\n", config.max_clients);
    printf("  超时时间: %d 秒\n", config.timeout_seconds);
    printf("  详细输出: %s\n", config.verbose ? "是" : "否");
    printf("  信号处理: %s\n", config.use_signals ? "是" : "否");
    printf("\n");
    
    // 设置超时时间
    timeout.tv_sec = config.timeout_seconds;
    timeout.tv_nsec = 0;
    
    // 设置信号屏蔽集
    sigemptyset(&sigmask);
    
    printf("启动服务器模拟...\n");
    printf("可用命令:\n");
    printf("  输入 'status' 查看服务器状态\n");
    printf("  输入 'connect' 模拟客户端连接\n");
    printf("  输入 'quit' 或 'exit' 退出服务器\n");
    printf("  输入 'help' 显示帮助信息\n");
    printf("  按 Ctrl+C 发送终止信号\n");
    printf("\n");
    
    // 主循环
    while (server_state_struct.running && !server_terminate) {
        // 初始化文件描述符集合
        FD_ZERO(&read_fds);
        FD_SET(STDIN_FILENO, &read_fds);
        max_fd = STDIN_FILENO;
        
        // 添加连接的客户端
        for (int i = 0; i < server_state_struct.max_clients; i++) {
            if (server_state_struct.clients[i].connected) {
                FD_SET(server_state_struct.clients[i].fd, &read_fds);
                if (server_state_struct.clients[i].fd > max_fd) {
                    max_fd = server_state_struct.clients[i].fd;
                }
            }
        }
        
        if (config.verbose) {
            printf("监视 %d 个文件描述符 (最大fd: %d)...\n", 
                   server_state_struct.client_count + 1, max_fd);
        }
        
        // 使用 pselect 等待事件
        ready = pselect(max_fd + 1, &read_fds, NULL, NULL, &timeout, &sigmask);
        
        if (ready == -1) {
            if (errno == EINTR) {
                if (config.verbose) {
                    printf("pselect 被信号中断\n");
                }
                continue;
            } else {
                perror("pselect 失败");
                break;
            }
        } else if (ready == 0) {
            if (config.verbose) {
                printf("超时: 没有事件发生\n");
            }
        } else {
            if (config.verbose) {
                printf("准备就绪的文件描述符数量: %d\n", ready);
            }
            
            // 处理标准输入事件
            if (FD_ISSET(STDIN_FILENO, &read_fds)) {
                char input_buffer[256];
                ssize_t bytes_read = read(STDIN_FILENO, input_buffer, sizeof(input_buffer) - 1);
                if (bytes_read > 0) {
                    input_buffer[bytes_read] = '\0';
                    
                    // 处理特殊命令
                    if (strncmp(input_buffer, "quit", 4) == 0 ||
                        strncmp(input_buffer, "exit", 4) == 0) {
                        printf("收到退出命令\n");
                        server_state_struct.running = 0;
                        server_terminate = 1;
                    } else if (strncmp(input_buffer, "status", 6) == 0) {
                        show_server_status(&server_state_struct);
                    } else if (strncmp(input_buffer, "help", 4) == 0) {
                        printf("可用命令:\n");
                        printf("  quit/exit - 退出服务器\n");
                        printf("  status    - 显示服务器状态\n");
                        printf("  help      - 显示帮助信息\n");
                        printf("  Ctrl+C    - 发送终止信号\n");
                    } else if (strncmp(input_buffer, "connect", 7) == 0) {
                        if (server_state_struct.client_count < server_state_struct.max_clients) {
                            // 模拟创建客户端连接
                            int fake_fd = 1000 + server_state_struct.client_count;
                            int client_index = add_client(&server_state_struct, fake_fd);
                            if (client_index != -1) {
                                printf("模拟客户端连接成功 (fd: %d)\n", fake_fd);
                            }
                        } else {
                            printf("客户端数量已达上限\n");
                        }
                    } else {
                        printf("收到输入: %s", input_buffer);
                    }
                }
            }
            
            // 处理客户端事件
            for (int i = 0; i < server_state_struct.max_clients; i++) {
                if (server_state_struct.clients[i].connected && 
                    FD_ISSET(server_state_struct.clients[i].fd, &read_fds)) {
                    
                    char client_buffer[256];
                    ssize_t bytes_read = read(server_state_struct.clients[i].fd, 
                                             client_buffer, sizeof(client_buffer) - 1);
                    
                    if (bytes_read > 0) {
                        client_buffer[bytes_read] = '\0';
                        printf("客户端 %d 发送数据: %s", i, client_buffer);
                        
                        // 回显数据
                        write(server_state_struct.clients[i].fd, "Echo: ", 6);
                        write(server_state_struct.clients[i].fd, client_buffer, bytes_read);
                        
                    } else if (bytes_read == 0) {
                        printf("客户端 %d 断开连接\n", i);
                        remove_client(&server_state_struct, i);
                    } else {
                        if (errno != EAGAIN && errno != EWOULDBLOCK) {
                            perror("读取客户端数据失败");
                            remove_client(&server_state_struct, i);
                        }
                    }
                }
            }
        }
    }
    
    // 清理资源
    printf("清理资源...\n");
    for (int i = 0; i < server_state_struct.max_clients; i++) {
        if (server_state_struct.clients[i].connected) {
            remove_client(&server_state_struct, i);
        }
    }
    free(server_state_struct.clients);
    
    printf("服务器模拟结束\n");
    
    printf("\n=== pselect 服务器应用说明 ===\n");
    printf("核心技术:\n");
    printf("1. 多路复用: 同时监视多个文件描述符\n");
    printf("2. 事件驱动: 基于事件的通知机制\n");
    printf("3. 信号安全: 原子的信号处理\n");
    printf("4. 超时控制: 精确的超时管理\n");
    printf("5. 资源管理: 动态的客户端管理\n");
    printf("\n");
    printf("pselect 优势:\n");
    printf("1. 原子性: 等待和信号处理是原子操作\n");
    printf("2. 灵活性: 可以精确控制信号屏蔽\n");
    printf("3. 精确性: 纳秒级超时控制\n");
    printf("4. 可扩展: 适用于中小规模并发\n");
    printf("5. 安全性: 避免信号处理中的竞态条件\n");
    
    return 0;
}

pselect系统调用及示例-CSDN博客

此条目发表在linux文章分类目录。将固定链接加入收藏夹。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注