原创 吴就业 163 0 2020-09-22
本文为博主原创文章,未经博主允许不得转载。
本文链接:https://wujiuye.com/article/2c7c7c9c6ad749b6aa8f6519a9f33784
作者:吴就业
链接:https://wujiuye.com/article/2c7c7c9c6ad749b6aa8f6519a9f33784
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。
上一篇我们简单了解了ProcessorSlot,并且将Sentinel提供的所有ProcessorSlot分成两类,一类是辅助完成资源指标数据统计的ProcessorSlot,一类是实现降级功能的ProcessorSlot。
Sentinel的整体工具流程就是使用责任链模式将所有的ProcessorSlot按照一定的顺序串成一个单向链表。辅助完成资源指标数据统计的ProcessorSlot必须在实现降级功能的ProcessorSlot的前面,原因很简单,降级功能需要依据资源的指标数据做判断,当然,如果某个ProcessorSlot不依赖指标数据实现降级功能,那这个ProcessorSlot的位置就没有约束。
除了按分类排序外,同一个分类下的每个ProcessorSlot可能也需要有严格的排序。比如辅助完成资源指标数据统计的ProcessorSlot的排序顺序为:NodeSelectorSlot->ClusterBuilderSlot->StatisticSlot,如果顺序乱了就会抛出异常,而实现降级功能的ProcessorSlot就没有严格的顺序要求,AuthoritySlot、SystemSlot、FlowSlot、DegradeSlot这几个的顺序可以按需调整。
实现将ProcessorSlot串成一个单向链表的是ProcessorSlotChain,这个ProcessorSlotChain是由SlotChainBuilder构造的,默认SlotChainBuilder构造的ProcessorSlotChain注册的ProcessorSlot以及顺序如下代码所示。
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new SystemSlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
如何去掉ProcessorSlot或者添加自定义的ProcessorSlot,下一篇再作介绍。
ProcessorSlot接口的定义如下:
public interface ProcessorSlot<T> {
// 入口方法
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,Object... args) throws Throwable;
// 调用下一个ProcessorSlot#entry方法
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,Object... args) throws Throwable;
// 出口方法
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
// 调用下一个ProcessorSlot#exit方法
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
例如实现熔断降级功能的DegradeSlot,其在entry方法中检查资源当前统计的指标数据是否达到配置的熔断降级规则的阈值,如果是则触发熔断,抛出一个DegradeException(必须是BlockException的子类),而exit方法什么也不做。
方法参数解析:
之所以能够将所有的ProcessorSlot构造成一个ProcessorSlotChain,还是依赖这些ProcessorSlot继承了AbstractLinkedProcessorSlot类。每个AbstractLinkedProcessorSlot类都有一个指向下一个AbstractLinkedProcessorSlot的字段,正是这个字段将ProcessorSlot串成一条单向链表。AbstractLinkedProcessorSlot部分源码如下。
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
// 当前节点的下一个节点
private AbstractLinkedProcessorSlot<?> next = null;
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}
实现责任链调用是由前一个AbstractLinkedProcessorSlot调用fireEntry方法或者fireExit方法,在fireEntry与fireExit方法中调用下一个AbstractLinkedProcessorSlot(next)的entry方法或exit方法。AbstractLinkedProcessorSlot的fireEntry与fireExit方法的实现源码如下:
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
// 当前节点的下一个节点
private AbstractLinkedProcessorSlot<?> next = null;
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
T t = (T) obj;
// 调用下一个ProcessorSlot的entry方法
next.entry(context,resourceWrapper,t,count,prioritized,args);
}
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
// 调用下一个ProcessorSlot的exit方法
next.exit(context, resourceWrapper, count, args);
}
}
}
ProcessorSlotChain也继承AbstractLinkedProcessorSlot,只不过加了两个方法:提供将一个ProcessorSlot添加到链表的头节点的addFirst方法,以及提供将一个ProcessorSlot添加到链表末尾的addLast方法。
ProcessorSlotChain的默认实现类是DefaultProcessorSlotChain,DefaultProcessorSlotChain有一个指向链表头节点的first字段和一个指向链表尾节点的end字段,头节点字段是一个空实现的AbstractLinkedProcessorSlot。DefaultProcessorSlotChain源码如下。
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
// first,指向链表头节点
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
// end,指向链表尾节点
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
// 调用头节点的entry方法
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T) obj;
first.entry(context, resourceWrapper, t, count, prioritized, args);
}
// 调用头节点的exit方法
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
first.exit(context, resourceWrapper, count, args);
}
}
责任链模式是非常常用的一种设计模式。在Shiro框架中,实现资源访问权限过滤的骨架(过滤器链)使用的是责任链模式;在Netty框架中,使用责任链模式将处理请求的ChannelHandler包装为链表,实现局部串行处理请求。
Sentinel的责任链实现上与Netty有相似的地方,Sentinel的ProcessorSlot#entry方法与Netty的实现一样,都是按节点在链表中的顺序被调用,区别在于Sentinel的ProcessorSlot#exit方法并不像Netty那样是从后往前调用的。且与Netty不同的是,Netty的ChannelHandler是线程安全的,也就是局部串行,由于Sentinel是与资源为维度的,所以必然实现不了局部串行。
Sentinel会为每个资源创建且仅创建一个ProcessorSlotChain,只要名称相同就认为是同一个资源。ProcessorSlotChain被缓存在CtSph.chainMap静态字段,key为资源ID,每个资源的ProcessorSlotChain在CtSph#entryWithPriority方法中创建,代码如下。
public class CtSph implements Sph {
// 资源与ProcessorSlotChain的映射
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
//......
// 开始构造Chain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
//......
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
}
return e;
}
}
如果不借助Sentinel提供的适配器,我们可以这样使用Sentinel。
ContextUtil.enter("上下文名称,例如:sentinel_spring_web_context");
Entry entry = null;
try {
entry = SphU.entry("资源名称,例如:/rpc/openfein/demo", EntryType.IN (或者EntryType.OUT));
// 执行业务方法
return doBusiness();
} catch (Exception e) {
if (!(e instanceof BlockException)) {
Tracer.trace(e);
}
throw e;
} finally {
if (entry != null) {
entry.exit(1);
}
ContextUtil.exit();
}
上面代码我们分为五步分析:
ContextUtil#enter方法负责为当前调用链路创建Context,以及为Conetxt创建EntranceNode,源码如下。
public static Context enter(String name, String origin) {
return trueEnter(name, origin);
}
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
//....
try {
LOCK.lock();
node = contextNameNodeMap.get(name);
if (node == null) {
//....
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
Constants.ROOT.addChild(node);
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
} finally {
LOCK.unlock();
}
}
}
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
ContextUtil使用ThreadLocal存储当前调用链路的Context,例如web mvc应用中使用Sentinel的spring mvc适配器,在接收到请求时,调用ContextUtil#enter方法会创建一个名为“sentinel_spring_web_context”的Context,并且如果是首次创建还会为所有名为“sentinel_spring_web_context”的Context创建一个EntranceNode。
Context是每个线程只创建一个,而EntranceNode则是每个Context.name对应创建一个。也就是说,应用每接收一个请求都会创建一个新的Context,但名称都是sentinel_spring_web_context,而且都是使用同一个EntranceNode,这个EntranceNode将会存储所有接口的DefaultNode,同时这个EntranceNode也是Constants.ROOT的子节点。
Sentinel的核心骨架是ProcessorSlotChain,所以核心的流程是一次SphU#entry方法的调用以及一次CtEntry#exit方法的调用。
SphU#entry方法调用CtSph#entry方法,CtSph负责为资源创建ResourceWrapper对象并为资源构造一个全局唯一的ProcessorSlotChain、为资源创建CtEntry并将CtEntry赋值给当前调用链路的Context.curEntry、最后调用ProcessorSlotChain#entry方法完成一次单向链表的entry方法调用。
ProcessorSlotChain的一次entry方法的调用过程如下图所示。
只在抛出非BlockException异常时才会调用Tracer#trace方法,用于记录当前资源调用异常,为当前资源的DefaultNode自增异常数。
public class Tracer {
// 调用Tracer的trace方法最终会调用到这个方法
private static void traceExceptionToNode(Throwable t, int count, Entry entry, DefaultNode curNode) {
if (curNode == null) {
return;
}
// .....
// clusterNode can be null when Constants.ON is false.
ClusterNode clusterNode = curNode.getClusterNode();
if (clusterNode == null) {
return;
}
clusterNode.trace(t, count);
}
}
如上代码所示,traceExceptionToNode方法中首先获取当前资源的ClusterNode,然后调用ClusterNode#trace方法记录异常。因为一个资源只创建一个ProcessorSlotChain,一个ProcessorSlotChain只创建ClusterBuilderSlot,一个ClusterBuilderSlot只创建一个ClusterNode,所以一个资源对应一个ClusterNode,这个ClusterNode就是用来统计一个资源的全局指标数据的,熔断降级与限流降级都有使用到这个ClusterNode。
ClusterNode#trace方法的实现如下:
public void trace(Throwable throwable, int count) {
if (count <= 0) {
return;
}
if (!BlockException.isBlockException(throwable)) {
// 非BlockException异常,自增异常总数
this.increaseExceptionQps(count);
}
}
下面是CtEntry#exit方法的实现,为了简短且易于理解,下面给出的exitForContext方法的源码有删减。
@Override
public void exit(int count, Object... args) throws ErrorEntryFreeException {
trueExit(count, args);
}
@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(context, count, args);
return parent;
}
protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
//......
// 1、调用ProcessorSlotChain的exit方法
if (chain != null) {
chain.exit(context, resourceWrapper, count, args);
}
// 2、将当前CtEntry的父节点设置为Context的当前节点
context.setCurEntry(parent);
if (parent != null) {
((CtEntry)parent).child = null;
}
// .....
}
}
CtSph在创建CtEntry时,将资源的ProcessorSlotChain赋值给了CtEntry,所以在调用CtEntry#exit方法时,CtEntry能够拿到当前资源的ProcessorSlotChain,并调用ProcessorSlotChain的exit方法完成一次单向链表的exit方法调用。其过程与ProcessorSlotChain的一次entry方法的调用过程一样,因此不做分析。
CtEntry在退出时还会还原Context.curEntry。上一篇介绍CtEntry时说到,CtEntry用于维护父子Entry,每一次调用SphU#entry都会创建一个CtEntry,如果应用处理一次请求的路径上会多次调用SphU#entry,那么这些CtEntry会构成一个双向链表。在每次创建CtEntry,都会将Context.curEntry设置为这个新的CtEntry,双向链表的作用就是在调用CtEntry#exit方法时,能够将Context.curEntry还原为上一个CtEntry。
ContextUtil#exit方法就简单了,其代码如下:
public static void exit() {
Context context = contextHolder.get();
if (context != null && context.getCurEntry() == null) {
contextHolder.set(null);
}
}
如果Context.curEntry为空,则说明所有SphU#entry都对应执行了一次Entry#exit方法,此时就可以将Context从ThreadLocal中移除。
声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。
本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。
消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。
IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。
Instrumentation之所以难驾驭,在于需要了解Java类加载机制以及字节码,一不小心就能遇到各种陌生的Exception。笔者在实现Java探针时就踩过不少坑,其中一类就是类加载相关的问题,也是本篇所要跟大家分享的。
订阅
订阅新文章发布通知吧,不错过精彩内容!
输入邮箱,提交后我们会给您发送一封邮件,您需点击邮件中的链接完成订阅设置。