优化:网关堆外内存泄漏优化

This commit is contained in:
yangyang01000846
2025-11-28 11:47:54 +08:00
parent af34fde7cd
commit ce0502c1e6

View File

@@ -8,6 +8,7 @@ import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
@@ -16,196 +17,177 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
/**
* 网关请求响应日志记录过滤器
* 集成了TraceId生成与传递功能
* 使用 DataBufferUtils.join() 解决多次拷贝问题
*/
@Slf4j
@Component
public class RequestResponseLoggingFilter implements GlobalFilter, Ordered {
/**
* MDC 中存储 traceId 的 key需与日志格式中的 %X{traceId} 对应)
*/
public static final String TRACE_ID_KEY = "traceId";
/**
* 请求头/响应头中传递 traceId 的 key
*/
public static final String TRACE_ID_KEY = "traceId";
public static final String TRACE_ID_HEADER = "X-Trace-Id";
/** 最大记录的响应体长度(防止大 JSON */
private static final int MAX_BODY_LENGTH = 10 * 1024;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 生成或获取 traceId优先从请求头获取支持前端传递便于联调
String traceId = exchange.getRequest().getHeaders().getFirst(TRACE_ID_HEADER);
if (traceId == null || traceId.trim().isEmpty()) {
// 生成 UUID 并去除横杠32位简洁易读
traceId = UUID.randomUUID().toString().replace("-", "");
}
// 存入 MDC供日志打印使用所有日志框架可通过 %X{traceId} 获取)
String traceId = getOrCreateTraceId(exchange);
ServerWebExchange tracedExchange = mutateExchangeWithTraceId(exchange, traceId);
MDC.put(TRACE_ID_KEY, traceId);
// 写入响应头(返回给前端,便于前端排查问题时匹配日志)
// 只在响应头中还没有X-Trace-Id时才设置避免与下游服务设置的TraceId重复
ServerHttpResponse response = exchange.getResponse();
if (!response.getHeaders().containsKey(TRACE_ID_HEADER)) {
response.getHeaders().set(TRACE_ID_HEADER, traceId);
}
// 构建带有 traceId 的新请求
ServerHttpRequest request = exchange.getRequest().mutate()
.header(TRACE_ID_HEADER, traceId)
.build();
ServerHttpRequest finalRequest = request;
String method = request.getMethod().toString();
String path = request.getURI().getPath();
String query = request.getURI().getQuery();
String clientIp = getClientIp(request);
// 记录请求详细信息
StringBuilder requestLog = new StringBuilder();
requestLog.append("\n==================== 网关接收到请求 ====================\n");
requestLog.append("TraceId: ").append(traceId).append("\n");
requestLog.append("请求方法: ").append(method).append("\n");
requestLog.append("请求路径: ").append(path).append("\n");
requestLog.append("请求参数: ").append(query != null ? query : "").append("\n");
requestLog.append("客户端IP: ").append(clientIp).append("\n");
// 记录请求头
requestLog.append("请求头:\n");
for (Map.Entry<String, List<String>> header : request.getHeaders().entrySet()) {
requestLog.append(" ").append(header.getKey()).append(": ").append(header.getValue()).append("\n");
}
requestLog.append("========================================================");
log.info(requestLog.toString());
long startTime = System.currentTimeMillis();
// 包装响应以捕获响应数据
DataBufferFactory bufferFactory = response.bufferFactory();
String finalTraceId = traceId;
String finalTraceId1 = traceId;
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
return super.writeWith(fluxBody.map(dataBuffer -> {
// 记录响应信息
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
StringBuilder responseLog = new StringBuilder();
responseLog.append("\n==================== 网关响应信息 ====================\n");
responseLog.append("TraceId: ").append(finalTraceId).append("\n");
responseLog.append("请求方法: ").append(method).append("\n");
responseLog.append("请求路径: ").append(path).append("\n");
responseLog.append("响应状态: ").append(getStatusCode()).append("\n");
responseLog.append("处理时间: ").append(duration).append("ms\n");
// 记录响应头
responseLog.append("响应头:\n");
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) {
responseLog.append(" ").append(header.getKey()).append(": ").append(String.join(",", header.getValue())).append("\n");
}
// 注意:记录响应体可能会影响性能,特别是对于大文件
// 如果需要记录响应体,可以取消下面的注释,但要注意性能影响
byte[] content = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(content);
String responseBody = new String(content, StandardCharsets.UTF_8);
DataBuffer newBuffer = bufferFactory.wrap(content);
responseLog.append("响应体: ").append(responseBody).append("\n");
responseLog.append("========================================================");
log.info(responseLog.toString());
return newBuffer;
}));
}
return super.writeWith(body);
}
// 处理零字节响应的情况
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
StringBuilder responseLog = new StringBuilder();
responseLog.append("\n==================== 网关响应信息 ====================\n");
responseLog.append("TraceId: ").append(finalTraceId1).append("\n");
responseLog.append("请求方法: ").append(method).append("\n");
responseLog.append("请求路径: ").append(path).append("\n");
responseLog.append("响应状态: ").append(getStatusCode()).append("\n");
responseLog.append("处理时间: ").append(duration).append("ms\n");
// 记录响应头
responseLog.append("响应头:\n");
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) {
responseLog.append(" ").append(header.getKey()).append(": ").append(header.getValue()).append("\n");
}
logRequest(tracedExchange, traceId);
responseLog.append("========================================================");
log.info(responseLog.toString());
return super.writeAndFlushWith(body);
}
};
ServerHttpResponseDecorator responseDecorator =
decorateResponse(tracedExchange, traceId, startTime);
// 将装饰后的响应和带 traceId 的请求替换到 exchange 中
return chain.filter(exchange.mutate()
.request(finalRequest)
.response(decoratedResponse)
return chain.filter(tracedExchange.mutate()
.response(responseDecorator)
.build())
.doFinally(signalType -> {
// 清除 MDC 中的 traceId关键避免线程池复用导致的 traceId 污染)
MDC.remove(TRACE_ID_KEY);
});
}
private String getClientIp(ServerHttpRequest request) {
String xForwardedFor = request.getHeaders().getFirst("X-Forwarded-For");
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
return xForwardedFor.split(",")[0].trim();
}
String xRealIp = request.getHeaders().getFirst("X-Real-IP");
if (xRealIp != null && !xRealIp.isEmpty()) {
return xRealIp;
}
// 检查Origin头部
String origin = request.getHeaders().getFirst("Origin");
if (origin != null && !origin.isEmpty()) {
// Origin格式为 http://domain:port 或 https://domain:port
try {
String[] parts = origin.split("://");
if (parts.length > 1) {
String hostPort = parts[1];
String[] hostPortParts = hostPort.split(":");
return hostPortParts[0];
}
} catch (Exception e) {
// 解析失败则继续使用其他方式获取IP
}
}
InetSocketAddress remoteAddress = request.getRemoteAddress();
return remoteAddress != null ? remoteAddress.getAddress().getHostAddress() : "unknown";
.doFinally(s -> MDC.remove(TRACE_ID_KEY));
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 1; // 确保在大多数其他过滤器之前执行
return Ordered.HIGHEST_PRECEDENCE + 1;
}
}
// ================= TraceId =================
private String getOrCreateTraceId(ServerWebExchange exchange) {
String traceId = exchange.getRequest().getHeaders().getFirst(TRACE_ID_HEADER);
return (traceId == null || traceId.isBlank())
? UUID.randomUUID().toString().replace("-", "")
: traceId;
}
private ServerWebExchange mutateExchangeWithTraceId(ServerWebExchange exchange, String traceId) {
exchange.getResponse().getHeaders().set(TRACE_ID_HEADER, traceId);
ServerHttpRequest request = exchange.getRequest().mutate()
.header(TRACE_ID_HEADER, traceId)
.build();
return exchange.mutate().request(request).build();
}
// ================= Request =================
private void logRequest(ServerWebExchange exchange, String traceId) {
ServerHttpRequest request = exchange.getRequest();
log.info("""
==================== 网关请求 ====================
TraceId: {}
Method: {}
Path: {}
Query: {}
ClientIp: {}
Headers: {}
=================================================
""",
traceId,
request.getMethod(),
request.getURI().getPath(),
request.getURI().getQuery(),
getClientIp(request),
request.getHeaders()
);
}
// ================= Response =================
private ServerHttpResponseDecorator decorateResponse(
ServerWebExchange exchange,
String traceId,
long startTime) {
ServerHttpResponse originalResponse = exchange.getResponse();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
return new ServerHttpResponseDecorator(originalResponse) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (!(body instanceof Flux)) {
return super.writeWith(body);
}
Flux<DataBuffer> fluxBody = (Flux<DataBuffer>) body;
return DataBufferUtils.join(fluxBody)
.flatMap(joinedBuffer -> {
byte[] content = null;
try {
int length = Math.min(
joinedBuffer.readableByteCount(),
MAX_BODY_LENGTH
);
content = new byte[length];
joinedBuffer.read(content);
logResponse(exchange, traceId, startTime, content);
// 必须返回完整响应内容给客户端
DataBuffer newBuffer = bufferFactory.wrap(
joinedBuffer.asByteBuffer()
);
return super.writeWith(Mono.just(newBuffer));
} finally {
DataBufferUtils.release(joinedBuffer);
}
});
}
};
}
private void logResponse(
ServerWebExchange exchange,
String traceId,
long startTime,
byte[] content) {
long cost = System.currentTimeMillis() - startTime;
ServerHttpResponse response = exchange.getResponse();
String responseBody = content == null
? ""
: new String(content, StandardCharsets.UTF_8);
log.info("""
==================== 网关响应 ====================
TraceId: {}
Status: {}
Cost: {} ms
Headers: {}
Body: {}
=================================================
""",
traceId,
response.getStatusCode(),
cost,
response.getHeaders(),
responseBody
);
}
// ================= Util =================
private String getClientIp(ServerHttpRequest request) {
String xff = request.getHeaders().getFirst("X-Forwarded-For");
if (xff != null && !xff.isBlank()) {
return xff.split(",")[0].trim();
}
return Optional.ofNullable(request.getRemoteAddress())
.map(addr -> addr.getAddress().getHostAddress())
.orElse("unknown");
}
}