场景描述:
现在我们考虑用两种方式来实现远程rpc调用:
方案一:采用同步阻塞方式,单个线程按顺序做所有的事情,没有什么特别的,不赘述。
Java同步和异步的区别。方案二:采用异步方式,对3和4分别创建一个异步线程去做(这样做可能有点作,只是为了演示,勿喷)。
如果采用方案二,最麻烦的地方在于4是依赖3的,意味着4必须等到3返回之后才开始,等到4返回之后,才开始5。可以理解为在3之后还有一个处理链(pipeline),下一个节点根据上一个节点的返回值进行处理。
想想如果我们用netty,应该需要在messageReceived中接收消息后,再启动另一个线程,然后在该线程的messageReceived方法中接收消息后,再做反序列化等后继操作。如果不愿意这么做,自己定义一个状态机也可以的。
有没有一种优雅的,更易理解和易管理的方式来实现呢?
java类与类之间的调用,答案当然是有!
async包提供了一种线程安全的异步处理机制,该机制基于Twisted的pythonDeferred API,该API不需要显式定义有限状态机,采用一种简单优雅的方式来管理异步和动态pipeline(处理链)。
其中包含的类很少,其中主要的类为Deferred.class,基本上看懂了这个类就OK了。
Deferred和Future最大的差异是Deferred包含了一个callback链,能够在结果返回后进行一系列的操作。
java的异常处理机制,Deferred机制包含如下特征:
•由动态callback链组成,当deferred结果返回时,该链开始逐个按顺序触发,只有当上一个环节执行并返回结果后,下一个环节才继续执行,如果发生错误则切换到错误处理链。
•包含正常和异常两个处理链,异常链用于处理异常和错误,异常链仅能捕获Exception,不能捕获Throwable和Error。
•Deferred本身并不创建Thread线程,仅提供状态机和多个callback之间切换管理的机制。
java序列化、内部提供了一个有限状态机,如下:
新增Deferred对象(假设名称为D1)时,状态为Pending;当开始执行callback时,状态变为Running;若某个callback返回的结果为Deferred对象(假设名称为D2),则进入该D2对象中继续执行callback链,此时上一个D1对象的状态为Paused,若D2对象中所有的callback执行完毕,则返回D1对象,D1对象的状态变为Running,所有callback执行完毕后,D1状态变为Done,然后D2状态变为Done。
示例代码:
packagecom.ioe.metric.reader;
javaweb,importjava.util.concurrent.Executors;
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;
importcom.ioe.common.cache.LocalCache;
java线程池,importcom.stumbleupon.async.Callback;
importcom.stumbleupon.async.Deferred;
publicclassCacheTest {
publicstaticvoidmain(String[]args)throwsInterruptedException {
java编程思想?CacheTesttest=newCacheTest();
test.getValue("test");
}
privateLocalCachelocalCache=newLocalCache(3600 * 1000L,
java面向对象,100000L);
privateLocklock=newReentrantLock();
// Callback中第一个泛型参数为内部的call方法返回值类型,第二个为内部的call方法入参类型
publicclassServerQueryimplementsCallback,String> {
@Override
publicDeferred call(finalStringkey)throwsException {
//需要等异步线程返回后,才能把获取的值返回
finalDeferredd=newDeferred();
d.addCallback(newCallback() {
@Override
publicString call(Strings)throwsException {
System.out.println("query index server:"+s);
returns;
}
});
Executors.newSingleThreadExecutor().execute(newRunnable() {
@Override
publicvoidrun() {
d.callback(queryServerFromRemote(key));
}
});
returnd;
}
privateString queryServerFromRemote(Stringkey) {
return"10.1.10.100";
}
}
publicclassDataQueryimplementsCallback,String> {
Stringdata=null;
@Override
publicDeferred call(finalStringserver)throwsException {
//需要等异步线程返回后,才能把获取的值返回
finalDeferredd=newDeferred();
d.addCallback(newCallback() {
@Override
publicString call(Strings)throwsException {
System.out.println("query data success:"+s);
returns;
}
});
Executors.newSingleThreadExecutor().execute(newRunnable() {
@Override
publicvoidrun() {
d.callback(queryDataFromRemote(server));
}
});
returnd;
}
/**
*@paramserver
*@return
*/
protectedString queryDataFromRemote(Stringserver) {
return"xxxxxxx";
}
}
publicclassDecodeimplementsCallback {
@Override
publicTest call(Stringdata)throwsException {
System.out.println("try to decode the data.");
finalTestobject= decode(data);
returnobject;
}
/**
*@paramdata
*@return
*/
privateTest decode(Stringdata) {
returnnewTest("hahah");
}
}
privateTest getDataFromRemote(Stringkey) {
Testvalue=null;
try{
lock.lock();
value=localCache.get(key);
if(null==value) {
//定义一个pipeline处理链
// Deferred.fromResult(key)表示根据key来创建一个Deferred对象,返回类型会自动推断为Test
//如果使用Deferred d = new
// Deferred();然后d.callback(key);你就悲剧了
//发现怎么也编译不通过,类型还要强制转换,泛型推断不能起作用
value= Deferred.fromResult(key)
.addBothDeferring(newServerQuery())
.addBothDeferring(newDataQuery())
.addBoth(newDecode()).joinUninterruptibly();
//上面的代码形成了3个Deferred对象,其中ServerQuery一个D1,
//后面的.addBothDeferring(newDataQuery()).addBoth(new
// Decode())形成了另一个D2,
//然后在D2内部还有一个DataQuery形成的Deferred对象D3
//执行顺序为D1、D2(第一个callback后调到D3)、D3、D2(D3中callback执行完毕后,回到D2),结束顺序为D1、D2、D3
System.out.println("put data to cache. value:"+value);
localCache.put(key,value);
}
}catch(Exceptione) {
e.printStackTrace();
}finally{
lock.unlock();
}
returnvalue;
}
publicclassTest {
Stringname;
publicTest(Stringname) {
this.name=name;
}
@Override
publicString toString() {
returnname;
}
}
publicTest getValue(Stringkey) {
Testvalue=localCache.get(key);
if(null==value) {
value= getDataFromRemote(key);
}
returnvalue;
}
}
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态