imhamburger 님의 블로그

아파치 카프카 (Apache Kafka) 그래서 어떻게 메세지를 처리하는가? 본문

카테고리 없음

아파치 카프카 (Apache Kafka) 그래서 어떻게 메세지를 처리하는가?

imhamburger 2025. 2. 3. 18:31

Apache Kafka는 분산 메시징 시스템으로, 데이터 스트리밍을 위한 퍼브/섭(pub/sub) 아키텍처를 제공한다.

 

 

퍼브/섭(pub/sub) 아키텍처란?

아파치 카프카의 주요 구성 요소는 다음과 같다.

  • Producer: 데이터를 Kafka로 전송하는 역할
  • Broker: 메시지를 저장하고 관리하는 서버 노드
  • Consumer: Kafka에서 메시지를 가져가 처리하는 역할
  • Topic: 메시지를 구분하여 저장하는 논리적 채널
  • Zookeeper: 클러스터 메타데이터 관리와 리더 선출을 담당

이 때 퍼브/섭(pub/sub) 아키텍처란 Producer가 특정 Topic에 메시지를 발행(Publish)하고, Consumer가 이를 구독(Subscribe)하는 방식이다.

참고로 Redis 데이터베이스도 퍼브/섭(pub/sub) 모델이다!!

 

카프카를 이해하기 위해선 카프카 작동원리를 이해해야 한다.

 

카프카 그래서 어떻게 메세지를 처리하는가?

간단하게 표현하면 프로듀서가 메세지를 전송하고 이를 브로커가 받아 처리하고 같은 Topic을 구독하고 있는 컨슈머에게 보낸다.

더 자세히 들어가자면.....

 

예를들어, Topic A가 있고 여러 메세지가 프로듀서에 들어왔을 때, 프로듀서는 차례대로 메세지를 브로커에 보낸다.

브로커는 이 메세지들을 Partition으로 나누어 담는다. 아래 그림처럼.

 

그럼 컨슈머에 전달될 때는?

당연히 먼저 처리완료된 메세지가 먼저 컨슈머에 전달된다.

 

 

그럼 다른 Message Queue와는 다르게 순서보장이 안되네?

맞다.

카프카는 프로듀서가 메세지를 순서대로 보내기는 하지만, 브로커 안에서 여러 파티션을 둘 경우 순서보장이 안된다.

카프카는 분산 메세징 시스템으로 완전한 선입선출이 아니다.

대신 단일 파티션과 단일 컨슈머로 구성하면 순서보장을 할 수 있다. (그럼 카프카의 기능?을 100% 쓰지 못하는 것...)

 

 

분산처리의 장점

속도가 빠르다. 많은 메세지가 들어와도 분산처리할 수 있어 직렬처리보다 속도에서 빠르다는거!!

 

 

 

여기서 더 살펴봐야할 건 Topic 별로 여러 Partition으로 나눠 보내지는데 그렇다는 얘기는 Partition 안에 메세지들은 한 개의 토픽에서 발생한거라는거다. 

절대로 Partition안에 다른 토픽의 메세지들이 들어가있지 않다.

 

 

그럼 브로커가 장애가 날 경우는 메세지 다 사라지는거 아니야?

그걸 방지하기 위해 여러 브로커를 둔다.

브로커가 3개가 있다고 가정했을 때, 파티션별로 리더가 있고 복제본들이 존재한다.

Leader-Follower 구조로 리더 Partition이 쓰기/읽기를 처리한다.

 

그럼 리더가 아닌 파티션들은?

리더가 아닌 파티션들은 리더 안의 메세지들을 복제하고 있다. (복제본들도 바쁘다구...)

 

그니까, 리더 파티션에 문제가 생길경우, Zookeeper가 복제본들 중에 다시 리더를 선출한다. 

장애가 발생한 리더 브로커가 복구되면, 해당 브로커는 최신 데이터를 복제본에서 받아와서 최신 상태를 유지하게 된다.
이 경우, 새로운 리더가 이미 선출된 상태이므로, Kafka는 장애가 발생한 리더를 다시 리더로 복구하지는 않는다.

(그니까 리더 계속하고싶으면 에러나지마라..)

 

 

아 그럼 리더가 선출되는 동안에 잠깐의 찰나에도 데이터 유실이 나지 않을까?

카프카는 바보가 아니라서 그걸 막는 여러 메커니즘을 제공하지.

 

1. acks 설정

acks 설정은 프로듀서가 데이터를 전송할 때, 몇 개의 브로커가 데이터를 확인해야 하는지를 설정하는 파라미터이다.

  • acks=0 → 프로듀서는 브로커의 응답 없이 메시지를 전송한다. 이 경우 데이터 유실이 발생할 수 있다.
  • acks=1 → 리더 브로커가 메시지를 받으면 응답을 보낸다. 이 경우 리더 브로커가 장애를 겪을 경우 데이터가 유실될 수 있다.
  • acks=all (또는 acks=-1) → 모든 복제본이 메시지를 확인한 후에만 응답을 보낸다. 이 경우 리더가 장애를 겪더라도 복제본에서 데이터를 보유하고 있기 때문에 데이터 유실이 거의 없고 리더 선출 과정에서 일시적인 지연만 발생한다.
  • 그렇지만, 적절히 써야하는게 성능저하를 가져오기 때문..

2. min.insync.replicas 설정
min.insync.replicas는 리더가 데이터를 받기 전에, 최소한 몇 개의 복제본이 데이터를 동기화해야 하는지 설정하는 파라미터이다.
예를 들어, min.insync.replicas=2로 설정하면, 리더는 최소한 두 개의 복제본이 데이터와 동기화되었을 때만 데이터를 받는다. 이를 통해 데이터의 일관성과 유실 방지를 보장한다.

 

 

min.insync.replicas 설정은 브로커가 1개여도돼?

브로커가 1개일 경우, min.insync.replicas를 1 이상으로 설정할 의미가 없지.

사실상 리더가 곧 복제본이기 때문에, 이 설정을 효과적으로 활용하려면 브로커가 최소 2개 이상 있어야만 한다구우우우.

이것과 관련해서 이전글도 참고하면 좋아요~

 

 

브로커 메세지 유실은 해결했고... 이번엔 프로듀서에 문제가 있어서 프로듀서가 메세지를 중복으로 받을 수 있지 않을까?

프로듀서가 중복으로 받으면 아니되지...컨슈머가 메세지를 DB에 저장시키는 것이라면 여러모로 문제가 될 수 있다.

그걸 방지하기 위해 EOS를 활성화해야한다.

EOS (Exactly Once Semantics)는 메시지가 한 번만 전송되고 처리되는 것을 보장하는 전송 보증이다.

다시 말해, 송신자와 수신자 사이에 데이터의 중복 또는 손실 없이 데이터가 한 번만 전송되는 것을 의미한다.

 

그래서 이걸 카프카에서 어떻게 구현하는가?

프로듀서에서 transactional.id를 설정하고 enable.idempotence=true를 활성화하면 된다.

from confluent_kafka import Producer

# ✅ Kafka 프로듀서 설정
producer_conf = {
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "my-transactional-id",  # 트랜잭션 ID (고유해야 함)
    "enable.idempotence": True,  # 멱등성 활성화 (중복 방지)
    "acks": "all",  # 모든 복제본이 메시지를 받았을 때만 성공 처리
}

producer = Producer(producer_conf)

# ✅ 트랜잭션 초기화
producer.init_transactions()

try:
    # ✅ 트랜잭션 시작
    producer.begin_transaction()

    # ✅ 메시지 전송 (트랜잭션 범위 내)
    producer.produce("my-topic", key="key1", value="Hello Kafka!")
    producer.produce("my-topic", key="key2", value="This is EOS Test!")

    # ✅ 트랜잭션 커밋 (정상 처리 완료)
    producer.commit_transaction()
    print("✅ 트랜잭션 커밋 완료!")

except Exception as e:
    # ❌ 장애 발생 시 트랜잭션 롤백
    producer.abort_transaction()
    print(f"❌ 트랜잭션 롤백됨: {e}")

 

컨슈머에서는?

from confluent_kafka import Consumer, KafkaException

# ✅ Kafka 컨슈머 설정 (트랜잭션 지원)
consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
    "auto.offset.reset": "earliest",  # 가장 오래된 메시지부터 읽음
    "isolation.level": "read_committed",  # ✅ 트랜잭션이 완료된 메시지만 읽음
}

consumer = Consumer(consumer_conf)
consumer.subscribe(["my-topic"])

try:
    while True:
        msg = consumer.poll(1.0)  # 1초 대기 후 메시지 확인
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        print(f"📥 Received: {msg.value().decode('utf-8')} (key={msg.key()})")

except KeyboardInterrupt:
    print("❌ 종료됨")
finally:
    consumer.close()
  • 프로듀서는 transactional.id를 설정하고, 트랜잭션을 시작(begin_transaction)하고, 메시지를 전송한 후 커밋(commit_transaction)한다.
  • 컨슈머는 isolation.level=read_committed를 설정하여 완료된 트랜잭션의 메시지만 소비한다.
  • 장애 발생 시 자동 롤백이 수행되어 중복 또는 데이터 유실을 방지할 수 있다.

 

카프카에서 트랜잭션이란?

Kafka에서 트랜잭션(Transaction) 이란 하나 이상의 메시지를 원자적으로(Atomic) 처리하는 기능을 의미한다.
즉, 모든 메시지가 성공적으로 처리되거나, 실패하면 전부 무효화(Rollback)되는 방식.

 

 

오 그럼 데이터중복 문제는 다 해결된건가??

놉.

 

컨슈머에서도 중복문제가 나타날 수 있지...

이걸 먼저 알기 전에 offset과 commit 을 이해해야 한다.

 

오프셋은 "메세지의 위치 번호" 이다.

커밋은 "영구적으로 저장" 한다는 의미이다.

 

Consumer는 메시지를 읽은 위치(Offset)를 저장해야 한다. 그래야 나중에 처리해야할 메세지를 바로바로 알 수 있으니까.
기본적으로 Kafka는 __consumer_offsets라는 내부 Topic에 Offset을 저장한다.

 

 

카프카 컨슈머는 기본적으로 "자동커밋"으로 설정되어 있다.

그니까 컨슈머가 브로커한테 주기적으로 메세지를 poll 한다.

여기서 주기적이라는건 설정값에 poll.interval.ms 를 확인하면 보통 5초 설정되어있는데 이건 내가 바꿀 수 있다.

 

어쨋든, 다시 돌아와서 컨슈머가 5초 간격으로 메세지를 poll해서 자동 commit을 한다고 가정해보자.

그리고 5초가 지나기 전 컨슈머가 한 개 더 추가된다면???

 

그럼 안에서 리밸런싱이 일어나면서.....

 

원래 컨슈머에서 5초가 안되어 커밋되지 않은 7, 8 메세지가 추가된 컨슈머에도 들어간다...

컨슈머들끼리는 커밋되지 않은 메세지를 가져오니까. 그러니 메세지 중복이 발생한다.

 

 

아 그럼 커밋을 수동으로 바꾸면 되는거 아니야?

수동커밋은 메세지를 가져온 것으로 간주되는 시점을 자유롭게 조정할 수 있다는 건데...

 

거기에도 중복문제가 발생할 수 있다.

예를들어, 컨슈머가 DB에 메세지를 보내는 것을 성공했고 DB에 저장된 후 commit하려는데 에러가 난다면?

컨슈머는 commit이 안된거를 다시 처리해서 DB에는 저장되어 있지만 commit이 안되어 다시 저장시키려고 할 것이다. 그러니 이것도 중복문제이다.

 

이를 해결하기 위해 여러가지 방법이 있겠지만...

Kafka의 트랜잭션 프로듀서를 사용하면 DB 트랜잭션과 Kafka 오프셋 커밋을 하나의 트랜잭션으로 묶어 해결할 수 있다.

from confluent_kafka import Producer, Consumer

# ✅ Kafka 트랜잭션 Producer 설정
producer_conf = {
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "transactional-producer-1",
    "enable.idempotence": True
}

producer = Producer(producer_conf)
producer.init_transactions()

consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "consumer-group-1",
    "auto.offset.reset": "earliest",
    "isolation.level": "read_committed"
}

consumer = Consumer(consumer_conf)
consumer.subscribe(["my-topic"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        print(f"📥 메시지 처리 중: {msg.value().decode('utf-8')}")

        try:
            producer.begin_transaction()  # ✅ 트랜잭션 시작
            db.begin_transaction()  # ✅ DB 트랜잭션 시작

            # ✅ 메시지를 DB에 저장
            db.insert("INSERT INTO my_table (data) VALUES (?)", (msg.value(),))

            # ✅ DB 커밋
            db.commit()

            # ✅ Kafka 오프셋을 트랜잭션 내에서 커밋
            producer.send_offsets_to_transaction(
                consumer.position(consumer.assignment()),
                consumer_conf["group.id"]
            )

            # ✅ Kafka 트랜잭션 커밋
            producer.commit_transaction()

        except Exception as e:
            db.rollback()  # ❌ DB 롤백
            producer.abort_transaction()  # ❌ Kafka 트랜잭션 롤백
            print(f"❌ 오류 발생, 롤백 실행: {e}")

except KeyboardInterrupt:
    print("❌ 종료됨")
finally:
    consumer.close()

 

 

그럼 컨슈머에서 메세지 유실은 어떻게 막을 수 있어? 메세지 처리 실패가 난다면?

컨슈머에 auto.offset.reset 옵션이 있는데 이를 earliest 로 하면, 컨슈머가 메세지를 가장 오래된 것부터 읽는거다.

처음부터 읽으면서 커밋되지 않은 메세지들을 처리하여 유실을 막을 수는 있지만... 부하가 많이 들어간다. (아무래도 처음부터 읽다보니...)

 

그렇다고 latest 로 설정하면 메세지 처리 실패 시 유실될 수 있다.

해결방법은 Dead Letter Queue를 구현하여 유실을 방지하면 된다.

Dead Letter Queue(DLQ) 는 메시지 처리 중 오류가 발생한 메시지를 별도의 큐(토픽)로 보내어 저장하는 "실패한 메시지의 격리 저장소" 이다.

 

Kafka에서 메시지를 소비할 때 오류가 발생하면 무한 재시도 문제가 발생할 수 있다.

📌 일반적인 실패 시나리오

  • 메시지 포맷 오류 → JSON 파싱 실패, 필드 누락 등.
  • 비즈니스 로직 오류 → 데이터베이스 제약 조건 위반, 잘못된 값 처리 등.
  • 외부 서비스 오류 → API 호출 실패, 네트워크 타임아웃 등.
  • 재시도해도 해결되지 않는 문제 → 특정 메시지가 지속적으로 실패.
  • DLQ를 사용하면 이런 문제를 가진 메시지를 따로 저장하여 분석하고, 필요하면 재처리할 수 있다.

사실...Kafka에서는 DLQ를 직접 제공하지 않는다..

그렇지만!! 다음과 같은 방법으로 구현할 수 있다.

 

구현방법은 다음과 같다.

1. DLQ를 위한 별도 Kafka 토픽 생성

2. DLQ 역할을 할 dead-letter-topic을 별도로 생성
3. 실패한 메시지는 이 토픽으로 전송하여 저장

 

 

DLQ의 장점?

  • 실패한 메시지를 격리하여 무한 재시도 방지
  • 실패 원인을 분석하여 근본적인 문제 해결 가능
  • 데이터 유실 없이 안전한 메시지 처리 가능
  • 필요 시 DLQ 메시지를 다시 원래 토픽으로 보내 재처리 가능