Programing/Akka

Persistence Actor

BUST 2018. 5. 27. 20:52

Persistence Actor

- Actor를 이용하여 비즈니스 로직을 구현을 할때, Actor가 상태를 필요한 경우가 있을수 있다. 이런 경우에는 Akka의 Persistence 기능을 이용하여 구현을 할수가 있다.
- 상태를 저장하는 방식을 2가지 방식이 있다. (Event Sourcing)
  - 상태의 데이터를 그대로 저장하는 방식.
  - 상태의 데이터를 변환하는 과정을 저장을 하는 방식 (Append)
- Event Sourcing 관련 내용 : https://docs.microsoft.com/en-us/previous-versions/msp-n-p/jj591559(v=pandp.10)

Gradle

dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-persistence_2.12', version: '2.5.12'
}


Event Sourcing / Snapshot

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.SnapshotOffer;

import java.io.Serializable;
import java.util.ArrayList;

class Cmd implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String data;

    public Cmd(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}

class Evt implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String data;

    public Evt(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}

class ExampleState implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ArrayList<String> events;

    public ExampleState() {
        this(new ArrayList<>());
    }

    public ExampleState(ArrayList<String> events) {
        this.events = events;
    }

    public ExampleState copy() {
        return new ExampleState(new ArrayList<>(events));
    }

    public void update(Evt evt) {
        events.add(evt.getData());
    }

    public int size() {
        return events.size();
    }

    @Override
    public String toString() {
        return events.toString();
    }
}

class ExamplePersistentActor extends AbstractPersistentActor {

    private ExampleState state = new ExampleState();
    private int snapShotInterval = 1000;

    public int getNumEvents() {
        return state.size();
    }

    @Override
    public String persistenceId() { return "sample-id-1"; }

    @Override
    public Receive createReceiveRecover() {
        return receiveBuilder()
            .match(Evt.class, state::update)
            .match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot())
            .build();
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Cmd.class, c -> {
              final String data = c.getData();
              final Evt evt = new Evt(data + "-" + getNumEvents());
              persist(evt, (Evt e) -> {
                  state.update(e);
                  getContext().getSystem().eventStream().publish(e);
                  if (lastSequenceNr() % snapShotInterval == 0 && lastSequenceNr() != 0)
                      // IMPORTANT: create a copy of snapshot because ExampleState is mutable
                      saveSnapshot(state.copy());
              });
            })
            .matchEquals("print", s -> System.out.println(state))
            .build();
    }
}


- persistenceId : identifies, snapshot은 저장할때, recover가 될때 persistenceId를 기준으로 복구가 진행이 된다. Actor의 Primary Key 같은 역활을 함.

- persist(...) : 비동기적으로 event를 저장을 함. 

- saveSnapshot(...) : events를 누적해서 저장을 함.



At-Least-Once Delivery

class Msg implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;
  public final String s;

  public Msg(long deliveryId, String s) {
    this.deliveryId = deliveryId;
    this.s = s;
  }
}

class Confirm implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;

  public Confirm(long deliveryId) {
    this.deliveryId = deliveryId;
  }
}


class MsgSent implements Serializable {
  private static final long serialVersionUID = 1L;
  public final String s;

  public MsgSent(String s) {
    this.s = s;
  }
}
class MsgConfirmed implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;

  public MsgConfirmed(long deliveryId) {
    this.deliveryId = deliveryId;
  }
}

class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery {
  private final ActorSelection destination;

  public MyPersistentActor(ActorSelection destination) {
      this.destination = destination;
  }

  @Override public String persistenceId() {
    return "persistence-id";
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder().
      match(String.class, s -> {
        persist(new MsgSent(s), evt -> updateState(evt));
      }).
      match(Confirm.class, confirm -> {
        persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt));
      }).
      build();
  }

  @Override
  public Receive createReceiveRecover() {
    return receiveBuilder().
        match(Object.class, evt -> updateState(evt)).build();
  }

  void updateState(Object event) {
    if (event instanceof MsgSent) {
      final MsgSent evt = (MsgSent) event;
      deliver(destination, deliveryId -> new Msg(deliveryId, evt.s));
    } else if (event instanceof MsgConfirmed) {
      final MsgConfirmed evt = (MsgConfirmed) event;
      confirmDelivery(evt.deliveryId);
    }
  }
}

class MyDestination extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(Msg.class, msg -> {
        // ...
        getSender().tell(new Confirm(msg.deliveryId), getSelf());
      })
      .build();
  }
}

- 적어도 한번 데이터 전달 모델 (At-Least-Once Delivery Model)

- deliveryId를 통해 데이터를 전달을 받고 전달받은 Actor에서는 데이터를 처리한후 deliveryId를 다시 전송함으로써 데이터가 제대로 받은 것을 확인하는 구조 이다.

- 특정시간 (기본값 : 5second)가 지나가게 되면 자동으로 메시지를 retry하게 한다.



직접 구현하는 하기 (Custom Persistence)

- Akka의 Persistence Actor를 이용하여 persistence를 구현을 하는 방법도 있지만, 실제 Mysql 등의 데이터베이스를 이용하여 Persistence 기능을 구현을 할수 있다.
- Event Sourcing와 At-least-once Delivery 기능을 하나의 공통점이 있다고 한다면 deliveryId, persistenceId 등의 unique Key 값을 이용한다.

하나의 Actor 상태를 유지하는 방법
1. Actor가 하나의 unique key의 값을 가지고 있는다.
2. Actor 상태를 Mysql 등에 unique key의 값과 함께 저장을 한다.
3. Actor가 죽든지, 새로 생성될때 기존의 unique key를 이용하여 상태를 불러온다.
4. 상태가 변경이 될때마다 unique key를 이용하여 상태를 업데이트를 한다.

한번 이상의 데이터 전달 방법
1. Message에 Id를 부여를 하고 Mysql 등에 저장을 한다. 
2. 메시지가 완료가 되었으면 MessageId를 이용하여 Message가 전달이 되었다는 것을 ack 처리를 한다.
3. 내부적으로 Scheduler를 통해 보낸 메세지중 Ack 처리를 받지 못한 (timeout 등의 로직)를 다시 전달을 한다.


번외)

- AWS 등의 Cloud 상황에서는  서버가 계속 바뀌기 때문에 Unique Key 값을 가지기가 어렵다. 이와 관련되서 로직이 고민을 해야 된다.
-> (18. 12. 29 추가. 해당하는 actor의 task에 대한 uniuqe key값을 활용한다.)
- Actor가 죽기 전에 다른 cluster에 있는 node에 데이터를 전달을 한다. 현재 살아있는 node에 새롭게 unique id와 함께 actor가 생성이 된다. (Singletone Actor?)
  - (18. 12. 29 추가) 자동으로 Recovery와 상태가 복구가 되어야 되기 때문에 외부 Coordinator가 필요할것 같다 (Zookeeper)
- 좀더 고민이 필요한 부분, 필요에 따라 고도화하는 작업이 필요하다.


'Programing > Akka' 카테고리의 다른 글

Akka Stream Graph DSL를 이용한 Broadcast/Merge 기능 구현  (0) 2018.06.02
Akka Stream을 이용한 Kafka Producer 개발  (0) 2018.06.02
Akka Cluster  (0) 2017.10.22
Akka Tutorial  (0) 2017.07.01
Akka Actor  (0) 2017.06.25