- 前言 -
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
- Sentinel 介绍 -
以下是经过多年分布式经验总结的两个理论基础:
(1)微服务与治理的关系
(2)爬坡理论
我们今天的主题分为以下两个主要部分:
Sentinel设计原理
Sentinel运行流程源码剖析
- Sentinel 设计原理 -
丰富的应用场景:阿里 10 年双十一积累场景,含秒杀、双十一零点持续洪峰、热点商品探测、预热、消息队列削峰填谷、集群流量控制、实时熔断下游不可用应用等多样化的场景。
广泛的开源生态:提供开箱即用的与其它开源框架/库的整合模块,如Dubbo、Spring Cloud、gRPC、Zuul、Reactor 等。
完善的 SPI 扩展点:提供简单易用、完善的 SPI 扩展接口;可通过实现扩展接口来快速地定制逻辑。
完备的实时监控:提供实时的监控功能,可看到接入应用的单台机器秒级数据,及500 台以下规模的集群汇总运行情况。
(1)资源:限流的对象
如下代码/user/select即为一个资源:
@GetMapping("/user/select")
@SentinelResource(value = "select", blockHandler = "exceptionHandler")
public TUser select(@RequestParam Integer userId) {
log.info("post /user/select userid=" + userId);
return userService.select(userId);
}
即被SentinelResource注解修饰的API:
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {
String value() default "";
EntryType entryType() default EntryType.OUT;
int resourceType() default 0;
String blockHandler() default "";
Class<?>[] blockHandlerClass() default {};
String fallback() default "";
......
}
(2)入口:sentinel为每个资源创建一个Entry。
(3)槽链:每个Entry都会有一条用于记录限流以及各种控制的信息Slot chain,以此来实现下图中绿色部分的功能。
- Sentinel 运行流程源码剖析 -
1、入口处
SphU.entry("methodA", EntryType.IN);//入口
核心代码 SphU#lookProcessChain(ResourceWrapper resourceWrapper)
2、入口逻辑
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 从threadLocal中获取当前线程对应的context实例。
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
// 如果context是nullContext的实例,表示当前context的总数已经达到阈值,所以这里直接创建entry实例,并返回,不进行规则的检查。
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// Using default context.
//如果context为空,则使用默认的名字创建一个,就是外部在调用SphU.entry(..)方法前如果没有调用ContextUtil.enter(..),则这里会调用该方法进行内部初始化context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
// 总开关
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 构造链路(核心实现) go in
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
* 当链的大小达到阈值Constants.MAX_SLOT_CHAIN_SIZE时,不会校验任何规则,直接返回。
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 开始进行链路调用。
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
3、上下文信息
Context
Context是当前线程所持有的Sentinel上下文。
进入Sentinel的逻辑时,会首先获取当前线程的Context,如果没有则新建。当任务执行完毕后,会清除当前线程的context。Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。
Context 维持着入口节点(entranceNode)、本次调用链路的 当前节点(curNode)、调用来源(origin)等信息。Context 名称即为调用链路入口名称。
Node
Node是对一个@SentinelResource标记的资源的统计包装。
Context中记录本当前线程资源调用的入口节点。
我们可以通过入口节点的childList,可以追溯资源的调用情况。而每个节点都对应一个@SentinelResource标记的资源及其统计数据,例如:passQps,blockQps,rt等数据。
Entry
Entry是Sentinel中用来表示是否通过限流的一个凭证,如果能正常返回,则说明你可以访问被Sentinel保护的后方服务,否则Sentinel会抛出一个BlockException。
另外,它保存了本次执行entry()方法的一些基本信息,包括资源的Context、Node、对应的责任链等信息,后续完成资源调用后,还需要更具获得的这个Entry去执行一些善后操作,包括退出Entry对应的责任链,完成节点的一些统计信息更新,清除当前线程的Context信息等。
在构建Context时已经完成下图部分:
4、核心流程
这里有两个需要注意的点:
ProcessorSlot chain = lookProcessChain(resourceWrapper); 构建链路。
chain.entry(context, resourceWrapper, null, count, prioritized, args); 进行链路调用首先来看链路是如何构建的。
5、获取槽链
已有直接获取;
没有去创建。
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//在上下文中每一个资源都有各自的处理槽
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
// 双重检查锁保证线程安全
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
// 当链的长度达到阈值时,直接返回null,不进行规则的检查。
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 构建链路 go in
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
6、创建槽链
SlotChainProvider.newSlotChain();
// 基于spi扩展点机制来扩展,默认为DefaultSlotChainBuilder
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
7、SPI加载ProcessorSlot
这里采用了spi的机制来扩展SlotChainBuilder,默认是采用DefaultSlotChainBuilder来实现的,可以看到sentinel源码的sentinel-core包下,META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件下,默认属性是:
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
所以默认采用DefaultSlotChainBuilder来构建链路,因此找到DefaultSlotChainBuilder.build()方法。
8、DefaultSlotChainBuilder
public ProcessorSlotChain build() {
// 定义链路起点
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// Note: the instances of ProcessorSlot should be different, since they are not stateless.
// 基于spi扩展机制,加载ProcessorSlot的实现类,从META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件下获取,并且按指定顺序排序
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
// 遍历构建链路
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
// 将slot节点加入链,因为已经排好序了,只需要加到最后即可
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
9、遍历ProcessorSlots
这里也是通过spi的机制,读取文件META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot:
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
从这里看出,链路由这些节点组成,而slot之间的顺序是根据每个slot节点的@SpiOrder注解的值来确定的。
NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot -> StatisticSlot -> AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot
- 链路调用 -
chain.entry(…)
上面已经构建好了链路,下面就要开始进行链路的调用了。
回到CtSph#entryWithPriority
NodeSelectorSlot(@SpiOrder(-10000))
直接进入NodeSelectorSlot类的entry方法。
根据官方文档,NodeSelectorSlot类的作用为:
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 双重检查锁+缓存 机制
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
// 构建调用链的树形结构
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
// 进入下一个链
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ClusterBuilderSlot(@SpiOrder(-9000))
根据官方文档,ClusterBuilderSlot的作用为:
此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持某个资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由 ContextUtil.enter(contextName,origin) 中的 origin 标记。
LogSlot(@SpiOrder(-8000))
该类对链路的传递不做处理,只有在抛出BlockException的时候,向上层层传递的过程中,会通过该类来输入一些日志信息:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
// 当抛出BlockException异常时,这里会输入日志信息
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
Spring Cloud + Alibaba Sentinel 源码原理深度剖析!(中)
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!