imhamburger 님의 블로그
스파크(Spark) - 정형화 스트리밍 Part.2 본문
상태 정보 유지 스트리밍 집계
시간과 연관 없는 집계 (Global Aggregation)
가장 기초적인 집계로, 스트림 시작부터 현재까지의 모든 데이터를 대상으로 한다.
작동 원리: 새로운 데이터가 들어올 때마다 기존 카운트나 합계에 더해서 결과를 업데이트합니다.
주의사항: 일반적인 Batch 처리에서 쓰는 df.count()는 최종 결과 하나만 딱 내놓고 끝나지만, 스트리밍에서는 계속 업데이트되어야 하므로 반드시 groupBy().count() 형태를 사용해야 한다.
이벤트 타임 윈도우 (Event-time Windowing)
데이터가 서버에 도착한 시간(Processing Time)이 아니라, 실제로 발생한 시간(Event Time)을 기준으로 묶는 방식이다.
- 텀블링 윈도우 (Tumbling Window): 5분 단위라면 12:00~12:05, 12:05~12:10처럼 겹치지 않는 고정된 간격
- 슬라이딩 윈도우 (Sliding Window): 10분 크기의 윈도우를 5분마다 생성
예: 12:00~12:10, 12:05~12:15
-> 하나의 데이터가 두 개 이상의 윈도우에 동시에 포함되어 계산될 수 있다.
워터마크(Watermark): 지연 데이터 판단
데이터가 늦게 도착하면 스파크는 얼마나 기다려줘야 하나?
무한정 기다리면 메모리가 부족해지니, 이때 워터마크가 필요하다.
정의: 현재까지 본 가장 늦은 이벤트 시간 - 허용하는 지연 시간(Delay Threshold)
동작 방식: 엔진이 데이터를 읽으며 가장 최근의 이벤트 타임을 추적. 워터마크 임계값을 넘어서 너무 늦게 도착한 데이터는 과감히 버린다.
해당 윈도우가 워터마크보다 뒤처지면, 스파크는 "이제 이 윈도우는 끝났다"라고 판단하고 메모리에서 상태 정보를 삭제한다.
(리소스 관리의 핵심!)
출력 모드(Output Mode)의 세밀한 차이
집계된 데이터를 외부 시스템(DB, 콘솔, 파일)에 어떻게 보낼지 결정한다.
| 모드 | 상세 설명 | 제약 사항 |
| Complete (전체) | 매번 업데이트된 전체 테이블을 출력 | 데이터가 늘어날수록 출력 비용이 커짐. |
| Update (갱신) | 지난번 출력 이후 변경된(업데이트된) 행만 출력 | 가장 권장되는 방식이며 메모리 효율이 좋음. |
| Append (추가) | 워터마크를 통해 결과가 확정된(더 이상 변하지 않을) 행만 출력 | 결과가 나올 때까지 지연 시간만큼 기다려야 합니다. 파일 저장 시 주로 사용해야함. |
스트리밍 조인
정형화 스트리밍은 스트리밍 데이터세트를 다른 정적 혹은 스트리밍 데이터세트와 조인할 수 있게 해준다.
1. 스트림-정적 데이터 조인 (Stream-Static Join)
가장 단순한 형태의 조인이다.
실시간으로 흐르는 스트림 데이터에 보충 정보가 담긴 정적인 테이블을 결합하는 방식이다.
예시: 광고 클릭 스트림(Stream) + 광고 상세 정보 테이블(Static)
특징: 스파크는 정적 데이터를 스트림의 각 마이크로 배치에 조인한다.
정적 데이터는 상대적으로 크기가 작아야 효율적이며, 정적 데이터가 업데이트되어도 이미 처리된 스트림 결과에는 반영되지 않는다는 점을 주의해야 한다.
2. 스트림-스트림 조인 (Stream-Stream Join)
두 개의 실시간 스트림을 서로 결합하는 방식이다.
데이터가 서로 다른 시간에 도착할 수 있기 때문에 매우 복잡하다.
문제: 무한한 상태(State) 증가
두 스트림이 언제 조인될지 모르기 때문에 스파크는 조인 대상이 나타날 때까지 과거 데이터를 메모리에 계속 보관하려 한다.
이를 방치하면 메모리 부족으로 시스템이 멈춘다.
해결책: 워터마크와 시간 제약(Time Constraints)
스파크가 언제까지 데이터를 기다려야 할지 명확히 정의해야 한다.
양쪽 스트림에 워터마크 설정: 각 스트림이 얼마나 지연될 수 있는지 정의한다.
시간 범위 제약 조건 추가: "클릭은 노출 발생 후 1시간 이내에 일어나야 한다"와 같은 비즈니스 로직을 쿼리에 작성한다.
(예: clicks.time BETWEEN impressions.time AND impressions.time + interval 1 hour)
3. 내부 조인 vs 외부 조인 (Inner vs Outer Join)
스트리밍 환경에서는 조인의 종류에 따라 동작 방식이 다르다.
내부 조인(Inner Join): 양쪽 조건이 맞을 때만 출력된다.
한쪽이 늦게 오면 조인될 때까지 상태 정보가 유지되다가, 조건이 충족되는 순간 결과가 나온다.
외부 조인(Outer Join): 한쪽에만 데이터가 있어도 결과를 내보내야 한다.
주의: "반대편 데이터가 영영 안 올 것인가?"를 판단해야 하므로, 반드시 워터마크가 필요하다.
워터마크가 지나야 비로소 "상대 데이터가 안 왔구나"라고 확신하고 NULL을 포함한 결과를 출력할 수 있다.
4. 스트리밍 중복 제거 (Streaming Deduplication)
네트워크 오류로 인해 같은 데이터가 두 번 이상 처리되는 'At-least-once' 상황을 방지하기 위한 기능이다.
기본 원리: 유니크한 키(예: eventId)를 기준으로 이전에 처리된 적이 있는지 상태 정보를 확인한다.
워터마크의 역할: 중복 제거 역시 "과거의 모든 ID"를 기억할 수 없으므로, 워터마크를 사용하여 일정 시간이 지난 오래된 ID 정보는 메모리에서 삭제한다.
코드 예시
df.withWatermark("time", "1 hour").dropDuplicates("eventId")
위 코드는 "1시간 이내에 들어온 데이터 중 동일한 eventId를 가진 것은 버린다"는 의미.
5. 임의의 상태 유지 연산 (Arbitrary Stateful Operations)
정해진 조인이나 집계 외에, 복잡한 사용자 정의 로직이 필요할 때 사용한다.
mapGroupsWithState: 그룹별로 상태를 업데이트하고 결과를 반환
flatMapGroupsWithState: 그룹별로 상태를 유지하면서 여러 개의 결과 레코드를 생성 (예: 특정 세션이 종료될 때 요약 보고서 발행)
두 함수의 차이 map vs flatMap
| 비교 항목 | mapGroupsWithState | flatMapGroupsWithState |
| 출력 방식 | 각 그룹당 정확히 하나의 행만 반환 | 각 그룹당 0개 이상의 여러 행을 반환 가능 |
| 유연성 | 제한적 (1:1 대응) | 매우 높음 (1:N 대응) |
| 주요 용도 | 간단한 상태 업데이트 및 요약 | 세션 분석, 타임아웃 발생 시 여러 결과 도출 등 |
mapGroupsWithState와 flatMapGroupsWithState를 사용할 때 필수 Key, Value, State의 개념
스파크가 제공하는 기본 집계(count, sum)만으로는 부족할 때가 있다.
예를 들어 "사용자의 세션이 30분간 유지되다가 종료되면 보고서를 발행하라" 같은 복잡한 로직이 필요할 때 임의의 상태 유지 연산을 사용해야 한다. 이때 중심이 되는 세 가지 요소가 바로 Key, Iterator(Value), GroupState 이다.
1. 상태 유지 연산의 3대 요소 (K, V, S)
개발자는 상태 업데이트 함수를 정의할 때 다음과 같은 파라미터를 다루게 된다.
Key (K): 그룹화의 기준이 되는 식별자. (예: userId, sessionId, deviceId)
Values (V / Iterator): 현재 마이크로 배치(Micro-batch)에서 해당 Key에 할당된 새로운 데이터들의 집합.
State (S / GroupState): 메모리에 저장되어 있는 과거의 요약 정보. 개발자가 직접 정의한 데이터 구조를 가진다.
2. 함수 정의 및 동작 프로세스
상태 업데이트 함수는 매 마이크로 배치마다 호출되며 아래와 같은 순서로 동작한다.
입력: 특정 키(K)와 그 키에 해당하는 새로운 데이터들(Iterator[V]), 그리고 기존 상태(GroupState[S])를 받는다.
로직 수행:
기존 상태가 존재하는지 확인 (state.exists) -> 새로운 데이터들을 순회하며 상태를 업데이트 -> 필요하다면 타임아웃(Timeout)을 설정 -> 업데이트된 값을 다시 저장 (state.update(newState)) -> 계산된 결과를 반환
4. 타임아웃(Timeout) 관리
상태 유지 연산에서 가장 중요한 기능 중 하나이다.
데이터가 더 이상 들어오지 않을 때 해당 상태를 어떻게 처리할지 결정한다.
Processing Time Timeout: 실제 서버 시간 기준으로 특정 시간이 지나면 타임아웃 발생.
Event Time Timeout: 데이터 속의 시간(이벤트 타임)과 워터마크를 기준으로 타임아웃 발생.
활용: 타임아웃이 발생하면 함수가 한 번 더 호출되며, 이때 state.hasTimedOut을 체크해 "세션 종료 처리" 같은 로직을 수행하고 state.remove()로 메모리를 비워준다.
실무 적용 팁
상태 크기 최소화: 상태 정보는 모두 익스큐터(Executor)의 JVM 메모리에 저장된다.
따라서 상태 객체(S)는 최대한 가볍게 설계해야 한다.
체크포인트 필수: 상태 정보는 장애 발생 시 복구되어야 하므로 반드시 체크포인트 경로를 설정해야 한다!
성능 튜닝
1. 상태 정보(State) 크기 관리 (가장 중요!)
앞서 배운 mapGroupsWithState 등에서 상태를 무한정 저장하면 결국 OOM(Out of Memory)이 발생한다.
워터마크 설정: 늦게 온 데이터를 무한정 기다리지 않도록 withWatermark를 반드시 설정하여 오래된 상태를 삭제해야 한다.
타임아웃 활용: 데이터가 더 이상 들어오지 않는 '죽은 세션'은 타임아웃 로직(state.remove())을 통해 메모리에서 명시적으로 제거해야한다.
RocksDB 상태 저장소: 상태 데이터가 너무 커서 JVM 힙(Heap) 메모리로 감당이 안 된다면, 상태를 디스크(내장 DB)에 저장하는 RocksDB State Store 사용을 검토해야 한다.
2. 트리거(Trigger) 간격 최적화
데이터를 얼마나 자주 처리할지 결정하는 설정.
미지정 (Default): 이전 배치가 끝나자마자 바로 다음 배치 실행. 가장 빠른 응답성을 가진다.
Processing Time (고정 간격): 예: Trigger.ProcessingTime("1 minute"). 1분마다 모아서 처리.
너무 짧으면 오버헤드가 크고, 너무 길면 지연 시간(Latency)이 늘어난다.
Once / AvailableNow: 한 번만 실행하고 종료. 배치 작업처럼 스트리밍을 활용할 때 유용하다.
3. 병렬 처리 및 파티션(Partition) 튜닝
Shuffle Partition 수 조절: 스트리밍 집계나 조인 시 발생하는 셔플 파티션 수는 기본값이 200 이다.
데이터 규모가 작다면 이 값을 줄여야 스케줄링 오버헤드가 줄어든다.
설정 예: spark.sql.shuffle.partitions
입력 소스 파티션: Kafka를 사용한다면 Kafka 토픽의 파티션 수와 Spark의 익스큐터 코어 수를 적절히 맞춰야 병목 현상이 생기지 않는다.
4. 체크포인트(Checkpoint)와 저지연(Low Latency)의 균형
안정적인 복구를 위해 체크포인트는 필수지만, HDFS나 S3 같은 외부 스토리지에 매번 상태를 쓰는 작업은 성능 저하를 일으킨다.
신뢰성 있는 스토리지: 체크포인트 경로는 반드시 쓰기 성능이 보장되는 안정적인 파일 시스템을 지정해야 한다.
비동기 상태 업데이트: 최신 버전의 Spark에서는 상태 업데이트를 비동기로 처리하는 옵션들을 제공하므로 이를 활용해 지연 시간을 줄일 수 있다.
5. 모니터링 및 대시보드 활용
성능 튜닝의 시작은 '측정'
StreamingQueryListener: 쿼리가 시작/종료되거나 배치가 완료될 때 발생하는 이벤트를 가로채서 모니터링 시스템(Prometheus, Grafana 등)으로 보낼 수 있다.
Progress Report: 매 배치마다 출력되는 numInputRows, inputRowsPerSecond, processedRowsPerSecond 지표를 확인하여 처리 속도가 유입 속도를 따라가는지(Backpressure 확인) 상시 체크해야 한다.
'스파크(Spark)' 카테고리의 다른 글
| 아파치 스파크 - 정형화 스트리밍 Part.1 (1) | 2026.03.16 |
|---|---|
| 스파크 애플리케이션의 최적화 및 튜닝 (1) | 2026.03.04 |
| 스파크 SQL과 데이터세트 (0) | 2026.02.27 |
| Spark SQL (0) | 2026.02.20 |
| 스파크 SQL과 데이터 프레임: 내장 데이터 소스 소개 (0) | 2026.02.06 |