Personal Documentation

Hello World!

Apache Kafka (Part II)

공식 문서에서 Kafka 를 다운로드 후 실행시키면 아래와 같이 폴더들이 생성될 것이다. 자세한 설치 방법은 구글링을 통해 참조하면 된다.

kafka
├── bin
│   └── windows
├── config
├── libs
└── site-docs

먼저 bin 폴더에 들어간 후 아래 코드를 실행할 것이다.


토픽 생성

실행:

./kafka-topics.sh \
    --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 3 \
    --topic topicA

결과:

Created topic topicA.

Kafka Part 2 - 1

replication-factor (RF) : 복제할 토픽 개수

Producer

실행:

./kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic topicA

결과:

> Hello World
> How are you?
> I'm fine thank you.

Consumer

실행:

./kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic topicA \
    --from-beginning

결과:

Hello World
How are you?
I'm fine thank you.

Consumer Group

실행:

./kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic topicA \
    --group groupA \
    --from-beginning

결과:

I'm fine thank you.
Hello World
How are you?

컨슈머로 데이터를 불렀을 대 위에 나온 결과 순서가 다른 이유는 파티션이 3개 있어서 그렇다. 컨슈머는 데이터를 불러올때 offset 의 순서를 따르지 파티션의 순서를 따라서 불러오지 않는다. 그 이후로 프로듀서가 데이터를 추가하고 컨슈머로 데이터를 읽으면 마지막으로 불러진 offset 이후의 데이터들을 가져온다. 그 이유는 파티션들은 각 컨슈머 그룹이 마지막으로 기록한 offset 정보를 가지고 있기 때문이다.


컨슈머 그룹 리스트

실행:

./kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --list

결과:

groupA


컨슈머 그룹 상태 확인

실행:

./kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group groupA \
    --describe

결과:

Consumer group 'groupA' has no active members.

GROUP       TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID     HOST    CLIENT-ID
groupA      topicA      1          4               4               0    -               -       -
groupA      topicA      0          3               3               0    -               -       -
groupA      topicA      2          3               3               0    -               -       -

CURRENT-OFFSET : 컨슈머가 읽은 offset 위치

LOG-END-OFFSET : 파티션의 마지막 offset

LOG : 컨슈머 랙 ( lag = log-end-offset - current-offset )


그 외에도 offset 정보를 --reset-offsets 을 통해 초기화 할 수 있다.


Java + Kafka

Maven Dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

KProducer.java

public class KProducer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        for(int i = 0; i < 20; i++) {
            String sendData = "Data " + i;
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topicA", sendData);
            
            // Key 를 통해 데이터 보낼 파티션 지정 
            // ProducerRecord<String, String> producerRecordWithKey = new ProducerRecord<>("topicA", Integer.toString(i), sendData);
            
            // 자세한 내용은 ProducerRecord.class 확인
            
            try {
                producer.send(producerRecord);
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

KConsumer.java

public class KConsumer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 자동커밋 조건
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 6000);

        // Rebalance 컨디션 확인
        // 컨슈머는 3초마다 브로커 중 1대에다 heartbeat 전송
        // 10초 세션 타임아웃 시간안에 heartbeat 를 못받으면 해당 컨슈머는 죽은거로 처리
        // 파티션과 컨슈머들이 rebalance 됨
        // 이때 컨슈머는 작업이 중단됨
        // * RebalanceListener() 사용해도됨
        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Arrays.asList("topicA"));

        try {
            while(true) {
                ConsumerRecords<String, String> consumerRecord = consumer.poll(Duration.ofSeconds(1));
                for(ConsumerRecord<String, String> record: consumerRecord) {
                    System.out.println(record.value());
                }
                // 동기 커밋
                consumer.commitSync();

                // 비동기 커밋
                // consumer.commitAsync();
            }
        } catch(WakeupException wakeupException) {
            wakeupException.printStackTrace();
        } catch(CommitFailedException commitFailedException) {
            // 커밋 오류로 재시도 구현
            System.out.println("Commit Error - try again");
        } finally {
            // 갑자기 오류났을 때를 대비해서 마지막 offset을 커밋한 후 종료
            consumer.commitSync();
            consumer.close();
        }
    }
}