Lewis's Tech Keep

[Kafka] 아파치 카프카 애플리케이션 3.5장 & 관련 내용 발표 검색 본문

kafka/kafka 스터디

[Kafka] 아파치 카프카 애플리케이션 3.5장 & 관련 내용 발표 검색

Lewis Seo 2024. 1. 1. 19:11

진행 범위


3.5 카프카 스트림즈

스트림즈 애플리케이션

  • 토픽에 적재된 데이터를 상태기반 또는 비상태기반으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리
  • 공식 지원 라이브러리 ⇒ 카프카 클러스터와 완벽하게 호환되면서 스트림 처리에 필요한 편리 기능 제공함
  • 정확히 한번(exactly once) 할 수 있도록 장애 허용 시스템(fault tolerant system)을 가지고 있어 데이터 안정성이 뛰어남
  • 실시간 스트림 처리를 해야 한다면 스트림즈 애플리케이션으로 개발하는 것을 고려하자.

스레드 & 태스크

  • 스트림즈 애플리케이션은 내부적으로
    스레드를 1개 이상 생성 가능함
  • 스레드는 1개 이상의 태스크를 가진다.
  • 태스크는 스트림즈 데이터 처리 최소 단위다.
  • 3개 파티션으로 이루어진 토픽을 처리하는 스트림즈 애플리케이션은 내부에 3개의 태스크가 생김
  • 스레드 개수를 늘리면 처리량도 늘어난다.

토폴로지

  • 토폴로지란 2개 이상의 노드들과 선으로 이루어진 집합
  • 링형, 트리형, 성형(star) 등이 있는데 스트림즈에서 쓰는 형은 트리형
  • 노드(동그라미)를 프로세서라고 부르고
    노드와 노드를 이은 선(화살표)를 스트림이라고 부름
    • 스트림 = 토픽의 데이터(레코드)
  • 프로세서
    • 소스 프로세서 → 스트림 프로세서 →싱크 프로세서
    • 소스 프로세서
      • 데이터를 처리하기 위해 최초로 선언해야하는 노드
      • 토픽에서 데이터를 가져오는 역할
    • 스트림 프로세서
      • 다른 프로세서가 반환한 데이터를 처리하는 역할
        • 변환 & 분기처리와 같은 데이터 처리
    • 싱크 프로세서
      • 데이터를 특정 카프카 토픽으로 저장

스트림즈 개발 방법

  • 스트림즈 DSL & 프로세서 API 2가지 방법이 있음

1. 스트림즈 DSL

  • 메시지 값 기반 토픽 분기 처리
  • 지난 10분간 들어온 데이터 개수 집계
  • 토픽과 다른 토픽의 결합으로 새로운 데이터 생성

2. 프로세서 API

  • 메시지 값의 종류에 따라 토픽을 가변적으로 전송
  • 일정한 시간 간격으로 데이터 처리

 

3.5.1 스트림즈 DSL

  • 레코드의 흐름을 추상화한 3가지 개념이 있음 (스트림즈DSL에서만 사용되는 개념)
    • KStream
    • KTable
    • GlobalKTable

KStream

  • KStream은 레코드의 흐름을 표현함
    • 메시지 키와 메시지 값으로 구성
  • KStream으로 조회하면 토픽에 존재하는 모든 레코드가 출력
  • 컨슈머로 토픽을 구독하는 것과 동일 선상에서 사용하는 것

KTable

  • 메시지 키를 기준으로 묶어서 사용
  • KTable은 유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용함
  • KTable로 데이터를 조회하면 메시지 키를 기준으로 가장 최신에 추가된 레코드 데이터가 출력
  • 동일한 메시지 키가 존재하면 업데이트

GlobalKTable

  • https://www.confluent.io/blog/kafka-streams-tables-part-3-event-processing-fundamentals/
  • GlobalKTable은 KTable과 동일하게 메시지 키 기준으로 묶어서 사용됨
    • KTable로 선언된 토픽은 1개 파티션이 1개 태스크에 할당되어 사용
    • GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용됨
  • KStream과 KTable의 데이터 조인 시 반드시 코파티셔닝(co-partitioning)되어 있어야 함
    • 코파티셔닝이란 조인을 하는 2개 데이터의 파티션 수가 동일하고 파티션 전략이 동일하도록 맞추는 것
    • 코파티셔닝이 된 경우 동일한 메시지 키를 가진 데이터가 동일한 태스크에 들어가는 것을 보장함
  • 코파티셔닝 되어있지 않은 2개의 토픽 조인을 위해 GlobalKTable를 사용할 수 있음
    • 각 태스크 마다 GlobalKTable로 정의된 모든 데이터를 저장하고 사용하기 때문에 작은 용량 데이터일 경우에만 사용하자.

코파티셔닝

  • 토픽들이 코파티셔닝을 보장할 수 없는 경우 GlobalKTable을 사용할 수 있음
    • 보장할 수 없는 경우KTable로 하려고 하면 TopologyException이 발생함
  • 코파티셔닝이 되어 있지 않으면 KStream 또는 KTable을 리파티셔닝하는 과정을 거쳐야 함

리파티셔닝

  • 새로운 토픽에 새로운 메시지 키를 가지도록 재배열함
  • 리파티셔닝을 거쳐야 코파티셔닝 되도록 할 수 있음 
    • 기존 데이터 중복 생성 뿐만아니라 파티션 재배열하기 위해 프로세싱하는 과정도 거쳐야 함
    • 예시
      • 먼저, 필요한 의존성을 pom.xml에 추가합니다:그런 다음, Kafka Streams 설정을 application.yml 또는 application.properties에 추가합니다:이제, Kafka Streams를 사용하여 groupBy를 통한 리파티셔닝 예제를 구현합니다:이 코드는 input-topic에서 메시지를 읽어, 각 메시지를 콤마(,)로 분리한 후 첫 번째 요소를 기준으로 그룹화합니다. 그런 다음 이 그룹별로 카운트를 수행하고, 결과를 output-topic으로 보냅니다.
      • 이 예제는 Kafka Streams와 Spring Boot를 사용한 간단한 리파티셔닝의 예시입니다. 실제 환경에서는 Kafka 클러스터 설정, 에러 처리, 메시지 포맷 등 추가적인 고려사항이 있을 수 있습니다.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaStreamsExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsExampleApplication.class, args);
    }

    @Bean
    public KafkaStreams kafkaStreams() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> sourceStream = builder.stream("input-topic");
        sourceStream
            .groupBy((key, value) -> value.split(",")[0]) // 키를 기준으로 그룹화
            .count()
            .toStream()
            .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), getStreamsConfig());
        streams.start();

        return streams;
    }

    // Kafka Streams 설정을 로드하는 메소드
    private Properties getStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}

 


스트림즈DSL 주요 옵션

  • 필수옵션
    • bootstrap.servers
      • 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름을 작성 (1개 이상 설정)
        • 2개 이상 브로커 정보를 입력하면 일부 브로커 이슈가 발생하더라도 이슈가 없도록 설정 가능
    • application.id
      • 스트림즈 애플리케이션 구분용 아이디
      • 서로 다른 로직을 가진 스트림즈 애플리케이션들은 서로 다른 application.id값을 가져야 함
      • 이 아이디 값 기준으로 병렬 처리함
  • 선택 옵션
    • default.key.serde
      • 레코드 메시지 키 직렬화, 역직렬화 클래스 지정
        • 기본 값은 바이트 직렬화, 역직렬화 클래스임
    • defualt.value.serde
      • 레코드 메시지 값 직렬화, 역직렬화 클래스 지정
        • 기본 값은 바이트 직렬화, 역직렬화 클래스임
    • num.stream.threads
      • 스트림 프로세싱 실행 시 실행될 스레드 개수
        • 기본 값 1
    • state.dir
      • rocksDB 저장소가 위치할 디렉토리를 지정
        • rocksDB는 고성능 key-value DB로 상태기반 데이터 처리 시 로컬 저장소
        • 기본값 /tmp/kafka-streams

스트림즈DSL 기능들

stream(), to()

  • 특정 토픽의 데이터를 다른 토픽으로 전달
  • 예시는 책 127p(e-book 150p) 참고
  • 사용법
    • builder.stream(”스트림 처리할 토픽”)
    • kstream.to(”스트림 보낼 토픽”)
    • 이후 build 후 start()

filter()

  • 특정 조건에 맞는 데이터를 골라 낼 때
  • 예시는 책 132p(e-book 155p) 참고
  • 사용법
    • builder.stream(”스트림 처리할 토픽”)
    • kstream.filter((key, value) → value.length > 5)
    • kstream.to(”스트림 보낼 토픽”)
    • 이후 build 후 start()
  • fluent interface 사용한 경우
    • kstream.filter((key, value) → value.length() > 5).to(”스트림 보낼 토픽”)

join() - KTable & KStream

  • 코파티셔닝 되었고, 메시지 키 기준 조인
  • 예시는 책 137p(e-book 160p) 참고
  • 코파티셔닝 확인 필요 😉
  • 토픽 3개 생성
    • KTable 용
    • KStream 용
    • to되고 나서 들어갈 토픽
  • 사용법
    • kTable = builder.table(”KTable용 토픽”)
    • builder.stream(”KStream용 토픽”)
    • kstream.join(kTable, (kStream, kTable) → kStream + “ test “ + kTable)
    • kstream.to(”join된 데이터 들어갈 토픽”)
    • kstream.start()
  • kTable의 주소가 바뀐 경우에는 최신 값만 보기 때문에 바뀐 값의 레코드가 join됨

join() - GlobalKTable & KStream

  • 코파티셔닝 된 토픽 조인
  • 토픽 3개 생성
    • GlobalKTable 용
    • KStream 용
    • to되고 나서 들어갈 토픽
  • 사용법
    • globalTable = builder.table(”GlobalKTable용 토픽”)
    • builder.stream(”KStream용 토픽”)
    • kstream.join(globalTable,
      (kStreamKey, kStreamValue) → kTableKey,
      (kStream, globalTable) → kStream + “ test “ + globalTable)
    • kstream.to(”join된 데이터 들어갈 토픽”)
    • kstream.start()

 


 

3.5.2. 프로세서 API

  • 스트림즈 DSL보다 투박한 코드를 가지지만 토폴로지 기준으로 데이터를 처리한다는 관점에서는 동일한 역할을 함
  • 예시 책 149p(e-book 172p) 참고
  • 사용법 (참고 : https://velog.io/@jwpark06/Kafka-스트림즈-구현하기)
    • 기준을 충족한 경우에 다음 토폴로지(다음 프로세서)로 넘어간다.
    • commit을 명시적으로 호출하여 명시적으로 데이터가 처리되었음을 선언한다.
    • addProcessor에서 filterProcessor를 추가

 


 

영상 정리

1. Kafka를 활용한 이벤트 기반 아키텍처 구축

  • 영상 링크
  • 전체 요약
    • Kafka 의 기술적 활용 이야기보다는 이벤트 기반을 어떻게 적용했는지 얘기에 초점이 좀 더 맞춰져 있었던 것 같음
    • 우리가 여기까지 가기엔 거리가 멀기에 볼 때 집중력이 좀 떨어졌음
    • 하지만 일부 도움되는 내용들이 있다고 생각하여 정리
    • 그래서 카프카보다는 eda 쪽에 좀 더 내용이 많음

내용

  • 배민 딜리버리 서비스팀과 배차 시스템팀은 왜 이벤트 기반을 선택하였는가?
    • 역할이 점점 다양해지고 커져가는 배달의 기능으로 인해
      복잡도를 낮출 수 있는 방법으로 이벤트 아키텍처를 선택
      • 기존에는 배달만 잘하면 됐었음
      • 그러나 알림 & 배달 시간 & 통계 & 쿠폰과 같은 복잡도가 증가함
      • 이러한 기능들이 모두 강한 일관성을 필요로 했던 것은 아님
  • 이벤트 아키텍처 적용 후
    • 순서 보장 이슈가 있었음
      • 주문 취소 이후 주문 생성 이벤트가 들어오는 경우
  • 이벤트 파이프라인
    • 순서 보장을 위해 파이프라인을 설정
    • 파이프라인으로 kafka를 선택한 이유
      • 순서보장
      • 고성능 고가용성
      • 통합도구
  • kafka 이벤트 파이프라인 적용 후 이슈들
    • 프로듀서 발행 순서 변경 이슈
    • 이벤트 발행 실패 또는 재시도하면서 순서 변경됨
      • 브로커 ebs 볼륨 이슈
      • 주키퍼 통신 이슈
      • 네트워크 이슈
        • 이러한 이슈들은 시스템 이슈로 확산되었음
    • transactional outbox 패턴 도입
      • 데이터베이스 시스템 트랜잭션을 활용하여 outbox 테이블에 적재하고 message relay가 이벤트 발행을 보장해줌
        1. 발행해야 할 이벤트를 도메인 트랜잭션과 묶어 저장
        2. 메시지 릴레이가 outbox 테이블에 저장된 데이터를 순서대로 읽어서 발행
      • message relay는 debezium을 선택함 (cdc같은 것)
        • debezium 카프카 connect를 통해 등록 가능
        • binary log를 통한 순서 보장 및 offset을 활용한 발행을 보장함
        • 파티셔닝을 통한 처리량 증대 가능
  • 활용 사례
    • 이벤트 스트림을 활용해 CQRS 적용함
    • 이벤트 스트림 기반으로 쿼리 모델 구축하여 해결
    • 집계 데이터 활용해 스트림즈 구현함

느낀점

  • Kafka를 이용할 때 이벤트 기반 아키텍처를 많이 이용하는 것 같다.
    • 아무래도 이벤트 기반으로 의존성의 경계를 세우면 많은 부분들을 깔끔하게 처리할 수 있어서 그런가? 생각함
  • 매번 순서 보장을 생각할 때 발행 순서 오류보다는 소비 순서 오류를 더 많이 생각했었는데 retry나 네트워크 오류로 인한 오류에는 발행 순서 보장을 위한 처리도 필요하다는 걸 깨달음
    • debezium, transactional outbox pattern

2. Kafka Streams를 활용한 이벤트 스트림 처리 삽질기

  • 영상 링크
  • 전체 요약
    • 카프카 전반에 대한 전략은 없고 사전 지식으로 이미 챙기고 있어서 딱 카프카 스트림즈를 썼을 때의 내용만 있어서 아쉬웟다.
    • 그럼에도 불구하고 스트림즈의 도입을 고민할 때 고려할 요소에 대한 이야기가 있어서 정리할만하다고 생각함
    • 배치로 할 건지 스트림 처리로 할 건지의 기로에 서 있을 때 보면 좋은 영상
    • 스트림즈 도입 시에는 필요한지 재검토와 리밸런싱과 lag 이슈 & 토픽 설정 이슈를 잘 챙겨보자.

내용

  • 도입 배경
    • 기존에는 배치로 처리함
      • 이벤트를 db에 저장하고 배치로 처리하면서 조회하고 가공하여 분석 저장소에 저장하는 형태였음
      • 그러나..
        • 일 수백만 건 주문 처리가 필요해지자 대규모 파생 데이터가 발생함
        • 요구사항 증가로 인해 처리 로직이 복잡해짐
        • 주문 수 편차(피크타임 or 월드컵 이벤트)가 많이 발생하여 배치 처리 시간 예측이 어려워짐
        • 배치 주기에 따라 느린 이상 탐지가 발생
      • → 대량 데이터를 실시간으로 처리해보자!
  • kafka 매커니즘 활용
    • 각 행정동 배달 수행에 대한 정보를 스냅샷 스트림으로 생성 후 분산처리 & 백업 저장소에 저장

나왔던 이슈 및 해결방안

  • 스트림 프로세서만 잘 구성한다고 되는 것은 아님
  • 잦은 메시지 발행 실패
    • 스트림 처리 시 재시도 n번하고도 실패하는 경우 있었음
      • 각종 설정 값 최적화 및 토픽, 파티션 검토를 통해 해결
  • 과도한 토픽 파티션 수 이슈
    • 파티션 수는 한번 늘리면 줄일 수 없음!
      • 적은 수로 시작 하고 늘려가는 것이 좋음
      • 파티션 조정이 필요할 경우 중간 토픽을 만들자.
        • 이미 거대한 파티션이 구성되고 단기간에 조치하기 어려운 경우 중간 토픽하는 것이 좋음
          • 원본 토픽: original_topic, 현재 파티션 수 10개
          • 목표 파티션 수: 5개
          • 중간 토픽: intermediate_topic
          • 단계별 과정
          1. 중간 토픽 생성
            • intermediate_topic이라는 새 토픽을 생성하고, 목표 파티션 수인 5개로 설정합니다.
            • 생성 명령 예시: kafka-topics --create --topic intermediate_topic --partitions 5 --replication-factor 1 --zookeeper localhost:2181
          2. 데이터 이동 (원본 → 중간)
            • Kafka 컨슈머를 사용하여 original_topic에서 데이터를 읽습니다.
            • Kafka 프로듀서를 사용하여 읽은 데이터를 intermediate_topic에 씁니다.
            • 이 과정은 Kafka Streams, Kafka Connect 또는 사용자 정의 스크립트를 통해 수행할 수 있습니다.
          3. 원본 토픽 삭제 및 재생성
            • original_topic을 삭제합니다.
            • 삭제 명령 예시: kafka-topics --delete --topic original_topic --zookeeper localhost:2181
            • original_topic을 다시 생성하지만, 이번에는 5개의 파티션으로 설정합니다.
            • 재생성 명령 예시: kafka-topics --create --topic original_topic --partitions 5 --replication-factor 1 --zookeeper localhost:2181
          4. 데이터 복원 (중간 → 원본)
            • intermediate_topic에서 데이터를 읽어 original_topic에 다시 씁니다.
            • 이 과정도 Kafka 컨슈머와 프로듀서 API를 통해 수행합니다.
          5. 중간 토픽 삭제
            • intermediate_topic이 더 이상 필요하지 않으므로 삭제합니다.
            • 삭제 명령 예시: kafka-topics --delete --topic intermediate_topic --zookeeper localhost:2181
            주의 사항
          • 이 과정은 실시간 데이터 스트리밍 환경에서 데이터 손실, 메시지 순서 보장, 클러스터 부하 관리에 특별한 주의를 요합니다.
          • 재생성된 original_topic의 파티션 수가 달라진 것에 따라 컨슈머 그룹의 파티션 재할당이 필요할 수 있습니다.
          • 이 방법은 트래픽이 낮은 시간에 수행하는 것이 좋습니다.
          • 이 예시는 Kafka를 사용하는 환경과 설정에 따라 다를 수 있으며, 실제 운영 환경에서는 이러한 작업을 수행하기 전에 충분한 테스트와 계획이 필요합니다.
        • Apache Kafka에서 파티션 수를 조정하기 위해 중간 토픽을 사용하는 과정의 예시를 들어 설명해 드리겠습니다. 이 예시에서는 Kafka의 파티션 수를 줄이는 방법을 단계별로 보여주겠습니다. 가정은 다음과 같습니다:
  • 토픽 단위 이슈
    • 최초 슈퍼 토픽이어서 하나의 토픽에 모든 데이터를 발행했음
    • 여기서는 도메인 단위로 구분하고 효과를 얻음
  • 리밸런싱과 LAG 이슈
    • 과도한 LAG 이슈
      • 임계치를 정하고 넘지 않도록 관리하였음
      • 서버 증설로는 한계가 있음
      • 인스턴스의 쓰레드 수를 늘리는 것을 고려함
      • 권장 : 파티션 수 = 인스턴스 수 x 쓰레드 수
    • 리밸런싱 이슈
      • 리밸런싱은 매우 고비용 작업임
        • 아예 안 할 순 없지만 최대한 줄여야 함
        • 전략이 eager인 경우 모든 파티션 할당을 끊고 재조정하기 때문에 STW가 발생
        • 파티션이나 컨슈머 멤버 수 많은지 체크
  • 처리 지연으로 인한 컨슈머 그룹 탈락
    • 컨슈머 처리 속도에 따른 적절한 옵션값 설정
      • max.poll.intervals.ms
      • max.poll.records
      • n초 안에 m개 레코드 처리를 기대하는 값을 잘 설정하자
    • consumer gorup leave 로그를 적극적으로 모니터링
  • 디스크 알람 이슈
    • 카프카 = 디스크 기반
    • 카프카 스트림즈 상태 저장소
      • 의 내부토픽 retention, delete policy 잘 관리하는 것이 필요
      • delete or compact
  • 상태 저장소 잘 쓰기
    • 분산 처리 할 수 있는 걸 단일 DB로 모으는 행위를 하였기에 이슈가 됨
    • 고집하지도 말자 로컬 인스턴스 디스크/인메모리에 저장됨 → 임시저장소에 적합한 방식임
  • 배치처리로도 충분하다면 스트림즈를 통한 해결도 고집하지 말자. (적정 기술)
  • 리파티셔닝 남용 금지
    • 키가 바뀌면 파티션도 모두 이동하는 작업이지만 네트워크/디스크 io 부하 발생이 가능함
      • 코드로 짜기엔 편리하지만 비용이 큼

모니터링 및 도구 이용

  • burrow 이용함
  • monitoring with JMX 이용
  • 스트림 프로세서 메트릭화
    • org.apache.kafka.streams.KafkaStreams.State
  • 카프카 매니저
  • 스트림즈 리셋 (app-reset-tool)

느낀점

  • 배치처리로 충분하다면 스트림즈를 굳이 고민할 필요는 없다.
    • 스트림즈에 대한 여러 새로운 이슈들이 나오기 때문
  • 스트림즈를 도입한다면 메시지 키를 어떻게 적용할 지 잘 알아봐야 할 것 같다.

 


 

3. 신뢰성 있는 카프카 애플리케이션을 만드는 3가지 방법 (최원영 Cory)

  • 영상 링크
  • 전체 요약
    • 신뢰성 있는 카프카를 위해 어떤 전략을 가져갈 것인지 신중하게 판단하는 것이 중요
      • exactly-once? at-least-once? at-most-once
    • 데이터 파이프라인을 설정하는 데이터 플랫폼 쪽이라 실제 내용 쪽이 와닿지는 않았지만 스트림즈를 본격적으로 사용한다면 충분히 나올 이슈들인 것 같아서 좋았다.
    • 전체 내용이 카프카 클러스터가 단일 일때를 기준으로 하고 있기 때문에(스트림즈 제약사항) 이런 부분 고려도 나와있는 것이 좋았다.
    • 스트림즈가 우리가 당장에는 필요하지는 않아보이지만 추후에 고려할 필요가 있을 때 참고하면 좋을 것 같다!

내용

  • 간략한 구조
    • 데이터가 전달될 때 Acknowledgement, Offset으로 확인함
  1. 프로듀서 영역에서의 신뢰성 만들기
    • 실제로는 적재가 되었지만 acknowledgement가 손실된 경우 중복 적재될 수 있음
    • 네트워크가 불안정한 경우 2~3번 이상 중복 데이터가 적재될 수 있음
    • 멱등성 프로듀서를 제공하고 있음
      • enable.imdempotence(아이뎀포턴스) 옵션을 제공함 기본값은 false이지만 3.0 이후부터는 true로 설정됨
        • 각 레코드에 대해 PID와 SEQ를 확인하고 중복 적재 요청이면 적재하지 않는다.
        • 이 설정 true한 경우 acks=all로 자동 설정됨
          • 리더와 팔로우 파티션에 모두 데이터가 적재되는 것을 확인하는 것
        • 잘 작동을 하기 위해서 Async 프로듀서로 콜백으로 확인하도록 만들어야 최종적으로 원하는 성능을 가질 수 있음
      • enable.imdempotence 인 경우
  2. 토픽 to 토픽 메시지 전달
    • 컨슘 후에 2군데로 프로듀싱 할 때 장애 발생
      • 이런 경우 중복 발생 가능성이 있음
      • 왜냐면 b 토픽에는 갔었으니까
      • 이런 경우 방법 1개 밖에 없음 전체 실패 or 전체 성공하도록 하는 것
    • 트랜잭션 컨슈머 + 프로듀서 사용
    • 프로듀서가 send 요청과 함께 컨슈머에 오프셋 요청을 함
    • 트랜잭션이 완료가 되지 않는다면 가져가지 않는다.
      • 다음 가져가지 않는 것에 대해 반드시 isolation.level= read_committed로 해줘야함
      • 커밋 된 것만 보겠다.
    • 이런 트랜잭션 경우에 속도 고민?
      • 테스트 해본 결과 프로듀서는 3% 낮은 성능
        • 컨슈머는 일반 컨슈머와 다르지 않음
          • 그러나 committed이 완료된 데이터만 처리하기 때문에 e2e 완료 시각이 증가할 가능성이 있다.
  3. 컨슈머의 중복 적재 방지
    • 데이터 적재와 커밋을 동시에 수행해야 하는 경우 토픽to토픽 트랜잭션을 묶는 것은 어려울 수 있음
      • 묶으면 적재 방지 되겠지만
      • 적재해야하는 경우 묶는 것 자체가 좀 어려울 수 있음
    1. 유니크 키를 활용한 멱등성 컨슈머
      • 유니크 키 지원하는 데이터 베이스를 이용할 때 유용
        • ex. oracle, mysql 은 unique key를 제공함
      • 유니크 키 중복을 막을 수 있음
    2. upsert 활용한 멱등성 컨슈머
      • 중간 결과 값을 insert 형태로 적재하고 최종 결과값을 update하는 형태로 할 수 있음
    3. Write-ahead log를 활용한 컨슈머
      • 원자 단위로 묶는 것
      • 적재하고 어디까지 적재했는지 기록하는 것
        • 체크하는 로직 필요
      • 레코드의 오프셋 관리 로직 & 체크하는 로직이 필요해져서 복잡해질 수 있음

정리

  • 마무리
    • 자바 아프치 카프카 기준 얘기였음
    • 메시지 전달 신뢰성 테스트가 실제로 수행하기 까다로움
      • 장애상황을 동일하게 발생시키기 까다로움
    • 단일 카프카 클러스터에서 대응 가능
      • 복수라면 다른 관점 고민 필요
  • 어느 정도 전달 신뢰성을 가질 것인가? 정말 필요한가? 장애로 판단하고 넘어갈 수 잇는가? 를 고민해야 한다.
    • exactly, atleast, atmost 중에 어떤 것을 선택했을 때 우리 비즈니스에 맞는 지
    • 일부분만 충족해도 된다면 말했던 옵션들을 모두 적용할 필요가 없음

느낀점

  • 카프카를 메인으로 이용한다고 할 때 어느 정도 전달 신뢰성을 가질 것인지 의사결정을 하는 것이 중요하구나 깨달았음
  • 멱등성을 보장하여 전송하는 것이 생각보다 되게 중요하구나 깨달았음
    • 위에서 했던 세미나 까지 합치면 그에 따른 순서 보장도 중요
    • 보장하기 위해서는 생각보다 많은 점들을 고려해야 했음
      • idempotence producer, transaction consumer & producer, key managing

 


 

그 외 볼만한 것들

 


 

느낀 점

  • Kafka를 이용할 때 이벤트 기반 아키텍처를 많이 이용하는 것 같다.
    • 아무래도 이벤트 기반으로 의존성의 경계를 세우면 많은 부분들을 깔끔하게 처리할 수 있어서 그런가? 생각함
  • 매번 순서 보장을 생각할 때 발행 순서 오류보다는 소비 순서 오류를 더 많이 생각했었는데 retry나 네트워크 오류로 인한 오류에는 발행 순서 보장을 위한 처리도 필요하다는 걸 깨달음
    • debezium, transactional outbox pattern
  • 배치처리로 충분하다면 스트림즈를 굳이 고민할 필요는 없다.
    • 스트림즈에 대한 여러 새로운 이슈들이 나오기 때문
  • 스트림즈를 도입한다면 메시지 키를 어떻게 적용할 지 잘 알아봐야 할 것 같다.
  • 카프카를 메인으로 이용한다고 할 때 어느 정도 전달 신뢰성을 가질 것인지 의사결정을 하는 것이 중요하구나 깨달았음
  • 멱등성을 보장하여 전송하는 것이 생각보다 되게 중요하구나 깨달았음
    • 위에서 했던 세미나 까지 합치면 그에 따른 순서 보장도 중요
    • 이를 보장하기 위해서는 생각보다 많은 점들을 고려해야 했음
      • idempotence producer, transaction consumer & producer, key managing

'kafka > kafka 스터디' 카테고리의 다른 글

[Kafka] 아파치 카프카 애플리케이션 1장  (0) 2023.12.18
Comments