Akka Stream Asynchronous operators
- Akka steram의 Operator중에 비동기(Asynchronous)로 처리할수 있는 기능
- ask / mapAsync / mapAsyncUnOrdered 총 3가지가 있다.
Source/Flow.ask
- Actor(=ActorRef)를 통해 데이터를 주고 받을수가 있다.
Source/Flow.mapAsync
- `CompletionStage` 객체로 리턴하는 함수를 통해 map operator를 작동할수 있다.
Source/Flow.mapAsyncUnordered
- `CompletionStage` 객체로 리턴하는 함수를 통해 map operator를 작동할수 있다.
- mapAsync와 동일한 기능이지만 `CompletionStage`가 나오는 순서 상관없이 다음 operator로 전달한다.
참고. Scala와 Java8
- Scala와 java의 mapAsync, mapAsyncUnordered의 인터페이스가 다르다
- scala - future
- java - completionstage
- futue는 java의 future와 다르다.
- mapAsync와 mapAsyncUnordered 기능을 사용을 할려고 하면
- ExecutionContext 를 통해 scala의 Future를 생성하고, FutureConverters를 이용하여 CompletionStage로 변경한다.
- FutureConverters.toJava를 기능을 이용하여 scala의 futue를 CompletionStage로 변경할수있다.
- 또는 내부적으로 ExecutorService (java)를 사용하여 CompletableFuture(CompletionStage의 구현체)로 전환할수 있다.
- CompleteableFuture.supplyAsync 등의 메서드를 활용할수 있다.
- Spring Async와 함께 활용할수도 있다. (CompletableFuture)
final ExecutionContext ec = .....;
Future<String> scalaFuture = Futures.future(() -> {
assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
countDownLatch.await(); // do not complete yet
return "hello";
}, ec);
CompletionStage<String> fromScalaFuture = FutureConverters.toJava(scalaFuture)
.thenApply(s -> { // 1
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
})
.thenApply(s -> { // 2
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
})
.thenApply(s -> { // 3
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
});
- thenApply이후에는 `ForkJoinPool.commonPool()`을 사용한다.
'Programing > Akka' 카테고리의 다른 글
Akka Management (0) | 2018.08.21 |
---|---|
Akka Scheduler (0) | 2018.08.21 |
Cluster Aware Router (0) | 2018.08.16 |
Akka Actor / Stream / Cluster 을 이용한 확장 가능한 Task 단위의 시스템 (0) | 2018.08.15 |
Akka Distributed Publish Subscribe in Cluster #1.Subscribe/Publish (0) | 2018.08.09 |