Akka Stream Graph DSL를 이용한 Broadcast/Merge 기능 구현
- Data stream 기능을 개발을 하다보면 같은 데이터를 Broadcast로 보내고 분석해야 되는 경우가 있다.
- 또 다른 경우에는 데이터를 하나로 합치는 Merge 기능이 필요할때도 있다.
- Akka Stream에서는 GraphDSL를 이용하여 Broadcast/Merge 기능을 제공한다.
gradle
dependencies {
compile group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.13'
}
Reference
Example
final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);
final RunnableGraph<CompletionStage<List<String>>> result =
RunnableGraph.fromGraph(
GraphDSL // create() function binds sink, out which is sink's out port and builder DSL
.create( // we need to reference out's shape in the builder DSL below (in to() function)
sink, // previously created sink (Sink)
(builder, out) -> { // variables: builder (GraphDSL.Builder) and out (SinkShape)
final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));
final Outlet<Integer> source = builder.add(in).out(); // Source -> Shape -> Outlet
builder.from(source).via(builder.add(f1))
.viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
.via(builder.add(f3.grouped(1000))).to(out); // to() expects a SinkShape
builder.from(bcast).via(builder.add(f4)).toFanIn(merge);
return ClosedShape.getInstance();
}));
Shape
- Shape은 Graph의 여러개의 inlet, 여러개의 outlet를 표현을 한다.
- Sink, Source, Flow를 Shape으로 변경을 할수가 있다. Bulider의 add method을 활용하면 된다.
- SinkShape, FlowShape
- 여러개의 Shape이 모여 하나의 Graph를 만든다.
Inlet
- Graph에서의 Input을 해당하는 입구
- SinkShape, FlowShape에서 사용 될수 있다.
Outlet
- Graph에서의 Output를 해당하는 출구
- FlowShape에서 사용될수 있다.
'Programing > Akka' 카테고리의 다른 글
Akka Stream Operator Fusion (0) | 2018.06.29 |
---|---|
Akka Stream 기본 개념 Source, Sink, Flow (0) | 2018.06.29 |
Akka Stream을 이용한 Kafka Producer 개발 (0) | 2018.06.02 |
Persistence Actor (0) | 2018.05.27 |
Akka Cluster (0) | 2017.10.22 |