AWK 复杂应用场景总结

AWK 复杂应用场景总结

一、大数据处理与分析

1. 流式日志分析系统

# Real-time Apache access-log analyser: prints a statistics report every
# `interval` seconds of wall-clock time and a final report at EOF. (gawk:
# uses systime/strftime and arrays of arrays.)
awk -v interval=60 '
BEGIN {
    start_time = systime()
    OFMT = "%.2f"
}

{
    # Combined-log fields: client IP, status, response size, request URL.
    ip = $1
    status = $9
    bytes = ($10 == "-" ? 0 : $10)
    url = $7

    # Global counters.
    total_requests++
    total_bytes += bytes
    status_count[status]++
    ip_count[ip]++

    # Per-URL statistics (gawk arrays of arrays).
    url_stats[url]["count"]++
    url_stats[url]["bytes"] += bytes

    # Emit a report once per interval, then reset the window.
    current_time = systime()
    if(current_time - start_time >= interval) {
        generate_report()
        reset_counters()
        start_time = current_time
    }
}

END {
    # Flush whatever accumulated since the last full interval.
    if(total_requests > 0) {
        generate_report()
    }
}

function generate_report(    status, i, ip) {
    print strftime("%Y-%m-%d %H:%M:%S"), "=== 1分钟统计报告 ==="
    print "总请求数:", total_requests
    print "总流量:", format_bytes(total_bytes)
    print "平均请求大小:", format_bytes(total_bytes/total_requests)
    print "QPS:", total_requests/interval

    # Status-code distribution.
    print "\n状态码分布:"
    for(status in status_count) {
        print "  " status ":", status_count[status],
              sprintf("(%.1f%%)", status_count[status]*100/total_requests)
    }

    # Top 10 client IPs by request count.
    print "\nTop 10 访问IP:"
    sort_array_by_value(ip_count)
    for(i = 1; i <= 10 && i <= sorted_count; i++) {
        ip = sorted_keys[i]
        print "  " ip ":", ip_count[ip],
              sprintf("(%.1f%%)", ip_count[ip]*100/total_requests)
    }

    print "========================\n"
}

# BUGFIX: this helper was called but never defined, so the Top-10 section
# aborted with "calling undefined function". Selection-sorts the keys of
# `array` by descending value into the globals sorted_keys[1..sorted_count].
function sort_array_by_value(array,    key, i, j, best, tmp) {
    delete sorted_keys
    sorted_count = 0
    for(key in array) {
        sorted_keys[++sorted_count] = key
    }
    for(i = 1; i <= sorted_count; i++) {
        best = i
        for(j = i + 1; j <= sorted_count; j++) {
            if(array[sorted_keys[j]] > array[sorted_keys[best]]) {
                best = j
            }
        }
        tmp = sorted_keys[i]
        sorted_keys[i] = sorted_keys[best]
        sorted_keys[best] = tmp
    }
}

# Clear all per-window counters.
function reset_counters() {
    delete status_count
    delete ip_count
    delete url_stats
    total_requests = 0
    total_bytes = 0
}

# Human-readable byte formatting (B/KB/MB/GB).
function format_bytes(bytes,    units, unit) {
    units[1] = "B"; units[2] = "KB"; units[3] = "MB"; units[4] = "GB"
    unit = 1
    while(bytes >= 1024 && unit < 4) {
        bytes /= 1024
        unit++
    }
    return sprintf("%.2f %s", bytes, units[unit])
}'

2. 分布式数据聚合

# Aggregate records spread across multiple input files into per-file,
# per-category and global statistics. Column 2 = category, column 3 = amount.
awk '
BEGIN {
    # Global accumulators.
    global_stats["total_records"] = 0
    global_stats["total_amount"] = 0
    global_stats["max_amount"] = 0
    global_stats["min_amount"] = 999999999
}

# On the first record of each file, start that file's counters.
FNR == 1 {
    current_file = FILENAME
    file_stats[current_file]["records"] = 0
    file_stats[current_file]["amount"] = 0
}

{
    # Coerce column 3 to a number (non-numeric text becomes 0).
    amount = $3 + 0

    # Per-file totals.
    file_stats[current_file]["records"]++
    file_stats[current_file]["amount"] += amount

    # Global totals and extrema.
    global_stats["total_records"]++
    global_stats["total_amount"] += amount
    if(amount > global_stats["max_amount"]) {
        global_stats["max_amount"] = amount
    }
    if(amount < global_stats["min_amount"]) {
        global_stats["min_amount"] = amount
    }

    # Per-category totals (column 2 is the category key).
    category_stats[$2]["count"]++
    category_stats[$2]["amount"] += amount
}

END {
    # BUGFIX: with no input at all, the average below divided by zero
    # (a fatal error in awk). Bail out early instead.
    if(global_stats["total_records"] == 0) {
        print "没有可统计的记录"
        exit
    }

    # Global summary.
    print "=== 全局统计 ==="
    print "总记录数:", global_stats["total_records"]
    print "总金额:", global_stats["total_amount"]
    print "平均金额:", global_stats["total_amount"]/global_stats["total_records"]
    print "最大金额:", global_stats["max_amount"]
    print "最小金额:", global_stats["min_amount"]

    # Per-file summary (every listed file has >= 1 record, so the
    # division below is safe).
    print "\n=== 各文件统计 ==="
    for(file in file_stats) {
        print file ":"
        print "  记录数:", file_stats[file]["records"]
        print "  金额:", file_stats[file]["amount"]
        print "  平均金额:", file_stats[file]["amount"]/file_stats[file]["records"]
    }

    # Per-category summary.
    print "\n=== 类别统计 ==="
    for(category in category_stats) {
        print category ":"
        print "  记录数:", category_stats[category]["count"]
        print "  总金额:", category_stats[category]["amount"]
        print "  平均金额:", category_stats[category]["amount"]/category_stats[category]["count"]
    }
}'

二、系统监控与运维

1. 综合系统监控脚本

# Multi-dimensional system monitoring tool. Shells out to top/free/df/
# netstat/uptime, compares the numbers against the -v thresholds and
# prints a report plus any alerts. (gawk: arrays of arrays, strftime.)
awk -v threshold_cpu=80 -v threshold_mem=80 -v threshold_disk=90 '
BEGIN {
    # Gather raw metrics from external commands.
    collect_system_info()

    # Evaluate each metric against its alert threshold.
    analyze_cpu()
    analyze_memory()
    analyze_disk()
    analyze_network()

    # Print the human-readable report.
    generate_system_report()

    # Print collected alerts (if any).
    check_alerts()
}

# Populate system_info[] and disk_info[][] via `cmd | getline`, which
# reads one line into $0 and splits it into fields.
function collect_system_info() {
    # CPU: parse the "%Cpu(s)" line from top.
    # BUGFIX: the grep pattern used inner single quotes, which terminated
    # the shell-level single-quoted awk program; escaped double quotes
    # are safe inside the awk string.
    cmd = "top -bn1 | grep \"%Cpu(s)\""
    if((cmd | getline) > 0) {
        system_info["cpu_usage"] = extract_cpu_usage($0)
    }
    close(cmd)

    # Memory: `free` "Mem:" row -> $2 = total KB, $3 = used KB.
    cmd = "free | grep Mem"
    if((cmd | getline) > 0) {
        system_info["mem_total"] = $2
        system_info["mem_used"] = $3
        system_info["mem_usage"] = ($3/$2)*100
    }
    close(cmd)

    # Disks: one `df -h` row per real block device.
    # BUGFIX: same shell-quoting problem as the top pipeline above.
    cmd = "df -h | grep -E \"^/dev/\""
    disk_index = 0
    while((cmd | getline) > 0) {
        disk_info[disk_index]["filesystem"] = $1
        disk_info[disk_index]["size"] = $2
        disk_info[disk_index]["used"] = $3
        disk_info[disk_index]["available"] = $4
        # Strip the trailing "%" and coerce to a number.
        disk_info[disk_index]["usage"] = substr($5, 1, length($5)-1) + 0
        disk_info[disk_index]["mount"] = $6
        disk_index++
    }
    close(cmd)

    # Network: count of ESTABLISHED connections.
    cmd = "netstat -an | grep ESTABLISHED | wc -l"
    if((cmd | getline) > 0) {
        system_info["active_connections"] = $0 + 0
    }
    close(cmd)

    # Load average from uptime.
    cmd = "uptime"
    if((cmd | getline) > 0) {
        system_info["load_average"] = extract_load_average($0)
    }
    close(cmd)
}

function analyze_cpu() {
    cpu_usage = system_info["cpu_usage"]
    if(cpu_usage > threshold_cpu) {
        alerts["cpu"] = "CPU使用率过高: " cpu_usage "%"
    }
}

function analyze_memory() {
    mem_usage = system_info["mem_usage"]
    if(mem_usage > threshold_mem) {
        alerts["memory"] = "内存使用率过高: " sprintf("%.1f", mem_usage) "%"
    }
}

function analyze_disk() {
    for(i = 0; i < disk_index; i++) {
        usage = disk_info[i]["usage"]
        if(usage > threshold_disk) {
            alerts["disk_" i] = "磁盘 " disk_info[i]["mount"] " 使用率过高: " usage "%"
        }
    }
}

function analyze_network() {
    connections = system_info["active_connections"]
    if(connections > 1000) {  # fixed threshold of 1000 connections
        alerts["network"] = "活跃连接数过多: " connections
    }
}

function generate_system_report() {
    print "=== 系统监控报告 ==="
    print "生成时间:", strftime("%Y-%m-%d %H:%M:%S")
    print ""

    # CPU.
    print "CPU使用率:", system_info["cpu_usage"], "%"

    # Memory (free reports KB; convert to bytes for formatting).
    print "内存使用情况:"
    print "  总量:", format_bytes(system_info["mem_total"]*1024)
    print "  已用:", format_bytes(system_info["mem_used"]*1024)
    print "  使用率:", sprintf("%.1f", system_info["mem_usage"]), "%"

    # Disks.
    print "\n磁盘使用情况:"
    for(i = 0; i < disk_index; i++) {
        print "  " disk_info[i]["mount"] ":",
              disk_info[i]["used"], "/", disk_info[i]["size"],
              "(" disk_info[i]["usage"] "%)"
    }

    # Network.
    print "\n网络连接:", system_info["active_connections"]

    # Load average.
    print "系统负载:", system_info["load_average"]
}

function check_alerts() {
    if(length(alerts) > 0) {
        print "\n=== 告警信息 ==="
        for(alert_type in alerts) {
            print "⚠️  " alerts[alert_type]
        }

        # Hook point for alert delivery, e.g.:
        # send_alert_email(alerts)
    } else {
        print "\n✅ 系统状态正常"
    }
}

# Extract CPU usage from a top "%Cpu(s)" line: find "<idle> id" and
# return 100 - idle. RLENGTH-3 strips the trailing " id" (3 chars).
function extract_cpu_usage(line) {
    if(match(line, /[0-9.]+ id/)) {
        idle = substr(line, RSTART, RLENGTH-3) + 0
        return 100 - idle
    }
    return 0
}

# Extract the load averages from an uptime line. RSTART+14 skips the
# 14-character prefix "load average: "; commas are then removed.
function extract_load_average(line) {
    if(match(line, /load average: [0-9., ]+/)) {
        load_str = substr(line, RSTART+14, RLENGTH-14)
        gsub(/,/, "", load_str)
        return load_str
    }
    return "N/A"
}

# Human-readable byte formatting (B/KB/MB/GB).
function format_bytes(bytes) {
    units[1] = "B"; units[2] = "KB"; units[3] = "MB"; units[4] = "GB"
    unit = 1
    temp_bytes = bytes
    while(temp_bytes >= 1024 && unit < 4) {
        temp_bytes /= 1024
        unit++
    }
    return sprintf("%.1f %s", temp_bytes, units[unit])
}'

三、网络安全与审计

1. 入侵检测系统

# Scan security logs for potential threats.
awk -f intrusion_detection.awk /var/log/auth.log /var/log/syslog

# Contents of intrusion_detection.awk:
BEGIN {
    # Threat signatures to scan every line for.
    threat_patterns["failed_login"] = "Failed password"
    threat_patterns["invalid_user"] = "Invalid user"
    threat_patterns["break_in"] = "POSSIBLE BREAK-IN ATTEMPT"
    threat_patterns["root_login"] = "Accepted .* for root"

    # Counters / number formatting.
    start_time = systime()
    OFMT = "%.0f"
}

{
    # Match the line against each threat signature.
    for(threat_type in threat_patterns) {
        pattern = threat_patterns[threat_type]
        if($0 ~ pattern) {
            threats[threat_type]++
            threat_details[threat_type][threats[threat_type]] = $0
            threat_times[threat_type][threats[threat_type]] = FNR

            # Record the source IP when one appears in the line.
            if(match($0, /[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/)) {
                ip = substr($0, RSTART, RLENGTH)
                threat_ips[threat_type][ip]++
            }
        }
    }

    # Track overall per-IP activity.
    if(match($0, /[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/)) {
        ip = substr($0, RSTART, RLENGTH)
        ip_activity[ip]++

        # NOTE(review): in batch processing systime() barely advances,
        # so the time window check is effectively always true; this
        # flags any IP with more than 100 log lines.
        if(ip_activity[ip] > 100 && (systime() - start_time) < 3600) {
            suspicious_ips[ip] = ip_activity[ip]
        }
    }
}

END {
    # Produce the security report.
    print "=== 安全审计报告 ==="
    print "分析时间:", strftime("%Y-%m-%d %H:%M:%S")
    print "分析文件:", ARGV[1]
    print ""

    # Threat summary with the first three matching lines each.
    total_threats = 0
    for(threat_type in threats) {
        count = threats[threat_type]
        total_threats += count
        print threat_type ":", count, "次"

        print "  详细信息 (Top 3):"
        for(i = 1; i <= 3 && i <= count; i++) {
            print "    " threat_details[threat_type][i]
        }
        if(count > 3) {
            print "    ... 还有", count-3, "条记录"
        }
        print ""
    }

    # High-frequency IPs.
    if(length(suspicious_ips) > 0) {
        print "⚠️  可疑IP地址 (1小时内访问超过100次):"
        for(ip in suspicious_ips) {
            print "  " ip ":", suspicious_ips[ip], "次"
        }
        print ""
    }

    # Top source IPs per threat type.
    print "威胁源IP统计:"
    for(threat_type in threat_ips) {
        print "  " threat_type ":"
        sort_array_by_count(threat_ips[threat_type])
        for(i = 1; i <= 5 && i <= length(sorted_array); i++) {
            ip = sorted_array[i]
            print "    " ip ":", threat_ips[threat_type][ip], "次"
        }
    }

    print "\n总计发现威胁:", total_threats, "次"

    # Escalate when the volume of threats is high.
    if(total_threats > 50) {
        print "\n🚨 高风险: 建议立即检查防火墙规则和系统安全配置"
    }
}

# Selection-sort the keys of `array` by descending value into the global
# sorted_array[1..count].
# BUGFIX: the original file ended with a stray "'" after this function
# (a leftover shell quote) - a syntax error when run via -f. It also
# never cleared temp_array, so keys from a previous, larger call could
# linger (harmless here, but cleared now for hygiene).
function sort_array_by_count(array) {
    delete sorted_array
    delete temp_array
    count = 0
    for(key in array) {
        temp_array[++count] = key
    }

    for(i = 1; i <= count; i++) {
        max_key = temp_array[i]
        max_val = array[temp_array[i]]
        max_index = i

        for(j = i+1; j <= count; j++) {
            if(array[temp_array[j]] > max_val) {
                max_val = array[temp_array[j]]
                max_key = temp_array[j]
                max_index = j
            }
        }

        # Swap the selected maximum into position i.
        temp = temp_array[i]
        temp_array[i] = temp_array[max_index]
        temp_array[max_index] = temp

        sorted_array[i] = max_key
    }
}

四、数据科学与机器学习预处理

1. 特征工程工具

# CSV data preprocessing and feature engineering: type inference,
# missing-value imputation, outlier flagging, min-max scaling.
# Quality report goes to stderr; cleaned rows go to stdout.
awk -F',' '
BEGIN {
    # Keep the output comma-separated.
    OFS = ","
}

NR == 1 {
    # Header row: remember field names and set up per-field stats.
    header = $0
    print "原始表头:", header > "/dev/stderr"

    field_count = NF
    for(i = 1; i <= NF; i++) {
        field_names[i] = $i
        field_stats[i]["missing"] = 0
        field_stats[i]["type"] = "unknown"
    }

    # BUGFIX: the header used to be printed in END, i.e. AFTER the data;
    # emit it first so stdout is a valid CSV stream.
    print header
    next
}

{
    # Per-field quality checks.
    for(i = 1; i <= field_count; i++) {
        value = $i

        # Missing-value detection.
        if(value == "" || value == "NULL" || value == "N/A") {
            field_stats[i]["missing"]++
            missing_data[NR,i] = 1
        } else {
            # Type inference and running statistics.
            infer_data_type(i, value)
            collect_statistics(i, value)
        }
    }

    # Flag outliers against the statistics seen so far.
    detect_outliers()

    # Impute missing values and scale numeric fields in place.
    clean_data()

    # BUGFIX: the cleaned record was never actually emitted, despite the
    # script claiming to output cleaned data.
    print
}

END {
    # Data-quality summary on stderr.
    generate_quality_report()
}

# Infer/refine the type of a field from one observed value.
function infer_data_type(field_index, value) {
    # Numeric: optional sign, digits, optional decimal part.
    if(value ~ /^-?[0-9]+\.?[0-9]*$/) {
        if(field_stats[field_index]["type"] == "unknown") {
            field_stats[field_index]["type"] = "numeric"
        } else if(field_stats[field_index]["type"] != "numeric") {
            field_stats[field_index]["type"] = "mixed"
        }
    }
    # ISO-style date prefix.
    else if(value ~ /^[0-9]{4}-[0-9]{2}-[0-9]{2}/) {
        field_stats[field_index]["type"] = "date"
    }
    # Anything else is treated as categorical.
    else {
        field_stats[field_index]["type"] = "categorical"
        field_stats[field_index]["categories"][value]++
    }
}

# Accumulate sum/count/min/max and store raw values (for variance).
function collect_statistics(field_index, value) {
    if(field_stats[field_index]["type"] == "numeric") {
        num_value = value + 0
        field_stats[field_index]["sum"] += num_value
        field_stats[field_index]["count"]++

        if(field_stats[field_index]["min"] == "" || num_value < field_stats[field_index]["min"]) {
            field_stats[field_index]["min"] = num_value
        }
        if(field_stats[field_index]["max"] == "" || num_value > field_stats[field_index]["max"]) {
            field_stats[field_index]["max"] = num_value
        }

        # Keep every value so detect_outliers() can compute the variance.
        field_stats[field_index]["values"][field_stats[field_index]["count"]] = num_value
    }
}

# Flag fields of the CURRENT record more than 3 standard deviations from
# the running mean (only once >10 numeric samples exist).
# NOTE(review): recomputing the variance per record is O(n^2) overall;
# acceptable for a demo, switch to a streaming formula for large inputs.
function detect_outliers() {
    for(i = 1; i <= field_count; i++) {
        if(field_stats[i]["type"] == "numeric" && field_stats[i]["count"] > 10) {
            mean = field_stats[i]["sum"] / field_stats[i]["count"]

            variance = 0
            for(j = 1; j <= field_stats[i]["count"]; j++) {
                diff = field_stats[i]["values"][j] - mean
                variance += diff * diff
            }
            variance /= field_stats[i]["count"]
            std_dev = sqrt(variance)

            current_value = $i + 0
            if(abs(current_value - mean) > 3 * std_dev) {
                outliers[NR,i] = 1
                field_stats[i]["outliers"]++
            }
        }
    }
}

# Impute missing values and min-max scale numeric fields in place.
# NOTE(review): this is a single-pass design - imputation and scaling use
# the statistics seen SO FAR, so early rows are scaled with partial info.
function clean_data() {
    for(i = 1; i <= field_count; i++) {
        if((NR,i) in missing_data) {
            # Numeric: fill with the running mean.
            # BUGFIX: guard count > 0 (a field whose first values are all
            # missing would otherwise divide by zero).
            if(field_stats[i]["type"] == "numeric" && field_stats[i]["count"] > 0) {
                $i = field_stats[i]["sum"] / field_stats[i]["count"]
            }
            # Categorical: fill with the mode.
            else if(field_stats[i]["type"] == "categorical") {
                $i = get_mode(field_stats[i]["categories"])
            }
        }
    }

    # Min-max scale numeric fields to [0, 1].
    for(i = 1; i <= field_count; i++) {
        if(field_stats[i]["type"] == "numeric") {
            range = field_stats[i]["max"] - field_stats[i]["min"]
            if(range > 0) {
                $i = ($i - field_stats[i]["min"]) / range
            }
        }
    }
}

# Per-field quality summary, written to stderr.
function generate_quality_report() {
    print "=== 数据质量报告 ===" > "/dev/stderr"
    print "总记录数:", NR-1 > "/dev/stderr"
    print "字段数:", field_count > "/dev/stderr"
    print "" > "/dev/stderr"

    for(i = 1; i <= field_count; i++) {
        print "字段", i, "(", field_names[i], "):" > "/dev/stderr"
        print "  数据类型:", field_stats[i]["type"] > "/dev/stderr"
        print "  缺失值:", field_stats[i]["missing"] > "/dev/stderr"

        if(field_stats[i]["type"] == "numeric" && field_stats[i]["count"] > 0) {
            print "  最小值:", field_stats[i]["min"] > "/dev/stderr"
            print "  最大值:", field_stats[i]["max"] > "/dev/stderr"
            print "  平均值:", field_stats[i]["sum"]/field_stats[i]["count"] > "/dev/stderr"
            if("outliers" in field_stats[i]) {
                print "  异常值数量:", field_stats[i]["outliers"] > "/dev/stderr"
            }
        }
        print "" > "/dev/stderr"
    }
}

# Absolute value.
function abs(x) {
    return (x < 0) ? -x : x
}

# Return the most frequent key of a count array (the mode).
function get_mode(categories) {
    max_count = 0
    mode_value = ""
    for(value in categories) {
        if(categories[value] > max_count) {
            max_count = categories[value]
            mode_value = value
        }
    }
    return mode_value
}'

五、DevOps自动化工具

1. 持续集成/持续部署(CI/CD)流水线工具

# CI/CD pipeline monitor and report generator. Parses pipeline logs and
# prints stage timings, test results and artifact sizes.
awk '
BEGIN {
    # Stage id -> display name.
    pipeline_stages["build"] = "构建"
    pipeline_stages["test"] = "测试"
    pipeline_stages["deploy"] = "部署"
    pipeline_stages["verify"] = "验证"

    current_pipeline = ""
    start_time = systime()
}

# "Pipeline: <name>" starts a new pipeline.
/^Pipeline: / {
    current_pipeline = $2
    pipeline_start[current_pipeline] = systime()
    print "🚀 开始执行流水线:", current_pipeline
}

# "Stage: <stage> STARTED|FINISHED|FAILED" - fields: $2 = stage, $3 = status.
/^Stage: ([a-zA-Z]+) (STARTED|FINISHED|FAILED)/ {
    stage = $2
    status = $3
    timestamp = systime()

    if(status == "STARTED") {
        stage_start[current_pipeline,stage] = timestamp
        print "  ⏱️  阶段开始:", pipeline_stages[stage]
    } else if(status == "FINISHED") {
        duration = timestamp - stage_start[current_pipeline,stage]
        stage_duration[current_pipeline,stage] = duration
        print "  ✅ 阶段完成:", pipeline_stages[stage],
              "(" duration "秒)"
    } else if(status == "FAILED") {
        duration = timestamp - stage_start[current_pipeline,stage]
        stage_duration[current_pipeline,stage] = duration
        stage_failed[current_pipeline,stage] = 1
        print "  ❌ 阶段失败:", pipeline_stages[stage],
              "(" duration "秒)"
    }
}

# "Artifact: <name> Size: <n> bytes".
# NOTE(review): $2 only captures single-word artifact names.
/^Artifact: (.+) Size: ([0-9]+) bytes/ {
    artifact_name = $2
    artifact_size = $4
    artifacts[current_pipeline,artifact_name] = artifact_size
    print "  📦 生成制品:", artifact_name,
          "(" format_bytes(artifact_size) ")"
}

# "Test Results: <p> passed, <f> failed, <s> skipped".
/^Test Results: ([0-9]+) passed, ([0-9]+) failed, ([0-9]+) skipped/ {
    passed = $3
    failed = $5
    # BUGFIX: was $8, which is the literal word "skipped" (numeric 0);
    # the count is field 7.
    skipped = $7
    total_tests = passed + failed + skipped

    test_results[current_pipeline,"passed"] = passed
    test_results[current_pipeline,"failed"] = failed
    test_results[current_pipeline,"skipped"] = skipped

    success_rate = (total_tests > 0) ? (passed/total_tests)*100 : 0

    print "  🧪 测试结果:", passed "通过,", failed "失败,", skipped "跳过"
    print "  📊 成功率:", sprintf("%.1f", success_rate) "%"
}

END {
    # Final summary.
    generate_ci_report()
}

function generate_ci_report() {
    # BUGFIX: the original used Perl'"'"'s `"=" x 50` repetition operator,
    # which in AWK concatenates "=", an empty variable x and 50 ("=50").
    print "\n" repeat("=", 50)
    print "CI/CD 流水线执行报告"
    print repeat("=", 50)

    for(pipeline in pipeline_start) {
        print "\n📋 流水线:", pipeline
        print "开始时间:", strftime("%Y-%m-%d %H:%M:%S", pipeline_start[pipeline])

        total_duration = 0
        all_passed = 1

        for(stage in pipeline_stages) {
            if((pipeline,stage) in stage_duration) {
                duration = stage_duration[pipeline,stage]
                total_duration += duration

                status_icon = ((pipeline,stage) in stage_failed) ? "❌" : "✅"
                print "  " status_icon, pipeline_stages[stage] ":", duration "秒"

                if((pipeline,stage) in stage_failed) {
                    all_passed = 0
                }
            }
        }

        print "总耗时:", total_duration "秒"
        print "最终状态:", (all_passed ? "✅ 成功" : "❌ 失败")

        # Test statistics.
        if((pipeline,"passed") in test_results) {
            passed = test_results[pipeline,"passed"]
            failed = test_results[pipeline,"failed"]
            skipped = test_results[pipeline,"skipped"]
            total = passed + failed + skipped

            print "测试统计:", passed "通过,", failed "失败,", skipped "跳过"
            if(total > 0) {
                success_rate = (passed/total)*100
                print "成功率:", sprintf("%.1f", success_rate) "%"
            }
        }

        # Artifact totals for this pipeline.
        artifact_count = 0
        total_size = 0
        for(key in artifacts) {
            split(key, parts, SUBSEP)
            if(parts[1] == pipeline) {
                artifact_count++
                total_size += artifacts[key]
            }
        }

        if(artifact_count > 0) {
            print "生成制品:", artifact_count "个, 总大小:", format_bytes(total_size)
        }
    }

    print "\n" repeat("=", 50)
    print "报告生成时间:", strftime("%Y-%m-%d %H:%M:%S")
    print repeat("=", 50)
}

# Return `str` repeated `n` times (AWK has no repetition operator).
function repeat(str, n,    out) {
    out = ""
    while(n-- > 0) {
        out = out str
    }
    return out
}

# Human-readable byte formatting (B/KB/MB/GB).
function format_bytes(bytes) {
    units[1] = "B"; units[2] = "KB"; units[3] = "MB"; units[4] = "GB"
    unit = 1
    temp_bytes = bytes
    while(temp_bytes >= 1024 && unit < 4) {
        temp_bytes /= 1024
        unit++
    }
    return sprintf("%.1f %s", temp_bytes, units[unit])
}'

六、性能优化最佳实践

1. AWK脚本性能分析工具

# AWK script performance profiler (gawk).
# BUGFIX: gawk's -p takes an ATTACHED optional argument; with a space,
# "performance_profile.txt" was consumed as the program text. Use the
# unambiguous long form.
awk --profile=performance_profile.txt '
BEGIN {
    # Wall-clock and high-resolution start times.
    start_time = systime()
    start_timestamp = gettimeofday()

    # Baseline RSS of this process (via ps).
    initial_memory = get_memory_usage()
}

# Main processing loop.
{
    lines_processed++

    # Progress line every 10000 records.
    if(lines_processed % 10000 == 0) {
        current_time = systime()
        rate = lines_processed / (current_time - start_time + 1)
        print "已处理:", lines_processed, "行, 速度:", rate, "行/秒" > "/dev/stderr"
    }

    # Actual per-record work goes here.
    process_data()
}

END {
    # Final timing and memory figures.
    end_time = systime()
    end_timestamp = gettimeofday()

    final_memory = get_memory_usage()

    total_time = end_time - start_time
    total_time_precise = end_timestamp - start_timestamp

    print "\n=== 性能分析报告 ===" > "/dev/stderr"
    print "总处理行数:", lines_processed > "/dev/stderr"
    print "总耗时:", total_time, "秒 (精确:", sprintf("%.3f", total_time_precise), "秒)" > "/dev/stderr"

    if(total_time > 0) {
        print "平均处理速度:", lines_processed/total_time, "行/秒" > "/dev/stderr"
    }

    print "内存使用变化:", initial_memory, "->", final_memory,
          "(", final_memory - initial_memory, ")" > "/dev/stderr"

    # SYMTAB is gawk-only: report a function_calls counter if one exists.
    if("function_calls" in SYMTAB) {
        print "函数调用次数:", function_calls > "/dev/stderr"
    }
}

# Placeholder for the real per-record business logic; the comments below
# double as performance-tuning tips.
function process_data() {
    # 1. Avoid repeated computation: cache length($1) rather than
    #    recomputing it inside a condition.
    field1_length = length($1)
    if(field1_length > 10) {
        # processing logic
    }

    # 2. Hash lookups are O(1).
    if($2 in lookup_table) {
        # O(1) lookup
    }

    # 3. Batch work into arrays and flush once to reduce I/O.

    # 4. Build regex patterns once, not per record.
    if(!regex_compiled) {
        regex_pattern = "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
        regex_compiled = 1
    }

    if($1 ~ regex_pattern) {
        # handle date-formatted field
    }

    # 5. Control floating-point output precision.
    OFMT = "%.6f"
}

# Current RSS (KB) of this awk process. PROCINFO is gawk-only.
function get_memory_usage() {
    cmd = "ps -o rss= -p " PROCINFO["pid"]
    if((cmd | getline mem_kb) > 0) {
        close(cmd)
        return mem_kb + 0  # coerce to number
    }
    close(cmd)
    return 0
}

# High-resolution timestamp via `date +%s.%N`; falls back to systime().
function gettimeofday() {
    cmd = "date +%s.%N"
    if((cmd | getline timestamp) > 0) {
        close(cmd)
        return timestamp + 0
    }
    close(cmd)
    return systime()
}'

七、综合应用案例

1. 企业级日志分析平台

# Enterprise multi-dimensional log analysis system (gawk: arrays of arrays).
awk -v company="MyCompany" -v environment="production" '
BEGIN {
    # Runtime configuration.
    config["company"] = company
    config["environment"] = environment
    config["timezone"] = "Asia/Shanghai"

    # Set up analytics modules.
    init_analytics()

    # Remember when processing started.
    start_timestamp = systime()

    print "🚀 启动 " company " " environment " 环境日志分析系统" > "/dev/stderr"
    print "开始时间:", strftime("%Y-%m-%d %H:%M:%S", start_timestamp) > "/dev/stderr"
}

# Route every line to the handler for its detected format.
{
    log_type = identify_log_type($0)

    if(log_type == "apache") {
        process_apache_log()
    } else if(log_type == "nginx") {
        process_nginx_log()
    } else if(log_type == "application") {
        process_app_log()
    } else if(log_type == "security") {
        process_security_log()
    } else {
        process_generic_log()
    }

    # Update global counters / progress.
    update_global_stats()
}

END {
    # Human-readable report, JSON snapshot, then alerting.
    generate_comprehensive_report()
    output_json_data()
    send_alerts_if_needed()
}

function init_analytics() {
    init_performance_analytics()
    init_security_analytics()
    init_business_analytics()
    init_user_behavior_analytics()
}

# BUGFIX: these initialisers (and process_generic_log below) were called
# but never defined - calling an undefined function is a fatal error in
# gawk. State is created lazily by the handlers, so they are empty hooks.
function init_performance_analytics() { }
function init_security_analytics() { }
function init_business_analytics() { }
function init_user_behavior_analytics() { }

# Classify a raw log line by format.
function identify_log_type(line) {
    if(line ~ /^([0-9]{1,3}\.){3}[0-9]{1,3} - - \[/) {
        return "apache"
    } else if(line ~ /^[0-9]{4}\/[0-9]{2}\/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} \[/) {
        return "nginx"
    } else if(line ~ /ERROR|WARN|INFO|DEBUG/) {
        return "application"
    } else if(line ~ /SECURITY|AUTH|LOGIN|FAILED/) {
        return "security"
    } else {
        return "generic"
    }
}

# Apache combined-log handler.
function process_apache_log() {
    ip = $1
    timestamp = $4
    method = $6
    url = $7
    status = $9
    bytes = ($10 == "-" ? 0 : $10)
    user_agent = $12

    # Traffic statistics.
    analytics["apache"]["requests"]++
    analytics["apache"]["bytes"] += bytes
    analytics["apache"]["status"][status]++

    # Per-client behaviour.
    analytics["users"][ip]["requests"]++
    analytics["users"][ip]["bytes"] += bytes

    # Per-URL statistics.
    analytics["urls"][url]["count"]++
    analytics["urls"][url]["bytes"] += bytes

    # Response time, when the log line ends in "<n>ms".
    if($NF ~ /([0-9]+)ms/) {
        response_time = substr($NF, 1, length($NF)-2) + 0
        analytics["apache"]["response_times"] += response_time
        analytics["apache"]["response_count"]++
    }
}

# Nginx access-log handler - only sketched in this example.
function process_nginx_log() {
    ip = $1
    timestamp = $4
    # ... processing logic
}

# Application-log handler: "<date> <time> <LEVEL> message...".
function process_app_log() {
    timestamp = $1 " " $2
    level = $3
    message = ""
    for(i = 4; i <= NF; i++) {
        message = message $i " "
    }

    # Error accounting.
    if(level == "ERROR" || level == "FATAL") {
        analytics["errors"]["count"]++
        analytics["errors"]["by_type"][message]++

        # Bucket by coarse error category.
        if(match(message, /(Database|Network|Timeout|Permission)/)) {
            error_type = substr(message, RSTART, RLENGTH)
            analytics["errors"]["by_category"][error_type]++
        }
    }
}

# Security-log handler: "<date> <time> <event> details...".
function process_security_log() {
    timestamp = $1 " " $2
    event_type = $3
    details = ""
    for(i = 4; i <= NF; i++) {
        details = details $i " "
    }

    # Security-event accounting.
    analytics["security"]["events"]++
    analytics["security"]["by_type"][event_type]++

    # Track source IPs seen in security events.
    if(match(details, /[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/)) {
        ip = substr(details, RSTART, RLENGTH)
        analytics["security"]["suspicious_ips"][ip]++
    }
}

# BUGFIX: fallback handler for unrecognised lines was missing entirely.
function process_generic_log() {
    analytics["generic"]["lines"]++
}

function update_global_stats() {
    global_stats["lines_processed"]++

    # Progress indicator every 50000 lines.
    if(global_stats["lines_processed"] % 50000 == 0) {
        print "处理进度:", global_stats["lines_processed"], "行" > "/dev/stderr"
    }
}

function generate_comprehensive_report() {
    end_timestamp = systime()
    duration = end_timestamp - start_timestamp

    # BUGFIX: `"=" x 80` is Perl syntax; in AWK it concatenated "=", the
    # empty variable x and 80. Use an explicit repeat helper.
    print "\n" repeat("=", 80)
    print "📊 " company " " environment " 环境综合分析报告"
    print repeat("=", 80)
    print "分析时间:", strftime("%Y-%m-%d %H:%M:%S", start_timestamp),
          "至", strftime("%Y-%m-%d %H:%M:%S", end_timestamp)
    print "分析耗时:", duration, "秒"
    print "处理行数:", global_stats["lines_processed"]
    print ""

    # Web traffic.
    if("apache" in analytics) {
        print "🌐 Web访问统计:"
        print "  总请求数:", analytics["apache"]["requests"]
        print "  总流量:", format_bytes(analytics["apache"]["bytes"])
        if(analytics["apache"]["requests"] > 0) {
            print "  平均请求大小:",
                  format_bytes(analytics["apache"]["bytes"]/analytics["apache"]["requests"])
        }
        if(analytics["apache"]["response_count"] > 0) {
            print "  平均响应时间:",
                  analytics["apache"]["response_times"]/analytics["apache"]["response_count"], "ms"
        }
        print ""
    }

    # Errors.
    if("errors" in analytics && analytics["errors"]["count"] > 0) {
        print "❌ 错误统计:"
        print "  总错误数:", analytics["errors"]["count"]
        print "  错误类型分布:"
        for(error_type in analytics["errors"]["by_type"]) {
            count = analytics["errors"]["by_type"][error_type]
            print "    " error_type ":", count
        }
        print ""
    }

    # Security events.
    if("security" in analytics && analytics["security"]["events"] > 0) {
        print "🛡️  安全事件:"
        print "  总事件数:", analytics["security"]["events"]
        print "  事件类型:"
        for(event_type in analytics["security"]["by_type"]) {
            count = analytics["security"]["by_type"][event_type]
            print "    " event_type ":", count
        }
        print ""
    }

    print repeat("=", 80)
}

# JSON snapshot for dashboards. `duration` is the global set by
# generate_comprehensive_report(), which END calls first.
function output_json_data(    rps, err_rate) {
    # BUGFIX: guard the divisions - empty input or zero duration
    # previously caused a division-by-zero fatal error.
    rps = (duration > 0) ? analytics["apache"]["requests"] / duration : 0
    err_rate = (global_stats["lines_processed"] > 0) ? \
        analytics["errors"]["count"] * 100 / global_stats["lines_processed"] : 0

    print "{"
    print "  \"company\": \"" company "\","
    print "  \"environment\": \"" environment "\","
    print "  \"timestamp\": \"" strftime("%Y-%m-%d %H:%M:%S") "\","
    print "  \"metrics\": {"
    print "    \"requests_per_second\": " rps ","
    print "    \"error_rate\": " err_rate ","
    print "    \"total_bytes\": " (analytics["apache"]["bytes"] + 0)
    print "  }"
    print "}"
}

# Emit an alert when the error rate exceeds 5%.
function send_alerts_if_needed(    error_rate) {
    # Guard empty input (no lines -> no meaningful rate).
    if(global_stats["lines_processed"] == 0) {
        return
    }
    error_rate = (analytics["errors"]["count"]*100/global_stats["lines_processed"])
    if(error_rate > 5) {
        print "🚨 高错误率告警: " error_rate "% > 5%" > "/dev/stderr"
        # Hook point for notification, e.g. piping a message to mail(1).
        # (Avoid single quotes here: the whole program is shell-quoted.)
    }
}

# Return `str` repeated `n` times (AWK has no repetition operator).
function repeat(str, n,    out) {
    out = ""
    while(n-- > 0) {
        out = out str
    }
    return out
}

# Human-readable byte formatting (B/KB/MB/GB).
function format_bytes(bytes) {
    units[1] = "B"; units[2] = "KB"; units[3] = "MB"; units[4] = "GB"
    unit = 1
    temp_bytes = bytes
    while(temp_bytes >= 1024 && unit < 4) {
        temp_bytes /= 1024
        unit++
    }
    return sprintf("%.1f %s", temp_bytes, units[unit])
}'

这些复杂的AWK应用场景展示了AWK在企业级系统中的强大能力。通过合理的设计和优化,AWK可以处理各种复杂的文本处理任务,成为系统管理员、数据分析师和DevOps工程师的重要工具。

此条目发表在linux文章分类目录,贴了标签。将固定链接加入收藏夹。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注