原创 吴就业 137 0 2020-06-27
本文为博主原创文章,未经博主允许不得转载。
本文链接:https://wujiuye.com/article/7ac94a204b934f10aee3af4387ba1ec1
作者:吴就业
链接:https://wujiuye.com/article/7ac94a204b934f10aee3af4387ba1ec1
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。
本篇文章写于2020年06月27日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。
在项目中使用Ribbon
的目的是在客户端(服务消费端)实现负载均衡。
在上一篇《Spring Cloud OpenFeign源码分析,为什么不导入Ribbon应用会启动不起来?》中我们分析了为什么使用OpenFeign
时,不配置url
,且不导入Ribbon
的依赖会报错。本篇继续分析OpenFeign
是如何与Ribbon
整合、Ribbon
是如何实现负载均衡的、Ribbon
是如何从注册中心获取服务的。
OpenFeign
与Ribbon
整合后的接口调用流程OpenFeign
与Ribbon
整合实现负载均衡调用接口的流程如下:
spring-cloud-openfeign-core
模块:
* 1、调用LoadBalancerFeignClient
的execute
调用远程方法;
* 2、调用FeignLoadBalancer
的executeWithLoadBalancer
方法实现负载均衡调用。
ribbon-core
模块:
* 3、调用LoadBalancerCommand
的submit
方法实现异步调用同步阻塞等待结果。
* 4、调用LoadBalancerCommand
的selectServer
方法从多个服务提供者中负载均衡选择一个调用;
ribbon-loadbalancer
模块:
* 5、调用ILoadBalancer
的chooseServer
方法选择服务;
* 6、调用IRule
的choose
方法按某种算法选择一个服务,如随机算法、轮询算法;
OpenFeign
是如何与Ribbon
整合的sck-demo
项目项目地址:https://github.com/wujiuye/share-projects/tree/master/sck-demo
。
当我们使用openfeign
时,如果不配置@FeignClient
的url
属性,那么就需要导入spring-cloud-starter-kubernetes-ribbon
的依赖,使用LoadBalancerFeignClient
调用接口。如果我们不需要使用Ribbon
来实现负载均衡,那么我们可以直接将@FeignClient
的url
属性配置为:http://{serviceId}
,而不用添加Ribbon
的依赖。
sck-demo
项目中添加spring-cloud-starter-kubernetes-ribbon
依赖,非Spring Cloud Kubernetes
项目只需添加spring-cloud-starter-netflix-ribbon
。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-kubernetes-ribbon</artifactId>
</dependency>
当我们在项目中添加spring-cloud-starter-kubernetes-ribbon
依赖配置时,会将spring-cloud-starter-netflix-ribbon
和spring-cloud-kubernetes-ribbon
都会导入到项目中,如下图所示。
当项目中使用openfeign
并添加spring-cloud-starter-netflix-ribbon
后,Ribbon
就能通过自动配置与openfeign
整合,为项目注入ILoadBalancer
的实现类实例。默认使用的是ZoneAwareLoadBalancer
,这是spring-cloud-netflix-ribbon
下的类。
spring-cloud-netflix-ribbon
的META-INF
目录下的spring.factories
文件内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
可以说spring-cloud-netflix-ribbon
是spring-cloud-commons
的loadbalancer
接口的实现。
RibbonAutoConfiguration
会注入一个LoadBalancerClient
,LoadBalancerClient
是spring-cloud-commons
定义的负载均衡接口,RibbonLoadBalancerClient
是Ribbon
实现spring-cloud-commons
负载均衡接口LoadBalancerClient
的实现类,是提供给代码中使用@LoadBalanced
注解使用的。
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
在创建RibbonLoadBalancerClient
时调用springClientFactory
方法创建SpringClientFactory
:
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
SpringClientFactory
是NamedContextFactory
的子类,其构建方法调用父类构造方法时传入了一个配置类RibbonClientConfiguration.class
,这是RibbonClientConfiguration
配置类生效的原因。
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {
static final String NAMESPACE = "ribbon";
public SpringClientFactory() {
super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
}
}
SpringClientFactory
会为每个服务提供者创建一个ApplicationContext
,实现bean
的隔离,解决bean
名称冲突问题,以及实现使用不同配置。
在创建ApplicationContext
时会注册defaultConfigType
到bean
工厂,该defaultConfigType
就是构造方法传递进来的RibbonClientConfiguration.class
。
protected AnnotationConfigApplicationContext createContext(String name) {
// 创建ApplicationContext
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
......
// 注册多个Configuration类
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
......
// 调用ApplicationContext的refresh方法
context.refresh();
return context;
}
那createContext
方法是什么时候被调用的呢?
以sck-demo
中服务消费者调用服务提供者接口为例:
@Service
public class DemoInvokeServiceImpl implements DemoInvokeService {
@Resource
private DemoService demoService;
@Override
public ListGenericResponse<DemoDto> invokeDemo() {
return demoService.getServices();
}
}
DemoService
是被@FeignClient
注解声明的接口,当我们调用DemoService
的某个方法时,经过《Spring Cloud OpenFeign
源码分析》我们知道,最终会调用到LoadBalancerFeignClient
的execute
方法时。
public class LoadBalancerFeignClient implements Client {
//............
private SpringClientFactory clientFactory;
// 后面再分析execute方法
@Override
public Response execute(Request request, Request.Options options) throws IOException{
// .....
IClientConfig requestConfig = getClientConfig(options, clientName);
// .....
}
}
execute
方法中需要调用getClientConfig
方法从SpringClientFactory
获取IClientConfig
实例,即获取客户端配置。getClientConfig
方法就是要从服务提供者的ApplicationContext
工厂中获取实现了IClientConfig
接口的bean
。
当首次调用某个服务提供者的接口时,由于并未初始化AnnotationConfigApplicationContext
,因此会先调用createContext
方法创建ApplicationContext
,该方法将RibbonClientConfiguration
类注册到ApplicationContext
,最后调用context.refresh();
时就会调用到RibbonClientConfiguration
的被@Bean
注释的方法。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
// ........
@RibbonClientName
private String name = "client";
@Autowired
private PropertiesFactory propertiesFactory;
// IClientConfig实例,配置client的连接超时、读超时等
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
return config;
}
// 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
// 配置服务更新器,定时从注册中心拉去服务,由ILoadBalancer启动
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
// 配置ribbon的负载均衡器,默认使用ZoneAwareLoadBalancer
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
// ......其它的暂时不去理解
}
ILoadBalancer
是Ribbon
定义的负载均衡接口。ZoneAwareLoadBalancer
是DynamicServerListLoadBalancer
的子类,DynamicServerListLoadBalancer
封装了服务更新逻辑。
DynamicServerListLoadBalancer
在构造方法中调用enableAndInitLearnNewServersFeature
方法开启服务更新器ServerListUpdater
,ServerListUpdater
定时从注册中心拉取可用的服务更新服务列表缓存。
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
protected volatile ServerListUpdater serverListUpdater;
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
public void enableAndInitLearnNewServersFeature(){
serverListUpdater.start(updateAction);
}
// 调用ServerList获取服务
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
// 如果需要过滤
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
}
Ribbon
是如何实现负载均衡的ServerList
我们后面再讲解,先搞清楚openfegin
与ribbon
整合后的整个调用链路。我们继续从LoadBalancerFeignClient
的execute
方法继续分析。(LoadBalancerFeignClient
是由FeignRibbonClientAutoConfiguration
自动配置类配置的,如果忘记的话可以再看下上一篇。)
public class LoadBalancerFeignClient implements Client {
//............
private SpringClientFactory clientFactory;
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
// 拿到的是服务的名称,如:sck-demo-prodiver
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
// delegate是:class Default implements Client {}
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
// 首先获取客户端配置
IClientConfig requestConfig = getClientConfig(options, clientName);
return
// 负载均衡选择一个服务提供者
lbClient(clientName)
// 调用接口获取响应结果
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
}
lbClient
创建一个FeignLoadBalancer
对象,调用FeignLoadBalancer
的executeWithLoadBalancer
方法实现负载均衡调用接口,最终会调用到FeignLoadBalancer
的execute
方法。Ribbon
使用RxJava
实现异步调用转同步阻塞获取结果。
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
// command也封装了负载均衡的实现逻辑
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
// 调用FeignLoadBalancer的execute方法
return Observable.just(
AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)
);
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
.....
}
}
LoadBalancerCommand
的submit
方法代码比较多,逻辑也比较复杂,因此就不展开说明了。
public Observable<T> submit(final ServerOperation<T> operation) {
// Use the load balancer
Observable<T> o = (server == null ? selectServer() : Observable.just(server))
}
selectServer
方法返回一个Observable<Server>
,Observable
是RxJava
的API
,我们跳过这部分。
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
// 调用LoadBalancerContext的getServerFromLoadBalancer方法
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
selectServer
方法中调用LoadBalancerContext
的getServerFromLoadBalancer
方法获取一个服务提供者,此LoadBalancerContext
实际是FeignLoadBalancer
(在buildLoadBalancerCommand
方法中可以找到答案)。
getServerFromLoadBalancer
方法部分代码如下:
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
if (lb != null){
Server svc = lb.chooseServer(loadBalancerKey);
return svc;
}
// .....
}
}
由于Ribbon
默认使用的ILoadBalancer
是ZoneAwareLoadBalancer
,因此getLoadBalancer
方法返回的是ZoneAwareLoadBalancer
。获取到负载均衡器后调用负载均衡器的chooseServer
选择一个服务提供者。
ZoneAwareLoadBalancer
的chooseServer
方法:
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
}
if
条件成立时,调用的是父类BaseLoadBalancer
的chooseServer
方法:
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
protected IRule rule = DEFAULT_RULE;
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
// 调用IRule的choose方法,rule是在创建ZoneAwareLoadBalancer时通过构造方法注入的
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
}
IRule
是服务选择器、是负载均衡算法的实现。在RibbonAutoConfiguration
配置类中注入。
// 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
在创建ZoneAwareLoadBalancer
时通过构造方法注入。
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
// ......
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
至于ZoneAvoidanceRule
是怎么从多个服务提供者中选择一个调用的,这就是负载均衡算法的实现,本篇不做分析。
Ribbon
是如何从注册中心获取服务提供者的前面我们分析到,ZoneAwareLoadBalancer
是DynamicServerListLoadBalancer
的子类,DynamicServerListLoadBalancer
封装了服务更新逻辑,定时调用ServerList
的getUpdatedListOfServers
方法从注册中心拉取服务。
ServerList
是ribbon-loadbalancer
包下的类,并不是spring-cloud
的接口,所以与spring-cloud
的服务发现接口是没有关系的。
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
*
*/
public List<T> getUpdatedListOfServers();
}
在分析RibbonClientConfiguration
时,我们发现有一个方法会注册一个ServerList<Server>
,但这个方法必不会执行到。
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
}
因为我们在sck-demo
项目中使用的是spring-cloud-starter-kubernetes-ribbon
,所以我们现在来看下spring-cloud-kubernetes-ribbon
负责做什么。首先从spring-cloud-starter-kubernetes-ribbon
的spring.factories
文件中找到自动配置类。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.kubernetes.ribbon.RibbonKubernetesAutoConfiguration
自动配置类RibbonKubernetesAutoConfiguration
的源码如下:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnProperty(value = "spring.cloud.kubernetes.ribbon.enabled",matchIfMissing = true)
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = KubernetesRibbonClientConfiguration.class)
public class RibbonKubernetesAutoConfiguration {
}
SpringClientFactory
我们分析过了,RibbonAutoConfiguration
我们也分析过了,只剩下KubernetesRibbonClientConfiguration
这个配置类。
KubernetesRibbonClientConfiguration
是使用@RibbonClients
注解导入的配置类,也就是通过ImportBeanDefinitionRegistrar
注册的。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KubernetesRibbonProperties.class)
public class KubernetesRibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(KubernetesClient client, IClientConfig config,
KubernetesRibbonProperties properties) {
KubernetesServerList serverList;
if (properties.getMode() == KubernetesRibbonMode.SERVICE) {
serverList = new KubernetesServicesServerList(client, properties);
}
else {
serverList = new KubernetesEndpointsServerList(client, properties);
}
serverList.initWithNiwsConfig(config);
return serverList;
}
}
看到这我们就明白了,spring-cloud-kubernetes-ribbon
负责实现ribbon
的服务列表接口ServerList<Server>
。当spring.cloud.kubernetes.ribbon.mode
配置为SERVICE
时,使用KubernetesServicesServerList
,否则使用KubernetesEndpointsServerList
。默认mode
是POD
。
@ConfigurationProperties(prefix = "spring.cloud.kubernetes.ribbon")
public class KubernetesRibbonProperties {
/**
* Ribbon enabled,default true.
*/
private Boolean enabled = true;
/**
* {@link KubernetesRibbonMode} setting ribbon server list with ip of pod or service
* name. default value is POD.
*/
private KubernetesRibbonMode mode = KubernetesRibbonMode.POD;
/**
* cluster domain.
*/
private String clusterDomain = "cluster.local";
}
KubernetesRibbonMode
是个枚举类,支持pod
和service
。
public enum KubernetesRibbonMode {
/**
* using pod ip and port.
*/
POD,
/**
* using kubernetes service name and port.
*/
SERVICE
}
什么意思呢? 当mode
为service
时,就是获取服务提供者在kubernetes
中的service
的名称和端口,使用这种模式会导致Ribbon
的负载均衡失效,转而使用kubernetes
的负载均衡。而当mode
为pod
时,就是获取服务提供者的pod
的ip
和端口,该ip
是kubernetes
集群的内部ip
,只要服务消费者是部署在同一个kubernetes
集群内就能通过pod
的ip
和服务提供者暴露的端口访问pod
上的服务提供者。
如果我们不想使用Ribbon
实现负载均衡,那么我们可以在配置文件中添加如下配置项:
spring:
cloud:
kubernetes:
ribbon:
mode: SERVICE
你学会了吗?下一篇我们了解Spring Cloud Kubernetes
的服务注册。
声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。
本篇分析Spring Cloud Kubernetes服务注册与发现实现原理,以及Spring Cloud Kubernetes Core&Discovery源码分析。
本篇我们再对Ribbon的重试机制地实现做详细分析,从源码分析找出我们想要地答案,即如何配置Ribbon实现调用每个服务的接口使用不一样的重试策略,如配置失败重试多少次,以及自定义重试策略RetryHandler。
本篇介绍OpenFeign与Feign的关系、Feign底层实现原理、Ribbon是什么、Ribbon底层实现原理、Ribbon是如何实现失败重试的?
如果指定了URL,那么getOptional方法不会返回null,且返回的Client是LoadBalancerFeignClient,但不会抛出异常。如果不指定URL,则走负载均衡逻辑,走的是loadBalance方法,且抛出异常。
本篇我们将从一个简单的demo上手Spring Cloud kubernetes,当然,我们只用到Spring Cloud kubernetes的服务注册与发现、配置中心模块。
订阅
订阅新文章发布通知吧,不错过精彩内容!
输入邮箱,提交后我们会给您发送一封邮件,您需点击邮件中的链接完成订阅设置。