Linux 高级 I/O 多路复用系统调用详解
相关文章:select系统调用及示例 Linux I/O 多路复用机制对比分析poll/ppoll/epoll/select pselect系统调用及示例
1. pselect 函数详解
1. 函数介绍
pselect
是 Linux 系统中用于同时监视多个文件描述符的系统调用。可以把 pselect
想象成”智能的交通指挥官”——它能够同时观察多个”道路”(文件描述符),当某条道路有”车辆”(数据)到达时,立即通知你进行处理。
与传统的 select
相比,pselect
提供了更好的信号处理机制,避免了信号处理函数中的竞态条件问题。
2. 函数原型
#define _POSIX_C_SOURCE 200112L
#include <sys/select.h>
int pselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask);
3. 功能
pselect
函数用于同时监视多个文件描述符,等待其中任何一个变为就绪状态(可读、可写或有异常)。它能够在等待期间临时设置信号屏蔽集,提供更好的信号处理安全性。
4. 参数
- nfds: 要监视的最大文件描述符值加1
- readfds: 指向要监视可读事件的文件描述符集合的指针
- writefds: 指向要监视可写事件的文件描述符集合的指针
- exceptfds: 指向要监视异常事件的文件描述符集合的指针
- timeout: 指向超时时间的指针(NULL 表示无限等待)
- sigmask: 指向信号屏蔽集的指针(NULL 表示不改变信号屏蔽)
5. timespec 结构体
struct timespec {
time_t tv_sec; /* 秒数 */
long tv_nsec; /* 纳秒数 */
};
6. 返回值
- 成功: 返回准备就绪的文件描述符数量(0 表示超时)
- 失败: 返回 -1,并设置相应的 errno 错误码
7. 常见错误码
EBADF
: 一个或多个文件描述符无效EINTR
: 被未屏蔽的信号中断EINVAL
: 参数无效ENOMEM
: 内存不足
8. 相似函数或关联函数
- select: 传统的文件描述符监视函数
- poll: 更现代的文件描述符监视函数
- ppoll: 带信号屏蔽的 poll
- epoll_wait: epoll 接口的等待函数
- read/write: 文件读写操作
- signal/sigaction: 信号处理函数
9. 示例代码
示例1:基础用法 – 监视标准输入和定时器
#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>
// 信号处理标志
volatile sig_atomic_t signal_received = 0;
// 信号处理函数
void signal_handler(int sig) {
signal_received = sig;
printf("\n收到信号 %d\n", sig);
}
int main() {
fd_set read_fds;
struct timespec timeout;
sigset_t sigmask;
int ready;
printf("=== pselect 基础示例 ===\n\n");
// 设置信号处理
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
if (sigaction(SIGINT, &sa, NULL) == -1) {
perror("sigaction");
return 1;
}
if (sigaction(SIGTERM, &sa, NULL) == -1) {
perror("sigaction");
return 1;
}
// 初始化文件描述符集合
FD_ZERO(&read_fds);
FD_SET(STDIN_FILENO, &read_fds);
// 设置超时时间 (5 秒)
timeout.tv_sec = 5;
timeout.tv_nsec = 0;
// 设置信号屏蔽集 (不屏蔽任何信号)
sigemptyset(&sigmask);
printf("监视设置:\n");
printf(" 监视文件描述符: %d (标准输入)\n", STDIN_FILENO);
printf(" 超时时间: %ld 秒\n", (long)timeout.tv_sec);
printf(" 按 Enter 键或等待超时...\n");
printf(" 按 Ctrl+C 发送信号\n\n");
// 使用 pselect 监视文件描述符
ready = pselect(STDIN_FILENO + 1, &read_fds, NULL, NULL, &timeout, &sigmask);
if (ready == -1) {
if (errno == EINTR) {
printf("pselect 被信号中断\n");
if (signal_received) {
printf("收到信号: %d\n", signal_received);
}
} else {
perror("pselect 失败");
}
} else if (ready == 0) {
printf("超时: 没有文件描述符准备就绪\n");
} else {
printf("准备就绪的文件描述符数量: %d\n", ready);
if (FD_ISSET(STDIN_FILENO, &read_fds)) {
printf("标准输入有数据可读\n");
// 读取输入
char buffer[256];
ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf("读取到: %s", buffer);
}
}
}
printf("\n=== pselect 特点 ===\n");
printf("1. 原子操作: 等待和信号屏蔽是原子的\n");
printf("2. 精确超时: 支持纳秒级超时\n");
printf("3. 信号安全: 避免信号处理中的竞态条件\n");
printf("4. 灵活屏蔽: 可以精确控制信号屏蔽\n");
printf("5. 高效监视: 同时监视多个文件描述符\n");
return 0;
}
示例2:多文件描述符监视
#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <fcntl.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>
// 创建临时文件用于测试
int create_test_file(const char *filename) {
int fd = open(filename, O_CREAT | O_RDWR | O_TRUNC, 0644);
if (fd == -1) {
perror("创建测试文件失败");
return -1;
}
const char *content = "这是测试文件的内容\n用于演示 pselect 功能\n";
write(fd, content, strlen(content));
lseek(fd, 0, SEEK_SET);
printf("创建测试文件: %s\n", filename);
return fd;
}
// 显示文件描述符集合状态
void show_fd_set_status(fd_set *fds, int max_fd, const char *description) {
printf("%s:\n", description);
for (int i = 0; i <= max_fd; i++) {
if (FD_ISSET(i, fds)) {
printf(" fd %d: 就绪\n", i);
}
}
printf("\n");
}
int main() {
fd_set read_fds, master_fds;
struct timespec timeout;
sigset_t sigmask;
int test_fd1, test_fd2;
int max_fd;
int ready;
printf("=== pselect 多文件描述符监视示例 ===\n\n");
// 创建测试文件
test_fd1 = create_test_file("test1.txt");
test_fd2 = create_test_file("test2.txt");
if (test_fd1 == -1 || test_fd2 == -1) {
if (test_fd1 != -1) close(test_fd1);
if (test_fd2 != -1) close(test_fd2);
unlink("test1.txt");
unlink("test2.txt");
return 1;
}
// 初始化文件描述符集合
FD_ZERO(&master_fds);
FD_SET(STDIN_FILENO, &master_fds);
FD_SET(test_fd1, &master_fds);
FD_SET(test_fd2, &master_fds);
// 找到最大的文件描述符
max_fd = STDIN_FILENO;
if (test_fd1 > max_fd) max_fd = test_fd1;
if (test_fd2 > max_fd) max_fd = test_fd2;
// 设置超时时间 (3 秒)
timeout.tv_sec = 3;
timeout.tv_nsec = 0;
// 设置信号屏蔽集
sigemptyset(&sigmask);
printf("监视设置:\n");
printf(" 最大文件描述符: %d\n", max_fd);
printf(" 监视的文件描述符: %d(标准输入), %d(文件1), %d(文件2)\n",
STDIN_FILENO, test_fd1, test_fd2);
printf(" 超时时间: %ld 秒\n", (long)timeout.tv_sec);
printf("\n");
// 主循环
printf("开始监视... (按 Ctrl+C 退出)\n");
while (1) {
// 复制主集合到工作集合
read_fds = master_fds;
printf("等待事件...\n");
ready = pselect(max_fd + 1, &read_fds, NULL, NULL, &timeout, &sigmask);
if (ready == -1) {
if (errno == EINTR) {
printf("pselect 被信号中断\n");
break;
} else {
perror("pselect 失败");
break;
}
} else if (ready == 0) {
printf("超时: 没有文件描述符准备就绪\n");
} else {
printf("准备就绪的文件描述符数量: %d\n", ready);
// 处理标准输入
if (FD_ISSET(STDIN_FILENO, &read_fds)) {
printf("标准输入有数据:\n");
char buffer[256];
ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf(" 读取到: %s", buffer);
if (strncmp(buffer, "quit", 4) == 0) {
printf("收到退出命令\n");
break;
}
}
}
// 处理测试文件1
if (FD_ISSET(test_fd1, &read_fds)) {
printf("测试文件1有数据可读\n");
lseek(test_fd1, 0, SEEK_SET);
char buffer[128];
ssize_t bytes_read = read(test_fd1, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf(" 文件1内容: %s", buffer);
}
}
// 处理测试文件2
if (FD_ISSET(test_fd2, &read_fds)) {
printf("测试文件2有数据可读\n");
lseek(test_fd2, 0, SEEK_SET);
char buffer[128];
ssize_t bytes_read = read(test_fd2, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf(" 文件2内容: %s", buffer);
}
}
printf("\n");
}
}
// 清理资源
printf("清理资源...\n");
close(test_fd1);
close(test_fd2);
unlink("test1.txt");
unlink("test2.txt");
printf("程序正常退出\n");
return 0;
}
示例3:完整的事件驱动服务器模拟
#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>
// 服务器配置结构体
struct server_config {
int max_clients;
int timeout_seconds;
int verbose;
int use_signals;
};
// 客户端信息结构体
struct client_info {
int fd;
int connected;
time_t connect_time;
char buffer[1024];
size_t buffer_pos;
};
// 服务器状态结构体
struct server_state {
int running;
int client_count;
struct client_info *clients;
int max_clients;
};
// 全局变量
volatile sig_atomic_t server_terminate = 0;
// 信号处理函数
void signal_handler(int sig) {
server_terminate = 1;
printf("\n收到终止信号 %d\n", sig);
}
// 初始化服务器状态
int init_server_state(struct server_state *state, int max_clients) {
state->running = 1;
state->client_count = 0;
state->max_clients = max_clients;
state->clients = calloc(max_clients, sizeof(struct client_info));
if (!state->clients) {
perror("分配客户端数组失败");
return -1;
}
return 0;
}
// 添加客户端
int add_client(struct server_state *state, int client_fd) {
if (state->client_count >= state->max_clients) {
fprintf(stderr, "客户端数量已达上限: %d\n", state->max_clients);
return -1;
}
for (int i = 0; i < state->max_clients; i++) {
if (!state->clients[i].connected) {
state->clients[i].fd = client_fd;
state->clients[i].connected = 1;
state->clients[i].connect_time = time(NULL);
state->clients[i].buffer_pos = 0;
state->client_count++;
printf("添加客户端 %d (fd: %d),当前客户端数: %d\n",
i, client_fd, state->client_count);
return i;
}
}
fprintf(stderr, "找不到空闲的客户端槽位\n");
return -1;
}
// 移除客户端
void remove_client(struct server_state *state, int client_index) {
if (client_index >= 0 && client_index < state->max_clients &&
state->clients[client_index].connected) {
close(state->clients[client_index].fd);
memset(&state->clients[client_index], 0, sizeof(struct client_info));
state->client_count--;
printf("移除客户端 %d,剩余客户端数: %d\n",
client_index, state->client_count);
}
}
// 显示服务器状态
void show_server_status(const struct server_state *state) {
printf("=== 服务器状态 ===\n");
printf("运行状态: %s\n", state->running ? "运行中" : "已停止");
printf("客户端数量: %d/%d\n", state->client_count, state->max_clients);
printf("当前时间: %s", ctime(&(time_t){time(NULL)}));
if (state->client_count > 0) {
printf("连接的客户端:\n");
for (int i = 0; i < state->max_clients; i++) {
if (state->clients[i].connected) {
printf(" [%d] fd=%d 连接时间: %s",
i, state->clients[i].fd,
ctime(&state->clients[i].connect_time));
}
}
}
printf("\n");
}
int main(int argc, char *argv[]) {
struct server_config config = {
.max_clients = 10,
.timeout_seconds = 30,
.verbose = 0,
.use_signals = 0
};
struct server_state server_state_struct;
fd_set read_fds;
struct timespec timeout;
sigset_t sigmask;
int max_fd = STDIN_FILENO;
int ready;
printf("=== pselect 事件驱动服务器模拟器 ===\n\n");
// 解析命令行参数
static struct option long_options[] = {
{"clients", required_argument, 0, 'c'},
{"timeout", required_argument, 0, 't'},
{"verbose", no_argument, 0, 'v'},
{"signals", no_argument, 0, 's'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0}
};
int opt;
while ((opt = getopt_long(argc, argv, "c:t:vsh", long_options, NULL)) != -1) {
switch (opt) {
case 'c':
config.max_clients = atoi(optarg);
if (config.max_clients <= 0) config.max_clients = 10;
break;
case 't':
config.timeout_seconds = atoi(optarg);
if (config.timeout_seconds <= 0) config.timeout_seconds = 30;
break;
case 'v':
config.verbose = 1;
break;
case 's':
config.use_signals = 1;
break;
case 'h':
printf("用法: %s [选项]\n", argv[0]);
printf("选项:\n");
printf(" -c, --clients=NUM 最大客户端数 (默认 10)\n");
printf(" -t, --timeout=SECONDS 超时时间 (默认 30 秒)\n");
printf(" -v, --verbose 详细输出\n");
printf(" -s, --signals 启用信号处理\n");
printf(" -h, --help 显示此帮助信息\n");
return 0;
default:
fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);
return 1;
}
}
// 设置信号处理
if (config.use_signals) {
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGINT, &sa, NULL);
sigaction(SIGTERM, &sa, NULL);
printf("✓ 信号处理已启用\n");
}
// 初始化服务器状态
if (init_server_state(&server_state_struct, config.max_clients) == -1) {
return 1;
}
printf("服务器配置:\n");
printf(" 最大客户端数: %d\n", config.max_clients);
printf(" 超时时间: %d 秒\n", config.timeout_seconds);
printf(" 详细输出: %s\n", config.verbose ? "是" : "否");
printf(" 信号处理: %s\n", config.use_signals ? "是" : "否");
printf("\n");
// 设置超时时间
timeout.tv_sec = config.timeout_seconds;
timeout.tv_nsec = 0;
// 设置信号屏蔽集
sigemptyset(&sigmask);
printf("启动服务器模拟...\n");
printf("可用命令:\n");
printf(" 输入 'status' 查看服务器状态\n");
printf(" 输入 'connect' 模拟客户端连接\n");
printf(" 输入 'quit' 或 'exit' 退出服务器\n");
printf(" 输入 'help' 显示帮助信息\n");
printf(" 按 Ctrl+C 发送终止信号\n");
printf("\n");
// 主循环
while (server_state_struct.running && !server_terminate) {
// 初始化文件描述符集合
FD_ZERO(&read_fds);
FD_SET(STDIN_FILENO, &read_fds);
max_fd = STDIN_FILENO;
// 添加连接的客户端
for (int i = 0; i < server_state_struct.max_clients; i++) {
if (server_state_struct.clients[i].connected) {
FD_SET(server_state_struct.clients[i].fd, &read_fds);
if (server_state_struct.clients[i].fd > max_fd) {
max_fd = server_state_struct.clients[i].fd;
}
}
}
if (config.verbose) {
printf("监视 %d 个文件描述符 (最大fd: %d)...\n",
server_state_struct.client_count + 1, max_fd);
}
// 使用 pselect 等待事件
ready = pselect(max_fd + 1, &read_fds, NULL, NULL, &timeout, &sigmask);
if (ready == -1) {
if (errno == EINTR) {
if (config.verbose) {
printf("pselect 被信号中断\n");
}
continue;
} else {
perror("pselect 失败");
break;
}
} else if (ready == 0) {
if (config.verbose) {
printf("超时: 没有事件发生\n");
}
} else {
if (config.verbose) {
printf("准备就绪的文件描述符数量: %d\n", ready);
}
// 处理标准输入事件
if (FD_ISSET(STDIN_FILENO, &read_fds)) {
char input_buffer[256];
ssize_t bytes_read = read(STDIN_FILENO, input_buffer, sizeof(input_buffer) - 1);
if (bytes_read > 0) {
input_buffer[bytes_read] = '\0';
// 处理特殊命令
if (strncmp(input_buffer, "quit", 4) == 0 ||
strncmp(input_buffer, "exit", 4) == 0) {
printf("收到退出命令\n");
server_state_struct.running = 0;
server_terminate = 1;
} else if (strncmp(input_buffer, "status", 6) == 0) {
show_server_status(&server_state_struct);
} else if (strncmp(input_buffer, "help", 4) == 0) {
printf("可用命令:\n");
printf(" quit/exit - 退出服务器\n");
printf(" status - 显示服务器状态\n");
printf(" help - 显示帮助信息\n");
printf(" Ctrl+C - 发送终止信号\n");
} else if (strncmp(input_buffer, "connect", 7) == 0) {
if (server_state_struct.client_count < server_state_struct.max_clients) {
// 模拟创建客户端连接
int fake_fd = 1000 + server_state_struct.client_count;
int client_index = add_client(&server_state_struct, fake_fd);
if (client_index != -1) {
printf("模拟客户端连接成功 (fd: %d)\n", fake_fd);
}
} else {
printf("客户端数量已达上限\n");
}
} else {
printf("收到输入: %s", input_buffer);
}
}
}
// 处理客户端事件
for (int i = 0; i < server_state_struct.max_clients; i++) {
if (server_state_struct.clients[i].connected &&
FD_ISSET(server_state_struct.clients[i].fd, &read_fds)) {
char client_buffer[256];
ssize_t bytes_read = read(server_state_struct.clients[i].fd,
client_buffer, sizeof(client_buffer) - 1);
if (bytes_read > 0) {
client_buffer[bytes_read] = '\0';
printf("客户端 %d 发送数据: %s", i, client_buffer);
// 回显数据
write(server_state_struct.clients[i].fd, "Echo: ", 6);
write(server_state_struct.clients[i].fd, client_buffer, bytes_read);
} else if (bytes_read == 0) {
printf("客户端 %d 断开连接\n", i);
remove_client(&server_state_struct, i);
} else {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("读取客户端数据失败");
remove_client(&server_state_struct, i);
}
}
}
}
}
}
// 清理资源
printf("清理资源...\n");
for (int i = 0; i < server_state_struct.max_clients; i++) {
if (server_state_struct.clients[i].connected) {
remove_client(&server_state_struct, i);
}
}
free(server_state_struct.clients);
printf("服务器模拟结束\n");
printf("\n=== pselect 服务器应用说明 ===\n");
printf("核心技术:\n");
printf("1. 多路复用: 同时监视多个文件描述符\n");
printf("2. 事件驱动: 基于事件的通知机制\n");
printf("3. 信号安全: 原子的信号处理\n");
printf("4. 超时控制: 精确的超时管理\n");
printf("5. 资源管理: 动态的客户端管理\n");
printf("\n");
printf("pselect 优势:\n");
printf("1. 原子性: 等待和信号处理是原子操作\n");
printf("2. 灵活性: 可以精确控制信号屏蔽\n");
printf("3. 精确性: 纳秒级超时控制\n");
printf("4. 可扩展: 适用于中小规模并发\n");
printf("5. 安全性: 避免信号处理中的竞态条件\n");
return 0;
}