Programing/Akka

Akka Stream Asynchronous operators

BUST 2018. 8. 17. 23:34

Akka Stream Asynchronous operators

  • Akka steram의 Operator중에 비동기(Asynchronous)로 처리할수 있는 기능
  • ask / mapAsync / mapAsyncUnOrdered 총 3가지가 있다.

Source/Flow.ask

  • Actor(=ActorRef)를 통해 데이터를 주고 받을수가 있다.

Source/Flow.mapAsync

  • `CompletionStage` 객체로 리턴하는 함수를 통해 map operator를 작동할수 있다.

Source/Flow.mapAsyncUnordered

  • `CompletionStage` 객체로 리턴하는 함수를 통해 map operator를 작동할수 있다.
  • mapAsync와 동일한 기능이지만 `CompletionStage`가 나오는 순서 상관없이 다음 operator로 전달한다.

참고. Scala와 Java8

  • Scala와 java의 mapAsync, mapAsyncUnordered의 인터페이스가 다르다
    • scala - future
    • java - completionstage
    • futue는 java의 future와 다르다.
  •  mapAsync와 mapAsyncUnordered 기능을 사용을 할려고 하면 
    • ExecutionContext 를 통해 scala의 Future를 생성하고, FutureConverters를 이용하여 CompletionStage로 변경한다.
      • FutureConverters.toJava를 기능을 이용하여 scala의 futue를 CompletionStage로 변경할수있다.
    • 또는 내부적으로 ExecutorService (java)를 사용하여 CompletableFuture(CompletionStage의 구현체)로 전환할수 있다.
      • CompleteableFuture.supplyAsync 등의 메서드를 활용할수 있다.
    • Spring Async와 함께 활용할수도 있다. (CompletableFuture)
final ExecutionContext ec = .....;

Future<String> scalaFuture = Futures.future(() -> {
  assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
  countDownLatch.await(); // do not complete yet
  return "hello";
}, ec);

CompletionStage<String> fromScalaFuture = FutureConverters.toJava(scalaFuture)
  .thenApply(s -> { // 1
    assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
    return s;
  })
  .thenApply(s -> { // 2
    assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
    return s;
  })
  .thenApply(s -> { // 3
    assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
    return s;
  });
  • thenApply이후에는 `ForkJoinPool.commonPool()`을 사용한다.