mq_timedreceive系统调用及示例

mq_timedreceive函数详解

1. 函数介绍

mq_timedreceive函数是Linux系统中用于在指定时间内从POSIX消息队列接收消息的函数。它是mq_receive函数的增强版本,支持超时控制。可以把mq_timedreceive想象成一个”限时消息接收器”,它能够在指定的时间内尝试接收消息,如果超时则返回错误。

这个函数特别适用于需要控制接收等待时间的场景,比如实时系统、服务器应用或需要避免无限期阻塞的程序。

使用场景:

  • 实时系统的消息接收
  • 服务器程序的请求处理
  • 避免无限期阻塞的接收操作
  • 超时控制的网络应用
  • 高可用性系统中的消息处理

2. 函数原型

#include <mqueue.h>
#include <time.h>

ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 
                        unsigned int *msg_prio, const struct timespec *abs_timeout);

3. 功能

mq_timedreceive函数的主要功能是在指定的绝对超时时间内从消息队列接收消息。如果队列为空且在超时时间内没有消息到达,则返回错误。

4. 参数

  • mqdes: 消息队列描述符
    • 类型:mqd_t
    • 含义:已打开的消息队列描述符
  • msg_ptr: 消息缓冲区指针
    • 类型:char*
    • 含义:指向存储接收消息的缓冲区
  • msg_len: 缓冲区大小
    • 类型:size_t
    • 含义:消息缓冲区的大小(字节数)
  • msg_prio: 消息优先级指针
    • 类型:unsigned int*
    • 含义:指向存储消息优先级的变量(可为NULL)
  • abs_timeout: 绝对超时时间
    • 类型:const struct timespec*
    • 含义:绝对超时时间(基于CLOCK_REALTIME)

5. 返回值

  • 成功: 返回接收到的消息字节数
  • 失败: 返回-1,并设置errno错误码
    • EAGAIN:超时时间内没有消息可接收
    • EBADF:无效的消息队列描述符
    • EINTR:被信号中断
    • EINVAL:参数无效
    • EMSGSIZE:缓冲区太小
    • ETIMEDOUT:超时

6. 相似函数或关联函数

  • mq_receive(): 接收消息(阻塞)
  • mq_send(): 发送消息
  • mq_timedsend(): 限时发送消息
  • clock_gettime(): 获取当前时间
  • pthread_cond_timedwait(): 限时条件等待

7. 示例代码

示例1:基础mq_timedreceive使用 – 超时控制接收

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>

// 创建消息队列
mqd_t create_test_queue(const char* name) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 10,
        .mq_msgsize = 256,
        .mq_curmsgs = 0
    };
    
    mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建消息队列失败");
        return -1;
    }
    
    printf("创建测试队列: %s\n", name);
    return mq;
}

// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {
    if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
        perror("获取当前时间失败");
        return -1;
    }
    
    abs_timeout->tv_sec += seconds;
    return 0;
}

int main() {
    printf("=== 基础mq_timedreceive使用示例 ===\n");
    
    const char* queue_name = "/receive_test_queue";
    
    // 创建测试队列
    mqd_t mq = create_test_queue(queue_name);
    if (mq == -1) {
        exit(EXIT_FAILURE);
    }
    
    // 演示mq_timedreceive超时(空队列)
    printf("1. 演示空队列超时接收:\n");
    
    struct timespec abs_timeout;
    if (calculate_absolute_timeout(&abs_timeout, 3) == -1) {  // 3秒超时
        mq_close(mq);
        mq_unlink(queue_name);
        exit(EXIT_FAILURE);
    }
    
    char buffer[256];
    unsigned int priority;
    
    printf("从空队列接收消息(3秒超时):\n");
    clock_t start_time = clock();
    ssize_t result = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
    clock_t end_time = clock();
    
    double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
    
    if (result == -1) {
        if (errno == ETIMEDOUT) {
            printf("✗ 接收超时 (耗时: %.2f 秒)\n", elapsed_time);
        } else if (errno == EAGAIN) {
            printf("✗ 暂无消息: %s\n", strerror(errno));
        } else {
            printf("✗ 接收失败: %s\n", strerror(errno));
        }
    } else {
        buffer[result] = '\0';
        printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);
    }
    
    // 发送一些测试消息
    printf("\n2. 发送测试消息:\n");
    const char* messages[] = {
        "第一条测试消息",
        "第二条测试消息",
        "第三条测试消息"
    };
    
    for (int i = 0; i < 3; i++) {
        unsigned int priorities[] = {1, 5, 3};
        if (mq_send(mq, messages[i], strlen(messages[i]), priorities[i]) == -1) {
            perror("发送消息失败");
        } else {
            printf("发送: %s (优先级: %u)\n", messages[i], priorities[i]);
        }
    }
    
    // 演示成功的mq_timedreceive
    printf("\n3. 演示成功接收消息:\n");
    
    if (calculate_absolute_timeout(&abs_timeout, 5) == 0) {  // 5秒超时
        printf("接收消息(队列有消息):\n");
        
        // 接收所有消息
        while (1) {
            ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
            if (bytes_received > 0) {
                buffer[bytes_received] = '\0';
                printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);
            } else {
                if (errno == ETIMEDOUT || errno == EAGAIN) {
                    printf("无更多消息可接收\n");
                    break;
                } else {
                    printf("接收失败: %s\n", strerror(errno));
                    break;
                }
            }
        }
    }
    
    // 演示缓冲区大小处理
    printf("\n4. 缓冲区大小处理演示:\n");
    
    // 发送一条长消息
    char long_message[200];
    memset(long_message, 'A', sizeof(long_message) - 1);
    long_message[sizeof(long_message) - 1] = '\0';
    
    if (mq_send(mq, long_message, strlen(long_message), 0) == 0) {
        printf("发送长消息成功\n");
        
        // 使用过小的缓冲区接收
        char small_buffer[50];
        if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {
            ssize_t bytes_received = mq_timedreceive(mq, small_buffer, sizeof(small_buffer), NULL, &abs_timeout);
            if (bytes_received == -1) {
                if (errno == EMSGSIZE) {
                    printf("✗ 缓冲区太小 (预期错误)\n");
                } else {
                    printf("✗ 其他错误: %s\n", strerror(errno));
                }
            } else {
                small_buffer[bytes_received] = '\0';
                printf("✓ 接收到截断消息: %s (%zd 字节)\n", small_buffer, bytes_received);
            }
        }
        
        // 使用足够大的缓冲区接收
        char large_buffer[256];
        if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {
            ssize_t bytes_received = mq_timedreceive(mq, large_buffer, sizeof(large_buffer), NULL, &abs_timeout);
            if (bytes_received > 0) {
                large_buffer[bytes_received] = '\0';
                printf("✓ 接收到完整消息 (%zd 字节)\n", bytes_received);
            }
        }
    }
    
    // 演示优先级接收
    printf("\n5. 优先级接收演示:\n");
    
    // 发送不同优先级的消息
    struct {
        const char* message;
        unsigned int priority;
    } priority_messages[] = {
        {"低优先级消息", 1},
        {"中优先级消息", 5},
        {"高优先级消息", 10},
        {"最高优先级消息", 15}
    };
    
    for (int i = 0; i < 4; i++) {
        if (mq_send(mq, priority_messages[i].message, strlen(priority_messages[i].message), 
                   priority_messages[i].priority) == 0) {
            printf("发送: %s (优先级: %u)\n", priority_messages[i].message, priority_messages[i].priority);
        }
    }
    
    // 接收消息(应该按优先级顺序接收)
    printf("按优先级顺序接收消息:\n");
    if (calculate_absolute_timeout(&abs_timeout, 3) == 0) {
        for (int i = 0; i < 4; i++) {
            ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
            if (bytes_received > 0) {
                buffer[bytes_received] = '\0';
                printf("接收: %s (优先级: %u)\n", buffer, priority);
            }
        }
    }
    
    // 清理资源
    printf("\n6. 清理资源:\n");
    mq_close(mq);
    mq_unlink(queue_name);
    printf("队列已清理\n");
    
    printf("\n=== 基础mq_timedreceive演示完成 ===\n");
    
    return 0;
}

示例2:服务器应用中的超时消息处理

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

#define MAX_REQUESTS 100
#define REQUEST_SIZE 512

volatile sig_atomic_t server_running = 1;

// 服务器请求结构
typedef struct {
    char client_id[32];
    char request_data[256];
    time_t timestamp;
    int request_id;
} server_request_t;

// 服务器响应结构
typedef struct {
    int request_id;
    char response_data[256];
    int status;
    time_t timestamp;
} server_response_t;

// 信号处理函数
void signal_handler(int sig) {
    printf("服务器收到停止信号 %d\n", sig);
    server_running = 0;
}

// 创建服务器队列
mqd_t create_server_queues(const char* request_queue, const char* response_queue) {
    struct mq_attr request_attr = {
        .mq_flags = 0,
        .mq_maxmsg = 20,
        .mq_msgsize = sizeof(server_request_t),
        .mq_curmsgs = 0
    };
    
    struct mq_attr response_attr = {
        .mq_flags = 0,
        .mq_maxmsg = 20,
        .mq_msgsize = sizeof(server_response_t),
        .mq_curmsgs = 0
    };
    
    // 创建请求队列
    mqd_t req_mq = mq_open(request_queue, O_CREAT | O_RDONLY, 0644, &request_attr);
    if (req_mq == (mqd_t)-1) {
        perror("创建请求队列失败");
        return -1;
    }
    
    // 创建响应队列
    mqd_t resp_mq = mq_open(response_queue, O_CREAT | O_WRONLY, 0644, &response_attr);
    if (resp_mq == (mqd_t)-1) {
        perror("创建响应队列失败");
        mq_close(req_mq);
        return -1;
    }
    
    printf("服务器队列创建成功:\n");
    printf("  请求队列: %s\n", request_queue);
    printf("  响应队列: %s\n", response_queue);
    
    return req_mq;  // 返回请求队列描述符
}

// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {
    if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
        perror("获取当前时间失败");
        return -1;
    }
    
    long seconds = milliseconds / 1000;
    long nanoseconds = (milliseconds % 1000) * 1000000;
    
    abs_timeout->tv_sec += seconds;
    abs_timeout->tv_nsec += nanoseconds;
    
    if (abs_timeout->tv_nsec >= 1000000000) {
        abs_timeout->tv_sec++;
        abs_timeout->tv_nsec -= 1000000000;
    }
    
    return 0;
}

// 处理服务器请求
void process_server_requests(mqd_t req_mq, mqd_t resp_mq) {
    printf("服务器开始处理请求...\n");
    
    int processed_requests = 0;
    time_t last_status_time = time(NULL);
    
    while (server_running) {
        struct timespec abs_timeout;
        if (calculate_relative_timeout(&abs_timeout, 1000) == -1) {  // 1秒超时
            continue;
        }
        
        server_request_t request;
        unsigned int priority;
        
        ssize_t bytes_received = mq_timedreceive(req_mq, (char*)&request, sizeof(request), &priority, &abs_timeout);
        
        if (bytes_received > 0) {
            // 处理请求
            printf("处理请求 #%d 来自客户端 %s\n", request.request_id, request.client_id);
            
            // 模拟处理时间
            usleep(100000);  // 0.1秒
            
            // 构造响应
            server_response_t response;
            response.request_id = request.request_id;
            snprintf(response.response_data, sizeof(response.response_data), 
                     "请求 #%d 已处理完成", request.request_id);
            response.status = 200;
            response.timestamp = time(NULL);
            
            // 发送响应
            if (mq_send(resp_mq, (char*)&response, sizeof(response), priority) == 0) {
                printf("响应已发送: 请求 #%d\n", request.request_id);
                processed_requests++;
            } else {
                printf("发送响应失败: %s\n", strerror(errno));
            }
        } else if (errno == ETIMEDOUT || errno == EAGAIN) {
            // 超时或无消息,继续循环
        } else {
            printf("接收请求失败: %s\n", strerror(errno));
            if (errno != EINTR) {
                break;
            }
        }
        
        // 定期显示状态
        time_t current_time = time(NULL);
        if (current_time - last_status_time >= 5) {
            printf("服务器状态: 已处理 %d 个请求\n", processed_requests);
            last_status_time = current_time;
        }
    }
    
    printf("服务器停止,总共处理 %d 个请求\n", processed_requests);
}

// 客户端模拟器
void client_simulator(const char* request_queue, const char* response_queue, int client_id) {
    printf("客户端 %d 启动\n", client_id);
    
    // 打开队列
    mqd_t req_mq = mq_open(request_queue, O_WRONLY);
    mqd_t resp_mq = mq_open(response_queue, O_RDONLY);
    
    if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {
        perror("客户端打开队列失败");
        if (req_mq != (mqd_t)-1) mq_close(req_mq);
        if (resp_mq != (mqd_t)-1) mq_close(resp_mq);
        exit(EXIT_FAILURE);
    }
    
    srand(time(NULL) + client_id);
    
    // 发送请求
    for (int i = 0; i < 5; i++) {
        server_request_t request;
        snprintf(request.client_id, sizeof(request.client_id), "Client_%d", client_id);
        snprintf(request.request_data, sizeof(request.request_data), "请求数据_%d", i + 1);
        request.timestamp = time(NULL);
        request.request_id = client_id * 100 + i + 1;
        
        unsigned int priority = rand() % 10;
        
        if (mq_send(req_mq, (char*)&request, sizeof(request), priority) == 0) {
            printf("客户端 %d 发送请求 #%d\n", client_id, request.request_id);
        } else {
            printf("客户端 %d 发送请求失败: %s\n", client_id, strerror(errno));
            continue;
        }
        
        // 等待响应
        struct timespec abs_timeout;
        if (calculate_relative_timeout(&abs_timeout, 3000) == 0) {  // 3秒超时
            server_response_t response;
            ssize_t bytes_received = mq_timedreceive(resp_mq, (char*)&response, sizeof(response), NULL, &abs_timeout);
            
            if (bytes_received > 0) {
                printf("客户端 %d 收到响应: %s (状态: %d)\n", 
                       client_id, response.response_data, response.status);
            } else if (errno == ETIMEDOUT) {
                printf("客户端 %d 等待响应超时\n", client_id);
            } else {
                printf("客户端 %d 接收响应失败: %s\n", client_id, strerror(errno));
            }
        }
        
        sleep(1);  // 客户端间隔
    }
    
    mq_close(req_mq);
    mq_close(resp_mq);
    printf("客户端 %d 完成\n", client_id);
}

int main() {
    printf("=== 服务器应用超时消息处理示例 ===\n");
    
    const char* request_queue = "/server_requests";
    const char* response_queue = "/server_responses";
    
    // 设置信号处理
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    
    // 启动服务器进程
    pid_t server_pid = fork();
    if (server_pid == 0) {
        // 服务器进程
        mqd_t req_mq = mq_open(request_queue, O_RDONLY);
        mqd_t resp_mq = mq_open(response_queue, O_WRONLY);
        
        if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {
            perror("服务器打开队列失败");
            exit(EXIT_FAILURE);
        }
        
        process_server_requests(req_mq, resp_mq);
        
        mq_close(req_mq);
        mq_close(resp_mq);
        exit(EXIT_SUCCESS);
    }
    
    // 等待服务器启动
    sleep(1);
    
    // 启动多个客户端进程
    pid_t clients[3];
    for (int i = 0; i < 3; i++) {
        clients[i] = fork();
        if (clients[i] == 0) {
            client_simulator(request_queue, response_queue, i + 1);
            exit(EXIT_SUCCESS);
        }
    }
    
    // 等待客户端完成
    for (int i = 0; i < 3; i++) {
        waitpid(clients[i], NULL, 0);
    }
    
    // 停止服务器
    server_running = 0;
    sleep(2);
    waitpid(server_pid, NULL, 0);
    
    // 清理队列
    mq_unlink(request_queue);
    mq_unlink(response_queue);
    
    printf("\n=== 服务器应用演示完成 ===\n");
    
    return 0;
}

编译和运行

# 编译示例(需要链接实时库)
gcc -o mq_unlink_example1 mq_unlink_example1.c -lrt
gcc -o mq_timedsend_example1 mq_timedsend_example1.c -lrt
gcc -o mq_timedsend_example2 mq_timedsend_example2.c -lrt
gcc -o mq_timedreceive_example1 mq_timedreceive_example1.c -lrt
gcc -o mq_timedreceive_example2 mq_timedreceive_example2.c -lrt

# 运行示例
./mq_unlink_example1
./mq_timedsend_example1
./mq_timedsend_example2
./mq_timedreceive_example1
./mq_timedreceive_example2

重要注意事项

  1. 权限要求: 需要适当的文件系统权限来创建和访问消息队列
  2. 名称规范: 消息队列名称必须以’/’开头
  3. 超时时间: 使用绝对时间而非相对时间
  4. 资源管理: 必须正确关闭队列描述符和删除队列
  5. 错误处理: 必须检查返回值并处理各种错误情况
  6. 线程安全: 消息队列操作是线程安全的
  7. 系统限制: 受系统消息队列数量和大小限制

最佳实践

  1. 资源清理: 及时关闭队列描述符和删除不需要的队列
  2. 超时设置: 合理设置超时时间以避免无限期等待
  3. 错误处理: 完善的错误处理和恢复机制
  4. 优先级使用: 合理使用消息优先级
  5. 缓冲区管理: 确保缓冲区大小足够
  6. 信号处理: 正确处理信号中断
  7. 性能监控: 监控队列性能和系统资源使用

通过这些示例,你可以理解POSIX消息队列相关函数在进程间通信方面的强大功能,它们为Linux系统提供了高效、可靠的IPC机制,特别适用于实时系统、服务器应用和分布式系统。

此条目发表在linux文章分类目录。将固定链接加入收藏夹。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注