sendmmsg系统调用及示例

sendmmsg 函数详解

1. 函数介绍

sendmmsg 是Linux 2.6.39引入的高效批量发送消息系统调用。它是 sendmsg 的批量版本,允许应用程序在单次系统调用中发送多个消息,显著减少了系统调用开销,特别适用于高吞吐量的网络服务器和实时应用。

2. 函数原型

#define _GNU_SOURCE
#include <sys/socket.h>
int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
             int flags);

3. 功能

sendmmsg 允许向套接字批量发送多个消息,每个消息可以包含数据和控制信息。它支持分散缓冲区发送、地址信息指定、控制数据发送等功能,是构建高性能网络应用的关键工具。

4. 参数

  • int sockfd: 套接字文件描述符
  • *struct mmsghdr msgvec: 消息向量数组,描述多个发送消息
  • unsigned int vlen: 消息向量数组的长度(最大可发送的消息数)
  • int flags: 发送标志,与sendmsg相同

5. 返回值

  • 成功: 返回实际发送的消息数量
  • 失败: 返回-1,并设置errno

6. 相似函数,或关联函数

  • sendmsg: 单消息发送函数
  • send: 基本发送函数
  • recvmmsg: 对应的批量接收函数
  • writev: 分散缓冲区写入函数

7. 示例代码

示例1:基础sendmmsg使用

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

/**
 * 演示sendmmsg的基础使用方法
 */
int demo_sendmmsg_basic() {
    int server_fd, client_fd;
    struct sockaddr_in server_addr, client_addr;
    socklen_t client_len = sizeof(client_addr);
    struct mmsghdr msgvec[3];
    struct iovec iov[3][1];
    char *messages[3] = {
        "First message from sendmmsg",
        "Second message from sendmmsg", 
        "Third message from sendmmsg"
    };
    int messages_sent;
    
    printf("=== sendmmsg 基础使用示例 ===\n");
    
    // 创建TCP服务器套接字
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("创建服务器套接字失败");
        return -1;
    }
    
    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(8080);
    
    // 绑定套接字
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定套接字失败");
        close(server_fd);
        return -1;
    }
    
    // 监听连接
    if (listen(server_fd, 1) == -1) {
        perror("监听失败");
        close(server_fd);
        return -1;
    }
    
    printf("服务器监听在端口 8080\n");
    
    // 启动客户端连接
    if (fork() == 0) {
        // 客户端代码
        sleep(1);  // 等待服务器启动
        client_fd = socket(AF_INET, SOCK_STREAM, 0);
        struct sockaddr_in serv_addr;
        
        memset(&serv_addr, 0, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(8080);
        serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
        
        if (connect(client_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == 0) {
            printf("客户端连接成功\n");
            
            // 接收服务器发送的消息
            char buffer[256];
            for (int i = 0; i < 3; i++) {
                ssize_t bytes = recv(client_fd, buffer, sizeof(buffer) - 1, 0);
                if (bytes > 0) {
                    buffer[bytes] = '\0';
                    printf("客户端接收到消息 %d: %s\n", i + 1, buffer);
                }
            }
        }
        
        close(client_fd);
        exit(0);
    }
    
    // 接受客户端连接
    client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
    if (client_fd == -1) {
        perror("接受连接失败");
        close(server_fd);
        return -1;
    }
    
    printf("客户端连接来自: %s:%d\n", 
           inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
    
    // 准备批量发送消息结构
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < 3; i++) {
        // 设置每个消息的缓冲区
        iov[i][0].iov_base = messages[i];
        iov[i][0].iov_len = strlen(messages[i]);
        
        // 设置消息头
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
        msgvec[i].msg_hdr.msg_name = NULL;  // TCP不需要目标地址
        msgvec[i].msg_hdr.msg_namelen = 0;
    }
    
    printf("准备批量发送3个消息...\n");
    
    // 批量发送消息
    messages_sent = sendmmsg(client_fd, msgvec, 3, 0);
    
    if (messages_sent == -1) {
        perror("sendmmsg 失败");
        close(client_fd);
        close(server_fd);
        return -1;
    }
    
    printf("成功发送 %d 个消息:\n", messages_sent);
    
    // 显示发送结果
    for (int i = 0; i < messages_sent; i++) {
        printf("  消息 %d: 发送了 %u 字节\n", i + 1, msgvec[i].msg_len);
    }
    
    close(client_fd);
    close(server_fd);
    
    // 等待客户端结束
    int status;
    wait(&status);
    
    return 0;
}

int main() {
    return demo_sendmmsg_basic();
}

示例2:UDP批量发送

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>

/**
 * 演示UDP批量发送消息
 */
int demo_udp_batch_send() {
    int client_fd;
    struct sockaddr_in server_addr;
    struct mmsghdr msgvec[5];
    struct iovec iov[5][2];  // 每个消息使用2个缓冲区
    char *message_parts[5][2] = {
        {"UDP ", "Message 1"},
        {"UDP ", "Message 2"},
        {"UDP ", "Message 3"},
        {"UDP ", "Message 4"},
        {"UDP ", "Message 5"}
    };
    int messages_sent;
    
    printf("=== UDP批量发送示例 ===\n");
    
    // 创建UDP客户端套接字
    client_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (client_fd == -1) {
        perror("创建UDP套接字失败");
        return -1;
    }
    
    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8081);
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    
    printf("UDP客户端准备向 127.0.0.1:8081 发送消息\n");
    
    // 准备批量发送结构(使用分散缓冲区)
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < 5; i++) {
        // 设置分散缓冲区
        iov[i][0].iov_base = message_parts[i][0];
        iov[i][0].iov_len = strlen(message_parts[i][0]);
        iov[i][1].iov_base = message_parts[i][1];
        iov[i][1].iov_len = strlen(message_parts[i][1]);
        
        // 设置消息头
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 2;
        msgvec[i].msg_hdr.msg_name = &server_addr;
        msgvec[i].msg_hdr.msg_namelen = sizeof(server_addr);
    }
    
    printf("准备发送5个UDP消息(每个消息使用分散缓冲区)...\n");
    
    // 批量发送UDP消息
    messages_sent = sendmmsg(client_fd, msgvec, 5, 0);
    
    if (messages_sent == -1) {
        perror("sendmmsg 失败");
        close(client_fd);
        return -1;
    }
    
    printf("成功发送 %d 个UDP消息:\n", messages_sent);
    
    // 显示发送统计
    size_t total_bytes = 0;
    for (int i = 0; i < messages_sent; i++) {
        printf("  消息 %d: %u 字节\n", i + 1, msgvec[i].msg_len);
        total_bytes += msgvec[i].msg_len;
    }
    
    printf("总发送字节数: %zu\n", total_bytes);
    
    close(client_fd);
    
    return 0;
}

/**
 * UDP服务器用于接收批量消息
 */
int udp_server_receive() {
    int server_fd;
    struct sockaddr_in server_addr, client_addr;
    socklen_t client_len = sizeof(client_addr);
    char buffer[256];
    ssize_t bytes_received;
    int message_count = 0;
    
    server_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (server_fd == -1) {
        perror("创建UDP服务器套接字失败");
        return -1;
    }
    
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(8081);
    
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定UDP套接字失败");
        close(server_fd);
        return -1;
    }
    
    printf("UDP服务器监听在端口 8081\n");
    printf("等待接收消息(10秒超时)...\n");
    
    // 设置接收超时
    struct timeval timeout;
    timeout.tv_sec = 10;
    timeout.tv_usec = 0;
    setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
    
    // 接收消息
    while (message_count < 10) {
        bytes_received = recvfrom(server_fd, buffer, sizeof(buffer) - 1, 0,
                                 (struct sockaddr*)&client_addr, &client_len);
        if (bytes_received == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                printf("接收超时\n");
                break;
            }
            perror("接收消息失败");
            break;
        }
        
        buffer[bytes_received] = '\0';
        printf("接收到消息 %d: %s (来自 %s:%d)\n", 
               ++message_count, buffer,
               inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
    }
    
    close(server_fd);
    return 0;
}

int main() {
    // 启动UDP服务器
    if (fork() == 0) {
        sleep(1);  // 等待客户端准备
        return udp_server_receive();
    }
    
    // 执行UDP批量发送
    sleep(1);
    int result = demo_udp_batch_send();
    
    // 等待服务器结束
    int status;
    wait(&status);
    
    return result;
}

示例3:高性能网络服务器

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <time.h>

/**
 * 高性能服务器结构
 */
typedef struct {
    int server_fd;
    int port;
    struct pollfd *clients;
    int max_clients;
    int client_count;
    unsigned long messages_sent;
    unsigned long bytes_sent;
} high_perf_server_t;

/**
 * 初始化高性能服务器
 */
int server_init(high_perf_server_t *server, int port, int max_clients) {
    struct sockaddr_in server_addr;
    
    memset(server, 0, sizeof(high_perf_server_t));
    server->port = port;
    server->max_clients = max_clients;
    server->client_count = 0;
    server->messages_sent = 0;
    server->bytes_sent = 0;
    
    // 分配客户端数组
    server->clients = calloc(max_clients + 1, sizeof(struct pollfd));
    if (!server->clients) {
        perror("分配客户端数组失败");
        return -1;
    }
    
    // 创建服务器套接字
    server->server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server->server_fd == -1) {
        perror("创建服务器套接字失败");
        free(server->clients);
        return -1;
    }
    
    // 设置套接字选项
    int opt = 1;
    if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
        perror("设置套接字选项失败");
        close(server->server_fd);
        free(server->clients);
        return -1;
    }
    
    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(port);
    
    // 绑定套接字
    if (bind(server->server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定套接字失败");
        close(server->server_fd);
        free(server->clients);
        return -1;
    }
    
    // 监听连接
    if (listen(server->server_fd, 10) == -1) {
        perror("监听失败");
        close(server->server_fd);
        free(server->clients);
        return -1;
    }
    
    // 设置服务器套接字为poll监听
    server->clients[0].fd = server->server_fd;
    server->clients[0].events = POLLIN;
    
    printf("高性能服务器初始化完成,监听端口 %d\n", port);
    return 0;
}

/**
 * 接受新客户端连接
 */
int server_accept_client(high_perf_server_t *server) {
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    int client_fd;
    
    client_fd = accept(server->server_fd, (struct sockaddr*)&client_addr, &client_len);
    if (client_fd == -1) {
        perror("接受连接失败");
        return -1;
    }
    
    if (server->client_count >= server->max_clients) {
        printf("客户端数量已达上限,拒绝连接\n");
        close(client_fd);
        return -1;
    }
    
    // 添加到客户端数组
    int index = server->client_count + 1;
    server->clients[index].fd = client_fd;
    server->clients[index].events = POLLOUT;  // 准备发送数据
    server->client_count++;
    
    printf("新客户端连接: %s:%d (fd=%d)\n", 
           inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), client_fd);
    
    return 0;
}

/**
 * 使用sendmmsg向客户端批量发送消息
 */
int server_send_batch_messages(high_perf_server_t *server, int client_index) {
    int client_fd = server->clients[client_index].fd;
    const int BATCH_SIZE = 8;  // 每次批量发送8个消息
    struct mmsghdr msgvec[BATCH_SIZE];
    struct iovec iov[BATCH_SIZE][1];
    char messages[BATCH_SIZE][128];
    int messages_to_send = 0;
    
    // 准备批量发送消息
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < BATCH_SIZE; i++) {
        // 构造消息内容
        snprintf(messages[i], sizeof(messages[i]), 
                "Server Message %lu: Batch %d, Index %d", 
                server->messages_sent + i + 1, 
                (int)(server->messages_sent / BATCH_SIZE) + 1, i + 1);
        
        // 设置缓冲区
        iov[i][0].iov_base = messages[i];
        iov[i][0].iov_len = strlen(messages[i]);
        
        // 设置消息头
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
        msgvec[i].msg_hdr.msg_name = NULL;
        msgvec[i].msg_hdr.msg_namelen = 0;
        
        messages_to_send++;
    }
    
    // 批量发送消息
    int messages_sent = sendmmsg(client_fd, msgvec, messages_to_send, MSG_NOSIGNAL);
    
    if (messages_sent == -1) {
        if (errno == EPIPE || errno == ECONNRESET) {
            printf("客户端 %d 连接断开\n", client_fd);
            return -1;
        }
        perror("sendmmsg 失败");
        return -1;
    }
    
    // 更新统计信息
    server->messages_sent += messages_sent;
    for (int i = 0; i < messages_sent; i++) {
        server->bytes_sent += msgvec[i].msg_len;
    }
    
    printf("向客户端 %d 批量发送 %d 个消息\n", client_fd, messages_sent);
    
    return 0;
}

/**
 * 运行服务器主循环
 */
int server_run(high_perf_server_t *server) {
    printf("服务器开始运行,等待客户端连接...\n");
    
    time_t start_time = time(NULL);
    
    while (difftime(time(NULL), start_time) < 30) {  // 运行30秒
        // 使用poll等待事件
        int nfds = server->client_count + 1;
        int activity = poll(server->clients, nfds, 1000);  // 1秒超时
        
        if (activity == -1) {
            if (errno == EINTR) continue;  // 被信号中断
            perror("poll 失败");
            break;
        }
        
        if (activity == 0) {
            // 超时,继续循环
            continue;
        }
        
        // 检查服务器套接字(新连接)
        if (server->clients[0].revents & POLLIN) {
            server_accept_client(server);
            activity--;
        }
        
        // 检查客户端套接字(准备发送数据)
        for (int i = 1; i <= server->client_count && activity > 0; i++) {
            if (server->clients[i].revents & POLLOUT) {
                if (server_send_batch_messages(server, i) == -1) {
                    // 客户端断开连接,移除客户端
                    close(server->clients[i].fd);
                    // 将最后一个客户端移到当前位置
                    if (i < server->client_count) {
                        server->clients[i] = server->clients[server->client_count];
                    }
                    server->client_count--;
                    i--;  // 重新检查当前位置
                }
                activity--;
            }
        }
    }
    
    return 0;
}

/**
 * 清理服务器资源
 */
void server_cleanup(high_perf_server_t *server) {
    // 关闭所有客户端连接
    for (int i = 1; i <= server->client_count; i++) {
        close(server->clients[i].fd);
    }
    
    // 关闭服务器套接字
    if (server->server_fd != -1) {
        close(server->server_fd);
    }
    
    // 释放内存
    if (server->clients) {
        free(server->clients);
    }
    
    printf("服务器资源清理完成\n");
    printf("统计信息:\n");
    printf("  总发送消息数: %lu\n", server->messages_sent);
    printf("  总发送字节数: %lu\n", server->bytes_sent);
    if (server->messages_sent > 0) {
        printf("  平均消息大小: %.2f 字节\n", 
               (double)server->bytes_sent / server->messages_sent);
    }
}

/**
 * 演示高性能网络服务器
 */
int demo_high_performance_server() {
    high_perf_server_t server;
    
    printf("=== 高性能网络服务器示例 ===\n");
    
    // 初始化服务器
    if (server_init(&server, 8082, 10) != 0) {
        return -1;
    }
    
    // 启动测试客户端
    if (fork() == 0) {
        sleep(2);  // 等待服务器启动
        
        // 创建多个并发客户端
        for (int i = 0; i < 3; i++) {
            if (fork() == 0) {
                int client_sock = socket(AF_INET, SOCK_STREAM, 0);
                struct sockaddr_in serv_addr;
                
                memset(&serv_addr, 0, sizeof(serv_addr));
                serv_addr.sin_family = AF_INET;
                serv_addr.sin_port = htons(8082);
                serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
                
                if (connect(client_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == 0) {
                    printf("客户端 %d 连接成功\n", i + 1);
                    
                    // 接收服务器发送的消息
                    char buffer[256];
                    for (int msg = 0; msg < 20; msg++) {
                        ssize_t bytes = recv(client_sock, buffer, sizeof(buffer) - 1, 0);
                        if (bytes > 0) {
                            buffer[bytes] = '\0';
                            printf("客户端 %d 接收消息: %s\n", i + 1, buffer);
                        } else if (bytes == 0) {
                            printf("客户端 %d 连接关闭\n", i + 1);
                            break;
                        }
                    }
                }
                
                close(client_sock);
                exit(0);
            }
        }
        
        // 等待所有客户端完成
        for (int i = 0; i < 3; i++) {
            int status;
            wait(&status);
        }
        
        exit(0);
    }
    
    // 运行服务器
    server_run(&server);
    
    // 清理资源
    server_cleanup(&server);
    
    // 等待测试客户端结束
    int status;
    wait(&status);
    
    return 0;
}

int main() {
    return demo_high_performance_server();
}

示例4:实时数据流发送

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

/**
 * 实时数据流结构
 */
typedef struct {
    int sockfd;
    struct sockaddr_in dest_addr;
    unsigned long packets_sent;
    unsigned long bytes_sent;
    time_t start_time;
    volatile int running;
} data_stream_t;

// 全局变量用于信号处理
static data_stream_t *g_stream = NULL;

/**
 * 信号处理函数
 */
void signal_handler(int sig) {
    if (g_stream) {
        g_stream->running = 0;
        printf("\n收到信号 %d,准备停止数据流...\n", sig);
    }
}

/**
 * 初始化数据流发送器
 */
int stream_init(data_stream_t *stream, const char *dest_ip, int dest_port) {
    memset(stream, 0, sizeof(data_stream_t));
    stream->start_time = time(NULL);
    stream->running = 1;
    
    // 设置全局指针用于信号处理
    g_stream = stream;
    
    // 注册信号处理
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    
    // 创建UDP套接字
    stream->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (stream->sockfd == -1) {
        perror("创建UDP套接字失败");
        return -1;
    }
    
    // 设置目标地址
    memset(&stream->dest_addr, 0, sizeof(stream->dest_addr));
    stream->dest_addr.sin_family = AF_INET;
    stream->dest_addr.sin_port = htons(dest_port);
    stream->dest_addr.sin_addr.s_addr = inet_addr(dest_ip);
    
    printf("数据流发送器初始化完成\n");
    printf("目标地址: %s:%d\n", dest_ip, dest_port);
    
    return 0;
}

/**
 * 发送实时数据包批次
 */
int stream_send_batch(data_stream_t *stream) {
    const int BATCH_SIZE = 16;  // 每次发送16个数据包
    struct mmsghdr msgvec[BATCH_SIZE];
    struct iovec iov[BATCH_SIZE][1];
    char packets[BATCH_SIZE][256];
    struct timespec ts;
    
    // 准备批量发送结构
    memset(msgvec, 0, sizeof(msgvec));
    
    clock_gettime(CLOCK_REALTIME, &ts);
    
    for (int i = 0; i < BATCH_SIZE; i++) {
        // 构造实时数据包
        snprintf(packets[i], sizeof(packets[i]), 
                "REALTIME_DATA:%lu.%09ld:Packet_%lu:Value_%d", 
                ts.tv_sec, ts.tv_nsec,
                stream->packets_sent + i + 1,
                rand() % 1000);
        
        // 设置缓冲区
        iov[i][0].iov_base = packets[i];
        iov[i][0].iov_len = strlen(packets[i]);
        
        // 设置消息头
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
        msgvec[i].msg_hdr.msg_name = &stream->dest_addr;
        msgvec[i].msg_hdr.msg_namelen = sizeof(stream->dest_addr);
    }
    
    // 批量发送数据包
    int packets_sent = sendmmsg(stream->sockfd, msgvec, BATCH_SIZE, 0);
    
    if (packets_sent == -1) {
        perror("sendmmsg 发送数据包失败");
        return -1;
    }
    
    // 更新统计信息
    stream->packets_sent += packets_sent;
    for (int i = 0; i < packets_sent; i++) {
        stream->bytes_sent += msgvec[i].msg_len;
    }
    
    return packets_sent;
}

/**
 * 显示实时统计信息
 */
void stream_show_stats(data_stream_t *stream) {
    time_t current_time = time(NULL);
    double uptime = difftime(current_time, stream->start_time);
    
    if (uptime > 0) {
        double packets_per_sec = stream->packets_sent / uptime;
        double bytes_per_sec = stream->bytes_sent / uptime;
        
        printf("\r运行时间: %.0fs | 数据包: %lu | 字节: %lu | "
               "速率: %.0f包/s %.2fMB/s",
               uptime, stream->packets_sent, stream->bytes_sent,
               packets_per_sec, bytes_per_sec / (1024 * 1024));
        fflush(stdout);
    }
}

/**
 * 运行数据流发送器
 */
int stream_run(data_stream_t *stream) {
    printf("数据流发送器开始运行,按 Ctrl+C 停止\n");
    
    time_t last_stats_time = time(NULL);
    int batch_count = 0;
    
    while (stream->running) {
        int result = stream_send_batch(stream);
        if (result == -1) {
            break;
        }
        
        batch_count++;
        
        // 定期显示统计信息
        time_t current_time = time(NULL);
        if (current_time - last_stats_time >= 1) {
            stream_show_stats(stream);
            last_stats_time = current_time;
        }
        
        // 控制发送速率(每秒约1000个数据包)
        if (batch_count % 64 == 0) {
            usleep(10000);  // 10ms延迟
        }
    }
    
    printf("\n数据流发送器停止\n");
    return 0;
}

/**
 * 演示实时数据流发送
 */
int demo_real_time_data_stream() {
    data_stream_t stream;
    
    printf("=== 实时数据流发送示例 ===\n");
    
    // 初始化数据流发送器
    if (stream_init(&stream, "127.0.0.1", 8083) != 0) {
        return -1;
    }
    
    // 启动数据接收器
    if (fork() == 0) {
        int server_fd;
        struct sockaddr_in server_addr, client_addr;
        socklen_t client_len = sizeof(client_addr);
        char buffer[512];
        ssize_t bytes_received;
        int packet_count = 0;
        
        server_fd = socket(AF_INET, SOCK_DGRAM, 0);
        if (server_fd == -1) {
            perror("创建接收器套接字失败");
            exit(1);
        }
        
        memset(&server_addr, 0, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = INADDR_ANY;
        server_addr.sin_port = htons(8083);
        
        if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
            perror("绑定接收器套接字失败");
            close(server_fd);
            exit(1);
        }
        
        printf("数据接收器启动,监听端口 8083\n");
        
        // 设置接收超时
        struct timeval timeout;
        timeout.tv_sec = 30;
        timeout.tv_usec = 0;
        setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
        
        // 接收数据包
        while (packet_count < 1000) {
            bytes_received = recvfrom(server_fd, buffer, sizeof(buffer) - 1, 0,
                                     (struct sockaddr*)&client_addr, &client_len);
            if (bytes_received == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    printf("接收超时\n");
                    break;
                }
                perror("接收数据包失败");
                break;
            }
            
            buffer[bytes_received] = '\0';
            packet_count++;
            
            if (packet_count % 100 == 0) {
                printf("接收器: 接收到 %d 个数据包\n", packet_count);
            }
        }
        
        printf("接收器: 总共接收到 %d 个数据包\n", packet_count);
        close(server_fd);
        exit(0);
    }
    
    // 运行数据流发送器
    srand(time(NULL));
    stream_run(&stream);
    
    // 显示最终统计
    printf("\n\n最终统计:\n");
    printf("  总发送数据包: %lu\n", stream.packets_sent);
    printf("  总发送字节数: %lu\n", stream.bytes_sent);
    printf("  平均包大小: %.2f 字节\n", 
           stream.packets_sent > 0 ? (double)stream.bytes_sent / stream.packets_sent : 0);
    
    // 清理资源
    close(stream.sockfd);
    
    // 等待接收器结束
    int status;
    wait(&status);
    
    return 0;
}

int main() {
    return demo_real_time_data_stream();
}

示例5:性能对比测试

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>

/**
 * 性能测试结构
 */
typedef struct {
    const char *name;
    unsigned long messages_sent;
    unsigned long bytes_sent;
    struct timeval start_time;
    struct timeval end_time;
} perf_test_t;

/**
 * 使用传统sendmsg进行测试
 */
int test_sendmsg_performance(int sockfd, struct sockaddr_in *dest_addr, 
                            int message_count, const char *message) {
    struct msghdr msg;
    struct iovec iov[1];
    int sent_count = 0;
    
    // 准备发送结构
    memset(&msg, 0, sizeof(msg));
    iov[0].iov_base = (void*)message;
    iov[0].iov_len = strlen(message);
    
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_name = dest_addr;
    msg.msg_namelen = sizeof(*dest_addr);
    
    // 逐个发送消息
    while (sent_count < message_count) {
        if (sendmsg(sockfd, &msg, 0) == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                perror("sendmsg 失败");
                break;
            }
        } else {
            sent_count++;
        }
    }
    
    return sent_count;
}

/**
 * 使用sendmmsg进行测试
 */
int test_sendmmsg_performance(int sockfd, struct sockaddr_in *dest_addr, 
                             int message_count, const char *message) {
    const int BATCH_SIZE = 32;
    struct mmsghdr msgvec[BATCH_SIZE];
    struct iovec iov[BATCH_SIZE][1];
    char *messages[BATCH_SIZE];
    int total_sent = 0;
    int messages_sent;
    
    // 准备批量发送结构
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < BATCH_SIZE; i++) {
        messages[i] = strdup(message);
        if (!messages[i]) {
            perror("分配消息缓冲区失败");
            return total_sent;
        }
        
        iov[i][0].iov_base = messages[i];
        iov[i][0].iov_len = strlen(message);
        
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
        msgvec[i].msg_hdr.msg_name = dest_addr;
        msgvec[i].msg_hdr.msg_namelen = sizeof(*dest_addr);
    }
    
    // 批量发送消息
    while (total_sent < message_count) {
        int to_send = (message_count - total_sent < BATCH_SIZE) ? 
                     message_count - total_sent : BATCH_SIZE;
        
        messages_sent = sendmmsg(sockfd, msgvec, to_send, 0);
        
        if (messages_sent == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                perror("sendmmsg 失败");
                break;
            }
        } else {
            total_sent += messages_sent;
        }
    }
    
    // 释放内存
    for (int i = 0; i < BATCH_SIZE; i++) {
        free(messages[i]);
    }
    
    return total_sent;
}

/**
 * UDP性能测试服务器
 */
int perf_test_server(int port) {
    int server_fd;
    struct sockaddr_in server_addr;
    
    server_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (server_fd == -1) {
        perror("创建UDP服务器套接字失败");
        return -1;
    }
    
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(port);
    
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定服务器套接字失败");
        close(server_fd);
        return -1;
    }
    
    printf("性能测试服务器启动,监听端口 %d\n", port);
    return server_fd;
}

/**
 * 演示性能对比测试
 */
int demo_performance_comparison() {
    int server_fd, client_fd;
    struct sockaddr_in server_addr, dest_addr;
    const int SERVER_PORT = 8084;
    const int MESSAGE_COUNT = 10000;
    const char *test_message = "Performance test message for sendmmsg vs sendmsg comparison";
    perf_test_t tests[2];
    
    printf("=== sendmmsg vs sendmsg 性能对比测试 ===\n");
    
    // 初始化测试结构
    tests[0].name = "sendmsg";
    tests[0].messages_sent = 0;
    tests[0].bytes_sent = 0;
    
    tests[1].name = "sendmmsg";
    tests[1].messages_sent = 0;
    tests[1].bytes_sent = 0;
    
    // 启动服务器
    server_fd = perf_test_server(SERVER_PORT);
    if (server_fd == -1) {
        return -1;
    }
    
    // 创建客户端套接字
    client_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (client_fd == -1) {
        perror("创建客户端套接字失败");
        close(server_fd);
        return -1;
    }
    
    // 设置目标地址
    memset(&dest_addr, 0, sizeof(dest_addr));
    dest_addr.sin_family = AF_INET;
    dest_addr.sin_port = htons(SERVER_PORT);
    dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    
    // 启动服务器接收线程
    if (fork() == 0) {
        char buffer[512];
        int received_count = 0;
        
        printf("服务器开始接收测试消息...\n");
        
        // 设置接收超时
        struct timeval timeout;
        timeout.tv_sec = 30;
        timeout.tv_usec = 0;
        setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
        
        while (received_count < MESSAGE_COUNT * 2) {  // 两次测试
            ssize_t bytes = recv(server_fd, buffer, sizeof(buffer) - 1, 0);
            if (bytes > 0) {
                received_count++;
                if (received_count % 1000 == 0) {
                    printf("服务器已接收 %d 个消息\n", received_count);
                }
            } else if (bytes == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    printf("服务器接收超时\n");
                    break;
                }
            }
        }
        
        printf("服务器接收完成,总共接收 %d 个消息\n", received_count);
        close(server_fd);
        exit(0);
    }
    
    // 等待服务器启动
    sleep(1);
    
    // 测试1: 使用sendmsg
    printf("\n测试1: 使用传统sendmsg发送 %d 个消息...\n", MESSAGE_COUNT);
    gettimeofday(&tests[0].start_time, NULL);
    
    tests[0].messages_sent = test_sendmsg_performance(client_fd, &dest_addr, 
                                                     MESSAGE_COUNT, test_message);
    tests[0].bytes_sent = tests[0].messages_sent * strlen(test_message);
    
    gettimeofday(&tests[0].end_time, NULL);
    
    printf("sendmsg测试完成: 发送 %lu 个消息\n", tests[0].messages_sent);
    
    // 短暂休息
    sleep(1);
    
    // 测试2: 使用sendmmsg
    printf("\n测试2: 使用sendmmsg发送 %d 个消息...\n", MESSAGE_COUNT);
    gettimeofday(&tests[1].start_time, NULL);
    
    tests[1].messages_sent = test_sendmmsg_performance(client_fd, &dest_addr, 
                                                      MESSAGE_COUNT, test_message);
    tests[1].bytes_sent = tests[1].messages_sent * strlen(test_message);
    
    gettimeofday(&tests[1].end_time, NULL);
    
    printf("sendmmsg测试完成: 发送 %lu 个消息\n", tests[1].messages_sent);
    
    // 计算并显示结果
    printf("\n=== 性能测试结果 ===\n");
    
    for (int i = 0; i < 2; i++) {
        double elapsed_time = (tests[i].end_time.tv_sec - tests[i].start_time.tv_sec) +
                             (tests[i].end_time.tv_usec - tests[i].start_time.tv_usec) / 1000000.0;
        
        double messages_per_sec = tests[i].messages_sent / elapsed_time;
        double bytes_per_sec = tests[i].bytes_sent / elapsed_time;
        
        printf("%s 测试:\n", tests[i].name);
        printf("  发送消息数: %lu\n", tests[i].messages_sent);
        printf("  发送字节数: %lu\n", tests[i].bytes_sent);
        printf("  耗时: %.3f 秒\n", elapsed_time);
        printf("  吞吐量: %.0f 消息/秒 (%.2f MB/s)\n", 
               messages_per_sec, bytes_per_sec / (1024 * 1024));
        printf("  平均延迟: %.3f 微秒/消息\n", 
               (elapsed_time * 1000000) / tests[i].messages_sent);
        printf("\n");
    }
    
    // 计算性能提升
    double sendmsg_time = (tests[0].end_time.tv_sec - tests[0].start_time.tv_sec) +
                         (tests[0].end_time.tv_usec - tests[0].start_time.tv_usec) / 1000000.0;
    double sendmmsg_time = (tests[1].end_time.tv_sec - tests[1].start_time.tv_sec) +
                          (tests[1].end_time.tv_usec - tests[1].start_time.tv_usec) / 1000000.0;
    
    if (sendmsg_time > 0 && sendmmsg_time > 0) {
        double improvement = (sendmsg_time - sendmmsg_time) / sendmsg_time * 100;
        printf("性能提升: %.1f%%\n", improvement);
    }
    
    close(client_fd);
    
    // 等待服务器结束
    int status;
    wait(&status);
    
    return 0;
}

int main() {
    return demo_performance_comparison();
}

sendmmsg 标志参数详解

常用标志:

  • MSG_CONFIRM: 提供路径确认反馈
  • MSG_DONTROUTE: 不使用网关路由
  • MSG_DONTWAIT: 非阻塞操作
  • MSG_EOR: 发送记录结束标记
  • MSG_MORE: 还有更多数据要发送
  • MSG_NOSIGNAL: 发送时不产生SIGPIPE信号
  • MSG_OOB: 发送带外数据

高级标志:

  • MSG_PROXY: SOCKS代理相关
  • MSG_TRYHARD: 尽力发送(已废弃)

使用注意事项

系统要求:

  1. 内核版本: 需要Linux 2.6.39或更高版本
  2. glibc版本: 需要支持sendmmsg的glibc版本
  3. 编译选项: 需要定义_GNU_SOURCE

性能优化:

  1. 批量大小: 根据应用需求选择合适的批量大小
  2. 缓冲区管理: 合理分配缓冲区避免内存浪费
  3. 错误处理: 妥善处理部分发送的情况

错误处理:

  1. 部分发送: 处理实际发送消息数少于请求的情况
  2. 连接状态: 检查连接是否正常
  3. 资源清理: 及时关闭套接字和释放内存

安全考虑:

  1. 缓冲区溢出: 确保消息缓冲区大小正确
  2. 输入验证: 验证发送的数据内容
  3. 权限检查: 确保有适当的网络访问权限

sendmmsg 优势

1. 性能优势:

  • 减少系统调用: 单次调用发送多个消息
  • 降低上下文切换: 减少用户态和内核态切换
  • 提高吞吐量: 特别适用于小消息高频发送场景

2. 功能优势:

  • 完整功能: 支持sendmsg的所有功能
  • 分散缓冲区: 每个消息可使用多个缓冲区
  • 控制信息: 支持发送辅助控制数据

3. 应用场景:

  • 高并发服务器: Web服务器、游戏服务器
  • 实时系统: 音视频流传输、传感器数据采集
  • 消息队列: 批量消息处理系统
  • 网络协议: 实现自定义网络协议

总结

sendmmsg 是构建高性能网络应用的重要工具,它提供了:

  • 批量消息发送能力,显著减少系统调用开销
  • 与sendmsg相同的完整功能集
  • 更好的性能和可扩展性
  • 适用于高吞吐量的实时应用

通过合理使用 sendmmsg,可以大幅提升网络应用的性能,特别是在需要发送大量小消息的场景中效果显著。在实际应用中,需要注意批量大小的选择、错误处理和系统兼容性等问题。

此条目发表在linux文章分类目录。将固定链接加入收藏夹。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注