irpas技术客

Spring cloud Gateway 源码(二) 路由流程_中年张先生_gateway 获取路由

网络 7453

????????

目录

1、DispatcherHandler

1.1?handle方法

1.1.1 getHandler 获取请求处理器

? ?

1.1.2?invokeHandler 执行

?2、路由选择

? 2.1? 选择目标服务地址(ReactiveLoadBalancerClientFilter)

2.2、请求目标服务和响应

7.4. The Netty Routing Filter

7.5. The Netty Write Response Filter

??2.2.1 NettyWriteResponseFilter

? ?2.2.2?NettyRoutingFilter


? ? ?? ? ? ??

??????????在上次我们介绍完基本的对象后,这次我们看下gateway是如何工作。

????????首先提一个就是 DispatcherHandler,这个是WebFlux的核心处理类,作用同SpringMvc的DispatcherServlet。

1、DispatcherHandler @Override public void setApplicationContext(ApplicationContext applicationContext) { initStrategies(applicationContext); } //加载各种handlermapping。 protected void initStrategies(ApplicationContext context) { //查找当前容器内,HandlerMapping的子类,关注的就是之前提过的RoutePredicateHandlerMapping这个属于gateway内的对象 Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerMapping.class, true, false); ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values()); AnnotationAwareOrderComparator.sort(mappings); this.handlerMappings = Collections.unmodifiableList(mappings); Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerAdapter.class, true, false); this.handlerAdapters = new ArrayList<>(adapterBeans.values()); AnnotationAwareOrderComparator.sort(this.handlerAdapters); Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerResultHandler.class, true, false); this.resultHandlers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(this.resultHandlers); } 1.1?handle方法 public Mono<Void> handle(ServerWebExchange exchange) { if (this.handlerMappings == null) { return createNotFoundError(); } if (CorsUtils.isPreFlightRequest(exchange.getRequest())) { return handlePreFlight(exchange); } return Flux.fromIterable(this.handlerMappings) //① 使用mapping获取该请求对应的处理器 .concatMap(mapping -> mapping.getHandler(exchange)) .next() .switchIfEmpty(createNotFoundError()) //② 调用handler执行处理 .flatMap(handler -> invokeHandler(exchange, handler)) //③ 处理请求 .flatMap(result -> handleResult(exchange, result)); } 1.1.1 getHandler 获取请求处理器

? ? ? ? 这里我们只研究跟gateway相关的处理器。?我们之前说过了一个RoutePredicateHandlerMapping处理类,getHandler方法入口在AbstractHandlerMapping,它提供了一个模板方法。

? ? ? ? 根据代码,我们查看RoutePredicateHandlerMapping#getHandlerInternal方法。

?????????首先会在当前参数内放入?GATEWAY_HANDLER_MAPPER_ATTR 变量。目前只看到了放入,还没看到在哪里会用到,它的值也只是当前处理类的一个简称。

? ? ? ? 重点是?lookupRoute 方法

protected Mono<Route> lookupRoute(ServerWebExchange exchange) { //routeLocator 前面说过,它就是获取所有路由的处理类,在当前类构造方法内传入的具体实现类是CachingRouteLocator,为什么注入的是这个实现类呢?可以看看在autoconfiguration种它是@Primary return this.routeLocator.getRoutes() .concatMap(route -> Mono.just(route).filterWhen(r -> { //将路由id放入当前请求上下文 exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); //① 判断该路由谓词是否符合请求 return r.getPredicate().apply(exchange); }).doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e)) .onErrorResume(e -> Mono.empty())) .next() .map(route -> { if (logger.isDebugEnabled()) { logger.debug("Route matched: " + route.getId()); } validateRoute(route, exchange); return route; }); }

?①:这里是判断路由的谓词是否匹配到了请求,而我们知道谓词有很多种模式,我们可以根据自己配置的类型,去查看不同的谓词处理程序。例如应用较多的PathRoutePredicateFactory源码太多不放上来了。主要就是获取请求URI与定义的谓词规则进行匹配对比。

? ? ? ? 至此我们获取到了匹配到的路由,获取到路由后,会将路由放到请求上下文的GATEWAY_ROUTE_ATTR,返回的就是我们之前说的FilteringWebHandler。

? ? 1.1.2?invokeHandler 执行 private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) { if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) { return Mono.empty(); } if (this.handlerAdapters != null) { for (HandlerAdapter handlerAdapter : this.handlerAdapters) { if (handlerAdapter.supports(handler)) { //进入SimpleHandlerAdapter.handle方法,执行参数handler的handle方法 return handlerAdapter.handle(exchange, handler); } } } return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler)); } public Mono<Void> handle(ServerWebExchange exchange) { //这里就是从请求上下文种取到了上面决策到的路由 Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); //我们配置的filter List<GatewayFilter> gatewayFilters = route.getFilters(); //全局filter List<GatewayFilter> combined = new ArrayList<>(this.globalFilters); //合并排序 combined.addAll(gatewayFilters); AnnotationAwareOrderComparator.sort(combined); if (logger.isDebugEnabled()) { logger.debug("Sorted gatewayFilterFactories: " + combined); } //构建过滤器链 return new DefaultGatewayFilterChain(combined).filter(exchange); }

? ? ? ? gateway是支持服务发现的,那么到此是在哪里执行进行的服务选择的呢?还记得我们之前的

ReactiveLoadBalancerClientFilter吗?我们可以debug看看它是否在这个globalFilter里。

????????它确实在里面,所以当过滤器链挨个调用过滤器的时候会走倒它那里。后面再详细说这个。至此,我们的大致调用流程就清晰了。

? ? ? ??

?2、路由选择 ? 2.1? 选择目标服务地址(ReactiveLoadBalancerClientFilter) //获取的是 lb://test 类似的url,GATEWAY_REQUEST_URL_ATTR 这个属性是在全局过滤器 RouteToRequestUrlFilter设置的 URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); //scheme,lb/https 类 String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } //把当前url存放到上下文属性里 addOriginalRequestUrl(exchange, url);

? ? ? ? 后面的choose方法就是从注册中心选择注册的服务,并转换地址覆盖GATEWAY_REQUEST_URL_ATTR。

choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> { if (!response.hasServer()) { supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response))); throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost()); } ServiceInstance retrievedInstance = response.getServer(); URI uri = exchange.getRequest().getURI(); String overrideScheme = retrievedInstance.isSecure() ? "https" : "http"; if (schemePrefix != null) { overrideScheme = url.getScheme(); } DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme); URI requestUrl = reconstructURI(serviceInstance, uri); if (log.isTraceEnabled()) { log.trace("LoadBalancerClientFilter url chosen: " + requestUrl); } exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response); supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response)); })

? ? ? ? 具体怎么选择的,以及负载均衡策略,后续单独记录。

2.2、请求目标服务和响应

? ? ? ? 先提前透漏,处理请求和响应的是两个全局过滤器,看看官网的解释。

????????Spring Cloud Gateway

????????

7.4. The Netty Routing Filter

The Netty routing filter runs if the URL located in the?ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR?exchange attribute has a?http?or?https?scheme. It uses the Netty?HttpClient?to make the downstream proxy request. The response is put in the?ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR?exchange attribute for use in a later filter. (There is also an experimental?WebClientHttpRoutingFilter?that performs the same function but does not require Netty.)

7.5. The Netty Write Response Filter

The?NettyWriteResponseFilter?runs if there is a Netty?HttpClientResponse?in the?ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR?exchange attribute. It runs after all other filters have completed and writes the proxy response back to the gateway client response. (There is also an experimental?WebClientWriteResponseFilter?that performs the same function but does not require Netty.)

????????从文档可知,NettyRoutingFilter和NettyWriteResponseFilter是分别处理请求和响应的,那么他们的执行顺序是先请求再响应吗?它们都同属于GlobalFilter,顺序我们看源码的getOrder返回值即可,我们看了源码后发现和我们想的不一样,顺序明显是NettyWriteResponseFilter在NettyRoutingFilter之前。我理解这正好是响应编程的模式。

??2.2.1 NettyWriteResponseFilter @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return chain.filter(exchange) //看名字知道是出错后 .doOnError(throwable -> cleanup(exchange)) //前面filter执行完,异步执行then .then(Mono.defer(() -> { //CLIENT_RESPONSE_CONN_ATTR会在nettyroutingfilter执行http请求后设置 Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR); if (connection == null) { return Mono.empty(); } if (log.isTraceEnabled()) { log.trace("NettyWriteResponseFilter start inbound: " + connection.channel().id().asShortText() + ", outbound: " + exchange.getLogPrefix()); } ServerHttpResponse response = exchange.getResponse(); //connection请求后信息写入response final Flux<DataBuffer> body = connection .inbound() .receive() .retain() .map(byteBuf -> wrap(byteBuf, response)); MediaType contentType = null; try { contentType = response.getHeaders().getContentType(); } catch (Exception e) { if (log.isTraceEnabled()) { log.trace("invalid media type", e); } } //根据类型执行不同操作并返回给客户端,应该媒体类的特殊处理 return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })).doOnCancel(() -> cleanup(exchange)); } ? ?2.2.2?NettyRoutingFilter

????????

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))) { return chain.filter(exchange); } setAlreadyRouted(exchange); ServerHttpRequest request = exchange.getRequest(); final HttpMethod method = HttpMethod.valueOf(request.getMethodValue()); //目标请求url final String url = requestUrl.toASCIIString(); //拿到路由配置的header过滤器处理后的headers https://docs.spring.io/spring-cloud- g //ateway/docs/current/reference/html/#httpheadersfilters HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); //构建httpclient Flux<HttpClientResponse> responseFlux = getHttpClient(route, //前面的header加入httpclient内 exchange).headers(headers -> { headers.add(httpHeaders); // Will either be set below, or later by Netty headers.remove(HttpHeaders.HOST); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); headers.add(HttpHeaders.HOST, host); } }).request(method).uri(url).send((req, nettyOutbound) -> { if (log.isTraceEnabled()) { nettyOutbound.withConnection(connection -> log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix())); } return nettyOutbound.send(request.getBody().map(this::getByteBuf)); //处理返回response }).responseConnection((res, connection) -> { exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); //nttywriteresponsefilter就是拿到的这个connection exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } setResponseStatus(res, response); // make sure headers filters run after setting status so it is // available in response HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE); if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) { // It is not valid to have both the transfer-encoding header and // the content-length header. // Remove the transfer-encoding header in the response if the // content-length header is present. response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().putAll(filteredResponseHeaders); return Mono.just(res); }); Duration responseTimeout = getResponseTimeout(route); if (responseTimeout != null) { responseFlux = responseFlux .timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange)); }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #gateway #获取路由 #首先提一个就是 #void