irpas技术客

Spring Cloud:负载均衡 - Spring Cloud Loadbalancer原理_言尭_spring.cloud.loadbalancer

网络 1940

Spring Cloud 2020版本以后,默认移除了对Netflix的依赖,其中就包括Ribbon,官方默认推荐使用Spring Cloud Loadbalancer正式替换Ribbon,并成为了Spring Cloud负载均衡器的唯一实现。

今天我们深入分析一下Spring Cloud Loadbalancer的具体实现:

使用

1、公共依赖Spring Cloud,例如版本2020.0.2

<dependency> ? ? ? ? <groupId>org.springframework.cloud</groupId> ? ? ? ? <artifactId>spring-cloud-dependencies</artifactId> ? ? ? ? <version>2020.0.2</version> ? ? ? ? <type>pom</type> ? ? ? ? <scope>import</scope> </dependency>

注意:

如果是Hoxton之前的版本,默认负载均衡器为Ribbon,需要移除Ribbon引用和增加配置spring.cloud.loadbalancer.ribbon.enabled: false

2、引入loadbalancer依赖

<dependency> ? ? <groupId>org.springframework.cloud</groupId> ? ? <artifactId>spring-cloud-starter-loadbalancer</artifactId> </dependency> <!-- 负载均衡需要搭配注册中心使用,这里引入Nacos做服务注册,也可采用Eureka等 --> <!-- SpringCloud Ailibaba Nacos --> <dependency> ? ? <groupId>com.alibaba.cloud</groupId> ? ? <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>

注意:

Nacos使用请参考官网:https://nacos.io/zh-cn/index.html

3、使用RestTemplate实现Demo

引入web依赖:

<dependency> ? ? <!-- 使用web,使用Spring MVC对外提供服务?? --> ? ? <groupId>org.springframework.boot</groupId> ? ? <artifactId>spring-boot-starter-web</artifactId> </dependency>

编写DemoController:

@RestController public class TestController { ? ? @Autowired ? ? private RestTemplate restTemplate; ? ? // 新增restTemplate对象注入方法,注意,此处LoadBalanced注解一定要加上,否则无法远程调用 ? ? @Bean ? ? @LoadBalanced ? ? public RestTemplate restTemplate() { ? ? ? ? return new RestTemplate(); ? ? } ? ? @GetMapping("/load") ? ? public String load() { ? ? ? ? return restTemplate.getForObject("http://demo-server/hello/", String.class); ? ? } ? ? @GetMapping(value = "/hello") ? ? public String hello() { ? ? ? ? return "Hello World"; ? ? } }

4、启动Nacos,调用接口http://localhost:8080/load

原理

上面是RestTemplate负载均衡的简单实现,除此之外,Spring Cloud LoadBalancer还支持Spring Web Flux响应式编程,这里我们不展开,两者的实现原理思想相同,都是通过客户端添加拦截器,在拦截器中实现负载均衡。

1、#RestTemplate,提供了一个方法setInterceptors,用于设置拦截器,拦截器需要实现ClientHttpRequestInterceptor接口即可,在实际远程去请求服务端接口之前会先调用拦截器的intercept方法逻辑。这里的拦截器相当于Servlet技术中的Filter功能。

// 代码实现在抽象父类InterceptingHttpAccessor里 // RestTemplate.InterceptingHttpAccessor#setInterceptors public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) { ?? // Take getInterceptors() List as-is when passed in here ?? if (this.interceptors != interceptors) { ? ? ? this.interceptors.clear(); ? ? ? this.interceptors.addAll(interceptors); ? ? ? AnnotationAwareOrderComparator.sort(this.interceptors); ?? } }

2、#LoadBalancerAutoConfiguration,由于@LoadBalanced注解由spring-cloud-commons实现,查看实现逻辑我们发现spring-cloud-commons存在自动配置类LoadBalancerAutoConfiguration,当满足条件时,将自动创建LoadBalancerInterceptor并注入到RestTemplate中。

? ? @Configuration( ? ? ? ? proxyBeanMethods = false ? ? ) ? ? @Conditional({LoadBalancerAutoConfiguration.RetryMissingOrDisabledCondition.class}) ? ? static class LoadBalancerInterceptorConfig { ? ? ? ? LoadBalancerInterceptorConfig() { ? ? ? ? } ? ? ? ? @Bean ? ? ? ? public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { ? ? ? ? ? ? return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); ? ? ? ? } ? ? ? ? @Bean ? ? ? ? @ConditionalOnMissingBean ? ? ? ? public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) { ? ? ? ? ? ? return (restTemplate) -> { ? ? ? ? ? ? ? ? List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors()); ? ? ? ? ? ? ? ? list.add(loadBalancerInterceptor); ? ? ? ? ? ? ? ? restTemplate.setInterceptors(list); ? ? ? ? ? ? }; ? ? ? ? } ? ? }

3、#LoadRalancerLnterceptor,LoadBalancerInterceptor实现了ClientHttpRequestInterceptor接口,实现intercept方法,用于实现负载均衡的拦截处理。

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { ? ? private LoadBalancerClient loadBalancer; ? ? private LoadBalancerRequestFactory requestFactory; ? ? public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { ? ? ? ? this.loadBalancer = loadBalancer; ? ? ? ? this.requestFactory = requestFactory; ? ? } ? ? public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { ? ? ? ? this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); ? ? } ? ? public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { ? ? ? ? URI originalUri = request.getURI(); ? ? ? ? String serviceName = originalUri.getHost(); ? ? ? ? Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); ? ? ? ? return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)); ? ? } }

4、#LoadBalancerClient,负载均衡客户端,用于进行负载均衡逻辑,从服务列表中选择出一个服务地址进行调用。Spring Cloud LoadBalancer的默认实现为BlockingLoadBalancerClient,

? ? public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ? ? ? ? String hint = this.getHint(serviceId); ? ? ? ? LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter(request, new DefaultRequestContext(request, hint)); ? ? ? ? Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId); ? ? ? ? supportedLifecycleProcessors.forEach((lifecycle) -> { ? ? ? ? ? ? lifecycle.onStart(lbRequest); ? ? ? ? }); ? ? ? //选择服务 ? ? ? ? ServiceInstance serviceInstance = this.choose(serviceId, lbRequest); ? ? ? ? if (serviceInstance == null) { ? ? ? ? ? ? supportedLifecycleProcessors.forEach((lifecycle) -> { ? ? ? ? ? ? ? ? lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, new EmptyResponse())); ? ? ? ? ? ? }); ? ? ? ? ? ? throw new IllegalStateException("No instances available for " + serviceId); ? ? ? ? } else { ? ? ? ? ? ? return this.execute(serviceId, serviceInstance, lbRequest); ? ? ? ? } ? ? } ? ? public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { ? ? ? ? DefaultResponse defaultResponse = new DefaultResponse(serviceInstance); ? ? ? ? Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId); ? ? ? ? Request lbRequest = request instanceof Request ? (Request)request : new DefaultRequest(); ? ? ? ? supportedLifecycleProcessors.forEach((lifecycle) -> { ? ? ? ? ? ? lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)); ? ? ? ? }); ? ? ? ? try { ? ? ? ? ? ? T response = request.apply(serviceInstance); ? ? ? ? ? ? Object clientResponse = this.getClientResponse(response); ? ? ? ? ? ? supportedLifecycleProcessors.forEach((lifecycle) -> { ? ? ? ? ? ? ? ? lifecycle.onComplete(new CompletionContext(Status.SUCCESS, lbRequest, defaultResponse, clientResponse)); ? ? ? ? ? ? }); ? ? ? ? ? ? return response; ? ? ? ? } catch (IOException var9) { ? ? ? ? ? ? supportedLifecycleProcessors.forEach((lifecycle) -> { ? ? ? ? ? ? ? ? lifecycle.onComplete(new CompletionContext(Status.FAILED, var9, lbRequest, defaultResponse)); ? ? ? ? ? ? }); ? ? ? ? ? ? throw var9; ? ? ? ? } catch (Exception var10) { ? ? ? ? ? ? supportedLifecycleProcessors.forEach((lifecycle) -> { ? ? ? ? ? ? ? ? lifecycle.onComplete(new CompletionContext(Status.FAILED, var10, lbRequest, defaultResponse)); ? ? ? ? ? ? }); ? ? ? ? ? ? ReflectionUtils.rethrowRuntimeException(var10); ? ? ? ? ? ? return null; ? ? ? ? } ? ? } ....? ? ? public ServiceInstance choose(String serviceId) { ? ? ? ? return this.choose(serviceId, ReactiveLoadBalancer.REQUEST); ? ? } //通过不同的负载均衡客户端实现选择不同的服务 ? ? public <T> ServiceInstance choose(String serviceId, Request<T> request) { ? ? ? ? ReactiveLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerClientFactory.getInstance(serviceId); ? ? ? ? if (loadBalancer == null) { ? ? ? ? ? ? return null; ? ? ? ? } else { ? ? ? ? ? ? Response<ServiceInstance> loadBalancerResponse = (Response)Mono.from(loadBalancer.choose(request)).block(); ? ? ? ? ? ? return loadBalancerResponse == null ? null : (ServiceInstance)loadBalancerResponse.getServer(); ? ? ? ? } ? ? }

5、#LoadBalancerClientFactory,BlockingLoadBalancerClient中持有LoadBalancerClientFactory通过调用其getInstance方法获取具体的负载均衡客户端。客户端实现了不同的负载均衡算法,比如轮询、随机等。LoadBalancerClientFactory继承了NamedContextFactory,NamedContextFactory继承ApplicationContextAware,实现Spring ApplicationContext容器操作。

public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification> implements Factory<ServiceInstance> { ? ? public static final String NAMESPACE = "loadbalancer"; ? ? public static final String PROPERTY_NAME = "loadbalancer.client.name"; ? ? public LoadBalancerClientFactory() { ? ? ? ? super(LoadBalancerClientConfiguration.class, "loadbalancer", "loadbalancer.client.name"); ? ? } ? ? public String getName(Environment environment) { ? ? ? ? return environment.getProperty("loadbalancer.client.name"); ? ? } ? ? public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) { ? ? ? ? return (ReactiveLoadBalancer)this.getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class); ? ? } }

在spring-cloud-loadbalabcer中的LoadBalancerAutoConfiguration实现了LoadBalancerClientFactory缺省值:

? ? @ConditionalOnMissingBean ? ? @Bean ? ? public LoadBalancerClientFactory loadBalancerClientFactory() { ? ? ? ? LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory(); ? ? ? ? clientFactory.setConfigurations((List)this.configurations.getIfAvailable(Collections::emptyList)); ? ? ? ? return clientFactory; ? ? }

6、#ReactiveLoadBalancer,负载均衡器,实现服务选择。Spring Cloud Balancer中实现了轮询RoundRobinLoadBalancer和随机数RandomLoadBalancer两种负载均衡算法。

如果没有显式指定负载均衡算法,默认缺省值为RoundRobinLoadBalancer。

LoadBalancerClientConfiguration#LoadBalancerClientConfiguration

? ? @Bean ? ? @ConditionalOnMissingBean ? ? public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { ? ? ? ? String name = environment.getProperty("loadbalancer.client.name"); ? ? ? ? return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name); ? ? }

7、#LoadBalancerRequestFactory,LoadBalancerRequest工厂类,用于创建LoadBalancerRequest,调用createRequest方法。在内部持有LoadBalancerClient属性对象,即BlockingLoadBalancerClient。

public class LoadBalancerRequestFactory { ? ? private LoadBalancerClient ; ? ? private List<LoadBalancerRequestTransformer> transformers; ? ? public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers) { ? ? ? ? this.loadBalancer = loadBalancer; ? ? ? ? this.transformers = transformers; ? ? } ? ? public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) { ? ? ? ? this.loadBalancer = loadBalancer; ? ? } ? ? public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) { ? ? ? ? return (instance) -> { ? ? ? ? ? ? HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer); ? ? ? ? ? ? LoadBalancerRequestTransformer transformer; ? ? ? ? ? ? if (this.transformers != null) { ? ? ? ? ? ? ? ? for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) { ? ? ? ? ? ? ? ? ? ? transformer = (LoadBalancerRequestTransformer)var6.next(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? return execution.execute((HttpRequest)serviceRequest, body); ? ? ? ? }; ? ? } }

?

整合Feign

在日常项目中,一般负载均衡都是结合Feign使用,下面我们讨论下结合Fegin的使用情况

1、引入依赖

? ? ? ? <!-- SpringCloud Openfeign --> ? ? ? ? <dependency> ? ? ? ? ? ? <groupId>org.springframework.cloud</groupId> ? ? ? ? ? ? <artifactId>spring-cloud-starter-openfeign</artifactId> ? ? ? ? </dependency> ? ? ?? <!-- 其他 loadbalancer依赖 -->

2、定义Feign接口

@FeignClient(name = "my-service") public interface RemoteLogService { ? ? @PostMapping("/sys/log") ? ? R<Boolean> saveLog(@RequestBody SysLog sysLog); }

3、调用Feign接口调试(正常负载均衡已经可以使用,无需做其他配置)

4、原理分析

下图是Feign的实现原理,详情可参考博文:Feign原理 (图解)

4.1、查看Feign的loadbalancer的自动配置:FeignLoadBalancerAutoConfiguration,存在

LoadBalancerClient和LoadBalancerClientFactory的bean时,配置生效,默认使用DefaultFeignLoadBalancerConfiguration。请注意,如果引用了OkHttp或HttpClient,将使用不同的configuration文件。

@ConditionalOnClass({Feign.class}) @ConditionalOnBean({LoadBalancerClient.class, LoadBalancerClientFactory.class}) @AutoConfigureBefore({FeignAutoConfiguration.class}) @AutoConfigureAfter({BlockingLoadBalancerClientAutoConfiguration.class, LoadBalancerAutoConfiguration.class}) @EnableConfigurationProperties({FeignHttpClientProperties.class}) @Configuration( proxyBeanMethods = false ) @Import({HttpClientFeignLoadBalancerConfiguration.class, OkHttpFeignLoadBalancerConfiguration.class, HttpClient5FeignLoadBalancerConfiguration.class, DefaultFeignLoadBalancerConfiguration.class}) public class FeignLoadBalancerAutoConfiguration { public FeignLoadBalancerAutoConfiguration() { } }

4.2、#DefaultFeignLoadBalancerConfiguration,将缺省创建FeignBlockingLoadBalancerClient并注入LoadBalancerClient和LoadBalancerClientFactory,这两个bean的创建请参考上文。

@Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({LoadBalancerProperties.class}) class DefaultFeignLoadBalancerConfiguration { DefaultFeignLoadBalancerConfiguration() { } @Bean @ConditionalOnMissingBean @Conditional({OnRetryNotEnabledCondition.class}) public Client feignClient(LoadBalancerClient loadBalancerClient, LoadBalancerProperties properties, LoadBalancerClientFactory loadBalancerClientFactory) { return new FeignBlockingLoadBalancerClient(new Default((SSLSocketFactory)null, (HostnameVerifier)null), loadBalancerClient, properties, loadBalancerClientFactory); } ... }

4.3、#FeignBlockingLoadBalancerClient,实现excute方法,实现Feign具体请求操作,通过loadBalancerClient.choose获取实例并执行请求,具体选择逻辑和RestTemplate一致。

public class FeignBlockingLoadBalancerClient implements Client { private final Client delegate; private final LoadBalancerClient loadBalancerClient; private final LoadBalancerProperties properties; private final LoadBalancerClientFactory loadBalancerClientFactory; public Response execute(Request request, Options options) throws IOException { URI originalUri = URI.create(request.url()); String serviceId = originalUri.getHost(); Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri); String hint = this.getHint(serviceId); DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest(new RequestDataContext(LoadBalancerUtils.buildRequestData(request), hint)); Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(this.loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class); supportedLifecycleProcessors.forEach((lifecycle) -> { lifecycle.onStart(lbRequest); }); ServiceInstance instance = this.loadBalancerClient.choose(serviceId, lbRequest); org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(instance); String message; if (instance == null) { message = "Load balancer does not contain an instance for the service " + serviceId; if (LOG.isWarnEnabled()) { LOG.warn(message); } supportedLifecycleProcessors.forEach((lifecycle) -> { lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, lbResponse)); }); return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value()).body(message, StandardCharsets.UTF_8).build(); } else { message = this.loadBalancerClient.reconstructURI(instance, originalUri).toString(); Request newRequest = this.buildRequest(request, message); return LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(this.delegate, options, newRequest, lbRequest, lbResponse, supportedLifecycleProcessors); } } } ?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Spring #Cloud #Cloud负载均衡器的唯一实现