作者:vivo 互联网服务器团队-Wang Zhi
责任链模式作为常用的设计模式而被大家熟知和使用。本文介绍责任链的常见实现方式,并结合开源框架如Dubbo、Sentinel等进行延伸探讨。
一、责任链介绍
在GoF 的《设计模式》一书中对责任链模定义的:将请求的发送和接收解耦,让多个接收对象都有机会处理这个请求。将这些接收对象串成一条链,并沿着这条链传递这个请求,直到链上的某个接收对象能够处理它为止或者所有接收对象处理一遍。
用通俗的话解释在责任链模式中,多个处理器(接收对象)依次处理同一个请求。一个请求先经过 A 处理器处理,然后再把请求传递给 B 处理器,B 处理器处理完后再传递给 C 处理器,以此类推,形成一个链条。链条上的每个处理器各自承担各自的处理职责,所以叫作责任链模式。
责任链模式有效地降低了发送和接收者之间的耦合度,增强了系统的可扩展性。在责任链的模式下不仅能够针对单个处理器对象进行定制升级(每个处理器对象关注各自的任务),而且能够对整个责任链的处理器对象的顺序的调整以及增删。
本文约定:责任链上的接收对象统一称为处理器;本文中介绍的责任链属于GOF定义中责任链的变种即责任链上的所有处理器都会参与任务的处理。
二、责任链实现
责任链模式有多种实现方式,从驱动责任链上处理器方式的角度可以分类两类,即责任链驱动和责任链处理器自驱动。
2.1 处理器自驱动
// 1、定义抽象类
public abstract class AbstractHandler {
protected Handler next = null;
// 绑定处理器
public void setSuccessor(Handler next) {
this.next = next;
}
// 处理器执行操作并驱动下一个处理器
服务器托管网 public abstract void handle();
}
// 2、定义处理器A
public class HandlerA extends AbstractHandler {
@Override
public void handle() {
// do something
if (next != null) {
next.handle();
}
}
}
// 3、定义处理器B
public class HandlerB extends AbstractHandler {
@Override
public void handle() {
// do something
if (next != null) {
next.handle();
}
}
}
// 4、构建责任链并添加处理器
public class HandlerChain {
// 通过链表的形式保存责任链
private AbstractHandler head = null;
private AbstractHandler tail = null;
public void addHandler(AbstractHandler handler) {
handler.setSuccessor(null);
if (head == null) {
head = handler;
tail = handler;
return;
}
tail.setSuccessor(handler);
tail = handler;
}
public void handle() {
if (head != null) {
head.handle();
}
}
}
// 5、整体构建责任链添加处理器并进行驱动
public class Application {
public static void main(String[] args) {
// 构建责任链并添加处理器
HandlerChain chain = new HandlerChain();
chain.addHandler(new HandlerA());
chain.addHandler(new HandlerB());
// 责任链负责触发
chain.handle();
}
}
说明:
-
责任链上的每个处理器对象维护下一个处理器对象,整个责任链的驱动由每个处理器对象自行驱动。
-
每个处理器对象Handler中包含下一个处理器对象next的变量,通过链表形式维护责任链的关系。
2.2 责任链驱动
// 1、定义抽象接口
public interface IHandler {
void doSomething();
}
// 2、定义处理器A
public class HandlerA implements IHandler {
@Override
public void doSomething() {
// do something
}
}
// 3、定义处理器B
public class HandlerB implements IHandler {
@Override
public void doSomething() {
// do something
}
}
// 4、构建责任链并添加处理器
public class HandlerChain {
// 通过数组的形式保存处理器
private List handlers = new ArrayList();
public void addHandler(IHandler handler) {
handlers.add(handler);
}
// 由责任链负责遍历所有的处理器并进行调用
public void handle() {
for (IHandler handler : handlers) {
handler.handle();
}
}
}
// 5、整体构建责任链添加处理器并进行驱动
public class Application {
public static void main(String[] args) {
HandlerChain chain = new HandlerChain();
chain.addHandler(new HandlerA());
chain.addHandler(new HandlerB());
chain.handle();
}
}
说明:
-
责任链对象本身以数组的形式维护处理器对象,即上述代码中的handlers 。
-
责任链的处理器的执行由责任链对象循环调用处理器对象驱动,即上述代码中的handle方法。
三、开源框架中责任链应用
责任链低耦合高扩展的特点让它在很多开源的框架中被采用,本文选取了开源框架中的Spring Interceptor、Servlet Filter、Dubbo、Sentinel进行责任链的实现介绍,通过对常用框架中责任链应用的了解能够更好掌握责任链落地并在日常的开发中积极的使用。
3.1 Spring Interceptor
3.1.1 Interceptor介绍
-
Spring中的拦截器(Interceptor) 用于拦截控制器方法的执行,可以在方法执行前后添加自定义逻辑类似于AOP编程思想。
-
Inteceptor的作用时机是在请求(request)进入servlet后,在进入Controller之前进行预处理。
-
Inteceptor的实际应用包括:认证授权、日志记录、字符编码转换,敏感词过滤等等。
-
Inteceptor中责任链的实现会从处理器的介绍,责任链的构建以及责任链的执行三个角度进行阐述。
3.1.2 处理器介绍
public interface HandlerInterceptor {
boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception;
void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception;
void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception;
}
@Component
public class TimeInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 前置处理
System.out.println("time interceptor preHandle");
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
// 后置处理
System.out.println("time interceptor postHandle");
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
System.out.println("time interceptor afterCompletion");
}
}
说明:
-
处理器Interceptor的接口HandlerInterceptor定义了三个方法,可在控制器方法执行前后添加自定义逻辑。
-
自定义处理器如上的TimeInterceptor需要自定义实现上述3个方法实现自我的逻辑。
-
所有的自定义处理会串联在HandlerExecutionChain类实现的责任链上。
3.1.3 责任链构建
public class HandlerExecutionChain {
private final Object handler;
private HandlerInterceptor[] interceptors;
private List interceptorList;
private int interceptorIndex = -1;
public void addInterceptor(HandlerInterceptor interceptor) {
// 添加拦截器
initInterceptorList().add(interceptor);
}
public void addInterceptors(HandlerInterceptor... interceptors) {
if (!ObjectUtils.isEmpty(interceptors)) {
CollectionUtils.mergeArrayIntoCollection(interceptors, initInterceptorList());
}
}
private List initInterceptorList() {
if (this.interceptorList == null) {
this.interceptorList = new ArrayList();
if (this.interceptors != null) {
// An interceptor array specified through the constructor
CollectionUtils.mergeArrayIntoCollection(this.interceptors, this.interceptorList);
}
}
this.interceptors = null;
return this.interceptorList;
}
}
说明:
-
HandlerExecutionChain类作为串联Interceptor处理器的责任链负责责任链的构建和执行。
-
HandlerExecutionChain类通过集合对象interceptorList保存所有相关的处理器对象。
3.1.4 责任链执行
public class DispatcherServlet extends FrameworkServlet {
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
try {
try {
// mappedHandler代表的是HandlerExecutionChain责任链 mappedHandler = getHandler(processedRequest);
HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());
// 1、执行mappedHandler的applyPreHandle方法
if (!mappedHandler.applyPreHandle(processedRequest, response)) {
return;
}
// 2、执行controller的执行逻辑
mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
if (asyncManager.isConcurrentHandlingStarted()) {
return;
}
applyDefaultViewName(processedRequest, mv);
// 执行mappedHandler的applyPostHandle方法
mappedHandler.applyPostHandle(processedRequest, response, mv);
}
catch (Exception ex) {
}
processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException);
}
catch (Exception ex) {
}
finally {
}
}
}
public class HandlerExecutionChain {
private final Object handler;
private HandlerInterceptor[] interceptors;
private List interceptorList;
private int interceptorIndex = -1;
boolean applyPreHandle(HttpServletRequest request, HttpServletResponse response) throws Exception {
HandlerInterceptor[] interceptors = getInterceptors();
if (!ObjectUtils.isEmpty(interceptors)) {
// 责任链从前往后的顺序执行
for (int i = 0; i = 0; i--) {
HandlerInterceptor interceptor = interceptors[i];
interceptor.postHandle(request, response, this.handler, mv);
}
}
}
}
说明:
-
在servlet的doDispatch方法中依次触发责任链的applyPreHandle的前置处理方法、applyPostHandle的后置处理方法。
-
前置处理方法applyPreHandle会遍历责任链上的处理器从前往后依次处理,后置处理方法applyPostHandle会遍历责任链上的处理器从后往前依次处理。
-
处理器的驱动由责任链对象负责依次触发,非处理器对象自驱执行。
3.2 Servlet Filter
3.2.1 Filter介绍
-
Servlet过滤器是在Java Servlet规范2.3中定义的,它能够对Servlet容器的请求和响应对象进行检查和修改,是个典型的责任链。
-
在Servlet被调用之前检查Request对象并支持修改Request Header和Request内容。
-
在Servlet被调用之后检查Response对象并支修改Response Header和Response内容。
3.2.2 处理器介绍
public interface Filter {
public void init(FilterConfig filterConfig) throws ServletException;
public void doFilter ( ServletRequest request, ServletResponse response, FilterChain chain ) throws IOException, ServletException;
public void destroy();
}
public class TimeFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
System.out.println("time filter init");
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
// 1、执行处理的逻辑
System.out.println("time filter doFilter");
// 2、执行责任链当中的下一个 Filter 对象,等价于执行 FilterChain 的internalDoFilter方法
filterChain.doFilter(servletRequest, servletResponse);
}
}
说明:
-
Servlet过滤器类要实现javax.servlet.Filter接口,该接口定义了通用的3个方法。
-
init方法:负责Servlet过滤器的初始化方法,Servlet容器创建Servlet过滤器实例过程中调用这个方法。
-
doFilter方法:当客户请求访问与过滤器关联的URL时,Servlet容器会调用该方法。
-
destroy方法:Servlet容器在销毁过滤器实例前调用该方法,可以释放过滤器占用的资源。
3.2.3 责任链构建
public final class ApplicationFilterChain implements FilterChain {
// 责任链上 Filter 的维护对象
private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];
//责任链上待执行的 Filter 对象
private int pos = 0;
// 责任链上拥有的 Filter 数量
private int n = 0;
void addFilter(ApplicationFilterConfig filterConfig) {
// 避免重复添加Filter
for(ApplicationFilterConfig filter:filters)
if(filter==filterConfig)
return;
// 按需进行扩容
if (n == filters.length) {
ApplicationFilterConfig[] newFilters =
new ApplicationFilterConfig[n + INCREMENT];
System.arraycopy(filters, 0, newFilters, 0, n);
filters = newFilters;
}
// 保存Filter 对象
filters[n++] = filterConfig;
}
}
说明:
-
ApplicationFilterChain作为Filter的责任链,负责责任链的构建和执行。
-
责任链通过ApplicationFilterConfig类型的数组对象filters保存Filter处理器。
-
责任链上处理器的添加通过保存到数组filters来实现。
3.2.4 责任链执行
public final class ApplicationFilterChain implements FilterChain {
// 责任链上 Filter 的维护对象
private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];
//责任链上待执行的 Filter 对象
private int pos = 0;
// 责任链上拥有的 Filter 数量
private int n = 0;
// 责任链的执行
private void internalDoFilter(ServletRequest request,
ServletResponse response)
throws IOException, ServletException {
// 在责任链未执行完的情况下执行责任链 if (pos
说明:
-
整个责任链上Filter处理器的执行通过处理器自驱进行实现,而非由责任链对象驱动。
-
Filter处理器的在处理过程中除了执行自我逻辑,会通过 filterChain.doFilter(servletRequest, servletResponse) 触发下一个处理器的执行。
3.3 Dubbo
3.3.1 Dubbo Filter介绍
图片分享自《DUBBO官网》
-
Dubbo的Filter作用时机如上图所示,Filter实现是专门为服务提供方和服务消费方调用过程进行拦截,Dubbo本身的大多功能均基于此扩展点实现,每次远程方法执行该拦截都会被执行。
-
Dubbo官方针对Filter做了很多的原生支持,目前大致有20来个吧,包括我们熟知的RpcContext,accesslog功能都是通过filter来实现了。
-
在实际业务开发中会对Filter接口进行扩展,在服务调用链路中嵌入我们自身的处理逻辑,如日志打印、调用耗时统计等。
3.3.2 处理器介绍
@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)
public class AccessLogFilter implements Filter {
@Override
public Result invoke(Invoker> invoker, Invocation inv) throws RpcException {
try {
if (ConfigUtils.isNotEmpty(accessLogKey)) {
AccessLogData logData = buildAccessLogData(invoker, inv);
log(accessLogKey, logData);
}
} catch (Throwable t) {
}
// 执行下一个invoker
return invoker.invoke(inv);
}
}
说明:
-
Dubbo中的自定义Filter需要实现org.apache.dubbo.rpc.Filter类,内部通过实现invoke方法来实现自定义逻辑。
-
自定义Filter内部除了实现必要的自定义逻辑外,核心的需要通过invoker.invoke(inv)触发下一个过滤器的执行。
3.3.3 责任链构建
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
this.protocol = protocol;
}
private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
// 最后的 Invoker 对象
Invoker last = invoker;
// 遍历所有 Filter 对象,构建责任链 List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
// 每个 Filter 封装成一个 Invoker 对象,通过 filter.invoke进行串联
final Filter filter = filters.get(i);
final Invoker next = last;
last = new Invoker() {
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
}
// 封装了Filter的invoker对象
static final class ProtocolFilterWrapper.1 implements Invoker {
final Invoker val$invoker;
final Filter val$filter;
// 指向下一个Invoker的变量
final Invoker val$next;
public Result invoke(Invocation invocation) throws RpcException {
return this.val$filter.invoke(this.val$next, invocation);
}
ProtocolFilterWrapper.1(Invoker invoker, Filter filter, Invoker invoker2) {
this.val$invoker = invoker;
this.val$filter = filter;
this.val$next = invoker2;
}
}
说明:
-
ProtocolFilterWrapper 通过 buildInvokerChain 构建 Dubbo Filter 的责任链。
-
责任链上的处理器对象是将 Filter 封装的 Invoker 对象,每个 Invoker 对象指向下一个处理器封装的Invoker对象。
3.3.4 责任链执行
public class FailfastClusterInvoker extends AbstractClusterInvoker {
public FailfastClusterInvoker(Directory directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker invoker = select(loadbalance, invocation, invokers, null);
try {
// 执行封装了Filter的invoker对象,驱动处理器的执行
return invoker.invoke(invocation);
} catch (Throwable e) {
}
}
}
static final class ProtocolFilterWrapper.1 implements Invoker {
final Invoker val$invo服务器托管网ker;
final Filter val$filter;
final Invoker val$next;
public Result invoke(Invocation invocation) throws RpcException {
return this.val$filter.invoke(this.val$next, invocation);
}
ProtocolFilterWrapper.1(Invoker invoker, Filter filter, Invoker invoker2) {
this.val$invoker = invoker;
this.val$filter = filter;
this.val$next = invoker2;
}
说明:
-
每个Invoker对象invoke方法会执行自定义逻辑,并触发下一个处理器的执行。
-
整个责任链上处理器的执行通过Invoker对象的驱动,而非责任链对象的驱动。
3.4 Sentinel
3.4.1 Sentinel Slot介绍
图片分享自《Sentinel官网》
-
Sentinel是面向分布式服务架构的流量治理组件,以流量为切入点提供熔断限流的功能保证系统的稳定性。
-
Sentinel 里面以Entry作为限流的资源对象,每个Entry创建的同时会关联一系列功能插槽(slot chain)。
-
Sentinel提供了通用的原生Slot处理不同的逻辑,同时支持自定义Slot来定制功能。
3.4.2 处理器介绍
public interface ProcessorSlot {
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,Object... args) throws Throwable;
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,Object... args) throws Throwable;
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
public abstract class AbstractLinkedProcessorSlot implements ProcessorSlot {
private AbstractLinkedProcessorSlot> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 触发下一个处理器对象的处理
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
// 执行具体处理器的逻辑,由具体的处理器自行实现
entry(context, resourceWrapper, t, count, prioritized, args);
}
public void setNext(AbstractLinkedProcessorSlot> next) {
// 绑定下一个处理器的逻辑
this.next = next;
}
}
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot
说明:
-
Sentinel 中的 Slot 需要实现 com.alibaba.csp.sentinel.slotchain.ProcessorSlot 的通用接口。
-
自定义 Slot 一般继承抽象类 AbstractLinkedProcessorSlot 且只要改写 entry/exit 方法实现自定义逻辑。
-
Slot 通过 next 变量保存下一个处理器Slot对象。
-
在自定义实现的 entry 方法中需要通过 fireEntry 触发下一个处理器的执行,在 exit 方法中通过fireExit 触发下一个处理器的执行。
3.4.3 责任链构建
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
// 责任链的头部对象ProcessorSlotChain
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// sortedSlotList获取所有的处理器对象
List sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
continue;
}
// 通过尾添法将职责slot添加到DefaultProcessorSlotChain当中
chain.addLast((AbstractLinkedProcessorSlot>) slot);
}
return chain;
}
}
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
// 创建DefaultProcessorSlotChain的头尾节点first和end
AbstractLinkedProcessorSlot> first = new AbstractLinkedProcessorSlot
说明:
-
ProcessorSlotChain作为Slot的责任链,负责责任链的构建和执行。
-
责任链上的处理器对象 AbstractLinkedProcessorSlot 通过保存指向下一个处理器的对象的进行关联,整体以链表的形式进行串联。
-
责任链上的第一个处理器对象first本身不起任何作用,只是保存链表的头部。
3.4.4 责任链执行
public class CtSph implements Sph {
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
// 省略相关代码
ProcessorSlot
说明:
-
整个责任链上处理器的执行通过Invoker对象的驱动,而非责任链对象的驱动。
-
DefaultProcessorSlotChain的entry首先头部对象first,进而触发处理器的自驱实现处理器的执行。
-
整体按照entry → fireEntry → transformEntry → entry 的循环顺序依次触发处理器的自驱。
四、实践总结
在日常项目实践中,责任链的设计模式会在很多业务场景中落地。
譬如对于支持用户生成内容(UGC)的应用来说,用户生成的内容可能包含一些敏感内容如敏感言论或者图片等。针对这种应用场景,可以通过责任链模式设置多个处理器来处理不同的任务,如文本过滤器处理敏感词,图片过滤器处理敏感图片等等。
譬如对于电商服务中的下单流程来说,一个下单流程包含订单拆合单,优惠计算,订单生成等多个步骤,我们可以通过责任链模式设置多个处理器来处理不同的任务等等。
责任链的应用场景非常广泛,在常见的开源框架中有丰富的落地场景,同样在业务开发中也可以根据场景灵活使用。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net