AWK 复杂应用场景总结
一、大数据处理与分析
1. 流式日志分析系统
# 实时分析Web服务器日志,生成实时报表
awk -v interval=60 '
BEGIN {
    start_time = systime()
    OFMT = "%.2f"
}
{
    # Parse Apache access-log fields (combined format)
    ip = $1
    status = $9
    bytes = ($10 == "-" ? 0 : $10)
    url = $7
    # Running totals
    total_requests++
    total_bytes += bytes
    status_count[status]++
    ip_count[ip]++
    # Per-URL breakdown (arrays of arrays — requires gawk 4.0+)
    url_stats[url]["count"]++
    url_stats[url]["bytes"] += bytes
    # Flush a report once per interval of wall-clock time
    current_time = systime()
    if(current_time - start_time >= interval) {
        generate_report()
        reset_counters()
        start_time = current_time
    }
}
END {
    # Flush whatever accumulated since the last periodic report
    if(total_requests > 0) {
        generate_report()
    }
}
function generate_report(    status, ip, i) {
    print strftime("%Y-%m-%d %H:%M:%S"), "=== 1分钟统计报告 ==="
    print "总请求数:", total_requests
    print "总流量:", format_bytes(total_bytes)
    print "平均请求大小:", format_bytes(total_bytes/total_requests)
    print "QPS:", total_requests/interval
    # Status-code distribution
    print "\n状态码分布:"
    for(status in status_count) {
        print " " status ":", status_count[status],
            sprintf("(%.1f%%)", status_count[status]*100/total_requests)
    }
    # Top 10 client IPs by request count
    print "\nTop 10 访问IP:"
    sort_array_by_value(ip_count, "desc")
    for(i = 1; i <= 10 && i <= sorted_count; i++) {
        ip = sorted_keys[i]
        print " " ip ":", ip_count[ip],
            sprintf("(%.1f%%)", ip_count[ip]*100/total_requests)
    }
    print "========================\n"
}
# FIX: this function was called above but never defined — calling an
# undefined function is a fatal error in awk. It fills the globals
# sorted_keys[1..sorted_count] with the keys of arr ordered by their
# numeric values ("desc" = largest first, anything else = ascending).
function sort_array_by_value(arr, order,    k, i, j, tmp) {
    delete sorted_keys
    sorted_count = 0
    for(k in arr) {
        sorted_keys[++sorted_count] = k
    }
    # Simple exchange sort; the key sets here are report-sized, so O(n^2) is fine
    for(i = 1; i <= sorted_count; i++) {
        for(j = i + 1; j <= sorted_count; j++) {
            if((order == "desc" && arr[sorted_keys[j]] > arr[sorted_keys[i]]) ||
               (order != "desc" && arr[sorted_keys[j]] < arr[sorted_keys[i]])) {
                tmp = sorted_keys[i]
                sorted_keys[i] = sorted_keys[j]
                sorted_keys[j] = tmp
            }
        }
    }
}
function reset_counters() {
    delete status_count
    delete ip_count
    delete url_stats
    total_requests = 0
    total_bytes = 0
}
# Human-readable byte formatting (B/KB/MB/GB)
function format_bytes(bytes,    units, unit) {
    units[1] = "B"; units[2] = "KB"; units[3] = "MB"; units[4] = "GB"
    unit = 1
    while(bytes >= 1024 && unit < 4) {
        bytes /= 1024
        unit++
    }
    return sprintf("%.2f %s", bytes, units[unit])
}'
2. 分布式数据聚合
# 处理分布在多个文件中的数据,进行全局统计
awk '
BEGIN {
    # Global accumulators (min starts at a large sentinel)
    global_stats["total_records"] = 0
    global_stats["total_amount"] = 0
    global_stats["max_amount"] = 0
    global_stats["min_amount"] = 999999999
}
# First line of each input file: create its per-file counters
FNR == 1 {
    current_file = FILENAME
    file_stats[current_file]["records"] = 0
    file_stats[current_file]["amount"] = 0
}
{
    # Column 3 is assumed to hold the amount; coerce to a number
    amount = $3 + 0
    # File-level stats
    file_stats[current_file]["records"]++
    file_stats[current_file]["amount"] += amount
    # Global stats
    global_stats["total_records"]++
    global_stats["total_amount"] += amount
    if(amount > global_stats["max_amount"]) {
        global_stats["max_amount"] = amount
    }
    if(amount < global_stats["min_amount"]) {
        global_stats["min_amount"] = amount
    }
    # Category-level stats (column 2 is assumed to be the category)
    category_stats[$2]["count"]++
    category_stats[$2]["amount"] += amount
}
END {
    print "=== 全局统计 ==="
    print "总记录数:", global_stats["total_records"]
    print "总金额:", global_stats["total_amount"]
    # FIX: guard the average (and the sentinel min/max) against empty input,
    # which previously divided by zero
    if(global_stats["total_records"] > 0) {
        print "平均金额:", global_stats["total_amount"]/global_stats["total_records"]
        print "最大金额:", global_stats["max_amount"]
        print "最小金额:", global_stats["min_amount"]
    }
    # Per-file stats (a file entry always has records >= 1: it is only
    # created on FNR==1 and that same line is also counted)
    print "\n=== 各文件统计 ==="
    for(file in file_stats) {
        print file ":"
        print " 记录数:", file_stats[file]["records"]
        print " 金额:", file_stats[file]["amount"]
        print " 平均金额:", file_stats[file]["amount"]/file_stats[file]["records"]
    }
    # Per-category stats (count >= 1 by construction)
    print "\n=== 类别统计 ==="
    for(category in category_stats) {
        print category ":"
        print " 记录数:", category_stats[category]["count"]
        print " 总金额:", category_stats[category]["amount"]
        print " 平均金额:", category_stats[category]["amount"]/category_stats[category]["count"]
    }
}'
二、系统监控与运维
1. 综合系统监控脚本
# 多维度系统监控工具
awk -v threshold_cpu=80 -v threshold_mem=80 -v threshold_disk=90 '
BEGIN {
    # One-shot monitor: collect raw data, analyse each dimension,
    # print a report, then emit alerts if any threshold was crossed.
    collect_system_info()
    analyze_cpu()
    analyze_memory()
    analyze_disk()
    analyze_network()
    generate_system_report()
    check_alerts()
}
function collect_system_info(    cmd) {
    # CPU usage from top.
    # FIX: the original command strings contained embedded single quotes,
    # which terminate the enclosing shell single-quoted awk program and
    # break the whole invocation; escaped double quotes are safe.
    cmd = "top -bn1 | grep \"%Cpu(s)\""
    if((cmd | getline) > 0) {
        system_info["cpu_usage"] = extract_cpu_usage($0)
    }
    close(cmd)
    # Memory usage (free reports KiB)
    cmd = "free | grep Mem"
    if((cmd | getline) > 0) {
        system_info["mem_total"] = $2
        system_info["mem_used"] = $3
        system_info["mem_usage"] = ($3/$2)*100
    }
    close(cmd)
    # Per-device disk usage (FIX: same quoting repair as above)
    cmd = "df -h | grep -E \"^/dev/\""
    disk_index = 0
    while((cmd | getline) > 0) {
        disk_info[disk_index]["filesystem"] = $1
        disk_info[disk_index]["size"] = $2
        disk_info[disk_index]["used"] = $3
        disk_info[disk_index]["available"] = $4
        # Strip the trailing "%" and coerce to a number
        disk_info[disk_index]["usage"] = substr($5, 1, length($5)-1) + 0
        disk_info[disk_index]["mount"] = $6
        disk_index++
    }
    close(cmd)
    # Established TCP connection count
    cmd = "netstat -an | grep ESTABLISHED | wc -l"
    if((cmd | getline) > 0) {
        system_info["active_connections"] = $0 + 0
    }
    close(cmd)
    # Load average
    cmd = "uptime"
    if((cmd | getline) > 0) {
        system_info["load_average"] = extract_load_average($0)
    }
    close(cmd)
}
function analyze_cpu(    cpu_usage) {
    cpu_usage = system_info["cpu_usage"]
    if(cpu_usage > threshold_cpu) {
        alerts["cpu"] = "CPU使用率过高: " cpu_usage "%"
    }
}
function analyze_memory(    mem_usage) {
    mem_usage = system_info["mem_usage"]
    if(mem_usage > threshold_mem) {
        alerts["memory"] = "内存使用率过高: " sprintf("%.1f", mem_usage) "%"
    }
}
function analyze_disk(    i, usage) {
    for(i = 0; i < disk_index; i++) {
        usage = disk_info[i]["usage"]
        if(usage > threshold_disk) {
            alerts["disk_" i] = "磁盘 " disk_info[i]["mount"] " 使用率过高: " usage "%"
        }
    }
}
function analyze_network(    connections) {
    connections = system_info["active_connections"]
    if(connections > 1000) { # 1000 connections chosen as an example threshold
        alerts["network"] = "活跃连接数过多: " connections
    }
}
function generate_system_report(    i) {
    print "=== 系统监控报告 ==="
    print "生成时间:", strftime("%Y-%m-%d %H:%M:%S")
    print ""
    # CPU
    print "CPU使用率:", system_info["cpu_usage"], "%"
    # Memory (free reports KiB, so scale to bytes for formatting)
    print "内存使用情况:"
    print " 总量:", format_bytes(system_info["mem_total"]*1024)
    print " 已用:", format_bytes(system_info["mem_used"]*1024)
    print " 使用率:", sprintf("%.1f", system_info["mem_usage"]), "%"
    # Disks
    print "\n磁盘使用情况:"
    for(i = 0; i < disk_index; i++) {
        print " " disk_info[i]["mount"] ":",
            disk_info[i]["used"], "/", disk_info[i]["size"],
            "(" disk_info[i]["usage"] "%)"
    }
    # Network
    print "\n网络连接:", system_info["active_connections"]
    # Load
    print "系统负载:", system_info["load_average"]
}
function check_alerts(    alert_type) {
    if(length(alerts) > 0) {
        print "\n=== 告警信息 ==="
        for(alert_type in alerts) {
            print "⚠️ " alerts[alert_type]
        }
        # Hook alert delivery here if needed, e.g.:
        # send_alert_email(alerts)
    } else {
        print "\n✅ 系统状态正常"
    }
}
function extract_cpu_usage(line,    idle) {
    # top prints e.g. "... 93.7 id, ..."; RLENGTH-3 drops the trailing " id"
    # so only the idle percentage remains, and usage = 100 - idle
    if(match(line, /[0-9.]+ id/)) {
        idle = substr(line, RSTART, RLENGTH-3) + 0
        return 100 - idle
    }
    return 0
}
function extract_load_average(line,    load_str) {
    # Extract the three load figures from the end of the uptime line
    if(match(line, /load average: [0-9., ]+/)) {
        load_str = substr(line, RSTART+14, RLENGTH-14)
        gsub(/,/, "", load_str)
        return load_str
    }
    return "N/A"
}
# Human-readable byte formatting (B/KB/MB/GB)
function format_bytes(bytes,    units, unit, temp_bytes) {
    units[1] = "B"; units[2] = "KB"; units[3] = "MB"; units[4] = "GB"
    unit = 1
    temp_bytes = bytes
    while(temp_bytes >= 1024 && unit < 4) {
        temp_bytes /= 1024
        unit++
    }
    return sprintf("%.1f %s", temp_bytes, units[unit])
}'
三、网络安全与审计
1. 入侵检测系统
# 分析安全日志,检测潜在威胁
awk -f intrusion_detection.awk /var/log/auth.log /var/log/syslog
# intrusion_detection.awk 内容:
BEGIN {
    # Threat signatures (dynamic regexes matched against each line)
    threat_patterns["failed_login"] = "Failed password"
    threat_patterns["invalid_user"] = "Invalid user"
    threat_patterns["break_in"] = "POSSIBLE BREAK-IN ATTEMPT"
    threat_patterns["root_login"] = "Accepted .* for root"
    start_time = systime()
    OFMT = "%.0f"
}
{
    # Check every signature against the current line
    for(threat_type in threat_patterns) {
        pattern = threat_patterns[threat_type]
        if($0 ~ pattern) {
            threats[threat_type]++
            # Arrays of arrays below require gawk 4.0+
            threat_details[threat_type][threats[threat_type]] = $0
            threat_times[threat_type][threats[threat_type]] = FNR
            # Record the first IPv4-looking address on the line, if any
            if(match($0, /[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/)) {
                ip = substr($0, RSTART, RLENGTH)
                threat_ips[threat_type][ip]++
            }
        }
    }
    # Per-IP activity frequency
    if(match($0, /[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/)) {
        ip = substr($0, RSTART, RLENGTH)
        ip_activity[ip]++
        # NOTE(review): this window measures the *analysis* wall-clock time,
        # not the timestamps inside the log — confirm that is the intent,
        # otherwise parse the log timestamps instead.
        if(ip_activity[ip] > 100 && (systime() - start_time) < 3600) {
            suspicious_ips[ip] = ip_activity[ip]
        }
    }
}
END {
    # Security report
    print "=== 安全审计报告 ==="
    print "分析时间:", strftime("%Y-%m-%d %H:%M:%S")
    print "分析文件:", ARGV[1]
    print ""
    # Per-threat counts with a few sample lines each
    total_threats = 0
    for(threat_type in threats) {
        count = threats[threat_type]
        total_threats += count
        print threat_type ":", count, "次"
        print " 详细信息 (Top 3):"
        for(i = 1; i <= 3 && i <= count; i++) {
            print " " threat_details[threat_type][i]
        }
        if(count > 3) {
            print " ... 还有", count-3, "条记录"
        }
        print ""
    }
    # High-frequency IPs
    if(length(suspicious_ips) > 0) {
        print "⚠️ 可疑IP地址 (1小时内访问超过100次):"
        for(ip in suspicious_ips) {
            print " " ip ":", suspicious_ips[ip], "次"
        }
        print ""
    }
    # Top source IPs per threat type
    print "威胁源IP统计:"
    for(threat_type in threat_ips) {
        print " " threat_type ":"
        sort_array_by_count(threat_ips[threat_type])
        for(i = 1; i <= 5 && i <= length(sorted_array); i++) {
            ip = sorted_array[i]
            print " " ip ":", threat_ips[threat_type][ip], "次"
        }
    }
    print "\n总计发现威胁:", total_threats, "次"
    # Escalate when the volume of findings is high
    if(total_threats > 50) {
        print "\n🚨 高风险: 建议立即检查防火墙规则和系统安全配置"
    }
}
# Selection sort, descending by count. Fills the global
# sorted_array[1..n] with the keys of `array`; scratch state is local.
function sort_array_by_count(array,    k, count, i, j, max_key, max_val, max_index, temp, temp_array) {
    delete sorted_array
    count = 0
    for(k in array) {
        temp_array[++count] = k
    }
    for(i = 1; i <= count; i++) {
        max_key = temp_array[i]
        max_val = array[temp_array[i]]
        max_index = i
        for(j = i+1; j <= count; j++) {
            if(array[temp_array[j]] > max_val) {
                max_val = array[temp_array[j]]
                max_key = temp_array[j]
                max_index = j
            }
        }
        # Swap the found maximum into position i
        temp = temp_array[i]
        temp_array[i] = temp_array[max_index]
        temp_array[max_index] = temp
        sorted_array[i] = max_key
    }
}
# FIX: the file previously ended with a stray "'" — a syntax error in a
# standalone script run via "awk -f intrusion_detection.awk".
四、数据科学与机器学习预处理
1. 特征工程工具
# 数据预处理和特征工程
awk -F',' '
BEGIN {
    # Keep comma separation for the cleaned output
    OFS = ","
}
NR == 1 {
    # Header row: remember field names and seed per-field stats
    header = $0
    print "原始表头:", header > "/dev/stderr"
    field_count = NF
    for(i = 1; i <= NF; i++) {
        field_names[i] = $i
        field_stats[i]["missing"] = 0
        field_stats[i]["type"] = "unknown"
    }
    # FIX: emit the header immediately so the cleaned rows that follow
    # form a complete CSV (the original buffered it in END and never
    # printed the data rows at all).
    print header
    next
}
{
    # Data-quality pass over every field of the record
    for(i = 1; i <= field_count; i++) {
        value = $i
        # Missing-value check
        if(value == "" || value == "NULL" || value == "N/A") {
            field_stats[i]["missing"]++
            missing_data[NR,i] = 1
        } else {
            infer_data_type(i, value)
            collect_statistics(i, value)
        }
    }
    detect_outliers()
    clean_data()
    # FIX: the cleaned record was never written out; print it now
    # (assigning $i above already rebuilt $0 with OFS).
    print
}
END {
    generate_quality_report()
}
function infer_data_type(field_index, value) {
    # Numeric?
    if(value ~ /^-?[0-9]+\.?[0-9]*$/) {
        if(field_stats[field_index]["type"] == "unknown") {
            field_stats[field_index]["type"] = "numeric"
        } else if(field_stats[field_index]["type"] != "numeric") {
            field_stats[field_index]["type"] = "mixed"
        }
    }
    # ISO-style date?
    else if(value ~ /^[0-9]{4}-[0-9]{2}-[0-9]{2}/) {
        field_stats[field_index]["type"] = "date"
    }
    # Otherwise treat as categorical and count the category
    else {
        field_stats[field_index]["type"] = "categorical"
        field_stats[field_index]["categories"][value]++
    }
}
function collect_statistics(field_index, value,    num_value) {
    if(field_stats[field_index]["type"] == "numeric") {
        num_value = value + 0
        field_stats[field_index]["sum"] += num_value
        field_stats[field_index]["count"]++
        if(field_stats[field_index]["min"] == "" || num_value < field_stats[field_index]["min"]) {
            field_stats[field_index]["min"] = num_value
        }
        if(field_stats[field_index]["max"] == "" || num_value > field_stats[field_index]["max"]) {
            field_stats[field_index]["max"] = num_value
        }
        # Keep all values so variance can be computed later
        field_stats[field_index]["values"][field_stats[field_index]["count"]] = num_value
    }
}
function detect_outliers(    i, j, mean, variance, diff, std_dev, current_value) {
    # 3-sigma outlier test.
    # NOTE(review): this runs per record using the statistics seen *so far*
    # (a streaming approximation, O(rows * values)); a two-pass design
    # would be exact and faster for large inputs.
    for(i = 1; i <= field_count; i++) {
        if(field_stats[i]["type"] == "numeric" && field_stats[i]["count"] > 10) {
            mean = field_stats[i]["sum"] / field_stats[i]["count"]
            variance = 0
            for(j = 1; j <= field_stats[i]["count"]; j++) {
                diff = field_stats[i]["values"][j] - mean
                variance += diff * diff
            }
            variance /= field_stats[i]["count"]
            std_dev = sqrt(variance)
            current_value = $i + 0
            if(abs(current_value - mean) > 3 * std_dev) {
                outliers[NR,i] = 1
                field_stats[i]["outliers"]++
            }
        }
    }
}
function clean_data(    i, range) {
    # Impute missing values.
    # NOTE(review): imputation uses the running mean/mode observed so far,
    # so early rows are filled from fewer samples than late rows.
    for(i = 1; i <= field_count; i++) {
        if((NR,i) in missing_data) {
            # Numeric: fill with the mean (guard against count == 0)
            if(field_stats[i]["type"] == "numeric" && field_stats[i]["count"] > 0) {
                $i = field_stats[i]["sum"] / field_stats[i]["count"]
            }
            # Categorical: fill with the mode
            else if(field_stats[i]["type"] == "categorical") {
                $i = get_mode(field_stats[i]["categories"])
            }
        }
    }
    # Min-max normalize numeric fields into [0, 1]
    for(i = 1; i <= field_count; i++) {
        if(field_stats[i]["type"] == "numeric" && field_stats[i]["count"] > 0) {
            range = field_stats[i]["max"] - field_stats[i]["min"]
            if(range > 0) {
                $i = ($i - field_stats[i]["min"]) / range
            }
        }
    }
}
function generate_quality_report(    i) {
    # Report goes to stderr so it does not pollute the cleaned CSV on stdout
    print "=== 数据质量报告 ===" > "/dev/stderr"
    print "总记录数:", NR-1 > "/dev/stderr"
    print "字段数:", field_count > "/dev/stderr"
    print "" > "/dev/stderr"
    for(i = 1; i <= field_count; i++) {
        print "字段", i, "(", field_names[i], "):" > "/dev/stderr"
        print " 数据类型:", field_stats[i]["type"] > "/dev/stderr"
        print " 缺失值:", field_stats[i]["missing"] > "/dev/stderr"
        if(field_stats[i]["type"] == "numeric" && field_stats[i]["count"] > 0) {
            print " 最小值:", field_stats[i]["min"] > "/dev/stderr"
            print " 最大值:", field_stats[i]["max"] > "/dev/stderr"
            print " 平均值:", field_stats[i]["sum"]/field_stats[i]["count"] > "/dev/stderr"
            if("outliers" in field_stats[i]) {
                print " 异常值数量:", field_stats[i]["outliers"] > "/dev/stderr"
            }
        }
        print "" > "/dev/stderr"
    }
}
function abs(x) {
    return (x < 0) ? -x : x
}
# Most frequent category value (ties resolved by traversal order)
function get_mode(categories,    max_count, mode_value, value) {
    max_count = 0
    mode_value = ""
    for(value in categories) {
        if(categories[value] > max_count) {
            max_count = categories[value]
            mode_value = value
        }
    }
    return mode_value
}'
五、DevOps自动化工具
1. 持续集成/持续部署(CI/CD)流水线工具
# CI/CD 流水线监控和报告生成器
awk '
BEGIN {
    # Stage id -> display name
    pipeline_stages["build"] = "构建"
    pipeline_stages["test"] = "测试"
    pipeline_stages["deploy"] = "部署"
    pipeline_stages["verify"] = "验证"
    current_pipeline = ""
    start_time = systime()
}
# "Pipeline: <name>" opens a new pipeline
/^Pipeline: / {
    current_pipeline = $2
    pipeline_start[current_pipeline] = systime()
    print "🚀 开始执行流水线:", current_pipeline
}
# "Stage: <id> STARTED|FINISHED|FAILED"
/^Stage: ([a-zA-Z]+) (STARTED|FINISHED|FAILED)/ {
    stage = $2
    status = $3
    timestamp = systime()
    if(status == "STARTED") {
        stage_start[current_pipeline,stage] = timestamp
        print " ⏱️ 阶段开始:", pipeline_stages[stage]
    } else if(status == "FINISHED") {
        duration = timestamp - stage_start[current_pipeline,stage]
        stage_duration[current_pipeline,stage] = duration
        print " ✅ 阶段完成:", pipeline_stages[stage],
            "(" duration "秒)"
    } else if(status == "FAILED") {
        duration = timestamp - stage_start[current_pipeline,stage]
        stage_duration[current_pipeline,stage] = duration
        stage_failed[current_pipeline,stage] = 1
        print " ❌ 阶段失败:", pipeline_stages[stage],
            "(" duration "秒)"
    }
}
# "Artifact: <name> Size: <bytes> bytes"
/^Artifact: (.+) Size: ([0-9]+) bytes/ {
    # NOTE(review): $2 only captures up to the first space — artifact names
    # containing spaces are truncated; confirm names are single tokens.
    artifact_name = $2
    artifact_size = $4
    artifacts[current_pipeline,artifact_name] = artifact_size
    print " 📦 生成制品:", artifact_name,
        "(" format_bytes(artifact_size) ")"
}
# "Test Results: <n> passed, <n> failed, <n> skipped"
/^Test Results: ([0-9]+) passed, ([0-9]+) failed, ([0-9]+) skipped/ {
    passed = $3
    failed = $5
    # FIX: the skipped count is field 7 — $8 is the literal word "skipped",
    # which silently coerced to 0 in the original
    skipped = $7
    total_tests = passed + failed + skipped
    test_results[current_pipeline,"passed"] = passed
    test_results[current_pipeline,"failed"] = failed
    test_results[current_pipeline,"skipped"] = skipped
    success_rate = (total_tests > 0) ? (passed/total_tests)*100 : 0
    print " 🧪 测试结果:", passed "通过,", failed "失败,", skipped "跳过"
    print " 📊 成功率:", sprintf("%.1f", success_rate) "%"
}
END {
    generate_ci_report()
}
function generate_ci_report(    pipeline, stage, duration, total_duration, all_passed, status_icon, passed, failed, skipped, total, success_rate, artifact_count, total_size, key, parts) {
    print "\n" rep("=", 50)
    print "CI/CD 流水线执行报告"
    print rep("=", 50)
    for(pipeline in pipeline_start) {
        print "\n📋 流水线:", pipeline
        print "开始时间:", strftime("%Y-%m-%d %H:%M:%S", pipeline_start[pipeline])
        total_duration = 0
        all_passed = 1
        # NOTE: "for (x in array)" order is unspecified, so stages may not
        # print in execution order
        for(stage in pipeline_stages) {
            if((pipeline,stage) in stage_duration) {
                duration = stage_duration[pipeline,stage]
                total_duration += duration
                status_icon = ((pipeline,stage) in stage_failed) ? "❌" : "✅"
                print " " status_icon, pipeline_stages[stage] ":", duration "秒"
                if((pipeline,stage) in stage_failed) {
                    all_passed = 0
                }
            }
        }
        print "总耗时:", total_duration "秒"
        print "最终状态:", (all_passed ? "✅ 成功" : "❌ 失败")
        # Test summary
        if((pipeline,"passed") in test_results) {
            passed = test_results[pipeline,"passed"]
            failed = test_results[pipeline,"failed"]
            skipped = test_results[pipeline,"skipped"]
            total = passed + failed + skipped
            print "测试统计:", passed "通过,", failed "失败,", skipped "跳过"
            if(total > 0) {
                success_rate = (passed/total)*100
                print "成功率:", sprintf("%.1f", success_rate) "%"
            }
        }
        # Artifact summary for this pipeline
        artifact_count = 0
        total_size = 0
        for(key in artifacts) {
            split(key, parts, SUBSEP)
            if(parts[1] == pipeline) {
                artifact_count++
                total_size += artifacts[key]
            }
        }
        if(artifact_count > 0) {
            print "生成制品:", artifact_count "个, 总大小:", format_bytes(total_size)
        }
    }
    print "\n" rep("=", 50)
    print "报告生成时间:", strftime("%Y-%m-%d %H:%M:%S")
    print rep("=", 50)
}
# FIX: the original used Perl-style repetition ("=" x 50), which in awk is
# just string concatenation with an (empty) variable x — it printed "=50".
# rep(s, n) returns the single-character string s repeated n times.
function rep(s, n,    out) {
    out = sprintf("%" n "s", "")
    gsub(/ /, s, out)
    return out
}
# Human-readable byte formatting (B/KB/MB/GB)
function format_bytes(bytes,    units, unit, temp_bytes) {
    units[1] = "B"; units[2] = "KB"; units[3] = "MB"; units[4] = "GB"
    unit = 1
    temp_bytes = bytes
    while(temp_bytes >= 1024 && unit < 4) {
        temp_bytes /= 1024
        unit++
    }
    return sprintf("%.1f %s", temp_bytes, units[unit])
}'
六、性能优化最佳实践
1. AWK脚本性能分析工具
# AWK脚本性能分析器
# NOTE: profiling, SYMTAB and PROCINFO are gawk extensions, so run with gawk.
# FIX: "awk -p performance_profile.txt" is wrong — the optional argument of
# -p must be attached, so the filename was parsed as the program text.
# Use the unambiguous long form instead:
gawk --profile=performance_profile.txt '
BEGIN {
    # Wall-clock and high-resolution start times
    start_time = systime()
    start_timestamp = gettimeofday()
    # Baseline RSS so END can report the memory delta
    initial_memory = get_memory_usage()
}
{
    lines_processed++
    # Progress line every 10000 records
    if(lines_processed % 10000 == 0) {
        current_time = systime()
        # +1 avoids division by zero within the first second
        rate = lines_processed / (current_time - start_time + 1)
        print "已处理:", lines_processed, "行, 速度:", rate, "行/秒" > "/dev/stderr"
    }
    # Actual per-record work
    process_data()
}
END {
    end_time = systime()
    end_timestamp = gettimeofday()
    final_memory = get_memory_usage()
    total_time = end_time - start_time
    total_time_precise = end_timestamp - start_timestamp
    print "\n=== 性能分析报告 ===" > "/dev/stderr"
    print "总处理行数:", lines_processed > "/dev/stderr"
    print "总耗时:", total_time, "秒 (精确:", sprintf("%.3f", total_time_precise), "秒)" > "/dev/stderr"
    if(total_time > 0) {
        print "平均处理速度:", lines_processed/total_time, "行/秒" > "/dev/stderr"
    }
    print "内存使用变化:", initial_memory, "->", final_memory,
        "(", final_memory - initial_memory, ")" > "/dev/stderr"
    # SYMTAB membership test is gawk-only
    if("function_calls" in SYMTAB) {
        print "函数调用次数:", function_calls > "/dev/stderr"
    }
}
function process_data() {
    # Placeholder for the real business logic; the remarks below
    # illustrate common awk performance techniques.
    # 1. Cache repeated computations instead of recomputing in conditions
    field1_length = length($1)
    if(field1_length > 10) {
        # ...
    }
    # 2. Hash-table membership is O(1)
    if($2 in lookup_table) {
        # ...
    }
    # 3. Batch output to reduce I/O: collect into arrays, emit once in END
    # 4. Build dynamic regex strings once, not per record
    if(!regex_compiled) {
        regex_pattern = "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
        regex_compiled = 1
    }
    if($1 ~ regex_pattern) {
        # date-formatted field handling
    }
    # 5. Control float output precision
    OFMT = "%.6f"
}
function get_memory_usage(    cmd, mem_kb) {
    # Current RSS (KiB) of this process; PROCINFO["pid"] is gawk-only
    cmd = "ps -o rss= -p " PROCINFO["pid"]
    if((cmd | getline mem_kb) > 0) {
        close(cmd)
        return mem_kb + 0
    }
    close(cmd)
    return 0
}
function gettimeofday(    cmd, timestamp) {
    # Sub-second timestamp via date(1); %N requires GNU date
    cmd = "date +%s.%N"
    if((cmd | getline timestamp) > 0) {
        close(cmd)
        return timestamp + 0
    }
    close(cmd)
    return systime()
}'
七、综合应用案例
1. 企业级日志分析平台
# 企业级多维度日志分析系统
awk -v company="MyCompany" -v environment="production" '
BEGIN {
    # Runtime configuration
    config["company"] = company
    config["environment"] = environment
    config["timezone"] = "Asia/Shanghai"
    init_analytics()
    start_timestamp = systime()
    print "🚀 启动 " company " " environment " 环境日志分析系统" > "/dev/stderr"
    print "开始时间:", strftime("%Y-%m-%d %H:%M:%S", start_timestamp) > "/dev/stderr"
}
# Route every line to the parser matching its detected format
{
    log_type = identify_log_type($0)
    if(log_type == "apache") {
        process_apache_log()
    } else if(log_type == "nginx") {
        process_nginx_log()
    } else if(log_type == "application") {
        process_app_log()
    } else if(log_type == "security") {
        process_security_log()
    } else {
        process_generic_log()
    }
    update_global_stats()
}
END {
    generate_comprehensive_report()
    output_json_data()
    send_alerts_if_needed()
}
function init_analytics() {
    init_performance_analytics()
    init_security_analytics()
    init_business_analytics()
    init_user_behavior_analytics()
}
# FIX: the four initializers below (and process_generic_log further down)
# were called but never defined — calling an undefined function is a fatal
# error in awk. They are deliberately empty: every analytics branch is
# created lazily by its process_* function, which keeps the
# ("x" in analytics) checks in the report behaving as before.
function init_performance_analytics() { }
function init_security_analytics() { }
function init_business_analytics() { }
function init_user_behavior_analytics() { }
function identify_log_type(line) {
    # Heuristic format detection by regex, most specific first
    if(line ~ /^([0-9]{1,3}\.){3}[0-9]{1,3} - - \[/) {
        return "apache"
    } else if(line ~ /^[0-9]{4}\/[0-9]{2}\/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} \[/) {
        return "nginx"
    } else if(line ~ /ERROR|WARN|INFO|DEBUG/) {
        return "application"
    } else if(line ~ /SECURITY|AUTH|LOGIN|FAILED/) {
        return "security"
    } else {
        return "generic"
    }
}
function process_apache_log(    ip, timestamp, method, url, status, bytes, user_agent, response_time) {
    ip = $1
    timestamp = $4
    method = $6
    url = $7
    status = $9
    bytes = ($10 == "-" ? 0 : $10)
    user_agent = $12
    # Traffic totals (arrays of arrays — requires gawk 4.0+)
    analytics["apache"]["requests"]++
    analytics["apache"]["bytes"] += bytes
    analytics["apache"]["status"][status]++
    # Per-client behaviour
    analytics["users"][ip]["requests"]++
    analytics["users"][ip]["bytes"] += bytes
    # Per-URL breakdown
    analytics["urls"][url]["count"]++
    analytics["urls"][url]["bytes"] += bytes
    # Optional trailing "<n>ms" response-time field
    if($NF ~ /([0-9]+)ms/) {
        response_time = substr($NF, 1, length($NF)-2) + 0
        analytics["apache"]["response_times"] += response_time
        analytics["apache"]["response_count"]++
    }
}
function process_nginx_log(    ip, timestamp) {
    # Placeholder: nginx parsing mirrors Apache with slightly different fields
    ip = $1
    timestamp = $4
    # ... remaining parsing to be implemented
}
function process_app_log(    timestamp, level, message, i, error_type) {
    timestamp = $1 " " $2
    level = $3
    message = ""
    for(i = 4; i <= NF; i++) {
        message = message $i " "
    }
    # Error accounting
    if(level == "ERROR" || level == "FATAL") {
        analytics["errors"]["count"]++
        analytics["errors"]["by_type"][message]++
        # Coarse error categorisation by keyword
        if(match(message, /(Database|Network|Timeout|Permission)/)) {
            error_type = substr(message, RSTART, RLENGTH)
            analytics["errors"]["by_category"][error_type]++
        }
    }
}
function process_security_log(    timestamp, event_type, details, i, ip) {
    timestamp = $1 " " $2
    event_type = $3
    details = ""
    for(i = 4; i <= NF; i++) {
        details = details $i " "
    }
    # Security-event accounting
    analytics["security"]["events"]++
    analytics["security"]["by_type"][event_type]++
    # Extract the first IPv4-looking address, if any
    if(match(details, /[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/)) {
        ip = substr(details, RSTART, RLENGTH)
        analytics["security"]["suspicious_ips"][ip]++
    }
}
function process_generic_log() {
    # Unrecognised format: only counted via update_global_stats()
}
function update_global_stats() {
    global_stats["lines_processed"]++
    # Progress marker on stderr every 50000 lines
    if(global_stats["lines_processed"] % 50000 == 0) {
        print "处理进度:", global_stats["lines_processed"], "行" > "/dev/stderr"
    }
}
function generate_comprehensive_report(    end_timestamp, duration, error_type, event_type, count) {
    end_timestamp = systime()
    duration = end_timestamp - start_timestamp
    print "\n" rep("=", 80)
    print "📊 " company " " environment " 环境综合分析报告"
    print rep("=", 80)
    print "分析时间:", strftime("%Y-%m-%d %H:%M:%S", start_timestamp),
        "至", strftime("%Y-%m-%d %H:%M:%S", end_timestamp)
    print "分析耗时:", duration, "秒"
    print "处理行数:", global_stats["lines_processed"]
    print ""
    # Web traffic
    if("apache" in analytics) {
        print "🌐 Web访问统计:"
        print " 总请求数:", analytics["apache"]["requests"]
        print " 总流量:", format_bytes(analytics["apache"]["bytes"])
        if(analytics["apache"]["requests"] > 0) {
            print " 平均请求大小:",
                format_bytes(analytics["apache"]["bytes"]/analytics["apache"]["requests"])
        }
        if(analytics["apache"]["response_count"] > 0) {
            print " 平均响应时间:",
                analytics["apache"]["response_times"]/analytics["apache"]["response_count"], "ms"
        }
        print ""
    }
    # Errors
    if("errors" in analytics && analytics["errors"]["count"] > 0) {
        print "❌ 错误统计:"
        print " 总错误数:", analytics["errors"]["count"]
        print " 错误类型分布:"
        for(error_type in analytics["errors"]["by_type"]) {
            count = analytics["errors"]["by_type"][error_type]
            print " " error_type ":", count
        }
        print ""
    }
    # Security events
    if("security" in analytics && analytics["security"]["events"] > 0) {
        print "🛡️ 安全事件:"
        print " 总事件数:", analytics["security"]["events"]
        print " 事件类型:"
        for(event_type in analytics["security"]["by_type"]) {
            count = analytics["security"]["by_type"][event_type]
            print " " event_type ":", count
        }
        print ""
    }
    print rep("=", 80)
}
function output_json_data(    dur, rps, err_rate) {
    # FIX: the original read a global "duration" that only existed as a side
    # effect of generate_comprehensive_report() and could divide by zero;
    # compute a safe elapsed time locally instead.
    dur = systime() - start_timestamp
    if(dur < 1) dur = 1
    rps = (analytics["apache"]["requests"] + 0) / dur
    err_rate = 0
    if(global_stats["lines_processed"] > 0) {
        err_rate = (analytics["errors"]["count"] + 0) * 100 / global_stats["lines_processed"]
    }
    print "{"
    print " \"company\": \"" company "\","
    print " \"environment\": \"" environment "\","
    print " \"timestamp\": \"" strftime("%Y-%m-%d %H:%M:%S") "\","
    print " \"metrics\": {"
    print " \"requests_per_second\": " rps ","
    print " \"error_rate\": " err_rate ","
    print " \"total_bytes\": " analytics["apache"]["bytes"] + 0
    print " }"
    print "}"
}
function send_alerts_if_needed(    error_rate) {
    # FIX: guard against division by zero on empty input
    if(global_stats["lines_processed"] == 0) {
        return
    }
    error_rate = (analytics["errors"]["count"] + 0) * 100 / global_stats["lines_processed"]
    if(error_rate > 5) { # alert when more than 5% of lines are errors
        print "🚨 高错误率告警: " error_rate "% > 5%" > "/dev/stderr"
        # e.g. system("echo '高错误率告警' | mail -s '系统告警' admin@company.com")
    }
}
# FIX: "=" x 80 is Perl string repetition, not awk (it concatenated "=", the
# empty variable x and 80). rep(s, n) returns s repeated n times.
function rep(s, n,    out) {
    out = sprintf("%" n "s", "")
    gsub(/ /, s, out)
    return out
}
# Human-readable byte formatting (B/KB/MB/GB)
function format_bytes(bytes,    units, unit, temp_bytes) {
    units[1] = "B"; units[2] = "KB"; units[3] = "MB"; units[4] = "GB"
    unit = 1
    temp_bytes = bytes
    while(temp_bytes >= 1024 && unit < 4) {
        temp_bytes /= 1024
        unit++
    }
    return sprintf("%.1f %s", temp_bytes, units[unit])
}'
这些复杂的AWK应用场景展示了AWK在企业级系统中的强大能力。通过合理的设计和优化,AWK可以处理各种复杂的文本处理任务,成为系统管理员、数据分析师和DevOps工程师的重要工具。