mq_timedreceive函数详解
1. 函数介绍
mq_timedreceive函数是Linux系统中用于在指定时间内从POSIX消息队列接收消息的函数。它是mq_receive函数的增强版本,支持超时控制。可以把mq_timedreceive想象成一个”限时消息接收器”,它能够在指定的时间内尝试接收消息,如果超时则返回错误。
这个函数特别适用于需要控制接收等待时间的场景,比如实时系统、服务器应用或需要避免无限期阻塞的程序。
使用场景:
- 实时系统的消息接收
- 服务器程序的请求处理
- 避免无限期阻塞的接收操作
- 超时控制的网络应用
- 高可用性系统中的消息处理
2. 函数原型
#include <mqueue.h>
#include <time.h>
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio, const struct timespec *abs_timeout);
3. 功能
mq_timedreceive函数的主要功能是在指定的绝对超时时间内从消息队列接收消息。如果队列为空且在超时时间内没有消息到达,则返回错误。
4. 参数
- mqdes: 消息队列描述符
- 类型:mqd_t
- 含义:已打开的消息队列描述符
- msg_ptr: 消息缓冲区指针
- 类型:char*
- 含义:指向存储接收消息的缓冲区
- msg_len: 缓冲区大小
- 类型:size_t
- 含义:消息缓冲区的大小(字节数)
- msg_prio: 消息优先级指针
- 类型:unsigned int*
- 含义:指向存储消息优先级的变量(可为NULL)
- abs_timeout: 绝对超时时间
- 类型:const struct timespec*
- 含义:绝对超时时间(基于CLOCK_REALTIME)
5. 返回值
- 成功: 返回接收到的消息字节数
- 失败: 返回-1,并设置errno错误码
- EAGAIN:超时时间内没有消息可接收
- EBADF:无效的消息队列描述符
- EINTR:被信号中断
- EINVAL:参数无效
- EMSGSIZE:缓冲区太小
- ETIMEDOUT:超时
6. 相似函数或关联函数
- mq_receive(): 接收消息(阻塞)
- mq_send(): 发送消息
- mq_timedsend(): 限时发送消息
- clock_gettime(): 获取当前时间
- pthread_cond_timedwait(): 限时条件等待
7. 示例代码
示例1:基础mq_timedreceive使用 – 超时控制接收
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
// 创建消息队列
mqd_t create_test_queue(const char* name) {
struct mq_attr attr = {
.mq_flags = 0,
.mq_maxmsg = 10,
.mq_msgsize = 256,
.mq_curmsgs = 0
};
mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("创建消息队列失败");
return -1;
}
printf("创建测试队列: %s\n", name);
return mq;
}
// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {
if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
perror("获取当前时间失败");
return -1;
}
abs_timeout->tv_sec += seconds;
return 0;
}
int main() {
printf("=== 基础mq_timedreceive使用示例 ===\n");
const char* queue_name = "/receive_test_queue";
// 创建测试队列
mqd_t mq = create_test_queue(queue_name);
if (mq == -1) {
exit(EXIT_FAILURE);
}
// 演示mq_timedreceive超时(空队列)
printf("1. 演示空队列超时接收:\n");
struct timespec abs_timeout;
if (calculate_absolute_timeout(&abs_timeout, 3) == -1) { // 3秒超时
mq_close(mq);
mq_unlink(queue_name);
exit(EXIT_FAILURE);
}
char buffer[256];
unsigned int priority;
printf("从空队列接收消息(3秒超时):\n");
clock_t start_time = clock();
ssize_t result = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
clock_t end_time = clock();
double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
if (result == -1) {
if (errno == ETIMEDOUT) {
printf("✗ 接收超时 (耗时: %.2f 秒)\n", elapsed_time);
} else if (errno == EAGAIN) {
printf("✗ 暂无消息: %s\n", strerror(errno));
} else {
printf("✗ 接收失败: %s\n", strerror(errno));
}
} else {
buffer[result] = '\0';
printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);
}
// 发送一些测试消息
printf("\n2. 发送测试消息:\n");
const char* messages[] = {
"第一条测试消息",
"第二条测试消息",
"第三条测试消息"
};
for (int i = 0; i < 3; i++) {
unsigned int priorities[] = {1, 5, 3};
if (mq_send(mq, messages[i], strlen(messages[i]), priorities[i]) == -1) {
perror("发送消息失败");
} else {
printf("发送: %s (优先级: %u)\n", messages[i], priorities[i]);
}
}
// 演示成功的mq_timedreceive
printf("\n3. 演示成功接收消息:\n");
if (calculate_absolute_timeout(&abs_timeout, 5) == 0) { // 5秒超时
printf("接收消息(队列有消息):\n");
// 接收所有消息
while (1) {
ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
if (bytes_received > 0) {
buffer[bytes_received] = '\0';
printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);
} else {
if (errno == ETIMEDOUT || errno == EAGAIN) {
printf("无更多消息可接收\n");
break;
} else {
printf("接收失败: %s\n", strerror(errno));
break;
}
}
}
}
// 演示缓冲区大小处理
printf("\n4. 缓冲区大小处理演示:\n");
// 发送一条长消息
char long_message[200];
memset(long_message, 'A', sizeof(long_message) - 1);
long_message[sizeof(long_message) - 1] = '\0';
if (mq_send(mq, long_message, strlen(long_message), 0) == 0) {
printf("发送长消息成功\n");
// 使用过小的缓冲区接收
char small_buffer[50];
if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {
ssize_t bytes_received = mq_timedreceive(mq, small_buffer, sizeof(small_buffer), NULL, &abs_timeout);
if (bytes_received == -1) {
if (errno == EMSGSIZE) {
printf("✗ 缓冲区太小 (预期错误)\n");
} else {
printf("✗ 其他错误: %s\n", strerror(errno));
}
} else {
small_buffer[bytes_received] = '\0';
printf("✓ 接收到截断消息: %s (%zd 字节)\n", small_buffer, bytes_received);
}
}
// 使用足够大的缓冲区接收
char large_buffer[256];
if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {
ssize_t bytes_received = mq_timedreceive(mq, large_buffer, sizeof(large_buffer), NULL, &abs_timeout);
if (bytes_received > 0) {
large_buffer[bytes_received] = '\0';
printf("✓ 接收到完整消息 (%zd 字节)\n", bytes_received);
}
}
}
// 演示优先级接收
printf("\n5. 优先级接收演示:\n");
// 发送不同优先级的消息
struct {
const char* message;
unsigned int priority;
} priority_messages[] = {
{"低优先级消息", 1},
{"中优先级消息", 5},
{"高优先级消息", 10},
{"最高优先级消息", 15}
};
for (int i = 0; i < 4; i++) {
if (mq_send(mq, priority_messages[i].message, strlen(priority_messages[i].message),
priority_messages[i].priority) == 0) {
printf("发送: %s (优先级: %u)\n", priority_messages[i].message, priority_messages[i].priority);
}
}
// 接收消息(应该按优先级顺序接收)
printf("按优先级顺序接收消息:\n");
if (calculate_absolute_timeout(&abs_timeout, 3) == 0) {
for (int i = 0; i < 4; i++) {
ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
if (bytes_received > 0) {
buffer[bytes_received] = '\0';
printf("接收: %s (优先级: %u)\n", buffer, priority);
}
}
}
// 清理资源
printf("\n6. 清理资源:\n");
mq_close(mq);
mq_unlink(queue_name);
printf("队列已清理\n");
printf("\n=== 基础mq_timedreceive演示完成 ===\n");
return 0;
}
示例2:服务器应用中的超时消息处理
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#define MAX_REQUESTS 100
#define REQUEST_SIZE 512
volatile sig_atomic_t server_running = 1;
// 服务器请求结构
typedef struct {
char client_id[32];
char request_data[256];
time_t timestamp;
int request_id;
} server_request_t;
// 服务器响应结构
typedef struct {
int request_id;
char response_data[256];
int status;
time_t timestamp;
} server_response_t;
// 信号处理函数
void signal_handler(int sig) {
printf("服务器收到停止信号 %d\n", sig);
server_running = 0;
}
// 创建服务器队列
mqd_t create_server_queues(const char* request_queue, const char* response_queue) {
struct mq_attr request_attr = {
.mq_flags = 0,
.mq_maxmsg = 20,
.mq_msgsize = sizeof(server_request_t),
.mq_curmsgs = 0
};
struct mq_attr response_attr = {
.mq_flags = 0,
.mq_maxmsg = 20,
.mq_msgsize = sizeof(server_response_t),
.mq_curmsgs = 0
};
// 创建请求队列
mqd_t req_mq = mq_open(request_queue, O_CREAT | O_RDONLY, 0644, &request_attr);
if (req_mq == (mqd_t)-1) {
perror("创建请求队列失败");
return -1;
}
// 创建响应队列
mqd_t resp_mq = mq_open(response_queue, O_CREAT | O_WRONLY, 0644, &response_attr);
if (resp_mq == (mqd_t)-1) {
perror("创建响应队列失败");
mq_close(req_mq);
return -1;
}
printf("服务器队列创建成功:\n");
printf(" 请求队列: %s\n", request_queue);
printf(" 响应队列: %s\n", response_queue);
return req_mq; // 返回请求队列描述符
}
// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {
if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
perror("获取当前时间失败");
return -1;
}
long seconds = milliseconds / 1000;
long nanoseconds = (milliseconds % 1000) * 1000000;
abs_timeout->tv_sec += seconds;
abs_timeout->tv_nsec += nanoseconds;
if (abs_timeout->tv_nsec >= 1000000000) {
abs_timeout->tv_sec++;
abs_timeout->tv_nsec -= 1000000000;
}
return 0;
}
// 处理服务器请求
void process_server_requests(mqd_t req_mq, mqd_t resp_mq) {
printf("服务器开始处理请求...\n");
int processed_requests = 0;
time_t last_status_time = time(NULL);
while (server_running) {
struct timespec abs_timeout;
if (calculate_relative_timeout(&abs_timeout, 1000) == -1) { // 1秒超时
continue;
}
server_request_t request;
unsigned int priority;
ssize_t bytes_received = mq_timedreceive(req_mq, (char*)&request, sizeof(request), &priority, &abs_timeout);
if (bytes_received > 0) {
// 处理请求
printf("处理请求 #%d 来自客户端 %s\n", request.request_id, request.client_id);
// 模拟处理时间
usleep(100000); // 0.1秒
// 构造响应
server_response_t response;
response.request_id = request.request_id;
snprintf(response.response_data, sizeof(response.response_data),
"请求 #%d 已处理完成", request.request_id);
response.status = 200;
response.timestamp = time(NULL);
// 发送响应
if (mq_send(resp_mq, (char*)&response, sizeof(response), priority) == 0) {
printf("响应已发送: 请求 #%d\n", request.request_id);
processed_requests++;
} else {
printf("发送响应失败: %s\n", strerror(errno));
}
} else if (errno == ETIMEDOUT || errno == EAGAIN) {
// 超时或无消息,继续循环
} else {
printf("接收请求失败: %s\n", strerror(errno));
if (errno != EINTR) {
break;
}
}
// 定期显示状态
time_t current_time = time(NULL);
if (current_time - last_status_time >= 5) {
printf("服务器状态: 已处理 %d 个请求\n", processed_requests);
last_status_time = current_time;
}
}
printf("服务器停止,总共处理 %d 个请求\n", processed_requests);
}
// 客户端模拟器
void client_simulator(const char* request_queue, const char* response_queue, int client_id) {
printf("客户端 %d 启动\n", client_id);
// 打开队列
mqd_t req_mq = mq_open(request_queue, O_WRONLY);
mqd_t resp_mq = mq_open(response_queue, O_RDONLY);
if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {
perror("客户端打开队列失败");
if (req_mq != (mqd_t)-1) mq_close(req_mq);
if (resp_mq != (mqd_t)-1) mq_close(resp_mq);
exit(EXIT_FAILURE);
}
srand(time(NULL) + client_id);
// 发送请求
for (int i = 0; i < 5; i++) {
server_request_t request;
snprintf(request.client_id, sizeof(request.client_id), "Client_%d", client_id);
snprintf(request.request_data, sizeof(request.request_data), "请求数据_%d", i + 1);
request.timestamp = time(NULL);
request.request_id = client_id * 100 + i + 1;
unsigned int priority = rand() % 10;
if (mq_send(req_mq, (char*)&request, sizeof(request), priority) == 0) {
printf("客户端 %d 发送请求 #%d\n", client_id, request.request_id);
} else {
printf("客户端 %d 发送请求失败: %s\n", client_id, strerror(errno));
continue;
}
// 等待响应
struct timespec abs_timeout;
if (calculate_relative_timeout(&abs_timeout, 3000) == 0) { // 3秒超时
server_response_t response;
ssize_t bytes_received = mq_timedreceive(resp_mq, (char*)&response, sizeof(response), NULL, &abs_timeout);
if (bytes_received > 0) {
printf("客户端 %d 收到响应: %s (状态: %d)\n",
client_id, response.response_data, response.status);
} else if (errno == ETIMEDOUT) {
printf("客户端 %d 等待响应超时\n", client_id);
} else {
printf("客户端 %d 接收响应失败: %s\n", client_id, strerror(errno));
}
}
sleep(1); // 客户端间隔
}
mq_close(req_mq);
mq_close(resp_mq);
printf("客户端 %d 完成\n", client_id);
}
int main() {
printf("=== 服务器应用超时消息处理示例 ===\n");
const char* request_queue = "/server_requests";
const char* response_queue = "/server_responses";
// 设置信号处理
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// 启动服务器进程
pid_t server_pid = fork();
if (server_pid == 0) {
// 服务器进程
mqd_t req_mq = mq_open(request_queue, O_RDONLY);
mqd_t resp_mq = mq_open(response_queue, O_WRONLY);
if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {
perror("服务器打开队列失败");
exit(EXIT_FAILURE);
}
process_server_requests(req_mq, resp_mq);
mq_close(req_mq);
mq_close(resp_mq);
exit(EXIT_SUCCESS);
}
// 等待服务器启动
sleep(1);
// 启动多个客户端进程
pid_t clients[3];
for (int i = 0; i < 3; i++) {
clients[i] = fork();
if (clients[i] == 0) {
client_simulator(request_queue, response_queue, i + 1);
exit(EXIT_SUCCESS);
}
}
// 等待客户端完成
for (int i = 0; i < 3; i++) {
waitpid(clients[i], NULL, 0);
}
// 停止服务器
server_running = 0;
sleep(2);
waitpid(server_pid, NULL, 0);
// 清理队列
mq_unlink(request_queue);
mq_unlink(response_queue);
printf("\n=== 服务器应用演示完成 ===\n");
return 0;
}
编译和运行
# 编译示例(需要链接实时库)
gcc -o mq_unlink_example1 mq_unlink_example1.c -lrt
gcc -o mq_timedsend_example1 mq_timedsend_example1.c -lrt
gcc -o mq_timedsend_example2 mq_timedsend_example2.c -lrt
gcc -o mq_timedreceive_example1 mq_timedreceive_example1.c -lrt
gcc -o mq_timedreceive_example2 mq_timedreceive_example2.c -lrt
# 运行示例
./mq_unlink_example1
./mq_timedsend_example1
./mq_timedsend_example2
./mq_timedreceive_example1
./mq_timedreceive_example2
重要注意事项
- 权限要求: 需要适当的文件系统权限来创建和访问消息队列
- 名称规范: 消息队列名称必须以’/’开头
- 超时时间: 使用绝对时间而非相对时间
- 资源管理: 必须正确关闭队列描述符和删除队列
- 错误处理: 必须检查返回值并处理各种错误情况
- 线程安全: 消息队列操作是线程安全的
- 系统限制: 受系统消息队列数量和大小限制
最佳实践
- 资源清理: 及时关闭队列描述符和删除不需要的队列
- 超时设置: 合理设置超时时间以避免无限期等待
- 错误处理: 完善的错误处理和恢复机制
- 优先级使用: 合理使用消息优先级
- 缓冲区管理: 确保缓冲区大小足够
- 信号处理: 正确处理信号中断
- 性能监控: 监控队列性能和系统资源使用
通过这些示例,你可以理解POSIX消息队列相关函数在进程间通信方面的强大功能,它们为Linux系统提供了高效、可靠的IPC机制,特别适用于实时系统、服务器应用和分布式系统。