基于扩展点,为dubbo支持跨业务调用

原创 吴就业 143 0 2022-02-19

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/38dac05423b5419ab906db2409b5ec37

作者:吴就业
链接:https://wujiuye.com/article/38dac05423b5419ab906db2409b5ec37
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2022年02月19日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。

很多规模稍大点的公司,内部都会有多个业务部门,这些业务部门都有自己的业务产品。每个业务部门开发的产品部署的环境物理上也都是相对隔离的,但这些业务部门之间可能存在合作关系,业务关联,因此就有了跨业务RPC调用的需求。

Dubbo的分层架构,提供的各层扩展点,让Dubbo具备了优秀的扩展性。我们基于Dubbo二次开发,借助Registry扩展点、RouterFactory扩展点实现了跨业务RPC调用,不需要修改Dubbo的源码。

简要概括原理

dubbo消费者refer的过程中,会创建一个RegistryDirectory实例,用于缓存服务提供者以提升性能,不必每次都去注册中心查找。

一个消费者可以从多个提供者中选择一个调用,因此消费者端的Invoker经过cluster层包装了路由、负载均衡的逻辑。

实现跨业务RPC调用,只需要让RegistryDirectory实例能够获取其它业务环境的提供者,再通过路由器选择目标业务环境的所有提供者。

让RegistryDirectory实例获取其它业务环境的提供者,简单的实现,就是同时订阅其它业务环境的注册中心。也就是修改RegistryDirectory的subscribe方法,订阅多个注册中心。不过Dubbo提供有扩展点,不需要修改源码,即Registry扩展点。

实现过程

如何利用扩展点,我们从RegistryProtocol#doRefer方法寻找突破口。

public class RegistryProtocol implements Protocol {
    
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // .....
        // 订阅提供者、配置、路由
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        Invoker invoker = cluster.join(directory);
        
        return invoker;
    }
}

doRefer首先为消费者创建RegistryDirectory,然后调用RegistryDirectory实例的subscribe方法,指定只订阅服务提供者、动态配置、路由(消费者不需要订阅消费者)。

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    
    public void subscribe(URL url) {
        setConsumerUrl(url);
        // 
        registry.subscribe(url, this);
    }
}

RegistryDirectory实例则调用Registry实例的subscribe方法实现订阅,而这个Registry实例就是我们通过扩展点注册的自定义Registry。

public class MyRegistry extends AbstractRegistry{

    private Registry defaultRegistry;
    private final ConcurrentMap<String, Registry> registries = new ConcurrentHashMap<>();
  
    public LzHttpRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        this.defaultRegistry = newZookeeperRegistry(url);
        // 初始化registries
      	this.initCrossRegistryIfNeed(url)
    }
  
  	private void initCrossRegistryIfNeed(URL url) {
        String myBusiness = System.getProperty("metadata.business");
        String crossConfig = url.getParameter("CROSS_BUSINESS");
        if (!Strings.isNullOrEmpty(crossConfig)) {
            for (String business : crossConfig.split(",")) {
                // 替换group
                URL newUrl = getUrl().addParameter(Constants.GROUP_KEY, business);
                registries.put(regionBusiness, newZookeeperRegistry(newUrl));
                regionBusinessArray.add(regionBusiness);
            }
        }
    }
  
    private AbstractZookeeperRegistry newZookeeperRegistry(URL url) {
        return new MyProxyRegistry(url, zookeeperTransporter);
    }

    @Override
    public void subscribe(URL url, NotifyListener listener) {
      	defaultRegistry.subscribe(url, listener);
        for (String business : otherBusinesss) {
            Registry registry = registries.get(business);
            registry.subscribe(businessUrl, listener);
        }
    }
}

MyRegistry本身不实现订阅注册中心的逻辑,而是借用设计模式管理多个业务的Registry的订阅。

registries中的每个Registry在MyRegistry的构建方法中初始化,根据构建方法传入的url参数,获取跨业务配置(跨哪些业务)。

MyProxyRegistry也是自定义的Registry,继承AbstractRegistry。目的是代理NotifyListener,以便在notify插入自定义逻辑。

public class MyProxyRegistry extends AbstractRegistry {

    private final String group;
    private final ConcurrentMap<NotifyListener, NotifyListener> delegateMap = new ConcurrentHashMap<>();

    public MyProxyRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url, zookeeperTransporter);
        // 获取注册url的group,注意,不是使用生产者/消费者的group
        this.group = url.getParameter(Constants.GROUP_KEY);
    }

    private List<NotifyListenerDelegate> newNotifyListenerDelegate() {
        ServiceLoader<NotifyListenerDelegate> serviceLoader = ServiceLoader.load(NotifyListenerDelegate.class);
        List<NotifyListenerDelegate> delegates = new ArrayList<>();
        for (NotifyListenerDelegate delegate : serviceLoader) {
            delegates.add(delegate);
        }
        return delegates;
    }

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        NotifyListener delegateListener = listener;
        List<NotifyListenerDelegate> delegates = newNotifyListenerDelegate();
        // 使按注册顺序生效
        for (int i = delegates.size() - 1; i >= 0; i--) {
            NotifyListenerDelegate delegate = delegates.get(i);
            delegate.setNotifyListener(delegateListener);
            delegate.setSubscribeUrl(url);
            delegate.setRegistryGroup(group);
            // 支持包装
            delegateListener = delegate;
        }
        delegateMap.put(listener, delegateListener);
        super.subscribe(url, delegateListener);
    }

    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        NotifyListener delegateListener = delegateMap.remove(listener);
        if (delegateListener == null) {
            delegateListener = listener;
        } else {
            ((NotifyListenerDelegate) delegateListener).destroy();
        }
        super.unsubscribe(url, delegateListener);
    }

}


CrossBusinessListener继承NotifyListenerDelegate,实现notify方法,并通过SPI注册。

public class CrossRouterNotifyListener extends AbstractNotifyListenerDelegate {

    private final String myBusinessEnv;
    private NotifyListener original;
  
    // 全局静态变量
    private final static ConcurrentMap<NotifyListener, CrossServiceProviderManager> MANAGER_MAP = new ConcurrentHashMap<>();

    public CrossRouterNotifyListener() {
        this.myBusinessEnv = System.getProperty("metadata.business");
    }

    @Override
    public void setNotifyListener(NotifyListener delegate) {
        super.setNotifyListener(delegate);
        NotifyListener original = delegate;
        while (original instanceof NotifyListenerDelegate) {
            original = ((NotifyListenerDelegate) original).getDelegate();
        }
        this.original = original;
        createCrossServiceProviderManagerIfNeed();
    }

    private void createCrossServiceProviderManagerIfNeed() {
        if (!MANAGER_MAP.containsKey(original)) {
            MANAGER_MAP.putIfAbsent(original, new CrossServiceProviderManager());
        }
    }

    private CrossServiceProviderManager getCrossServiceProviderManager() {
        return MANAGER_MAP.get(this.original);
    }

    private void removeCrossServiceProviderManager() {
        MANAGER_MAP.remove(this.original);
    }

    private List<URL> updateBusinessEnvParam(List<URL> urls) {
        List<URL> newUrls = new ArrayList<>();
        for (URL url : urls) {
            String business = myBusinessEnv;
            if (!registryGroup.equalsIgnoreCase(DubboConstants.DEFAULT_GROUP)) {
                String group = registryGroup.replace(DubboConstants.DEFAULT_GROUP, "");
                business = group;
            }
            newUrls.add(url.addParameter("BUSINESS", business));
        }
        return newUrls;
    }

    @Override
    public void notify(List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return;
        }

        // 追加business_env和region参数
        urls = updateBusinessEnvParam(urls);

        // 聚合所有环境注册的提供者到CrossServiceProviderManager
        URL firstUrl = urls.get(0);
        CrossServiceProviderManager crossServiceProviderManager = getCrossServiceProviderManager();
        crossServiceProviderManager.notify(firstUrl.getParameter("BUSINESS"), urls);

        // 从CrossServiceProviderManager取可用的提供者
        List<URL> newUrls = new ArrayList<>();
        if (isRoutersCategoryNotify(urls)) {
            // 聚合多注册中心的动态路由
            newUrls.addAll(crossServiceProviderManager.getAllDynamicRoutersAndBusinessFirstFilter( myBusinessEnv));
            // 前面注册的静态路由不能丢
            for (URL url : urls) {
                String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                if (Constants.ROUTERS_CATEGORY.equals(category)
                        && !Boolean.parseBoolean(url.getParameter(Constants.DYNAMIC_KEY, "true"))) {
                    newUrls.add(url);
                }
            }
            // 添加AutoRegEnvRouter静态路由
            newUrls.add(regionRouter2Url());
        } else {
            // 聚合多注册中心的提供者
            newUrls.addAll(crossServiceProviderManager.getAllProvider());
        }

        super.notify(newUrls.isEmpty() ? urls : newUrls);
    }

    /**
     * 向注册中心写入路由规则的操作通常由监控中心或治理中心的页面完成: <link>https://dubbo.apache.org/zh/docsv2.7/user/examples/routing-rule-deprecated/</link>
     * 但这里的路由作为基础功能提供,可以直接在代码中使用,并且不注册到注册中心
     *
     * @return
     * @see RegistryDirectory#notify(java.util.List)
     * //@see RegistryDirectory#toRouters(java.util.List)
     * //@see AbstractDirectory#setRouters(java.util.List)
     */
    private URL regionRouter2Url() {
        String sb = Constants.ROUTE_PROTOCOL + "://" + Constants.ANYHOST_KEY + "/" +
                subscribeUrl.getServiceInterface() + "?" +
                Constants.CATEGORY_KEY + "=" + Constants.ROUTERS_CATEGORY +
                "&" + Constants.ROUTER_KEY + "=" + "crossBusinessRouter" +
                "&runtime=true&dynamic=false" +
                "&" + Constants.VERSION_KEY + "=" + getParameter(Constants.VERSION_KEY) +
                "&" + Constants.GROUP_KEY + "=" + registryGroup;
        return URL.valueOf(sb);
    }

    @Override
    public void destroy() {
        removeCrossServiceProviderManager();
    }

}

CrossBusinessListener负责实现聚合所有业务的Registry监听到的服务提供者,并负责注册路由器,在后续发起RPC调用时,由路由器从聚合的提供者中,选择最优的提供者。

在dubbo中,url是各层、甚至是各功能的衔接剂。添加路由器通过添加路由url实现,由RegistryDirectory实例将路由url转为路由实例。

     private URL regionRouter2Url() {
        String sb = Constants.ROUTE_PROTOCOL + "://" + Constants.ANYHOST_KEY + "/" +
                subscribeUrl.getServiceInterface() + "?" +
                Constants.CATEGORY_KEY + "=" + Constants.ROUTERS_CATEGORY +
                "&" + Constants.ROUTER_KEY + "=" + "crossBusinessRouter" +
                "&runtime=true&dynamic=false" +
                "&" + Constants.VERSION_KEY + "=" + getParameter(Constants.VERSION_KEY) +
                "&" + Constants.GROUP_KEY + "=" + registryGroup;
        return URL.valueOf(sb);
    }

路由器类型还要在SPI中注册:

# /resources/META-INF/dubbo/cm.alibaba.dubbo.rpc.cluster.RouterFactory
crossBusinessRouter=fm.lizhi.dubbo.cluster.router.CrossBusinessRouterFactory

在发起rpc请求时,RegistryDirectory实例先从本地缓存取得所有提供者,然后调用路由器的route方法,获取路由后的服务提供者。

public class CrossBusinessRouter{
    
    private String myBusinessEnv = System.getProperty("metadata.business");
  
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        if (invocation.getInvoker() == null) {
            // 不是真正的调用
            return invokers;
        }

        // 将invokers按business转为Map
        Map<String, List<Invoker<T>>> categoryInvokerMap = categorize(invokers);

        // 优先选择本业务的提供者,其中myBusinessEnv是本进程部署的业务环境
        List<Invoker<T>> candidates = categoryInvokerMap.get(myBusinessEnv);
        if (candidates != null && !candidates.isEmpty()) {
            return candidates;
        }
        
        // 选择其它业务环境的提供者
        for (String business : allBusiness) {
            candidates = categoryInvokerMap.get(business);
            if (candidates != null && !candidates.isEmpty()) {
                return candidates;
            }
        }

        return Collections.EMPTY_LIST;
    }
  
}

CrossBusinessRouter实现优先选择本业务的提供者,没有则走跨业务调用,即选择其它业务有的提供者。

路由之后就是负载均衡的逻辑了。

扩展实现

以上案例只是一个简单的实现,前提条件是各业务环境内网网络互通,而我们实现的跨环境RPC调用还比较复杂,需要同时支持跨大区机房、跨业务环境,组合起来就有三种情况:同业务跨区域、同区域跨业务、跨业务跨区域。并且我们约定业务之间不能直接RPC调用,需要由一层代理转发,防止腐化。

首先增加一个数据同步服务,负责同步zk数据到其它环境的zk,只同步接口,不同步注册上的服务提供者。

代理服务(rpc网关),在调用端伪装成服务提供者,在提供者端伪装成服务消费者。

假设A业务提供UserService接口,B业务需要调用A业务的UserService接口,代理服务为C,那么: * C注册到B的zk上,伪装为UserService的提供者。 * C注册到A的zk上,伪装为UserService的消费者。 * B调用A的UserService接口,经过代理服务C,转发给A。

#中间件

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

Xxl-job SDK引发的OOM

由于输出的错误日记字符串长度过长,导致xxl-job-admin处理callback请求无法将日记入库。sdk会将失败的callback写入一个重试文件(xxl-job-callback.log),sdk有一个后台线程,定时每几秒会全量load重试文件到内存中...

Dubbo之HTTP RPC vs Dubbo RPC性能压测

此次性能测试对比的是我们基于Dubbo扩展点自实现的Http rpc协议,与Dubbo原生Dubbo rpc协议的单次请求响应平均耗时、吞吐量。

Dubbo为什么会提供泛化调用这个功能

Dubbo的泛化调用功能就类似于Java语言提供的泛型功能,目的都是通用。那为什么需要泛化调用功能呢?

kafka消息重复消费排查

业务使用我们基础部门封装的kafka组件,在一次版本迭代中,我们引入了offset缓存,正是这个缓存,在某种条件触发下,会导致出现消息重复消费现象。

Quartz分布式调度原理

在同一时刻需要触发的Job只有少量的情况下,我们看不到Quartz的性能缺陷,在Job数量明显增加情况下,我们就会发现,调度延迟会有明显增加。尽管横向扩展节点,调度延迟也不会降低,且整体调度性能没有明显好转,反而更糟糕。

重构XXL-JOB,使用响应式编程实现异步RPC提升调度吞吐量

如果同一时刻需要下发几百个执行job的请求给执行器,使用这种阻塞的RPC,意味着需要开启几百个线程,使用几百个连接发送请求,而这几百个线程都需要阻塞等待响应,Job越多,需要的线程数就会越多,对调动中心的性能影响就越大。