Programing/Akka

Akka Stream Graph DSL를 이용한 Broadcast/Merge 기능 구현

BUST 2018. 6. 2. 19:31

Akka Stream Graph DSL를 이용한 Broadcast/Merge 기능 구현

- Data stream 기능을 개발을 하다보면 같은 데이터를 Broadcast로 보내고 분석해야 되는 경우가 있다.
- 또 다른 경우에는 데이터를 하나로 합치는 Merge 기능이 필요할때도 있다.
- Akka Stream에서는 GraphDSL를 이용하여 Broadcast/Merge 기능을 제공한다.

gradle

dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.13'
}

Reference

Example


simple-graph-example.png
final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);

final RunnableGraph<CompletionStage<List<String>>> result =
        RunnableGraph.fromGraph(
                GraphDSL     // create() function binds sink, out which is sink's out port and builder DSL
                        .create(   // we need to reference out's shape in the builder DSL below (in to() function)
                                sink,                // previously created sink (Sink)
                                (builder, out) -> {  // variables: builder (GraphDSL.Builder) and out (SinkShape)
                                  final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
                                  final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));

                                  final Outlet<Integer> source = builder.add(in).out(); // Source -> Shape -> Outlet
                                  builder.from(source).via(builder.add(f1)) 
                                          .viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
                                          .via(builder.add(f3.grouped(1000))).to(out);  // to() expects a SinkShape
                                  builder.from(bcast).via(builder.add(f4)).toFanIn(merge);
                                  return ClosedShape.getInstance();
                                }));


Shape

- Shape은 Graph의 여러개의 inlet, 여러개의 outlet를 표현을 한다.
- Sink, Source, Flow를 Shape으로 변경을 할수가 있다. Bulider의 add method을 활용하면 된다.
- SinkShape, FlowShape
- 여러개의 Shape이 모여 하나의 Graph를 만든다.

Inlet

- Graph에서의 Input을 해당하는 입구
- SinkShape, FlowShape에서 사용 될수 있다.

Outlet

- Graph에서의 Output를 해당하는 출구
- FlowShape에서 사용될수 있다.



'Programing > Akka' 카테고리의 다른 글

Akka Stream Operator Fusion  (0) 2018.06.29
Akka Stream 기본 개념 Source, Sink, Flow  (0) 2018.06.29
Akka Stream을 이용한 Kafka Producer 개발  (0) 2018.06.02
Persistence Actor  (0) 2018.05.27
Akka Cluster  (0) 2017.10.22