1 模板方法
public abstract class AbstractInvoker implements Invoker { @Override public Result invoke(Invocation inv) throws RpcException { RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { invocation.addAttachmentsIfAbsent(attachment); } Map contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { invocation.addAttachments(contextAttachments); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { return doInvoke(invocation); } catch (InvocationTargetException e) { Throwable te = e.getTargetException(); if (te == null) { return new RpcResult(e); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return new RpcResult(te); } } catch (RpcException e) { if (e.isBiz()) { return new RpcResult(e); } else { throw e; } } catch (Throwable e) { return new RpcResult(e); } } protected abstract Result doInvoke(Invocation invocation) throws Throwable;}AbstractInvoker作为抽象父类界说了invoke方法这个方法骨架,并且界说了doInvoke抽象方法供子类扩展,例如子类InjvmInvoker、DubboInvoker各自实现了doInvoke方法。
class InjvmInvoker extends AbstractInvoker { @Override public Result doInvoke(Invocation invocation) throws Throwable { Exporter exporter = InjvmProtocol.getExporter(exporterMap, getUrl()); if (exporter == null) { throw new RpcException("Service [" + key + "] not found."); } RpcContext.getContext().setRemoteAddress(Constants.LOCALHOST_VALUE, 0); return exporter.getInvoker().invoke(invocation); }}DubboInvoker相对复杂一些,需要考虑同步异步调用方式,配置优先级,长途通信等等。
public class DubboInvoker extends AbstractInvoker { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients; } else { currentClient = clients; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 超时时间方法级别配置优先级最高 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); FutureAdapter futureAdapter = new FutureAdapter(future); RpcContext.getContext().setFuture(futureAdapter); Result result; if (isAsyncFuture) { result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }}
2 动态代理
2.1 JDK动态代理
public interface StudentJDKService { public void addStudent(String name); public void updateStudent(String name);}public class StudentJDKServiceImpl implements StudentJDKService { @Override public void addStudent(String name) { System.out.println("add student=" + name); } @Override public void updateStudent(String name) { System.out.println("update student=" + name); }}第二步界说一个事务代理对象:
public class TransactionInvocationHandler implements InvocationHandler { private Object target; public TransactionInvocationHandler(Object target) { this.target = target; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { System.out.println("------前置通知------"); Object rs = method.invoke(target, args); System.out.println("------后置通知------"); return rs; }}第三步界说代理工厂:
public class ProxyFactory { public Object getProxy(Object target, InvocationHandler handler) { ClassLoader loader = this.getClass().getClassLoader(); Class[] interfaces = target.getClass().getInterfaces(); Object proxy = Proxy.newProxyInstance(loader, interfaces, handler); return proxy; }}第四步进行测试:
public class ZTest { public static void main(String[] args) throws Exception { testSimple(); } public static void testSimple() { StudentJDKService target = new StudentJDKServiceImpl(); TransactionInvocationHandler handler = new TransactionInvocationHandler(target); ProxyFactory proxyFactory = new ProxyFactory(); Object proxy = proxyFactory.getProxy(target, handler); StudentJDKService studentService = (StudentJDKService) proxy; studentService.addStudent("JAVA火线"); }}ProxyGenerator.generateProxyClass是生成字节码文件核心方法,我们看一看动态字节码到底如何界说:
public class ZTest { public static void main(String[] args) throws Exception { createProxyClassFile(); } public static void createProxyClassFile() { String name = "Student$Proxy"; byte[] data = ProxyGenerator.generateProxyClass(name, new Class[] { StudentJDKService.class }); FileOutputStream out = null; try { String fileName = "c:/test/" + name + ".class"; File file = new File(fileName); out = new FileOutputStream(file); out.write(data); } catch (Exception e) { System.out.println(e.getMessage()); } finally { if (null != out) { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } }}最终生成字节码文件如下,我们看到代理对象被织入了目的对象:
import com.xpz.dubbo.simple.jdk.StudentJDKService;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.lang.reflect.UndeclaredThrowableException;public final class Student$Proxy extends Proxy implements StudentJDKService { private static Method m1; private static Method m2; private static Method m4; private static Method m3; private static Method m0; public Student$Proxy(InvocationHandler paramInvocationHandler) { super(paramInvocationHandler); } public final boolean equals(Object paramObject) { try { return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue(); } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } public final String toString() { try { return (String)this.h.invoke(this, m2, null); } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } public final void updateStudent(String paramString) { try { this.h.invoke(this, m4, new Object[] { paramString }); return; } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } public final void addStudent(String paramString) { try { this.h.invoke(this, m3, new Object[] { paramString }); return; } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } public final int hashCode() { try { return ((Integer)this.h.invoke(this, m0, null)).intValue(); } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } static { try { m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") }); m2 = Class.forName("java.lang.Object").getMethod("toString", new Class); m4 = 
Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("updateStudent", new Class[] { Class.forName("java.lang.String") }); m3 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("addStudent", new Class[] { Class.forName("java.lang.String") }); m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class); return; } catch (NoSuchMethodException noSuchMethodException) { throw new NoSuchMethodError(noSuchMethodException.getMessage()); } catch (ClassNotFoundException classNotFoundException) { throw new NoClassDefFoundError(classNotFoundException.getMessage()); } }}
2.2 DUBBO源码应用
/** Proxy factory that builds the service proxy with {@code Proxy.newProxyInstance}. */
public class JdkProxyFactory extends AbstractProxyFactory {

    /**
     * Creates a proxy implementing {@code interfaces} whose calls are routed to {@code invoker}
     * through an {@code InvokerInvocationHandler}.
     */
    @Override
    @SuppressWarnings("unchecked") // the proxy implements the requested interfaces
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }
}

/**
 * Proxy factory that builds the service proxy through {@code Proxy.getProxy(...)}.
 * NOTE(review): {@code java.lang.reflect.Proxy} has no {@code getProxy} method, so
 * {@code Proxy} here is presumably Dubbo's own bytecode proxy class — confirm the import.
 */
public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * Creates a proxy implementing {@code interfaces} whose calls are routed to {@code invoker}
     * through an {@code InvokerInvocationHandler}.
     */
    @Override
    @SuppressWarnings("unchecked") // the proxy implements the requested interfaces
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}
InvokerInvocationHandler将参数信息封装至RpcInvocation进行通报:
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker invoker; public InvokerInvocationHandler(Invoker handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args); } // RpcInvocation , arguments=, attachments={}] RpcInvocation rpcInvocation = createInvocation(method, args); return invoker.invoke(rpcInvocation).recreate(); } private RpcInvocation createInvocation(Method method, Object[] args) { RpcInvocation invocation = new RpcInvocation(method, args); if (RpcUtils.hasFutureReturnType(method)) { invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } return invocation; }}
3 策略模式
Define a family of algorithms, encapsulate each one, and make them interchangeable. Strategy lets the algorithm vary independently from clients that use it
(1) 当需求变化时应该通过扩展而不是通过修改已有代码来实现变化,这样就保证代码的稳定性,避免牵一发而动全身
(2) 扩展也不是随意扩展,因为事先定义了算法,扩展也是根据算法扩展,体现了用抽象构建框架,用实现扩展细节
(3) 标准意义的二十三种设计模式说到底最终都是在遵循开闭原则
3.1 策略模式实例
public enum DocTypeEnum { HTML(1, "HTML"), TEXT(2, "TEXT"); private int value; private String description; private DocTypeEnum(int value, String description) { this.value = value; this.description = description; } public int value() { return value; } }public class ParserManager { public void parse(Integer docType, String content) { // 文本范例是HTML if(docType == DocTypeEnum.HTML.getValue()) { // 解析逻辑 } // 文本范例是TEXT else if(docType == DocTypeEnum.TEXT.getValue()) { // 解析逻辑 } }}这种写法功能上没有题目,但是当本文范例越来越多时,那么parse方法将会越来越冗余和复杂,if else代码块也会越来越多,所以我们要使用策略模式。
public enum DocTypeEnum { HTML(1, "HTML"), TEXT(2, "TEXT"); private int value; private String description; private DocTypeEnum(int value, String description) { this.value = value; this.description = description; } public int value() { return value; }}public class BaseModel { // 公共字段}public class HtmlContentModel extends BaseModel { // HTML自界说字段}public class TextContentModel extends BaseModel { // TEXT自界说字段}第二步界说策略:
public interface Strategy { public T parse(String sourceContent);}@Servicepublic class HtmlStrategy implements Strategy { @Override public HtmlContentModel parse(String sourceContent) { return new HtmlContentModel("html"); }}@Servicepublic class TextStrategy implements Strategy { @Override public TextContentModel parse(String sourceContent) { return new TextContentModel("text"); }}第三步界说策略工厂:
/** Registry that maps a document type code to the strategy handling it. */
@Service
public class StrategyFactory implements InitializingBean {

    private Map<Integer, Strategy> strategyMap = new HashMap<>();

    @Resource
    private Strategy htmlStrategy;

    @Resource
    private Strategy textStrategy;

    /** Registers the injected strategies under their {@code DocTypeEnum} codes. */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Keys come from DocTypeEnum, the enum whose codes callers pass to getStrategy.
        strategyMap.put(DocTypeEnum.HTML.value(), htmlStrategy);
        strategyMap.put(DocTypeEnum.TEXT.value(), textStrategy);
    }

    /**
     * @param type document type code
     * @return the registered strategy, or {@code null} when none is registered for {@code type}
     */
    public Strategy getStrategy(int type) {
        return strategyMap.get(type);
    }
}
第四步界说策略执行器:
/** Looks up the strategy for a document type and delegates parsing to it. */
@Service
public class StrategyExecutor {

    @Resource
    private StrategyFactory strategyFactory;

    /**
     * Parses {@code sourceContent} with the strategy registered for {@code type}.
     *
     * @param sourceContent text to parse
     * @param type document type code
     * @return the model produced by the selected strategy
     * @throws IllegalArgumentException if {@code type} is null or has no registered strategy
     */
    @SuppressWarnings("unchecked") // the caller chooses T to match the requested type code
    public <T> T parse(String sourceContent, Integer type) {
        if (type == null) {
            throw new IllegalArgumentException("type must not be null");
        }
        // getStrategy is an instance method, so call it on the injected factory.
        Strategy strategy = strategyFactory.getStrategy(type);
        if (strategy == null) {
            throw new IllegalArgumentException("No strategy registered for type " + type);
        }
        return (T) strategy.parse(sourceContent);
    }
}
第五步执行测试用例:
public class Test { @Resource private StrategyExecutorexecutor; @Test public void test() { // 解析HTML HtmlContentModel content1 = (HtmlContentModel) executor.parse("测试内容",DocTypeEnum.HTML.value()); System.out.println(content1); // 解析TEXT TextContentModel content2 = (TextContentModel)executor.calRecharge("测试内容",DocTypeEnum.TEXT.value()); System.out.println(content2); }}假如新增文本范例我们再扩展新策略即可。我们再回顾策略模式界说会有更深的体会:界说一系列算法,封装每一个算法,并使它们可以互换。策略模式可以使算法的变化独立于使用它们的客户端代码。
3.2 DUBBO源码应用
SPI(Service Provider Interface)是一种服务发现机制,本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件加载实现类,这样可以在运行时动态为接口替换实现类,我们通过SPI机制可以为程序提供拓展功能。
3.2.1 JDK SPI
(1) 新建DataBaseDriver工程并界说接口
public interface DataBaseDriver { String connect(String hostIp);}(2) 打包这个工程为JAR
<groupId>com.javafont.spi</groupId>
<artifactId>DataBaseDriver</artifactId>
<version>1.0.0-SNAPSHOT</version>
(3) 新建MySQLDriver工程引用上述依赖并实现DataBaseDriver接口
import com.javafont.database.driver.DataBaseDriver;public class MySQLDataBaseDriver implements DataBaseDriver { @Override public String connect(String hostIp) { return "MySQL DataBase Driver connect"; }}(4) 在MySQLDriver项目新建文件
src/main/resources/META-INF/services/com.javafont.database.driver.DataBaseDriver(5) 在上述文件新增如下内容
com.javafont.database.mysql.driver.MySQLDataBaseDriver(6) 按照上述雷同步骤创建工程OracleDriver
(7) 打包上述两个项目
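<groupId>com.javafont.spi</groupId>
<artifactId>MySQLDriver</artifactId>
<version>1.0.0-SNAPSHOT</version>

<groupId>com.javafont.spi</groupId>
<artifactId>OracleDriver</artifactId>
<version>1.0.0-SNAPSHOT</version>
(8) 新建测试项目引入上述MySQLDriver、OracleDriver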
public class DataBaseConnector { public static void main(String[] args) { ServiceLoader serviceLoader = ServiceLoader.load(DataBaseDriver.class); Iterator iterator = serviceLoader.iterator(); while (iterator.hasNext()) { DataBaseDriver driver = iterator.next(); System.out.println(driver.connect("localhost")); } }}// 输出效果// MySQL DataBase Driver connect// Oracle DataBase Driver connect我们并没有指定使用哪个驱动连接数据库,而是通过ServiceLoader方式加载所有实现了DataBaseDriver接口的实现类。假设我们只需要使用MySQL数据库驱动那么直接引入相应依赖即可。
我们发现JDK SPI机制还是有一些不完善之处:例如通过ServiceLoader会加载所有实现了某个接口的实现类,但是不能通过一个key去指定获取哪一个实现类,但是DUBBO自己实现的SPI机制解决了这个问题。
META-INF/services/
META-INF/dubbo/
META-INF/dubbo/internal/
配置方式和JDK SPI方式配置不一样,每个实现类都有key与之对应:
public class ReferenceConfig extends AbstractReferenceConfig { private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private T createProxy(Map map) { if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } }}getAdaptiveExtension()是加载自顺应扩展点,javassist会为自顺应扩展点生成动态代码:
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol { public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); org.apache.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys()"); org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); }}extension对象就是根据url中protocol属性即是injvm最终加载InjvmProtocol对象,动态获取到了我们订定的业务对象,所以我认为SPI体现了策略模式。
4 装饰器模式
(1) Component(抽象构件)
(2) ConcreteComponent(具体构件)
(3) Decorator(抽象装饰器)
(4) ConcreteDecorator(具体装饰器)
4.1 装饰器实例
(1) Component
/** * 抽象构件(可以用接口替换) */public abstract class Component { /** * 踢足球(业务核心方法) */ public abstract void playFootBall();}(2) ConcreteComponent
/** * 具体构件 */public class ConcreteComponent extends Component { @Override public void playFootBall() { System.out.println("球员踢球"); }}(3) Decorator
/** * 抽象装饰器 */public abstract class Decorator extends Component { private Component component = null; public Decorator(Component component) { this.component = component; } @Override public void playFootBall() { this.component.playFootBall(); }}(4) ConcreteDecorator
/** * 球袜装饰器 */public class ConcreteDecoratorA extends Decorator { public ConcreteDecoratorA(Component component) { super(component); } /** * 界说球袜装饰逻辑 */ private void decorateMethod() { System.out.println("换上球袜战力值增长"); } /** * 重写父类方法 */ @Override public void playFootBall() { this.decorateMethod(); super.playFootBall(); }}/** * 球鞋装饰器 */public class ConcreteDecoratorB extends Decorator { public ConcreteDecoratorB(Component component) { super(component); } /** * 界说球鞋装饰逻辑 */ private void decorateMethod() { System.out.println("换上球鞋战力值增长"); } /** * 重写父类方法 */ @Override public void playFootBall() { this.decorateMethod(); super.playFootBall(); }}(5) 运行测试
/** Wraps a concrete component in decorator A, then decorator B, and runs it. */
public class TestDecoratorDemo {

    public static void main(String[] args) {
        Component player = new ConcreteComponent();
        Component decoratedOnce = new ConcreteDecoratorA(player);
        Component decoratedTwice = new ConcreteDecoratorB(decoratedOnce);
        // The outermost decorator (B) runs first, then A, then the wrapped component.
        decoratedTwice.playFootBall();
    }
}
// Program output:
// 换上球鞋战力值增长
// 换上球袜战力值增长
// 球员踢球
4.2 DUBBO源码应用
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; }}public class ProtocolListenerWrapper implements Protocol { private final Protocol protocol; public ProtocolListenerWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; }}在配置文件中配置装饰器:
public class ReferenceConfig extends AbstractReferenceConfig { private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private T createProxy(Map map) { if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } }}最终生成refprotocol为如下对象:
5 责任链模式
5.1 应用场景:命中立即中断
(1) 实现方式一
public interface ContentFilter { public boolean filter(String content);}public class AaaContentFilter implements ContentFilter { private final static String KEY_CONTENT = "aaa"; @Override public boolean filter(String content) { boolean isValid = Boolean.FALSE; if (StringUtils.isEmpty(content)) { return isValid; } isValid = !content.contains(KEY_CONTENT); return isValid; }}public class BbbContentFilter implements ContentFilter { private final static String KEY_CONTENT = "bbb"; @Override public boolean filter(String content) { boolean isValid = Boolean.FALSE; if (StringUtils.isEmpty(content)) { return isValid; } isValid = !content.contains(KEY_CONTENT); return isValid; }}public class CccContentFilter implements ContentFilter { private final static String KEY_CONTENT = "ccc"; @Override public boolean filter(String content) { boolean isValid = Boolean.FALSE; if (StringUtils.isEmpty(content)) { return isValid; } isValid = !content.contains(KEY_CONTENT); return isValid; }}具体过滤器已经完成,我们下面构造过滤器责任链路:
/** Runs content through a list of filters and stops at the first one that rejects it. */
@Service
public class ContentFilterChain {

    // Typed list: a raw List would make the for-each over ContentFilter uncompilable.
    private List<ContentFilter> filters = new ArrayList<>();

    /** Registers the three built-in filters in order. */
    @PostConstruct
    public void init() {
        ContentFilter aaaContentFilter = new AaaContentFilter();
        ContentFilter bbbContentFilter = new BbbContentFilter();
        ContentFilter cccContentFilter = new CccContentFilter();
        filters.add(aaaContentFilter);
        filters.add(bbbContentFilter);
        filters.add(cccContentFilter);
    }

    /** Appends a filter to the end of the chain. */
    public void addFilter(ContentFilter filter) {
        filters.add(filter);
    }

    /**
     * @param content text to validate
     * @return {@code false} as soon as one filter rejects the content, {@code true} otherwise
     * @throws RuntimeException if no filter is registered
     */
    public boolean filter(String content) {
        if (CollectionUtils.isEmpty(filters)) {
            throw new RuntimeException("ContentFilterChain is empty");
        }
        for (ContentFilter filter : filters) {
            boolean isValid = filter.filter(content);
            if (!isValid) {
                System.out.println("校验不通过");
                return isValid;
            }
        }
        return Boolean.TRUE;
    }
}

/** Loads the Spring context and validates two sample strings. */
public class Test {

    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        ContentFilterChain chain = (ContentFilterChain) context.getBean("contentFilterChain");
        System.out.println(context);
        boolean result1 = chain.filter("ccc");
        boolean result2 = chain.filter("ddd");
        System.out.println("校验效果1=" + result1);
        System.out.println("校验效果2=" + result2);
    }
}
(2) 实现方式二
public abstract class FilterHandler { /** 下一个节点 **/ protected FilterHandler successor = null; public void setSuccessor(FilterHandler successor) { this.successor = successor; } public final boolean filter(String content) { /** 执行自身方法 **/ boolean isValid = doFilter(content); if (!isValid) { System.out.println("校验不通过"); return isValid; } /** 执行下一个节点链路 **/ if (successor != null && this != successor) { isValid = successor.filter(content); } return isValid; } /** 每个节点过滤方法 **/ protected abstract boolean doFilter(String content);}public class AaaContentFilterHandler extends FilterHandler { private final static String KEY_CONTENT = "aaa"; @Override protected boolean doFilter(String content) { boolean isValid = Boolean.FALSE; if (StringUtils.isEmpty(content)) { return isValid; } isValid = !content.contains(KEY_CONTENT); return isValid; }}// 省略其它过滤器代码具体过滤器已经完成,我们下面构造过滤器责任链路:
/** Singly linked chain of {@code FilterHandler}s; validation starts at the head. */
@Service
public class FilterHandlerChain {

    private FilterHandler head = null;
    private FilterHandler tail = null;

    /** Links the three built-in handlers in order. */
    @PostConstruct
    public void init() {
        FilterHandler aaaHandler = new AaaContentFilterHandler();
        FilterHandler bbbHandler = new BbbContentFilterHandler();
        FilterHandler cccHandler = new CccContentFilterHandler();
        addHandler(aaaHandler);
        addHandler(bbbHandler);
        addHandler(cccHandler);
    }

    /** Appends {@code handler} to the end of the chain. */
    public void addHandler(FilterHandler handler) {
        if (head == null) {
            head = tail = handler;
            // Stop here: falling through would make the first handler its own successor.
            return;
        }
        // Link the current tail to the new handler.
        tail.setSuccessor(handler);
        // The new handler becomes the tail.
        tail = handler;
    }

    /**
     * @param content text to validate
     * @return the result of running the chain from the head
     * @throws RuntimeException if the chain has no handler
     */
    public boolean filter(String content) {
        if (null == head) {
            throw new RuntimeException("FilterHandlerChain is empty");
        }
        // The head starts the call.
        return head.filter(content);
    }
}

/** Loads the Spring context and validates two sample strings. */
public class Test {

    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        FilterHandlerChain chain = (FilterHandlerChain) context.getBean("filterHandlerChain");
        System.out.println(context);
        boolean result1 = chain.filter("ccc");
        boolean result2 = chain.filter("ddd");
        System.out.println("校验效果1=" + result1);
        System.out.println("校验效果2=" + result2);
    }
}
5.2 应用场景:全链路执行
(1) 实现方式一
public interface QuestionGenerator { public Question generateQuestion(String gradeInfo);}public class AaaQuestionGenerator implements QuestionGenerator { @Override public Question generateQuestion(String gradeInfo) { if (!gradeInfo.equals("一年级")) { return null; } Question question = new Question(); question.setId("aaa"); question.setScore(10); return question; }}// 省略其它生成器代码具体生成器已经编写完成,我们下面构造生成器责任链路:
/** Runs every generator and collects the questions they produce. */
@Service
public class QuestionChain {

    // Typed list: a raw List would make the for-each over QuestionGenerator uncompilable.
    private List<QuestionGenerator> generators = new ArrayList<>();

    /** Registers the three built-in generators in order. */
    @PostConstruct
    public void init() {
        QuestionGenerator aaaQuestionGenerator = new AaaQuestionGenerator();
        QuestionGenerator bbbQuestionGenerator = new BbbQuestionGenerator();
        QuestionGenerator cccQuestionGenerator = new CccQuestionGenerator();
        generators.add(aaaQuestionGenerator);
        generators.add(bbbQuestionGenerator);
        generators.add(cccQuestionGenerator);
    }

    /**
     * @param gradeInfo grade descriptor passed to every generator
     * @return the non-null questions produced, in generator order
     * @throws RuntimeException if no generator is registered
     */
    public List<Question> generate(String gradeInfo) {
        if (CollectionUtils.isEmpty(generators)) {
            throw new RuntimeException("QuestionChain is empty");
        }
        List<Question> questions = new ArrayList<>();
        for (QuestionGenerator generator : generators) {
            Question question = generator.generateQuestion(gradeInfo);
            // A generator returns null when it does not apply; skip it.
            if (null == question) {
                continue;
            }
            questions.add(question);
        }
        return questions;
    }
}

/** Loads the Spring context and generates questions for one grade. */
public class Test {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        System.out.println(context);
        QuestionChain chain = (QuestionChain) context.getBean("questionChain");
        List<Question> questions = chain.generate("一年级");
        System.out.println(questions);
    }
}
(2) 实现方式二
public abstract class GenerateHandler { /** 下一个节点 **/ protected GenerateHandler successor = null; public void setSuccessor(GenerateHandler successor) { this.successor = successor; } public final List generate(String gradeInfo) { List result = new ArrayList(); /** 执行自身方法 **/ Question question = doGenerate(gradeInfo); if (null != question) { result.add(question); } /** 执行下一个节点链路 **/ if (successor != null && this != successor) { List successorQuestions = successor.generate(gradeInfo); if (null != successorQuestions) { result.addAll(successorQuestions); } } return result; } /** 每个节点生成方法 **/ protected abstract Question doGenerate(String gradeInfo);}public class AaaGenerateHandler extends GenerateHandler { @Override protected Question doGenerate(String gradeInfo) { if (!gradeInfo.equals("一年级")) { return null; } Question question = new Question(); question.setId("aaa"); question.setScore(10); return question; }}// 省略其它生成器代码具体生成器已经完成,我们下面构造生成器责任链路:
/** Singly linked chain of {@code GenerateHandler}s; generation starts at the head. */
@Service
public class GenerateChain {

    private GenerateHandler head = null;
    private GenerateHandler tail = null;

    /** Links the three built-in handlers in order. */
    @PostConstruct
    public void init() {
        GenerateHandler aaaHandler = new AaaGenerateHandler();
        GenerateHandler bbbHandler = new BbbGenerateHandler();
        GenerateHandler cccHandler = new CccGenerateHandler();
        addHandler(aaaHandler);
        addHandler(bbbHandler);
        addHandler(cccHandler);
    }

    /** Appends {@code handler} to the end of the chain. */
    public void addHandler(GenerateHandler handler) {
        if (head == null) {
            head = tail = handler;
            // Stop here: falling through would make the first handler its own successor.
            return;
        }
        // Link the current tail to the new handler.
        tail.setSuccessor(handler);
        // The new handler becomes the tail.
        tail = handler;
    }

    /**
     * @param gradeInfo grade descriptor passed down the chain
     * @return the questions collected by running the chain from the head
     * @throws RuntimeException if the chain has no handler
     */
    public List<Question> generate(String gradeInfo) {
        if (null == head) {
            throw new RuntimeException("GenerateChain is empty");
        }
        // The head starts the call.
        return head.generate(gradeInfo);
    }
}

/** Loads the Spring context and generates questions for one grade. */
public class Test {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        GenerateChain chain = (GenerateChain) context.getBean("generateChain");
        System.out.println(context);
        List<Question> result = chain.generate("一年级");
        System.out.println(result);
    }
}
5.3 DUBBO源码应用
生产者过滤器链路:EchoFilter > ClassloaderFilter > GenericFilter > ContextFilter > TraceFilter > TimeoutFilter > MonitorFilter > ExceptionFilter > AbstractProxyInvoker
消费者过滤器链路:ConsumerContextFilter > FutureFilter > MonitorFilter > DubboInvoker
ProtocolFilterWrapper作为链路生成核心通过匿名类方式构建过滤器链路,我们以消费者构建过滤器链路为例:
public class ProtocolFilterWrapper implements Protocol { private staticInvoker buildInvokerChain(final Invoker invoker, String key, String group) { // invoker = DubboInvoker Invoker last = invoker; // 查询符合条件过滤器列表 List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker next = last; // 构造一个简化Invoker last = new Invoker() { @Override public Class getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { // 构造过滤器链路 Result result = filter.invoke(next, invocation); if (result instanceof AsyncRpcResult) { AsyncRpcResult asyncResult = (AsyncRpcResult) result; asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation)); return asyncResult; } else { return filter.onResponse(result, invoker, invocation); } } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } @Override publicInvoker refer(Class type, URL url) throws RpcException { // RegistryProtocol不构造过滤器链路 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } Invoker invoker = protocol.refer(type, url); return buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); }}
6 保护性暂停模式
6.1 保护性暂停实例
class Resource { private MyData data; private Object lock = new Object(); public MyData getData(int timeOut) { synchronized (lock) { // 运行时长 long timePassed = 0; // 开始时间 long begin = System.currentTimeMillis(); // 假如效果为空 while (data == null) { try { // 假如运行时长大于超时时间退出循环 if (timePassed > timeOut) { break; } // 假如运行时长小于超时时间表示虚伪唤醒 -> 只需再等待时间差值 long waitTime = timeOut - timePassed; // 等待时间差值 lock.wait(waitTime); // 效果不为空直接返回 if (data != null) { break; } // 被唤醒后计算运行时长 timePassed = System.currentTimeMillis() - begin; } catch (InterruptedException e) { e.printStackTrace(); } } if (data == null) { throw new RuntimeException("超时未获取到效果"); } return data; } } public void sendData(MyData data) { synchronized (lock) { this.data = data; lock.notifyAll(); } }}/** * 保护性暂停实例 */public class ProtectDesignTest { public static void main(String[] args) { Resource resource = new Resource(); new Thread(() -> { try { MyData data = new MyData("hello"); System.out.println(Thread.currentThread().getName() + "生产数据=" + data); // 模拟发送耗时 TimeUnit.SECONDS.sleep(3); resource.sendData(data); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); new Thread(() -> { MyData data = resource.getData(1000); System.out.println(Thread.currentThread().getName() + "接收到数据=" + data); }, "t2").start(); }}
6.2 加一个编号
@Getter@Setterpublic class MyNewData implements Serializable { private static final long serialVersionUID = 1L; private static final AtomicLong ID = new AtomicLong(0); private Long id; private String message; public MyNewData(String message) { this.id = newId(); this.message = message; } /** * 自增到最大值会回到最小值(负值可以作为辨认ID) */ private static long newId() { return ID.getAndIncrement(); } public Long getId() { return this.id; }}class MyResource { private MyNewData data; private Object lock = new Object(); public MyNewData getData(int timeOut) { synchronized (lock) { long timePassed = 0; long begin = System.currentTimeMillis(); while (data == null) { try { if (timePassed > timeOut) { break; } long waitTime = timeOut - timePassed; lock.wait(waitTime); if (data != null) { break; } timePassed = System.currentTimeMillis() - begin; } catch (InterruptedException e) { e.printStackTrace(); } } if (data == null) { throw new RuntimeException("超时未获取到效果"); } return data; } } public void sendData(MyNewData data) { synchronized (lock) { this.data = data; lock.notifyAll(); } }}class MyFutures { private static final Map FUTURES = new ConcurrentHashMap(); public static MyResource newResource(MyNewData data) { final MyResource future = new MyResource(); FUTURES.put(data.getId(), future); return future; } public static MyResource getResource(Long id) { return FUTURES.remove(id); } public static Set getIds() { return FUTURES.keySet(); }}/** * 保护性暂停实例 */public class ProtectDesignTest { public static void main(String[] args) throws Exception { for (int i = 0; i < 3; i++) { final int index = i; new Thread(() -> { try { MyNewData data = new MyNewData("hello_" + index); MyResource resource = MyFutures.newResource(data); // 模拟发送耗时 TimeUnit.SECONDS.sleep(1); resource.sendData(data); System.out.println("生产数据data=" + data); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } TimeUnit.SECONDS.sleep(1); for (Long i : MyFutures.getIds()) { final long index = i; new Thread(() -> { 
MyResource resource = MyFutures.getResource(index); int timeOut = 3000; System.out.println("接收数据data=" + resource.getData(timeOut)); }).start(); } }}
6.3 DUBBO源码应用
我们顺着这条链路跟踪代码:消费者发送请求 > 提供者接收请求并执行,并且将运行结果发送给消费者 > 消费者接收结果。
(1) 消费者发送请求
final class HeaderExchangeChannel implements ExchangeChannel { @Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }}class DefaultFuture implements ResponseFuture { // FUTURES容器 private static final Map FUTURES = new ConcurrentHashMap(); private DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; // 哀求ID this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); FUTURES.put(id, this); CHANNELS.put(id, channel); }}(2) 提供者接收哀求并执行,并且将运行效果发送给消费者
public class HeaderExchangeHandler implements ChannelHandlerDelegate { void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { // response与哀求ID对应 Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return; } // message = RpcInvocation包罗方法名、参数名、参数值等 Object msg = req.getData(); try { // DubboProtocol.reply执行实际业务方法 CompletableFuture future = handler.reply(channel, msg); // 假如哀求已经完成则发送效果 if (future.isDone()) { res.setStatus(Response.OK); res.setResult(future.get()); channel.send(res); return; } } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }}(3) 消费者接收效果
class DefaultFuture implements ResponseFuture { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); public static void received(Channel channel, Response response) { try { // 取出对应的哀求对象 DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } @Override public Object get(int timeout) throws RemotingException { if (timeouttimeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } // response对象仍然为空则抛出超时异常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } private void doReceived(Response res) { lock.lock(); try { // 接收到服务器相应赋值response response = res; if (done != null) { // 唤醒get方法中处于等待的代码块 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }}
7 双重检查锁模式
/**
 * Lazily created singleton guarded by double-checked locking.
 */
public class MyDCLConnection {

    // volatile so that a fully constructed instance is visible to the unsynchronized first check.
    private static volatile MyDCLConnection myConnection = null;

    private MyDCLConnection() {
        System.out.println(Thread.currentThread().getName() + " -> init connection");
    }

    /**
     * Returns the single instance, creating it on first use.
     */
    public static MyDCLConnection getConnection() {
        // Fast path: no locking once the instance exists.
        MyDCLConnection current = myConnection;
        if (current != null) {
            return current;
        }
        synchronized (MyDCLConnection.class) {
            // Re-check under the lock: another thread may have created it in the meantime.
            current = myConnection;
            if (current == null) {
                current = new MyDCLConnection();
                myConnection = current;
            }
            return current;
        }
    }
}

在DUBBO服务本地暴露时使用了双重检查锁模式判断exporter是否已经存在避免重复创建:
/**
 * Excerpt: the {@code bounds} cache and the {@code protocol} field are declared outside this snippet.
 */
public class RegistryProtocol implements Protocol {

    /**
     * Exports {@code originInvoker} locally at most once per cache key, using
     * double-checked locking on the {@code bounds} cache.
     *
     * @return the cached exporter, created on first call for this key
     */
    @SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        // First check without the lock.
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                // Second check under the lock: another thread may have exported it already.
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
                    final Exporter<T> strongExporter = (Exporter<T>) protocol.export(invokerDelegete);
                    exporter = new ExporterChangeableWrapper<T>(strongExporter, originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
}
8 文章总结