Dubbo分层架构之服务注册中心层的源码分析(下)

原创 吴就业 144 0 2019-12-15

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/4c9a0f0f59d94c89a8fa51e7744c0ffa

作者:吴就业
链接:https://wujiuye.com/article/4c9a0f0f59d94c89a8fa51e7744c0ffa
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2019年12月15日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。

基于Redis实现的注册中心为什么不被推荐使用,你知道原因吗?

由于我在实际项目中并未使用Redis作为服务注册中心,所以一直没有关注这个话题。那么,使用Redis作为服务注册中心有哪些缺点,希望本篇文章能给你答案。

首先,源码中使用了keys命令扫描以”/通信协议/接口名/“开头的key,所以,不要将业务使用的Redis集群作为注册中心,必须要独立部署一个Redis集群,否则会把项目搞挂。

其次,基于Redis实现的注册中心,是利用其发布/订阅的特性,每个接口的提供者、消费者都是使用一个hsah结构存储的,field为具体的提供者或消费者的url,而value则为过期时间。默认过期时间为当前时间加上60秒,因此需要开启一个定时任务定期的更新过期时间。在服务非正常下线时,服务提供者就没办法发送一个事件,因此只有当该Service其它提供者或消费者定时更新过期时间时向此key发布一个事件,消费者才能感知到。

综上,就是Redis注册中心不被推荐使用的原因。本篇文章继续分析基于Redis实现的注册中心其注册与订阅流程,只要了解服务注册需要做什么,以及订阅到事件之后要做什么,整个服务注册中心层我们就了解了,也能自己手写一个注册中心。

RegistryProtocol如何调度服务注册与订阅

继上篇,我们分析到了服务在导出(export)与引入(refer)时,由自适应扩展点机制取得RegistryProtocol,之后的事情就交由RegistryProtocol去完成了。

public class RegistryProtocol implements Protocol {
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    }
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    }
}

01

服务导出的流程图与引入的流程图在上篇文章已经给出,因此,我们直接分析代码,先分析服务导出(export),即RegistryProtocol的export方法。

img

RegistryProtocol的export方法做的事情大体分为4步。第一步是获取NotifyListener,这些NotifyListener将在注册中心有事件回调时被回调执行;第二步导出服务,即调用DubboProtocol的export方法,在此不做深入分析;第三步才是将服务注册到注册中心,判断是否需要注册到注册中心,可在配置文件中配置,比如本地Debug时不需要将服务注册到注册中心,此时可以配置为false;注册完成之后就是第四步,开始订阅。那参数originInvoker是什么?

img

Export方法的originInvoker参数包装了真实的Invoker与导出服务的元数据。Invoker就是能够被远程消费者调用的接口的封装(代理),它持有Service的实现类实例,如DemoServiceImpl。在上篇我们分析过,此时的Invoker的url最外层包上了注册中心的url,所以在export方法中可以根据Invoker拿到实际的注册中心的url以及服务提供者的url。

img

(RegistryProtocol的export传入的originInvoker参数的来源)

举个栗子来理解“Invoker的url最外层包装了注册中心的url”这句话,假设服务提供者的url为

dubbo://10.1.0.251:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-annotation-provider
&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService
&bind.ip=10.1.0.251&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=81839&register=true&release=&side=provider&timestamp=1576309881128

将服务提供者的url经过URLEncode后为

dubbo%3A%2F%2F10.1.0.251%3A20880%2Forg.apache.dubbo.demo.DemoService%3F
anyhost%3Dtrue%26application%3Ddubbo-demo-annotation-provider
%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D10.1.0.251%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D81839%26register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1576309881128&pid=81839&timestamp=1576309876113

注册中心的url为

regist://127.0.0.1:6379/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2

在调用RegistryProtocol的export方法之前,并不关心是使用何种注册中心,分层的设计各层有各层的职责,因此此时的注册中心的协议为“regist”,也因此Dubbo才能通过Protocol的自适应扩展点机制拿到RegistryProtocol。

registry=org.apache.dubbo.registry.integration.RegistryProtocol

将提供者的url包装上注册中心url之后,就变成

redis://127.0.0.1:6379/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2
&export=dubbo%3A%2F%2F10.1.0.251%3A20880%2........

偷梁换柱,绕来绕去,目的就是为了将两个url合并为一个url,让注册中心实现服务的导出与引入,非常巧妙的包装,这也是值得学习的地方。包装,其实就是代理,在Dubbo中用得非常多,比如将Invoker包了一层又一层,最后又巧妙的利用包装将Invoker变为Exporter,每层包装都为实现各层的职责,竟没想到,这种包装还能用到字符串上,佩服。

02

服务引入 (refer)与导出(export)是有很大区别的,服务导出似乎少了订阅的解析,是的,因为服务导出只关心配置的改变事件,在此我不打算往下分析。继承看RegistryProtocol的refer方法。

img

Refer方法有两个参数,第一个是引用的远程服务的接口类型,第二个与export方法的Invoker参数封装的url一样,都是包装上注册中心的。所以,第一步先是将url的”register://“替换为“redis://”,从注册器工厂中通过SPI获取到Redis注册中心的注册器RedisRegistry;接着就是从注册中心url中提取消费者将要注册到注册中心的url,解析url取得所有参数,根据group参数决定使用何种Cluster,关于Cluster本篇不深入分析。最后都是调用doRefer方法完成服务引入。

img

这里引入了一个新的类RegistryDirectory,每个接口(Service)对应一个Directory,持有注册中心的引用以及rpc协议(DubboProtocol)的引用,所以服务的订阅是委托给RegistryDirectory实现的。服务的引入依然是先将当前消费者注册到注册中心,再开始订阅。

public class RegistryDirectory<T> extends AbstractDirectory<T> 
     implements NotifyListener {
}

RegistryDirectory实际上也是一个NotifyListener,它需要订阅注册中心事件,更新自身缓存的服务提供者目录,既然RegistryDirectory是一个NotifyListener,那么RegistryProtocol干脆把订阅的实现交给它实现就行了。

上篇提到注册中心扩展契约,通过在服务提供者与服务消费者的url上添加category约定服务提供者只订阅配置改变事件,服务消费者订阅配置改变、路由改变、服务提供者改变事件,在源码截图的第三个框中,调用RegistryDirectory的subscribe委拖订阅的实现,就是给消费者的url加上category。

img

继续看RegistryDirectory的subscribe方法的实现。除了添加多几个监听器之外,就是调用注册中心注册器RegistryService的subscribe方法。

public void subscribe(URL url) {
        .......
        // 调用注册中心的订阅方法
        registry.subscribe(url, this);
}

订阅注册中心事件所注册的监听器NotifyListener是RegistryDirectory,在不考虑注册中心如何实现订阅功能之前,先看下当RegistryDirectory监听到事件时都做了什么。

 @Override
    public synchronized void notify(List<URL> urls) {
       // 先将url分类(category)
        Map<String, List<URL>> categoryUrls = urls....
        // configurator
        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
        // router
        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);
        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        refreshOverrideAndInvoker(providerURLs);
    }
    
    // 更新或添加新的服务提供者Invoker
    private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
    }

一句话概括就是更新配置、更新路由、更新服务提供者,具体不做分析。

RedisRegistry源码详细分析

通过前面的分析,我们知道,要添加一种注册中心的支持,我们要做的就是遵守契约,实现RegistryService接口的注册与订阅方法,其它的都不需要关心。而为了封装通用的逻辑,如提供注册失败重试功能,Dubbo提供了一个FailbackRegistry,因此,我们一般通过继承FailbackRegistry免去大量的工作,而只专注服务注册与订阅的实现。

FailbackRegistry其实就是通个一个定时器去延时重试注册,本篇不会去分析FailbackRegistry。RedisRegistry也不例外,它也是继承FailbackRegistry。

public class RedisRegistry extends FailbackRegistry {
}

RedisRegistry继承父类FailbackRegistry,因此它不是直接实现RegistryService的register、unregister、subscribe、unsubscribe方法,而是实现父类提供的与之对于的抽象方法doRegister、doUnregister、doSubscribe、doUnsubscribe。其中doUnregister、doUnsubscribe方法本篇不做分析。

RedisRegistry的构造方法初始化Redis的连接池,我们并不关心这些,但有个定时任务不能忽略。

 this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
 this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> 
            deferExpired(), 
            expirePeriod / 2, 
            expirePeriod / 2, TimeUnit.MILLISECONDS);

定时更新过期时间,过期时间默认为60s,需要在过期之前更新过期时间,所以定时任务每隔30s执行一次。也因如此,当有服务提供者下线时,在30s后消费者才能感知到。

private void deferExpired() {   
   for (URL url : new HashSet<>(getRegistered())) {
        ......
        // key===> 如:/dubbo/org.apache.dubbo.demo.DemoService/providers
        // filed ===> 当前服务提供者|消费者的url
        // value ===> 新的过期时间,当前时间+60s
        if (jedis.hset(key, url.toFullString(), 
               String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
               jedis.publish(key, REGISTER);
        }
    }
 }

deferExpired方法先更新自己的过期时间,然后发布一个事件,所以每30s,所有服务提供者以及消费者都能订阅到一个事件。订阅到事件之后的逻辑稍后分析。由于每个服务提供者、消费者都需要开启一个定时任务去更新过期时间,且每次更新过期时间都会发布一个事件,这将会导致本地目录RegistryDirectory频率更新,当服务提供者与消费者越多,缺陷就越明显。

01

先看doRegister是如何将服务提供者或者消费者注册到Redis的,将demo的服务提供者以及消费者先跑起来,再通过redis-cli工具连接到Redis,看Redis存储了什么信息,怎么存储。

img

有强迫症的读者可能会看着keys这个命令不爽,那接下来看源码你会更不爽。服务注册到Redis都是以hash结构存储,key为”rpc协议/服务接口名/提供者|消费者”。

img

服务提供者的存储,field为提供者的url,value为过期时间,也是定时任务每隔30s要更新的。消费者与提供者的存在都是一样,field为消费者的url,value为过期时间。

img

我明明只启动了一个消费者,但是图中却有两个消费者,这是由于我前一次启动时,未正常退出遗留下来的,所以,使用redis作为注册中心,最好自己起个定时任务去删除历史遗留的数据。

img

doRegister将服务注册到Redis就是一条hset命令,随后立即发布一个注册事件,要注意的是key,对于服务提供者而言,key为”/dubbo/org.apache.dubbo.demo.DemoService/provider”,对于服务消费者而言,key为”/dubbo/org.apache.dubbo.demo.DemoService/consumer”。

02

注册的逻辑很简单,相比注册,订阅的实现逻辑稍复杂些。截图的代码会比较长,所以我直接贴代码,去掉一些不是很重要的逻辑。

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
   // service ===> /dubbo/{服务接口名}/{provider|consumer}
   String service = toServicePath(url);
   for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
       JedisPool jedisPool = entry.getValue();
       try (Jedis jedis = jedisPool.getResource()) {
         // service ==> /dubbo/org.apache.dubbo.demo.DemoService/
         // ANY_VALUE ==> *
         // jedis.keys 获取的是/dubbo/org.apache.dubbo.demo.DemoService/* ,所以结果包括了服务提供者和消费者的key
         doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
         break; // Just read one server's data
       }
   }
 }

首次订阅,通过keys命令获取与此Service(org.apache.dubbo.demo.DemoService)相关的服务提供者与消费者的key。模拟一次事件回调通知,主要目的是让消费者能获取当前所有可用的服务提供者,更新本地缓存目录。所以弊端,不能使用业务用到的Redis集群作为注册中心。

doSubscribe方法的参数url为服务提供者的url “provider://”或服务消费者的url “consumer://”。如果是服务消费者,第二个参数listener就是RegistryDirectory。

img

在服务提供者或是消费者启动时,该方法被调用一次,但是在这个方法中,我们并没有看到任何订阅的实现,只是调用doNotify方法模拟一次事件通知。这是因为我去掉了一部分逻辑。

 @Override
 public void doSubscribe(final URL url, final NotifyListener listener) {
        // service ===> /dubbo/{服务接口名}/{provider|consumer}
        String service = toServicePath(url);
        Notifier notifier = notifiers.get(service);
        if (notifier == null) {
            Notifier newNotifier = new Notifier(service);
            notifiers.putIfAbsent(service, newNotifier);
            notifier = notifiers.get(service);
            if (notifier == newNotifier) {
                //  开启线程
                notifier.start();
            }
        }
 }

Notifier是一个Thread,调用notifier.start()方法启动线程,在run方法中会往Redis注册一个订阅者。每个接口(Service)都开启一个线程。

// 如果是服务启动即为true
if (first) {
     // service ==> /dubbo/org.apache.dubbo.demo.DemoService
     first = false;
     doNotify(jedis, service);
}
// 添加订阅者,订阅key: /dubbo/org.apache.dubbo.demo.DemoService/*
jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE);

psubscribe命令订阅一个或多个符合给定模式的频道,此处不管是服务提供者还是消费者,key都是“/dubbo/org.apache.dubbo.demo.DemoService/*”,订阅到事件时处理逻辑在NotifySub。

img

除了服务首次注册到注册中心时,会发送一个注册事件外,还有一个定时更新过期时间的任务,每次更新过期时间后都会发布一个时间。doNotify方法比较多,我们拆分为两部分来看。

private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
   List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
   // 获取服务接口(url可能是提供者,也可能是消费者)
   String consumerService = url.getServiceInterface();
   for (String key : keys) {
         // 从key中获取分类(/dubbo/{分类}),如果不存在url的category中跳过
         String category = toCategoryName(key);
         if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
              continue;
         }
         // 只有服务消费者会走到这里
         // ====【这里是第二部分代码】 =====
    }
    // 通知更新
    for (NotifyListener listener : listeners) {
            notify(url, listener, result);
    }
}

方法第二个参数keys,如果是服务提供者发布的事件,就是“/dubbo/org.apache.dubbo.demo.DemoService/provider”,如果是服务消费者发布的事件就是“/dubbo/org.apache.dubbo.demo.DemoService/consumer”,由于注册的订阅者订阅的key(channel)是“/dubbo/org.apache.dubbo.demo.DemoService/*”,所以不管服务提供者还是消费者都能感知到服务变更事件。

契约在for循环中实现,假设当前事件是某个服务消费者注册到注册中心后发布的一个注册事件,而当前服务是服务提供者,则不会做任何事情。而如果是消费者,则会进入到将要分析的第二部分代码。最后会回调所有监听器NotifyListener。

第二部分代码

 List<URL> urls = new ArrayList<>();
 // 获取所有服务(提供者|订阅者)
 Map<String, String> values = jedis.hgetAll(key);
 if (CollectionUtils.isNotEmptyMap(values)) {
      for (Map.Entry<String, String> entry : values.entrySet()) {
            URL u = URL.valueOf(entry.getKey());
            // 判断是否过期,过期说明服务掉线了
            if (!u.getParameter(DYNAMIC_KEY, true)
                || Long.parseLong(entry.getValue()) >= now) {
                    if (UrlUtils.isMatch(url, u)) {
                      // dubbo://10.1.0.164:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=81202&register=true&release=&side=provider&timestamp=1576237488706
                      // 将provider替换为dubbo(dubbo为通信协议)
                      urls.add(u);
           }
      }
 }

获取此事件的key下的所有注册信息,如获取所有服务提供者,并判断服务提供者是否过期,过滤掉过期的,获取到新增的,最后更新本地提供者目录缓存。

03

因为RegistryProtocol是通过SPI从配置文件中拿到注册器工厂RegistryFactory,再从RegistryFactory获取到注册器Registry的。除了提供一个Registry,还需要提供一个RegistryFactory,以及在Resources目录下添加一个org.apache.dubbo.registry.RegistryFactory配置文件。

[RedisRegistryFactory]

public class RedisRegistryFactory extends AbstractRegistryFactory {
    @Override
    protected Registry createRegistry(URL url) {
        return new RedisRegistry(url);
    }
}

[org.apache.dubbo.registry.RegistryFactory配置文件]

redis=org.apache.dubbo.registry.redis.RedisRegistryFactory

通过两篇文章的分析,想必大家已经能自己动手实现。实现一个注册中心并不难,但实现一个适用于生产环境的注册中心,我是觉得没必要重复造轮子。

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

反向理解ThreadLocal,或许这样更容易理解

已经有那么多作者写ThreadLocal的源码分析,我还是想写下这篇,换个思路去分析。

Dubbo RPC远程调用过程源码分析(服务提供者)

在前面分析Dubbo注册中心层源码的文章中,我们知道,服务的导出与引入由RegistryProtocol调度完成。对于服务提供者,服务是先导出再注册到注册中心;对于服务消费者,先将自己注册到注册中心,再订阅事件,由RegistryDirectory将所有服务提供者转为Invoker。

Dubbo RPC远程调用过程源码分析(服务消费者)

本篇继续分析服务提供者发起一个远程RPC调用的全过程,也是跳过信息交换层和传输层,但发起请求的逻辑会复杂些,包括负载均衡和失败重试的过程,以及当消费端配置与每个服务提供端保持多个长连接时的处理逻辑。

Dubbo分层架构之服务注册中心层的源码分析(上)

服务注册与发现是Dubbo核心的一个模块,假如没有注册中心,我们要调用远程服务,就必须要预先配置,就像调用第三方http接口一样,需要知道接口的域名或者IP、端口号才能调用。

缓存雪崩、穿透如何解决,如何确保Redis只缓存热点数据?

缓存雪崩如何解决?缓存穿透如何解决?如何确保Redis缓存的都是热点数据?如何更新缓存数据?如何处理请求倾斜?实际业务场景下,如何选择缓存数据结构。

线上RPC远程调用频繁超时问题排查,大功臣Arthas

项目不断新增需求,难免不会出现问题,特别是近期新增的增加请求处理耗时的需求。以及一些配置的修改而忽略掉的问题,如dubbo工作线程数调增时,忽略了redis连接池的修改。由于redis可用连接远小于工作线程数,就会出现多个线程竞争redis连接,影响性能。