sendmmsg 函数详解
1. 函数介绍
2. 函数原型
#define _GNU_SOURCE
#include <sys/socket.h>
int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
int flags);
3. 功能
sendmmsg
允许向套接字批量发送多个消息,每个消息可以包含数据和控制信息。它支持分散缓冲区发送、地址信息指定、控制数据发送等功能,是构建高性能网络应用的关键工具。
4. 参数
- int sockfd: 套接字文件描述符
- *struct mmsghdr msgvec: 消息向量数组,描述多个发送消息
- unsigned int vlen: 消息向量数组的长度(最大可发送的消息数)
- int flags: 发送标志,与sendmsg相同
5. 返回值
- 成功: 返回实际发送的消息数量
- 失败: 返回-1,并设置errno
6. 相似函数,或关联函数
- sendmsg: 单消息发送函数
- send: 基本发送函数
- recvmmsg: 对应的批量接收函数
- writev: 分散缓冲区写入函数
7. 示例代码
示例1:基础sendmmsg使用
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
/**
* 演示sendmmsg的基础使用方法
*/
int demo_sendmmsg_basic() {
int server_fd, client_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
struct mmsghdr msgvec[3];
struct iovec iov[3][1];
char *messages[3] = {
"First message from sendmmsg",
"Second message from sendmmsg",
"Third message from sendmmsg"
};
int messages_sent;
printf("=== sendmmsg 基础使用示例 ===\n");
// 创建TCP服务器套接字
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("创建服务器套接字失败");
return -1;
}
// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8080);
// 绑定套接字
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(server_fd);
return -1;
}
// 监听连接
if (listen(server_fd, 1) == -1) {
perror("监听失败");
close(server_fd);
return -1;
}
printf("服务器监听在端口 8080\n");
// 启动客户端连接
if (fork() == 0) {
// 客户端代码
sleep(1); // 等待服务器启动
client_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(8080);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(client_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == 0) {
printf("客户端连接成功\n");
// 接收服务器发送的消息
char buffer[256];
for (int i = 0; i < 3; i++) {
ssize_t bytes = recv(client_fd, buffer, sizeof(buffer) - 1, 0);
if (bytes > 0) {
buffer[bytes] = '\0';
printf("客户端接收到消息 %d: %s\n", i + 1, buffer);
}
}
}
close(client_fd);
exit(0);
}
// 接受客户端连接
client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd == -1) {
perror("接受连接失败");
close(server_fd);
return -1;
}
printf("客户端连接来自: %s:%d\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
// 准备批量发送消息结构
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < 3; i++) {
// 设置每个消息的缓冲区
iov[i][0].iov_base = messages[i];
iov[i][0].iov_len = strlen(messages[i]);
// 设置消息头
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 1;
msgvec[i].msg_hdr.msg_name = NULL; // TCP不需要目标地址
msgvec[i].msg_hdr.msg_namelen = 0;
}
printf("准备批量发送3个消息...\n");
// 批量发送消息
messages_sent = sendmmsg(client_fd, msgvec, 3, 0);
if (messages_sent == -1) {
perror("sendmmsg 失败");
close(client_fd);
close(server_fd);
return -1;
}
printf("成功发送 %d 个消息:\n", messages_sent);
// 显示发送结果
for (int i = 0; i < messages_sent; i++) {
printf(" 消息 %d: 发送了 %u 字节\n", i + 1, msgvec[i].msg_len);
}
close(client_fd);
close(server_fd);
// 等待客户端结束
int status;
wait(&status);
return 0;
}
int main() {
return demo_sendmmsg_basic();
}
示例2:UDP批量发送
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
/**
* 演示UDP批量发送消息
*/
int demo_udp_batch_send() {
int client_fd;
struct sockaddr_in server_addr;
struct mmsghdr msgvec[5];
struct iovec iov[5][2]; // 每个消息使用2个缓冲区
char *message_parts[5][2] = {
{"UDP ", "Message 1"},
{"UDP ", "Message 2"},
{"UDP ", "Message 3"},
{"UDP ", "Message 4"},
{"UDP ", "Message 5"}
};
int messages_sent;
printf("=== UDP批量发送示例 ===\n");
// 创建UDP客户端套接字
client_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (client_fd == -1) {
perror("创建UDP套接字失败");
return -1;
}
// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8081);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
printf("UDP客户端准备向 127.0.0.1:8081 发送消息\n");
// 准备批量发送结构(使用分散缓冲区)
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < 5; i++) {
// 设置分散缓冲区
iov[i][0].iov_base = message_parts[i][0];
iov[i][0].iov_len = strlen(message_parts[i][0]);
iov[i][1].iov_base = message_parts[i][1];
iov[i][1].iov_len = strlen(message_parts[i][1]);
// 设置消息头
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 2;
msgvec[i].msg_hdr.msg_name = &server_addr;
msgvec[i].msg_hdr.msg_namelen = sizeof(server_addr);
}
printf("准备发送5个UDP消息(每个消息使用分散缓冲区)...\n");
// 批量发送UDP消息
messages_sent = sendmmsg(client_fd, msgvec, 5, 0);
if (messages_sent == -1) {
perror("sendmmsg 失败");
close(client_fd);
return -1;
}
printf("成功发送 %d 个UDP消息:\n", messages_sent);
// 显示发送统计
size_t total_bytes = 0;
for (int i = 0; i < messages_sent; i++) {
printf(" 消息 %d: %u 字节\n", i + 1, msgvec[i].msg_len);
total_bytes += msgvec[i].msg_len;
}
printf("总发送字节数: %zu\n", total_bytes);
close(client_fd);
return 0;
}
/**
* UDP服务器用于接收批量消息
*/
int udp_server_receive() {
int server_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
char buffer[256];
ssize_t bytes_received;
int message_count = 0;
server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建UDP服务器套接字失败");
return -1;
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8081);
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定UDP套接字失败");
close(server_fd);
return -1;
}
printf("UDP服务器监听在端口 8081\n");
printf("等待接收消息(10秒超时)...\n");
// 设置接收超时
struct timeval timeout;
timeout.tv_sec = 10;
timeout.tv_usec = 0;
setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
// 接收消息
while (message_count < 10) {
bytes_received = recvfrom(server_fd, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr*)&client_addr, &client_len);
if (bytes_received == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("接收超时\n");
break;
}
perror("接收消息失败");
break;
}
buffer[bytes_received] = '\0';
printf("接收到消息 %d: %s (来自 %s:%d)\n",
++message_count, buffer,
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
}
close(server_fd);
return 0;
}
int main() {
// 启动UDP服务器
if (fork() == 0) {
sleep(1); // 等待客户端准备
return udp_server_receive();
}
// 执行UDP批量发送
sleep(1);
int result = demo_udp_batch_send();
// 等待服务器结束
int status;
wait(&status);
return result;
}
示例3:高性能网络服务器
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <time.h>
/**
* 高性能服务器结构
*/
typedef struct {
int server_fd;
int port;
struct pollfd *clients;
int max_clients;
int client_count;
unsigned long messages_sent;
unsigned long bytes_sent;
} high_perf_server_t;
/**
* 初始化高性能服务器
*/
int server_init(high_perf_server_t *server, int port, int max_clients) {
struct sockaddr_in server_addr;
memset(server, 0, sizeof(high_perf_server_t));
server->port = port;
server->max_clients = max_clients;
server->client_count = 0;
server->messages_sent = 0;
server->bytes_sent = 0;
// 分配客户端数组
server->clients = calloc(max_clients + 1, sizeof(struct pollfd));
if (!server->clients) {
perror("分配客户端数组失败");
return -1;
}
// 创建服务器套接字
server->server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server->server_fd == -1) {
perror("创建服务器套接字失败");
free(server->clients);
return -1;
}
// 设置套接字选项
int opt = 1;
if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
perror("设置套接字选项失败");
close(server->server_fd);
free(server->clients);
return -1;
}
// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
// 绑定套接字
if (bind(server->server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(server->server_fd);
free(server->clients);
return -1;
}
// 监听连接
if (listen(server->server_fd, 10) == -1) {
perror("监听失败");
close(server->server_fd);
free(server->clients);
return -1;
}
// 设置服务器套接字为poll监听
server->clients[0].fd = server->server_fd;
server->clients[0].events = POLLIN;
printf("高性能服务器初始化完成,监听端口 %d\n", port);
return 0;
}
/**
* 接受新客户端连接
*/
int server_accept_client(high_perf_server_t *server) {
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd;
client_fd = accept(server->server_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd == -1) {
perror("接受连接失败");
return -1;
}
if (server->client_count >= server->max_clients) {
printf("客户端数量已达上限,拒绝连接\n");
close(client_fd);
return -1;
}
// 添加到客户端数组
int index = server->client_count + 1;
server->clients[index].fd = client_fd;
server->clients[index].events = POLLOUT; // 准备发送数据
server->client_count++;
printf("新客户端连接: %s:%d (fd=%d)\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), client_fd);
return 0;
}
/**
* 使用sendmmsg向客户端批量发送消息
*/
int server_send_batch_messages(high_perf_server_t *server, int client_index) {
int client_fd = server->clients[client_index].fd;
const int BATCH_SIZE = 8; // 每次批量发送8个消息
struct mmsghdr msgvec[BATCH_SIZE];
struct iovec iov[BATCH_SIZE][1];
char messages[BATCH_SIZE][128];
int messages_to_send = 0;
// 准备批量发送消息
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < BATCH_SIZE; i++) {
// 构造消息内容
snprintf(messages[i], sizeof(messages[i]),
"Server Message %lu: Batch %d, Index %d",
server->messages_sent + i + 1,
(int)(server->messages_sent / BATCH_SIZE) + 1, i + 1);
// 设置缓冲区
iov[i][0].iov_base = messages[i];
iov[i][0].iov_len = strlen(messages[i]);
// 设置消息头
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 1;
msgvec[i].msg_hdr.msg_name = NULL;
msgvec[i].msg_hdr.msg_namelen = 0;
messages_to_send++;
}
// 批量发送消息
int messages_sent = sendmmsg(client_fd, msgvec, messages_to_send, MSG_NOSIGNAL);
if (messages_sent == -1) {
if (errno == EPIPE || errno == ECONNRESET) {
printf("客户端 %d 连接断开\n", client_fd);
return -1;
}
perror("sendmmsg 失败");
return -1;
}
// 更新统计信息
server->messages_sent += messages_sent;
for (int i = 0; i < messages_sent; i++) {
server->bytes_sent += msgvec[i].msg_len;
}
printf("向客户端 %d 批量发送 %d 个消息\n", client_fd, messages_sent);
return 0;
}
/**
* 运行服务器主循环
*/
int server_run(high_perf_server_t *server) {
printf("服务器开始运行,等待客户端连接...\n");
time_t start_time = time(NULL);
while (difftime(time(NULL), start_time) < 30) { // 运行30秒
// 使用poll等待事件
int nfds = server->client_count + 1;
int activity = poll(server->clients, nfds, 1000); // 1秒超时
if (activity == -1) {
if (errno == EINTR) continue; // 被信号中断
perror("poll 失败");
break;
}
if (activity == 0) {
// 超时,继续循环
continue;
}
// 检查服务器套接字(新连接)
if (server->clients[0].revents & POLLIN) {
server_accept_client(server);
activity--;
}
// 检查客户端套接字(准备发送数据)
for (int i = 1; i <= server->client_count && activity > 0; i++) {
if (server->clients[i].revents & POLLOUT) {
if (server_send_batch_messages(server, i) == -1) {
// 客户端断开连接,移除客户端
close(server->clients[i].fd);
// 将最后一个客户端移到当前位置
if (i < server->client_count) {
server->clients[i] = server->clients[server->client_count];
}
server->client_count--;
i--; // 重新检查当前位置
}
activity--;
}
}
}
return 0;
}
/**
* 清理服务器资源
*/
void server_cleanup(high_perf_server_t *server) {
// 关闭所有客户端连接
for (int i = 1; i <= server->client_count; i++) {
close(server->clients[i].fd);
}
// 关闭服务器套接字
if (server->server_fd != -1) {
close(server->server_fd);
}
// 释放内存
if (server->clients) {
free(server->clients);
}
printf("服务器资源清理完成\n");
printf("统计信息:\n");
printf(" 总发送消息数: %lu\n", server->messages_sent);
printf(" 总发送字节数: %lu\n", server->bytes_sent);
if (server->messages_sent > 0) {
printf(" 平均消息大小: %.2f 字节\n",
(double)server->bytes_sent / server->messages_sent);
}
}
/**
* 演示高性能网络服务器
*/
int demo_high_performance_server() {
high_perf_server_t server;
printf("=== 高性能网络服务器示例 ===\n");
// 初始化服务器
if (server_init(&server, 8082, 10) != 0) {
return -1;
}
// 启动测试客户端
if (fork() == 0) {
sleep(2); // 等待服务器启动
// 创建多个并发客户端
for (int i = 0; i < 3; i++) {
if (fork() == 0) {
int client_sock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(8082);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(client_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == 0) {
printf("客户端 %d 连接成功\n", i + 1);
// 接收服务器发送的消息
char buffer[256];
for (int msg = 0; msg < 20; msg++) {
ssize_t bytes = recv(client_sock, buffer, sizeof(buffer) - 1, 0);
if (bytes > 0) {
buffer[bytes] = '\0';
printf("客户端 %d 接收消息: %s\n", i + 1, buffer);
} else if (bytes == 0) {
printf("客户端 %d 连接关闭\n", i + 1);
break;
}
}
}
close(client_sock);
exit(0);
}
}
// 等待所有客户端完成
for (int i = 0; i < 3; i++) {
int status;
wait(&status);
}
exit(0);
}
// 运行服务器
server_run(&server);
// 清理资源
server_cleanup(&server);
// 等待测试客户端结束
int status;
wait(&status);
return 0;
}
int main() {
return demo_high_performance_server();
}
示例4:实时数据流发送
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
/**
* 实时数据流结构
*/
typedef struct {
int sockfd;
struct sockaddr_in dest_addr;
unsigned long packets_sent;
unsigned long bytes_sent;
time_t start_time;
volatile int running;
} data_stream_t;
// 全局变量用于信号处理
static data_stream_t *g_stream = NULL;
/**
* 信号处理函数
*/
void signal_handler(int sig) {
if (g_stream) {
g_stream->running = 0;
printf("\n收到信号 %d,准备停止数据流...\n", sig);
}
}
/**
* 初始化数据流发送器
*/
int stream_init(data_stream_t *stream, const char *dest_ip, int dest_port) {
memset(stream, 0, sizeof(data_stream_t));
stream->start_time = time(NULL);
stream->running = 1;
// 设置全局指针用于信号处理
g_stream = stream;
// 注册信号处理
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// 创建UDP套接字
stream->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (stream->sockfd == -1) {
perror("创建UDP套接字失败");
return -1;
}
// 设置目标地址
memset(&stream->dest_addr, 0, sizeof(stream->dest_addr));
stream->dest_addr.sin_family = AF_INET;
stream->dest_addr.sin_port = htons(dest_port);
stream->dest_addr.sin_addr.s_addr = inet_addr(dest_ip);
printf("数据流发送器初始化完成\n");
printf("目标地址: %s:%d\n", dest_ip, dest_port);
return 0;
}
/**
* 发送实时数据包批次
*/
int stream_send_batch(data_stream_t *stream) {
const int BATCH_SIZE = 16; // 每次发送16个数据包
struct mmsghdr msgvec[BATCH_SIZE];
struct iovec iov[BATCH_SIZE][1];
char packets[BATCH_SIZE][256];
struct timespec ts;
// 准备批量发送结构
memset(msgvec, 0, sizeof(msgvec));
clock_gettime(CLOCK_REALTIME, &ts);
for (int i = 0; i < BATCH_SIZE; i++) {
// 构造实时数据包
snprintf(packets[i], sizeof(packets[i]),
"REALTIME_DATA:%lu.%09ld:Packet_%lu:Value_%d",
ts.tv_sec, ts.tv_nsec,
stream->packets_sent + i + 1,
rand() % 1000);
// 设置缓冲区
iov[i][0].iov_base = packets[i];
iov[i][0].iov_len = strlen(packets[i]);
// 设置消息头
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 1;
msgvec[i].msg_hdr.msg_name = &stream->dest_addr;
msgvec[i].msg_hdr.msg_namelen = sizeof(stream->dest_addr);
}
// 批量发送数据包
int packets_sent = sendmmsg(stream->sockfd, msgvec, BATCH_SIZE, 0);
if (packets_sent == -1) {
perror("sendmmsg 发送数据包失败");
return -1;
}
// 更新统计信息
stream->packets_sent += packets_sent;
for (int i = 0; i < packets_sent; i++) {
stream->bytes_sent += msgvec[i].msg_len;
}
return packets_sent;
}
/**
* 显示实时统计信息
*/
void stream_show_stats(data_stream_t *stream) {
time_t current_time = time(NULL);
double uptime = difftime(current_time, stream->start_time);
if (uptime > 0) {
double packets_per_sec = stream->packets_sent / uptime;
double bytes_per_sec = stream->bytes_sent / uptime;
printf("\r运行时间: %.0fs | 数据包: %lu | 字节: %lu | "
"速率: %.0f包/s %.2fMB/s",
uptime, stream->packets_sent, stream->bytes_sent,
packets_per_sec, bytes_per_sec / (1024 * 1024));
fflush(stdout);
}
}
/**
* 运行数据流发送器
*/
int stream_run(data_stream_t *stream) {
printf("数据流发送器开始运行,按 Ctrl+C 停止\n");
time_t last_stats_time = time(NULL);
int batch_count = 0;
while (stream->running) {
int result = stream_send_batch(stream);
if (result == -1) {
break;
}
batch_count++;
// 定期显示统计信息
time_t current_time = time(NULL);
if (current_time - last_stats_time >= 1) {
stream_show_stats(stream);
last_stats_time = current_time;
}
// 控制发送速率(每秒约1000个数据包)
if (batch_count % 64 == 0) {
usleep(10000); // 10ms延迟
}
}
printf("\n数据流发送器停止\n");
return 0;
}
/**
* 演示实时数据流发送
*/
int demo_real_time_data_stream() {
data_stream_t stream;
printf("=== 实时数据流发送示例 ===\n");
// 初始化数据流发送器
if (stream_init(&stream, "127.0.0.1", 8083) != 0) {
return -1;
}
// 启动数据接收器
if (fork() == 0) {
int server_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
char buffer[512];
ssize_t bytes_received;
int packet_count = 0;
server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建接收器套接字失败");
exit(1);
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8083);
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定接收器套接字失败");
close(server_fd);
exit(1);
}
printf("数据接收器启动,监听端口 8083\n");
// 设置接收超时
struct timeval timeout;
timeout.tv_sec = 30;
timeout.tv_usec = 0;
setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
// 接收数据包
while (packet_count < 1000) {
bytes_received = recvfrom(server_fd, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr*)&client_addr, &client_len);
if (bytes_received == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("接收超时\n");
break;
}
perror("接收数据包失败");
break;
}
buffer[bytes_received] = '\0';
packet_count++;
if (packet_count % 100 == 0) {
printf("接收器: 接收到 %d 个数据包\n", packet_count);
}
}
printf("接收器: 总共接收到 %d 个数据包\n", packet_count);
close(server_fd);
exit(0);
}
// 运行数据流发送器
srand(time(NULL));
stream_run(&stream);
// 显示最终统计
printf("\n\n最终统计:\n");
printf(" 总发送数据包: %lu\n", stream.packets_sent);
printf(" 总发送字节数: %lu\n", stream.bytes_sent);
printf(" 平均包大小: %.2f 字节\n",
stream.packets_sent > 0 ? (double)stream.bytes_sent / stream.packets_sent : 0);
// 清理资源
close(stream.sockfd);
// 等待接收器结束
int status;
wait(&status);
return 0;
}
int main() {
return demo_real_time_data_stream();
}
示例5:性能对比测试
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
/**
* 性能测试结构
*/
typedef struct {
const char *name;
unsigned long messages_sent;
unsigned long bytes_sent;
struct timeval start_time;
struct timeval end_time;
} perf_test_t;
/**
* 使用传统sendmsg进行测试
*/
int test_sendmsg_performance(int sockfd, struct sockaddr_in *dest_addr,
int message_count, const char *message) {
struct msghdr msg;
struct iovec iov[1];
int sent_count = 0;
// 准备发送结构
memset(&msg, 0, sizeof(msg));
iov[0].iov_base = (void*)message;
iov[0].iov_len = strlen(message);
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_name = dest_addr;
msg.msg_namelen = sizeof(*dest_addr);
// 逐个发送消息
while (sent_count < message_count) {
if (sendmsg(sockfd, &msg, 0) == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("sendmsg 失败");
break;
}
} else {
sent_count++;
}
}
return sent_count;
}
/**
* 使用sendmmsg进行测试
*/
int test_sendmmsg_performance(int sockfd, struct sockaddr_in *dest_addr,
int message_count, const char *message) {
const int BATCH_SIZE = 32;
struct mmsghdr msgvec[BATCH_SIZE];
struct iovec iov[BATCH_SIZE][1];
char *messages[BATCH_SIZE];
int total_sent = 0;
int messages_sent;
// 准备批量发送结构
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < BATCH_SIZE; i++) {
messages[i] = strdup(message);
if (!messages[i]) {
perror("分配消息缓冲区失败");
return total_sent;
}
iov[i][0].iov_base = messages[i];
iov[i][0].iov_len = strlen(message);
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 1;
msgvec[i].msg_hdr.msg_name = dest_addr;
msgvec[i].msg_hdr.msg_namelen = sizeof(*dest_addr);
}
// 批量发送消息
while (total_sent < message_count) {
int to_send = (message_count - total_sent < BATCH_SIZE) ?
message_count - total_sent : BATCH_SIZE;
messages_sent = sendmmsg(sockfd, msgvec, to_send, 0);
if (messages_sent == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("sendmmsg 失败");
break;
}
} else {
total_sent += messages_sent;
}
}
// 释放内存
for (int i = 0; i < BATCH_SIZE; i++) {
free(messages[i]);
}
return total_sent;
}
/**
* UDP性能测试服务器
*/
int perf_test_server(int port) {
int server_fd;
struct sockaddr_in server_addr;
server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建UDP服务器套接字失败");
return -1;
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定服务器套接字失败");
close(server_fd);
return -1;
}
printf("性能测试服务器启动,监听端口 %d\n", port);
return server_fd;
}
/**
* 演示性能对比测试
*/
int demo_performance_comparison() {
int server_fd, client_fd;
struct sockaddr_in server_addr, dest_addr;
const int SERVER_PORT = 8084;
const int MESSAGE_COUNT = 10000;
const char *test_message = "Performance test message for sendmmsg vs sendmsg comparison";
perf_test_t tests[2];
printf("=== sendmmsg vs sendmsg 性能对比测试 ===\n");
// 初始化测试结构
tests[0].name = "sendmsg";
tests[0].messages_sent = 0;
tests[0].bytes_sent = 0;
tests[1].name = "sendmmsg";
tests[1].messages_sent = 0;
tests[1].bytes_sent = 0;
// 启动服务器
server_fd = perf_test_server(SERVER_PORT);
if (server_fd == -1) {
return -1;
}
// 创建客户端套接字
client_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (client_fd == -1) {
perror("创建客户端套接字失败");
close(server_fd);
return -1;
}
// 设置目标地址
memset(&dest_addr, 0, sizeof(dest_addr));
dest_addr.sin_family = AF_INET;
dest_addr.sin_port = htons(SERVER_PORT);
dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
// 启动服务器接收线程
if (fork() == 0) {
char buffer[512];
int received_count = 0;
printf("服务器开始接收测试消息...\n");
// 设置接收超时
struct timeval timeout;
timeout.tv_sec = 30;
timeout.tv_usec = 0;
setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
while (received_count < MESSAGE_COUNT * 2) { // 两次测试
ssize_t bytes = recv(server_fd, buffer, sizeof(buffer) - 1, 0);
if (bytes > 0) {
received_count++;
if (received_count % 1000 == 0) {
printf("服务器已接收 %d 个消息\n", received_count);
}
} else if (bytes == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("服务器接收超时\n");
break;
}
}
}
printf("服务器接收完成,总共接收 %d 个消息\n", received_count);
close(server_fd);
exit(0);
}
// 等待服务器启动
sleep(1);
// 测试1: 使用sendmsg
printf("\n测试1: 使用传统sendmsg发送 %d 个消息...\n", MESSAGE_COUNT);
gettimeofday(&tests[0].start_time, NULL);
tests[0].messages_sent = test_sendmsg_performance(client_fd, &dest_addr,
MESSAGE_COUNT, test_message);
tests[0].bytes_sent = tests[0].messages_sent * strlen(test_message);
gettimeofday(&tests[0].end_time, NULL);
printf("sendmsg测试完成: 发送 %lu 个消息\n", tests[0].messages_sent);
// 短暂休息
sleep(1);
// 测试2: 使用sendmmsg
printf("\n测试2: 使用sendmmsg发送 %d 个消息...\n", MESSAGE_COUNT);
gettimeofday(&tests[1].start_time, NULL);
tests[1].messages_sent = test_sendmmsg_performance(client_fd, &dest_addr,
MESSAGE_COUNT, test_message);
tests[1].bytes_sent = tests[1].messages_sent * strlen(test_message);
gettimeofday(&tests[1].end_time, NULL);
printf("sendmmsg测试完成: 发送 %lu 个消息\n", tests[1].messages_sent);
// 计算并显示结果
printf("\n=== 性能测试结果 ===\n");
for (int i = 0; i < 2; i++) {
double elapsed_time = (tests[i].end_time.tv_sec - tests[i].start_time.tv_sec) +
(tests[i].end_time.tv_usec - tests[i].start_time.tv_usec) / 1000000.0;
double messages_per_sec = tests[i].messages_sent / elapsed_time;
double bytes_per_sec = tests[i].bytes_sent / elapsed_time;
printf("%s 测试:\n", tests[i].name);
printf(" 发送消息数: %lu\n", tests[i].messages_sent);
printf(" 发送字节数: %lu\n", tests[i].bytes_sent);
printf(" 耗时: %.3f 秒\n", elapsed_time);
printf(" 吞吐量: %.0f 消息/秒 (%.2f MB/s)\n",
messages_per_sec, bytes_per_sec / (1024 * 1024));
printf(" 平均延迟: %.3f 微秒/消息\n",
(elapsed_time * 1000000) / tests[i].messages_sent);
printf("\n");
}
// 计算性能提升
double sendmsg_time = (tests[0].end_time.tv_sec - tests[0].start_time.tv_sec) +
(tests[0].end_time.tv_usec - tests[0].start_time.tv_usec) / 1000000.0;
double sendmmsg_time = (tests[1].end_time.tv_sec - tests[1].start_time.tv_sec) +
(tests[1].end_time.tv_usec - tests[1].start_time.tv_usec) / 1000000.0;
if (sendmsg_time > 0 && sendmmsg_time > 0) {
double improvement = (sendmsg_time - sendmmsg_time) / sendmsg_time * 100;
printf("性能提升: %.1f%%\n", improvement);
}
close(client_fd);
// 等待服务器结束
int status;
wait(&status);
return 0;
}
int main() {
return demo_performance_comparison();
}
sendmmsg 标志参数详解
常用标志:
- MSG_CONFIRM: 提供路径确认反馈
- MSG_DONTROUTE: 不使用网关路由
- MSG_DONTWAIT: 非阻塞操作
- MSG_EOR: 发送记录结束标记
- MSG_MORE: 还有更多数据要发送
- MSG_NOSIGNAL: 发送时不产生SIGPIPE信号
- MSG_OOB: 发送带外数据
高级标志:
- MSG_PROXY: SOCKS代理相关
- MSG_TRYHARD: 尽力发送(已废弃)
使用注意事项
系统要求:
- 内核版本: 需要Linux 2.6.39或更高版本
- glibc版本: 需要支持sendmmsg的glibc版本
- 编译选项: 需要定义_GNU_SOURCE
性能优化:
- 批量大小: 根据应用需求选择合适的批量大小
- 缓冲区管理: 合理分配缓冲区避免内存浪费
- 错误处理: 妥善处理部分发送的情况
错误处理:
- 部分发送: 处理实际发送消息数少于请求的情况
- 连接状态: 检查连接是否正常
- 资源清理: 及时关闭套接字和释放内存
安全考虑:
- 缓冲区溢出: 确保消息缓冲区大小正确
- 输入验证: 验证发送的数据内容
- 权限检查: 确保有适当的网络访问权限
sendmmsg 优势
1. 性能优势:
- 减少系统调用: 单次调用发送多个消息
- 降低上下文切换: 减少用户态和内核态切换
- 提高吞吐量: 特别适用于小消息高频发送场景
2. 功能优势:
- 完整功能: 支持sendmsg的所有功能
- 分散缓冲区: 每个消息可使用多个缓冲区
- 控制信息: 支持发送辅助控制数据
3. 应用场景:
- 高并发服务器: Web服务器、游戏服务器
- 实时系统: 音视频流传输、传感器数据采集
- 消息队列: 批量消息处理系统
- 网络协议: 实现自定义网络协议
总结
sendmmsg
是构建高性能网络应用的重要工具,它提供了:
- 批量消息发送能力,显著减少系统调用开销
- 与sendmsg相同的完整功能集
- 更好的性能和可扩展性
- 适用于高吞吐量的实时应用
通过合理使用 sendmmsg
,可以大幅提升网络应用的性能,特别是在需要发送大量小消息的场景中效果显著。在实际应用中,需要注意批量大小的选择、错误处理和系统兼容性等问题。