Ribbon重试策略RetryHandler的配置与源码分析

原创 吴就业 142 0 2020-06-29

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/0095414aa07d4509adc41baeea7b58c7

作者:吴就业
链接:https://wujiuye.com/article/0095414aa07d4509adc41baeea7b58c7
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2020年06月29日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。

Spring Cloud Kubernetes微服务实战与源码分析

在《OpenFeignRibbon源码分析总结》这篇文章中,我们只是简单地了解Ribbon的重试机制的实现原理,本篇我们再对Ribbon的重试机制地实现做详细分析,从源码分析找出我们想要地答案,即如何配置Ribbon实现调用每个服务的接口使用不一样的重试策略,如配置失败重试多少次,以及自定义重试策略RetryHandler

本篇源码分析部分涉及到的关键类说明

Ribbon重试机制地实现源码分析

Ribbon的重试机制使用了RxJavaAPI,而重试次数以及是否重试的决策由RetryHandler实现。Ribbon提供两个RetryHandler的实现类,如下图所示。

现在我们要找出Ribbon使用的是哪个RetryHandler,我们只分析OpenFeignRibbon整合的使用,Spring Cloud@LoadBalanced注解方式使用我们不做分析。

spring-cloud-netflix-ribbonspring.factories文件导入的自动配置类是RibbonAutoConfiguration,该配置类向Spring容器注入了一个RibbonLoadBalancerClientRibbonLoadBalancerClient正是RibbonSpring Cloud的负载均衡接口提供的实现类。

在创建RibbonLoadBalancerClient时给构造方法传入了一个SpringClientFactory,源码如下。

@Configuration
public class RibbonAutoConfiguration{
    // 创建RibbonLoadBalancerClient
    @Bean
	@ConditionalOnMissingBean(LoadBalancerClient.class)
	public LoadBalancerClient loadBalancerClient() {
		return new RibbonLoadBalancerClient(springClientFactory());
	}
}

SpringClientFactoryRibbon使用的ApplicationContextRibbon会为每个Client都创建一个AnnotationConfigApplicationContext,用作环境隔离。

SpringClientFactory在调用父类构造方法时传入了一个配置类:RibbonClientConfiguration,源码如下。

public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification>{

	public SpringClientFactory() {
		super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
	}
}

RibbonClientConfiguration配置类在每个Client对应的AnnotationConfigApplicationContext初始化时生效,在第一次调用服务的接口时AnnotationConfigApplicationContext才被创建。创建ApplicationContext并且调用register方法注册RibbonClientConfiguration配置类以及其它一些配置类,最后调用其refresh方法初始化该ApplicationContext

RibbonClientConfiguration负责为每个Client对应的ApplicationContext注入服务列表ServerList<Server>、服务列表更新器ServerListUpdater、负载均衡器ILoadBalancer、负载均衡算法IRule、客户端配置IClientConfig、重试决策处理器RetryHandler等。

由于RibbonClientConfiguration注册的Bean是注册在Client隔离的ApplicationContext中的, 所以调用每个服务提供者的接口将可以使用不同的客户端配置(IClientConfig)、重试决策处理器(RetryHandler)等。这也是我们能够为Ribbon配置调用每个服务的接口使用不一样的重试策略的前提条件,不过这也不是充分必要条件。

RibbonClientConfiguration配置类会注册一个重试决策处理器RetryHandler,但这个RetryHandler并未被使用,也可能是别的地方使用。

@Configuration
public class RibbonClientConfiguration{
    // 未使用
    @Bean
	@ConditionalOnMissingBean
	public RetryHandler retryHandler(IClientConfig config) {
		return new DefaultLoadBalancerRetryHandler(config);
	}
}

OpenFeign整合Ribbon时,真正使用的RetryHandlerRequestSpecificRetryHandler。前面我们分析OpenFeign整合Ribbon源码时提到一个启到桥接作用的类:FeignLoadBalancer

OpenFeign整合Ribbon使用时,OpenFeigin使用的ClientLoadBalancerFeignClient,由LoadBalancerFeignClient创建FeignLoadBalancer,并调用FeignLoadBalancerexecuteWithLoadBalancer方法实现负载均衡调用。

executeWithLoadBalancer方法实际是FeignLoadBalancer的父类AbstractLoadBalancerAwareClient提供的方法,其源码如下(有删减)。

public abstract class AbstractLoadBalancerAwareClient{
    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
        try {
            return command.submit({....})
                .toBlocking()
                .single();
        } catch (Exception e) {
        }
    }
}

executeWithLoadBalancer方法中会创建一个LoadBalancerCommand,然后调用LoadBalancerCommandsubmit方法提交请求,submit方法源码如下(有删减):

   public Observable<T> submit(final ServerOperation<T> operation) {
        // .......
        //  获取重试次数
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
        // Use the load balancer
        Observable<T> o = (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    public Observable<T> call(Server server) {
                        //.......
                        // 相同节点的重试
                        if (maxRetrysSame > 0)
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
        // 不同节点的重试
        if (maxRetrysNext > 0 && server == null)
            o = o.retry(retryPolicy(maxRetrysNext, false));
        return o.onErrorResumeNext(...);
    }

submit方法中调用retryHandlergetMaxRetriesOnSameServer方法和getMaxRetriesOnNextServer方法分别获取配置maxRetrysSamemaxRetrysNextmaxRetrysSame表示调用相同节点的重试次数,默认为0maxRetrysNext表示调用不同节点的重试次数,默认为1

retryPolicy方法返回的是一个包装RetryHandler重试决策者的RxJava API的对象,最终由该RetryHandler决定是否需要重试,如抛出的异常是否允许重试。而是否达到最大重试次数则是在retryPolicy返回的Func2中完成,这是RxJavaAPIretryPolicy方法的源码如下。

private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
    return new Func2<Integer, Throwable, Boolean>() {
        @Override
        public Boolean call(Integer tryCount, Throwable e) {
            if (e instanceof AbortExecutionException) {
                return false;
            }
            // 大于最大重试次数
            if (tryCount > maxRetrys) {
                return false;
            }
            if (e.getCause() != null && e instanceof RuntimeException) {
                e = e.getCause();
            }
            // 调用RetryHandler判断是否重试
            return retryHandler.isRetriableException(e, same);
        }
    };
}

那么这个retryHandler是怎么来的呢?

FeignLoadBalancerexecuteWithLoadBalancer方法中调用buildLoadBalancerCommand方法构造LoadBalancerCommand对象时创建的,buildLoadBalancerCommand方法源码如下。

    protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
        // 获取RetryHandler
		RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
		// 使用Builder构造者模式构造LoadBalancerCommand
		LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
				.withLoadBalancerContext(this)
				// 传入RetryHandler
				.withRetryHandler(handler)
				.withLoadBalancerURI(request.getUri());
		return builder.build();
	}

从源码中可以看出,Ribbon使用的RetryHandlerRequestSpecificRetryHandler。这里还用到了Builder构造者模式。

FeignLoadBalancergetRequestSpecificRetryHandler方法源码如下:

@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
	RibbonRequest request, IClientConfig requestConfig) {
	//.....
	if (!request.toRequest().httpMethod().name().equals("GET")) {
	    // 调用this.getRetryHandler()方法获取一次RetryHandler
		return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
				requestConfig);
	}
	else {
	    // 调用this.getRetryHandler()方法获取一次RetryHandler
		return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
				requestConfig);
	}
}

RequestSpecificRetryHandler的构造方法可以传入一个RetryHandler,这有点像类加载器ClassLoader实现的双亲委派模型。比如当RequestSpecificRetryHandler配置的重试次数为0时,则会获取父RetryHandler配置的重试次数。

this.getRetryHandler方法获取到的又是哪个RetryHandler?(源码在FeignLoadBalancer的祖父类LoadBalancerContext中)

[FeignLoadBalancer的父类的父类LoadBalancerContext]
public class LoadBalancerContext{
    protected RetryHandler defaultRetryHandler = new DefaultLoadBalancerRetryHandler();
    public final RetryHandler getRetryHandler() {
        return defaultRetryHandler;
    }
}
[FeignLoadBalancer]
public class FeignLoadBalancer extends
		AbstractLoadBalancerAwareClient{
    public FeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig,
			ServerIntrospector serverIntrospector) {
		super(lb, clientConfig);
		// 使用DefaultLoadBalancerRetryHandler
		this.setRetryHandler(RetryHandler.DEFAULT);
		this.clientConfig = clientConfig;
		// IClientConfig,RibbonClientConfiguration配置类注入的
		this.ribbon = RibbonProperties.from(clientConfig);
		RibbonProperties ribbon = this.ribbon;
		// 从IClientConfig中读取超时参数配置
		this.connectTimeout = ribbon.getConnectTimeout();
		this.readTimeout = ribbon.getReadTimeout();
		this.serverIntrospector = serverIntrospector;
	}
}

FeignLoadBalancer的构造方法中可以看出,RequestSpecificRetryHandler的父RetryHandlerDefaultLoadBalancerRetryHandler

RetryHandler接口的定义如下图所示。

RetryHandler接口方法说明: * isRetriableException方法:该异常是否可重试; * isCircuitTrippingException方法:是否是Circuit熔断类型异常; * getMaxRetriesOnSameServer方法:调用同一节点的最大重试次数; * getMaxRetriesOnNextServer方法:调用不同节点的最大重试次数;

Ribbon的重试策略配置

最大重试次数、连接超时等参数的配置

FeignLoadBalancer在创建RequestSpecificRetryHandler时传入了IClientConfig,这个IClientConfig是从哪里创建的我们稍会再分析。RequestSpecificRetryHandler在构造方法中从这个IClientConfig中获取调用同服务节点的最大重试次数和调用不同服务节点的最大重试次数,源码如下。

public class RequestSpecificRetryHandler implements RetryHandler {
    public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, 
            boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
        // .....
        // 从 IClientConfig中获取两种最大重试次数的配置
        if (requestConfig != null) {
           if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
               // 获取同节点调用最大重试次数
               this.retrySameServer = (Integer)requestConfig.get(CommonClientConfigKey.MaxAutoRetries);
           }    
           if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
                // 获取不同节点调用最大重试次数
               this.retryNextServer = (Integer)requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer);
           }
        }
    }
}

requestConfig是在LoadBalancerFeignClient创建FeignLoadBalancer时,从SpringClientFactory中获取的,也正是RibbonClientConfiguration自动配置类注入的。

public FeignLoadBalancer create(String clientName) {
    FeignLoadBalancer client = this.cache.get(clientName);
    if (client != null) {
        return client;
    }
    // this.factory就是SpringClientFactory
    IClientConfig config = this.factory.getClientConfig(clientName);
    ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
    ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,ServerIntrospector.class);
    // 创建FeignLoadBalancer
    client = this.loadBalancedRetryFactory != null
		? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,this.loadBalancedRetryFactory)
		: new FeignLoadBalancer(lb, config, serverIntrospector);
	// 缓存FeignLoadBalancer
    this.cache.put(clientName, client);
    return client;
}

IClientConfig是在RibbonClientConfiguration中配置的,其源码如下:

public class RibbonClientConfiguration {
	// 默认连接超时
	public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
	// 默认读超时
	public static final int DEFAULT_READ_TIMEOUT = 1000;

    // 自动注入,${ribbon.client.name}
	@RibbonClientName
	private String name;
    
    // 注册IClientConfig实例,使用DefaultClientConfigImpl
	@Bean
	@ConditionalOnMissingBean
	public IClientConfig ribbonClientConfig() {
		DefaultClientConfigImpl config = new DefaultClientConfigImpl();
		config.loadProperties(this.name);
        // 配置连接超时
		config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
        // 配置读超时
		config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
		config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
		return config;
	}
}

那么我们要怎么修改配置呢?

第一种方法:配置文件配置方法

如何在application配置文件中配置Ribbon的重试次数等参数。

我们可以在RibbonClientConfiguration这个配置类的ribbonClientConfig方法下断点调试,如下图所示。

从图中可以看出,配置参数key的格式为:

<服务提供者的名称(serverId)>:<ribbon>:<参数名>=<value>

假设我们针对服务提供者sck-demo-provider配置最大同节点重试次数为10,配置最大不同节点重试次数为12,配置连接超时为15秒,那么我们需要在application-[环境].yaml配置文件中添加如下配置。

sck-demo-provider:
  ribbon:
    MaxAutoRetries: 10
    MaxAutoRetriesNextServer: 12
    ConnectTimeout: 15000

其中MaxAutoRetriesMaxAutoRetriesNextServer都能生效,但是ConnectTimeout配置是不生效的,原因是在RibbonClientConfiguration中创建DefaultClientConfigImpl时,先调用loadProperties方法(传入的name参数就是服务名称)从配置文件获取配置,再调用set方法覆盖了三个配置:连接超时配置、读超时配置、是否开启gzip压缩配置。所以这种方式配置连接超是不生效的。

第二种方法:代码配置

代码配置就是我们手动注册IClientConfig,而不使用RibbonClientConfiguration自动注册的。RibbonClientConfiguration自动注册IClientConfig的方法上添加了@ConditionalOnMissingBean条件注解,正因为如此,我们才可以自己注册IClientConfig

但要注意一点,RibbonClientConfiguration是在Ribbon为每个Client创建的ApplicationContext中生效的,所以我们需要创建一个配置类(Configuration),并将其注册到SpringClientFactory。这样,在SpringClientFactoryClient创建ApplicationContext时,就会将配置类注册到ApplicationContext,向SpringClientFactory注册的配置类也就成了创建的ApplicationContext的配置类。

@Configuration
public class RibbonConfiguration implements InitializingBean {

    @Resource
    private SpringClientFactory springClientFactory;

    @Override
    public void afterPropertiesSet() throws Exception {
        List<RibbonClientSpecification> cfgs = new ArrayList<>();
        RibbonClientSpecification configuration = new RibbonClientSpecification();
        // 针对哪个服务提供者配置
        configuration.setName(ProviderConstant.SERVICE_NAME);
        // 注册的配置类
        configuration.setConfiguration(new Class[]{RibbonClientCfg.class});
        cfgs.add(configuration);
        springClientFactory.setConfigurations(cfgs);
    }

    // 指定在RibbonClientConfiguration之后生效
    @AutoConfigureBefore(RibbonClientConfiguration.class)
    public static class RibbonClientCfg {

        @Bean
        public IClientConfig ribbonClientConfig() {
            DefaultClientConfigImpl config = new DefaultClientConfigImpl();
            config.setClientName("随便填,不影响,用不到");
            config.set(CommonClientConfigKey.MaxAutoRetries, 1);
            config.setProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, 3);
            config.set(CommonClientConfigKey.ConnectTimeout, 15000);
            config.set(CommonClientConfigKey.ReadTimeout, 15000);
            return config;
        }

    }

}

因为Ribbon是在第一次调用接口时才会创建ApplicationContext,所以我们在应用程序的Spring容器初始化阶段获取SpringClientFactory并为其添加自定义配置类能够生效。

RibbonClientCfg声明在RibbonClientConfiguration之前生效,这样RibbonClientConfiguration就不会向容器中注册IClientConfig了。

如何替换RetryHandler

OpenFeign整合Ribbon使用时,默认使用的是FeignLoadBalancergetRequestSpecificRetryHandler方法创建的RequestSpecificRetryHandler,笔者也看了一圈源码,实在找不到怎么替换RetryHandler,可能OpenFeign就是不想给我们替换吧。这种情况我们只能另寻辟径了。

既然使用的是FeignLoadBalancergetRequestSpecificRetryHandler方法返回的RetryHandler,那么我们是不是可以继承FeignLoadBalancer并重写getRequestSpecificRetryHandler方法来替换RetryHandler呢?答案是可以的。

自定义的FeignLoadBalancer代码如下:

/**
     * 自定义FeignLoadBalancer,替换默认的RequestSpecificRetryHandler
     */
    public static class MyFeignLoadBalancer extends FeignLoadBalancer {

        public MyFeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig, ServerIntrospector serverIntrospector) {
            super(lb, clientConfig, serverIntrospector);
        }

        @Override
        public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonRequest request, IClientConfig requestConfig) {
            // 返回自定义的RequestSpecificRetryHandler
            // 参数一:是否连接异常重试时重试
            // 参数二:是否所有异常都重试
            return new RequestSpecificRetryHandler(false, false,
                    getRetryHandler(), requestConfig) {
                /**
                 * @param e 抛出的异常
                 * @param sameServer 是否同节点服务的重试
                 * @return
                 */
                @Override
                public boolean isRetriableException(Throwable e, boolean sameServer) {
                    if (e instanceof ClientException) {
                        // 连接异常重试
                        if (((ClientException) e).getErrorType() == ClientException.ErrorType.CONNECT_EXCEPTION) {
                            return true;
                        }
                        // 连接超时重试
                        if (((ClientException) e).getErrorType() == ClientException.ErrorType.SOCKET_TIMEOUT_EXCEPTION) {
                            return true;
                        }
                        // 读超时重试,读超时重试只允许不同服务节点的重试
                        // 所以同节点的重试不支持,读超时了就不要重新请求同一个节点了。
                        if (((ClientException) e).getErrorType() == ClientException.ErrorType.READ_TIMEOUT_EXCEPTION) {
                            return !sameServer;
                        }
                        // 服务端异常
                        // 服务端异常切换新节点重试
                        if (((ClientException) e).getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
                            return !sameServer;
                        }
                    }
                    // 连接异常时重试
                    return isConnectionException(e);
                }
            };
        }
    }

由于FeignLoadBalancer是在OpenFeignLoadBalancerFeignClient中调用一个CachingSpringLoadBalancerFactory创建的,所以我们还需要替换OpenFeignFeignRibbonClientAutoConfiguration配置类注册的CachingSpringLoadBalancerFactory,并且重写CachingSpringLoadBalancerFactorycreate方法,代码如下。

@Configuration
public class RibbonConfiguration {
    /**
     * 使用自定义FeignLoadBalancer缓存工厂
     *
     * @return
     */
    @Bean
    public CachingSpringLoadBalancerFactory cachingSpringLoadBalancerFactory() {
        return new CachingSpringLoadBalancerFactory(springClientFactory) {

            private volatile Map<String, FeignLoadBalancer> cache = new ConcurrentReferenceHashMap<>();

            @Override
            public FeignLoadBalancer create(String clientName) {
                FeignLoadBalancer client = this.cache.get(clientName);
                if (client != null) {
                    return client;
                }
                IClientConfig config = this.factory.getClientConfig(clientName);
                ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
                ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
                        ServerIntrospector.class);
                // 使用自定义的FeignLoadBalancer
                client = new MyFeignLoadBalancer(lb, config, serverIntrospector);
                this.cache.put(clientName, client);
                return client;
            }
        };
    }
}
#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

Spring Boot与Spring Cloud应用启动流程

本篇我们一起学习Spring Boot与Spring Cloud应用的启动流程。

Spring Cloud动态配置实现原理与源码分析

本篇从源码分析Spring Cloud实现动态配置的原理。Spring Cloud实现动态配置需要结合Spring源码分析。

Spring Cloud Kubernetes服务注册与发现实现原理与源码分析

本篇分析Spring Cloud Kubernetes服务注册与发现实现原理,以及Spring Cloud Kubernetes Core&Discovery源码分析。

OpenFeign与Ribbon源码分析总结与面试题

本篇介绍OpenFeign与Feign的关系、Feign底层实现原理、Ribbon是什么、Ribbon底层实现原理、Ribbon是如何实现失败重试的?

Spring Cloud Ribbon源码分析

本篇继续分析OpenFeign是如何与Ribbon整合、Ribbon是如何实现负载均衡的、Ribbon是如何从注册中心获取服务的。

Spring Cloud OpenFeign源码分析,为什么不导入Ribbon应用会启动不起来?

如果指定了URL,那么getOptional方法不会返回null,且返回的Client是LoadBalancerFeignClient,但不会抛出异常。如果不指定URL,则走负载均衡逻辑,走的是loadBalance方法,且抛出异常。