1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h>
// 消息类型定义 #define BAND_LOW 25 // 低优先级 #define BAND_NORMAL 50 // 普通优先级 #define BAND_MEDIUM 100 // 中等优先级 #define BAND_HIGH 150 // 高优先级 #define BAND_CRITICAL 200 // 关键优先级
// 消息队列管理器 struct message_manager { int total_messages; int processed_messages; int dropped_messages; struct { int band; int msg_id; char type[32]; char content[256]; time_t created_time; time_t processed_time; } queue[50]; };
// 全局消息管理器 struct message_manager msg_mgr = {0};
// 标志定义 #define MSG_HIPRI 0x01 #define MSG_ANY 0x02 #define MSG_BAND 0x04
// 模拟的 strbuf 结构 struct stream_buffer { int maxlen; int len; char *buf; };
// 添加消息到队列 int queue_message(int band, const char *type, const char *content) { if (msg_mgr.total_messages >= 50) { msg_mgr.dropped_messages++; printf("警告: 消息队列已满,丢弃消息\n"); return -1; } int index = msg_mgr.total_messages++; msg_mgr.queue[index].band = band; msg_mgr.queue[index].msg_id = msg_mgr.total_messages; strncpy(msg_mgr.queue[index].type, type, 31); msg_mgr.queue[index].type[31] = '\0'; strncpy(msg_mgr.queue[index].content, content, 255); msg_mgr.queue[index].content[255] = '\0'; msg_mgr.queue[index].created_time = time(NULL); msg_mgr.queue[index].processed_time = 0; printf("入队消息 #%d: band=%d, 类型=%s\n", msg_mgr.queue[index].msg_id, band, type); return 0; }
// 模拟的 getpmsg 实现 int advanced_getpmsg(struct stream_buffer *ctlptr, struct stream_buffer *dataptr, int *bandp, int *flagsp) { if (msg_mgr.total_messages == 0) { return -1; // 队列为空 } int msg_index = -1; int target_band = *bandp; int flags = *flagsp; // 根据标志选择消息 if (flags & MSG_HIPRI) { // 查找最高优先级消息 int max_band = -1; for (int i = 0; i < msg_mgr.total_messages; i++) { if (msg_mgr.queue[i].band > max_band) { max_band = msg_mgr.queue[i].band; msg_index = i; } } } else if (flags & MSG_BAND) { // 查找指定优先级消息 for (int i = 0; i < msg_mgr.total_messages; i++) { if (msg_mgr.queue[i].band == target_band) { msg_index = i; break; } } } else { // 默认:按顺序处理 msg_index = 0; } if (msg_index == -1) { return -1; // 未找到消息 } // 处理消息 struct { int band; int msg_id; char type[32]; char content[256]; time_t created_time; time_t processed_time; } *msg = &msg_mgr.queue[msg_index]; msg->processed_time = time(NULL); msg_mgr.processed_messages++; // 准备返回数据 if (ctlptr && ctlptr->buf) { char ctl_info[256]; snprintf(ctl_info, sizeof(ctl_info), "MSG_ID=%d,BAND=%d,TYPE=%s", msg->msg_id, msg->band, msg->type); int copy_len = (strlen(ctl_info) + 1 < ctlptr->maxlen) ? strlen(ctl_info) + 1 : ctlptr->maxlen; memcpy(ctlptr->buf, ctl_info, copy_len - 1); ctlptr->buf[copy_len - 1] = '\0'; ctlptr->len = copy_len - 1; } if (dataptr && dataptr->buf) { int copy_len = (strlen(msg->content) + 1 < dataptr->maxlen) ? strlen(msg->content) + 1 : dataptr->maxlen; memcpy(dataptr->buf, msg->content, copy_len - 1); dataptr->buf[copy_len - 1] = '\0'; dataptr->len = copy_len - 1; } // 更新返回参数 *bandp = msg->band; *flagsp = (msg->band >= BAND_HIGH) ? MSG_HIPRI : 0; // 从队列中移除消息 for (int i = msg_index; i < msg_mgr.total_messages - 1; i++) { msg_mgr.queue[i] = msg_mgr.queue[i + 1]; } msg_mgr.total_messages--; return 0; }
// 显示系统统计 void show_system_stats() { printf("\n=== 消息系统统计 ===\n"); printf("总消息数: %d\n", msg_mgr.total_messages); printf("已处理消息: %d\n", msg_mgr.processed_messages); printf("丢弃消息: %d\n", msg_mgr.dropped_messages); printf("队列中消息: %d\n", msg_mgr.total_messages); }
// 显示队列内容 void show_queue_contents() { printf("\n=== 队列内容 ===\n"); if (msg_mgr.total_messages == 0) { printf("队列为空\n"); return; } printf("优先级 ID 类型 内容\n"); printf("-------- ---- -------------- ------------------------\n"); for (int i = 0; i < msg_mgr.total_messages; i++) { printf("%-8d %-4d %-14s %s\n", msg_mgr.queue[i].band, msg_mgr.queue[i].msg_id, msg_mgr.queue[i].type, msg_mgr.queue[i].content); } }
// 按优先级分类统计 void show_priority_statistics() { int band_counts[5] = {0}; // 低、普通、中等、高、关键 for (int i = 0; i < msg_mgr.total_messages; i++) { int band = msg_mgr.queue[i].band; if (band <= BAND_LOW) band_counts[0]++; else if (band <= BAND_NORMAL) band_counts[1]++; else if (band <= BAND_MEDIUM) band_counts[2]++; else if (band <= BAND_HIGH) band_counts[3]++; else band_counts[4]++; } printf("\n=== 优先级统计 ===\n"); printf("低优先级 (0-25): %d 条消息\n", band_counts[0]); printf("普通优先级 (26-50): %d 条消息\n", band_counts[1]); printf("中等优先级 (51-100): %d 条消息\n", band_counts[2]); printf("高优先级 (101-150): %d 条消息\n", band_counts[3]); printf("关键优先级 (151-255): %d 条消息\n", band_counts[4]); }
int main() { struct stream_buffer ctlbuf, databuf; char ctl_buffer[256], data_buffer[1024]; int band, flags; int result; printf("=== 高级优先级消息管理系统 ===\n\n"); // 初始化缓冲区 ctlbuf.maxlen = sizeof(ctl_buffer); ctlbuf.buf = ctl_buffer; ctlbuf.len = 0; databuf.maxlen = sizeof(data_buffer); databuf.buf = data_buffer; databuf.len = 0; // 添加各种优先级的消息 printf("初始化消息队列...\n"); queue_message(BAND_CRITICAL, "ALERT", "系统紧急告警:磁盘空间不足"); queue_message(BAND_HIGH, "ERROR", "应用程序错误:数据库连接失败"); queue_message(BAND_MEDIUM, "WARNING", "系统警告:CPU使用率过高"); queue_message(BAND_NORMAL, "INFO", "用户登录成功"); queue_message(BAND_LOW, "DEBUG", "调试信息:函数调用跟踪"); queue_message(BAND_CRITICAL, "ALERT", "安全告警:多次登录失败"); queue_message(BAND_HIGH, "ERROR", "网络错误:连接超时"); queue_message(BAND_MEDIUM, "NOTICE", "系统通知:配置文件已更新"); show_system_stats(); show_queue_contents(); show_priority_statistics(); // 处理消息的示例 printf("\n=== 消息处理演示 ===\n"); // 1. 处理最高优先级消息 printf("\n--- 处理最高优先级消息 ---\n"); band = 0; flags = MSG_HIPRI; result = advanced_getpmsg(&ctlbuf, &databuf, &band, &flags); if (result == 0) { printf("处理成功:\n"); printf(" 优先级: %d\n", band); printf(" 控制信息: %s\n", ctlbuf.buf); printf(" 消息内容: %s\n", databuf.buf); printf(" 消息标志: %s\n", (flags & MSG_HIPRI) ? "高优先级" : "普通优先级"); } // 2. 处理指定优先级消息 printf("\n--- 处理中等优先级消息 ---\n"); band = BAND_MEDIUM; flags = MSG_BAND; result = advanced_getpmsg(&ctlbuf, &databuf, &band, &flags); if (result == 0) { printf("处理成功:\n"); printf(" 优先级: %d\n", band); printf(" 控制信息: %s\n", ctlbuf.buf); printf(" 消息内容: %s\n", databuf.buf); } // 3. 处理任意消息 printf("\n--- 处理任意消息 ---\n"); band = 0; flags = MSG_ANY; result = advanced_getpmsg(&ctlbuf, &databuf, &band, &flags); if (result == 0) { printf("处理成功:\n"); printf(" 优先级: %d\n", band); printf(" 控制信息: %s\n", ctlbuf.buf); printf(" 消息内容: %s\n", databuf.buf); } // 显示处理后的状态 show_system_stats(); show_queue_contents(); printf("\n=== STREAMS 优先级消息系统特点 ===\n"); printf("1. 支持 0-255 级优先级\n"); printf("2. 可以按优先级选择性接收消息\n"); printf("3. 高优先级消息可以抢占处理\n"); printf("4. 适用于实时系统和关键任务应用\n"); printf("\nLinux 替代方案:\n"); printf("- 实时信号 (RT signals)\n"); printf("- D-Bus 消息系统\n"); printf("- systemd journal\n"); printf("- 自定义优先级队列\n"); return 0; }
|