
Developing a Kafka Producer with Akka Stream

BUST 2018. 6. 2. 19:21


Reference

https://doc.akka.io/docs/akka-stream-kafka/current/producer.html

- The producer stages use KafkaProducer from the Kafka library.

Settings

protected final ProducerSettings<byte[], String> producerSettings = ProducerSettings
  .create(system, new ByteArraySerializer(), new StringSerializer())
  .withBootstrapServers("localhost:9092");
Required information:
- Kafka bootstrap servers
- Key/value serializers

Configuration

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100

  # How long to wait for `KafkaProducer.close`
  close-timeout = 60s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}

- Settings can be applied programmatically through ProducerSettings, or declared in the application.conf file.

- ProducerSettings in turn relies on ProducerConfig from the Kafka library.
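
To make the parallelism setting a bit more concrete, here is a minimal sketch in plain Java of what "at most N sends in flight" means. It is not how akka-stream-kafka implements the setting; ParallelismSketch, maxInFlight, and the fake 1 ms "send" are hypothetical names and stand-ins used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not part of akka-stream-kafka: it only illustrates
// the idea of bounding the number of concurrent sends.
final class ParallelismSketch {

    // Runs `total` fake sends while allowing at most `parallelism` in flight,
    // and returns the highest number of concurrent sends that was observed.
    static int maxInFlight(int total, int parallelism) {
        Semaphore permits = new Semaphore(parallelism);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger observedMax = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<CompletableFuture<Void>> sends = new ArrayList<>();
        try {
            for (int i = 0; i < total; i++) {
                // Wait until one of the `parallelism` slots is free.
                permits.acquireUninterruptibly();
                int now = inFlight.incrementAndGet();
                observedMax.accumulateAndGet(now, Math::max);
                sends.add(CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(1); // pretend to talk to a broker
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, pool).whenComplete((ok, err) -> {
                    // Free the slot once the fake send has finished.
                    inFlight.decrementAndGet();
                    permits.release();
                }));
            }
            // Wait for every outstanding send before returning.
            CompletableFuture.allOf(sends.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return observedMax.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + maxInFlight(50, 4));
    }
}
```

The caller is held back whenever all slots are taken, which is roughly the effect a larger or smaller parallelism value has on how many sends are outstanding at once.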


Using the Producer as a Sink

CompletionStage<Done> done =
  Source.range(1, 100)
    .map(n -> n.toString())
    .map(elem -> new ProducerRecord<byte[], String>("topic1", elem))
    .runWith(Producer.plainSink(producerSettings), materializer);

- Messages are produced by wrapping each element in a ProducerRecord.
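
The Sink example assigns the materialized CompletionStage<Done> to done but never uses it. Below is a minimal sketch, in plain Java, of one way to react to the stream finishing or failing. CompletionSketch and describe are hypothetical names, and a CompletableFuture<String> stands in for the real CompletionStage<Done> so the snippet runs without Akka on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper, not part of akka-stream-kafka.
final class CompletionSketch {

    // Maps the outcome of a stream-like CompletionStage to a status message.
    static <T> CompletionStage<String> describe(CompletionStage<T> done) {
        return done.handle((ok, err) ->
            err == null ? "all records sent" : "stream failed: " + err.getMessage());
    }

    public static void main(String[] args) {
        // Stand-in for a stream that completed normally.
        CompletableFuture<String> success = CompletableFuture.completedFuture("Done");

        // Stand-in for a stream that failed, e.g. because no broker was reachable.
        CompletableFuture<String> failure = new CompletableFuture<>();
        failure.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(describe(success).toCompletableFuture().join());
        System.out.println(describe(failure).toCompletableFuture().join());
    }
}
```

With the real stream, the same handle call could be attached to done directly, for example to log the outcome or to terminate the ActorSystem once producing has finished.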


Using the Producer as a Flow

CompletionStage<Done> done =
  Source.range(1, 100)
    .map(n -> {
      // Alternative: spread records over two partitions.
      //int partition = Math.abs(n) % 2;
      int partition = 0;
      String elem = String.valueOf(n);
      // The last argument (n) is the pass-through value carried along with the record.
      return new ProducerMessage.Message<byte[], String, Integer>(
        new ProducerRecord<>("topic1", partition, null, elem), n);
    })
    .via(Producer.flow(producerSettings))
    .map(result -> {
      // Each element emitted by the flow is the result of one send.
      ProducerRecord<byte[], String> record = result.message().record();
      System.out.println(record);
      return result;
    })
    .runWith(Sink.ignore(), materializer);


- With the flow, the result of each produce call is available to downstream stages.
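
The Flow example contains a commented-out line that picks the partition with Math.abs(n) % 2. As a small sketch of that idea in plain Java: for the values 1 to 100 used here both approaches behave the same, but for arbitrary integer input Math.floorMod avoids the edge case where Math.abs(Integer.MIN_VALUE) stays negative. PartitionSketch and partitionFor are hypothetical names, and the partition count is an assumption about the topic.

```java
// Hypothetical helper, not part of akka-stream-kafka or the Kafka client.
final class PartitionSketch {

    // Maps any int onto a partition index in [0, partitionCount).
    static int partitionFor(int n, int partitionCount) {
        return Math.floorMod(n, partitionCount);
    }

    public static void main(String[] args) {
        // Same values as the commented-out variant for small positive n.
        for (int n = 1; n <= 4; n++) {
            System.out.println(n + " -> partition " + partitionFor(n, 2));
        }
        // Edge case: Math.abs(Integer.MIN_VALUE) is still negative,
        // so abs-then-modulo can yield an invalid (negative) partition.
        System.out.println(Math.abs(Integer.MIN_VALUE) % 3);
        System.out.println(partitionFor(Integer.MIN_VALUE, 3));
    }
}
```

The computed index would then be passed as the partition argument of ProducerRecord, in place of the hard-coded 0.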
