話題
我想通過Akka HTTP與Akka Actor進行交互 。 這個想法是要有一個系統,其中HTTP客戶端調用Akka HTTP服務器方法,該方法處理對Akka Actor的請求。 actor處理該消息并響應給調用方(Akka HTTP),該調用方對HTTP客戶端進行應答。 如上所述,我設法做到了,但是由于我的實現似乎受阻,因此我認為我做得不正確。
我會更好地解釋:如果我發出許多并發的HTTP請求,我會看到Akka HTTP“產生了一個隊列”,因此等待參與者處理請求,然后再發送以下請求。
java狀態機,我想獲得的是, Akka HTTP服務器將來自HTTP客戶端的請求立即轉發到目標akka actor,而無需等待actor結束闡述。 我想使用actor郵箱容量參數來確定郵件隊列的大小,如果郵件過多,則拒絕郵件。
因此,我需要一種讓Akka HTTP異常等待actor響應的方法。
我知道郵箱容量正常工作,因為如果我改用一個簡單的actor2.tell(“ Prova1”,system.deadLetters())向我的actor發出許多請求(僅用于測試),則超出郵箱大小的請求是正確的被拒絕。
參考
akka、為了測試我的系統,我按照akka文檔提供的最少示例創建了一個簡單的配置。 這對于akka http: https ://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example
我的密碼
我做的第一件事是使用一個參與者(actor1)創建一個系統,按如下所示配置akka HTTP:
public class TestActor {
java actor?private static ActorSystem system;
public static void main(String[] args) throws InterruptedException
{
String httpBindAddress = "0.0.0.0";
java eval?int httpPort = 8086;
system = ActorSystem.create("deupnp");
ActorMaterializer materializer = ActorMaterializer.create(system);
Http http = Http.get(system);
actor框架、AllDirectives app = new AllDirectives() {
};
Route routeActor = app.get(() ->
app.pathPrefix("mysuburl", () ->
actor用法、app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
app.onSuccess(() ->
CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
app.complete(StatusCodes.get(200), response))))));
Flow routeFlow = app.route(routeActor).flow(system, materializer);
CompletionStage binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);
// create system with one actor
ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
}
private static String actorFunctionCall(String actor, String message)
{
try {
Inbox inbox = Inbox.create(system);
system.actorSelection("user/"+actor).tell(message, inbox.getRef());
String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
return response;
} catch (Exception e) {
//return new ResponseMessage(204,"Error");
e.printStackTrace();
return null;
}
}
}
我的ActorTest如下:
public class ActorTest extends AbstractActor {
private String myName = "";
public ActorTest(String nome){
this.myName = nome;
}
@Override
public void preStart()
{
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class,
message -> {
Thread.sleep(5000l);
System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
})
.matchAny(mex->{
System.out.println("Error");
})
.build();
}
}
我的application.conf非常簡單:
akka
{
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
actor {
default-dispatcher {
throughput = 10
}
}
}
my-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 1
}
預期成績
如您所見,在郵箱容量= 1的情況下,我希望,如果我發出多個并發請求,則僅處理一個,其余的將被丟棄。
我認為上面的代碼對于我想要獲取的代碼是不正確的,因為我使用Akka HTTP路由來接收http://127.0.0.1/mysuburl/actor1/my_msg上的HTTP請求,然后使用Inbox將消息發送到演員,等待回應。
所以我的問題是:哪種異步方式將Akka HTTP請求鏈接到Akka Actor actor 1是正確的方法?
如果您需要更多詳細信息,請告訴我。
注意
它解釋了如何創建有限數量的線程以處理多個阻塞請求,但是我認為這只是“減輕”了我的代碼(阻塞)的影響,但必須以不阻塞的方式編寫。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态