原创 吴就业 135 0 2020-09-22
本文为博主原创文章,未经博主允许不得转载。
本文链接:https://wujiuye.com/article/ea3cd855bf5f4a24a2246f1b04ced2ea
作者:吴就业
链接:https://wujiuye.com/article/ea3cd855bf5f4a24a2246f1b04ced2ea
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。
从这篇开始,我们学习Sentinel提供的几个实现降级功能的ProcessorSlot,这些ProcessorSlot检查实时指标数据是否达到规则所配置的阈值,当达到阈值时,或抛出Block异常或采取流量效果控制策略处理超阈值的流量。
Sentinel实现限流降级、熔断降级、黑白名单限流降级、系统自适应限流降级以及热点参数限流降级都是由ProcessorSlot、Checker、Rule、RuleManager组合完成。ProcessorSlot作为调用链路的切入点,负责调用Checker检查当前请求是否可以放行;Checker则根据资源名称从RuleManager中拿到为该资源配置的Rule(规则),取ClusterNode统计的实时指标数据与规则对比,如果达到规则的阈值则抛出Block异常,抛出Block异常意味着请求被拒绝,也就实现了限流或熔断。
可以总结为以下三个步骤:
Sentinel在最初的框架设计上,将是否允许请求通过的判断行为交给Rule去实现,所以将Rule定义成了接口。Rule接口只定义了一个passCheck方法,即判断当前请求是否允许通过。Rule接口的定义如下。
public interface Rule {
boolean passCheck(Context context, DefaultNode node, int count, Object... args);
}
因为规则是围绕资源配置的,一个规则只对某个资源起作用,因此Sentinel提供了一个抽象规则配置类AbstractRule,AbstractRule的定义如下。
public abstract class AbstractRule implements Rule {
private String resource;
private String limitApp;
// ....
}
Rule、AbstractRule与其它实现类的关系如下图所示。
FlowRule是限流规则配置类,FlowRule继承AbstractRule并实现Rule接口。FlowRule源码如下,非完整源码,与实现集群限流相关的字段暂时去掉了。
public class FlowRule extends AbstractRule {
// 限流阈值类型 qps|threads
private int grade = RuleConstant.FLOW_GRADE_QPS;
// 限流阈值
private double count;
// 基于调用关系的限流策略
private int strategy = RuleConstant.STRATEGY_DIRECT;
// 配置strategy使用,入口资源名称
private String refResource;
// 流量控制效果(直接拒绝、Warm Up、匀速排队)
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
// 冷启动时长(预热时长),单位秒
private int warmUpPeriodSec = 10;
// 最大排队时间。
private int maxQueueingTimeMs = 500;
// 流量控制器
private TrafficShapingController controller;
//.....
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
return true;
}
}
Rule定义的行为应该只是Sentinel在最初搭建框架时定义的约定,Sentinel自己也并没有都遵守这个约定,很多规则并没有将passCheck交给Rule去实现,Checker可能是后续引入的,用于替代Rule的passCheck行为。
Sentinel中用来管理规则配置的类都以规则类的名称+Manger命名,除此之外,并没有对规则管理器有什么行为上的约束。
用来加载限流规则配置以及缓存限流规则配置的类为FlowRuleManager,其部分源码如下。
public class FlowRuleManager {
// 缓存规则
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
// 获取所有规则
static Map<String, List<FlowRule>> getFlowRuleMap() {
return flowRules;
}
// 更新规则
public static void loadRules(List<FlowRule> rules) {
// 更新静态字段flowRules
}
}
FlowSlot是实现限流功能的切入点,它作为ProcessorSlot插入到ProcessorSlotChain链表中,在entry方法中调用Checker去判断是否需要拒绝当前请求,如果需要拒绝请求则抛出Block异常。FlowSlot的源码如下。
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
public FlowSlot() {
this(new FlowRuleChecker());
}
// 规则生产者,一个Function
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
// 参数为资源名称
@Override
public Collection<FlowRule> apply(String resource) {
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// check是否限流
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
FlowSlot在构造方法中创建FlowRuleChecker,并在entry方法中调用FlowRuleChecker#checkFlow方法判断是否需要拦截当前请求。在调用FlowRuleChecker#checkFlow方法时传入了一个Function接口实例,FlowRuleChecker可调用该Function的apply方法从FlowRuleManager获取资源的所有规则配置,当然,最终还是调用FlowRuleManager#getFlowRuleMap方法从FlowRuleManager获取。
FlowRuleChecker与FlowRuleManager一样,Sentinel也并没有约定Checker必须具有哪些行为,只是在命名上约定Checker类需以规则类的名称+“Checker”命名。FlowRuleChecker负责判断是否需要拒绝当前请求,由于FlowRuleChecker类的源码很多,所以我们按过程分析用到的每个方法。
首先是由FlowSlot调用的checkFlow方法,该方法源码如下。
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// (1)
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
// (2)
for (FlowRule rule : rules) {
// (3)
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
checkFlow方法我们分三步分析:
canPassCheck即“can pass check”,意思是检查是否允许通过,后面我们也统一将“检查是否允许当前请求通过”使用canPassCheck代指,canPassCheck方法返回true说明允许请求通过,反之则不允许通过。 canPassCheck方法源码如下。
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {
// (1)
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
// (2)
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// (3)
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
passLocalCheck方法源码如下。
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// (1)
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// (2)
return rule.getRater()
// (3)
.canPass(selectedNode, acquireCount, prioritized);
}
selectNodeByRequesterAndStrategy方法的实现逻辑很复杂,实现根据限流规则配置的limitApp与strategy选择一个StatisticNode,两个字段的组合情况可以有6种。selectNodeByRequesterAndStrategy方法源码如下。
static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {
// 限流规则针对哪个来源生效
String limitApp = rule.getLimitApp();
// 基于调用关系的限流策略
int strategy = rule.getStrategy();
// 远程来源
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//(1)
return context.getOriginNode();
}
//(2)
return selectReferenceNode(rule, context, node);
}
else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//(3)
return node.getClusterNode();
}
//(4)
return selectReferenceNode(rule, context, node);
}
else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//(5)
return context.getOriginNode();
}
//(6)
return selectReferenceNode(rule, context, node);
}
return null;
}
如果当前限流规则的limitApp为default,则说明该限流规则对任何调用来源都生效,针对所有调用来源限流,否则只针对指定调用来源限流。
从selectNodeByRequesterAndStrategy方法可以看出,Sentinel之所以针对每个资源统计访问来源的指标数据,也是为了实现对丰富的限流策略的支持。
因为每个调用来源服务对同一个资源的访问频率都是不同的,针对调用来源限流可限制并发量较高的来源服务的请求,而对并发量低的来源服务的请求可不限流,或者是对一些并没有那么重要的来源服务限流。
当两个资源之间具有资源争抢关系的时候,使用STRATEGY_RELATE调用关系限流策略可避免多个资源之间过度的对同一资源争抢。例如查询订单信息和用户下单两个分别读和写数据库订单表的资源,如下图所示。
我们可以给执行读表操作的资源设置限流规则实现写优先的目的,查询订单信息的资源根据用户下单的资源的实时指标数据限流,当写表操作过于频繁时,读表操作的请求就会被限流。
声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。
本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。
消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。
IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。
Instrumentation之所以难驾驭,在于需要了解Java类加载机制以及字节码,一不小心就能遇到各种陌生的Exception。笔者在实现Java探针时就踩过不少坑,其中一类就是类加载相关的问题,也是本篇所要跟大家分享的。
订阅
订阅新文章发布通知吧,不错过精彩内容!
输入邮箱,提交后我们会给您发送一封邮件,您需点击邮件中的链接完成订阅设置。