Programing/Akka

Akka Stream Error Handling

BUST 2018. 8. 1. 22:35

Akka Stream Error Handling

Akka Stream의 Operator에서 에러가 발생되는 경우에는 기본 설정인 Stream의 stop으로 되어있고, 기본적으로 에러 관련 메시지는 나타나지 않는다.

실시간 데이터 처리 등에서는 하나의 데이터가 오류가 난다고 전체의 데이터를 받지 못하면 안되기 때문에 Error Handling에 관련된 설정이 불가피하다.


Logging Error

Source.from(Arrays.asList(-1, 0, 1))
  .map(x -> 1 / x) //throwing ArithmeticException: / by zero
  .log("error logging")
  .runWith(Sink.ignore(), mat);
  • `log()` 를 이용하여 error log를 생성을 할수가 있다.
[error logging] Upstream failed.
java.lang.ArithmeticException: / by zero

Recover

final Materializer mat = ActorMaterializer.create(system);
Source.from(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).map(n -> {
  if (n < 5) return n.toString();
  else throw new RuntimeException("Boom!");
}).recover(new PFBuilder()
    .match(RuntimeException.class, ex -> "stream truncated")
    .build()
).runForeach(System.out::println, mat);
  • `recover()` 를 이용하여 에러가 나는 `element`를 복구를 할수 있다.
0
1
2
3
4
stream truncated

Recover with retries

final Materializer mat = ActorMaterializer.create(system);
Source<String, NotUsed> planB = Source.from(Arrays.asList("five", "six", "seven", "eight"));

Source.from(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).map(n -> {
  if (n < 5) return n.toString();
  else throw new RuntimeException("Boom!");
}).recoverWithRetries(
    1, // max attempts
    new PFBuilder()
        .match(RuntimeException.class, ex -> planB)
        .build()
).runForeach(System.out::println, mat);
  • `recoveryWithRetries()`를 이용하여 최대 시도 횟수를 지정을 할수 있다.
0
1
2
3
4
five
six
seven
eight


Delayed restarts with a backoff operator

Source<ServerSentEvent, NotUsed> eventStream = RestartSource.withBackoff(
        Duration.ofSeconds(3), // min backoff
        Duration.ofSeconds(30), // max backoff
    0.2, // adds 20% "noise" to vary the intervals slightly
    20, // limits the amount of restarts to 20
    () ->
        // Create a source from a future of a source
        Source.fromSourceCompletionStage(
            // Issue a GET request on the event stream
            Http.get(system).singleRequest(HttpRequest.create("http://example.com/eventstream"))
                .thenCompose(response ->
                    // Unmarshall it to a stream of ServerSentEvents
                    EventStreamUnmarshalling.fromEventStream()
                        .unmarshall(response, materializer)
                )
        )
);
  • `backoff` operator를 이용하여 Restart를 할수 있다.
  • 외부 서비스 연동 등에서 주로 사용하는 패턴이다
  • `RestartSource` 뿐만 아니라 `RestartFlow`, `RestartSink` 도 존재를 한다.

Supervision Strategies

  • `stop` - stream을 중지한다.
  • `resume` - 에러가 난 `element`를 버리고, 다시 stream을 시작을 한다.
  • `restart` - 에러가 난 `element`를 버리고, 새롭게 다시 stream을 시작을 한다. 즉, 새로운 객체가 다시 생성이 된다. operator의 state를 clear가 된다.
  • 기본 값은 모든 exception에 대해서 `stop`으로 설정이 되어 있다.

Materializer에 Decider 등록하기

final Function<Throwable, Supervision.Directive> decider = exc -> {
  if (exc instanceof ArithmeticException)
    return Supervision.resume();
  else
    return Supervision.stop();
};
final Materializer mat = ActorMaterializer.create(
  ActorMaterializerSettings.create(system).withSupervisionStrategy(decider),
  system);
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
  .map(elem -> 100 / elem);
final Sink<Integer, CompletionStage<Integer>> fold =
  Sink.fold(0, (acc, elem) -> acc + elem);
final CompletionStage<Integer> result = source.runWith(fold, mat);
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)


Flow Operator에 Decider 등록하기

final Materializer mat = ActorMaterializer.create(system);
final Function<Throwable, Supervision.Directive> decider = exc -> {
  if (exc instanceof ArithmeticException)
    return Supervision.resume();
  else
    return Supervision.stop();
};
final Flow<Integer, Integer, NotUsed> flow =
    Flow.of(Integer.class).filter(elem -> 100 / elem < 50).map(elem -> 100 / (5 - elem))
    .withAttributes(ActorAttributes.withSupervisionStrategy(decider));
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
  .via(flow); 
final Sink<Integer, CompletionStage<Integer>> fold =
  Sink.<Integer, Integer> fold(0, (acc, elem) -> acc + elem);
final CompletionStage<Integer> result = source.runWith(fold, mat);
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)