原创 吴就业 118 0 2020-09-22
本文为博主原创文章,未经博主允许不得转载。
本文链接:https://wujiuye.com/article/acc83a0474044c4187d52cf845d23cb1
作者:吴就业
链接:https://wujiuye.com/article/acc83a0474044c4187d52cf845d23cb1
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。
NodeSelectorSlot负责为资源的首次访问创建DefaultNode,以及维护Context.curNode和调用树。NodeSelectorSlot被放在ProcessorSlotChain链表的第一个位置,这是因为后续的ProcessorSlot都需要依赖这个ProcessorSlot。NodeSelectorSlot源码如下。
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
// Context的name -> 资源的DefaultNode
private volatile Map<String, DefaultNode> map = new HashMap<>(10);
// 入口方法
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
// 使用Context的名称作为key缓存资源的DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 为资源创建DefaultNode
node = new DefaultNode(resourceWrapper, null);
// 替换map
HashMap<String, DefaultNode> cacheMap = new HashMap<>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// 绑定调用树
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
// 替换Context的curNode为当前DefaultNode
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// 出口方法什么也不做
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
如源码所示,map字段是一个非静态字段,意味着每个NodeSelectorSlot都有一个map。由于一个资源对应一个ProcessorSlotChain,而一个ProcessorSlotChain只创建一个NodeSelectorSlot,并且map缓存DefaultNode使用的key并非资源ID,而是Context.name,所以map的作用是缓存针对同一资源为不同调用链路入口创建的DefaultNode。
在entry方法中,首先根据Context.name从map获取当前调用链路入口的资源DefaultNode,如果资源第一次被访问,也就是资源的ProcessorSlotChain第一次被创建,那么这个map是空的,就会加锁为资源创建DefaultNode,如果资源不是首次被访问,但却首次作为当前调用链路(Context)的入口资源,也需要加锁为资源创建一个DefaultNode。可见,Sentinel会为同一资源ID创建多少个DefaultNode取决于有多少个调用链使用其作为入口资源,直白点就是同一资源存在多少个DefaultNode取决于Context.name有多少种不同取值,这就是为什么说一个资源可能有多个DefaultNode的原因。
为什么这么设计呢?举个例子,对同一支付接口,我们需要使用spring mvc暴露给前端访问,同时也可能会使用dubbo暴露给其它内部服务调用。Sentinel的webmvc适配器在调用链路入口创建名为“sentinel_spring_web_context”的Context,与Sentinel的Dubbo适配器调用ContextUtil#enter方法创建的Context名称不同。针对这种情况,我们可以实现只限制spring mvc进来的流量,也就是限制前端发起接口调用的QPS、并行占用的线程数等。
NodeSelectorSlot#entry方法最难以理解的就是实现绑定调用树这行代码:
((DefaultNode) context.getLastNode()).addChild(node);
这行代码分两种情况分析更容易理解,我们就以Sentinel提供的demo为例进行分析。
Sentinel的sentinel-demo模块下提供了多种使用场景的demo,我们选择sentinel-demo-spring-webmvc这个demo为例,该demo下有一个hello接口,其代码如下。
@RestController
public class WebMvcTestController {
@GetMapping("/hello")
public String apiHello() throws BlockException {
doBusiness();
return "Hello!";
}
}
我们不需要添加任何规则,只是为了调试Sentinel的源码。将demo启动起来后,在浏览器访问”/hello”接口,在NodeSelectorSlot#entry方法的绑定调用树这一行代码下断点,观察此时Context的字段信息。正常情况下我们可以看到如下图所示的结果。
从上图中可以看出,此时的Context.entranceNode的子节点为空(childList的大小为0),并且当前CtEntry父、子节点都是Null(curEntry字段)。当绑定调用树这一行代码执行完成后,Context的字段信息如下图所示。
从上图可以看出,NodeSelectorSlot为当前资源创建的DefaultNode被添加到了Context.entranceNode的子节点。entranceNode类型为EntranceNode,在调用ContextUtil#enter方法时创建,在第一次创建名为“sentinel_spring_web_context”的Context时创建,相同名称的Context都使用同一个EntranceNode。并且该EntranceNode在创建时会被添加到Constant.ROOT。
此时,Constant.ROOT、Context.entranceNode、当前访问资源的DefaultNode构造成的调用树如下。
ROOT (machine-root)
/
EntranceNode (context name: sentinel_spring_web_context)
/
DefaultNode (resource name: GET:/hello)
如果我们现在再访问demo的其他接口,例如访问“/err”接口,那么生成的调用树就会变成如下。
ROOT (machine-root)
/
EntranceNode (context name: sentinel_spring_web_context)
/ \
DefaultNode (resource name: GET:/hello) DefaultNode (resource name: GET:/err)
Context.entranceNode将会存储web项目的所有资源(接口)的DefaultNode。
比如我们在一个服务中添加了Sentinel的webmvc适配模块的依赖,也添加了Sentinel的OpenFeign适配模块的依赖,并且我们使用OpenFeign调用内部其他服务的接口,那么就会存在一次调用链路上出现多次调用SphU#entry方法的情况。
首先webmvc适配器在接收客户端请求时会调用一次SphU#entry,在处理客户端请求时可能需要使用OpenFeign调用其它服务的接口,那么在发起接口调用时,Sentinel的OpenFeign适配器也会调用一次SphU#entry。
现在我们将demo的hello接口修改一下,将hello接口调用的doBusiness方法也作为资源使用Sentinel保护起来,改造后的hello接口代码如下。
@RestController
public class WebMvcTestController {
@GetMapping("/hello")
public String apiHello() throws BlockException {
ContextUtil.enter("my_context");
Entry entry = null;
try {
entry = SphU.entry("POST:http://wujiuye.com/hello2", EntryType.OUT);
// ==== 这里是被包装的代码 =====
doBusiness();
return "Hello!";
// ==== end ===============
} catch (Exception e) {
if (!(e instanceof BlockException)) {
Tracer.trace(e);
}
throw e;
} finally {
if (entry != null) {
entry.exit(1);
}
ContextUtil.exit();
}
}
}
我们可将doBusiness方法看成是远程调用,例如调用第三方的接口,接口名称为“http://wujiuye.com/hello2”,使用POST方式调用,那么我们可以使用“POST:http://wujiuye.com/hello2”作为资源名称,并将流量类型设置为OUT类型。上下文名称取名为”my_context”。
现在启动demo,使用浏览器访问“/hello”接口。当代码执行到apiHello方法时,在NodeSelectorSlot#entry方法的绑定调用树这一行代码下断点。当绑定调用树这行代码执行完成后,Context的字段信息如下图所示。
如图所示,Sentinel并没有创建名称为my_context的Context,还是使用应用接收到请求时创建名为“sentinel_spring_web_context”的Context,所以处理浏览器发送过来的请求的“GET:/hello”资源是本次调用链路的入口资源,Sentinel在调用链路入口处创建Context之后不再创建新的Context。
ROOT (machine-root)
/
EntranceNode (name: sentinel_spring_web_context)
/ \
DefaultNode (GET:/hello) .........
/
DefaultNode (POST:/hello2)
此时,当前调用链路上也已经存在两个CtEntry,这两个CtEntry构造一个双向链表,如下图所示。
虽然存在两个CtEntry,但此时Context.curEntry指向第二个CtEntry,第二个CtEntry在apiHello方法中调用SphU#entry方法时创建,当执行完doBusiness方法后,调用当前CtEntry#exit方法,由该CtEntry将Context.curEntry还原为该CtEntry的父CtEntry。这有点像入栈和出栈操作,例如栈帧在Java虚拟机栈的入栈和出栈,调用方法时方法的栈帧入栈,方法执行完成栈帧出栈。
NodeSelectorSlot#entry方法我们还有一行代码没有分析,就是将当前创建的DefaultNode设置为Context的当前节点,代码如下。
// 替换Context.curNode为当前DefaultNode
context.setCurNode(node);
替换Context.curNode为当前资源DefaultNode这行代码就是将当前创建的DefaultNode赋值给当前CtEntry.curNode。对着上图理解就是,将资源“GET:/hello”的DefaultNode赋值给第一个CtEntry.curNode,将资源“POST:http://wujiuye.com/hello2”的DefaultNode赋值给第二个CtEntry.curNode。
要理解Sentinel构造CtEntry双向链表的目的,首先我们需要了解调用Context#getCurNode方法获取当前资源的DefaultNode可以做什么。
Tracer#tracer方法用于记录异常。以异常指标数据统计为例,在发生非Block异常时,Tracer#tracer需要从Context获取当前资源的DefaultNode,通知DefaultNode记录异常,同时DefaultNode也会通知ClusterNode记录记录,如下代码所示。
public class DefaultNode extends StatisticNode {
......
@Override
public void increaseExceptionQps(int count) {
super.increaseExceptionQps(count);
this.clusterNode.increaseExceptionQps(count);
}
}
这个例子虽然简单,但也足以说明Sentinel构造CtEntry双向链表的目的。
在一个资源的ProcessorSlotChain中,NodeSelectorSlot负责为资源创建DefaultNode,这个DefaultNode仅限同名的Context使用。所以一个资源可能会存在多个DefaultNode,那么想要获取一个资源的总的QPS就必须要遍历这些DefaultNode。为了性能考虑,Sentinel会为每个资源创建一个全局唯一的ClusterNode,用于统计资源的全局并行占用线程数、QPS、异常总数等指标数据。
与NodeSelectorSlot的职责相似,ClusterBuilderSlot的职责是为资源创建全局唯一的ClusterNode,仅在资源第一次被访问时创建。ClusterBuilderSlot还会将ClusterNode赋值给DefaultNode.clusterNode,由DefaultNode持有ClusterNode,负责管理ClusterNode的指标数据统计。这点也是ClusterBuilderSlot在ProcessorSlotChain链表中必须排在NodeSelectorSlot之后的原因,即必须先有DefaultNode,才能将ClusterNode交给DefaultNode管理。
ClusterBuilderSlot的源码比较多,本篇只分析其实现ProcessorSlot接口的entry和exit方法。ClusterBuilderSlot删减后的源码如下。
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// 资源 -> ClusterNode
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
// 非静态,一个资源对应一个ProcessorSlotChain,所以一个资源共用一个ClusterNode
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// 创建ClusterNode
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
// 添加到缓存
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// node为NodeSelectorSlot传递过来的DefaultNode
node.setClusterNode(clusterNode);
// 如果origin不为空,则为远程创建一个StatisticNode
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
ClusterBuilderSlot使用一个Map缓存资源的ClusterNode,并且用一个非静态的字段维护当前资源的ClusterNode。因为一个资源只会创建一个ProcessorSlotChain,意味着ClusterBuilderSlot也只会创建一个,那么让ClusterBuilderSlot持有该资源的ClusterNode就可以省去每次都从Map中获取的步骤,这当然也是Sentinel为性能做出的努力。
ClusterBuilderSlot#entry方法的node参数由前一个ProcessorSlot传递过来,也就是NodeSelectorSlot传递过来的DefaultNode。ClusterBuilderSlot将ClusterNode赋值给DefaultNode.clusterNode,那么后续的ProcessorSlot就能从node参数中取得ClusterNode。DefaultNode与ClusterNode的关系如下图所示。
ClusterNode有一个Map类型的字段用来缓存origin与StatisticNode的映射,代码如下。
public class ClusterNode extends StatisticNode {
private final String name;
private final int resourceType;
private Map<String, StatisticNode> originCountMap = new HashMap<>();
}
如果上游服务在调用当前服务的接口传递origin字段过来,例如可在http请求头添加“S-user”参数,或者Dubbo rpc调用在请求参数列表加上“application”参数,那么ClusterBuilderSlot就会为ClusterNode创建一个StatisticNode,用来统计当前资源被远程服务调用的指标数据。
例如,当origin表示来源应用的名称时,对应的StatisticNode统计的就是针对该调用来源的指标数据,可用来查看哪个服务访问这个接口最频繁,由此可实现按调用来源限流。
ClusterNode#getOrCreateOriginNode方法源码如下。
public Node getOrCreateOriginNode(String origin) {
StatisticNode statisticNode = originCountMap.get(origin);
if (statisticNode == null) {
try {
lock.lock();
statisticNode = originCountMap.get(origin);
if (statisticNode == null) {
statisticNode = new StatisticNode();
// 这几行代码在Sentinel中随处可见
HashMap<String, StatisticNode> newMap = new HashMap<>(originCountMap.size() + 1);
newMap.putAll(originCountMap);
newMap.put(origin, statisticNode);
originCountMap = newMap;
}
} finally {
lock.unlock();
}
}
return statisticNode;
}
为了便于使用,ClusterBuilderSlot会将调用来源(origin)的StatisticNode赋值给Context.curEntry.originNode,后续的ProcessorSlot可调用Context#getCurEntry#getOriginNode方法获取该StatisticNode。这里我们可以得出一个结论,如果我们自定义的ProcessorSlot需要用到调用来源的StatisticNode,那么在构建ProcessorSlotChain时,我们必须要将这个自定义ProcessorSlot放在ClusterBuilderSlot之后。
声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。
本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。
消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。
IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。
Instrumentation之所以难驾驭,在于需要了解Java类加载机制以及字节码,一不小心就能遇到各种陌生的Exception。笔者在实现Java探针时就踩过不少坑,其中一类就是类加载相关的问题,也是本篇所要跟大家分享的。
订阅
订阅新文章发布通知吧,不错过精彩内容!
输入邮箱,提交后我们会给您发送一封邮件,您需点击邮件中的链接完成订阅设置。