Spring Cloud + Alibaba Sentinel 源码原理深度剖析!(中)

随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。

接:

Spring Cloud + Alibaba Sentinel 源码原理深度剖析!(上)

4、StatisticSlot

StatisticSlot(@SpiOrder(-7000))


官方文档:

StatisticSlot用于记录、统计不同纬度的 runtime 指标监控信息。

@Override

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,

                  boolean prioritized, Object... args) throws Throwable {

    try {

        // Do some checking.

        // 先将调用链继续下去,等到后续链调用结束了,再执行下面的步骤

        fireEntry(context, resourceWrapper, node, count, prioritized, args);


        // Request passed, add thread count and pass count.

        node.increaseThreadNum();

        node.addPassRequest(count);


        if (context.getCurEntry().getOriginNode() != null) {

            // Add count for origin node.

            context.getCurEntry().getOriginNode().increaseThreadNum();

            context.getCurEntry().getOriginNode().addPassRequest(count);

        }


        if (resourceWrapper.getEntryType() == EntryType.IN) {

            // Add count for global inbound entry node for global statistics.

            Constants.ENTRY_NODE.increaseThreadNum();

            Constants.ENTRY_NODE.addPassRequest(count);

        }


        // Handle pass event with registered entry callback handlers.

        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {

            handler.onPass(context, resourceWrapper, node, count, args);

        }

    } catch (PriorityWaitException ex) {

        node.increaseThreadNum();

        if (context.getCurEntry().getOriginNode() != null) {

            // Add count for origin node.

            context.getCurEntry().getOriginNode().increaseThreadNum();

        }


        if (resourceWrapper.getEntryType() == EntryType.IN) {

            // Add count for global inbound entry node for global statistics.

            Constants.ENTRY_NODE.increaseThreadNum();

        }

        // Handle pass event with registered entry callback handlers.

        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {

            handler.onPass(context, resourceWrapper, node, count, args);

        }

    } catch (BlockException e) {

        // Blocked, set block exception to current entry.

        context.getCurEntry().setBlockError(e);


        // Add block count.

        node.increaseBlockQps(count);

        if (context.getCurEntry().getOriginNode() != null) {

            context.getCurEntry().getOriginNode().increaseBlockQps(count);

        }


        if (resourceWrapper.getEntryType() == EntryType.IN) {

            // Add count for global inbound entry node for global statistics.

            Constants.ENTRY_NODE.increaseBlockQps(count);

        }


        // Handle block event with registered entry callback handlers.

        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {

            handler.onBlocked(e, context, resourceWrapper, node, count, args);

        }


        throw e;

    } catch (Throwable e) {

        // Unexpected internal error, set error to current entry.

        context.getCurEntry().setError(e);


        throw e;

    }

}


StatisticSlot 会先将链往下执行,等到后面的节点全部执行完毕,再进行数据统计。


5、AuthoritySlot

@SpiOrder(-6000)

AuthoritySlot


官方文档:

AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制。


@Override

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)

    throws Throwable {

    // 黑白名单权限控制

    checkBlackWhiteAuthority(resourceWrapper, context);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);

}


void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {

    Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();


    if (authorityRules == null) {

        return;

    }


    Set<AuthorityRule> rules = authorityRules.get(resource.getName());

    if (rules == null) {

        return;

    }


    for (AuthorityRule rule : rules) {

        if (!AuthorityRuleChecker.passCheck(rule, context)) {

            throw new AuthorityException(context.getOrigin(), rule);

        }

    }

}

6、SystemSlot

@SpiOrder(-5000)

SystemSlot


官方文档:

SystemSlot:这个 slot 会根据对于当前系统的整体情况,对入口资源的调用进行动态调配。其原理是让入口的流量和当前系统的预计容量达到一个动态平衡。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                 boolean prioritized, Object... args) throws Throwable {
   // 系统规则校验
   SystemRuleManager.checkSystem(resourceWrapper);
   fireEntry(context, resourceWrapper, node, count, prioritized, args);
}


7、FlowSlot 限流规则引擎

@SpiOrder(-2000)

FlowSlot


官方文档:

这个 slot 主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:

  • 指定应用生效的规则,即针对调用方限流的;

  • 调用方为 other 的规则;

  • 调用方为 default 的规则。


-     入口     -

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                 boolean prioritized, Object... args) throws Throwable {
   // 检查限流规则
   checkFlow(resourceWrapper, context, node, count, prioritized);

   fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
   throws BlockException {
   checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}


1、所有规则检查

调用了FlowRuleChecker.checkFlow(…)方法。

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                     Context context, DefaultNode node, int count, boolean prioritized) throwsBlockException {
   if (ruleProvider == null || resource == null) {
       return;
  }
   // 根据资源名称找到对应的
   Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
   if (rules != null) {
       // 遍历规则,依次判断是否通过
       for (FlowRule rule : rules) {
           if (!canPassCheck(rule, context, node, count, prioritized)) {
               throw new FlowException(rule.getLimitApp(), rule);
          }
      }
  }
}


2、单个规则检查

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNodenode, int acquireCount,
                                               boolean prioritized) {
   String limitApp = rule.getLimitApp();
   if (limitApp == null) {
       return true;
  }
   // 集群限流的判断
   if (rule.isClusterMode()) {
       return passClusterCheck(rule, context, node, acquireCount, prioritized);
  }
   // 本地节点的判断
   return passLocalCheck(rule, context, node, acquireCount, prioritized);
}


3、非集群模式的限流判断

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                     boolean prioritized) {
   // 根据请求的信息及策略,选择不同的node节点
   Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
   if (selectedNode == null) {
       return true;
  }
   // 根据当前规则,获取规则控制器,调用canPass方法进行判断
//       rule.getRater()放回的是TrafficShapingController接口的实现类,使用了策略模式,根据使用的控制措施来选择使用哪种实现。
   return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}


这里是先根据请求和当前规则的策略,找到该规则下存储统计信息的节点,然后根据当前规则获取相应控制器,通过控制器的canPass(…)方法进行判断。


4、获取节点

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Contextcontext, DefaultNode node) {
   // The limit app should not be empty.
   String limitApp = rule.getLimitApp();
   int strategy = rule.getStrategy();
   String origin = context.getOrigin();

   // 判断调用来源,这种情况下origin不能为default或other
   if (limitApp.equals(origin) && filterOrigin(origin)) {
       // 如果调用关系策略为STRATEGY_DIRECT,表示仅判断自己,则返回origin statistic node.
       if (strategy == RuleConstant.STRATEGY_DIRECT) {
           // Matches limit origin, return origin statistic node.
           return context.getOriginNode();
      }

       // 采用调用来源进行判断的策略
       return selectReferenceNode(rule, context, node);
  } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { // 如果调用来源为default默认的
       if (strategy == RuleConstant.STRATEGY_DIRECT) { // 如果调用关系策略为STRATEGY_DIRECT,则返回clusterNode
           // Return the cluster node.
           return node.getClusterNode();
      }

       return selectReferenceNode(rule, context, node);
  } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
       && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // 如果调用来源为other,且调用来源不在限制规则内,为其他来源
       if (strategy == RuleConstant.STRATEGY_DIRECT) {
           return context.getOriginNode();
      }
       return selectReferenceNode(rule, context, node);
  }
   return null;
}


5、流量整形控制器

rule.getRater()方法会返回一个控制器,接口为TrafficShapingController,该接口的实现类图如下:

attachments-2021-01-DrsBFFbd5ffe6a2b50ab6.jpg

从类图可以看出,是很明显的策略模式,分别针对不同的限流控制策略。


1、默认策略

DefaultController该策略是sentinel的默认策略,如果请求超出阈值,则直接拒绝请求。

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
   // 当前已经统计的数
   int curCount = avgUsedTokens(node);
   if (curCount + acquireCount > count) {
       // 如果是高优先级的,且是基于qps的限流方式,则可以尝试从下个未来的滑动窗口中预支
       if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
           long currentTime;
           long waitInMs;
           currentTime = TimeUtil.currentTimeMillis();
           // 从下个滑动窗口中提前透支
           waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
           if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
               node.addWaitingRequest(currentTime + waitInMs, acquireCount);
               node.addOccupiedPass(acquireCount);
               sleep(waitInMs);

               // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
               throw new PriorityWaitException(waitInMs);
          }
      }
       return false;
  }
   return true;
}

private int avgUsedTokens(Node node) {
   if (node == null) {
       return DEFAULT_AVG_USED_TOKENS;
  }
   // 如果当前是线程数限流,则返回node.curThreadNum()当前线程数
   // 如果是QPS限流,则返回node.passQps()当前已经通过的qps数据
   return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

private void sleep(long timeMillis) {
   try {
       Thread.sleep(timeMillis);
  } catch (InterruptedException e) {
       // Ignore.
  }
}


2、匀速排队策略

RateLimiterController

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
   // Pass when acquire count is less or equal than 0.
   if (acquireCount <= 0) {
       return true;
  }
   // Reject when count is less or equal than 0.
   // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
   if (count <= 0) {
       return false;
  }

   long currentTime = TimeUtil.currentTimeMillis();
   // Calculate the interval between every two requests.
   // 计算两个请求之间的时间间隔
   long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

   // Expected pass time of this request. 该请求的预计通过时间 = 上一次通过的时间 + 时间间隔
   long expectedTime = costTime + latestPassedTime.get();

   // 如果预计时间比当前时间小,表示可以请求完全可以通过
   if (expectedTime <= currentTime) {
       // Contention may exist here, but it's okay.
       // 这里可能存在竞争,但是不影响。
       latestPassedTime.set(currentTime);
       return true;
  } else {
       // Calculate the time to wait.
       // 计算等待时间
       long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
       // 如果等待时间超出了等待队列的最大时间,则无法放入等待队列,直接拒绝
       if (waitTime > maxQueueingTimeMs) {
           return false;
      } else {
           long oldTime = latestPassedTime.addAndGet(costTime);
           try {
               // 重新计算等待时间
               waitTime = oldTime - TimeUtil.currentTimeMillis();
               // 判断等待时间是否超过等待队列的最大时间,如果超过了,拒绝,并且将latestPassedTime最后一次请求时间重新设置为原值
               if (waitTime > maxQueueingTimeMs) {
                   latestPassedTime.addAndGet(-costTime);
                   return false;
              }
               // in race condition waitTime may <= 0
               // 线程等待
               if (waitTime > 0) {
                   Thread.sleep(waitTime);
              }
               return true;
          } catch (InterruptedException e) {
          }
      }
  }
   return false;
}


从代码可以看出,匀速排队策略是使用了虚拟队列的方法,通过控制阈值来计算出请求的时间间隔,然后将上一次请求的时间加上时间间隔,表示下一次请求的时间,如果当前时间比这个值大,说明已经超出时间间隔了,当然可以请求,反之,表示需要等待,那么等待的时长就应该是要等到当前时间达到预期时间才能请求,这里就有个虚拟的等待队列,而等待其实是通过线程的等待来实现的。而这里所说的虚拟队列实际上是由一系列的处于sleep状态的线程组成的,但是实际的数据结构上并没有构成队列。

接:

Spring Cloud + Alibaba Sentinel 源码原理深度剖析!(下)

attachments-2021-01-63Dw1CSi5ffe6c607bdd6.jpeg


  • 发表于 2021-01-13 11:36
  • 阅读 ( 28 )

0 条评论

请先 登录 后评论
奈学教育
奈学教育

官方

150 篇文章

作家榜 »

  1. NX小编 1251 文章
  2. 58沈剑 322 文章
  3. 热爱技术的小仓鼠 169 文章
  4. 奈学教育 150 文章
  5. 李希沅 | 奈学教育 51 文章
  6. 江帅帅 | 奈学教育 32 文章
  7. 林淮川 | 奈学教育 12 文章
  8. 邱鹏超 3 文章