POSIX 消息队列 (mq_*) 函数详解
1. 函数介绍
POSIX 消息队列是一组用于进程间通信(IPC)的函数,提供了一种可靠的、基于消息的通信机制。可以把消息队列想象成”邮局系统”——发送者将消息放入邮箱(队列),接收者从邮箱中取出消息,就像现实中的邮政服务一样。
与传统的 System V 消息队列相比,POSIX 消息队列具有更好的可移植性和更简洁的 API。它们支持优先级消息、持久化、以及通过文件系统路径名进行命名。
2. 核心函数原型
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
// 核心函数
mqd_t mq_open(const char *name, int oflag, ...);
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
3. 功能
POSIX 消息队列提供以下功能:
- 创建和打开消息队列
- 发送和接收消息
- 设置和获取队列属性
- 异步通知机制
- 持久化支持
- 优先级消息支持
4. 核心结构体
struct mq_attr
struct mq_attr {
long mq_flags; /* 消息队列标志 */
long mq_maxmsg; /* 最大消息数 */
long mq_msgsize; /* 最大消息大小 */
long mq_curmsgs; /* 当前消息数 */
};
struct sigevent (用于通知)
struct sigevent {
int sigev_notify; /* 通知类型 */
int sigev_signo; /* 信号编号 */
union sigval sigev_value; /* 传递给处理函数的数据 */
void (*sigev_notify_function)(union sigval); /* 线程函数 */
pthread_attr_t *sigev_notify_attributes; /* 线程属性 */
};
5. 消息队列名称
- 名称必须以 ‘/’ 开头
- 长度限制为 NAME_MAX (通常 255 字符)
- 示例:“/my_queue”, “/app/messages”
6. 打开标志 (oflag)
标志 | 说明 |
---|---|
O_RDONLY | 只读打开 |
O_WRONLY | 只写打开 |
O_RDWR | 读写打开 |
O_CREAT | 不存在时创建 |
O_EXCL | 与 O_CREAT 一起使用,如果存在则失败 |
O_NONBLOCK | 非阻塞模式 |
7. 返回值
- mq_open: 成功返回消息队列描述符,失败返回 (mqd_t)-1
- 其他函数: 成功返回 0,失败返回 -1
8. 相关函数
- pthread: 多线程支持
- signal: 信号处理
- fcntl: 文件控制
- unlink: 删除文件
9. 示例代码
示例1:基础用法 – 简单的消息发送和接收
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#define QUEUE_NAME "/example_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 10
int main() {
mqd_t mq;
struct mq_attr attr;
char send_buffer[MAX_MSG_SIZE];
char recv_buffer[MAX_MSG_SIZE];
ssize_t bytes_read;
unsigned int priority;
printf("=== POSIX 消息队列基础示例 ===\n\n");
// 设置消息队列属性
attr.mq_flags = 0;
attr.mq_maxmsg = MAX_MSGS;
attr.mq_msgsize = MAX_MSG_SIZE;
attr.mq_curmsgs = 0;
// 创建并打开消息队列
printf("创建消息队列: %s\n", QUEUE_NAME);
mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
printf("✓ 消息队列创建成功\n\n");
// 获取并显示队列属性
printf("消息队列属性:\n");
if (mq_getattr(mq, &attr) == 0) {
printf(" 最大消息数: %ld\n", attr.mq_maxmsg);
printf(" 最大消息大小: %ld 字节\n", attr.mq_msgsize);
printf(" 当前消息数: %ld\n", attr.mq_curmsgs);
printf(" 标志: %ld\n", attr.mq_flags);
}
printf("\n");
// 发送消息
printf("发送消息:\n");
const char* messages[] = {
"第一条消息: Hello, World!",
"第二条消息: 欢迎使用 POSIX 消息队列",
"第三条消息: 这是优先级消息",
"第四条消息: 最后一条测试消息"
};
int priorities[] = {0, 0, 10, 0}; // 优先级 (数值越大优先级越高)
for (int i = 0; i < 4; i++) {
if (mq_send(mq, messages[i], strlen(messages[i]) + 1, priorities[i]) == 0) {
printf(" ✓ 发送消息 %d (优先级 %d): %s\n", i + 1, priorities[i], messages[i]);
} else {
perror(" ✗ mq_send 失败");
}
}
// 显示发送后队列状态
if (mq_getattr(mq, &attr) == 0) {
printf("\n发送后队列状态: %ld 条消息\n", attr.mq_curmsgs);
}
// 接收消息
printf("\n接收消息 (按优先级顺序):\n");
for (int i = 0; i < 4; i++) {
bytes_read = mq_receive(mq, recv_buffer, MAX_MSG_SIZE, &priority);
if (bytes_read != -1) {
printf(" ✓ 接收消息 %d (优先级 %d, 长度 %zd): %s\n",
i + 1, priority, bytes_read, recv_buffer);
} else {
if (errno == EAGAIN) {
printf(" ⚠ 队列为空\n");
break;
} else {
perror(" ✗ mq_receive 失败");
break;
}
}
}
// 显示接收后队列状态
if (mq_getattr(mq, &attr) == 0) {
printf("\n接收后队列状态: %ld 条消息\n", attr.mq_curmsgs);
}
// 关闭消息队列
if (mq_close(mq) == 0) {
printf("✓ 消息队列关闭成功\n");
} else {
perror("✗ mq_close 失败");
}
// 删除消息队列
if (mq_unlink(QUEUE_NAME) == 0) {
printf("✓ 消息队列删除成功\n");
} else {
perror("✗ mq_unlink 失败");
}
printf("\n=== 消息队列特点 ===\n");
printf("1. 支持优先级消息 (数值越大优先级越高)\n");
printf("2. 消息大小可配置\n");
printf("3. 消息数量有限制\n");
printf("4. 支持持久化 (直到显式删除)\n");
printf("5. 可通过文件系统路径访问\n");
return 0;
}
示例2:生产者-消费者模型
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#define QUEUE_NAME "/producer_consumer_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 20
#define NUM_MESSAGES 10
// 全局变量
mqd_t mq;
int producer_count = 0;
int consumer_count = 0;
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
// 生产者线程函数
void* producer_thread(void* arg) {
int producer_id = *(int*)arg;
char message[MAX_MSG_SIZE];
time_t now;
printf("生产者 %d 启动\n", producer_id);
for (int i = 0; i < NUM_MESSAGES; i++) {
// 构造消息
time(&now);
snprintf(message, sizeof(message),
"P%d-MSG%d-TIME:%s", producer_id, i + 1, ctime(&now));
// 发送消息 (交替使用不同优先级)
unsigned int priority = (i % 3 == 0) ? 5 : 1; // 每第3条高优先级
if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {
pthread_mutex_lock(&count_mutex);
producer_count++;
pthread_mutex_unlock(&count_mutex);
printf("生产者 %d 发送消息: %s (优先级 %u)\n",
producer_id, message, priority);
} else {
perror("生产者发送失败");
}
// 随机延迟
usleep((rand() % 100 + 1) * 1000); // 1-100ms
}
printf("生产者 %d 完成\n", producer_id);
return NULL;
}
// 消费者线程函数
void* consumer_thread(void* arg) {
int consumer_id = *(int*)arg;
char message[MAX_MSG_SIZE];
ssize_t bytes_read;
unsigned int priority;
printf("消费者 %d 启动\n", consumer_id);
while (1) {
// 接收消息
bytes_read = mq_receive(mq, message, MAX_MSG_SIZE, &priority);
if (bytes_read != -1) {
pthread_mutex_lock(&count_mutex);
consumer_count++;
int current_count = consumer_count;
pthread_mutex_unlock(&count_mutex);
printf("消费者 %d 接收消息 %d (优先级 %u): %s",
consumer_id, current_count, priority, message);
// 检查是否接收完所有消息
if (current_count >= NUM_MESSAGES * 2) { // 2个生产者
break;
}
} else {
if (errno == EAGAIN) {
// 非阻塞模式下队列为空
usleep(10000); // 10ms
continue;
} else {
perror("消费者接收失败");
break;
}
}
// 随机延迟
usleep((rand() % 50 + 1) * 1000); // 1-50ms
}
printf("消费者 %d 完成\n", consumer_id);
return NULL;
}
int main() {
pthread_t producers[2];
pthread_t consumers[3];
int producer_ids[2] = {1, 2};
int consumer_ids[3] = {1, 2, 3};
struct mq_attr attr;
printf("=== 生产者-消费者消息队列示例 ===\n\n");
// 初始化随机数种子
srand(time(NULL) + getpid());
// 设置消息队列属性
attr.mq_flags = 0; // 阻塞模式
attr.mq_maxmsg = MAX_MSGS;
attr.mq_msgsize = MAX_MSG_SIZE;
attr.mq_curmsgs = 0;
// 创建消息队列
printf("创建消息队列: %s\n", QUEUE_NAME);
mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
printf("✓ 消息队列创建成功\n\n");
// 创建生产者线程
printf("创建生产者线程...\n");
for (int i = 0; i < 2; i++) {
if (pthread_create(&producers[i], NULL, producer_thread, &producer_ids[i]) != 0) {
perror("创建生产者线程失败");
exit(1);
}
}
// 创建消费者线程
printf("创建消费者线程...\n");
for (int i = 0; i < 3; i++) {
if (pthread_create(&consumers[i], NULL, consumer_thread, &consumer_ids[i]) != 0) {
perror("创建消费者线程失败");
exit(1);
}
}
// 等待生产者完成
printf("等待生产者完成...\n");
for (int i = 0; i < 2; i++) {
pthread_join(producers[i], NULL);
}
// 等待消费者完成
printf("等待消费者完成...\n");
for (int i = 0; i < 3; i++) {
pthread_join(consumers[i], NULL);
}
// 显示统计信息
printf("\n=== 统计信息 ===\n");
printf("生产消息数: %d\n", producer_count);
printf("消费消息数: %d\n", consumer_count);
// 显示最终队列状态
if (mq_getattr(mq, &attr) == 0) {
printf("队列中剩余消息: %ld\n", attr.mq_curmsgs);
}
// 清理资源
if (mq_close(mq) == 0) {
printf("✓ 消息队列关闭成功\n");
}
if (mq_unlink(QUEUE_NAME) == 0) {
printf("✓ 消息队列删除成功\n");
}
printf("\n=== 生产者-消费者模型特点 ===\n");
printf("1. 解耦: 生产者和消费者独立运行\n");
printf("2. 异步: 生产和消费可以不同步进行\n");
printf("3. 缓冲: 消息队列提供缓冲作用\n");
printf("4. 负载均衡: 多个消费者可以并行处理\n");
printf("5. 可靠性: 消息持久化存储\n");
return 0;
}
示例3:完整的消息队列管理系统
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#include <getopt.h>
#include <signal.h>
#include <time.h>
// 配置结构体
struct mq_config {
char *queue_name;
int max_messages;
int max_message_size;
int create_queue;
int delete_queue;
int show_info;
int send_message;
int receive_message;
int list_queues;
int priority;
char *message_content;
int non_blocking;
int verbose;
};
// 全局变量
volatile sig_atomic_t running = 1;
// 信号处理函数
void signal_handler(int sig) {
printf("\n收到信号 %d,准备退出...\n", sig);
running = 0;
}
// 设置信号处理
void setup_signal_handlers() {
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGINT, &sa, NULL); // Ctrl+C
sigaction(SIGTERM, &sa, NULL); // 终止信号
}
// 显示消息队列信息
void show_queue_info(mqd_t mq) {
struct mq_attr attr;
if (mq_getattr(mq, &attr) == 0) {
printf("消息队列属性:\n");
printf(" 最大消息数: %ld\n", attr.mq_maxmsg);
printf(" 最大消息大小: %ld 字节\n", attr.mq_msgsize);
printf(" 当前消息数: %ld\n", attr.mq_curmsgs);
printf(" 标志: %s\n", (attr.mq_flags & O_NONBLOCK) ? "非阻塞" : "阻塞");
} else {
perror("获取队列属性失败");
}
}
// 列出所有消息队列
void list_all_queues() {
printf("=== 系统消息队列列表 ===\n");
printf("注意: POSIX 消息队列通常在 /dev/mqueue/ 目录下\n");
// 尝试列出 /dev/mqueue/ 目录
if (access("/dev/mqueue", F_OK) == 0) {
printf("系统消息队列目录存在\n");
system("ls -la /dev/mqueue/ 2>/dev/null || echo '无法访问 /dev/mqueue/'");
} else {
printf("系统消息队列目录不存在\n");
}
printf("\n");
}
// 发送消息
int send_message_to_queue(mqd_t mq, const char *message, int priority, int non_blocking) {
struct mq_attr attr;
// 检查消息大小
if (mq_getattr(mq, &attr) == 0) {
if (strlen(message) + 1 > (size_t)attr.mq_msgsize) {
fprintf(stderr, "错误: 消息大小 (%zu) 超过队列限制 (%ld)\n",
strlen(message) + 1, attr.mq_msgsize);
return -1;
}
}
// 发送消息
if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {
printf("✓ 消息发送成功 (优先级 %d): %s\n", priority, message);
return 0;
} else {
if (errno == EAGAIN && non_blocking) {
printf("⚠ 队列已满,非阻塞模式下发送失败\n");
} else {
perror("✗ 消息发送失败");
}
return -1;
}
}
// 接收消息
int receive_message_from_queue(mqd_t mq, int non_blocking) {
char *buffer;
struct mq_attr attr;
ssize_t bytes_read;
unsigned int priority;
// 获取队列属性以确定缓冲区大小
if (mq_getattr(mq, &attr) != 0) {
perror("获取队列属性失败");
return -1;
}
buffer = malloc(attr.mq_msgsize);
if (!buffer) {
perror("内存分配失败");
return -1;
}
// 接收消息
bytes_read = mq_receive(mq, buffer, attr.mq_msgsize, &priority);
if (bytes_read != -1) {
printf("✓ 消息接收成功 (优先级 %u, 长度 %zd): %s",
priority, bytes_read, buffer);
free(buffer);
return 0;
} else {
if (errno == EAGAIN && non_blocking) {
printf("⚠ 队列为空,非阻塞模式下接收失败\n");
} else {
perror("✗ 消息接收失败");
}
free(buffer);
return -1;
}
}
// 创建消息队列
mqd_t create_message_queue(const char *name, int max_msgs, int max_size, int non_blocking) {
struct mq_attr attr;
int flags = O_CREAT | O_RDWR;
if (non_blocking) {
flags |= O_NONBLOCK;
}
attr.mq_flags = non_blocking ? O_NONBLOCK : 0;
attr.mq_maxmsg = max_msgs;
attr.mq_msgsize = max_size;
attr.mq_curmsgs = 0;
mqd_t mq = mq_open(name, flags, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("创建消息队列失败");
return (mqd_t)-1;
}
printf("✓ 消息队列创建成功: %s\n", name);
return mq;
}
// 打开现有消息队列
mqd_t open_existing_queue(const char *name, int non_blocking) {
int flags = O_RDWR;
if (non_blocking) {
flags |= O_NONBLOCK;
}
mqd_t mq = mq_open(name, flags);
if (mq == (mqd_t)-1) {
perror("打开消息队列失败");
return (mqd_t)-1;
}
printf("✓ 消息队列打开成功: %s\n", name);
return mq;
}
// 显示帮助信息
void show_help(const char *program_name) {
printf("用法: %s [选项]\n", program_name);
printf("\n选项:\n");
printf(" -n, --name=NAME 消息队列名称 (以 / 开头)\n");
printf(" -c, --create 创建消息队列\n");
printf(" -d, --delete 删除消息队列\n");
printf(" -i, --info 显示队列信息\n");
printf(" -l, --list 列出所有队列\n");
printf(" -s, --send=MESSAGE 发送消息\n");
printf(" -r, --receive 接收消息\n");
printf(" -p, --priority=NUM 消息优先级 (默认 0)\n");
printf(" -m, --max-msgs=NUM 最大消息数 (创建时使用)\n");
printf(" -z, --max-size=NUM 最大消息大小 (创建时使用)\n");
printf(" -b, --non-blocking 非阻塞模式\n");
printf(" -v, --verbose 详细输出\n");
printf(" -h, --help 显示此帮助信息\n");
printf("\n示例:\n");
printf(" %s -n /myqueue -c -m 10 -z 256 # 创建队列\n", program_name);
printf(" %s -n /myqueue -s \"Hello World\" # 发送消息\n", program_name);
printf(" %s -n /myqueue -r # 接收消息\n", program_name);
printf(" %s -n /myqueue -i # 显示队列信息\n", program_name);
printf(" %s -n /myqueue -d # 删除队列\n", program_name);
printf(" %s -l # 列出所有队列\n", program_name);
}
int main(int argc, char *argv[]) {
struct mq_config config = {
.queue_name = NULL,
.max_messages = 10,
.max_message_size = 256,
.create_queue = 0,
.delete_queue = 0,
.show_info = 0,
.send_message = 0,
.receive_message = 0,
.list_queues = 0,
.priority = 0,
.message_content = NULL,
.non_blocking = 0,
.verbose = 0
};
printf("=== POSIX 消息队列管理系统 ===\n\n");
// 解析命令行参数
static struct option long_options[] = {
{"name", required_argument, 0, 'n'},
{"create", no_argument, 0, 'c'},
{"delete", no_argument, 0, 'd'},
{"info", no_argument, 0, 'i'},
{"list", no_argument, 0, 'l'},
{"send", required_argument, 0, 's'},
{"receive", no_argument, 0, 'r'},
{"priority", required_argument, 0, 'p'},
{"max-msgs", required_argument, 0, 'm'},
{"max-size", required_argument, 0, 'z'},
{"non-blocking", no_argument, 0, 'b'},
{"verbose", no_argument, 0, 'v'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0}
};
int opt;
while ((opt = getopt_long(argc, argv, "n:cdils:rp:m:z:bvh", long_options, NULL)) != -1) {
switch (opt) {
case 'n':
config.queue_name = optarg;
break;
case 'c':
config.create_queue = 1;
break;
case 'd':
config.delete_queue = 1;
break;
case 'i':
config.show_info = 1;
break;
case 'l':
config.list_queues = 1;
break;
case 's':
config.send_message = 1;
config.message_content = optarg;
break;
case 'r':
config.receive_message = 1;
break;
case 'p':
config.priority = atoi(optarg);
break;
case 'm':
config.max_messages = atoi(optarg);
break;
case 'z':
config.max_message_size = atoi(optarg);
break;
case 'b':
config.non_blocking = 1;
break;
case 'v':
config.verbose = 1;
break;
case 'h':
show_help(argv[0]);
return 0;
default:
fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);
return 1;
}
}
// 设置信号处理
setup_signal_handlers();
// 显示系统信息
if (config.verbose) {
printf("系统信息:\n");
printf(" 当前用户 UID: %d\n", getuid());
printf(" 当前进程 PID: %d\n", getpid());
printf(" 消息队列支持: ");
system("ls /dev/mqueue/ >/dev/null 2>&1 && echo '是' || echo '否'");
printf("\n");
}
// 列出所有队列
if (config.list_queues) {
list_all_queues();
if (!config.queue_name && !config.create_queue && !config.delete_queue &&
!config.show_info && !config.send_message && !config.receive_message) {
return 0;
}
}
// 如果没有指定队列名称且需要操作队列
if (!config.queue_name && (config.create_queue || config.delete_queue ||
config.show_info || config.send_message ||
config.receive_message)) {
fprintf(stderr, "错误: 需要指定消息队列名称\n");
fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);
return 1;
}
// 处理队列操作
mqd_t mq = (mqd_t)-1;
if (config.create_queue) {
mq = create_message_queue(config.queue_name, config.max_messages,
config.max_message_size, config.non_blocking);
if (mq == (mqd_t)-1) {
return 1;
}
if (config.show_info) {
show_queue_info(mq);
}
} else if (config.queue_name) {
// 打开现有队列
mq = open_existing_queue(config.queue_name, config.non_blocking);
if (mq == (mqd_t)-1) {
return 1;
}
}
// 显示队列信息
if (config.show_info && mq != (mqd_t)-1) {
show_queue_info(mq);
}
// 发送消息
if (config.send_message && config.message_content && mq != (mqd_t)-1) {
send_message_to_queue(mq, config.message_content,
config.priority, config.non_blocking);
}
// 接收消息
if (config.receive_message && mq != (mqd_t)-1) {
if (config.non_blocking) {
receive_message_from_queue(mq, config.non_blocking);
} else {
printf("等待接收消息 (按 Ctrl+C 退出)...\n");
while (running) {
if (receive_message_from_queue(mq, config.non_blocking) == -1) {
if (errno != EAGAIN) {
break;
}
}
if (!config.non_blocking) {
sleep(1); // 阻塞模式下定期检查
}
}
}
}
// 删除队列
if (config.delete_queue && config.queue_name) {
if (mq_unlink(config.queue_name) == 0) {
printf("✓ 消息队列删除成功: %s\n", config.queue_name);
} else {
perror("✗ 消息队列删除失败");
}
}
// 关闭队列
if (mq != (mqd_t)-1) {
if (mq_close(mq) == 0) {
if (config.verbose) {
printf("✓ 消息队列关闭成功\n");
}
} else {
perror("✗ 消息队列关闭失败");
}
}
// 显示使用建议
printf("\n=== POSIX 消息队列使用建议 ===\n");
printf("适用场景:\n");
printf("1. 进程间通信 (IPC)\n");
printf("2. 生产者-消费者模式\n");
printf("3. 异步消息处理\n");
printf("4. 系统服务通信\n");
printf("5. 微服务架构\n");
printf("\n");
printf("优势:\n");
printf("1. 可靠性: 消息持久化存储\n");
printf("2. 优先级: 支持消息优先级\n");
printf("3. 可移植: POSIX 标准\n");
printf("4. 灵活性: 支持阻塞和非阻塞模式\n");
printf("5. 安全性: 通过文件系统权限控制\n");
printf("\n");
printf("注意事项:\n");
printf("1. 需要链接实时库: -lrt\n");
printf("2. 队列名称必须以 / 开头\n");
printf("3. 消息大小和数量有限制\n");
printf("4. 需要适当权限才能创建/删除队列\n");
printf("5. 应该及时关闭和清理队列资源\n");
return 0;
}
编译和运行说明
# 编译示例程序(需要链接实时库)
gcc -o mq_example1 example1.c -lrt
gcc -o mq_example2 example2.c -lrt -lpthread
gcc -o mq_example3 example3.c -lrt -lpthread
# 运行示例
./mq_example1
./mq_example2
./mq_example3 --help
# 基本操作示例
./mq_example3 -n /test_queue -c -m 5 -z 128
./mq_example3 -n /test_queue -s "Hello, Message Queue!"
./mq_example3 -n /test_queue -r
./mq_example3 -n /test_queue -i
./mq_example3 -n /test_queue -d
# 列出所有队列
./mq_example3 -l
系统要求检查
# 检查系统支持
ls /dev/mqueue/ 2>/dev/null || echo "消息队列目录不存在"
# 检查内核配置
grep -i mq /boot/config-$(uname -r)
# 检查库支持
ldd --version
# 查看系统限制
ulimit -a | grep -i msg
cat /proc/sys/fs/mqueue/
重要注意事项
- 编译要求: 需要链接实时库
-lrt
- 权限要求: 创建/删除队列通常需要适当权限
- 名称规范: 队列名称必须以 ‘/’ 开头
- 资源限制: 受系统消息队列限制约束
- 清理责任: 应该及时关闭和删除队列
- 线程安全: 消息队列描述符在多线程间共享是安全的
实际应用场景
- 微服务通信: 服务间异步消息传递
- 日志系统: 异步日志记录
- 任务队列: 后台任务处理
- 事件驱动: 事件通知和处理
- 数据流: 实时数据处理管道
- 系统监控: 状态变更通知
最佳实践
// 安全的消息队列操作函数
mqd_t safe_mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr) {
mqd_t mq = mq_open(name, oflag, mode, attr);
if (mq == (mqd_t)-1) {
switch (errno) {
case EACCES:
fprintf(stderr, "权限不足访问队列: %s\n", name);
break;
case EEXIST:
fprintf(stderr, "队列已存在: %s\n", name);
break;
case ENOENT:
fprintf(stderr, "队列不存在: %s\n", name);
break;
case EINVAL:
fprintf(stderr, "无效的队列名称或参数: %s\n", name);
break;
default:
perror("mq_open 失败");
break;
}
}
return mq;
}
// 可靠的消息发送函数
int reliable_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned msg_prio, int timeout_seconds) {
struct timespec timeout;
int result;
if (timeout_seconds > 0) {
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += timeout_seconds;
result = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &timeout);
} else {
result = mq_send(mqdes, msg_ptr, msg_len, msg_prio);
}
return result;
}
// 带重试的消息接收函数
ssize_t retry_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned *msg_prio, int max_retries) {
ssize_t result;
int retries = 0;
while (retries < max_retries) {
result = mq_receive(mqdes, msg_ptr, msg_len, msg_prio);
if (result != -1) {
return result; // 成功接收
}
if (errno == EAGAIN) {
retries++;
usleep(100000); // 100ms 延迟后重试
} else {
break; // 其他错误,不再重试
}
}
return result;
}
这些示例展示了 POSIX 消息队列的各种使用方法,从基础的消息发送接收到完整的管理系统,帮助你全面掌握 Linux 系统中的消息队列机制。