Programing/Akka

Akka Cluster

BUST 2017. 10. 22. 15:08

Akka Cluster

시작하기전에 Gradle 라이브러리 추가

compile group: 'com.typesafe.akka', name: 'akka-cluster_2.12', version: '2.5.6'

Simple Configuration

application.conf

akka {
  actor {
    # cluster mode로 실행을 의미
    provider = "cluster"
  }
  # 원격 접근을 위한 설정
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      # local로 실행할때에는 127.0.0.1이지만 실제 배포시에는 hostname을 변경해줘야된다.
      hostname = "127.0.0.1"
      # 0 - Random Port
      port = 0
    }
  }
 
  # 클러스터 관련 설정
  cluster {
    # 중요한 부분, Akka에서는 Gossip Portocol를 사용하는데 이 때 필요한 것은
    # seed-node이다, 반드시 2개이상의 seed-node를 설정을 할 필요가 있다.
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
 
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
  }
}
 
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
  • cluster 관련 이벤트를 Actor를 통해 Subscribe 할수 있다.
  • 멤버(node)의 join, unreachable, remove 관련 이벤트를 받을수가 있다.
package jdocs.cluster;
 
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;
 
public class SimpleClusterListener extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  Cluster cluster = Cluster.get(getContext().getSystem());
 
  //subscribe to cluster changes 
  @Override
  public void preStart() {
    cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
        MemberEvent.class, UnreachableMember.class);
  }
 
  //re-subscribe when restart 
  @Override
  public void postStop() {
    cluster.unsubscribe(getSelf());
  }
 
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(MemberUp.class, mUp -> {
        // 새로운 멤버(node)가 추가 될경우 
        log.info("Member is Up: {}", mUp.member());
      })
      .match(UnreachableMember.class, mUnreachable -> {
        // 멤버(node)에 접근을 할수 없는 경우 
        log.info("Member detected as unreachable: {}", mUnreachable.member());
      })
      .match(MemberRemoved.class, mRemoved -> {
        // 멤버(node)가 삭제되는 경우 
        log.info("Member is Removed: {}", mRemoved.member());
      })
      .match(MemberEvent.class, message -> {
        // ignore 
      })
      .build();
  }
}

Seed-Node에 참가(join)하기

  • akka cluster는 기본적으로 Gossip Protocol를 사용하고 있다.
    • Gossip Protocol이란? 마스터가 없는 대신 각 노드가 서로에 대해 주기적으로 메타 정보를 주고 받는다. 이러한 프로토콜이 통해 노드가 죽었는지 살아 있는지를 확인을 할수 있다.
  • akka seed-node 참가(join) 하는 방법을 2가지가 있다.
  1. application.conf에 설정을 통해 join
akka.cluster.seed-nodes = [
  "akka.tcp://ClusterSystem@host1:2552",
  "akka.tcp://ClusterSystem@host2:2552"]
  1. JVM 옵션을 통해 join
-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552
-Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552
  1. 프로그래밍적으로 join
Cluster.get(system).joinSeedNodes(....)

Downing

  • node가 작동이 되지 않았다고 판단이 될경우에는
  • 프로그래밍적으로도 control이 가능하다.
Cluster.get(system).down(address)

Leaving

  • cluster에서 member(node)를 제거할수 있다.
  • 2가지 방법이 있다.
  1. 일반적인 방법, actor-system 종료시키기
  2. JMX or HTTP를 이용하여 종료시키기, 아래와 같은 코드를 이용하여 아름답게(graceful) 종료시킬수 있다.
final Cluster cluster = Cluster.get(system);
cluster.leave(cluster.selfAddress());

Subscribe to Cluster Events

cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class);

Node Roles

akka.cluster.roles
  • node roles를 통해 각각의 node의 역활을 지정할수가 있다.
  • ex) master, worker 를 나뉘어 worker에서 실제 작업을 진행을 하고 master에서 worker에게 작업을 시키고, 결과를 받아 결과값을 정리하는 역활 등
  • ex) 물건 주문 서비스를 할때 order, product, search 등의 역활을 나뉘어 각각의 기능(domain)에 맞는 기능을 구현을 할수가 있다.

Cluster Aware Router

  • akka router를 기능을 cluster에도 사용할수 있다.
  • router에 2가지 방법이 있다.
    • group : actor를 생성 및 관리를 해주지 않는다. 즉 생성 및 관리를 세밀하게 할 경우에 필요하다.
    • pool : actor를 생성 및 관리를 해준다.

Group

  • applicaiton.conf
akka.actor.deployment {
  /statsService/workerRouter {
      # group이기 때문에 실제 actor 생성이 되지 않고 각각의 node에 생성된
      # actor를 찾는다.
      router = consistent-hashing-group
      routees.paths = ["/user/statsWorker"]
      cluster {
        enabled = on
        allow-local-routees = on
        # compute를 role을 가진 node에서 /user/statsWorker를 actor를 찾는다.
        use-roles = ["compute"]
      }
    }
}

Pool

  • application.conf
akka.actor.deployment {
  /statsService/singleton/workerRouter {
      router = consistent-hashing-pool
      cluster {
        enabled = on
        max-nr-of-instances-per-node = 3
        allow-local-routees = on
        use-roles = ["compute"]
      }
    }
}

Source code

https://github.com/akka/akka-samples/tree/2.5/akka-sample-cluster-java

참고 자료

https://doc.akka.io/docs/akka/current/scala/cluster-usage.html

'Programing > Akka' 카테고리의 다른 글

Akka Stream을 이용한 Kafka Producer 개발  (0) 2018.06.02
Persistence Actor  (0) 2018.05.27
Akka Tutorial  (0) 2017.07.01
Akka Actor  (0) 2017.06.25
Akka Actor Throttle  (0) 2017.06.24