io_uring实现高效大文件多线程写入
多线程 + io_uring 实现高效大文件写入(64MB数据块,2GB文件分割)
以下是完整的代码实现,使用 两个线程:
- 生产者线程:生成 64MB 数据块,放入队列。
 - 消费者线程:从队列取出数据,通过 
io_uring异步写入文件,并在文件超过 2GB 时自动切分。 
1. 完整代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <liburing.h>
#include <sys/stat.h>
#include <stdatomic.h>
#define BLOCK_SIZE (64 * 1024 * 1024)  // 64MB 数据块
#define MAX_FILE_SIZE (2ULL * 1024 * 1024 * 1024)  // 2GB 文件分割阈值
#define QUEUE_SIZE 8  // 队列容量(防止内存爆炸)
// 数据块结构
typedef struct {
    char *data;
    size_t size;
} DataBlock;
// 线程安全队列
typedef struct {
    DataBlock blocks[QUEUE_SIZE];
    atomic_int head, tail;
    pthread_mutex_t mutex;
    pthread_cond_t not_empty, not_full;
} BlockQueue;
// 全局队列
BlockQueue block_queue;
atomic_int file_counter = 0;  // 文件计数器(用于切分)
atomic_ullong current_file_size = 0;  // 当前文件大小
// 初始化队列
void init_queue(BlockQueue *q) {
    q->head = q->tail = 0;
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}
// 生产者:生成随机数据并放入队列
void *producer_thread(void *arg) {
    while (1) {
        DataBlock block;
        block.data = malloc(BLOCK_SIZE);
        if (!block.data) {
            perror("malloc");
            exit(EXIT_FAILURE);
        }
        block.size = BLOCK_SIZE;
        // 填充随机数据
        for (size_t i = 0; i < BLOCK_SIZE; i++) {
            block.data[i] = rand() % 256;
        }
        // 放入队列
        pthread_mutex_lock(&block_queue.mutex);
        while ((block_queue.tail + 1) % QUEUE_SIZE == block_queue.head) {
            pthread_cond_wait(&block_queue.not_full, &block_queue.mutex);
        }
        block_queue.blocks[block_queue.tail] = block;
        block_queue.tail = (block_queue.tail + 1) % QUEUE_SIZE;
        pthread_cond_signal(&block_queue.not_empty);
        pthread_mutex_unlock(&block_queue.mutex);
    }
    return NULL;
}
// 消费者:从队列取出数据,用 io_uring 写入文件
void *consumer_thread(void *arg) {
    struct io_uring ring;
    int fd = -1;
    char filename[256];
    // 初始化 io_uring
    if (io_uring_queue_init(8, &ring, 0) < 0) {
        perror("io_uring_queue_init");
        exit(EXIT_FAILURE);
    }
    while (1) {
        DataBlock block;
        // 从队列取出数据
        pthread_mutex_lock(&block_queue.mutex);
        while (block_queue.head == block_queue.tail) {
            pthread_cond_wait(&block_queue.not_empty, &block_queue.mutex);
        }
        block = block_queue.blocks[block_queue.head];
        block_queue.head = (block_queue.head + 1) % QUEUE_SIZE;
        pthread_cond_signal(&block_queue.not_full);
        pthread_mutex_unlock(&block_queue.mutex);
        // 检查是否需要切分文件
        if (fd == -1 || current_file_size + block.size > MAX_FILE_SIZE) {
            if (fd != -1) close(fd);
            snprintf(filename, sizeof(filename), "large_file_%d.bin", file_counter++);
            fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
            if (fd < 0) {
                perror("open");
                exit(EXIT_FAILURE);
            }
            current_file_size = 0;
            printf("Created new file: %s\n", filename);
        }
        // 提交异步写入请求
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        io_uring_prep_write(sqe, fd, block.data, block.size, current_file_size);
        io_uring_sqe_set_data(sqe, block.data);  // 关联数据块(用于释放)
        io_uring_submit(&ring);
        // 等待写入完成
        struct io_uring_cqe *cqe;
        int ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            perror("io_uring_wait_cqe");
            exit(EXIT_FAILURE);
        }
        if (cqe->res < 0) {
            fprintf(stderr, "Write error: %s\n", strerror(-cqe->res));
            exit(EXIT_FAILURE);
        }
        // 更新文件大小并释放内存
        current_file_size += cqe->res;
        io_uring_cqe_seen(&ring, cqe);
        free(block.data);
    }
    io_uring_queue_exit(&ring);
    return NULL;
}
int main() {
    pthread_t producer, consumer;
    // 初始化队列
    init_queue(&block_queue);
    // 启动生产者线程
    if (pthread_create(&producer, NULL, producer_thread, NULL) != 0) {
        perror("pthread_create");
        exit(EXIT_FAILURE);
    }
    // 启动消费者线程
    if (pthread_create(&consumer, NULL, consumer_thread, NULL) != 0) {
        perror("pthread_create");
        exit(EXIT_FAILURE);
    }
    // 等待线程结束(实际上不会结束)
    pthread_join(producer, NULL);
    pthread_join(consumer, NULL);
    return 0;
}
2. 关键设计
(1)线程分工
| 线程 | 任务 | 
|---|---|
| 生产者线程 | 生成 64MB 随机数据,放入队列 | 
| 消费者线程 | 从队列取出数据,用 io_uring 异步写入文件 | 
(2)线程安全队列
- 环形缓冲区(
BlockQueue)避免频繁malloc/free。 - 互斥锁(
pthread_mutex_t) + 条件变量(pthread_cond_t) 保证线程安全:not_empty:队列非空时唤醒消费者。not_full:队列未满时唤醒生产者。
 
(3)文件切分逻辑
- 
current_file_size 记录当前文件大小。 - 超过 2GB 时:
- 关闭当前文件。
 - 创建新文件(
large_file_0.bin,large_file_1.bin, …)。 
 
(4)io_uring 优化
- 批量化提交:可调整 
QUEUE_SIZE提高并发。 - 内存对齐:建议 
posix_memalign分配内存(减少内核拷贝)。 - 轮询模式:可启用 
IORING_SETUP_SQPOLL减少系统调用(需root)。 
3. 编译与运行
gcc -o io_uring_multi_thread io_uring_multi_thread.c -luring -lpthread
./io_uring_multi_thread
输出示例:
Created new file: large_file_0.bin
Created new file: large_file_1.bin
...
4. 验证结果
ls -lh large_file_*.bin  # 检查文件大小
md5sum large_file_*.bin  # 验证数据完整性
5. 性能优化建议
| 优化点 | 说明 | 
|---|---|
| 内存池 | 预分配多个 64MB 块,减少 malloc/free 开销 | 
| 批量提交 | 一次提交多个 io_uring 请求(提高吞吐) | 
| O_DIRECT | 直接 I/O 绕过页缓存(需内存对齐) | 
| 多消费者线程 | 多个消费者线程并行处理队列 | 
6. 总结
- 生产者-消费者模型:解耦数据生成和写入,提高并行度。
 - 
io_uring异步 I/O:最大化磁盘写入性能。 - 自动文件切分:避免单个文件过大(2GB 限制)。
 
适用于 日志系统、数据库、大数据存储 等场景。 🚀
