原创 吴就业 146 0 2019-12-17
本文为博主原创文章,未经博主允许不得转载。
本文链接:https://wujiuye.com/article/fda3a6e3d95f45b98215a5ef7654339a
作者:吴就业
链接:https://wujiuye.com/article/fda3a6e3d95f45b98215a5ef7654339a
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。
本篇文章写于2019年12月17日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。
在前面分析Dubbo注册中心层源码的文章中,我们知道,服务的导出与引入由RegistryProtocol调度完成。对于服务提供者,服务是先导出再注册到注册中心;对于服务消费者,先将自己注册到注册中心,再订阅事件,由RegistryDirectory将所有服务提供者转为Invoker。
那么,服务导出在RPC层都做了什么事情,以及服务提供端是如何处理请求、响应请求的,本篇文章内容主要为:
RPC层封装了PRC调用逻辑,以Invocation和Result为中心,Invocation为请求消息描述消费端想调用那个接口的哪个方法,以及参数是什么,可以理解为Servlet的HttpServletRequest;Result为返回结果,可以理解为Servlet的HttpServletResponse。扩展接口为Protocol、Invoker和Exporter,如dubbo协议的DubboProtocol。Exporter定义服务的导出和解除导出,Invoker可以理解为Spring MVC的MethodHandler,真正处理请求的就是Invoker。
本篇文章主要以Dubbo协议为例,传输层使用Netty4(默认的),分析服务的导出过程,以及接收到请求的处理全过程。在分析注册中心层的RegistryProtocol的export方法时,有这么一行代码因为与注册中心没有多大关系而被我们选择忽略。
【RegistryProtocol类的export方法】
// 导出invoker,DubboProtocol
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
这就是我们将要分析的RPC层代码的入口,我们将从doLocalExport方法开始分析。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
// 包装Invoker
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 根据SPI自适应扩展点机器,导出服务
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
重点关注这行(Exporter) protocol.export(invokerDelegate)。这里又有一个protocol,这个protocol是谁,如果服务提供者使用的是默认的dubbo协议,那么这个protocol就是DubboProtocol。假设当前服务提供者的url如下。
dubbo://10.1.0.251:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=.....
从url中获取到服务提供者使用的协议是dubbo,所以根据SPI自适应扩展点机制,拿到的就是DubboProtocol。
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
知道protocol是DubboProtocol后,我们继续分析DubboProtocol的export方法。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 导出服务,key=分组/接口名:版本号:端口号
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 打开服务
openServer(url);
optimizeSerialization(url);
return exporter;
}
export方法就是将Invoker转为Exporter,然后调用openServer确保底层服务打开。往下分析我们将接触到信息交换层与传输层。我们继续看openServer方法。
private void openServer(URL url) {
String key = url.getAddress(); // return port <= 0 ? host : host + ":" + port;
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
}
}
openServer方法使用了双重检测确保线程安全。url.getAddress()作为key,serverMap缓存所有打开的Server(ExchangeServer),如果不存在则调用createServer方法创建并打开Server。根据Service的ip和端口号决定打开多少个Server,而一个进程只能开启一个监听端口,一个网卡只有一个IP,所以整个进程都只会打开一个Server。那这个Server到底是什么,openServer做了什么?
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// 服务器关闭时发送只读事件
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 加上心跳配置,默认心跳事件为1分钟
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
// 加上编码解码器配置
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
return server;
}
为了简单,我去掉了一些无关紧要的代码。Exchangers是一个工具类,bind方法封装了信息交换层的创建逻辑,参数url依然要传递给信息交换层,因为url可能携带我们配置的exchanger参数。以及由信息交换层衔接的传输层,也需要从url中获取配置。本篇不再展开分析。最后传输层将创建一个NettyServer,在NettyServer的构造函数中开启一个Netty服务。这部分内容我将会以单独一篇文章详细解析。
回顾往期文章,从《深入理解Dubbo源码(三),Dubbo与Spring的整合之注解方式》这篇文章中,我们知道,服务的导出入口在config层的ServiceConfig的onApplicationEvent方法,即在Spring初始化完成之后,开始导出服务;从《Dubbo分层架构之服务注册中心层的源码分析(上)》以及《Dubbo分层架构之服务注册中心层的源码分析(下)》这两篇文章,我们知道,服务先由注册中心层调用rpc层导出服务之后再注册到注册中心。
从本篇文章中,我们知道,服务在RPC层的导出将根据协议创建Server,以Dubbo协议为例,最终会创建一个NettyServer。所以,整个服务导出流程如下图所示。
整个服务的导出过程都是由dubbo.URL将配置层、注册中心层、RPC层、信息交换层、传输层串起来的。当前,还有一些细节没有分析到。
以 dubbo2.7.2版本源码中的demo为例,我们先屏蔽底层的信息交换层和传输层的调用过程逻辑,从rpc层开始分析,当服务端接收到一个请求时RPC层是怎么找到DemoService服务并调用的,以及响应过程。
以Debug方式启动demo的服务提供者,再启动服务消费者,在DubboProtocol的requestHandler字段的reply方法中下一个断点,当接收到消费端的请求时,将会停在这个断点。下面分析为什么会停在这个断点。
前面分析的,在DubboProtocol的createServer方法中,调用Exchangers的bind方法时传入了一个requestHandler,正是断点停在的ExchangeHandler。这个ExchangeHandler的reply方法将是服务端处理请求的RPC层入口。
Exchangers.bind(url, requestHandler)
这个requestHandler是一个匿名内部类,类型为ExchangeHandlerAdapter,实现了ExchangeHandler接口,ExchangeHandler又是继承ChannelHandler的,所以这个requestHandler会被注册到Netty的pipline中,但不是直接注册到netty的,因为此ChannelHandler并非Netty的Handler,而是dubbo抽象传输层的ChannelHandler。
每个ChannelHandler的实现类都是继承ChannelHandlerDelegate,即实现委托模式。所以每ChannelHandler中都持有前一个ChannelHandler。每New一个ChannelHandler都要传入一个ChannelHandler,所以就构成了一条链。
在Netty接收到消息时,先调用最顶层的NettyServerHandler的channelRead方法,再到Dubbo抽象的ChannelHandler的received方法,最后一层层往下传递处理。先是解码器DecodeHandler,再到信息交换层处理器HeaderExchangeHandler。在信息交换层中改变调用方式,调用ExchangeHandler的reply(回复)方法,交给RPC层处理请求。
public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
CompletableFuture<Object> reply(ExchangeChannel channel,
Object request) throws RemotingException;
}
reply方法的第二个参数类型为Request。可以理解为Servlet的doPost方法接收的HttpServletRequest,包装客户端请求的全部参数信息。如请求id、版本号、请求元数据描述。关于请求元数据描述有必要解析下。
请求元数据描述类型为RpcInvocation(org.apache.dubbo.rpc.Invocation)。Invocation字段包括:序列化标志(决定使用哪个序列化协议,比如2就是hession2)、远程调用的方法名(sayHello)、方法参数类型(String),调用传递的参数(hello)。请求附带的参数:请求路径(dubbo协议为接口名),dubbo版本、接口名、接口版本、分组等。
我们继续从断点开始分析,先看reply方法的前两行代码。
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
经过上面的分析,我们知道,message的类型为Invocation,且是由消费端发送过来的。第二行代码就是将Invocation转为Invoker,因为Invoker才是调用目标方法的入口,就像Spring MVC的HandlerMethod。
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
........
// serviceKey=分组/接口名:版本号:端口号
String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
return exporter.getInvoker();
}
getInvoker方法的代码是不是很眼熟?在服务导出时根据ip、端口号、接口版本、分组拼接成一个key,并将Invoker封装成Exporter缓存到exporterMap中,现在只是从缓存中拿出来用而已。Spring MVC的HandlerMethod也是用一个Map容易缓存的,key为路径,value为HandlerMethod,没什么区别。
最后又从Exporter中拿到Invoker,此Invoker是什么呢?首先这是一个被层层包装的Invoker,既有代理,也有委托,我们只关心它最初是什么样的。因为不管是代理还是委托,最后都会调用到最初始的那个Invoker。那Dubbo层层封装的目的是什么?稍后分析。
还记得图中这几行代码吗,前两篇文章才分析的。在配置层ServiceConfig的doExportUrlsFor1Protocol方法中。而图中的圈圈ref就是DemoService的实现类DemoServiceImpl的实例。
回到requestHandler的reply方法,继续往下分析,我先简化下reply的代码。
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
// 请求消息
Invocation inv = (Invocation) message;
// 获取Invoker
Invoker<?> invoker = getInvoker(channel, inv);
// 请求上下文设置客户端ip地址,不关心这个
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 调用服务,获取响应结果
Result result = invoker.invoke(inv);
return result.completionFuture().thenApply(Function.identity());
}
简化代码后好看多了,reply变得简单了,就是根据请求消息获取到Invoker,调用Invoker处理请求并获取结果,最后返回给客户端,只是返回的逻辑涉及到底层,此处不再深入分析。
接收到客户端的请求消息类型解包后是一个Request,而reply方法的第二个参数message类型是Invocation,这是在信息交换层HeaderExchangeHandler中转换的,Invocation是从Request中获取的,即getDate方法。
我们接着看invoker.invoke(inv)的整个调用链。回过头去跟踪下整个服务导出的链路,看看最初的Invoker是什么类型,以及都经过了多少层包装。
如图所示。Invoker首先是由代理工厂创建的,DemoServiceImpl的代理类。先是经过配置层ServiceConfig的包装,变为DelegateProviderMetaDataInvoker;图中还漏了一个注册中心层包装的InvokerDelegate,不是很重要;再经过过滤器层的包装,变成Filter,至于经过多少个过滤器的包装就得看配置了。
因此,DubboProtocol的requestHandler的reply方法中拿到的invoker是经过过滤器层ProtocolFilterWrapper包装的。
分析到这,你应该知道过滤器是怎么起作用了吧。请求先是经过过滤器处理,如果过滤器都不过滤请求,会先到配置层包装的DelegateProviderMetaDataInvoker。跳过过滤器,直接在DelegateProviderMetaDataInvoker的invoker方法中下断点。
也没啥处理的,继续往回走,但是往回走就下不了断点了,因为是javasist动态生成的代码,所以我们要看的就是动态代理工厂给我们生成的代理类的代码。这得借助一些工具。
在org.apache.dubbo.common.bytecode.ClassGenerator的toClass方法中加入如下代码,将生成的类字节码输出到一个路径下,再通过反编译工具查看。当然,你要是能从乱糟糟的javassist拼接的字符串看出是驴是马,我也不说什么了。
使用一个反编译工具打开class文件,我用的是Luyten这个工具。
生成代理类之后还会包装一层代理,将调用Invoker的invoker方法转为调用动态代理类的invokerMethod方法。具体代码看AbstractInvoker这个类。
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 动态代理包装DemoServiceImpl
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 再给生成的动态代理类包装一层代理
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
似乎哪里不对?Invoker要求invoke方法返回一个Result,但是javassist生成的动态代理类的invokeMethod方法返回值是一个Object类型,是什么时候转成Result的?invokeMethod是由AbstractProxyInvoker的doInvoker方法调用的,所以将Object包装为Result就是在AbstractProxyInvoker的invoker方法中实现的。
到这,整个方法的调用流程就都清楚了。你知道过滤器是什么时候被调用了吗?你知道熔断器为什么基于过滤器实现了吗?但要知道处理请求的线程调度,以及为何线程池满了抛出rpc远程调用异常,这些还需要继续深入信息交换层和传输层才能找到答案。
声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。
Dubbo框架的传输层默认使用dubbo协议,这也是一种RPC远程通信协议。学习Dubbo,我们有必要了解dubbo协议长什么样,最好的办法就是从源码中寻找答案。
今天我们来分析下`netty`是如何解析`http`协议数据包的。重点是分析`HttpObjectDecoder`类的`decode`方法的源码,`http`协议数据包的解码操作都是在该方法中完成的。
本篇继续分析服务提供者发起一个远程RPC调用的全过程,也是跳过信息交换层和传输层,但发起请求的逻辑会复杂些,包括负载均衡和失败重试的过程,以及当消费端配置与每个服务提供端保持多个长连接时的处理逻辑。
由于我在实际项目中并未使用Redis作为服务注册中心,所以一直没有关注这个话题。那么,使用Redis作为服务注册中心有哪些缺点,希望本篇文章能给你答案。
服务注册与发现是Dubbo核心的一个模块,假如没有注册中心,我们要调用远程服务,就必须要预先配置,就像调用第三方http接口一样,需要知道接口的域名或者IP、端口号才能调用。
订阅
订阅新文章发布通知吧,不错过精彩内容!
输入邮箱,提交后我们会给您发送一封邮件,您需点击邮件中的链接完成订阅设置。