返回正文
搭建安全网关-基于云
🚀 第八部分:云原生安全架构演进
Ingress安全策略
传统Nginx vs 云原生架构的挑战:
在传统架构中,我们直接管理Nginx实例,但在Kubernetes环境中,安全边界变得更加复杂。间的网络通信、服务发现、动态扩缩容都带来了新的安全挑战。
Kubernetes网络安全模型:
yaml
# NetworkPolicy:网络层访问控制
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: nginx-ingress-network-policy
namespace: production
spec:
podSelector:
matchLabels:
app: nginx-ingress
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: production
- podSelector:
matchLabels:
role: frontend
ports:
- protocol: TCP
port: 80
- protocol: TCP
port: 443
egress:
- to:
- namespaceSelector:
matchLabels:
name: production
ports:
- protocol: TCP
port: 8080 # 后端服务端口
- protocol: TCP
port: 53 # DNS查询
- protocol: UDP
port: 531
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Ingress Controller安全配置:
yaml
# nginx-ingress-controller安全部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-ingress-controller
namespace: ingress-nginx
spec:
replicas: 3
selector:
matchLabels:
app: nginx-ingress
template:
metadata:
labels:
app: nginx-ingress
spec:
serviceAccountName: nginx-ingress-serviceaccount
securityContext:
runAsNonRoot: true
runAsUser: 101
fsGroup: 101
containers:
- name: nginx-ingress-controller
image: k8s.gcr.io/ingress-nginx/controller:v1.8.1
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
add:
- NET_BIND_SERVICE
args:
- /nginx-ingress-controller
- --configmap=$(POD_NAMESPACE)/nginx-configuration
- --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
- --udp-services-configmap=$(POD_NAMESPACE)/udp-services
- --annotations-prefix=nginx.ingress.kubernetes.io
- --enable-ssl-passthrough
- --ssl-passthrough-proxy-port=442
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
ports:
- name: http
containerPort: 80
protocol: TCP
- name: https
containerPort: 443
protocol: TCP
- name: webhook
containerPort: 8443
protocol: TCP
livenessProbe:
httpGet:
path: /healthz
port: 10254
scheme: HTTP
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
httpGet:
path: /healthz
port: 10254
scheme: HTTP
initialDelaySeconds: 10
periodSeconds: 10
resources:
requests:
cpu: 100m
memory: 90Mi
limits:
cpu: 200m
memory: 180Mi1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
Pod安全策略(PodSecurityPolicy):
yaml
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
name: nginx-ingress-psp
spec:
privileged: false
allowPrivilegeEscalation: false
requiredDropCapabilities:
- ALL
allowedCapabilities:
- NET_BIND_SERVICE
volumes:
- 'configMap'
- 'emptyDir'
- 'projected'
- 'secret'
- 'downwardAPI'
- 'persistentVolumeClaim'
hostNetwork: false
hostIPC: false
hostPID: false
runAsUser:
rule: 'MustRunAsNonRoot'
supplementalGroups:
rule: 'MustRunAs'
ranges:
- min: 1
max: 65535
fsGroup:
rule: 'MustRunAs'
ranges:
- min: 1
max: 65535
readOnlyRootFilesystem: true
seLinux:
rule: 'RunAsAny'1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
容器化Nginx安全最佳实践
最小权限容器镜像构建:
dockerfile
# 多阶段构建安全Nginx镜像
FROM alpine:3.18 AS builder
# 安装构建依赖
RUN apk add --no-cache \
gcc \
g++ \
make \
pcre-dev \
zlib-dev \
openssl-dev \
geoip-dev
# 下载并编译Nginx(包含安全模块)
ENV NGINX_VERSION=1.25.3
RUN wget http://nginx.org/download/nginx-${NGINX_VERSION}.tar.gz && \
tar -xzf nginx-${NGINX_VERSION}.tar.gz && \
cd nginx-${NGINX_VERSION} && \
./configure \
--prefix=/etc/nginx \
--sbin-path=/usr/sbin/nginx \
--conf-path=/etc/nginx/nginx.conf \
--error-log-path=/var/log/nginx/error.log \
--http-log-path=/var/log/nginx/access.log \
--pid-path=/var/run/nginx.pid \
--lock-path=/var/run/nginx.lock \
--with-http_ssl_module \
--with-http_v2_module \
--with-http_realip_module \
--with-http_geoip_module \
--with-http_secure_link_module \
--with-http_sub_module \
--with-http_stub_status_module \
--with-stream \
--with-stream_ssl_module \
--with-stream_ssl_preread_module && \
make && make install
# 生产镜像
FROM alpine:3.18
# 创建非特权用户
RUN addgroup -g 101 -S nginx && \
adduser -S -D -H -u 101 -h /var/cache/nginx -s /sbin/nologin -G nginx -g nginx nginx
# 安装运行时依赖
RUN apk add --no-cache \
pcre \
zlib \
openssl \
geoip \
ca-certificates \
tzdata
# 复制编译好的Nginx
COPY --from=builder /etc/nginx /etc/nginx
COPY --from=builder /usr/sbin/nginx /usr/sbin/nginx
COPY --from=builder /var/log/nginx /var/log/nginx
# 复制配置文件
COPY nginx.conf /etc/nginx/nginx.conf
COPY security.conf /etc/nginx/conf.d/security.conf
# 设置文件权限
RUN chown -R nginx:nginx /etc/nginx && \
chown -R nginx:nginx /var/log/nginx && \
chown -R nginx:nginx /var/cache/nginx && \
chmod 644 /etc/nginx/nginx.conf && \
chmod 644 /etc/nginx/conf.d/security.conf
# 创建必要的目录
RUN mkdir -p /var/cache/nginx/client_temp && \
mkdir -p /var/cache/nginx/proxy_temp && \
mkdir -p /var/cache/nginx/fastcgi_temp && \
mkdir -p /var/cache/nginx/uwsgi_temp && \
mkdir -p /var/cache/nginx/scgi_temp && \
chown -R nginx:nginx /var/cache/nginx
# 健康检查脚本
COPY healthcheck.sh /usr/local/bin/healthcheck.sh
RUN chmod +x /usr/local/bin/healthcheck.sh
# 切换到非特权用户
USER nginx
# 暴露端口
EXPOSE 8080 8443
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD /usr/local/bin/healthcheck.sh
# 启动命令
ENTRYPOINT ["nginx", "-g", "daemon off;"]1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
健康检查脚本:
bash
#!/bin/sh
# healthcheck.sh
set -e
# 检查Nginx进程
if ! pgrep -x "nginx" > /dev/null; then
echo "Nginx process not running"
exit 1
fi
# 检查配置文件语法
if ! nginx -t > /dev/null 2>&1; then
echo "Nginx configuration test failed"
exit 1
fi
# 检查监听端口
if ! netstat -ln | grep -q ":8080 "; then
echo "Nginx not listening on port 8080"
exit 1
fi
# 测试HTTP响应
if ! wget -q -O /dev/null -T 5 http://localhost:8080/health; then
echo "Nginx health check endpoint failed"
exit 1
fi
echo "Health check passed"
exit 01
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Service Mesh集成安全方案
Istio环境下的安全策略:
yaml
# Istio安全策略配置
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: nginx-ingress-peer-auth
namespace: ingress-nginx
spec:
selector:
matchLabels:
app: nginx-ingress
mtls:
mode: STRICT
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: nginx-ingress-authz
namespace: ingress-nginx
spec:
selector:
matchLabels:
app: nginx-ingress
rules:
- from:
- source:
principals: ["cluster.local/ns/production/sa/frontend"]
- source:
ipBlocks: ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]
to:
- operation:
methods: ["GET", "POST", "PUT", "DELETE"]
paths: ["/api/*", "/health", "/metrics"]
- from:
- source:
principals: ["cluster.local/ns/monitoring/sa/prometheus"]
to:
- operation:
methods: ["GET"]
paths: ["/metrics", "/health"]
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: nginx-ingress-destination-rule
namespace: ingress-nginx
spec:
host: nginx-ingress-service.ingress-nginx.svc.cluster.local
trafficPolicy:
tls:
mode: ISTIO_MUTUAL
connectionPool:
tcp:
maxConnections: 100
http:
http1MaxPendingRequests: 50
http2MaxRequests: 100
maxRequestsPerConnection: 10
loadBalancer:
simple: LEAST_REQUEST
outlierDetection:
consecutiveErrors: 5
interval: 30s
baseEjectionTime: 30s
maxEjectionPercent: 50
minHealthPercent: 301
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
🤖 第九部分:AI驱动智能防护
机器学习异常检测
基于流量的异常检测系统:
python
#!/usr/bin/env python3
# ml_anomaly_detector.py
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import json
import logging
from datetime import datetime, timedelta
import redis
import asyncio
import aiohttp
class NginxAnomalyDetector:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
self.model = IsolationForest(contamination=0.1, random_state=42)
self.scaler = StandardScaler()
self.is_fitted = False
# 特征定义
self.features = [
'request_rate', 'response_time', 'status_4xx_ratio',
'status_5xx_ratio', 'unique_ips', 'payload_size_avg',
'user_agent_entropy', 'path_depth_avg', 'query_params_count'
]
def extract_features_from_log(self, log_data):
"""从Nginx日志提取特征"""
df = pd.DataFrame(log_data)
features = {}
features['request_rate'] = len(df) / 60 # 每分钟请求数
features['response_time'] = df['request_time'].mean() if 'request_time' in df else 0.1
features['status_4xx_ratio'] = (df['status'] >= 400).sum() / len(df) if len(df) > 0 else 0
features['status_5xx_ratio'] = (df['status'] >= 500).sum() / len(df) if len(df) > 0 else 0
features['unique_ips'] = df['remote_addr'].nunique()
features['payload_size_avg'] = df['body_bytes_sent'].mean() if 'body_bytes_sent' in df else 0
features['user_agent_entropy'] = self._calculate_entropy(df['http_user_agent'].dropna())
features['path_depth_avg'] = df['request_uri'].apply(lambda x: x.count('/') if pd.notna(x) else 0).mean()
features['query_params_count'] = df['request_uri'].apply(lambda x: x.count('?') + x.count('&') if pd.notna(x) else 0).mean()
return features
def _calculate_entropy(self, series):
"""计算信息熵"""
if len(series) == 0:
return 0
value_counts = series.value_counts()
probabilities = value_counts / len(series)
entropy = -np.sum(probabilities * np.log2(probabilities + 1e-10))
return entropy
def train_model(self, historical_data):
"""训练异常检测模型"""
features_list = []
for log_entry in historical_data:
features = self.extract_features_from_log([log_entry])
features_list.append([features[f] for f in self.features])
X = np.array(features_list)
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled)
self.is_fitted = True
logging.info(f"模型训练完成,使用{len(historical_data)}条历史数据")
def predict_anomaly(self, log_data):
"""预测异常"""
if not self.is_fitted:
return False, 0.0
features = self.extract_features_from_log([log_data])
feature_vector = np.array([[features[f] for f in self.features]])
feature_scaled = self.scaler.transform(feature_vector)
anomaly_score = self.model.decision_function(feature_scaled)[0]
is_anomaly = self.model.predict(feature_scaled)[0] == -1
return is_anomaly, anomaly_score
async def real_time_monitoring(self):
"""实时监控Nginx日志"""
logging.info("开始实时监控Nginx日志...")
while True:
try:
# 从Redis获取最新日志(模拟实时日志流)
log_entry = self.redis_client.lpop('nginx:logs:realtime')
if log_entry:
log_data = json.loads(log_entry)
is_anomaly, score = self.predict_anomaly(log_data)
if is_anomaly:
logging.warning(f"检测到异常流量!异常评分:{score:.3f}")
# 触发自动阻断
await self.trigger_auto_block(log_data)
# 发送告警
await self.send_alert(log_data, score)
# 缓存正常行为模式
await self.cache_behavior_pattern(log_data)
except Exception as e:
logging.error(f"实时监控异常:{e}")
await asyncio.sleep(1)
async def trigger_auto_block(self, log_data):
"""自动阻断异常IP"""
suspicious_ip = log_data.get('remote_addr')
if suspicious_ip:
# 添加到Redis黑名单
self.redis_client.sadd('nginx:blocklist:ips', suspicious_ip)
self.redis_client.expire(f'nginx:blocklist:ips', 3600) # 1小时过期
# 记录阻断日志
block_info = {
'ip': suspicious_ip,
'timestamp': datetime.now().isoformat(),
'reason': 'ml_anomaly_detection',
'request_uri': log_data.get('request_uri'),
'user_agent': log_data.get('http_user_agent')
}
self.redis_client.lpush('nginx:blocklist:history', json.dumps(block_info))
logging.info(f"自动阻断IP:{suspicious_ip}")
async def send_alert(self, log_data, anomaly_score):
"""发送安全告警"""
alert = {
'type': 'anomaly_detection',
'severity': 'high' if anomaly_score < -0.5 else 'medium',
'timestamp': datetime.now().isoformat(),
'source_ip': log_data.get('remote_addr'),
'request_uri': log_data.get('request_uri'),
'anomaly_score': anomaly_score,
'details': log_data
}
# 发送到告警系统(Webhook、邮件、短信等)
self.redis_client.publish('security:alerts', json.dumps(alert))
# 记录到数据库
self.redis_client.lpush('security:alerts:history', json.dumps(alert))
async def cache_behavior_pattern(self, log_data):
"""缓存用户行为模式"""
user_ip = log_data.get('remote_addr')
if user_ip:
# 构建用户行为指纹
behavior_fingerprint = {
'user_agent': log_data.get('http_user_agent'),
'accept_language': log_data.get('http_accept_language'),
'request_rate': await self.get_user_request_rate(user_ip),
'path_patterns': await self.get_user_path_patterns(user_ip)
}
# 缓存24小时
self.redis_client.setex(
f'behavior:fingerprint:{user_ip}',
86400,
json.dumps(behavior_fingerprint)
)
async def get_user_request_rate(self, user_ip):
"""获取用户请求频率"""
now = datetime.now()
key = f'request_rate:{user_ip}:{now.strftime("%Y%m%d%H%M")}'
return self.redis_client.get(key) or 0
async def get_user_path_patterns(self, user_ip):
"""获取用户访问路径模式"""
# 从最近100条记录中分析路径模式
key = f'user_paths:{user_ip}'
paths = self.redis_client.lrange(key, 0, 99)
return [p.decode('utf-8') for p in paths] if paths else []
# 集成到Nginx配置
async def main():
detector = NginxAnomalyDetector()
# 加载历史数据训练模型
historical_logs = load_historical_logs() # 从日志文件或数据库加载
detector.train_model(historical_logs)
# 启动实时监控
await detector.real_time_monitoring()
if __name__ == "__main__":
asyncio.run(main())1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
实时威胁情报集成
威胁情报数据聚合系统:
python
#!/usr/bin/env python3
# threat_intelligence_feed.py
import asyncio
import aiohttp
import json
import redis
from datetime import datetime, timedelta
import logging
class ThreatIntelligenceAggregator:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=1)
self.feeds = [
{
'name': 'AbuseIPDB',
'url': 'https://api.abuseipdb.com/api/v2/blacklist',
'headers': {'Key': 'YOUR_API_KEY', 'Accept': 'application/json'},
'confidence_threshold': 80
},
{
'name': 'VirusTotal',
'url': 'https://www.virustotal.com/vtapi/v2/ip-address/report',
'api_key': 'YOUR_VIRUSTOTAL_API_KEY',
'params': {'apikey': 'YOUR_VIRUSTOTAL_API_KEY'}
},
{
'name': 'AlienVault_OTX',
'url': 'https://otx.alienvault.com/api/v1/indicators/export',
'headers': {'X-OTX-API-KEY': 'YOUR_OTX_API_KEY'}
}
]
async def fetch_threat_feed(self, session, feed):
"""获取威胁情报数据"""
try:
async with session.get(feed['url'], headers=feed.get('headers', {}), params=feed.get('params', {})) as response:
if response.status == 200:
data = await response.json()
return {'feed_name': feed['name'], 'data': data}
else:
logging.error(f"获取威胁情报失败:{feed['name']} - {response.status}")
return None
except Exception as e:
logging.error(f"获取威胁情报异常:{feed['name']} - {e}")
return None
async def aggregate_threat_intelligence(self):
"""聚合威胁情报数据"""
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_threat_feed(session, feed) for feed in self.feeds]
results = await asyncio.gather(*tasks)
aggregated_threats = {
'malicious_ips': set(),
'suspicious_domains': set(),
'attack_signatures': set(),
'timestamp': datetime.now().isoformat()
}
for result in results:
if result:
await self.process_feed_data(result, aggregated_threats)
# 存储到Redis
await self.store_threat_intelligence(aggregated_threats)
return aggregated_threats
async def process_feed_data(self, feed_result, aggregated_threats):
"""处理各个威胁情报源的数据"""
feed_name = feed_result['feed_name']
data = feed_result['data']
if feed_name == 'AbuseIPDB':
for ip_data in data.get('data', []):
if ip_data.get('confidence', 0) >= 80:
aggregated_threats['malicious_ips'].add(ip_data['ipAddress'])
elif feed_name == 'VirusTotal':
for ip, report in data.items():
if report.get('response_code') == 1 and report.get('positives', 0) > 5:
aggregated_threats['malicious_ips'].add(ip)
elif feed_name == 'AlienVault_OTX':
for pulse in data.get('results', []):
for indicator in pulse.get('indicators', []):
if indicator.get('type') == 'IP':
aggregated_threats['malicious_ips'].add(indicator.get('indicator'))
elif indicator.get('type') == 'domain':
aggregated_threats['suspicious_domains'].add(indicator.get('indicator'))
async def store_threat_intelligence(self, threats):
"""存储威胁情报数据"""
# 存储恶意IP
if threats['malicious_ips']:
self.redis_client.delete('threats:malicious_ips')
for ip in threats['malicious_ips']:
self.redis_client.sadd('threats:malicious_ips', ip)
self.redis_client.expire('threats:malicious_ips', 3600) # 1小时过期
# 存储可疑域名
if threats['suspicious_domains']:
self.redis_client.delete('threats:suspicious_domains')
for domain in threats['suspicious_domains']:
self.redis_client.sadd('threats:suspicious_domains', domain)
self.redis_client.expire('threats:suspicious_domains', 3600)
# 记录更新时间
self.redis_client.set('threats:last_update', threats['timestamp'])
logging.info(f"威胁情报更新完成:{len(threats['malicious_ips'])}个恶意IP,{len(threats['suspicious_domains'])}个可疑域名")
async def check_ip_reputation(self, ip_address):
"""检查IP信誉"""
malicious_ips = self.redis_client.smembers('threats:malicious_ips')
malicious_ips = [ip.decode('utf-8') for ip in malicious_ips]
if ip_address in malicious_ips:
return {
'is_threat': True,
'threat_level': 'high',
'source': 'threat_intelligence_feeds',
'recommendation': 'block_immediately'
}
return {
'is_threat': False,
'threat_level': 'low',
'source': 'clean',
'recommendation': 'allow'
}
async def run_continuous_updates(self):
"""持续更新威胁情报"""
while True:
try:
logging.info("开始更新威胁情报数据...")
await self.aggregate_threat_intelligence()
logging.info("威胁情报数据更新完成")
# 每30分钟更新一次
await asyncio.sleep(1800)
except Exception as e:
logging.error(f"威胁情报更新异常:{e}")
await asyncio.sleep(300) # 5分钟后重试1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
nginx
# Nginx配置集成威胁情报
location / {
# 在access阶段检查IP威胁情报
access_by_lua_block {
local redis = require "resty.redis"
local red = redis:new()
red:set_timeout(1000) -- 1秒超时
local ok, err = red:connect("127.0.0.1", 6379)
if not ok then
ngx.log(ngx.ERR, "Redis连接失败: ", err)
return
end
local client_ip = ngx.var.remote_addr
local is_threat = red:sismember("threats:malicious_ips", client_ip)
if is_threat == 1 then
ngx.log(ngx.WARN, "检测到恶意IP: ", client_ip)
ngx.exit(403)
end
}
# 其他配置...
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
🏢 第十部分:现代企业级集成方案
DevSecOps流水线集成
安全流水线:
yaml
# .gitlab-ci.yml
stages:
- security-scan
- build
- security-test
- deploy
- security-monitor
variables:
DOCKER_DRIVER: overlay2
DOCKER_TLS_CERTDIR: "/certs"
REGISTRY: registry.example.com
IMAGE_NAME: $REGISTRY/nginx-security-gateway
# 安全扫描阶段
security:nginx-config:
stage: security-scan
image:
name: nginx:alpine
entrypoint: [""]
script:
- apk add --no-cache python3 py3-pip
- pip3 install pyyaml jsonschema
- |
python3 -c "
import yaml
import json
import sys
# 验证Nginx配置文件语法
import subprocess
result = subprocess.run(['nginx', '-t', '-c', '/etc/nginx/nginx.conf'],
capture_output=True, text=True)
if result.returncode != 0:
print('Nginx配置语法错误:', result.stderr)
sys.exit(1)
# 安全基线检查
with open('/etc/nginx/nginx.conf', 'r') as f:
config = f.read()
security_checks = [
('server_tokens off', '版本号隐藏'),
('add_header X-Frame-Options', '点击劫持防护'),
('add_header X-Content-Type-Options', 'MIME嗅探防护'),
('location ~ /\\.', '隐藏文件保护'),
('location ~* \\\.(git|env)', '敏感文件保护')
]
missing_security = []
for check, description in security_checks:
if check not in config:
missing_security.append(description)
if missing_security:
print('缺失的安全配置:', missing_security)
sys.exit(1)
print('✅ 安全配置检查通过')
"
artifacts:
reports:
junit: security-report.xml
expire_in: 1 week
only:
- branches
- merge_requests
# 容器镜像安全扫描
security:container-scan:
stage: security-scan
image: aquasec/trivy:latest
script:
- trivy image --exit-code 1 --severity HIGH,CRITICAL $IMAGE_NAME:latest || true
- trivy image --format json --output trivy-report.json $IMAGE_NAME:latest
artifacts:
reports:
container_scanning: trivy-report.json
expire_in: 1 week
allow_failure: true
# 构建阶段
build:docker:
stage: build
image: docker:latest
services:
- docker:dind
script:
- docker build -t $IMAGE_NAME:$CI_COMMIT_SHA .
- docker tag $IMAGE_NAME:$CI_COMMIT_SHA $IMAGE_NAME:latest
- docker push $IMAGE_NAME:$CI_COMMIT_SHA
- docker push $IMAGE_NAME:latest
dependencies:
- security:nginx-config
# 安全测试阶段
security:penetration-test:
stage: security-test
image: owasp/zap2docker-stable:latest
script:
- mkdir -p zap-reports
- zap-baseline.py -t http://nginx-security-gateway-staging.example.com \\
-r zap-report.html \\
-J zap-report.json \\
-w zap-report.md \\
-x zap-report.xml
artifacts:
reports:
junit: zap-report.xml
paths:
- zap-reports/
expire_in: 1 week
allow_failure: true
# 部署阶段
deploy:staging:
stage: deploy
image: alpine:latest
script:
- apk add --no-cache curl
- |
# 部署到staging环境
curl -X POST \\
-H "Content-Type: application/json" \\
-H "Authorization: Bearer $STAGING_API_TOKEN" \\
-d "{\\"image\\": \\"$IMAGE_NAME:$CI_COMMIT_SHA\\", \\"environment\\": \\"staging\\"}" \\
https://api.staging.example.com/deploy
# 等待部署完成
sleep 30
# 验证部署
curl -f http://nginx-security-gateway-staging.example.com/health || exit 1
environment:
name: staging
url: http://nginx-security-gateway-staging.example.com
dependencies:
- build:docker
deploy:production:
stage: deploy
image: alpine:latest
script:
- apk add --no-cache curl jq
- |
# 获取staging环境测试结果
STAGING_TESTS=$(curl -s https://api.staging.example.com/tests/$CI_COMMIT_SHA)
SECURITY_SCORE=$(echo $STAGING_TESTS | jq -r '.security_score')
if [ "$SECURITY_SCORE" -lt 90 ]; then
echo "安全评分不足: $SECURITY_SCORE/100"
exit 1
fi
# 部署到生产环境
curl -X POST \\
-H "Content-Type: application/json" \\
-H "Authorization: Bearer $PRODUCTION_API_TOKEN" \\
-d "{\\"image\\": \\"$IMAGE_NAME:$CI_COMMIT_SHA\\", \\"environment\\": \\"production\\"}" \\
https://api.production.example.com/deploy
# 验证部署
sleep 60
curl -f https://nginx-security-gateway.example.com/health || exit 1
environment:
name: production
url: https://nginx-security-gateway.example.com
when: manual
only:
- master
- tags
# 安全监控阶段
monitor:security-metrics:
stage: security-monitor
image: python:3.11-alpine
script:
- pip install requests prometheus-client
- |
python3 -c "
import requests
import json
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
# 获取安全指标
response = requests.get('https://nginx-security-gateway.example.com/metrics')
metrics = response.text
# 解析关键安全指标
registry = CollectorRegistry()
blocked_requests = Gauge('nginx_blocked_requests_total', 'Total blocked requests', registry=registry)
anomaly_detections = Gauge('nginx_anomaly_detections_total', 'Total anomaly detections', registry=registry)
threat_intel_hits = Gauge('nginx_threat_intel_hits_total', 'Total threat intelligence hits', registry=registry)
# 推送指标到Prometheus
push_to_gateway('prometheus.example.com:9091', job='nginx-security-gateway', registry=registry)
print('✅ 安全指标已推送到监控系统')
"
dependencies:
- deploy:production
allow_failure: true1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
SIEM/SOAR平台集成
Splunk集成配置:
bash
#!/bin/bash
# splunk-integration.sh
# Nginx安全日志转发到Splunk
NGINX_LOG_DIR="/var/log/nginx"
SPLUNK_HEC_URL="https://splunk.example.com:8088/services/collector"
SPLUNK_HEC_TOKEN="your-hec-token-here"
# 创建日志转发脚本
cat > /usr/local/bin/nginx-splunk-forwarder.py << 'EOF'
#!/usr/bin/env python3
import json
import time
import requests
import gzip
from datetime import datetime
import logging
class NginxSplunkForwarder:
def __init__(self, splunk_url, splunk_token):
self.splunk_url = splunk_url
self.splunk_token = splunk_token
self.headers = {
'Authorization': f'Splunk {splunk_token}',
'Content-Type': 'application/json'
}
def parse_nginx_log(self, log_line):
"""解析Nginx日志"""
try:
# 假设使用JSON格式的Nginx日志
log_data = json.loads(log_line)
# 添加安全相关字段
log_data['event_type'] = 'nginx_access'
log_data['security_relevant'] = self.is_security_relevant(log_data)
log_data['threat_level'] = self.calculate_threat_level(log_data)
return log_data
except json.JSONDecodeError:
return None
def is_security_relevant(self, log_data):
"""判断是否为安全相关事件"""
security_indicators = [
log_data.get('status', 0) >= 400, # 错误状态码
'bot' in log_data.get('http_user_agent', '').lower(),
'scan' in log_data.get('http_user_agent', '').lower(),
log_data.get('request_uri', '').count('../') > 0, # 路径遍历
'.git' in log_data.get('request_uri', ''), # Git目录访问
'.env' in log_data.get('request_uri', ''), # 环境文件访问
len(log_data.get('request_uri', '')) > 1000, # 超长URI
log_data.get('body_bytes_sent', 0) == 0 and log_data.get('status') == 200 # 空响应
]
return any(security_indicators)
def calculate_threat_level(self, log_data):
"""计算威胁等级"""
threat_score = 0
# 状态码评分
status = log_data.get('status', 0)
if status >= 500:
threat_score += 10
elif status >= 400:
threat_score += 5
# URI异常评分
uri = log_data.get('request_uri', '')
if '../' in uri:
threat_score += 15
if '.git' in uri or '.env' in uri:
threat_score += 20
if len(uri) > 1000:
threat_score += 5
# User-Agent异常评分
user_agent = log_data.get('http_user_agent', '')
suspicious_patterns = ['bot', 'scan', 'nmap', 'nikto', 'sqlmap', 'hydra']
for pattern in suspicious_patterns:
if pattern in user_agent.lower():
threat_score += 10
break
# 响应时间异常评分
request_time = float(log_data.get('request_time', 0))
if request_time > 5.0:
threat_score += 5
# 威胁等级映射
if threat_score >= 30:
return 'critical'
elif threat_score >= 20:
return 'high'
elif threat_score >= 10:
return 'medium'
elif threat_score >= 5:
return 'low'
else:
return 'info'
def send_to_splunk(self, event_data):
"""发送事件到Splunk"""
payload = {
'event': event_data,
'sourcetype': 'nginx:security',
'index': 'security',
'host': 'nginx-security-gateway'
}
try:
response = requests.post(
self.splunk_url,
headers=self.headers,
json=payload,
timeout=10
)
if response.status_code != 200:
logging.error(f"Splunk HEC错误: {response.status_code} - {response.text}")
return False
return True
except requests.exceptions.RequestException as e:
logging.error(f"发送事件到Splunk失败: {e}")
return False
def monitor_log_file(self, log_file_path):
"""监控日志文件"""
with open(log_file_path, 'r') as f:
# 移动到文件末尾
f.seek(0, 2)
while True:
line = f.readline()
if not line:
time.sleep(0.1)
continue
# 解析日志
parsed_data = self.parse_nginx_log(line.strip())
if parsed_data:
# 发送到Splunk
if parsed_data['security_relevant'] or parsed_data['threat_level'] != 'info':
success = self.send_to_splunk(parsed_data)
if success:
logging.info(f"安全事件已发送到Splunk: {parsed_data.get('remote_addr')} - {parsed_data.get('threat_level')}")
# 启动日志监控
forwarder = NginxSplunkForwarder(SPLUNK_HEC_URL, SPLUNK_HEC_TOKEN)
forwarder.monitor_log_file("/var/log/nginx/security-access.log")
EOF
chmod +x /usr/local/bin/nginx-splunk-forwarder.py1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
Elasticsearch安全分析:
json
{
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"event_type": {"type": "keyword"},
"remote_addr": {"type": "ip"},
"request_uri": {"type": "text", "analyzer": "standard"},
"http_user_agent": {"type": "text", "analyzer": "standard"},
"status": {"type": "integer"},
"body_bytes_sent": {"type": "long"},
"request_time": {"type": "float"},
"security_relevant": {"type": "boolean"},
"threat_level": {"type": "keyword"},
"geoip": {
"properties": {
"country_iso_code": {"type": "keyword"},
"location": {"type": "geo_point"}
}
},
"attack_classification": {
"properties": {
"attack_type": {"type": "keyword"},
"confidence": {"type": "float"},
"severity": {"type": "keyword"}
}
}
}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index": {
"lifecycle": {
"name": "nginx-security-policy",
"rollover_alias": "nginx-security"
}
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
多云环境统一安全策略
多云安全配置:
txt
# terraform/main.tf
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
azurerm = {
source = "hashicorp/azurerm"
version = "~> 3.0"
}
google = {
source = "hashicorp/google"
version = "~> 4.0"
}
}
}
# AWS WAF配置
module "aws_waf" {
source = "./modules/aws-waf"
name_prefix = "nginx-security"
# 基础规则
managed_rules = [
{
name = "AWSManagedRulesCommonRuleSet"
priority = 1
override_action = "none"
excluded_rules = []
},
{
name = "AWSManagedRulesKnownBadInputsRuleSet"
priority = 2
override_action = "none"
excluded_rules = []
},
{
name = "AWSManagedRulesSQLiRuleSet"
priority = 3
override_action = "none"
excluded_rules = []
},
{
name = "AWSManagedRulesLinuxRuleSet"
priority = 4
override_action = "none"
excluded_rules = []
}
]
# 自定义规则
custom_rules = [
{
name = "block_malicious_ips"
priority = 5
action = "block"
statement = {
ip_set_reference_statement = {
arn = aws_wafv2_ip_set.malicious_ips.arn
}
}
},
{
name = "rate_limit_per_ip"
priority = 6
action = "block"
statement = {
rate_based_statement = {
limit = 2000
aggregate_key_type = "IP"
evaluation_window_sec = 300
}
}
}
]
tags = {
Environment = var.environment
Purpose = "nginx-security-gateway"
ManagedBy = "terraform"
}
}
# Azure Application Gateway WAF
module "azure_waf" {
source = "./modules/azure-waf"
name = "nginx-security-waf"
resource_group_name = azurerm_resource_group.main.name
location = azurerm_resource_group.main.location
# WAF策略
waf_policy_settings = {
enabled = true
mode = "Prevention"
managed_rules = {
owasp_3_2 = {
enabled = true
rule_overrides = [
{
rule_id = "942100"
enabled = true
action = "Block"
},
{
rule_id = "942110"
enabled = true
action = "Block"
}
]
}
}
custom_rules = [
{
name = "block_malicious_ips"
priority = 1
rule_type = "MatchRule"
action = "Block"
match_conditions = [
{
match_variables = [
{
variable_name = "RemoteAddr"
}
]
operator = "IPMatch"
match_values = var.malicious_ip_ranges
}
]
}
]
}
tags = {
Environment = var.environment
Purpose = "nginx-security-gateway"
ManagedBy = "terraform"
}
}
# Google Cloud Armor
module "gcp_cloud_armor" {
source = "./modules/gcp-cloud-armor"
project = var.gcp_project_id
name = "nginx-security-policy"
# 安全规则
security_rules = [
{
action = "deny(403)"
priority = 100
match = {
versioned_expr = "SRC_IPS_V1"
config = {
src_ip_ranges = var.malicious_ip_ranges
}
}
description = "Block known malicious IPs"
},
{
action = "rate_based_ban"
priority = 200
match = {
versioned_expr = "SRC_IPS_V1"
config = {
src_ip_ranges = ["*"]
}
}
rate_limit_options = {
conform_action = "allow"
exceed_action = "deny(429)"
enforce_on_key = "IP"
rate_limit_threshold = {
count = 100
interval_sec = 60
}
ban_duration_sec = 600
}
description = "Rate limit per IP"
}
]
adaptive_protection_config = {
layer_7_ddos_defense_config = {
enable = true
}
}
tags = {
Environment = var.environment
Purpose = "nginx-security-gateway"
ManagedBy = "terraform"
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
⚡ 第十一部分:高级攻防对抗与零日防护
零日漏洞应急响应机制
自动化漏洞响应系统:
python
#!/usr/bin/env python3
# zero_day_response_system.py
import asyncio
import aiohttp
import json
import redis
from datetime import datetime, timedelta
import logging
import subprocess
import hashlib
class ZeroDayResponseSystem:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=2)
self.vulnerability_feeds = [
{
'name': 'NVD',
'url': 'https://services.nvd.nist.gov/rest/json/cves/2.0',
'params': {
'resultsPerPage': 100,
'startIndex': 0,
'pubStartDate': (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
}
},
{
'name': 'VulnDB',
'url': 'https://vulndb.cyberriskanalytics.com/api/v1/vulnerabilities',
'headers': {'X-API-KEY': 'YOUR_VULNDB_API_KEY'}
}
]
# Nginx相关CVE模式
self.nginx_cve_patterns = [
'nginx', 'NGINX', 'CVE-2023', 'CVE-2024', # 年份模式
'buffer overflow', 'denial of service', 'DoS', 'remote code execution',
'RCE', 'authentication bypass', 'privilege escalation'
]
async def fetch_vulnerability_feed(self, session, feed):
"""获取漏洞情报"""
try:
async with session.get(feed['url'],
headers=feed.get('headers', {}),
params=feed.get('params', {})) as response:
if response.status == 200:
data = await response.json()
return {'feed_name': feed['name'], 'data': data}
else:
logging.error(f"获取漏洞情报失败:{feed['name']} - {response.status}")
return None
except Exception as e:
logging.error(f"获取漏洞情报异常:{feed['name']} - {e}")
return None
async def analyze_vulnerabilities(self, vuln_data):
"""分析漏洞数据,识别与Nginx相关的威胁"""
relevant_vulnerabilities = []
for item in vuln_data:
feed_name = item['feed_name']
data = item['data']
if feed_name == 'NVD':
for vuln in data.get('vulnerabilities', []):
cve_id = vuln.get('cve', {}).get('id', '')
description = vuln.get('cve', {}).get('descriptions', [{}])[0].get('value', '')
# 检查是否与Nginx相关
if self.is_nginx_related_vulnerability(cve_id, description):
vuln_info = {
'cve_id': cve_id,
'description': description,
'severity': self.extract_severity(vuln),
'published_date': vuln.get('cve', {}).get('published', ''),
'affected_versions': self.extract_affected_versions(description),
'attack_vector': self.extract_attack_vector(vuln),
'mitigation_available': self.check_mitigation_available(cve_id)
}
relevant_vulnerabilities.append(vuln_info)
return relevant_vulnerabilities
def is_nginx_related_vulnerability(self, cve_id, description):
"""判断是否为与Nginx相关的漏洞"""
combined_text = f"{cve_id} {description}".lower()
# 检查是否包含Nginx相关关键词
for pattern in self.nginx_cve_patterns:
if pattern.lower() in combined_text:
return True
return False
def extract_severity(self, vuln_data):
"""提取漏洞严重程度"""
metrics = vuln_data.get('cve', {}).get('metrics', {})
# CVSS v3.1评分
if 'cvssMetricV31' in metrics:
cvss = metrics['cvssMetricV31'][0]
return {
'version': 'CVSS v3.1',
'score': cvss.get('cvssData', {}).get('baseScore', 0),
'severity': cvss.get('cvssData', {}).get('baseSeverity', 'UNKNOWN'),
'vector': cvss.get('cvssData', {}).get('vectorString', '')
}
# CVSS v3.0评分
elif 'cvssMetricV30' in metrics:
cvss = metrics['cvssMetricV30'][0]
return {
'version': 'CVSS v3.0',
'score': cvss.get('cvssData', {}).get('baseScore', 0),
'severity': cvss.get('cvssData', {}).get('baseSeverity', 'UNKNOWN'),
'vector': cvss.get('cvssData', {}).get('vectorString', '')
}
return {'version': 'UNKNOWN', 'score': 0, 'severity': 'UNKNOWN', 'vector': ''}
def extract_affected_versions(self, description):
"""提取受影响的版本信息"""
version_patterns = [
r'nginx\s+([0-9]+\.[0-9]+\.[0-9]+)', # nginx 1.2.3
r'versions?\s+([0-9]+\.[0-9]+\.[0-9]+)', # versions 1.2.3
r'before\s+([0-9]+\.[0-9]+\.[0-9]+)', # before 1.2.3
r'prior\s+to\s+([0-9]+\.[0-9]+\.[0-9]+)' # prior to 1.2.3
]
affected_versions = []
import re
for pattern in version_patterns:
matches = re.findall(pattern, description, re.IGNORECASE)
affected_versions.extend(matches)
return list(set(affected_versions))
def extract_attack_vector(self, vuln_data):
"""提取攻击向量信息"""
descriptions = vuln_data.get('cve', {}).get('descriptions', [])
attack_vectors = []
for desc in descriptions:
value = desc.get('value', '').lower()
if 'network' in value:
attack_vectors.append('NETWORK')
if 'adjacent' in value:
attack_vectors.append('ADJACENT_NETWORK')
if 'local' in value:
attack_vectors.append('LOCAL')
if 'physical' in value:
attack_vectors.append('PHYSICAL')
return list(set(attack_vectors))
def check_mitigation_available(self, cve_id):
"""检查是否有可用的缓解措施"""
# 这里可以查询内部知识库或外部API
# 返回缓解措施信息
return {
'available': True, # 假设总有缓解措施
'type': 'configuration',
'description': '可以通过配置调整缓解',
'effort_level': 'medium'
}
async def generate_emergency_response(self, vulnerability):
"""生成应急响应措施"""
response_actions = []
# 根据漏洞严重程度生成响应措施
severity_score = vulnerability['severity']['score']
if severity_score >= 9.0: # Critical
response_actions = [
{
'action': 'immediate_block',
'description': '立即阻断可疑攻击模式',
'implementation': self.create_immediate_block_rule(vulnerability)
},
{
'action': 'enhanced_logging',
'description': '启用增强日志记录',
'implementation': self.enable_enhanced_logging(vulnerability)
},
{
'action': 'emergency_patch',
'description': '紧急补丁部署',
'implementation': self.schedule_emergency_patch(vulnerability)
}
]
elif severity_score >= 7.0: # High
response_actions = [
{
'action': 'rate_limiting',
'description': '实施严格的速率限制',
'implementation': self.create_rate_limiting_rule(vulnerability)
},
{
'action': 'signature_detection',
'description': '部署特征检测规则',
'implementation': self.create_signature_rule(vulnerability)
}
]
elif severity_score >= 4.0: # Medium
response_actions = [
{
'action': 'monitoring_enhancement',
'description': '增强监控和告警',
'implementation': self.enhance_monitoring(vulnerability)
}
]
return response_actions
def create_immediate_block_rule(self, vulnerability):
"""创建立即阻断规则"""
# 生成Nginx配置片段
block_rule = f"""
# 零日漏洞紧急阻断规则 - {vulnerability['cve_id']}
location / {{
# 阻断已知攻击IP
include /etc/nginx/blocklists/emergency-block-{vulnerability['cve_id']}.conf;
# 阻断可疑User-Agent
if ($http_user_agent ~* "{self.extract_suspicious_user_agents(vulnerability)}") {{
return 403;
}}
# 阻断可疑请求模式
if ($request_uri ~* "{self.extract_attack_patterns(vulnerability)}") {{
return 403;
}}
# 额外的安全检查
include /etc/nginx/security/emergency-security.conf;
}}
"""
# 保存规则到文件
rule_file = f"/etc/nginx/conf.d/emergency-{vulnerability['cve_id']}.conf"
with open(rule_file, 'w') as f:
f.write(block_rule)
return {
'rule_file': rule_file,
'reload_required': True,
'testing_required': True
}
def extract_suspicious_user_agents(self, vulnerability):
"""提取可疑的User-Agent模式"""
# 基于漏洞特征生成User-Agent检测模式
return "(bot|scanner|nikto|sqlmap|nmap|masscan|zgrab)"
def extract_attack_patterns(self, vulnerability):
"""提取攻击模式"""
# 基于漏洞描述生成攻击模式检测
return "(\\\\.\\\\.\\\\/|\\\\.git|\\\\.env|config\\\\.php|wp-admin)"
async def deploy_emergency_rules(self, vulnerability, response_actions):
"""部署应急响应规则"""
deployment_results = []
for action in response_actions:
try:
if action['action'] == 'immediate_block':
result = action['implementation']
# 测试Nginx配置
test_result = subprocess.run(['nginx', '-t'], capture_output=True, text=True)
if test_result.returncode == 0:
# 重新加载Nginx
reload_result = subprocess.run(['nginx', '-s', 'reload'], capture_output=True, text=True)
if reload_result.returncode == 0:
deployment_results.append({
'action': action['action'],
'status': 'success',
'message': '紧急阻断规则已成功部署'
})
else:
deployment_results.append({
'action': action['action'],
'status': 'failed',
'message': f'Nginx重新加载失败: {reload_result.stderr}'
})
else:
deployment_results.append({
'action': action['action'],
'status': 'failed',
'message': f'Nginx配置测试失败: {test_result.stderr}'
})
except Exception as e:
deployment_results.append({
'action': action['action'],
'status': 'error',
'message': str(e)
})
return deployment_results
async def run_continuous_monitoring(self):
"""持续监控零日漏洞"""
while True:
try:
logging.info("开始检查新的零日漏洞...")
# 获取最新漏洞信息
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_vulnerability_feed(session, feed) for feed in self.vulnerability_feeds]
results = await asyncio.gather(*tasks)
# 分析漏洞
relevant_vulns = await self.analyze_vulnerabilities([r for r in results if r])
# 处理高危漏洞
for vuln in relevant_vulns:
if vuln['severity']['score'] >= 7.0: # High severity
logging.warning(f"发现高危Nginx漏洞: {vuln['cve_id']} (Score: {vuln['severity']['score']})")
# 生成应急响应
response_actions = await self.generate_emergency_response(vuln)
# 部署应急措施
deployment_results = await self.deploy_emergency_rules(vuln, response_actions)
# 记录响应日志
response_log = {
'timestamp': datetime.now().isoformat(),
'vulnerability': vuln,
'response_actions': response_actions,
'deployment_results': deployment_results
}
self.redis_client.lpush('zeroday:response_logs', json.dumps(response_log))
# 发送告警
await self.send_zero_day_alert(vuln, response_actions)
# 每6小时检查一次
await asyncio.sleep(21600)
except Exception as e:
logging.error(f"零日漏洞监控异常:{e}")
await asyncio.sleep(3600) # 1小时后重试
async def send_zero_day_alert(self, vulnerability, response_actions):
"""发送零日漏洞告警"""
alert = {
'type': 'zero_day_vulnerability',
'severity': 'critical' if vulnerability['severity']['score'] >= 9.0 else 'high',
'timestamp': datetime.now().isoformat(),
'vulnerability': vulnerability,
'response_actions': response_actions,
'action_required': 'immediate_response'
}
# 发送到告警系统
self.redis_client.publish('security:zero_day_alerts', json.dumps(alert))
# 启动零日漏洞监控系统
async def main():
zero_day_system = ZeroDayResponseSystem()
await zero_day_system.run_continuous_monitoring()
if __name__ == "__main__":
asyncio.run(main())1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
高级持续威胁(APT)防护
APT攻击检测与防护系统:
python
#!/usr/bin/env python3
# apt_detection_system.py
import asyncio
import redis
import json
from datetime import datetime, timedelta
import numpy as np
from collections import defaultdict, deque
import logging
class APTDetectionSystem:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=3)
# APT攻击行为模式
self.apt_behavior_patterns = {
'reconnaissance': {
'indicators': ['scanning', 'enumeration', 'fingerprinting'],
'threshold': 10,
'time_window': 3600 # 1小时
},
'lateral_movement': {
'indicators': ['privilege_escalation', 'credential_dumping', 'remote_access'],
'threshold': 5,
'time_window': 1800 # 30分钟
},
'data_exfiltration': {
'indicators': ['large_data_transfer', 'unusual_outbound', 'encrypted_communication'],
'threshold': 3,
'time_window': 7200 # 2小时
},
'persistence': {
'indicators': ['backdoor_installation', 'scheduled_tasks', 'registry_modification'],
'threshold': 2,
'time_window': 86400 # 24小时
}
}
# 用户行为基线
self.user_behavior_baseline = {}
# 威胁狩猎规则
self.threat_hunting_rules = [
self.detect_low_and_slow_attacks,
self.detect_living_off_the_land,
self.detect_c2_communications,
self.detect_data_staging
]
def analyze_user_behavior(self, user_id, current_behavior):
"""分析用户行为偏差"""
if user_id not in self.user_behavior_baseline:
# 建立用户行为基线
self.user_behavior_baseline[user_id] = {
'request_patterns': deque(maxlen=1000),
'time_patterns': deque(maxlen=1000),
'resource_access': defaultdict(int),
'geo_patterns': deque(maxlen=100),
'established': datetime.now()
}
return {'risk_score': 0, 'anomalies': []}
baseline = self.user_behavior_baseline[user_id]
anomalies = []
risk_score = 0
# 1. 时间模式异常检测
current_hour = current_behavior.get('timestamp', datetime.now()).hour
time_pattern = baseline['time_patterns']
if len(time_pattern) > 50:
usual_hours = [t.hour for t in time_pattern]
hour_frequency = defaultdict(int)
for h in usual_hours:
hour_frequency[h] += 1
# 检查当前时间是否在用户通常活跃时间之外
if current_hour not in hour_frequency and len(hour_frequency) > 0:
anomalies.append({
'type': 'unusual_time_access',
'severity': 'medium',
'details': f"Access at {current_hour}:00, user usually active at {list(hour_frequency.keys())}"
})
risk_score += 20
# 2. 资源访问异常检测
current_resource = current_behavior.get('resource', '')
resource_access = baseline['resource_access']
if current_resource and current_resource not in resource_access:
# 首次访问新资源
anomalies.append({
'type': 'new_resource_access',
'severity': 'low',
'details': f"First time accessing: {current_resource}"
})
risk_score += 10
# 3. 请求模式异常检测
current_pattern = {
'method': current_behavior.get('method', ''),
'path': current_behavior.get('path', ''),
'params': current_behavior.get('params', {})
}
request_patterns = baseline['request_patterns']
if len(request_patterns) > 100:
# 计算与历史模式的相似度
similarity_scores = []
for historical_pattern in request_patterns:
similarity = self.calculate_pattern_similarity(current_pattern, historical_pattern)
similarity_scores.append(similarity)
# 计算平均相似度
avg_similarity = np.mean(similarity_scores)
if avg_similarity < 0.3: # 相似度低于阈值
anomalies.append({
'type': 'unusual_request_pattern',
'severity': 'high',
'details': f"Request pattern similarity {avg_similarity:.2f}, significantly different from historical patterns"
})
risk_score += 30
# 4. 地理位置异常检测
current_geo = current_behavior.get('geo_location')
if current_geo:
geo_patterns = baseline['geo_patterns']
if len(geo_patterns) > 10 and current_geo not in geo_patterns:
# 检查是否存在不可能的地理位置跳转
recent_locations = [g for g in geo_patterns[-5:]] # 最近5次访问位置
if current_geo not in recent_locations:
anomalies.append({
'type': 'unusual_geolocation',
'severity': 'high',
'details': f"Access from unusual location: {current_geo}, recent locations: {recent_locations}"
})
risk_score += 40
# 5. 应用威胁狩猎规则
for hunter in self.threat_hunting_rules:
hunter_result = hunter(current_behavior, baseline)
if hunter_result:
anomalies.append(hunter_result)
risk_score += 25
# 更新基线
baseline['request_patterns'].append(current_pattern)
baseline['time_patterns'].append(current_behavior.get('timestamp', datetime.now()))
if current_resource:
baseline['resource_access'][current_resource] += 1
if current_geo:
baseline['geo_patterns'].append(current_geo)
return {
'risk_score': min(risk_score, 100), # 限制最大风险分数为100
'anomalies': anomalies
}
def calculate_pattern_similarity(self, pattern1, pattern2):
"""计算请求模式相似度"""
# 简化实现,实际应用中可使用更复杂的算法
if pattern1['method'] != pattern2['method']:
return 0.0
path_similarity = self._string_similarity(pattern1['path'], pattern2['path'])
params_similarity = self._params_similarity(pattern1['params'], pattern2['params'])
return (path_similarity * 0.7) + (params_similarity * 0.3)
def _string_similarity(self, s1, s2):
"""计算字符串相似度(简化版)"""
if not s1 or not s2:
return 0.0
# 使用Levenshtein距离计算相似度
from Levenshtein import ratio
return ratio(s1, s2)
def _params_similarity(self, params1, params2):
"""计算参数相似度"""
if not params1 and not params2:
return 1.0
if not params1 or not params2:
return 0.0
keys1 = set(params1.keys())
keys2 = set(params2.keys())
common_keys = keys1.intersection(keys2)
all_keys = keys1.union(keys2)
if not all_keys:
return 1.0
return len(common_keys) / len(all_keys)
def detect_low_and_slow_attacks(self, behavior, baseline):
"""检测低速攻击"""
# 实现低速攻击检测逻辑
pass
def detect_living_off_the_land(self, behavior, baseline):
"""检测Living-off-the-land攻击"""
# 实现合法工具滥用攻击检测逻辑
pass
async def start_monitoring(self):
"""启动APT监控"""
while True:
# 实现APT监控主循环
await asyncio.sleep(60)
# Nginx日志实时监控集成
class NginxAPTMonitor:
"""Nginx APT攻击实时监控系统"""
def __init__(self, log_file_path='/var/log/nginx/access.log'):
self.log_file_path = log_file_path
self.apt_detector = APTDetectionSystem()
self.redis_client = redis.Redis(host='localhost', port=6379, db=4)
self.alert_threshold = 70 # 风险分数阈值
# 启动异步监控任务
self.monitoring_tasks = [
self.monitor_nginx_logs(),
self.process_suspicious_activities(),
self.generate_threat_reports()
]
async def monitor_nginx_logs(self):
"""监控Nginx访问日志"""
import aiofiles
import re
# Nginx日志格式解析正则表达式
log_pattern = re.compile(
r'(\d+\.\d+\.\d+\.\d+)\s+-\s+-\s+\[(.+?)\]\s+"(\w+)\s+(.+?)\s+HTTP/[\d.]+"\s+(\d+)\s+(\d+)\s+"(.+?)"\s+"(.+?)"'
)
async with aiofiles.open(self.log_file_path, mode='r') as log_file:
# 移动到文件末尾(只监控新日志)
await log_file.seek(0, 2)
while True:
line = await log_file.readline()
if line:
match = log_pattern.match(line.strip())
if match:
log_data = {
'remote_addr': match.group(1),
'timestamp': match.group(2),
'method': match.group(3),
'request': match.group(4),
'status': int(match.group(5)),
'body_bytes_sent': int(match.group(6)),
'http_referer': match.group(7),
'http_user_agent': match.group(8)
}
# 分析用户行为
risk_analysis = await self.analyze_user_session(log_data)
if risk_analysis['risk_score'] > self.alert_threshold:
await self.trigger_high_risk_alert(log_data, risk_analysis)
await asyncio.sleep(0.1) # 避免CPU占用过高
async def analyze_user_session(self, log_data):
"""分析用户会话风险"""
user_id = log_data['remote_addr']
# 构建用户行为数据
behavior_data = {
'timestamp': datetime.now(),
'method': log_data['method'],
'path': log_data['request'].split('?')[0],
'params': dict(param.split('=') for param in log_data['request'].split('?')[1].split('&')) if '?' in log_data['request'] else {},
'status_code': log_data['status'],
'user_agent': log_data['http_user_agent'],
'resource': log_data['request'].split('?')[0],
'geo_location': await self.get_geo_location(log_data['remote_addr'])
}
# 使用APT检测系统分析
risk_analysis = self.apt_detector.analyze_user_behavior(user_id, behavior_data)
# 缓存分析结果
await self.cache_risk_analysis(user_id, risk_analysis)
return risk_analysis
async def get_geo_location(self, ip_address):
"""获取IP地址地理位置"""
# 实现IP地理位置查询(可使用MaxMind GeoIP等库)
# 这里返回模拟数据
return f"location_for_{ip_address}"
async def cache_risk_analysis(self, user_id, risk_analysis):
"""缓存风险分析结果"""
key = f"risk_analysis:{user_id}"
await self.redis_client.setex(key, 3600, json.dumps(risk_analysis)) # 缓存1小时
async def trigger_high_risk_alert(self, log_data, risk_analysis):
"""触发高风险警报"""
alert_data = {
'timestamp': datetime.now().isoformat(),
'user_ip': log_data['remote_addr'],
'risk_score': risk_analysis['risk_score'],
'anomalies': risk_analysis['anomalies'],
'request_details': log_data,
'alert_level': 'HIGH' if risk_analysis['risk_score'] > 80 else 'MEDIUM'
}
# 发送到警报队列
await self.redis_client.lpush('security_alerts:high_risk', json.dumps(alert_data))
# 记录日志
logging.warning(f"HIGH RISK ALERT: User {log_data['remote_addr']} risk score {risk_analysis['risk_score']}")
async def process_suspicious_activities(self):
"""处理可疑活动"""
while True:
try:
# 从队列获取高风险警报
alert_data = await self.redis_client.brpop('security_alerts:high_risk', timeout=1)
if alert_data:
alert = json.loads(alert_data[1])
# 自动阻断高风险用户
if alert['risk_score'] > 80:
await self.auto_block_user(alert['user_ip'], alert['risk_score'])
# 发送通知(邮件、Slack等)
await self.send_security_notification(alert)
except Exception as e:
logging.error(f"Error processing suspicious activities: {e}")
await asyncio.sleep(1)
async def auto_block_user(self, user_ip, risk_score):
"""自动阻断用户"""
# 使用Nginx的deny指令阻断IP
block_command = f"echo 'deny {user_ip};' >> /etc/nginx/conf.d/auto_blocks.conf && nginx -s reload"
# 记录阻断操作
block_record = {
'user_ip': user_ip,
'risk_score': risk_score,
'blocked_at': datetime.now().isoformat(),
'auto_unblock_at': (datetime.now() + timedelta(hours=24)).isoformat() # 24小时后自动解封
}
await self.redis_client.setex(f"blocked_user:{user_ip}", 86400, json.dumps(block_record))
logging.info(f"Auto-blocked user {user_ip} with risk score {risk_score}")
async def send_security_notification(self, alert):
"""发送安全通知"""
# 实现通知发送逻辑(邮件、Slack、企业微信等)
notification = {
'type': 'security_alert',
'title': f"APT Attack Detected - Risk Score: {alert['risk_score']}",
'content': f"Suspicious activity detected from IP {alert['user_ip']}. Anomalies: {alert['anomalies']}",
'timestamp': alert['timestamp']
}
# 这里可以集成各种通知服务
logging.info(f"Security notification sent: {notification}")
async def generate_threat_reports(self):
"""生成威胁报告"""
while True:
try:
# 每小时生成一次威胁报告
await asyncio.sleep(3600)
# 收集过去一小时的威胁数据
threat_summary = await self.collect_threat_summary()
# 生成报告
report = {
'period': f"{datetime.now() - timedelta(hours=1)} - {datetime.now()}",
'total_alerts': threat_summary['total_alerts'],
'high_risk_users': threat_summary['high_risk_users'],
'blocked_ips': threat_summary['blocked_ips'],
'top_threat_types': threat_summary['top_threat_types'],
'recommendations': self.generate_security_recommendations(threat_summary)
}
# 保存报告
report_key = f"threat_report:{datetime.now().strftime('%Y%m%d_%H')}"
await self.redis_client.setex(report_key, 86400 * 7, json.dumps(report)) # 保存7天
logging.info(f"Threat report generated: {report_key}")
except Exception as e:
logging.error(f"Error generating threat reports: {e}")
async def collect_threat_summary(self):
"""收集威胁摘要"""
# 实现威胁数据收集逻辑
return {
'total_alerts': 42,
'high_risk_users': ['192.168.1.100', '10.0.0.50'],
'blocked_ips': ['192.168.1.100'],
'top_threat_types': ['unusual_request_pattern', 'unusual_geolocation']
}
def generate_security_recommendations(self, threat_summary):
"""生成安全建议"""
recommendations = []
if threat_summary['total_alerts'] > 50:
recommendations.append("Consider implementing stricter access controls")
if len(threat_summary['blocked_ips']) > 5:
recommendations.append("Review and potentially expand IP blocking policies")
recommendations.append("Regular security awareness training for development teams")
recommendations.append("Consider implementing additional MFA for administrative access")
return recommendations
async def start_all_monitoring(self):
"""启动所有监控任务"""
logging.info("Starting Nginx APT monitoring system...")
# 并发运行所有监控任务
await asyncio.gather(*self.monitoring_tasks)
# 使用示例和部署配置
if __name__ == "__main__":
# 创建监控实例
monitor = NginxAPTMonitor('/var/log/nginx/access.log')
# 启动监控
asyncio.run(monitor.start_all_monitoring())1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
🎯 总结:构建企业级安全防线的完整路径
通过本文的深入探讨,我们已经从基础的Nginx安全加固,逐步构建了一个涵盖云原生架构、AI智能防护、企业集成和高级威胁检测的完整安全体系。让我们回顾一下这个渐进式的安全建设路径:
🛡️ 基础安全(90%漏洞防护)
- Host头攻击防护:通过严格的主机名验证和默认服务器配置
- 敏感文件保护:使用location匹配和访问控制列表
- 目录遍历防护:URL规范化处理和路径验证
- 版本信息隐藏:server_tokens off配置
- 错误页面定制:防止信息泄露的统一错误处理
☁️ 云原生安全架构
- Kubernetes Ingress安全:NetworkPolicy和RBAC的精细化控制
- 容器化最佳实践:最小权限镜像和安全上下文
- Service Mesh集成:Istio的mTLS和流量策略
- 多集群安全策略:统一的安全治理和合规检查
🤖 AI驱动的智能防护
- 机器学习异常检测:实时流量分析和行为基线建立
- 威胁情报集成:多源威胁数据的实时关联分析
- 自动化响应机制:基于风险评分的智能阻断和告警
- 预测性安全防护:零日漏洞的提前预警和防护
🏢 企业级集成方案
- DevSecOps流水线:安全测试的左移和自动化
- SIEM/SOAR集成:安全事件的统一管理和响应
- 合规性自动化:GDPR、等保等标准的自动合规检查
- 多云安全策略:跨云平台的统一安全管理
🎯 高级威胁防护(APT检测)
- 行为分析引擎:用户行为基线和异常检测
- 威胁狩猎系统:主动威胁发现和情报收集
- 零日漏洞响应:快速漏洞评估和临时防护
- APT攻击链检测:多阶段攻击的完整链路分析
📊 实施建议与最佳实践
渐进式部署策略
txt
阶段1:基础安全加固(1-2周)
├── Nginx配置优化
├── 访问控制实施
└── 日志监控建立
阶段2:高级防护集成(2-4周)
├── WAF规则部署
├── 速率限制优化
└── SSL/TLS强化
阶段3:智能化升级(4-8周)
├── AI异常检测
├── 威胁情报集成
└── 自动化响应
阶段4:企业级整合(8-12周)
├── DevSecOps集成
├── SIEM/SOAR连接
└── 合规性自动化1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
关键性能指标(KPI)
- 安全事件响应时间:从检测到阻断 < 5分钟
- 误报率:AI检测误报 < 5%
- 系统可用性:安全服务可用性 > 99.9%
- 合规覆盖率:自动化合规检查 > 95%
运维监控要点
- 实时监控:24/7安全运营中心
- 定期评估:月度安全态势分析
- 威胁狩猎:季度主动威胁搜索
- 应急演练:半年度安全事件演练
🔮 未来发展趋势
随着技术的不断演进,Nginx安全网关也将面临新的挑战和机遇:
零信任架构集成
- 微分段技术:更细粒度的网络分段
- 身份感知代理:基于身份的动态访问控制
- 持续信任评估:实时的信任度计算和决策
量子安全准备
- 后量子密码学:抗量子计算攻击的加密算法
- 量子密钥分发:量子通信技术的安全应用
- 混合加密方案:传统与量子安全的平滑过渡
边缘计算安全
- 边缘节点防护:分布式边缘环境的安全管理
- 5G网络安全:新一代网络的安全挑战
- IoT设备集成:海量物联网设备的安全接入
💡 最后的建议
构建企业级安全防线是一个持续演进的过程,而非一次性项目。建议采用以下策略:
- 安全优先:在系统设计的每个阶段都将安全作为首要考虑
- 分层防护:实施多层防御策略,避免单点失效
- 自动化优先:尽可能自动化安全流程,减少人为错误
- 持续学习:保持对新威胁和安全技术的持续学习
- 团队协作:建立跨部门的安全协作机制
记住,最好的安全不是最强的防护,而是最适合的平衡。在追求极致安全的同时,也要考虑系统的可用性、性能和成本效益。通过本文提供的这套完整解决方案,您可以根据自身需求和环境特点,选择最适合的安全建设路径,逐步构建起坚不可摧的企业级安全防线。
安全之路,永无止境。让我们携手共建更安全的数字世界!🚀
V 0.12.8 |
基于 MIT Licensed版权所有 © 2009- 2026 CMONO.NET
本站访客数
--次 本站总访问量
--人次 