imhamburger 님의 블로그

카프카(Kafka) - 파티션 자동할당, 수동할당 본문

끄적끄적

카프카(Kafka) - 파티션 자동할당, 수동할당

imhamburger 2024. 8. 23. 09:35

카프카에서 컨슈머에게 파티션을 할당하는 방법에는 2가지가 있다. 자동할당과 수동할당이 있다.

 

먼저, 자동할당을 살펴보자.

 

코드예시

from kafka import KafkaConsumer
from json import loads

OFFSET_FILE = 'consumer_offset.txt'

def save_offset(offset):
    with open(OFFSET_FILE, 'w') as f:
        f.write(str(offset))

def read_offset():
    if os.path.exists(OFFSET_FILE):
        with open(OFFSET_FILE, 'r') as f:
            return int(f.read().strip())

    return None

saved_offset = read_offset()

# KafkaConsumer 생성
consumer = KafkaConsumer(
    "topic2",  # 구독할 토픽 이름
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: loads(x.decode('utf-8')),
    consumer_timeout_ms=5000,
    group_id="fbi",  # 컨슈머 그룹 ID
    auto_offset_reset='earliest' if saved_offset is None else 'none',  # 오프셋 초기화 설정
    enable_auto_commit=False,  # 오프셋 자동 커밋 비활성화
)


# 메시지 수신 및 처리
for message in consumer:
    print(f"Received message: {message.value}")

    # 수동 오프셋 커밋 예시 (자동 커밋을 사용하지 않을 때)
    consumer.commit()

 

 

수동 할당을 피하고 자동 할당을 사용하려면,

assign, seek, TopicPartition 등을 사용하지 않고, KafkaConsumer를 설정할 때, 토픽 이름을 구독하도록 설정하면 된다.

 

Kafka는 컨슈머 그룹을 사용하여 파티션을 자동으로 할당하고, auto_offset_reset을 사용하여 오프셋 초기 위치를 설정하면,

Kafka가 자동으로 오프셋을 관리해준다.

 

 

auto_offset_reset 설정

  • earliest : 마지막 커밋 기록이 없을 경우, 가장 예전(낮은 번호 오프셋) 레코드부터 처리
  • latest : 마지막 커밋 기록이 없을 경우, 가장 최근(높은 번호 오프셋) 레코드부터 처리
  • none : 구독하고자 하는 topic의 offset 정보가 없으면 exception을 발생(OffsetOutOfRangeException)

auto_offset_reset 설정은 선택 옵션으로서 입력을 하지 않으면 자동으로 latest로 설정된다. 그렇지만 데이터 유실이 발생할 수 있으니 earliest를 쓰는 것이 더 나은 선택일 수 있다.

 

참고한 블로그

 

*주의사항

컨슈머에게 topic을 설정할 때 topic이 중복으로 설정되면 아래와 같은 오류가 발생한다. 

 

나의 경우는 위에 consumer = KafkaConsumer 한테 이미 topic2 라고 명시해놓았는데,

아래에 또 TopicPartition("topic2", 0)을 적으니 오류가 났었다. 어차피 자동할당이였어서 TopicPartition("topic2", 0)을 지워주니 해결할 수 있었다.

 

 

 

다음은 수동할당을 살펴보자.

 

코드예시

from kafka import KafkaConsumer, TopicPartition
from json import loads
import os

OFFSET_FILE = 'consumer_offset.txt'

def save_offset(offset):
    with open(OFFSET_FILE, 'w') as f:
        f.write(str(offset))

def read_offset():
    if os.path.exists(OFFSET_FILE):
        with open(OFFSET_FILE, 'r') as f:
            return int(f.read().strip())

    return None

saved_offset = read_offset()

consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: loads(x.decode('utf-8')),
        consumer_timeout_ms=5000,
        group_id="fbi",
        enable_auto_commit=False,
)


p = TopicPartition('topic2', 0)
consumer.assign([p])

if saved_offset is not None:
    consumer.seek(p, saved_offset)
else:
    consumer.seek_to_beginning(p) #저장된 오프셋이 없으면 처음부터 읽기

for m in consumer:
    print(f"offset={m.offset}, value={m.value}")
    save_offset(m.offset + 1)

 

1. 파티션 수동할당

  • TopicPartition("topic2", 0)을 통해 "topic2"의 0번 파티션을 명시적으로 할당
  • consumer.assign([p])을 사용하여 컨슈머가 해당 파티션을 구독하도록 설정

 

2. 오프셋 관리

  • saved_offset이 존재하면 seek(p, saved_offset)을 사용하여 그 오프셋으로 이동
  • 저장된 오프셋이 없다면(saved_offset is None), seek_to_beginning(p)을 사용하여 파티션의 처음부터 메시지를 읽기 시작

 

3. 오프셋 저장

  • 메시지를 소비하면서 각 메시지의 오프셋을 출력하고, 다음 오프셋을 저장하여 이후에 컨슈머가 다시 시작될 때 동일한 위치에서 시작할 수 있도록 한다.
  • m.offset + 1을 저장하여, 다음 읽기에서 방금 처리한 메시지 이후의 메시지를 읽도록 설정

 

수동 할당은 특정 파티션에서만 데이터를 읽고 싶을 때 유용하다.

예를 들어, 특정 파티션에 저장된 데이터를 선택적으로 처리해야 하는 경우, 파티션을 수동으로 할당함으로써 필요한 파티션에서만 데이터를 처리할 수 있다.

 

 

수동할당할 때 특정 오프셋부터 처리하게 하고 싶다면,

partition = TopicPartition("topic2", 0)  # topic2의 0번 파티션에 수동 할당
consumer.assign([partition])

# 특정 오프셋 지정
specific_offset = 123  # 여기서 123은 시작하고 싶은 오프셋 번호.
consumer.seek(partition, specific_offset)

 

위 코드처럼 specific_offset 을 추가해 시작하고 싶은 오프셋 번호를 넣으면 된다.

 

 

그럼 특정 오프셋부터 처리하게 할 수 있는건 수당할당만 가능한건가?

아니다!!

 

 

자동할당도 특정 오프셋부터 처리하게 설정할 수 있다.

# 컨슈머에 할당된 파티션과 오프셋 확인 및 특정 오프셋으로 이동
for partition in consumer.assignment():  # 자동 할당된 파티션들을 가져옴
    specific_offset = 123  # 원하는 오프셋 번호로 설정
    consumer.seek(partition, specific_offset)

 

파이썬의 for문을 사용하여 원하는 오프셋 번호로 설정할 수 있다.

 

 

enable_auto_commit 설정

  • enable_auto_commit=False : 수동으로 오프셋을 커밋
  • 수동 커밋은 consumer.commit()을 호출해야 한다. 이로 인해 컨슈머가 처리한 마지막 오프셋이 Kafka에 저장된다.
  • enable_auto_commit=True : 자동으로 오프셋을 커밋

오프셋을 커밋한다?

 컨슈머가 특정 메시지까지 성공적으로 처리했음을 Kafka에 기록하는 작업을 의미

이를 통해 Kafka는 컨슈머가 어느 지점(오프셋)까지 데이터를 읽었고, 그 이후의 메시지부터 처리해야 하는지를 파악할 수 있다.

 

  enable_auto_commit=True enable_auto_commit=False
장점 간편 True에 비해 간편하지는 않음
단점 메시지를 처리하기 전에 오프셋이 커밋될 수 있어 메시지 처리의 정확성에 문제가 생길 수 있음 commit() 메서드를 호출하여 메시지를 성공적으로 처리한 후에만 오프셋을 커밋할 수 있어,

데이터의 일관성을 유지할 수 있음.

 

 

*부록

  • 오프셋(Offset): Kafka 토픽 내의 각 파티션에서 메시지의 위치를 나타내는 고유한 정수 값(번호)
  • 커밋(Commit): 컨슈머가 메시지를 읽고 처리한 후, 그 메시지의 오프셋을 Kafka에 기록하는 것(저장하는 것)