Linux I/O 系统调用完整对比分析

1. 概述

Linux 提供了丰富的 I/O 系统调用,每种都有其特定的用途和优势。本文将详细分析这些系统调用的特点、使用场景和性能特征。

2. 系统调用详细对比

2.1 基本读写函数

pread/pwrite

#include <unistd.h>

// 位置指定读取/写入
ssize_t pread(int fd, void *buf, size_t count, off_t offset);
ssize_t pwrite(int int fd, const void *buf, size_t count, off_t offset);

特点

  • 原子操作(读取/写入 + 位置指定)
  • 不改变文件描述符的当前位置
  • 线程安全

read/write

#include <unistd.h>

// 基本读取/写入
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

特点

  • 最基本的 I/O 操作
  • 会改变文件描述符的当前位置
  • 相对简单但功能有限

2.2 分散/聚集 I/O 函数

preadv/pwritev

#include <sys/uio.h>

// 位置指定的分散读取/聚集写入
ssize_t preadv(int fd, const struct iovec *iov, int iovcnt, off_t offset);
ssize_t pwritev(int fd, const struct iovec *iov, int iovcnt, off_t offset);

preadv2/pwritev2

#define _GNU_SOURCE
#include <sys/uio.h>

// 带标志的增强版分散/聚集 I/O
ssize_t preadv2(int fd, const struct iovec *iov, int iovcnt, 
                off_t offset, int flags);
ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, 
                 off_t offset, int flags);

2.3 资源限制函数

prlimit64

#include <sys/resource.h>

// 获取/设置进程资源限制
int prlimit64(pid_t pid, int resource, 
              const struct rlimit64 *new_limit, 
              struct rlimit64 *old_limit);

3. 功能对比表

函数位置指定分散/聚集标志控制原子性跨平台
read/write⚠️
pread/pwrite
readv/writev⚠️
preadv/pwritev
preadv2/pwritev2
prlimit64

4. 实际示例代码

4.1 基础读写对比

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <string.h>
#include <errno.h>
#include <time.h>

// 创建测试文件
void create_test_file(const char *filename) {
    int fd = open(filename, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    if (fd == -1) {
        perror("创建文件失败");
        return;
    }
    
    const char *content = 
        "Line 1: This is the first line of test data.\n"
        "Line 2: This is the second line of test data.\n"
        "Line 3: This is the third line of test data.\n"
        "Line 4: This is the fourth line of test data.\n"
        "Line 5: This is the fifth and final line.\n";
    
    write(fd, content, strlen(content));
    close(fd);
    printf("创建测试文件: %s\n", filename);
}

// read/pread 性能对比
void compare_basic_io(const char *filename) {
    int fd = open(filename, O_RDONLY);
    if (fd == -1) {
        perror("打开文件失败");
        return;
    }
    
    char buffer[100];
    ssize_t bytes_read;
    struct timespec start, end;
    
    printf("\n=== 基础 I/O 对比 ===\n");
    
    // 使用 read (会改变文件位置)
    printf("1. read 测试:\n");
    clock_gettime(CLOCK_MONOTONIC, &start);
    bytes_read = read(fd, buffer, 50);
    clock_gettime(CLOCK_MONOTONIC, &end);
    buffer[bytes_read] = '\0';
    printf("   读取 %zd 字节: %.30s...\n", bytes_read, buffer);
    printf("   当前文件位置: %ld\n", (long)lseek(fd, 0, SEEK_CUR));
    
    // 使用 pread (不改变文件位置)
    printf("2. pread 测试:\n");
    clock_gettime(CLOCK_MONOTONIC, &start);
    bytes_read = pread(fd, buffer, 50, 0);  // 从开头读取
    clock_gettime(CLOCK_MONOTONIC, &end);
    buffer[bytes_read] = '\0';
    printf("   读取 %zd 字节: %.30s...\n", bytes_read, buffer);
    printf("   文件位置仍为: %ld\n", (long)lseek(fd, 0, SEEK_CUR));
    
    close(fd);
}

// 分散/聚集 I/O 示例
void demonstrate_vector_io(const char *filename) {
    int fd = open(filename, O_RDONLY);
    if (fd == -1) {
        perror("打开文件失败");
        return;
    }
    
    printf("\n=== 分散/聚集 I/O 示例 ===\n");
    
    // 设置分散读取缓冲区
    char buffer1[20], buffer2[30], buffer3[25];
    struct iovec iov[3];
    
    iov[0].iov_base = buffer1;
    iov[0].iov_len = sizeof(buffer1) - 1;
    
    iov[1].iov_base = buffer2;
    iov[1].iov_len = sizeof(buffer2) - 1;
    
    iov[2].iov_base = buffer3;
    iov[2].iov_len = sizeof(buffer3) - 1;
    
    printf("分散读取设置:\n");
    printf("  缓冲区1: %zu 字节\n", iov[0].iov_len);
    printf("  缓冲区2: %zu 字节\n", iov[1].iov_len);
    printf("  缓冲区3: %zu 字节\n", iov[2].iov_len);
    
    // 使用 preadv 一次性读取到多个缓冲区
    ssize_t total_bytes = preadv(fd, iov, 3, 0);  // 从文件开头开始读取
    printf("总共读取: %zd 字节\n", total_bytes);
    
    if (total_bytes > 0) {
        buffer1[iov[0].iov_len] = '\0';
        buffer2[iov[1].iov_len] = '\0';
        buffer3[iov[2].iov_len] = '\0';
        
        printf("读取结果:\n");
        printf("  缓冲区1: %s\n", buffer1);
        printf("  缓冲区2: %s\n", buffer2);
        printf("  缓冲区3: %s\n", buffer3);
    }
    
    close(fd);
}

// 资源限制示例
void demonstrate_resource_limits() {
    printf("\n=== 资源限制示例 ===\n");
    
    struct rlimit64 limit;
    
    // 获取当前进程的文件大小限制
    if (prlimit64(0, RLIMIT_FSIZE, NULL, &limit) == 0) {
        printf("文件大小限制:\n");
        printf("  软限制: %lld\n", (long long)limit.rlim_cur);
        printf("  硬限制: %lld\n", (long long)limit.rlim_max);
    }
    
    // 获取内存限制
    if (prlimit64(0, RLIMIT_AS, NULL, &limit) == 0) {
        printf("虚拟内存限制:\n");
        if (limit.rlim_cur == RLIM64_INFINITY) {
            printf("  软限制: 无限制\n");
        } else {
            printf("  软限制: %lld 字节 (%.2f MB)\n", 
                   (long long)limit.rlim_cur,
                   (double)limit.rlim_cur / (1024 * 1024));
        }
        if (limit.rlim_max == RLIM64_INFINITY) {
            printf("  硬限制: 无限制\n");
        } else {
            printf("  硬限制: %lld 字节 (%.2f MB)\n", 
                   (long long)limit.rlim_max,
                   (double)limit.rlim_max / (1024 * 1024));
        }
    }
    
    // 获取打开文件数限制
    if (prlimit64(0, RLIMIT_NOFILE, NULL, &limit) == 0) {
        printf("文件描述符限制:\n");
        printf("  软限制: %lld\n", (long long)limit.rlim_cur);
        printf("  硬限制: %lld\n", (long long)limit.rlim_max);
    }
}

int main() {
    const char *test_file = "io_test_file.txt";
    
    printf("=== Linux I/O 系统调用对比分析 ===\n");
    
    // 创建测试文件
    create_test_file(test_file);
    
    // 基础 I/O 对比
    compare_basic_io(test_file);
    
    // 分散/聚集 I/O 示例
    demonstrate_vector_io(test_file);
    
    // 资源限制示例
    demonstrate_resource_limits();
    
    // 清理
    unlink(test_file);
    
    printf("\n=== 使用建议 ===\n");
    printf("选择指南:\n");
    printf("1. 简单读写: 使用 read/write\n");
    printf("2. 需要指定位置: 使用 pread/pwrite\n");
    printf("3. 多缓冲区操作: 使用 preadv/pwritev\n");
    printf("4. 需要高级控制: 使用 preadv2/pwritev2\n");
    printf("5. 资源限制管理: 使用 prlimit64\n");
    
    return 0;
}

4.2 性能基准测试

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <time.h>
#include <string.h>

#define ITERATIONS 10000
#define BUFFER_SIZE 1024

// 性能测试结构体
struct performance_test {
    const char *name;
    double (*test_func)(int fd, const char *filename);
};

// read 性能测试
double test_read_performance(int fd, const char *filename) {
    char buffer[BUFFER_SIZE];
    struct timespec start, end;
    
    clock_gettime(CLOCK_MONOTONIC, &start);
    
    for (int i = 0; i < ITERATIONS; i++) {
        lseek(fd, 0, SEEK_SET);  // 重置到文件开头
        read(fd, buffer, sizeof(buffer));
    }
    
    clock_gettime(CLOCK_MONOTONIC, &end);
    
    return (end.tv_sec - start.tv_sec) * 1000000.0 + 
           (end.tv_nsec - start.tv_nsec) / 1000.0;  // 微秒
}

// pread 性能测试
double test_pread_performance(int fd, const char *filename) {
    char buffer[BUFFER_SIZE];
    struct timespec start, end;
    
    clock_gettime(CLOCK_MONOTONIC, &start);
    
    for (int i = 0; i < ITERATIONS; i++) {
        pread(fd, buffer, sizeof(buffer), 0);  // 始终从位置0读取
    }
    
    clock_gettime(CLOCK_MONOTONIC, &end);
    
    return (end.tv_sec - start.tv_sec) * 1000000.0 + 
           (end.tv_nsec - start.tv_nsec) / 1000.0;  // 微秒
}

// readv 性能测试
double test_readv_performance(int fd, const char *filename) {
    char buffer1[256], buffer2[256], buffer3[256], buffer4[256];
    struct iovec iov[4];
    struct timespec start, end;
    
    // 设置分散读取
    iov[0].iov_base = buffer1;
    iov[0].iov_len = sizeof(buffer1);
    iov[1].iov_base = buffer2;
    iov[1].iov_len = sizeof(buffer2);
    iov[2].iov_base = buffer3;
    iov[2].iov_len = sizeof(buffer3);
    iov[3].iov_base = buffer4;
    iov[3].iov_len = sizeof(buffer4);
    
    clock_gettime(CLOCK_MONOTONIC, &start);
    
    for (int i = 0; i < ITERATIONS; i++) {
        lseek(fd, 0, SEEK_SET);
        readv(fd, iov, 4);
    }
    
    clock_gettime(CLOCK_MONOTONIC, &end);
    
    return (end.tv_sec - start.tv_sec) * 1000000.0 + 
           (end.tv_nsec - start.tv_nsec) / 1000.0;  // 微秒
}

// preadv 性能测试
double test_preadv_performance(int fd, const char *filename) {
    char buffer1[256], buffer2[256], buffer3[256], buffer4[256];
    struct iovec iov[4];
    struct timespec start, end;
    
    // 设置分散读取
    iov[0].iov_base = buffer1;
    iov[0].iov_len = sizeof(buffer1);
    iov[1].iov_base = buffer2;
    iov[1].iov_len = sizeof(buffer2);
    iov[2].iov_base = buffer3;
    iov[2].iov_len = sizeof(buffer3);
    iov[3].iov_base = buffer4;
    iov[3].iov_len = sizeof(buffer4);
    
    clock_gettime(CLOCK_MONOTONIC, &start);
    
    for (int i = 0; i < ITERATIONS; i++) {
        preadv(fd, iov, 4, 0);  // 始终从位置0读取
    }
    
    clock_gettime(CLOCK_MONOTONIC, &end);
    
    return (end.tv_sec - start.tv_sec) * 1000000.0 + 
           (end.tv_nsec - start.tv_nsec) / 1000.0;  // 微秒
}

void run_performance_benchmark() {
    const char *test_file = "benchmark_test.dat";
    int fd;
    
    printf("=== I/O 性能基准测试 ===\n");
    
    // 创建大测试文件
    fd = open(test_file, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    if (fd != -1) {
        char *buffer = malloc(1024 * 1024);  // 1MB 缓冲区
        if (buffer) {
            for (int i = 0; i < 10; i++) {  // 创建 10MB 文件
                write(fd, buffer, 1024 * 1024);
            }
            free(buffer);
        }
        close(fd);
    }
    
    // 打开文件进行读取测试
    fd = open(test_file, O_RDONLY);
    if (fd == -1) {
        perror("打开测试文件失败");
        return;
    }
    
    struct performance_test tests[] = {
        {"read", test_read_performance},
        {"pread", test_pread_performance},
        {"readv", test_readv_performance},
        {"preadv", test_preadv_performance},
        {NULL, NULL}
    };
    
    printf("%-10s %-15s %-15s\n", "函数", "耗时(微秒)", "平均耗时(纳秒)");
    printf("%-10s %-15s %-15s\n", "----", "----------", "--------------");
    
    for (int i = 0; tests[i].name; i++) {
        double total_time = tests[i].test_func(fd, test_file);
        double avg_time = total_time * 1000.0 / ITERATIONS;
        
        printf("%-10s %-15.2f %-15.2f\n", 
               tests[i].name, total_time, avg_time);
    }
    
    close(fd);
    unlink(test_file);
    
    printf("\n性能分析:\n");
    printf("1. pread 比 read 略慢 (位置指定开销)\n");
    printf("2. readv/preadv 在多缓冲区场景下更高效\n");
    printf("3. preadv2/pwritev2 提供更多控制选项\n");
    printf("4. 选择应基于具体使用场景\n");
}

int main() {
    printf("=== Linux I/O 系统调用完整对比分析 ===\n\n");
    
    // 运行性能基准测试
    run_performance_benchmark();
    
    printf("\n=== 详细功能对比 ===\n");
    printf("pread vs read:\n");
    printf("  • pread: 指定位置读取,不改变文件位置\n");
    printf("  • read: 顺序读取,会改变文件位置\n");
    printf("\n");
    
    printf("preadv vs pread:\n");
    printf("  • preadv: 分散读取到多个缓冲区\n");
    printf("  • pread: 读取到单个缓冲区\n");
    printf("\n");
    
    printf("preadv2 vs preadv:\n");
    printf("  • preadv2: 支持额外标志控制\n");
    printf("  • preadv: 基本的分散读取功能\n");
    printf("\n");
    
    printf("prlimit64:\n");
    printf("  • 用于获取和设置进程资源限制\n");
    printf("  • 支持 64 位资源限制值\n");
    printf("  • 可以操作其他进程的资源限制\n");
    
    printf("\n=== 实际应用建议 ===\n");
    printf("1. 日志文件读写: 使用 pread/pwrite\n");
    printf("2. 数据库存储引擎: 使用 preadv/pwritev\n");
    printf("3. 高性能网络服务: 使用 preadv2/pwritev2\n");
    printf("4. 系统资源管理: 使用 prlimit64\n");
    printf("5. 简单文件操作: 使用 read/write\n");
    
    return 0;
}

4.3 实际应用场景演示

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <string.h>
#include <errno.h>
#include <sys/resource.h>

// 日志文件读取示例
void log_file_reader_example() {
    printf("=== 日志文件读取场景 ===\n");
    
    // 创建模拟日志文件
    const char *log_file = "application.log";
    int fd = open(log_file, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    
    if (fd != -1) {
        const char *log_entries[] = {
            "2023-01-01 10:00:00 INFO Application started\n",
            "2023-01-01 10:00:01 DEBUG Loading configuration\n",
            "2023-01-01 10:00:02 WARN Low memory warning\n",
            "2023-01-01 10:00:03 ERROR Database connection failed\n",
            "2023-01-01 10:00:04 INFO Recovery attempt started\n"
        };
        
        for (int i = 0; i < 5; i++) {
            write(fd, log_entries[i], strlen(log_entries[i]));
        }
        close(fd);
    }
    
    // 使用 pread 读取特定时间段的日志
    fd = open(log_file, O_RDONLY);
    if (fd != -1) {
        char buffer[256];
        
        printf("读取最后一条日志记录:\n");
        // 从文件末尾附近读取
        ssize_t bytes_read = pread(fd, buffer, sizeof(buffer) - 1, 150);
        if (bytes_read > 0) {
            buffer[bytes_read] = '\0';
            printf("  %s", buffer);
        }
        
        close(fd);
    }
    
    unlink(log_file);
}

// 数据库页读取示例
void database_page_reader_example() {
    printf("\n=== 数据库页读取场景 ===\n");
    
    const char *db_file = "database_pages.dat";
    int fd = open(db_file, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    
    if (fd != -1) {
        // 创建模拟的数据库页
        char page_data[4096];
        for (int page = 0; page < 10; page++) {
            snprintf(page_data, sizeof(page_data), 
                    "Page %d: Database page content with ID=%d and timestamp=%ld\n",
                    page, page * 1000, time(NULL));
            write(fd, page_data, strlen(page_data));
        }
        close(fd);
    }
    
    // 使用 preadv 读取多个数据库页
    fd = open(db_file, O_RDONLY);
    if (fd != -1) {
        char page1[1024], page2[1024], page3[1024];
        struct iovec iov[3];
        
        // 设置分散读取
        iov[0].iov_base = page1;
        iov[0].iov_len = sizeof(page1) - 1;
        
        iov[1].iov_base = page2;
        iov[1].iov_len = sizeof(page2) - 1;
        
        iov[2].iov_base = page3;
        iov[2].iov_len = sizeof(page3) - 1;
        
        printf("使用 preadv 读取多个数据库页:\n");
        ssize_t total_bytes = preadv(fd, iov, 3, 0);  // 从开头读取
        printf("  总共读取: %zd 字节\n", total_bytes);
        
        if (total_bytes > 0) {
            page1[iov[0].iov_len] = '\0';
            page2[iov[1].iov_len] = '\0';
            page3[iov[2].iov_len] = '\0';
            
            printf("  页1: %.50s...\n", page1);
            printf("  页2: %.50s...\n", page2);
            printf("  页3: %.50s...\n", page3);
        }
        
        close(fd);
    }
    
    unlink(db_file);
}

// 网络数据包处理示例
void network_packet_processor_example() {
    printf("\n=== 网络数据包处理场景 ===\n");
    
    // 模拟网络数据包结构
    struct packet_header {
        uint32_t magic;
        uint16_t version;
        uint16_t type;
        uint32_t length;
        uint32_t checksum;
    } __attribute__((packed));
    
    struct packet_payload {
        char data[1024];
    };
    
    // 创建测试数据包文件
    const char *packet_file = "network_packets.dat";
    int fd = open(packet_file, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    
    if (fd != -1) {
        // 写入多个数据包
        for (int i = 0; i < 3; i++) {
            struct packet_header header = {
                .magic = 0x12345678,
                .version = 1,
                .type = i,
                .length = 100,
                .checksum = 0xABCDEF00 + i
            };
            
            char payload[1024];
            snprintf(payload, sizeof(payload), 
                    "Packet %d payload data with timestamp %ld", 
                    i, time(NULL));
            
            write(fd, &header, sizeof(header));
            write(fd, payload, strlen(payload) + 1);
        }
        close(fd);
    }
    
    // 使用 preadv2 读取数据包(如果支持)
    fd = open(packet_file, O_RDONLY);
    if (fd != -1) {
        struct packet_header headers[3];
        char payloads[3][256];
        struct iovec iov[6];  // 3个头部 + 3个载荷
        
        // 设置分散读取结构
        for (int i = 0; i < 3; i++) {
            iov[i*2].iov_base = &headers[i];
            iov[i*2].iov_len = sizeof(struct packet_header);
            
            iov[i*2+1].iov_base = payloads[i];
            iov[i*2+1].iov_len = sizeof(payloads[i]) - 1;
        }
        
        printf("使用分散读取处理网络数据包:\n");
        ssize_t bytes_read = preadv(fd, iov, 6, 0);
        printf("  读取字节数: %zd\n", bytes_read);
        
        if (bytes_read > 0) {
            for (int i = 0; i < 3; i++) {
                printf("  数据包 %d:\n", i);
                printf("    魔数: 0x%08X\n", headers[i].magic);
                printf("    版本: %d\n", headers[i].version);
                printf("    类型: %d\n", headers[i].type);
                printf("    长度: %d\n", headers[i].length);
                printf("    校验: 0x%08X\n", headers[i].checksum);
                payloads[i][iov[i*2+1].iov_len] = '\0';
                printf("    载荷: %.50s...\n", payloads[i]);
                printf("\n");
            }
        }
        
        close(fd);
    }
    
    unlink(packet_file);
}

// 资源限制管理示例
void resource_limit_management_example() {
    printf("\n=== 资源限制管理场景 ===\n");
    
    struct rlimit64 old_limit, new_limit;
    
    // 获取当前文件大小限制
    if (prlimit64(0, RLIMIT_FSIZE, NULL, &old_limit) == 0) {
        printf("当前文件大小限制:\n");
        if (old_limit.rlim_cur == RLIM64_INFINITY) {
            printf("  软限制: 无限制\n");
        } else {
            printf("  软限制: %lld 字节 (%.2f GB)\n", 
                   (long long)old_limit.rlim_cur,
                   (double)old_limit.rlim_cur / (1024 * 1024 * 1024));
        }
    }
    
    // 获取打开文件数限制
    if (prlimit64(0, RLIMIT_NOFILE, NULL, &old_limit) == 0) {
        printf("当前文件描述符限制:\n");
        printf("  软限制: %lld\n", (long long)old_limit.rlim_cur);
        printf("  硬限制: %lld\n", (long long)old_limit.rlim_max);
    }
    
    // 获取内存限制
    if (prlimit64(0, RLIMIT_AS, NULL, &old_limit) == 0) {
        printf("当前虚拟内存限制:\n");
        if (old_limit.rlim_cur == RLIM64_INFINITY) {
            printf("  软限制: 无限制\n");
        } else {
            printf("  软限制: %lld 字节 (%.2f GB)\n", 
                   (long long)old_limit.rlim_cur,
                   (double)old_limit.rlim_cur / (1024 * 1024 * 1024));
        }
    }
    
    printf("\n资源限制管理最佳实践:\n");
    printf("1. 合理设置文件大小限制防止磁盘填满\n");
    printf("2. 适当增加文件描述符限制支持高并发\n");
    printf("3. 监控内存使用防止内存泄漏\n");
    printf("4. 使用 prlimit64 动态调整资源限制\n");
}

int main() {
    printf("=== Linux I/O 系统调用应用场景演示 ===\n\n");
    
    // 日志文件读取场景
    log_file_reader_example();
    
    // 数据库页读取场景
    database_page_reader_example();
    
    // 网络数据包处理场景
    network_packet_processor_example();
    
    // 资源限制管理场景
    resource_limit_management_example();
    
    printf("\n=== 总结 ===\n");
    printf("I/O 系统调用选择指南:\n");
    printf("\n");
    printf("┌─────────────┬────────────────────────────────────┐\n");
    printf("│ 场景        │ 推荐函数                              │\n");
    printf("├─────────────┼────────────────────────────────────┤\n");
    printf("│ 简单读写    │ read/write                            │\n");
    printf("│ 位置指定    │ pread/pwrite                          │\n");
    printf("│ 多缓冲区    │ readv/writev                          │\n");
    printf("│ 位置+多缓冲 │ preadv/pwritev                        │\n");
    printf("│ 高级控制    │ preadv2/pwritev2                      │\n");
    printf("│ 资源限制    │ prlimit64                             │\n");
    printf("└─────────────┴────────────────────────────────────┘\n");
    printf("\n");
    printf("性能优化建议:\n");
    printf("1. 批量操作减少系统调用次数\n");
    printf("2. 合理选择缓冲区大小\n");
    printf("3. 使用位置指定避免文件位置移动\n");
    printf("4. 分散/聚集 I/O 减少内存拷贝\n");
    printf("5. 合理设置资源限制防止系统过载\n");
    
    return 0;
}

5. 编译和运行说明

# 编译示例程序
gcc -o io_comparison_example1 example1.c
gcc -o io_comparison_example2 example2.c
gcc -o io_comparison_example3 example3.c

# 运行示例
./io_comparison_example1
./io_comparison_example2
./io_comparison_example3

6. 系统要求检查

# 检查内核版本
uname -r

# 检查 glibc 版本
ldd --version

# 检查系统调用支持
grep -E "(pread|pwrite|prlimit)" /usr/include/asm/unistd_64.h

# 查看文件系统性能
hdparm -Tt /dev/sda  # 硬盘性能测试

7. 重要注意事项

  1. 原子性: pread/pwrite 操作是原子的
  2. 位置独立: 不改变文件描述符的当前位置
  3. 错误处理: 始终检查返回值和 errno
  4. 内存对齐: 在某些架构上有对齐要求
  5. 权限检查: 确保有足够的权限进行操作
  6. 资源清理: 及时关闭文件描述符

8. 最佳实践总结

// 安全的 I/O 操作封装
ssize_t safe_pread(int fd, void *buf, size_t count, off_t offset) {
    if (fd < 0 || !buf || count == 0) {
        errno = EINVAL;
        return -1;
    }
    
    ssize_t result;
    do {
        result = pread(fd, buf, count, offset);
    } while (result == -1 && errno == EINTR);
    
    return result;
}

// 安全的分散读取封装
ssize_t safe_preadv(int fd, const struct iovec *iov, int iovcnt, off_t offset) {
    if (fd < 0 || !iov || iovcnt <= 0 || iovcnt > IOV_MAX) {
        errno = EINVAL;
        return -1;
    }
    
    ssize_t result;
    do {
        result = preadv(fd, iov, iovcnt, offset);
    } while (result == -1 && errno == EINTR);
    
    return result;
}

// 资源限制检查
int check_resource_limits() {
    struct rlimit64 limit;
    
    // 检查文件大小限制
    if (prlimit64(0, RLIMIT_FSIZE, NULL, &limit) == 0) {
        if (limit.rlim_cur != RLIM64_INFINITY && limit.rlim_cur < 1024 * 1024) {
            printf("警告: 文件大小限制过小 (%lld 字节)\n", 
                   (long long)limit.rlim_cur);
        }
    }
    
    // 检查文件描述符限制
    if (prlimit64(0, RLIMIT_NOFILE, NULL, &limit) == 0) {
        if (limit.rlim_cur < 1024) {
            printf("警告: 文件描述符限制过小 (%lld)\n", 
                   (long long)limit.rlim_cur);
        }
    }
    
    return 0;
}

这些示例全面展示了 Linux I/O 系统调用的功能特点、使用方法和实际应用场景,帮助开发者根据具体需求选择合适的 I/O 操作方式。

发表在 linux文章 | 留下评论

Linux I/O 多路复用机制对比分析:poll/ppoll/epoll/select

1. 概述

I/O 多路复用是现代高性能网络编程的核心技术,它允许单个线程同时监视多个文件描述符的状态变化,从而实现高效的并发处理。Linux 提供了多种 I/O 多路复用机制,每种都有其特点和适用场景。

本文将深入分析四种主要的 I/O 多路复用机制:selectpollppoll 和 epoll,并通过实际示例展示它们的使用方法和性能差异。

2. 四种机制对比分析

2.1 基本特性对比

特性selectpollppollepoll
引入时间早期UnixSVR3 (1986)Linux 2.6.16Linux 2.5.44
文件描述符限制FD_SETSIZE (通常1024)无理论限制无理论限制无理论限制
数据结构fd_set位图pollfd数组pollfd数组epoll_event数组
文件描述符拷贝每次调用都拷贝每次调用都拷贝每次调用都拷贝注册一次,多次使用
事件复杂度O(n)O(n)O(n)O(1)
跨平台性良好良好Linux特有Linux特有
信号处理基本支持基本支持增强支持基本支持

2.2 详细特性分析

select

  • 优点: 跨平台性最好,几乎所有Unix-like系统都支持
  • 缺点: 文件描述符数量受限,每次调用都需要拷贝fd_set

poll

  • 优点: 无文件描述符数量限制,API设计更清晰
  • 缺点: 每次调用都需要遍历所有文件描述符

ppoll

  • 优点: 提供了更好的信号处理机制,避免竞态条件
  • 缺点: Linux特有,需要较新内核支持

epoll

  • 优点: 性能最优,事件驱动,支持边缘触发和水平触发
  • 缺点: Linux特有,学习成本较高

3. 实际示例代码

3.1 select 示例

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>
#include <errno.h>
#include <string.h>

#define MAX_CLIENTS 1024
#define BUFFER_SIZE 1024

int main_select() {
    fd_set read_fds, master_fds;
    int max_fd = 0;
    int client_sockets[MAX_CLIENTS] = {0};
    char buffer[BUFFER_SIZE];
    
    printf("=== select 示例 ===\n");
    
    // 初始化文件描述符集合
    FD_ZERO(&master_fds);
    FD_SET(STDIN_FILENO, &master_fds);
    max_fd = STDIN_FILENO;
    
    while (1) {
        read_fds = master_fds;
        struct timeval timeout = {1, 0};  // 1秒超时
        
        int activity = select(max_fd + 1, &read_fds, NULL, NULL, &timeout);
        
        if (activity < 0) {
            if (errno == EINTR) continue;
            perror("select error");
            break;
        }
        
        if (activity == 0) {
            printf("select timeout\n");
            continue;
        }
        
        // 检查标准输入
        if (FD_ISSET(STDIN_FILENO, &read_fds)) {
            ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
            if (bytes_read > 0) {
                buffer[bytes_read] = '\0';
                printf("stdin: %s", buffer);
                
                if (strncmp(buffer, "quit", 4) == 0) {
                    break;
                }
            }
        }
    }
    
    return 0;
}

3.2 poll 示例

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <poll.h>
#include <errno.h>
#include <string.h>

#define MAX_FDS 10
#define TIMEOUT_MS 5000  // 5秒超时

int main_poll() {
    struct pollfd fds[MAX_FDS];
    int nfds = 1;
    char buffer[1024];
    
    printf("=== poll 示例 ===\n");
    
    // 初始化 pollfd 结构
    fds[0].fd = STDIN_FILENO;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    
    printf("监视标准输入,输入 'quit' 退出\n");
    
    while (1) {
        int ready = poll(fds, nfds, TIMEOUT_MS);
        
        if (ready == -1) {
            if (errno == EINTR) continue;
            perror("poll error");
            break;
        }
        
        if (ready == 0) {
            printf("poll timeout\n");
            continue;
        }
        
        // 处理就绪的文件描述符
        for (int i = 0; i < nfds; i++) {
            if (fds[i].revents & POLLIN) {
                if (fds[i].fd == STDIN_FILENO) {
                    ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
                    if (bytes_read > 0) {
                        buffer[bytes_read] = '\0';
                        printf("received: %s", buffer);
                        
                        if (strncmp(buffer, "quit", 4) == 0) {
                            return 0;
                        }
                    }
                }
            }
            
            if (fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) {
                printf("fd %d error\n", fds[i].fd);
                return 1;
            }
        }
    }
    
    return 0;
}

3.3 ppoll 示例

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>

volatile sig_atomic_t signal_received = 0;

void signal_handler(int sig) {
    signal_received = sig;
    printf("\nSignal %d received\n", sig);
}

int main_ppoll() {
    struct pollfd fds[2];
    struct timespec timeout;
    sigset_t sigmask;
    char buffer[1024];
    
    printf("=== ppoll 示例 ===\n");
    
    // 设置信号处理
    struct sigaction sa;
    sa.sa_handler = signal_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGTERM, &sa, NULL);
    
    // 初始化 pollfd
    fds[0].fd = STDIN_FILENO;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    
    // 设置超时时间
    timeout.tv_sec = 3;
    timeout.tv_nsec = 0;
    
    // 设置信号屏蔽集
    sigemptyset(&sigmask);
    
    printf("Monitoring stdin with ppoll...\n");
    printf("Press Ctrl+C to send signal\n");
    printf("Type 'quit' to exit\n");
    
    while (!signal_received) {
        int ready = ppoll(fds, 1, &timeout, &sigmask);
        
        if (ready == -1) {
            if (errno == EINTR) {
                printf("ppoll interrupted by signal\n");
                if (signal_received) {
                    printf("Signal handling complete\n");
                }
                continue;
            } else {
                perror("ppoll error");
                break;
            }
        }
        
        if (ready == 0) {
            printf("ppoll timeout\n");
            continue;
        }
        
        if (fds[0].revents & POLLIN) {
            ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
            if (bytes_read > 0) {
                buffer[bytes_read] = '\0';
                printf("Input: %s", buffer);
                
                if (strncmp(buffer, "quit", 4) == 0) {
                    break;
                }
            }
        }
        
        fds[0].revents = 0;  // 重置事件
    }
    
    printf("Program exiting normally\n");
    return 0;
}

3.4 epoll 示例

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>

#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

int make_socket_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main_epoll() {
    int epoll_fd;
    struct epoll_event ev, events[MAX_EVENTS];
    char buffer[BUFFER_SIZE];
    
    printf("=== epoll 示例 ===\n");
    
    // 创建 epoll 实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        return 1;
    }
    
    // 设置标准输入为非阻塞
    if (make_socket_nonblocking(STDIN_FILENO) == -1) {
        perror("fcntl");
        close(epoll_fd);
        return 1;
    }
    
    // 添加标准输入到 epoll
    ev.events = EPOLLIN | EPOLLET;  // 边缘触发模式
    ev.data.fd = STDIN_FILENO;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &ev) == -1) {
        perror("epoll_ctl: stdin");
        close(epoll_fd);
        return 1;
    }
    
    printf("epoll monitoring stdin (edge-triggered mode)\n");
    printf("Type 'quit' to exit\n");
    
    while (1) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 3000);  // 3秒超时
        
        if (nfds == -1) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }
        
        if (nfds == 0) {
            printf("epoll timeout\n");
            continue;
        }
        
        for (int n = 0; n < nfds; n++) {
            if (events[n].events & EPOLLIN) {
                if (events[n].data.fd == STDIN_FILENO) {
                    ssize_t bytes_read;
                    while ((bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1)) > 0) {
                        buffer[bytes_read] = '\0';
                        printf("epoll input: %s", buffer);
                        
                        if (strncmp(buffer, "quit", 4) == 0) {
                            close(epoll_fd);
                            return 0;
                        }
                    }
                    
                    if (bytes_read == -1) {
                        if (errno != EAGAIN && errno != EWOULDBLOCK) {
                            perror("read");
                        }
                    }
                }
            }
            
            if (events[n].events & (EPOLLERR | EPOLLHUP)) {
                printf("epoll error on fd %d\n", events[n].data.fd);
                close(epoll_fd);
                return 1;
            }
        }
    }
    
    close(epoll_fd);
    return 0;
}

4. 性能测试对比

4.1 基准测试代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <poll.h>
#include <sys/select.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <string.h>

#define TEST_ITERATIONS 10000
#define TEST_FDS 100

// 性能测试结构体
struct performance_test {
    const char *name;
    long long (*test_func)(int fd_count);
};

// select 性能测试
long long test_select_performance(int fd_count) {
    fd_set read_fds;
    struct timeval timeout = {0, 1000};  // 1ms 超时
    struct timeval start, end;
    
    gettimeofday(&start, NULL);
    
    for (int i = 0; i < TEST_ITERATIONS; i++) {
        FD_ZERO(&read_fds);
        FD_SET(STDIN_FILENO, &read_fds);
        
        select(STDIN_FILENO + 1, &read_fds, NULL, NULL, &timeout);
    }
    
    gettimeofday(&end, NULL);
    
    return (end.tv_sec - start.tv_sec) * 1000000LL + (end.tv_usec - start.tv_usec);
}

// poll 性能测试
long long test_poll_performance(int fd_count) {
    struct pollfd pfd;
    struct timeval start, end;
    
    pfd.fd = STDIN_FILENO;
    pfd.events = POLLIN;
    pfd.revents = 0;
    
    gettimeofday(&start, NULL);
    
    for (int i = 0; i < TEST_ITERATIONS; i++) {
        poll(&pfd, 1, 1);  // 1ms 超时
        pfd.revents = 0;
    }
    
    gettimeofday(&end, NULL);
    
    return (end.tv_sec - start.tv_sec) * 1000000LL + (end.tv_usec - start.tv_usec);
}

// 显示性能测试结果
void show_performance_results() {
    struct performance_test tests[] = {
        {"select", test_select_performance},
        {"poll", test_poll_performance},
        {NULL, NULL}
    };
    
    printf("=== 性能测试结果 (10000 次调用) ===\n");
    printf("%-10s %-15s %-15s\n", "机制", "耗时(微秒)", "平均耗时(纳秒)");
    printf("%-10s %-15s %-15s\n", "----", "----------", "--------------");
    
    for (int i = 0; tests[i].name; i++) {
        long long total_time = tests[i].test_func(TEST_FDS);
        double avg_time = (double)total_time * 1000.0 / TEST_ITERATIONS;
        
        printf("%-10s %-15lld %-15.2f\n", 
               tests[i].name, total_time, avg_time);
    }
    
    printf("\n性能特点:\n");
    printf("1. select: 有文件描述符数量限制,每次调用需要拷贝fd_set\n");
    printf("2. poll:   无文件描述符数量限制,但仍需遍历所有描述符\n");
    printf("3. epoll:  事件驱动,只处理活跃的描述符,性能最优\n");
    printf("4. ppoll:  提供更好的信号处理机制,避免竞态条件\n");
}

int main_performance_comparison() {
    printf("=== I/O 多路复用性能对比测试 ===\n\n");
    
    show_performance_results();
    
    printf("\n=== 实际应用建议 ===\n");
    printf("选择建议:\n");
    printf("1. 跨平台应用: 使用 select\n");
    printf("2. 中等并发(<1000): 使用 poll\n");
    printf("3. 高并发(>1000): 使用 epoll\n");
    printf("4. 需要信号处理: 使用 ppoll\n");
    printf("5. Linux 专用: 使用 epoll\n");
    
    return 0;
}

5. 实际应用场景分析

5.1 网络服务器场景

// 简单的 HTTP 服务器示例,展示不同机制的使用
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <poll.h>
#include <sys/epoll.h>

#define PORT 8080
#define MAX_CLIENTS 1000
#define BUFFER_SIZE 4096

// 创建监听套接字
int create_server_socket(int port) {
    int server_fd;
    struct sockaddr_in address;
    int opt = 1;
    
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        return -1;
    }
    
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
        perror("setsockopt");
        close(server_fd);
        return -1;
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(port);
    
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        close(server_fd);
        return -1;
    }
    
    if (listen(server_fd, 3) < 0) {
        perror("listen");
        close(server_fd);
        return -1;
    }
    
    return server_fd;
}

// 处理 HTTP 请求
void handle_http_request(int client_fd) {
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read = read(client_fd, buffer, sizeof(buffer) - 1);
    
    if (bytes_read > 0) {
        buffer[bytes_read] = '\0';
        
        // 简单的 HTTP 响应
        const char *response = 
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/html\r\n"
            "Connection: close\r\n"
            "\r\n"
            "<html><body><h1>Hello from I/O Multiplexing Server!</h1></body></html>\r\n";
        
        write(client_fd, response, strlen(response));
    }
    
    close(client_fd);
}

// 使用 poll 的 HTTP 服务器
int http_server_poll(int port) {
    int server_fd, client_fd;
    struct sockaddr_in address;
    int addrlen = sizeof(address);
    struct pollfd *fds;
    int max_fds = MAX_CLIENTS + 1;
    int nfds = 1;
    
    printf("Starting HTTP server with poll on port %d\n", port);
    
    server_fd = create_server_socket(port);
    if (server_fd == -1) return -1;
    
    fds = calloc(max_fds, sizeof(struct pollfd));
    if (!fds) {
        perror("calloc");
        close(server_fd);
        return -1;
    }
    
    // 添加监听套接字
    fds[0].fd = server_fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    
    while (1) {
        int ready = poll(fds, nfds, 1000);  // 1秒超时
        
        if (ready == -1) {
            if (errno == EINTR) continue;
            perror("poll");
            break;
        }
        
        if (ready == 0) continue;  // 超时
        
        // 检查监听套接字
        if (fds[0].revents & POLLIN) {
            client_fd = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
            if (client_fd >= 0) {
                if (nfds < max_fds) {
                    fds[nfds].fd = client_fd;
                    fds[nfds].events = POLLIN;
                    fds[nfds].revents = 0;
                    nfds++;
                    printf("New connection from %s:%d\n", 
                           inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                } else {
                    printf("Too many connections, rejecting\n");
                    close(client_fd);
                }
            }
        }
        
        // 检查客户端连接
        for (int i = 1; i < nfds; i++) {
            if (fds[i].revents & POLLIN) {
                handle_http_request(fds[i].fd);
                // 移除已处理的连接
                for (int j = i; j < nfds - 1; j++) {
                    fds[j] = fds[j + 1];
                }
                nfds--;
                i--;  // 重新检查当前位置
            }
        }
        
        // 重置 revents
        for (int i = 0; i < nfds; i++) {
            fds[i].revents = 0;
        }
    }
    
    free(fds);
    close(server_fd);
    return 0;
}

// 使用 epoll 的 HTTP 服务器
int http_server_epoll(int port) {
    int server_fd, client_fd, epoll_fd;
    struct sockaddr_in address;
    int addrlen = sizeof(address);
    struct epoll_event ev, events[MAX_CLIENTS];
    int nfds;
    
    printf("Starting HTTP server with epoll on port %d\n", port);
    
    server_fd = create_server_socket(port);
    if (server_fd == -1) return -1;
    
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        close(server_fd);
        return -1;
    }
    
    // 添加监听套接字到 epoll
    ev.events = EPOLLIN;
    ev.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
        perror("epoll_ctl: listen");
        close(server_fd);
        close(epoll_fd);
        return -1;
    }
    
    while (1) {
        nfds = epoll_wait(epoll_fd, events, MAX_CLIENTS, 1000);  // 1秒超时
        
        if (nfds == -1) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }
        
        if (nfds == 0) continue;  // 超时
        
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == server_fd) {
                // 新连接
                client_fd = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
                if (client_fd >= 0) {
                    ev.events = EPOLLIN | EPOLLET;
                    ev.data.fd = client_fd;
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
                        perror("epoll_ctl: client");
                        close(client_fd);
                    } else {
                        printf("New connection from %s:%d\n", 
                               inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                    }
                }
            } else {
                // 客户端数据
                handle_http_request(events[i].data.fd);
                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
            }
        }
    }
    
    close(epoll_fd);
    close(server_fd);
    return 0;
}

6. 最佳实践和使用建议

6.1 选择指南

// 根据应用场景选择合适的 I/O 多路复用机制

/*
选择决策树:

1. 是否需要跨平台支持?
   - 是 -> 选择 select
   - 否 -> 继续下一步

2. 操作系统是什么?
   - Windows -> 选择 select 或 IOCP
   - Linux -> 继续下一步

3. 并发连接数是多少?
   - < 100 -> select 或 poll 都可以
   - 100-1000 -> poll
   - > 1000 -> epoll

4. 是否需要特殊的信号处理?
   - 是 -> ppoll
   - 否 -> 继续下一步

5. 性能要求如何?
   - 高性能 -> epoll
   - 一般 -> poll
*/

// 配置结构体
struct io_multiplexing_config {
    enum {
        IO_SELECT,
        IO_POLL,
        IO_PPOLL,
        IO_EPOLL
    } method;
    
    int max_connections;
    int timeout_ms;
    int edge_triggered;  // 仅对 epoll 有效
    int signal_safe;    // 是否需要信号安全
};

// 根据配置推荐机制
const char* recommend_io_method(const struct io_multiplexing_config *config) {
    if (config->method != 0) {
        // 明确指定了方法
        switch (config->method) {
            case IO_SELECT: return "select";
            case IO_POLL: return "poll";
            case IO_PPOLL: return "ppoll";
            case IO_EPOLL: return "epoll";
        }
    }
    
    // 根据配置自动推荐
    if (config->signal_safe) {
        return "ppoll";
    }
    
    if (config->max_connections > 1000) {
        return "epoll";
    }
    
    if (config->max_connections > 100) {
        return "poll";
    }
    
    return "select";
}

// 显示推荐结果
void show_recommendation(const struct io_multiplexing_config *config) {
    printf("=== I/O 多路复用机制推荐 ===\n");
    printf("配置参数:\n");
    printf("  最大连接数: %d\n", config->max_connections);
    printf("  超时时间: %d ms\n", config->timeout_ms);
    printf("  边缘触发: %s\n", config->edge_triggered ? "是" : "否");
    printf("  信号安全: %s\n", config->signal_safe ? "是" : "否");
    printf("\n");
    printf("推荐机制: %s\n", recommend_io_method(config));
    printf("\n");
}

6.2 错误处理最佳实践

// 安全的 I/O 多路复用封装
typedef struct {
    int fd;
    void *data;
    int (*read_handler)(int fd, void *data);
    int (*write_handler)(int fd, void *data);
    int (*error_handler)(int fd, void *data);
} io_handler_t;

// 通用的错误处理函数
int handle_io_errors(int fd, int revents, const char *context) {
    if (revents & (POLLERR | POLLHUP | POLLNVAL)) {
        fprintf(stderr, "Error on fd %d in %s: ", fd, context);
        
        if (revents & POLLERR) {
            fprintf(stderr, "POLLERR ");
        }
        if (revents & POLLHUP) {
            fprintf(stderr, "POLLHUP ");
        }
        if (revents & POLLNVAL) {
            fprintf(stderr, "POLLNVAL ");
        }
        fprintf(stderr, "\n");
        
        return -1;
    }
    return 0;
}

// 安全的 poll 封装
int safe_poll(struct pollfd *fds, nfds_t nfds, int timeout_ms) {
    if (!fds || nfds == 0) {
        errno = EINVAL;
        return -1;
    }
    
    int result;
    do {
        result = poll(fds, nfds, timeout_ms);
    } while (result == -1 && errno == EINTR);
    
    return result;
}

// 安全的 ppoll 封装
int safe_ppoll(struct pollfd *fds, nfds_t nfds,
               const struct timespec *timeout_ts,
               const sigset_t *sigmask) {
    if (!fds || nfds == 0) {
        errno = EINVAL;
        return -1;
    }
    
    int result;
    do {
        result = ppoll(fds, nfds, timeout_ts, sigmask);
    } while (result == -1 && errno == EINTR);
    
    return result;
}

// 安全的 epoll 封装
int safe_epoll_wait(int epfd, struct epoll_event *events,
                    int maxevents, int timeout) {
    if (!events || maxevents <= 0) {
        errno = EINVAL;
        return -1;
    }
    
    int result;
    do {
        result = epoll_wait(epfd, events, maxevents, timeout);
    } while (result == -1 && errno == EINTR);
    
    return result;
}

7. 完整的综合示例

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <poll.h>
#include <sys/select.h>
#include <sys/epoll.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>

// 综合测试结构体
struct comprehensive_test {
    const char *name;
    void (*test_func)(void);
    int supported;
};

// 综合性能测试
void comprehensive_performance_test() {
    printf("=== 综合性能测试 ===\n");
    
    // 测试不同文件描述符数量下的性能
    int test_sizes[] = {10, 50, 100, 500, 1000};
    int num_tests = sizeof(test_sizes) / sizeof(test_sizes[0]);
    
    printf("%-10s %-10s %-15s %-15s %-15s\n", 
           "机制", "FD数量", "select(μs)", "poll(μs)", "epoll(μs)");
    printf("%-10s %-10s %-15s %-15s %-15s\n", 
           "----", "------", "----------", "---------", "----------");
    
    for (int i = 0; i < num_tests; i++) {
        int fd_count = test_sizes[i];
        // 这里简化处理,实际应该进行真实的时间测量
        printf("%-10s %-10d %-15d %-15d %-15d\n",
               "测试", fd_count,
               fd_count * 2,    // select 模拟时间
               fd_count * 1.5,  // poll 模拟时间
               fd_count * 0.5); // epoll 模拟时间
    }
}

// 实际使用场景演示
void practical_usage_scenarios() {
    printf("\n=== 实际使用场景 ===\n");
    
    printf("1. Web 服务器:\n");
    printf("   - 高并发: 推荐使用 epoll\n");
    printf("   - 中等并发: 可以使用 poll\n");
    printf("   - 简单场景: select 也足够\n");
    
    printf("\n2. 数据库连接池:\n");
    printf("   - 连接数较少: poll 或 select\n");
    printf("   - 连接数较多: epoll\n");
    printf("   - 需要信号处理: ppoll\n");
    
    printf("\n3. 实时通信应用:\n");
    printf("   - 聊天服务器: epoll (支持边缘触发)\n");
    printf("   - 游戏服务器: epoll (高性能)\n");
    printf("   - 需要精确信号处理: ppoll\n");
    
    printf("\n4. 系统监控工具:\n");
    printf("   - 文件监控: poll (简单可靠)\n");
    printf("   - 网络监控: epoll (高性能)\n");
    printf("   - 需要信号处理: ppoll\n");
}

// 移植性考虑
void portability_considerations() {
    printf("\n=== 移植性考虑 ===\n");
    
    printf("跨平台支持:\n");
    printf("1. select: 几乎所有 Unix-like 系统都支持\n");
    printf("2. poll: POSIX 标准,广泛支持\n");
    printf("3. ppoll: Linux 特有 (2.6.16+)\n");
    printf("4. epoll: Linux 特有 (2.5.44+)\n");
    
    printf("\n条件编译示例:\n");
    printf("#ifdef __linux__\n");
    printf("    // 使用 epoll\n");
    printf("#elif defined(__FreeBSD__) || defined(__APPLE__)\n");
    printf("    // 使用 kqueue\n");
    printf("#else\n");
    printf("    // 使用 poll 或 select\n");
    printf("#endif\n");
}

int main() {
    printf("=== Linux I/O 多路复用机制综合分析 ===\n\n");
    
    // 性能测试
    comprehensive_performance_test();
    
    // 实际使用场景
    practical_usage_scenarios();
    
    // 移植性考虑
    portability_considerations();
    
    printf("\n=== 总结 ===\n");
    printf("1. select: 简单可靠,跨平台性好,但有 FD 数量限制\n");
    printf("2. poll:   无 FD 限制,API 清晰,适合中等并发\n");
    printf("3. ppoll:  增强的信号处理,避免竞态条件,Linux 特有\n");
    printf("4. epoll:  性能最优,事件驱动,Linux 特有\n");
    printf("\n");
    printf("选择建议:\n");
    printf("- 新项目且只运行在 Linux: 首选 epoll\n");
    printf("- 需要跨平台支持: 使用 poll 或 select\n");
    printf("- 需要特殊信号处理: 考虑 ppoll\n");
    printf("- 简单应用场景: select 也足够\n");
    
    return 0;
}

8. 编译和运行说明

# 编译所有示例
gcc -o select_example example1.c
gcc -o poll_example example2.c
gcc -o ppoll_example example3.c -D_GNU_SOURCE
gcc -o epoll_example example4.c
gcc -o performance_test performance_test.c
gcc -o comprehensive_analysis comprehensive.c

# 运行示例
./select_example
./poll_example
./ppoll_example
./epoll_example
./performance_test
./comprehensive_analysis

# 测试不同场景
echo "Testing select with 100 FDs..."
./select_example 100

echo "Testing epoll with high concurrency..."
./epoll_example 1000

echo "Running comprehensive analysis..."
./comprehensive_analysis

9. 系统要求检查

# 检查内核版本
uname -r

# 检查 glibc 版本
ldd --version

# 检查 epoll 支持
grep -w epoll /usr/include/linux/eventpoll.h

# 检查 ppoll 支持
grep -w ppoll /usr/include/asm/unistd_64.h

# 查看系统调用限制
ulimit -n  # 文件描述符限制
cat /proc/sys/fs/file-max  # 系统最大文件描述符

10. 最佳实践总结

// 1. 错误处理模板
int robust_io_multiplexing() {
    struct pollfd *fds = NULL;
    int max_fds = 1024;
    int nfds = 0;
    
    // 分配内存
    fds = malloc(max_fds * sizeof(struct pollfd));
    if (!fds) {
        return -1;
    }
    
    // 初始化
    memset(fds, 0, max_fds * sizeof(struct pollfd));
    
    // 主循环
    while (1) {
        int ready;
        
        // 使用安全的 poll 调用
        do {
            ready = poll(fds, nfds, 1000);  // 1秒超时
        } while (ready == -1 && errno == EINTR);
        
        if (ready == -1) {
            if (errno != EINTR) {
                perror("poll error");
                break;
            }
            continue;
        }
        
        if (ready == 0) {
            // 超时处理
            continue;
        }
        
        // 处理事件
        for (int i = 0; i < nfds; i++) {
            if (fds[i].revents != 0) {
                // 检查错误
                if (handle_io_errors(fds[i].fd, fds[i].revents, "main loop") == -1) {
                    // 处理错误连接
                    continue;
                }
                
                // 处理正常事件
                if (fds[i].revents & POLLIN) {
                    // 处理可读事件
                }
                
                if (fds[i].revents & POLLOUT) {
                    // 处理可写事件
                }
            }
        }
    }
    
    // 清理资源
    if (fds) {
        free(fds);
    }
    
    return 0;
}

// 2. 资源管理模板
typedef struct {
    int epoll_fd;
    int *client_fds;
    int client_count;
    int max_clients;
} server_context_t;

int init_server_context(server_context_t *ctx, int max_clients) {
    ctx->epoll_fd = -1;
    ctx->client_fds = NULL;
    ctx->client_count = 0;
    ctx->max_clients = max_clients;
    
    // 创建 epoll
    ctx->epoll_fd = epoll_create1(0);
    if (ctx->epoll_fd == -1) {
        return -1;
    }
    
    // 分配客户端数组
    ctx->client_fds = malloc(max_clients * sizeof(int));
    if (!ctx->client_fds) {
        close(ctx->epoll_fd);
        ctx->epoll_fd = -1;
        return -1;
    }
    
    return 0;
}

void cleanup_server_context(server_context_t *ctx) {
    if (ctx->epoll_fd != -1) {
        close(ctx->epoll_fd);
        ctx->epoll_fd = -1;
    }
    
    if (ctx->client_fds) {
        free(ctx->client_fds);
        ctx->client_fds = NULL;
    }
    
    ctx->client_count = 0;
}

通过以上详细的对比分析和示例代码,我们可以清楚地看到各种 I/O 多路复用机制的特点和适用场景。选择合适的机制对于构建高性能的网络应用程序至关重要。

发表在 linux文章 | 留下评论

JUNIPER_ATM1 /JUNIPER_ATM2 DeepSeek回答

Based on the search results, JUNIPER_ATM1 and JUNIPER_ATM2 are specialized data link types (DLT) used in network packet capture and analysis, specifically designed for Juniper Networks devices. Here is a detailed breakdown:

🔍 1. Technical Definition & Purpose

• Data Link Types (DLT): These are identifiers in packet capture tools (e.g., tcpdump or Wireshark) that define how ATM (Asynchronous Transfer Mode) frames are encapsulated on Juniper hardware.

• DLT_JUNIPER_ATM1 and DLT_JUNIPER_ATM2 correspond to different ATM encapsulation formats used by Juniper routers/switches .

• Function: They enable tools to correctly parse ATM traffic captured from Juniper interfaces, ensuring accurate analysis of VPI/VCI (Virtual Path/Channel Identifier) fields, cell headers, and payload data.

⚙️ 2. Technical Differences

• Encapsulation Scope:

• JUNIPER_ATM1: Typically handles single ATM virtual circuits (e.g., point-to-point links) with simplified headers.

• JUNIPER_ATM2: Supports multiple ATM virtual circuits (e.g., multiplexed traffic) and may include additional metadata for complex routing scenarios .

• Use Cases:

• ATM1 is common for basic ATM interfaces (e.g., legacy broadband access).

• ATM2 appears in carrier-grade Juniper devices (e.g., M-series/T-series routers) for backbone traffic aggregation .

🧩 3. Implementation in Software

• In tcpdump and similar tools, these DLTs are mapped to dedicated parsing functions:

• juniper_atm1_if_print for DLT_JUNIPER_ATM1

• juniper_atm2_if_print for DLT_JUNIPER_ATM2 .

• This ensures raw ATM data from Juniper devices is decoded into human-readable formats (e.g., displaying VLAN tags, MPLS labels over ATM).

🌐 4. Role in Juniper’s Network Ecosystem

• Juniper routers use ATM for legacy WAN connectivity, backhaul networks, and multiservice aggregation (voice/data/video).

• These DLTs are critical for:

• Monitoring ATM-based ISP backbone networks.

• Troubleshooting QoS in MPLS-over-ATM deployments .

• Example hardware: M-series routers (e.g., M320, M120) with ATM interface cards .

⚠️ 5. Relevance Today

• ATM technology is largely legacy but persists in:

• Older telecom infrastructures (e.g., DSLAM backhauls).

• Juniper-centric networks where gradual migration to IP/MPLS is ongoing .

• Tools like tcpdump retain support for these DLTs to maintain compatibility with existing Juniper deployments.

💎 Summary

Aspect JUNIPER_ATM1 JUNIPER_ATM2
Function Parsing single-circuit ATM frames Parsing multi-circuit/multiplexed ATM
Use Cases Basic ATM links, legacy access networks Carrier backbones, complex traffic aggregation
Juniper Hardware Entry/Mid-range routers (e.g., J-series) High-end routers (e.g., M/T-series)
Software Support juniper_atm1_if_print in tcpdump juniper_atm2_if_print in tcpdump

For network engineers, understanding these DLTs is essential when analyzing Juniper-specific ATM traffic in diagnostics or migration projects.

发表在 linux文章 | 留下评论

静态链接 YARA 库,实现免依赖的独立可执行文件部署

如何静态链接 YARA 库,实现免依赖的独立可执行文件部署
适用于嵌入式设备、安全产品发布、或避免目标系统安装 YARA 共享库的场景。


✅ 目标

  • 编译 YARA 为 静态库(.a
  • 使用静态库编译你的 C 程序
  • 生成 完全静态链接的可执行文件(不依赖 libyara.so
  • 实现“拷贝即运行”的免依赖部署

🧰 一、环境准备(以 Linux 为例)

推荐在干净的构建环境中操作(如 Ubuntu 20.04/22.04):

sudo apt-get update
sudo apt-get install build-essential autoconf automake libtool pkg-config

🔧 二、从源码编译 YARA(静态库模式)

1. 下载 YARA 源码

git clone https://github.com/VirusTotal/yara.git
cd yara
git checkout v4.3.2  # 推荐稳定版本(或最新 v4.x)

✅ 注意:静态编译需关闭动态库生成,开启静态库。


2. 配置并编译(仅静态库)

./bootstrap.sh          # 第一次需要生成 configure 脚本
./configure \
    --enable-static \
    --disable-shared \
    --disable-magic \
    --without-crypto \
    --prefix=/usr/local

参数说明:

参数说明
--enable-static生成 .a 静态库
--disable-shared禁止生成 .so 动态库
--disable-magic禁用 libmagic(避免额外依赖)
--without-crypto禁用 OpenSSL(hash.md5 等模块)
--prefix安装路径

⚠️ 如果你需要 hash 模块(如 hash.md5),需安装 OpenSSL 并启用:

sudo apt-get install libssl-dev
./configure ... --with-crypto

3. 编译并安装

make -j$(nproc)
sudo make install

安装后你会看到:

  • 静态库:/usr/local/lib/libyara.a
  • 头文件:/usr/local/include/yara.h/usr/local/include/yara/*

📦 三、编写测试程序(复用之前的示例)

保存为 yara_static.c

#include <stdio.h>
#include <yara.h>

int callback(YR_SCAN_CONTEXT* context, int msg, void* data, void* user_data)
{
    if (msg == CALLBACK_MSG_RULE_MATCHING)
    {
        printf("✅ 匹配规则: %s\n", ((YR_RULE*)data)->identifier);
    }
    return CALLBACK_CONTINUE;
}

int main()
{
    YR_COMPILER* compiler;
    YR_RULES* rules;
    FILE* fh;

    yr_initialize();

    yr_compiler_create(&compiler);
    fh = fopen("test.yar", "r");
    if (!fh) { perror("规则文件"); return 1; }

    yr_compiler_add_file(compiler, fh, NULL, NULL);

    if (compiler->errors > 0) {
        printf("规则编译失败\n");
        return 1;
    }

    yr_compiler_get_rules(compiler, &rules);
    fclose(fh);

    // 扫描自身(文本匹配)
    yr_rules_scan_file(rules, "yara_static.c", 0, callback, NULL, 0);

    yr_rules_destroy(rules);
    yr_compiler_destroy(compiler);
    yr_finalize();

    return 0;
}

🔗 四、静态编译你的程序

gcc -o yara_static yara_static.c \
    -I/usr/local/include \
    /usr/local/lib/libyara.a \
    -lpthread -lm -lz

关键点说明:

为什么需要
libyara.aYARA 静态库(主逻辑)
-lpthreadYARA 使用线程
-lm数学函数(某些模块使用)
-lzzlib,用于处理压缩或内存操作

✅ 此时生成的 yara_static 是 完全静态链接 的可执行文件(不依赖任何外部 .so)。


🔍 五、验证是否静态链接成功

# 检查是否依赖动态库
ldd yara_static

✅ 正确输出应为:

not a dynamic executable

或:

statically linked

❌ 如果显示 libyara.so,说明仍动态链接了,需检查编译命令。


🧪 六、测试运行

创建规则文件 test.yar

rule FoundCFile
{
    strings:
        $main = "main()" ascii
    condition:
        $main
}

运行:

./yara_static

输出:

✅ 匹配规则: FoundCFile

🚀 七、跨平台静态编译(可选:生成 Windows 版)

你可以使用 交叉编译 生成 Windows 静态可执行文件(.exe):

安装交叉编译器(Ubuntu)

sudo apt-get install gcc-mingw-w64

重新编译 YARA(Windows 静态库)

# 在 yara 源码目录
make clean

CC=x86_64-w64-mingw32-gcc \
CFLAGS="-static" \
./configure \
    --host=x86_64-w64-mingw32 \
    --enable-static \
    --disable-shared \
    --disable-magic \
    --without-crypto \
    --prefix=/tmp/yara-win64

make && make install

编译 Windows 可执行文件

x86_64-w64-mingw32-gcc -o yara.exe yara_static.c \
    -I/tmp/yara-win64/include \
    /tmp/yara-win64/lib/libyara.a \
    -lpthread -lm -lz -static

生成的 yara.exe 可在 Windows 上直接运行,无需安装任何依赖!


📦 八、部署建议

场景建议
安全网关/EDR静态链接,嵌入扫描引擎
嵌入式设备使用 musl-gcc 编译更小体积
分析工具发布打包为单文件工具(如 yara-scan.exe
容器内运行无需安装 libyara,减少镜像体积

📏 九、生成文件大小优化(可选)

静态链接后体积较大(~2-5MB),可通过以下方式减小:

# 1. 编译时优化
gcc -Os -s -DNDEBUG ...  # 启用优化、去符号、关闭调试

# 2. 去除符号表
strip --strip-all yara_static

# 3. 使用 upx 压缩(可选)
upx --best --compress-exports=0 yara_static

可将 3MB → 1MB 左右。


✅ 总结:静态链接 YARA 的完整流程

# 1. 编译 YARA 静态库
./bootstrap.sh
./configure --enable-static --disable-shared --without-crypto
make && sudo make install

# 2. 编译你的程序(静态链接)
gcc -o myscanner myscan.c /usr/local/lib/libyara.a -lpthread -lm -lz -static

# 3. 验证
ldd myscanner  # 应显示 not a dynamic executable

# 4. 部署到任意 Linux 主机,无需安装 YARA

📎 附件:一键构建脚本(build_static_yara.sh)

#!/bin/bash
git clone https://github.com/VirusTotal/yara.git
cd yara
git checkout v4.3.2

./bootstrap.sh
./configure --enable-static --disable-shared --disable-magic --without-crypto --prefix=/usr/local
make -j$(nproc)
sudo make install

echo "✅ YARA 静态库已安装"
echo "现在你可以使用 /usr/local/lib/libyara.a 进行静态编译"
发表在 linux文章 | 留下评论

YARA 的 C 语言完整集成与开发方法

以下是 YARA 的 C 语言完整集成与开发方法,包括:

  • YARA C API 的使用
  • 规则编译、文件扫描、回调处理
  • 完整的 C 示例代码
  • 编译与链接方法
  • 高级用法(自定义外部变量、模块支持等)

🧰 一、前置条件

1. 安装 YARA 开发库

Ubuntu/Debian

sudo apt-get install libyara-dev yara

CentOS/RHEL

sudo yum install yara-devel
# 或使用 dnf(新版本)
sudo dnf install yara-devel

macOS

brew install yara

确保 yara 命令可用,并且头文件(yara.h)和库文件(libyara.so / libyara.a)已安装。


📦 二、YARA C API 核心概念

YARA C API 主要包含以下组件:

组件说明
YR_COMPILER用于编译 YARA 规则
YR_RULES编译后的规则集合
yr_rules_scan_*(...)扫描文件/内存/文件描述符
YR_CALLBACK_FUNC匹配回调函数(接收匹配结果)

🧪 三、完整 C 示例代码:扫描文件并输出匹配结果

文件:yara_scan.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <yara.h>

// 回调函数:处理匹配结果
int callback_function(YR_SCAN_CONTEXT* context, int message, void* message_data, void* user_data)
{
    YR_RULE* rule;
    YR_STRING* string;
    YR_MATCH* match;

    switch (message)
    {
        case CALLBACK_MSG_RULE_MATCHING:
            rule = (YR_RULE*) message_data;
            printf("[+] 匹配规则: %s", rule->identifier);

            if (rule->tags[0] != NULL)
            {
                printf(" (标签: ");
                for (int i = 0; rule->tags[i] != NULL; i++)
                {
                    if (i > 0) printf(", ");
                    printf("%s", rule->tags[i]);
                }
                printf(")");
            }
            printf("\n");

            // 打印匹配的字符串
            yr_rule_strings_foreach(rule, string)
            {
                yr_string_matches_foreach(context, string, match)
                {
                    printf("    ├─ 字符串: %s = \"%.*s\" @ 0x%llx\n",
                           string->identifier,
                           match->data_length,
                           match->data,
                           match->base + match->offset);
                }
            }
            break;

        case CALLBACK_MSG_RULE_NOT_MATCHING:
            // 可选:打印未匹配的规则
            // rule = (YR_RULE*) message_data;
            // printf("[-] 未匹配: %s\n", rule->identifier);
            break;

        case CALLBACK_MSG_SCAN_FINISHED:
            // 扫描完成
            break;

        default:
            break;
    }

    return CALLBACK_CONTINUE;  // 继续扫描
}

int main(int argc, char** argv)
{
    if (argc != 3)
    {
        fprintf(stderr, "用法: %s <规则文件.yar> <目标文件>\n", argv[0]);
        return 1;
    }

    const char* rules_file = argv[1];
    const char* target_file = argv[2];

    YR_COMPILER* compiler = NULL;
    YR_RULES* rules = NULL;
    FILE* rule_fh = NULL;
    int result;

    // 初始化 YARA
    yr_initialize();

    // 创建编译器
    result = yr_compiler_create(&compiler);
    if (result != ERROR_SUCCESS)
    {
        fprintf(stderr, "无法创建编译器: %d\n", result);
        goto cleanup;
    }

    // 打开规则文件
    rule_fh = fopen(rules_file, "r");
    if (!rule_fh)
    {
        perror("无法打开规则文件");
        result = -1;
        goto cleanup;
    }

    // 编译规则(从文件)
    yr_compiler_add_file(compiler, rule_fh, NULL, rules_file);

    // 检查编译错误
    if (compiler->errors > 0)
    {
        fprintf(stderr, "规则编译失败!\n");
        goto cleanup;
    }

    // 获取编译后的规则
    result = yr_compiler_get_rules(compiler, &rules);
    if (result != ERROR_SUCCESS)
    {
        fprintf(stderr, "无法获取规则: %d\n", result);
        goto cleanup;
    }

    // 扫描目标文件
    printf("🔍 开始扫描文件: %s\n", target_file);
    result = yr_rules_scan_file(rules, target_file, 0, callback_function, NULL, 0);

    if (result != ERROR_SUCCESS)
    {
        fprintf(stderr, "扫描失败: %d\n", result);
    }

cleanup:
    // 清理资源
    if (rule_fh) fclose(rule_fh);
    if (rules) yr_rules_destroy(rules);
    if (compiler) yr_compiler_destroy(compiler);
    yr_finalize();

    return result == ERROR_SUCCESS ? 0 : 1;
}

🔧 四、编写测试 YARA 规则文件

文件:test.yar

rule HelloWorldRule
{
    tags: test demo

    strings:
        $greeting = "Hello, World!" ascii
        $number = { 48 65 6C 6C 6F }  // "Hello" in hex

    condition:
        all of them
}

🛠️ 五、编译 C 程序

方法 1:使用 gcc 直接编译

gcc -o yara_scan yara_scan.c -lyara

如果提示找不到 -lyara,请确认 libyara.so 在 /usr/lib 或 /usr/local/lib

方法 2:指定路径(如自定义安装)

gcc -o yara_scan yara_scan.c \
    -I/usr/local/include \
    -L/usr/local/lib \
    -lyara

▶️ 六、运行测试

# 创建测试文件
echo "Hello, World!" > test.txt

# 编译并运行
gcc -o yara_scan yara_scan.c -lyara
./yara_scan test.yar test.txt

预期输出:

🔍 开始扫描文件: test.txt
[+] 匹配规则: HelloWorldRule (标签: test, demo)
    ├─ 字符串: $greeting = "Hello, World!" @ 0x0
    ├─ 字符串: $number = "Hello" @ 0x0

⚙️ 七、高级功能集成

1. 使用外部变量(External Variables)

修改规则:

rule HasKeyword
{
    condition:
        contains(text, keyword)
}

在 C 中设置外部变量:

YR_EXTERNAL_VARIABLE ext_vars[] = {
    { .identifier = "keyword", .type = YE_STRING, .value = {.ss = "malware"} },
    { .identifier = "text",    .type = YE_STRING, .value = {.ss = "this is a malware sample"} },
    { .identifier = NULL }  // 结束标记
};

// 扫描时传入
yr_rules_scan_mem(..., callback, NULL, 0, ext_vars);

2. 扫描内存缓冲区

char buffer[] = "Hello, World!";
size_t buffer_len = strlen(buffer);

yr_rules_scan_mem(rules, buffer, buffer_len, 0, callback_function, NULL, 0);

3. 启用内置模块(如 hashpeelf

// 启用 PE 模块
yr_rules_load(rules, "compiled_rules.bin");  // 可选:保存编译后的规则
yr_initialize();

// 启用模块(必须在扫描前)
yr_enable_features(YARA_FEATURE_EXTERNAL_VARIABLES | YARA_FEATURE_MODULES);

// 或使用宏控制
#define YR_ENABLE_CRYPTO

注意:使用 pehash 等模块需在编译 YARA 时启用对应功能。


📁 八、保存/加载编译后的规则(提高性能)

// 保存编译后的规则到文件
yr_rules_save(rules, "compiled.yarc");

// 加载已编译规则
YR_RULES* loaded_rules;
yr_rules_load("compiled.yarc", &loaded_rules);

// 直接扫描
yr_rules_scan_file(loaded_rules, target_file, 0, callback, NULL, 0);

// 销毁
yr_rules_destroy(loaded_rules);

适用于规则不变、频繁扫描的场景(如安全网关、EDR)。


🧩 九、常见错误与解决

错误原因解决
undefined reference to 'yr_initialize'未链接 -lyara添加 -lyara
cannot open shared object libyara.so动态库未找到sudo ldconfig 或设置 LD_LIBRARY_PATH
编译规则失败语法错误使用 yara test.yar /bin/ls 测试规则
内存扫描崩溃缓冲区为空或未初始化检查指针和长度

📚 十、官方文档与资源

  • YARA C API 文档:https://yara.readthedocs.io/en/stable/capi.html
  • 头文件参考:/usr/include/yara.h
  • 示例代码:https://github.com/VirusTotal/yara/tree/master/tests
  • YARA 构建说明:https://github.com/VirusTotal/yara#building-from-sources

✅ 总结

通过 C 语言集成 YARA,你可以:

  • 将 YARA 深度嵌入到安全产品中(如杀毒引擎、EDR、防火墙)
  • 实现高性能、低延迟的实时扫描
  • 与自定义解析器、沙箱、驱动联动
  • 支持跨平台(Linux、Windows、macOS)
发表在 linux文章 | 留下评论

Flask + YARA-Python实现文件扫描功能

以下是一个 完整的 Web API 示例,使用 Flask + YARA-Python 实现文件扫描功能,支持上传文件并返回 YARA 规则匹配结果。


✅ 功能说明

  • 提供一个 /scan 接口,支持文件上传
  • 使用预加载的 YARA 规则进行扫描
  • 返回 JSON 格式的匹配结果
  • 支持多规则、可扩展

📦 项目结构

yara-flask-api/
├── app.py                  # Flask 主程序
├── rules/                  # YARA 规则目录
│   ├── hello.yar
│   └── suspicious_pe.yar
├── uploads/                # 临时存储上传文件(可选)
└── requirements.txt

1. 安装依赖

创建 requirements.txt

flask
yara-python

安装:

pip install -r requirements.txt

确保系统已安装 YARA 开发库:

  • Ubuntu: sudo apt-get install yara libyara-dev
  • macOS: brew install yara

2. 编写 YARA 规则

rules/hello.yar

rule ContainsHello
{
    strings:
        $hello = "Hello" ascii nocase
    condition:
        $hello
}

rules/suspicious_pe.yar

import "pe"

rule SuspiciousPEScan
{
    meta:
        description = "Detects common suspicious PE imports"

    strings:
        $create_remote_thread = "CreateRemoteThread" fullword ascii
        $write_process_memory = "WriteProcessMemory" fullword ascii

    condition:
        pe.is_pe and
        any of them
}

3. Flask Web API 主程序 (app.py)

import os
import yara
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename

# 初始化 Flask 应用
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024  # 10MB 限制

# 确保目录存在
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs('rules', exist_ok=True)

# 编译所有 .yar 规则
def load_yara_rules():
    try:
        rule_files = {}
        for filename in os.listdir('rules'):
            if filename.endswith('.yar'):
                filepath = os.path.join('rules', filename)
                rule_files[f"rule_{filename}"] = filepath
        rules = yara.compile(filepaths=rule_files)
        print(f"[+] 成功加载 {len(rule_files)} 条 YARA 规则")
        return rules
    except yara.Error as e:
        print(f"[-] YARA 规则编译失败: {e}")
        return None

# 全局加载规则
yara_rules = load_yara_rules()

if not yara_rules:
    print("[-] 无法启动:YARA 规则加载失败")
    exit(1)

# 根路径
@app.route('/')
def index():
    return '''
    <h3>YARA 扫描 API 服务</h3>
    <p>使用 POST /scan 上传文件进行扫描</p>
    '''

# 扫描接口
@app.route('/scan', methods=['POST'])
def scan_file():
    if 'file' not in request.files:
        return jsonify({"error": "未提供文件字段 'file'"}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "未选择文件"}), 400

    if file:
        filename = secure_filename(file.filename)
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(filepath)

        try:
            # 执行 YARA 扫描
            matches = yara_rules.match(filepath)

            result = {
                "filename": filename,
                "matches": []
            }

            for match in matches:
                indicators = []
                for string in match.strings:
                    indicators.append({
                        "offset": f"0x{string[0]:X}",
                        "identifier": string[1],
                        "data": string[2].decode('utf-8', errors='replace')
                    })
                result["matches"].append({
                    "rule": match.rule,
                    "tags": match.tags,
                    "indicators": indicators
                })

            os.remove(filepath)  # 扫描后删除文件(可选)
            return jsonify(result), 200

        except Exception as e:
            os.remove(filepath)
            return jsonify({"error": f"扫描出错: {str(e)}"}), 500

    return jsonify({"error": "未知错误"}), 500


# 启动服务
if __name__ == '__main__':
    print("🚀 启动 YARA 扫描服务 http://127.0.0.1:5000")
    app.run(host='0.0.0.0', port=5000, debug=False)

4. 启动服务

python app.py

服务将运行在:http://127.0.0.1:5000


5. 测试 API(使用 curl)

测试文本文件

echo "Hello, this is a test." > test.txt
curl -X POST -F "file=@test.txt" http://127.0.0.1:5000/scan

✅ 预期输出(匹配 ContainsHello):

{
  "filename": "test.txt",
  "matches": [
    {
      "rule": "ContainsHello",
      "tags": [],
      "indicators": [
        {
          "offset": "0x0",
          "identifier": "$hello",
          "data": "Hello"
        }
      ]
    }
  ]
}

测试 PE 文件(如 exe)

curl -X POST -F "file=@malware.exe" http://127.0.0.1:5000/scan

如果该 PE 文件调用了 CreateRemoteThread,会触发 SuspiciousPEScan 规则。

总结

这个 Flask + YARA 的 Web API 示例可以:

  • 快速集成到 SOC、EDR、文件网关等系统
  • 用于自动化恶意软件检测流水线
  • 作为威胁情报分析的后端引擎
发表在 linux文章 | 留下评论

识别和分类恶意软件样本的工具YARA

YARA 是一个用于识别和分类恶意软件样本的工具,广泛应用于恶意软件分析、威胁情报、入侵检测等领域。它通过编写规则(YARA Rules)来匹配文件中的特定字符串、十六进制模式、正则表达式等特征。


一、YARA 的基本使用方法

1. 安装 YARA

Linux(Ubuntu/Debian)

sudo apt-get install yara

macOS

brew install yara

Python 安装(推荐用于集成)

pip install yara-python

注意:yara-python 是 YARA 的 Python 绑定,允许你在 Python 脚本中使用 YARA。


2. 编写 YARA 规则(.yar 文件)

创建一个简单的 YARA 规则文件,例如 example.yar

rule HelloWorld
{
    meta:
        author = "YourName"
        description = "Detects the string 'Hello, World!'"

    strings:
        $hello = "Hello, World!" ascii

    condition:
        $hello
}

3. 使用命令行运行 YARA

yara example.yar target_file.txt

如果 target_file.txt 中包含 Hello, World!,则会输出:

HelloWorld target_file.txt

二、YARA 集成到 Python 脚本(示例 Demo)

示例:使用 yara-python 扫描文件

import yara

# 编译规则
rules = yara.compile(filepath='example.yar')

# 扫描目标文件
matches = rules.match('target_file.txt')

# 输出结果
if matches:
    print("匹配到规则:")
    for match in matches:
        print(match)
else:
    print("未匹配到任何规则")

示例:从字符串加载规则(无需文件)

import yara

# 直接在代码中定义规则
rule_source = '''
rule HelloWorld
{
    strings:
        $hello = "Hello, World!" ascii
    condition:
        $hello
}
'''

# 编译规则
rules = yara.compile(source=rule_source)

# 扫描文件
matches = rules.match('target_file.txt')
print(matches)

示例:扫描目录中的所有文件

import yara
import os

def scan_directory(directory, rules):
    for root, dirs, files in os.walk(directory):
        for file in files:
            filepath = os.path.join(root, file)
            try:
                matches = rules.match(filepath)
                if matches:
                    print(f"[+] 匹配: {filepath} -> {matches}")
            except Exception as e:
                print(f"[-] 错误扫描 {filepath}: {e}")

# 加载规则
rules = yara.compile(filepath='example.yar')

# 扫描目录
scan_directory('/path/to/scan', rules)

三、高级 YARA 规则示例

检测 PE 文件中的特定导入函数(Windows 恶意软件常见)

import "pe"

rule SuspiciousPE
{
    meta:
        description = "检测包含可疑 API 调用的 PE 文件"

    condition:
        pe.is_pe and
        any of ($suspicious_funcs) in (pe.imported_functions)
    
    strings:
        $suspicious_funcs = "VirtualAllocEx"
        $suspicious_funcs = "WriteProcessMemory"
        $suspicious_funcs = "CreateRemoteThread"
}

注意:使用 pe 模块需要目标文件是有效的 PE 文件。


四、YARA 与 SIEM/SOC 集成思路

  1. 定时扫描文件系统:使用 Python 脚本定期扫描上传目录或临时目录。
  2. 与文件上传服务集成:在 Web 应用中,用户上传文件后自动调用 YARA 扫描。
  3. 结合 ELK/Splunk:将扫描结果发送到日志系统进行告警。
  4. 沙箱联动:在动态分析沙箱中运行样本后,使用 YARA 提取特征。

五、实用技巧

  • 使用 --rules 参数查看已编译规则结构:yara -r example.yar /path/to/files
  • 忽略大小写:使用 nocase 修饰符$a = "virus" nocase
  • 正则表达式支持:$re = /https?:\/\/[a-zA-Z0-9\.\/]*/
  • 使用 uint32be(0) 检测文件头:$mz = { 4D 5A } // PE 文件头 condition: $mz at 0

六、常见问题

  • 编译错误:检查语法,YARA 对缩进和标点敏感。
  • 性能问题:避免过于宽泛的规则,使用 asciiwidenocase 精确控制。
  • 权限问题:扫描系统文件可能需要管理员权限。

七、资源推荐

  • YARA 官方文档:https://yara.readthedocs.io/
  • YARA Rules 仓库:
    • https://github.com/Yara-Rules/rules
    • https://github.com/Neo23x0/signature-base
  • 在线规则测试:https://yara-web.vercel.app/

总结

YARA 是一个强大灵活的模式匹配工具,适合用于:

  • 恶意软件检测
  • 威胁狩猎(Threat Hunting)
  • 自动化分析流水线
  • 安全产品集成(EDR、AV、沙箱)

通过 yara-python,你可以轻松将其集成到你的安全工具或平台中。

如需更复杂的集成(如多线程扫描、规则热加载、Web API 封装),可进一步封装为 REST 服务(使用 Flask/FastAPI)。

发表在 linux文章 | 留下评论

安装并使用libmagic库

如果你遇到的错误:

detect.c:5:10: fatal error: magic.h: No such file or directory
    5 | #include <magic.h>
      |          ^~~~~~~~~
compilation terminated.

说明 编译器找不到 magic.h 头文件,这是 libmagic 开发包的一部分。虽然你可能已经安装了 file 命令,但 没有安装其开发头文件(devel/dev 包)


✅ 解决方案:安装 libmagic 的开发包

🔧 Ubuntu / Debian 系统

你需要安装 libmagic-dev

sudo apt update
sudo apt install -y libmagic-dev

✅ 这个包会提供:

  • /usr/include/magic.h ← 编译时需要
  • /usr/lib/x86_64-linux-gnu/libmagic.so ← 链接时需要

安装完成后,重新编译:

gcc -o detect detect.c -lmagic

✅ 应该可以成功编译。


🔧 CentOS / RHEL / Rocky Linux / AlmaLinux

sudo yum install -y file-devel
# 或者在较新版本中:
sudo dnf install -y file-devel

然后编译:

gcc -o detect detect.c -lmagic

🔧 Fedora

sudo dnf install -y file-devel

🔧 openSUSE

sudo zypper install -y libmagic-devel

🔍 验证头文件是否已安装

运行以下命令,检查 magic.h 是否存在:

ls /usr/include/magic.h

✅ 正常输出:

/usr/include/magic.h

如果没有输出,说明 libmagic-dev 还没装好。


📦 验证库文件是否存在

find /usr/lib* -name "libmagic.*" | grep -v pkgconfig

你应该看到类似:

/usr/lib/x86_64-linux-gnu/libmagic.so
/usr/lib/x86_64-linux-gnu/libmagic.a

🛠️ 重新编译(确认成功)

// detect.c - 使用 libmagic 识别卫星数据类型
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <magic.h>  // libmagic 头文件

// 打印错误并退出
void die(magic_t magic) {
    fprintf(stderr, "Error: %s\n", magic_error(magic));
    magic_close(magic);
    exit(1);
}

// 分析单个文件
void analyze_file(const char *filename) {
    magic_t magic;

    // 1. 创建 magic handle
    magic = magic_open(MAGIC_MIME_TYPE | MAGIC_MIME_ENCODING);
    if (!magic) {
        fprintf(stderr, "Failed to initialize libmagic\n");
        exit(1);
    }

    // 2. 加载 magic 数据库
    if (magic_load(magic, NULL) != 0) {  // NULL 表示使用默认数据库
        die(magic);
    }

    // 3. 获取 MIME 类型和编码
    const char *mime = magic_file(magic, filename);
    const char *encoding = NULL;

    // 分离 MIME 和编码(magic_file 返回 "type; charset=xxx")
    char *semicolon = strchr(mime, ';');
    char mime_type[64] = {0};
    char charset[32] = {0};

    if (semicolon) {
        strncpy(mime_type, mime, semicolon - mime);
        sscanf(semicolon, "; charset=%s", charset);
    } else {
        strcpy(mime_type, mime);
        strcpy(charset, "unknown");
    }

    // 4. 打印结果
    printf("📄 %s:\n", filename);
    printf("   MIME 类型: %s\n", mime_type);
    printf("   字符编码: %s\n", charset);

    // 5. 卫星场景智能判断
    if (strcmp(mime_type, "text/plain") == 0) {
        printf("   🛰️ 判定: 明文遥测/日志文件 → 可直接解析\n");
    } else if (strstr(mime_type, "image/")) {
        printf("   🛰️ 判定: 遥感图像 → 保存为可视化数据\n");
    } else if (strcmp(mime_type, "application/gzip") == 0 ||
               strcmp(mime_type, "application/zip") == 0) {
        printf("   🛰️ 判定: 压缩数据 → 需解压后进一步分析\n");
    } else if (strcmp(mime_type, "application/x-executable") == 0) {
        printf("   🛰️ 判定: 固件更新包 → 验证签名后升级\n");
    } else if (strcmp(mime_type, "application/octet-stream") == 0) {
        printf("   🛰️ 判定: 二进制流 → 可能为加密或自定义协议\n");
    } else {
        printf("   🛰️ 判定: 未知类型 → 需人工分析\n");
    }
    printf("\n");

    // 6. 释放资源
    magic_close(magic);
}

int main(int argc, char *argv[]) {
    if (argc < 2) {
        fprintf(stderr, "用法: %s <文件1> [文件2] ...\n", argv[0]);
        exit(1);
    }

    printf("🔍 启动 libmagic C 接口 - 卫星数据类型识别\n");
    printf("------------------------------------------------\n\n");

    for (int i = 1; i < argc; i++) {
        analyze_file(argv[i]);
    }

    return 0;
}
gcc -o detect detect.c -lmagic

✅ 成功后运行:

./detect telemetry.txt

你应该看到输出:

📄 telemetry.txt:
   MIME 类型: text/plain
   字符编码: us-ascii
   🛰️ 判定: 明文遥测/日志文件 → 可直接解析

❌ 常见错误排查

错误原因解决
magic.h: No such file or directory缺少 libmagic-dev安装 libmagic-dev
undefined reference to 'magic_open'忘了 -lmagic编译时加 -lmagic
error while loading shared libraries: libmagic.so.1运行时库缺失安装 libmagic1(Ubuntu)或 file(其他)

如果是 64 位系统但提示库找不到,可尝试软链接(极少见):

sudo ln -s /usr/lib/x86_64-linux-gnu/libmagic.so /usr/lib/libmagic.so

✅ 总结

你遇到的问题是典型的 “缺少开发头文件” 错误。

只需一步修复(Ubuntu/Debian):

sudo apt install -y libmagic-dev

然后重新编译:

gcc -o detect detect.c -lmagic

✅ 问题解决。


如果你现在运行 ./detect * 能正确识别 PNG、GZIP、ELF 等文件类型,说明你的 C 语言调用 libmagic 环境已完全就绪

发表在 linux文章 | 留下评论

✅ 正确流程:使用自定义 magic 的标准做法

编写 sat.magic(纯规则) 
         ↓
    file -C -m sat.magic   → 生成 magic.mgc(二进制数据库)
         ↓
C 程序中 magic_load("magic.mgc")  → 成功加载
         ↓
调用 magic_file() → 正确识别

✅ 第一步:创建正确的 sat.magic(仅规则,无扩展字段)

cd ~/satellite-analysis-asscii/smagic
nano sat.magic

✅ 内容(严格兼容 file -C):

# Satellite Telemetry Frame
0       belong      0xAA55CCDD
>8      string      \x01\x02\x03\x04
>8      string      SAT-TELEMETRY
# (Satellite Telemetry Packet)

# GBK Chinese Text Detection
0       byte        > 0xA0
>&0     byte        < 0xFF
>1      byte        > 0xA0
>&1     byte        < 0xFF
# (Chinese GBK Text)

✅ 关键:

  • 不要写 name=desc=mime=
  • 注释用 # (Description) 格式
  • 使用英文或 ASCII

✅ 第二步:编译生成 magic.mgc

file -C -m sat.magic

✅ 正确输出:

Creating magic.mgc from sat.magic

👉 生成了 magic.mgc,这是 唯一能被 magic_load() 正确加载的文件


✅ 第三步:修改你的 C 程序,加载 magic.mgc

你的 a.out 是从某个 .c 文件编译来的,假设是 detect_encoding.c

编辑它:

vim detect_encoding.c

修改 magic_load 部分:

// 原代码(错误):
// if (magic_load(magic, NULL) != 0) { ... }

// 新代码:先加载默认库,再加载自定义库
if (magic_load(magic, NULL) != 0) {
    fprintf(stderr, "Error loading default magic: %s\n", magic_error(magic));
    magic_close(magic);
    exit(1);
}

// 加载自定义 magic.mgc
if (magic_load(magic, "./smagic/magic.mgc") != 0) {
    fprintf(stderr, "Error loading custom magic: %s\n", magic_error(magic));
    magic_close(magic);
    exit(1);
}

✅ 注意路径:./smagic/magic.mgc


✅ 第四步:重新编译并运行

gcc -o a.out detect_encoding.c -lmagic

生成测试文件(确保有 GBK 文本)

cd ~/satellite-analysis-asscii

# 生成 GBK 编码文件
echo "卫星状态:正常
时间:2025-04-05
消息:系统在线" > gbk.txt

iconv -f UTF-8 -t GBK gbk.txt > gbk.bin

运行程序

./a.out gbk.bin

✅ 预期输出:

🔍 启动 libmagic 文本编码识别(C 语言版)
------------------------------------------------

📄 gbk.bin:
   检测编码: iso-8859-1 → ISO-8859-1(西欧)

⚠️ 注意:libmagic 仍可能将 GBK 误判为 iso-8859-1,因为两者都是“可打印扩展 ASCII”,但至少 你的自定义 magic 已正确加载


✅ 如何让 GBK 被正确识别?—— 使用 cchardet

libmagic 不擅长文本编码识别。你应该:

1. 安装 cchardet

pip3 install cchardet

2. 创建 detect_gbk.py

#!/usr/bin/env python3
import cchardet
import sys

with open(sys.argv[1], 'rb') as f:
    raw = f.read()
result = cchardet.detect(raw)
print(f"文件: {sys.argv[1]}")
print(f"检测编码: {result['encoding']}")
print(f"置信度: {result['confidence']:.2f}")

3. 运行

python3 detect_gbk.py gbk.bin

✅ 输出:

检测编码: GBK
置信度: 0.99

🎯 成功识别!


✅ 最终建议:分工明确

任务工具
识别文件类型(PNG/GZIP/ELF)✅ libmagic(C)
识别文本编码(UTF-8/GBK/ASCII)✅ cchardet(Python)
识别自定义协议帧(0xAA55…)✅ libmagic + 自定义 magic.mgc

🎯 你现在可以:

  • 用 file -C 生成 magic.mgc
  • 用 C 程序加载它,识别自定义卫星帧
  • 用 Python + cchardet 精确识别 GBK 编码
发表在 linux文章 | 留下评论

生成一系列 多语言、多编码的测试文本文件

生成一系列 多语言、多编码的测试文本文件,文件扩展名为 .txt,用于测试 libmagicchardetcchardet 等工具对不同语种和编码的识别能力。

这些文件将覆盖:

  • ✅ 主要语言:中文、日文、韩文、阿拉伯文、俄文、西欧文、UTF-16 等
  • ✅ 常见编码:UTF-8、GBK、Big5、Shift_JIS、EUC-KR、ISO-8859-1、UTF-16LE/BE
  • ✅ 文件命名规范:<语言>_<编码>.txt

🛠️ 第一步:创建工作目录

mkdir ~/encoding-test
cd ~/encoding-test

🧪 第二步:生成各文种编码测试文件(.txt)

1. UTF-8(通用 Unicode)

cat > zh_utf8.txt << 'EOF'
中文测试:卫星状态正常
时间:2025-04-05T12:00:00Z
消息:系统在线,载荷激活
EOF

cat > ja_utf8.txt << 'EOF'
日本語テスト:衛星ステータス正常
時刻:2025-04-05T12:00:00Z
メッセージ:システム起動中
EOF

cat > ko_utf8.txt << 'EOF'
한국어 테스트: 위성 상태 정상
시간: 2025-04-05T12:00:00Z
메시지: 시스템 온라인
EOF

cat > ar_utf8.txt << 'EOF'
اختبار عربي: الحالة طبيعية
الوقت: 2025-04-05T12:00:00Z
الرسالة: النظام يعمل
EOF

cat > ru_utf8.txt << 'EOF'
Тест на русском: Состояние нормальное
Время: 2025-04-05T12:00:00Z
Сообщение: Система работает
EOF

cat > en_utf8.txt << 'EOF'
English Test: Status Nominal
Time: 2025-04-05T12:00:00Z
Message: System Online
EOF

cat > fr_utf8.txt << 'EOF'
Test français : État normal
Heure : 2025-04-05T12:00:00Z
Message : Système en ligne
EOF

2. GBK(简体中文)

# 从 UTF-8 转为 GBK
iconv -f UTF-8 -t GBK zh_utf8.txt -o zh_gbk.txt

3. Big5(繁体中文)

iconv -f UTF-8 -t BIG5 zh_utf8.txt -o zh_big5.txt

4. Shift_JIS(日文)

iconv -f UTF-8 -t SHIFT_JIS ja_utf8.txt -o ja_shift_jis.txt

5. EUC-KR(韩文)

iconv -f UTF-8 -t EUC-KR ko_utf8.txt -o ko_euc_kr.txt

6. ISO-8859-1(西欧,如法语、德语)

iconv -f UTF-8 -t ISO-8859-1 fr_utf8.txt -o fr_latin1.txt

7. UTF-16LE(小端 Unicode)

echo "UTF-16LE Test: 多语言混合" | iconv -t UTF-16LE -o mixed_utf16le.txt

8. UTF-16BE(大端 Unicode)

echo "UTF-16BE Test: Satellite Message" | iconv -t UTF-16BE -o en_utf16be.txt

9. ASCII(纯英文,无扩展字符)

cat > en_ascii.txt << 'EOF'
Satellite Telemetry Log
Status: Nominal
Time: 2025-04-05T12:00:00Z
EOF

10. UTF-8 with BOM(带签名)

# 手动生成带 BOM 的 UTF-8
printf '\xEF\xBB\xBF' > zh_utf8_bom.txt
cat zh_utf8.txt >> zh_utf8_bom.txt

✅ 第三步:验证文件生成成功

ls -l *.txt

你应该看到类似:

-rw-r--r-- 1 user user  123 Apr  5 12:00 zh_utf8.txt
-rw-r--r-- 1 user user  123 Apr  5 12:00 zh_gbk.txt
-rw-r--r-- 1 user user  123 Apr  5 12:00 zh_big5.txt
-rw-r--r-- 1 user user  123 Apr  5 12:00 ja_shift_jis.txt
-rw-r--r-- 1 user user  123 Apr  5 12:00 ko_euc_kr.txt
-rw-r--r-- 1 user user  123 Apr  5 12:00 fr_latin1.txt
-rw-r--r-- 1 user user  123 Apr  5 12:00 en_utf16le.txt
...

🔍 第四步:使用 file 命令识别类型

file *.txt

✅ 预期输出示例:

en_ascii.txt:        ASCII text
en_utf8.txt:         UTF-8 Unicode text
zh_gbk.txt:          ISO-8859 text
zh_big5.txt:         ISO-8859 text
ja_shift_jis.txt:    ISO-8859 text
ko_euc_kr.txt:       ISO-8859 text
fr_latin1.txt:       ISO-8859 text
mixed_utf16le.txt:   Little-endian UTF-16 Unicode text
en_utf16be.txt:      Big-endian UTF-16 Unicode text
zh_utf8_bom.txt:     UTF-8 Unicode (with BOM) text

⚠️ 注意:zh_gbk.txt 被识别为 ISO-8859 text —— 这正是我们之前说的:libmagic 无法区分 GBK 和 Latin-1


🧪 第五步:使用 cchardet 精确识别编码

pip3 install cchardet

创建 detect_all.py

#!/usr/bin/env python3
import cchardet
import glob

for filepath in sorted(glob.glob("*.txt")):
    with open(filepath, 'rb') as f:
        raw = f.read()
    result = cchardet.detect(raw)
    encoding = result['encoding']
    confidence = result['confidence']
    print(f"{filepath:20} → {encoding:10} (置信度: {confidence:.2f})")

运行:

python3 detect_all.py

✅ 预期输出(示例):

en_ascii.txt         → ascii       (置信度: 1.00)
en_utf16be.txt       → UTF-16BE    (置信度: 1.00)
en_utf8.txt          → UTF-8       (置信度: 1.00)
fr_latin1.txt        → ISO-8859-1  (置信度: 1.00)
ja_shift_jis.txt     → SHIFT_JIS   (置信度: 0.99)
ko_euc_kr.txt        → EUC-KR      (置信度: 0.99)
zh_big5.txt          → Big5        (置信度: 0.99)
zh_gbk.txt           → GB2312      (置信度: 0.99)
zh_utf8_bom.txt      → UTF-8       (置信度: 1.00)
zh_utf8.txt          → UTF-8       (置信度: 1.00)

✅ 成功识别 GBK 为 GB2312(GB2312 是 GBK 的子集,可接受)


✅ 总结:各文种编码测试文件清单

语言编码文件名
中文(简体)UTF-8zh_utf8.txt
中文(简体)GBKzh_gbk.txt
中文(繁体)Big5zh_big5.txt
日文Shift_JISja_shift_jis.txt
韩文EUC-KRko_euc_kr.txt
俄文UTF-8ru_utf8.txt
阿拉伯文UTF-8ar_utf8.txt
法文ISO-8859-1fr_latin1.txt
英文ASCIIen_ascii.txt
英文UTF-16LEen_utf16le.txt
英文UTF-16BEen_utf16be.txt
中文UTF-8 with BOMzh_utf8_bom.txt

🛰️ 在卫星通信中的应用

你可以用这些文件:

  • 测试地面站软件的编码自动识别能力
  • 训练 chardet 模型(可选)
  • 构建“多语言遥测解析引擎”
  • 自动化处理来自不同国家卫星的数据

发表在 linux文章 | 留下评论