POSIX 消息队列 (mq_*) 函数详解

POSIX 消息队列 (mq_*) 函数详解

1. 函数介绍

POSIX 消息队列是一组用于进程间通信(IPC)的函数,提供了一种可靠的、基于消息的通信机制。可以把消息队列想象成”邮局系统”——发送者将消息放入邮箱(队列),接收者从邮箱中取出消息,就像现实中的邮政服务一样。

与传统的 System V 消息队列相比,POSIX 消息队列具有更好的可移植性和更简洁的 API。它们支持优先级消息、持久化、以及通过文件系统路径名进行命名。

2. 核心函数原型

#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>

// 核心函数
mqd_t mq_open(const char *name, int oflag, ...);
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
int mq_notify(mqd_t mqdes, const struct sigevent *notification);

3. 功能

POSIX 消息队列提供以下功能:

  • 创建和打开消息队列
  • 发送和接收消息
  • 设置和获取队列属性
  • 异步通知机制
  • 持久化支持
  • 优先级消息支持

4. 核心结构体

struct mq_attr

struct mq_attr {
    long mq_flags;   /* 消息队列标志 */
    long mq_maxmsg;  /* 最大消息数 */
    long mq_msgsize; /* 最大消息大小 */
    long mq_curmsgs; /* 当前消息数 */
};

struct sigevent (用于通知)

struct sigevent {
    int          sigev_notify;            /* 通知类型 */
    int          sigev_signo;             /* 信号编号 */
    union sigval sigev_value;             /* 传递给处理函数的数据 */
    void       (*sigev_notify_function)(union sigval);  /* 线程函数 */
    pthread_attr_t *sigev_notify_attributes;           /* 线程属性 */
};

5. 消息队列名称

  • 名称必须以 ‘/’ 开头
  • 长度限制为 NAME_MAX (通常 255 字符)
  • 示例:“/my_queue”, “/app/messages”

6. 打开标志 (oflag)

标志说明
O_RDONLY只读打开
O_WRONLY只写打开
O_RDWR读写打开
O_CREAT不存在时创建
O_EXCL与 O_CREAT 一起使用,如果存在则失败
O_NONBLOCK非阻塞模式

7. 返回值

  • mq_open: 成功返回消息队列描述符,失败返回 (mqd_t)-1
  • 其他函数: 成功返回 0,失败返回 -1

8. 相关函数

  • pthread: 多线程支持
  • signal: 信号处理
  • fcntl: 文件控制
  • unlink: 删除文件

9. 示例代码

示例1:基础用法 – 简单的消息发送和接收

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>

#define QUEUE_NAME "/example_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 10

int main() {
    mqd_t mq;
    struct mq_attr attr;
    char send_buffer[MAX_MSG_SIZE];
    char recv_buffer[MAX_MSG_SIZE];
    ssize_t bytes_read;
    unsigned int priority;
    
    printf("=== POSIX 消息队列基础示例 ===\n\n");
    
    // 设置消息队列属性
    attr.mq_flags = 0;
    attr.mq_maxmsg = MAX_MSGS;
    attr.mq_msgsize = MAX_MSG_SIZE;
    attr.mq_curmsgs = 0;
    
    // 创建并打开消息队列
    printf("创建消息队列: %s\n", QUEUE_NAME);
    mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        exit(1);
    }
    printf("✓ 消息队列创建成功\n\n");
    
    // 获取并显示队列属性
    printf("消息队列属性:\n");
    if (mq_getattr(mq, &attr) == 0) {
        printf("  最大消息数: %ld\n", attr.mq_maxmsg);
        printf("  最大消息大小: %ld 字节\n", attr.mq_msgsize);
        printf("  当前消息数: %ld\n", attr.mq_curmsgs);
        printf("  标志: %ld\n", attr.mq_flags);
    }
    printf("\n");
    
    // 发送消息
    printf("发送消息:\n");
    const char* messages[] = {
        "第一条消息: Hello, World!",
        "第二条消息: 欢迎使用 POSIX 消息队列",
        "第三条消息: 这是优先级消息",
        "第四条消息: 最后一条测试消息"
    };
    int priorities[] = {0, 0, 10, 0};  // 优先级 (数值越大优先级越高)
    
    for (int i = 0; i < 4; i++) {
        if (mq_send(mq, messages[i], strlen(messages[i]) + 1, priorities[i]) == 0) {
            printf("  ✓ 发送消息 %d (优先级 %d): %s\n", i + 1, priorities[i], messages[i]);
        } else {
            perror("  ✗ mq_send 失败");
        }
    }
    
    // 显示发送后队列状态
    if (mq_getattr(mq, &attr) == 0) {
        printf("\n发送后队列状态: %ld 条消息\n", attr.mq_curmsgs);
    }
    
    // 接收消息
    printf("\n接收消息 (按优先级顺序):\n");
    for (int i = 0; i < 4; i++) {
        bytes_read = mq_receive(mq, recv_buffer, MAX_MSG_SIZE, &priority);
        if (bytes_read != -1) {
            printf("  ✓ 接收消息 %d (优先级 %d, 长度 %zd): %s\n", 
                   i + 1, priority, bytes_read, recv_buffer);
        } else {
            if (errno == EAGAIN) {
                printf("  ⚠ 队列为空\n");
                break;
            } else {
                perror("  ✗ mq_receive 失败");
                break;
            }
        }
    }
    
    // 显示接收后队列状态
    if (mq_getattr(mq, &attr) == 0) {
        printf("\n接收后队列状态: %ld 条消息\n", attr.mq_curmsgs);
    }
    
    // 关闭消息队列
    if (mq_close(mq) == 0) {
        printf("✓ 消息队列关闭成功\n");
    } else {
        perror("✗ mq_close 失败");
    }
    
    // 删除消息队列
    if (mq_unlink(QUEUE_NAME) == 0) {
        printf("✓ 消息队列删除成功\n");
    } else {
        perror("✗ mq_unlink 失败");
    }
    
    printf("\n=== 消息队列特点 ===\n");
    printf("1. 支持优先级消息 (数值越大优先级越高)\n");
    printf("2. 消息大小可配置\n");
    printf("3. 消息数量有限制\n");
    printf("4. 支持持久化 (直到显式删除)\n");
    printf("5. 可通过文件系统路径访问\n");
    
    return 0;
}

示例2:生产者-消费者模型

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>

#define QUEUE_NAME "/producer_consumer_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 20
#define NUM_MESSAGES 10

// 全局变量
mqd_t mq;
int producer_count = 0;
int consumer_count = 0;
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;

// 生产者线程函数
void* producer_thread(void* arg) {
    int producer_id = *(int*)arg;
    char message[MAX_MSG_SIZE];
    time_t now;
    
    printf("生产者 %d 启动\n", producer_id);
    
    for (int i = 0; i < NUM_MESSAGES; i++) {
        // 构造消息
        time(&now);
        snprintf(message, sizeof(message), 
                "P%d-MSG%d-TIME:%s", producer_id, i + 1, ctime(&now));
        
        // 发送消息 (交替使用不同优先级)
        unsigned int priority = (i % 3 == 0) ? 5 : 1;  // 每第3条高优先级
        
        if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {
            pthread_mutex_lock(&count_mutex);
            producer_count++;
            pthread_mutex_unlock(&count_mutex);
            
            printf("生产者 %d 发送消息: %s (优先级 %u)\n", 
                   producer_id, message, priority);
        } else {
            perror("生产者发送失败");
        }
        
        // 随机延迟
        usleep((rand() % 100 + 1) * 1000);  // 1-100ms
    }
    
    printf("生产者 %d 完成\n", producer_id);
    return NULL;
}

// 消费者线程函数
void* consumer_thread(void* arg) {
    int consumer_id = *(int*)arg;
    char message[MAX_MSG_SIZE];
    ssize_t bytes_read;
    unsigned int priority;
    
    printf("消费者 %d 启动\n", consumer_id);
    
    while (1) {
        // 接收消息
        bytes_read = mq_receive(mq, message, MAX_MSG_SIZE, &priority);
        if (bytes_read != -1) {
            pthread_mutex_lock(&count_mutex);
            consumer_count++;
            int current_count = consumer_count;
            pthread_mutex_unlock(&count_mutex);
            
            printf("消费者 %d 接收消息 %d (优先级 %u): %s", 
                   consumer_id, current_count, priority, message);
            
            // 检查是否接收完所有消息
            if (current_count >= NUM_MESSAGES * 2) {  // 2个生产者
                break;
            }
        } else {
            if (errno == EAGAIN) {
                // 非阻塞模式下队列为空
                usleep(10000);  // 10ms
                continue;
            } else {
                perror("消费者接收失败");
                break;
            }
        }
        
        // 随机延迟
        usleep((rand() % 50 + 1) * 1000);  // 1-50ms
    }
    
    printf("消费者 %d 完成\n", consumer_id);
    return NULL;
}

int main() {
    pthread_t producers[2];
    pthread_t consumers[3];
    int producer_ids[2] = {1, 2};
    int consumer_ids[3] = {1, 2, 3};
    struct mq_attr attr;
    
    printf("=== 生产者-消费者消息队列示例 ===\n\n");
    
    // 初始化随机数种子
    srand(time(NULL) + getpid());
    
    // 设置消息队列属性
    attr.mq_flags = 0;  // 阻塞模式
    attr.mq_maxmsg = MAX_MSGS;
    attr.mq_msgsize = MAX_MSG_SIZE;
    attr.mq_curmsgs = 0;
    
    // 创建消息队列
    printf("创建消息队列: %s\n", QUEUE_NAME);
    mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        exit(1);
    }
    printf("✓ 消息队列创建成功\n\n");
    
    // 创建生产者线程
    printf("创建生产者线程...\n");
    for (int i = 0; i < 2; i++) {
        if (pthread_create(&producers[i], NULL, producer_thread, &producer_ids[i]) != 0) {
            perror("创建生产者线程失败");
            exit(1);
        }
    }
    
    // 创建消费者线程
    printf("创建消费者线程...\n");
    for (int i = 0; i < 3; i++) {
        if (pthread_create(&consumers[i], NULL, consumer_thread, &consumer_ids[i]) != 0) {
            perror("创建消费者线程失败");
            exit(1);
        }
    }
    
    // 等待生产者完成
    printf("等待生产者完成...\n");
    for (int i = 0; i < 2; i++) {
        pthread_join(producers[i], NULL);
    }
    
    // 等待消费者完成
    printf("等待消费者完成...\n");
    for (int i = 0; i < 3; i++) {
        pthread_join(consumers[i], NULL);
    }
    
    // 显示统计信息
    printf("\n=== 统计信息 ===\n");
    printf("生产消息数: %d\n", producer_count);
    printf("消费消息数: %d\n", consumer_count);
    
    // 显示最终队列状态
    if (mq_getattr(mq, &attr) == 0) {
        printf("队列中剩余消息: %ld\n", attr.mq_curmsgs);
    }
    
    // 清理资源
    if (mq_close(mq) == 0) {
        printf("✓ 消息队列关闭成功\n");
    }
    if (mq_unlink(QUEUE_NAME) == 0) {
        printf("✓ 消息队列删除成功\n");
    }
    
    printf("\n=== 生产者-消费者模型特点 ===\n");
    printf("1. 解耦: 生产者和消费者独立运行\n");
    printf("2. 异步: 生产和消费可以不同步进行\n");
    printf("3. 缓冲: 消息队列提供缓冲作用\n");
    printf("4. 负载均衡: 多个消费者可以并行处理\n");
    printf("5. 可靠性: 消息持久化存储\n");
    
    return 0;
}

示例3:完整的消息队列管理系统

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#include <getopt.h>
#include <signal.h>
#include <time.h>

// 配置结构体
struct mq_config {
    char *queue_name;
    int max_messages;
    int max_message_size;
    int create_queue;
    int delete_queue;
    int show_info;
    int send_message;
    int receive_message;
    int list_queues;
    int priority;
    char *message_content;
    int non_blocking;
    int verbose;
};

// 全局变量
volatile sig_atomic_t running = 1;

// 信号处理函数
void signal_handler(int sig) {
    printf("\n收到信号 %d,准备退出...\n", sig);
    running = 0;
}

// 设置信号处理
void setup_signal_handlers() {
    struct sigaction sa;
    sa.sa_handler = signal_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    
    sigaction(SIGINT, &sa, NULL);   // Ctrl+C
    sigaction(SIGTERM, &sa, NULL);  // 终止信号
}

// 显示消息队列信息
void show_queue_info(mqd_t mq) {
    struct mq_attr attr;
    
    if (mq_getattr(mq, &attr) == 0) {
        printf("消息队列属性:\n");
        printf("  最大消息数: %ld\n", attr.mq_maxmsg);
        printf("  最大消息大小: %ld 字节\n", attr.mq_msgsize);
        printf("  当前消息数: %ld\n", attr.mq_curmsgs);
        printf("  标志: %s\n", (attr.mq_flags & O_NONBLOCK) ? "非阻塞" : "阻塞");
    } else {
        perror("获取队列属性失败");
    }
}

// 列出所有消息队列
void list_all_queues() {
    printf("=== 系统消息队列列表 ===\n");
    printf("注意: POSIX 消息队列通常在 /dev/mqueue/ 目录下\n");
    
    // 尝试列出 /dev/mqueue/ 目录
    if (access("/dev/mqueue", F_OK) == 0) {
        printf("系统消息队列目录存在\n");
        system("ls -la /dev/mqueue/ 2>/dev/null || echo '无法访问 /dev/mqueue/'");
    } else {
        printf("系统消息队列目录不存在\n");
    }
    printf("\n");
}

// 发送消息
int send_message_to_queue(mqd_t mq, const char *message, int priority, int non_blocking) {
    struct mq_attr attr;
    
    // 检查消息大小
    if (mq_getattr(mq, &attr) == 0) {
        if (strlen(message) + 1 > (size_t)attr.mq_msgsize) {
            fprintf(stderr, "错误: 消息大小 (%zu) 超过队列限制 (%ld)\n", 
                    strlen(message) + 1, attr.mq_msgsize);
            return -1;
        }
    }
    
    // 发送消息
    if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {
        printf("✓ 消息发送成功 (优先级 %d): %s\n", priority, message);
        return 0;
    } else {
        if (errno == EAGAIN && non_blocking) {
            printf("⚠ 队列已满,非阻塞模式下发送失败\n");
        } else {
            perror("✗ 消息发送失败");
        }
        return -1;
    }
}

// 接收消息
int receive_message_from_queue(mqd_t mq, int non_blocking) {
    char *buffer;
    struct mq_attr attr;
    ssize_t bytes_read;
    unsigned int priority;
    
    // 获取队列属性以确定缓冲区大小
    if (mq_getattr(mq, &attr) != 0) {
        perror("获取队列属性失败");
        return -1;
    }
    
    buffer = malloc(attr.mq_msgsize);
    if (!buffer) {
        perror("内存分配失败");
        return -1;
    }
    
    // 接收消息
    bytes_read = mq_receive(mq, buffer, attr.mq_msgsize, &priority);
    if (bytes_read != -1) {
        printf("✓ 消息接收成功 (优先级 %u, 长度 %zd): %s", 
               priority, bytes_read, buffer);
        free(buffer);
        return 0;
    } else {
        if (errno == EAGAIN && non_blocking) {
            printf("⚠ 队列为空,非阻塞模式下接收失败\n");
        } else {
            perror("✗ 消息接收失败");
        }
        free(buffer);
        return -1;
    }
}

// 创建消息队列
mqd_t create_message_queue(const char *name, int max_msgs, int max_size, int non_blocking) {
    struct mq_attr attr;
    int flags = O_CREAT | O_RDWR;
    
    if (non_blocking) {
        flags |= O_NONBLOCK;
    }
    
    attr.mq_flags = non_blocking ? O_NONBLOCK : 0;
    attr.mq_maxmsg = max_msgs;
    attr.mq_msgsize = max_size;
    attr.mq_curmsgs = 0;
    
    mqd_t mq = mq_open(name, flags, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建消息队列失败");
        return (mqd_t)-1;
    }
    
    printf("✓ 消息队列创建成功: %s\n", name);
    return mq;
}

// 打开现有消息队列
mqd_t open_existing_queue(const char *name, int non_blocking) {
    int flags = O_RDWR;
    
    if (non_blocking) {
        flags |= O_NONBLOCK;
    }
    
    mqd_t mq = mq_open(name, flags);
    if (mq == (mqd_t)-1) {
        perror("打开消息队列失败");
        return (mqd_t)-1;
    }
    
    printf("✓ 消息队列打开成功: %s\n", name);
    return mq;
}

// 显示帮助信息
void show_help(const char *program_name) {
    printf("用法: %s [选项]\n", program_name);
    printf("\n选项:\n");
    printf("  -n, --name=NAME        消息队列名称 (以 / 开头)\n");
    printf("  -c, --create           创建消息队列\n");
    printf("  -d, --delete           删除消息队列\n");
    printf("  -i, --info             显示队列信息\n");
    printf("  -l, --list             列出所有队列\n");
    printf("  -s, --send=MESSAGE     发送消息\n");
    printf("  -r, --receive          接收消息\n");
    printf("  -p, --priority=NUM     消息优先级 (默认 0)\n");
    printf("  -m, --max-msgs=NUM     最大消息数 (创建时使用)\n");
    printf("  -z, --max-size=NUM     最大消息大小 (创建时使用)\n");
    printf("  -b, --non-blocking     非阻塞模式\n");
    printf("  -v, --verbose          详细输出\n");
    printf("  -h, --help             显示此帮助信息\n");
    printf("\n示例:\n");
    printf("  %s -n /myqueue -c -m 10 -z 256     # 创建队列\n", program_name);
    printf("  %s -n /myqueue -s \"Hello World\"   # 发送消息\n", program_name);
    printf("  %s -n /myqueue -r                  # 接收消息\n", program_name);
    printf("  %s -n /myqueue -i                  # 显示队列信息\n", program_name);
    printf("  %s -n /myqueue -d                  # 删除队列\n", program_name);
    printf("  %s -l                              # 列出所有队列\n", program_name);
}

int main(int argc, char *argv[]) {
    struct mq_config config = {
        .queue_name = NULL,
        .max_messages = 10,
        .max_message_size = 256,
        .create_queue = 0,
        .delete_queue = 0,
        .show_info = 0,
        .send_message = 0,
        .receive_message = 0,
        .list_queues = 0,
        .priority = 0,
        .message_content = NULL,
        .non_blocking = 0,
        .verbose = 0
    };
    
    printf("=== POSIX 消息队列管理系统 ===\n\n");
    
    // 解析命令行参数
    static struct option long_options[] = {
        {"name",        required_argument, 0, 'n'},
        {"create",      no_argument,       0, 'c'},
        {"delete",      no_argument,       0, 'd'},
        {"info",        no_argument,       0, 'i'},
        {"list",        no_argument,       0, 'l'},
        {"send",        required_argument, 0, 's'},
        {"receive",     no_argument,       0, 'r'},
        {"priority",    required_argument, 0, 'p'},
        {"max-msgs",    required_argument, 0, 'm'},
        {"max-size",    required_argument, 0, 'z'},
        {"non-blocking", no_argument,      0, 'b'},
        {"verbose",     no_argument,       0, 'v'},
        {"help",        no_argument,       0, 'h'},
        {0, 0, 0, 0}
    };
    
    int opt;
    while ((opt = getopt_long(argc, argv, "n:cdils:rp:m:z:bvh", long_options, NULL)) != -1) {
        switch (opt) {
            case 'n':
                config.queue_name = optarg;
                break;
            case 'c':
                config.create_queue = 1;
                break;
            case 'd':
                config.delete_queue = 1;
                break;
            case 'i':
                config.show_info = 1;
                break;
            case 'l':
                config.list_queues = 1;
                break;
            case 's':
                config.send_message = 1;
                config.message_content = optarg;
                break;
            case 'r':
                config.receive_message = 1;
                break;
            case 'p':
                config.priority = atoi(optarg);
                break;
            case 'm':
                config.max_messages = atoi(optarg);
                break;
            case 'z':
                config.max_message_size = atoi(optarg);
                break;
            case 'b':
                config.non_blocking = 1;
                break;
            case 'v':
                config.verbose = 1;
                break;
            case 'h':
                show_help(argv[0]);
                return 0;
            default:
                fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);
                return 1;
        }
    }
    
    // 设置信号处理
    setup_signal_handlers();
    
    // 显示系统信息
    if (config.verbose) {
        printf("系统信息:\n");
        printf("  当前用户 UID: %d\n", getuid());
        printf("  当前进程 PID: %d\n", getpid());
        printf("  消息队列支持: ");
        system("ls /dev/mqueue/ >/dev/null 2>&1 && echo '是' || echo '否'");
        printf("\n");
    }
    
    // 列出所有队列
    if (config.list_queues) {
        list_all_queues();
        if (!config.queue_name && !config.create_queue && !config.delete_queue &&
            !config.show_info && !config.send_message && !config.receive_message) {
            return 0;
        }
    }
    
    // 如果没有指定队列名称且需要操作队列
    if (!config.queue_name && (config.create_queue || config.delete_queue || 
                              config.show_info || config.send_message || 
                              config.receive_message)) {
        fprintf(stderr, "错误: 需要指定消息队列名称\n");
        fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);
        return 1;
    }
    
    // 处理队列操作
    mqd_t mq = (mqd_t)-1;
    
    if (config.create_queue) {
        mq = create_message_queue(config.queue_name, config.max_messages, 
                                 config.max_message_size, config.non_blocking);
        if (mq == (mqd_t)-1) {
            return 1;
        }
        
        if (config.show_info) {
            show_queue_info(mq);
        }
    } else if (config.queue_name) {
        // 打开现有队列
        mq = open_existing_queue(config.queue_name, config.non_blocking);
        if (mq == (mqd_t)-1) {
            return 1;
        }
    }
    
    // 显示队列信息
    if (config.show_info && mq != (mqd_t)-1) {
        show_queue_info(mq);
    }
    
    // 发送消息
    if (config.send_message && config.message_content && mq != (mqd_t)-1) {
        send_message_to_queue(mq, config.message_content, 
                             config.priority, config.non_blocking);
    }
    
    // 接收消息
    if (config.receive_message && mq != (mqd_t)-1) {
        if (config.non_blocking) {
            receive_message_from_queue(mq, config.non_blocking);
        } else {
            printf("等待接收消息 (按 Ctrl+C 退出)...\n");
            while (running) {
                if (receive_message_from_queue(mq, config.non_blocking) == -1) {
                    if (errno != EAGAIN) {
                        break;
                    }
                }
                if (!config.non_blocking) {
                    sleep(1);  // 阻塞模式下定期检查
                }
            }
        }
    }
    
    // 删除队列
    if (config.delete_queue && config.queue_name) {
        if (mq_unlink(config.queue_name) == 0) {
            printf("✓ 消息队列删除成功: %s\n", config.queue_name);
        } else {
            perror("✗ 消息队列删除失败");
        }
    }
    
    // 关闭队列
    if (mq != (mqd_t)-1) {
        if (mq_close(mq) == 0) {
            if (config.verbose) {
                printf("✓ 消息队列关闭成功\n");
            }
        } else {
            perror("✗ 消息队列关闭失败");
        }
    }
    
    // 显示使用建议
    printf("\n=== POSIX 消息队列使用建议 ===\n");
    printf("适用场景:\n");
    printf("1. 进程间通信 (IPC)\n");
    printf("2. 生产者-消费者模式\n");
    printf("3. 异步消息处理\n");
    printf("4. 系统服务通信\n");
    printf("5. 微服务架构\n");
    printf("\n");
    printf("优势:\n");
    printf("1. 可靠性: 消息持久化存储\n");
    printf("2. 优先级: 支持消息优先级\n");
    printf("3. 可移植: POSIX 标准\n");
    printf("4. 灵活性: 支持阻塞和非阻塞模式\n");
    printf("5. 安全性: 通过文件系统权限控制\n");
    printf("\n");
    printf("注意事项:\n");
    printf("1. 需要链接实时库: -lrt\n");
    printf("2. 队列名称必须以 / 开头\n");
    printf("3. 消息大小和数量有限制\n");
    printf("4. 需要适当权限才能创建/删除队列\n");
    printf("5. 应该及时关闭和清理队列资源\n");
    
    return 0;
}

编译和运行说明

# 编译示例程序(需要链接实时库)
gcc -o mq_example1 example1.c -lrt
gcc -o mq_example2 example2.c -lrt -lpthread
gcc -o mq_example3 example3.c -lrt -lpthread

# 运行示例
./mq_example1
./mq_example2
./mq_example3 --help

# 基本操作示例
./mq_example3 -n /test_queue -c -m 5 -z 128
./mq_example3 -n /test_queue -s "Hello, Message Queue!"
./mq_example3 -n /test_queue -r
./mq_example3 -n /test_queue -i
./mq_example3 -n /test_queue -d

# 列出所有队列
./mq_example3 -l

系统要求检查

# 检查系统支持
ls /dev/mqueue/ 2>/dev/null || echo "消息队列目录不存在"

# 检查内核配置
grep -i mq /boot/config-$(uname -r)

# 检查库支持
ldd --version

# 查看系统限制
ulimit -a | grep -i msg
cat /proc/sys/fs/mqueue/

重要注意事项

  1. 编译要求: 需要链接实时库 -lrt
  2. 权限要求: 创建/删除队列通常需要适当权限
  3. 名称规范: 队列名称必须以 ‘/’ 开头
  4. 资源限制: 受系统消息队列限制约束
  5. 清理责任: 应该及时关闭和删除队列
  6. 线程安全: 消息队列描述符在多线程间共享是安全的

实际应用场景

  1. 微服务通信: 服务间异步消息传递
  2. 日志系统: 异步日志记录
  3. 任务队列: 后台任务处理
  4. 事件驱动: 事件通知和处理
  5. 数据流: 实时数据处理管道
  6. 系统监控: 状态变更通知

最佳实践

// 安全的消息队列操作函数
mqd_t safe_mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr) {
    mqd_t mq = mq_open(name, oflag, mode, attr);
    if (mq == (mqd_t)-1) {
        switch (errno) {
            case EACCES:
                fprintf(stderr, "权限不足访问队列: %s\n", name);
                break;
            case EEXIST:
                fprintf(stderr, "队列已存在: %s\n", name);
                break;
            case ENOENT:
                fprintf(stderr, "队列不存在: %s\n", name);
                break;
            case EINVAL:
                fprintf(stderr, "无效的队列名称或参数: %s\n", name);
                break;
            default:
                perror("mq_open 失败");
                break;
        }
    }
    return mq;
}

// 可靠的消息发送函数
int reliable_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 
                     unsigned msg_prio, int timeout_seconds) {
    struct timespec timeout;
    int result;
    
    if (timeout_seconds > 0) {
        clock_gettime(CLOCK_REALTIME, &timeout);
        timeout.tv_sec += timeout_seconds;
        result = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &timeout);
    } else {
        result = mq_send(mqdes, msg_ptr, msg_len, msg_prio);
    }
    
    return result;
}

// 带重试的消息接收函数
ssize_t retry_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 
                         unsigned *msg_prio, int max_retries) {
    ssize_t result;
    int retries = 0;
    
    while (retries < max_retries) {
        result = mq_receive(mqdes, msg_ptr, msg_len, msg_prio);
        if (result != -1) {
            return result;  // 成功接收
        }
        
        if (errno == EAGAIN) {
            retries++;
            usleep(100000);  // 100ms 延迟后重试
        } else {
            break;  // 其他错误,不再重试
        }
    }
    
    return result;
}

这些示例展示了 POSIX 消息队列的各种使用方法,从基础的消息发送接收到完整的管理系统,帮助你全面掌握 Linux 系统中的消息队列机制。

此条目发表在linux文章分类目录。将固定链接加入收藏夹。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注