Create a global filter that caches the request body
Spring Cloud version: 2020 release
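```java
import cn.hutool.core.util.StrUtil; // StrUtil appears to be Hutool's; adjust if yours differs
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// SecurityConstants is a project-specific class; import it from your own package.

@Slf4j
@Component
public class CacheBodyGlobalFilter implements Ordered, GlobalFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Only the token endpoints need a re-readable body; pass everything else through.
        if (!StrUtil.containsAnyIgnoreCase(exchange.getRequest().getURI().getPath(),
                SecurityConstants.OAUTH_TOKEN_URL, SecurityConstants.SMS_TOKEN_URL)) {
            return chain.filter(exchange);
        } else {
            // Note: equals() is an exact match, so "application/json;charset=UTF-8" is not cached.
            if (MediaType.APPLICATION_JSON.equals(exchange.getRequest().getHeaders().getContentType())
                    && HttpMethod.POST.equals(exchange.getRequest().getMethod())) {
                // Join all body chunks into a single buffer, then replay it on every getBody() call.
                return DataBufferUtils.join(exchange.getRequest().getBody())
                        .flatMap(dataBuffer -> {
                            DataBufferUtils.retain(dataBuffer);
                            Flux<DataBuffer> cachedFlux = Flux.defer(() ->
                                    Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                            ServerHttpRequest mutatedRequest =
                                    new ServerHttpRequestDecorator(exchange.getRequest()) {
                                        @Override
                                        public Flux<DataBuffer> getBody() {
                                            return cachedFlux;
                                        }
                                    };
                            return chain.filter(exchange.mutate().request(mutatedRequest).build());
                        });
            }
            return chain.filter(exchange);
        }
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
```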
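The purpose of the CacheBodyGlobalFilter global filter is to read the body out of the original request, wrap the request with the ServerHttpRequestDecorator request decorator, override its getBody method, and pass the wrapped request down the filter chain.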
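When a later filter calls exchange.getRequest().getBody(), it actually invokes the overridden getBody method and receives the body data that was cached up front. This is what makes it possible to read the body more than once.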
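The cache-and-replay idea can be illustrated without Spring. The following is only a sketch under stated assumptions, not the gateway code: `OneShotBody`, `CachedBody`, and `CachedBodyDemo` are hypothetical names, and a plain `Supplier<byte[]>` stands in for the reactive request body.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/** Hypothetical stand-in for a request body that may be consumed only once. */
final class OneShotBody implements Supplier<byte[]> {
    private final byte[] data;
    private boolean consumed;

    OneShotBody(String text) {
        this.data = text.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] get() {
        if (consumed) {
            throw new IllegalStateException("body already consumed");
        }
        consumed = true;
        return data;
    }
}

/** Decorator: drains the delegate once, then serves a fresh copy on every call. */
final class CachedBody implements Supplier<byte[]> {
    private final byte[] cached;

    CachedBody(Supplier<byte[]> delegate) {
        this.cached = delegate.get(); // the single permitted read
    }

    @Override
    public byte[] get() {
        return cached.clone();
    }
}

final class CachedBodyDemo {
    /** Reads the body twice and joins the two results with '|'. */
    static String readTwice(Supplier<byte[]> body) {
        String first = new String(body.get(), StandardCharsets.UTF_8);
        String second = new String(body.get(), StandardCharsets.UTF_8);
        return first + "|" + second;
    }

    public static void main(String[] args) {
        // Works because the decorator replays the cached bytes.
        System.out.println(readTwice(new CachedBody(new OneShotBody("{\"a\":1}"))));
    }
}
```

Calling `readTwice` directly on a `OneShotBody` fails on the second read, which is the same situation the decorator in CacheBodyGlobalFilter is meant to avoid.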
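Note that this filter's order is set to Ordered.HIGHEST_PRECEDENCE, which makes it the highest-priority filter. The priority is set this high because some built-in filters may also read the body. If one of them runs first, our custom filter fails when it tries to read the body, with an error saying the body can only be read once: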
```
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:279)
    at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:129)
    at
```
**Therefore, CacheBodyGlobalFilter must be given the highest precedence.**
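To make the ordering rule concrete, here is a small Spring-free sketch. It assumes only what Spring documents: lower order values run first, and `Ordered.HIGHEST_PRECEDENCE` equals `Integer.MIN_VALUE`. The class `FilterOrderDemo`, the filter names, and the order values -1 and 0 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class FilterOrderDemo {
    // Same value as Spring's Ordered.HIGHEST_PRECEDENCE.
    static final int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;

    /** Hypothetical stand-in for a filter that carries an order value. */
    static final class NamedFilter {
        final String name;
        final int order;

        NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Returns filter names in the order they would run: lowest order value first. */
    static List<String> executionOrder(List<NamedFilter> filters) {
        List<NamedFilter> sorted = new ArrayList<>(filters);
        sorted.sort(Comparator.comparingInt((NamedFilter f) -> f.order));
        List<String> names = new ArrayList<>();
        for (NamedFilter f : sorted) {
            names.add(f.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(executionOrder(Arrays.asList(
                new NamedFilter("validateCode", 0),
                new NamedFilter("cacheBody", HIGHEST_PRECEDENCE),
                new NamedFilter("someBodyReader", -1))));
    }
}
```

Because no order value can be lower than `Integer.MIN_VALUE`, the caching filter is never sorted after a filter that might consume the body.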
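Reading the body in a custom filter

```java
import cn.hutool.core.util.StrUtil;   // StrUtil, JSONObject and JSONUtil appear to be Hutool's
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// SecurityConstants, Resp and FilterRequestResponseUtil are project-specific classes.

@Slf4j
@RequiredArgsConstructor
@Component
@SuppressWarnings("all")
public class ValidateCodeJsonGatewayFilter extends AbstractGatewayFilterFactory {

    private final ObjectMapper objectMapper;
    private final RedisTemplate redisTemplate;

    @Override
    public GatewayFilter apply(Object config) {
        return (exchange, chain) -> {
            ServerHttpRequest request = exchange.getRequest();

            // Only the token endpoints are validated.
            if (!StrUtil.containsAnyIgnoreCase(request.getURI().getPath(),
                    SecurityConstants.OAUTH_TOKEN_URL, SecurityConstants.SMS_TOKEN_URL)) {
                return chain.filter(exchange);
            }

            // getBody() returns the Flux cached by CacheBodyGlobalFilter, so it can be read here.
            Flux<DataBuffer> dataBufferFlux = request.getBody();
            String body = FilterRequestResponseUtil.resolveBodyFromRequest(dataBufferFlux);
            JSONObject jsonObject = JSONUtil.parseObj(body);

            // Refresh-token requests skip the captcha check.
            String grantType = jsonObject.getStr("grant_type");
            if (StrUtil.equals(SecurityConstants.REFRESH_TOKEN, grantType)) {
                return chain.filter(exchange);
            }

            // Clients that do not require a captcha pass straight through.
            if (!isCheckCaptchaClient(request)) {
                return chain.filter(exchange);
            }

            try {
                checkCode(jsonObject);
            } catch (Exception e) {
                ServerHttpResponse response = exchange.getResponse();
                response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
                response.setStatusCode(HttpStatus.PRECONDITION_REQUIRED);
                try {
                    return response.writeWith(Mono.just(response.bufferFactory()
                            .wrap(objectMapper.writeValueAsBytes(Resp.failed(e.getMessage())))));
                } catch (JsonProcessingException e1) {
                    log.error("Failed to serialize the error response", e1);
                    // The original fell through to chain.filter here, letting a failed check pass.
                    return response.setComplete();
                }
            }
            return chain.filter(exchange);
        };
    }

    private boolean isCheckCaptchaClient(ServerHttpRequest request) { ... }

    @SneakyThrows
    private void checkCode(JSONObject jsonObject) { ... }
}
```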
Utility class for parsing the body
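```java
import lombok.experimental.UtilityClass;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@UtilityClass
public class FilterRequestResponseUtil {

    // Matches any whitespace; used by formatStr to strip it from the body.
    public final Pattern p = Pattern.compile("\\s*|\t|\r|\n");

    /**
     * Decodes the body as UTF-8. Each emitted buffer overwrites the previous result,
     * so this expects the body to arrive as a single buffer.
     */
    public String resolveBodyFromRequest(Flux<DataBuffer> body) {
        AtomicReference<String> bodyRef = new AtomicReference<>();
        body.subscribe(dataBuffer -> {
            CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
            DataBufferUtils.release(dataBuffer);
            bodyRef.set(charBuffer.toString());
        });
        return bodyRef.get();
    }

    /**
     * Appends every emitted buffer to a StringBuilder, then strips all whitespace.
     */
    public String resolveBodyFromRequest2(Flux<DataBuffer> body) {
        StringBuilder sb = new StringBuilder();
        body.subscribe(buffer -> {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            DataBufferUtils.release(buffer);
            String bodyString = new String(bytes, StandardCharsets.UTF_8);
            sb.append(bodyString);
        });
        return formatStr(sb.toString());
    }

    /** Removes all whitespace (spaces, tabs, carriage returns, newlines) from the string. */
    private String formatStr(String str) {
        if (str != null && str.length() > 0) {
            Matcher m = p.matcher(str);
            return m.replaceAll("");
        }
        return str;
    }
}
```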
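One detail of `formatStr` is easy to miss: the pattern removes every whitespace character, including spaces inside JSON string values. The sketch below copies the pattern and the method body into a hypothetical `FormatStrDemo` class so the effect can be checked in isolation. The sample JSON is made up.

```java
import java.util.regex.Pattern;

final class FormatStrDemo {
    // Same pattern as FilterRequestResponseUtil.p.
    static final Pattern P = Pattern.compile("\\s*|\t|\r|\n");

    /** Same logic as formatStr: strips all whitespace from a non-empty string. */
    static String formatStr(String str) {
        if (str != null && str.length() > 0) {
            return P.matcher(str).replaceAll("");
        }
        return str;
    }

    public static void main(String[] args) {
        // The space inside "John Smith" is removed along with the formatting whitespace.
        System.out.println(formatStr("{ \"username\": \"John Smith\" }"));
        // prints {"username":"JohnSmith"}
    }
}
```

If field values may legitimately contain spaces, `resolveBodyFromRequest`, which does not call `formatStr`, is probably the safer choice.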
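The two approaches above are the ones most commonly found online for parsing the body. In my own testing, resolveBodyFromRequest reads the full body with no 1024-byte limit. This works because CacheBodyGlobalFilter has already joined the body into one in-memory buffer, so the cached Flux emits that single buffer synchronously when subscribe() is called.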
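Why joining matters can be shown with a short Spring-free sketch. It assumes the body arrives in several chunks, which is how a truncated body would show up without the caching filter. `ChunkDecodeDemo` and the sample chunks are hypothetical, and byte arrays stand in for `DataBuffer`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class ChunkDecodeDemo {
    /** Mirrors the set-per-buffer pattern: each chunk overwrites the previous one. */
    static String lastChunkOnly(List<byte[]> chunks) {
        AtomicReference<String> ref = new AtomicReference<>();
        for (byte[] chunk : chunks) {
            ref.set(new String(chunk, StandardCharsets.UTF_8));
        }
        return ref.get();
    }

    /** Joins all chunks first and decodes once, similar in spirit to DataBufferUtils.join. */
    static String joinThenDecode(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> chunks = Arrays.asList(
                "{\"grant_type\":".getBytes(StandardCharsets.UTF_8),
                "\"password\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(lastChunkOnly(chunks));  // only the final chunk survives
        System.out.println(joinThenDecode(chunks)); // the complete body
    }
}
```

With a single joined buffer the two strategies give the same result, which matches the behavior reported above.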