putpmsg 替代函数和方案详解
1. 现代 Linux 替代方案概述
由于 putpmsg
主要在 System V STREAMS 系统中可用,而大多数 Linux 系统不完全支持 STREAMS,因此需要使用现代的替代方案。以下是主要的替代函数和方案:
2. 替代方案分类
2.1 实时信号 (RT Signals)
#include <signal.h>
#include <sys/types.h>
#include <string.h>
// 使用实时信号发送优先级数据
int send_priority_data_with_signal(pid_t target_pid, int priority,
const void *data, size_t data_size) {
union sigval signal_data;
// 实时信号支持传递额外数据
if (data_size <= sizeof(signal_data)) {
memcpy(&signal_data, data, data_size);
signal_data.sival_int = priority; // 将优先级存储在整型字段中
} else {
// 对于大数据,传递指针或标识符
signal_data.sival_ptr = (void*)data; // 注意:需要共享内存
signal_data.sival_int = priority;
}
// 选择实时信号 (SIGRTMIN + 0 到 SIGRTMAX - SIGRTMIN)
int signal_num = SIGRTMIN + (priority % (SIGRTMAX - SIGRTMIN + 1));
return sigqueue(target_pid, signal_num, signal_data);
}
// 信号处理函数
void priority_signal_handler(int sig, siginfo_t *info, void *context) {
printf("收到优先级信号 %d,优先级: %d\n", sig, info->si_value.sival_int);
if (info->si_code == SI_QUEUE) {
// 处理队列中的信号
printf(" 信号来源: 进程 %d\n", info->si_pid);
if (info->si_value.sival_ptr) {
printf(" 数据指针: %p\n", info->si_value.sival_ptr);
}
}
}
2.2 Unix 域套接字 (Unix Domain Sockets)
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
// 消息头结构体
struct priority_message_header {
int priority; // 消息优先级
int msg_type; // 消息类型
size_t data_size; // 数据大小
time_t timestamp; // 时间戳
};
// 使用 Unix 域套接字发送优先级消息
int send_priority_message_unix(int sockfd, int priority,
const void *data, size_t data_size) {
struct priority_message_header header;
// 构造消息头
header.priority = priority;
header.msg_type = 1; // 普通消息类型
header.data_size = data_size;
header.timestamp = time(NULL);
// 发送消息头
ssize_t sent = send(sockfd, &header, sizeof(header), MSG_NOSIGNAL);
if (sent != sizeof(header)) {
return -1;
}
// 发送实际数据
if (data_size > 0) {
sent = send(sockfd, data, data_size, MSG_NOSIGNAL);
if (sent != (ssize_t)data_size) {
return -1;
}
}
return 0;
}
// 接收优先级消息
ssize_t receive_priority_message_unix(int sockfd, int *priority,
void *buffer, size_t buffer_size) {
struct priority_message_header header;
// 接收消息头
ssize_t received = recv(sockfd, &header, sizeof(header), 0);
if (received != sizeof(header)) {
return -1;
}
// 返回优先级
if (priority) {
*priority = header.priority;
}
// 接收数据
if (header.data_size > 0 && header.data_size <= buffer_size) {
received = recv(sockfd, buffer, header.data_size, 0);
if (received == (ssize_t)header.data_size) {
return received;
}
}
return -1;
}
2.3 管道和 FIFO
#include <sys/stat.h>
#include <fcntl.h>
#include <poll.h>
// 带优先级的消息结构体
struct fifo_message {
int priority; // 消息优先级
pid_t sender_pid; // 发送者 PID
size_t data_size; // 数据大小
char data[1024]; // 消息数据
};
// 通过 FIFO 发送优先级消息
int send_priority_message_fifo(const char *fifo_path, int priority,
const void *data, size_t data_size) {
int fd;
struct fifo_message msg;
// 打开 FIFO
fd = open(fifo_path, O_WRONLY);
if (fd == -1) {
return -1;
}
// 构造消息
msg.priority = priority;
msg.sender_pid = getpid();
msg.data_size = (data_size < sizeof(msg.data)) ? data_size : sizeof(msg.data);
if (data && data_size > 0) {
memcpy(msg.data, data, msg.data_size);
}
// 发送消息
ssize_t written = write(fd, &msg, sizeof(msg));
close(fd);
return (written == sizeof(msg)) ? 0 : -1;
}
// 通过 FIFO 接收优先级消息
int receive_priority_message_fifo(const char *fifo_path,
struct fifo_message *msg) {
int fd;
// 以非阻塞模式打开 FIFO
fd = open(fifo_path, O_RDONLY | O_NONBLOCK);
if (fd == -1) {
return -1;
}
// 使用 poll 等待数据
struct pollfd pfd = {fd, POLLIN, 0};
int ready = poll(&pfd, 1, 1000); // 1秒超时
if (ready > 0 && (pfd.revents & POLLIN)) {
ssize_t bytes_read = read(fd, msg, sizeof(struct fifo_message));
close(fd);
return (bytes_read == sizeof(struct fifo_message)) ? 0 : -1;
}
close(fd);
return -1;
}
2.4 D-Bus 消息系统
#include <dbus/dbus.h>
#include <stdio.h>
#include <stdlib.h>
// 使用 D-Bus 发送优先级消息 (需要安装 libdbus)
#ifdef USE_DBUS
int send_dbus_priority_message(const char *bus_name,
const char *object_path,
const char *interface,
int priority,
const char *message_data) {
DBusConnection *connection;
DBusMessage *message;
DBusPendingCall *pending;
dbus_bool_t result;
// 连接到系统总线
dbus_error_t error;
dbus_error_init(&error);
connection = dbus_bus_get(DBUS_BUS_SESSION, &error);
if (dbus_error_is_set(&error)) {
fprintf(stderr, "连接 D-Bus 失败: %s\n", error.message);
dbus_error_free(&error);
return -1;
}
// 创建消息
message = dbus_message_new_signal(object_path, interface, "PriorityMessage");
if (!message) {
dbus_connection_unref(connection);
return -1;
}
// 添加参数
dbus_message_append_args(message,
DBUS_TYPE_INT32, &priority,
DBUS_TYPE_STRING, &message_data,
DBUS_TYPE_INVALID);
// 发送消息
result = dbus_connection_send(connection, message, NULL);
dbus_message_unref(message);
dbus_connection_flush(connection);
dbus_connection_unref(connection);
return result ? 0 : -1;
}
#endif
2.5 自定义优先级队列
#include <pthread.h>
#include <stdlib.h>
#include <sys/queue.h>
// 消息结构体
struct priority_message {
int priority; // 消息优先级
time_t timestamp; // 时间戳
pid_t sender_pid; // 发送者 PID
size_t data_size; // 数据大小
void *data; // 消息数据
SLIST_ENTRY(priority_message) entries;
};
// 消息队列
SLIST_HEAD(message_queue, priority_message) global_message_queue =
SLIST_HEAD_INITIALIZER(global_message_queue);
// 互斥锁和条件变量
pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
// 发送优先级消息
int send_priority_message_queue(int priority, const void *data, size_t data_size) {
// 分配消息内存
struct priority_message *msg = malloc(sizeof(struct priority_message));
if (!msg) {
return -1;
}
// 分配数据内存
if (data_size > 0) {
msg->data = malloc(data_size);
if (!msg->data) {
free(msg);
return -1;
}
memcpy(msg->data, data, data_size);
} else {
msg->data = NULL;
}
// 初始化消息
msg->priority = priority;
msg->timestamp = time(NULL);
msg->sender_pid = getpid();
msg->data_size = data_size;
// 添加到队列
pthread_mutex_lock(&queue_mutex);
// 按优先级插入队列(简单实现:插到队首)
SLIST_INSERT_HEAD(&global_message_queue, msg, entries);
pthread_cond_signal(&queue_cond); // 通知等待的线程
pthread_mutex_unlock(&queue_mutex);
return 0;
}
// 接收优先级消息(阻塞)
struct priority_message* receive_priority_message_queue(int timeout_seconds) {
struct priority_message *msg = NULL;
struct timespec timeout;
pthread_mutex_lock(&queue_mutex);
// 等待消息
while (SLIST_EMPTY(&global_message_queue)) {
if (timeout_seconds > 0) {
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += timeout_seconds;
int result = pthread_cond_timedwait(&queue_cond, &queue_mutex, &timeout);
if (result == ETIMEDOUT) {
pthread_mutex_unlock(&queue_mutex);
return NULL;
}
} else {
pthread_cond_wait(&queue_cond, &queue_mutex);
}
}
// 取出最高优先级的消息
msg = SLIST_FIRST(&global_message_queue);
SLIST_REMOVE_HEAD(&global_message_queue, entries);
pthread_mutex_unlock(&queue_mutex);
return msg;
}
// 清理消息
void free_priority_message(struct priority_message *msg) {
if (msg) {
if (msg->data) {
free(msg->data);
}
free(msg);
}
}
2.6 POSIX 消息队列
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
// 发送优先级消息到 POSIX 消息队列
int send_priority_message_mq(mqd_t mqdes, int priority,
const void *data, size_t data_size) {
// POSIX 消息队列直接支持优先级
return mq_send(mqdes, (const char*)data, data_size, priority);
}
// 接收优先级消息
ssize_t receive_priority_message_mq(mqd_t mqdes, unsigned *priority,
void *buffer, size_t buffer_size) {
return mq_receive(mqdes, (char*)buffer, buffer_size, priority);
}
// 创建优先级消息队列
mqd_t create_priority_message_queue(const char *name, int max_messages,
size_t max_message_size) {
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = max_messages;
attr.mq_msgsize = max_message_size;
attr.mq_curmsgs = 0;
return mq_open(name, O_CREAT | O_WRONLY, 0644, &attr);
}
2.7 epoll + 管道实现优先级
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
// 优先级管道结构体
struct priority_pipe {
int epoll_fd;
int event_fd; // 用于通知事件
int **pipes; // 按优先级划分的管道数组
int num_priorities;
};
// 创建优先级管道系统
struct priority_pipe* create_priority_pipe_system(int num_priorities) {
struct priority_pipe *pp = malloc(sizeof(struct priority_pipe));
if (!pp) return NULL;
pp->num_priorities = num_priorities;
pp->pipes = malloc(num_priorities * sizeof(int*));
if (!pp->pipes) {
free(pp);
return NULL;
}
// 创建 epoll 实例
pp->epoll_fd = epoll_create1(0);
if (pp->epoll_fd == -1) {
free(pp->pipes);
free(pp);
return NULL;
}
// 创建事件通知 fd
pp->event_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (pp->event_fd == -1) {
close(pp->epoll_fd);
free(pp->pipes);
free(pp);
return NULL;
}
// 添加事件 fd 到 epoll
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = pp->event_fd;
epoll_ctl(pp->epoll_fd, EPOLL_CTL_ADD, pp->event_fd, &ev);
// 创建优先级管道
for (int i = 0; i < num_priorities; i++) {
pp->pipes[i] = malloc(2 * sizeof(int));
if (pp->pipes[i] && pipe(pp->pipes[i]) == 0) {
// 设置非阻塞模式
int flags = fcntl(pp->pipes[i][0], F_GETFL, 0);
fcntl(pp->pipes[i][0], F_SETFL, flags | O_NONBLOCK);
}
}
return pp;
}
// 发送优先级消息
int send_priority_message_pipe(struct priority_pipe *pp, int priority,
const void *data, size_t data_size) {
if (priority < 0 || priority >= pp->num_priorities) {
errno = EINVAL;
return -1;
}
ssize_t written = write(pp->pipes[priority][1], data, data_size);
if (written == (ssize_t)data_size) {
// 通知有新消息
uint64_t notify = 1;
write(pp->event_fd, ¬ify, sizeof(notify));
return 0;
}
return -1;
}
3. 完整的替代方案示例
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <pthread.h>
// 消息优先级枚举
enum message_priority {
PRIORITY_LOW = 10,
PRIORITY_NORMAL = 50,
PRIORITY_HIGH = 100,
PRIORITY_CRITICAL = 200
};
// 通用消息结构体
struct unified_message {
int priority;
time_t timestamp;
pid_t sender_pid;
char type[32];
size_t data_size;
char data[512];
};
// 消息处理器函数指针
typedef void (*message_handler_t)(const struct unified_message *msg);
// 消息系统接口
struct message_system {
const char *name;
int (*send_func)(const struct unified_message *msg);
int (*receive_func)(struct unified_message *msg, int timeout_ms);
int (*init_func)(void);
void (*cleanup_func)(void);
};
// 实时信号消息系统
static int rt_signal_send(const struct unified_message *msg) {
union sigval data;
data.sival_int = msg->priority;
// 使用 SIGRTMIN + 0 作为消息信号
return sigqueue(getpid(), SIGRTMIN, data);
}
static int rt_signal_receive(struct unified_message *msg, int timeout_ms) {
// 这里简化处理,实际需要信号处理机制
sleep(timeout_ms / 1000);
return -1; // 超时
}
// 自定义队列消息系统
static struct unified_message *message_queue[100];
static int queue_head = 0, queue_tail = 0, queue_count = 0;
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static int custom_queue_send(const struct unified_message *msg) {
pthread_mutex_lock(&queue_mutex);
if (queue_count >= 100) {
pthread_mutex_unlock(&queue_mutex);
return -1; // 队列满
}
// 分配消息内存
struct unified_message *new_msg = malloc(sizeof(struct unified_message));
if (!new_msg) {
pthread_mutex_unlock(&queue_mutex);
return -1;
}
memcpy(new_msg, msg, sizeof(struct unified_message));
new_msg->timestamp = time(NULL);
// 按优先级插入队列
int insert_pos = queue_head;
for (int i = 0; i < queue_count; i++) {
int pos = (queue_head + i) % 100;
if (message_queue[pos]->priority < new_msg->priority) {
insert_pos = pos;
break;
}
}
// 移动后续消息
for (int i = queue_count; i > insert_pos; i--) {
int from_pos = (queue_head + i - 1) % 100;
int to_pos = (queue_head + i) % 100;
message_queue[to_pos] = message_queue[from_pos];
}
message_queue[insert_pos] = new_msg;
queue_count++;
pthread_mutex_unlock(&queue_mutex);
return 0;
}
static int custom_queue_receive(struct unified_message *msg, int timeout_ms) {
time_t start_time = time(NULL);
while (1) {
pthread_mutex_lock(&queue_mutex);
if (queue_count > 0) {
// 取出最高优先级消息
struct unified_message *highest_msg = message_queue[queue_head];
memcpy(msg, highest_msg, sizeof(struct unified_message));
free(highest_msg);
queue_head = (queue_head + 1) % 100;
queue_count--;
pthread_mutex_unlock(&queue_mutex);
return 0;
}
pthread_mutex_unlock(&queue_mutex);
// 检查超时
if (timeout_ms > 0) {
time_t elapsed = time(NULL) - start_time;
if (elapsed * 1000 >= timeout_ms) {
return -1; // 超时
}
}
usleep(10000); // 休眠 10ms
}
return -1;
}
// 消息系统实现
static struct message_system available_systems[] = {
{
.name = "custom_queue",
.send_func = custom_queue_send,
.receive_func = custom_queue_receive,
.init_func = NULL,
.cleanup_func = NULL
},
{
.name = "rt_signal",
.send_func = rt_signal_send,
.receive_func = rt_signal_receive,
.init_func = NULL,
.cleanup_func = NULL
}
};
// 当前使用的消息系统
static struct message_system *current_system = &available_systems[0];
// 发送统一消息
int send_unified_message(int priority, const char *type,
const void *data, size_t data_size) {
struct unified_message msg;
msg.priority = priority;
msg.timestamp = time(NULL);
msg.sender_pid = getpid();
strncpy(msg.type, type, sizeof(msg.type) - 1);
msg.type[sizeof(msg.type) - 1] = '\0';
msg.data_size = (data_size < sizeof(msg.data)) ? data_size : sizeof(msg.data);
if (data && data_size > 0) {
memcpy(msg.data, data, msg.data_size);
}
if (current_system->send_func) {
return current_system->send_func(&msg);
}
return -1;
}
// 接收统一消息
int receive_unified_message(struct unified_message *msg, int timeout_ms) {
if (current_system->receive_func) {
return current_system->receive_func(msg, timeout_ms);
}
return -1;
}
// 演示不同优先级消息处理
void demonstrate_priority_handling() {
printf("=== 优先级消息处理演示 ===\n\n");
// 发送不同优先级的消息
printf("发送不同优先级的消息:\n");
const char *critical_msg = "系统紧急告警:磁盘空间不足";
send_unified_message(PRIORITY_CRITICAL, "ALERT", critical_msg, strlen(critical_msg));
printf(" ✓ 发送关键优先级消息 (优先级 %d)\n", PRIORITY_CRITICAL);
const char *high_msg = "应用程序错误:数据库连接失败";
send_unified_message(PRIORITY_HIGH, "ERROR", high_msg, strlen(high_msg));
printf(" ✓ 发送高优先级消息 (优先级 %d)\n", PRIORITY_HIGH);
const char *normal_msg = "用户登录成功";
send_unified_message(PRIORITY_NORMAL, "INFO", normal_msg, strlen(normal_msg));
printf(" ✓ 发送普通优先级消息 (优先级 %d)\n", PRIORITY_NORMAL);
const char *low_msg = "系统日志:定时任务执行完成";
send_unified_message(PRIORITY_LOW, "DEBUG", low_msg, strlen(low_msg));
printf(" ✓ 发送低优先级消息 (优先级 %d)\n", PRIORITY_LOW);
printf("\n接收消息 (按优先级顺序):\n");
// 接收并显示消息
struct unified_message received_msg;
for (int i = 0; i < 4; i++) {
if (receive_unified_message(&received_msg, 1000) == 0) {
printf(" [%d] 优先级 %d (%s): %.*s\n",
i + 1, received_msg.priority, received_msg.type,
(int)received_msg.data_size, received_msg.data);
} else {
printf(" [%d] 超时或无消息\n", i + 1);
}
}
}
int main() {
printf("=== putpmsg 现代替代方案演示 ===\n\n");
printf("putpmsg 替代方案概述:\n");
printf("1. 实时信号 (RT signals)\n");
printf("2. Unix 域套接字\n");
printf("3. 管道和 FIFO\n");
printf("4. D-Bus 消息系统\n");
printf("5. 自定义优先级队列\n");
printf("6. POSIX 消息队列\n");
printf("7. epoll + 管道\n");
printf("\n");
// 演示优先级处理
demonstrate_priority_handling();
printf("\n=== 各方案特点对比 ===\n");
printf("方案 优先级支持 跨进程 复杂度 性能\n");
printf("------------- ---------- ------- ------ ----\n");
printf("实时信号 中等 是 低 高\n");
printf("Unix套接字 无 是 中 中\n");
printf("管道/FIFO 无 是 低 中\n");
printf("D-Bus 高 是 高 中\n");
printf("自定义队列 高 否 中 高\n");
printf("POSIX消息队列 高 是 中 高\n");
printf("epoll+管道 高 是 高 高\n");
printf("\n");
printf("=== 选择建议 ===\n");
printf("简单应用: 使用实时信号或管道\n");
printf("复杂系统: 使用 POSIX 消息队列\n");
printf("高性能: 使用 epoll + 管道\n");
printf("企业级: 使用 D-Bus\n");
printf("跨语言: 使用 D-Bus 或 Unix 套接字\n");
return 0;
}
4. 编译和运行说明
# 编译示例程序
gcc -o putpmsg_alternative alternative.c -lpthread
# 编译 D-Bus 版本 (如果使用)
gcc -o putpmsg_dbus dbus_example.c -ldbus-1
# 编译 POSIX 消息队列版本
gcc -o putpmsg_mq mq_example.c -lrt
# 运行示例
./putpmsg_alternative
5. 各方案详细对比
5.1 功能对比表
方案 | 优先级 | 跨进程 | 实时性 | 复杂度 | 可移植性 | 资源消耗 |
---|---|---|---|---|---|---|
实时信号 | 中等 | 是 | 高 | 低 | 中 | 低 |
Unix套接字 | 无 | 是 | 中 | 中 | 高 | 中 |
管道/FIFO | 无 | 是 | 中 | 低 | 高 | 低 |
D-Bus | 高 | 是 | 中 | 高 | 中 | 高 |
自定义队列 | 高 | 否 | 高 | 中 | 低 | 低 |
POSIX消息队列 | 高 | 是 | 高 | 中 | 中 | 中 |
epoll+管道 | 高 | 是 | 高 | 高 | 中 | 低 |
5.2 使用场景推荐
// 场景1: 简单的进程间通信
// 推荐: 实时信号
int simple_ipc_with_signals() {
// 发送优先级信号
union sigval data;
data.sival_int = PRIORITY_HIGH;
return sigqueue(target_pid, SIGRTMIN, data);
}
// 场景2: 高性能本地通信
// 推荐: Unix 域套接字
int high_performance_local_communication() {
int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
// 配置套接字...
return sockfd;
}
// 场景3: 系统级消息传递
// 推荐: POSIX 消息队列
int system_level_messaging() {
struct mq_attr attr = {0, 10, 1024, 0};
mqd_t mqdes = mq_open("/system_queue", O_CREAT | O_WRONLY, 0644, &attr);
return (mqdes != (mqd_t)-1) ? 0 : -1;
}
// 场景4: 复杂的企业级应用
// 推荐: D-Bus
int enterprise_application_messaging() {
// 连接到 D-Bus...
// 发送消息...
return 0;
}
6. 最佳实践总结
6.1 选择原则
- 简单需求: 优先考虑实时信号或管道
- 高性能: 选择 epoll + 管道或 Unix 套接字
- 复杂需求: 使用 POSIX 消息队列或 D-Bus
- 跨语言: 优先考虑 D-Bus 或 Unix 套接字
- 系统集成: 使用系统现有的消息机制
6.2 代码质量保证
// 统一的错误处理
#define HANDLE_ERROR(operation) \
do { \
if ((operation) == -1) { \
fprintf(stderr, "错误: %s 失败 - %s\n", #operation, strerror(errno)); \
return -1; \
} \
} while(0)
// 资源清理宏
#define CLEANUP_RESOURCE(resource, cleanup_func) \
do { \
if (resource) { \
cleanup_func(resource); \
resource = NULL; \
} \
} while(0)
// 安全的消息发送函数
int safe_send_priority_message(int priority, const void *data, size_t data_size) {
if (!data || data_size == 0) {
errno = EINVAL;
return -1;
}
if (priority < 0 || priority > 255) {
errno = EINVAL;
return -1;
}
return send_unified_message(priority, "GENERIC", data, data_size);
}
这些替代方案为 putpmsg
提供了现代化的解决方案,每种方案都有其适用场景和优势。选择合适的方案需要根据具体的需求、性能要求和系统环境来决定。