Java同步和异步的区别,聊斋java_Deferred,一种Java异步管理机制

 2023-09-23 阅读 22 评论 0

摘要:场景描述:现在我们考虑用两种方式来实现远程rpc调用:方案一:采用同步阻塞方式,单个线程按顺序做所有的事情,没有什么特别的,不赘述。Java同步和异步的区别。方案二:采用异步方式,对3和4分别创建一个异步线程去做(这样

场景描述:

现在我们考虑用两种方式来实现远程rpc调用:

方案一:采用同步阻塞方式,单个线程按顺序做所有的事情,没有什么特别的,不赘述。

Java同步和异步的区别。方案二:采用异步方式,对3和4分别创建一个异步线程去做(这样做可能有点作,只是为了演示,勿喷)。

如果采用方案二,最麻烦的地方在于4是依赖3的,意味着4必须等到3返回之后才开始,等到4返回之后,才开始5。可以理解为在3之后还有一个处理链(pipeline),下一个节点根据上一个节点的返回值进行处理。

想想如果我们用netty,应该需要在messageReceived中接收消息后,再启动另一个线程,然后在该线程的messageReceived方法中接收消息后,再做反序列化等后继操作。如果不愿意这么做,自己定义一个状态机也可以的。

有没有一种优雅的,更易理解和易管理的方式来实现呢?

java类与类之间的调用,答案当然是有!

async包提供了一种线程安全的异步处理机制,该机制基于Twisted的pythonDeferred API,该API不需要显式定义有限状态机,采用一种简单优雅的方式来管理异步和动态pipeline(处理链)。

其中包含的类很少,其中主要的类为Deferred.class,基本上看懂了这个类就OK了。

Deferred和Future最大的差异是Deferred包含了一个callback链,能够在结果返回后进行一系列的操作。

java的异常处理机制,Deferred机制包含如下特征:

•由动态callback链组成,当deferred结果返回时,该链开始逐个按顺序触发,只有当上一个环节执行并返回结果后,下一个环节才继续执行,如果发生错误则切换到错误处理链。

•包含正常和异常两个处理链,异常链用于处理异常和错误,异常链仅能捕获Exception,不能捕获Throwable和Error。

•Deferred本身并不创建Thread线程,仅提供状态机和多个callback之间切换管理的机制。

java序列化、内部提供了一个有限状态机,如下:

新增Deferred对象(假设名称为D1)时,状态为Pending;当开始执行callback时,状态变为Running;若某个callback返回的结果为Deferred对象(假设名称为D2),则进入该D2对象中继续执行callback链,此时上一个D1对象的状态为Paused,若D2对象中所有的callback执行完毕,则返回D1对象,D1对象的状态变为Running,所有callback执行完毕后,D1状态变为Done,然后D2状态变为Done。

示例代码:

packagecom.ioe.metric.reader;

javaweb,importjava.util.concurrent.Executors;

importjava.util.concurrent.locks.Lock;

importjava.util.concurrent.locks.ReentrantLock;

importcom.ioe.common.cache.LocalCache;

java线程池,importcom.stumbleupon.async.Callback;

importcom.stumbleupon.async.Deferred;

publicclassCacheTest {

publicstaticvoidmain(String[]args)throwsInterruptedException {

java编程思想?CacheTesttest=newCacheTest();

test.getValue("test");

}

privateLocalCachelocalCache=newLocalCache(3600 * 1000L,

java面向对象,100000L);

privateLocklock=newReentrantLock();

// Callback中第一个泛型参数为内部的call方法返回值类型,第二个为内部的call方法入参类型

publicclassServerQueryimplementsCallback,String> {

@Override

publicDeferred call(finalStringkey)throwsException {

//需要等异步线程返回后,才能把获取的值返回

finalDeferredd=newDeferred();

d.addCallback(newCallback() {

@Override

publicString call(Strings)throwsException {

System.out.println("query index server:"+s);

returns;

}

});

Executors.newSingleThreadExecutor().execute(newRunnable() {

@Override

publicvoidrun() {

d.callback(queryServerFromRemote(key));

}

});

returnd;

}

privateString queryServerFromRemote(Stringkey) {

return"10.1.10.100";

}

}

publicclassDataQueryimplementsCallback,String> {

Stringdata=null;

@Override

publicDeferred call(finalStringserver)throwsException {

//需要等异步线程返回后,才能把获取的值返回

finalDeferredd=newDeferred();

d.addCallback(newCallback() {

@Override

publicString call(Strings)throwsException {

System.out.println("query data success:"+s);

returns;

}

});

Executors.newSingleThreadExecutor().execute(newRunnable() {

@Override

publicvoidrun() {

d.callback(queryDataFromRemote(server));

}

});

returnd;

}

/**

*@paramserver

*@return

*/

protectedString queryDataFromRemote(Stringserver) {

return"xxxxxxx";

}

}

publicclassDecodeimplementsCallback {

@Override

publicTest call(Stringdata)throwsException {

System.out.println("try to decode the data.");

finalTestobject= decode(data);

returnobject;

}

/**

*@paramdata

*@return

*/

privateTest decode(Stringdata) {

returnnewTest("hahah");

}

}

privateTest getDataFromRemote(Stringkey) {

Testvalue=null;

try{

lock.lock();

value=localCache.get(key);

if(null==value) {

//定义一个pipeline处理链

// Deferred.fromResult(key)表示根据key来创建一个Deferred对象,返回类型会自动推断为Test

//如果使用Deferred d = new

// Deferred();然后d.callback(key);你就悲剧了

//发现怎么也编译不通过,类型还要强制转换,泛型推断不能起作用

value= Deferred.fromResult(key)

.addBothDeferring(newServerQuery())

.addBothDeferring(newDataQuery())

.addBoth(newDecode()).joinUninterruptibly();

//上面的代码形成了3个Deferred对象,其中ServerQuery一个D1,

//后面的.addBothDeferring(newDataQuery()).addBoth(new

// Decode())形成了另一个D2,

//然后在D2内部还有一个DataQuery形成的Deferred对象D3

//执行顺序为D1、D2(第一个callback后调到D3)、D3、D2(D3中callback执行完毕后,回到D2),结束顺序为D1、D2、D3

System.out.println("put data to cache. value:"+value);

localCache.put(key,value);

}

}catch(Exceptione) {

e.printStackTrace();

}finally{

lock.unlock();

}

returnvalue;

}

publicclassTest {

Stringname;

publicTest(Stringname) {

this.name=name;

}

@Override

publicString toString() {

returnname;

}

}

publicTest getValue(Stringkey) {

Testvalue=localCache.get(key);

if(null==value) {

value= getDataFromRemote(key);

}

returnvalue;

}

}

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/5/88659.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息