Developing a Kafka Producer with Akka Streams
Reference
Settings
Configuration
# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100

  # How long to wait for `KafkaProducer.close`
  close-timeout = 60s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}
- ProducerSettings can be configured programmatically, or its values can be set in the application.conf file.
- ProducerSettings itself relies on the ProducerConfig of the Kafka client library.
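As a minimal sketch of the file-based route, an application.conf could override the defaults like this. The broker address `localhost:9092`, the lowered `parallelism`, and the `acks` setting are assumptions chosen for illustration, not values from the original post; `bootstrap.servers` and `acks` are standard Kafka ProducerConfig keys.

```hocon
# application.conf (illustrative values only)
akka.kafka.producer {
  # Lower the number of in-flight sends from the default of 100.
  parallelism = 50

  kafka-clients {
    # Assumed broker address; replace with your cluster's.
    bootstrap.servers = "localhost:9092"
    # Wait for all in-sync replicas to acknowledge each record.
    acks = "all"
  }
}
```

Anything not overridden here keeps the default value shown in the reference configuration above.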
Using it as a Sink
// Sends the integers 1..100 as string values to "topic1"; no key is set.
CompletionStage<Done> done =
    Source.range(1, 100)
        .map(n -> n.toString())
        .map(elem -> new ProducerRecord<byte[], String>("topic1", elem))
        .runWith(Producer.plainSink(producerSettings), materializer);
- Records are produced by wrapping each element in a ProducerRecord.
Using it as a Flow
CompletionStage<Done> done =
    Source.range(1, 100)
        .map(n -> {
            //int partition = Math.abs(n) % 2;
            int partition = 0;
            String elem = String.valueOf(n);
            // The original integer n travels along as the pass-through value.
            return new ProducerMessage.Message<byte[], String, Integer>(
                new ProducerRecord<>("topic1", partition, null, elem), n);
        })
        .via(Producer.flow(producerSettings))
        .map(result -> {
            // Each result exposes the message, and thus the record, that was sent.
            ProducerRecord<byte[], String> record = result.message().record();
            System.out.println(record);
            return result;
        })
        .runWith(Sink.ignore(), materializer);
- With the flow, the result of each produce operation is available to downstream stages.
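The commented-out line `int partition = Math.abs(n) % 2;` in the Flow example suggests spreading records across two partitions instead of pinning them all to partition 0. The following is a small sketch of that idea in plain Java, with no Akka or Kafka dependency. The class `PartitionSketch` and the helper `partitionFor` are hypothetical names introduced here, and `Math.floorMod` is swapped in for `Math.abs(n) % 2` because `Math.abs(Integer.MIN_VALUE)` is still negative.

```java
public class PartitionSketch {

    // Hypothetical helper: maps an element to one of `partitions` partitions.
    // Math.floorMod returns a value in [0, partitions) for any int n
    // when partitions is positive, so the result is always a valid index.
    public static int partitionFor(int n, int partitions) {
        return Math.floorMod(n, partitions);
    }

    public static void main(String[] args) {
        // Show which partition the first few elements would be assigned to.
        for (int n = 1; n <= 4; n++) {
            System.out.println(n + " -> partition " + partitionFor(n, 2));
        }
    }
}
```

In the Flow example, the result of such a helper could take the place of the hard-coded `int partition = 0;`, provided that "topic1" really has at least that many partitions.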