imhamburger 님의 블로그
아파치 스파크 - 정형화 스트리밍 Part.1 본문
아파치 스파크의 스트림 처리 엔진의 진화
스트림 처리란?
끝없이 들어오는 데이터 흐름을 연속적으로 처리하는 것이다.
-> 빅데이터의 등장과 함께 스트림 처리 시스템은 단일 노드 처리 엔진에서 멀티 노드 분산 처리 엔진으로 진화했다.
전통적인 레코드 단위 처리 모델
Event1 → Node A → Node B → Node C → Output
Event2 → Node A → Node B → Node C → Output
Event3 → Node A → Node B → Node C → Output
각 노드는 한번에 하나씩 레코드를 받고 처리하여 다음 노드로 보내주는 모델이다.
장점으로 매우 짧은 응답 시간을 달성할 수 있지만 특정 노드가 장애를 겪거나 느리면 그다지 효과적이지 않다.
마이크로 배치 스트림 처리의 출현
0~1초 동안 들어온 이벤트 → 하나의 batch
1~2초 동안 들어온 이벤트 → 하나의 batch
2~3초 동안 들어온 이벤트 → 하나의 batch
batch1 (0~1초)
[Event1, Event2, Event3]
batch2 (1~2초)
[Event4, Event5]
[Event1, Event2, Event3, Event4, Event5] → 한번에 처리
마이크로 배치로 입력 데이터를 구분하고 테스크별로 마이크로 배치를 필터링 및 처리하는 모델이다.
이 후 처리도 마이크로 배치로 구분되어 출력된다.
레코드 단위 처리는 택배 기사 한 명이 택배 하나 받으면 → 바로 배송 하는거고
마이크로 배치는 택배 기사 한 명이 1분 동안 택배 모음 → 한 번에 배송하는 것.
스파크 스트리밍은 작은 시간 단위로 데이터를 나누어 처리하는 마이크로 배치 방식을 사용한다.
생성된 각 마이크로 배치는 스파크 클러스터에서 작은 태스크들로 분산되어 처리되고, 처리 결과가 생성된다.
마이크로 배치 방식 장점
- 스파크는 태스크를 매우 빠르고 효율적으로 여러 실행 노드에 분배할 수 있기 때문에, 특정 실행 노드가 느려지거나 장애가 발생하더라도 다른 노드에서 태스크를 다시 실행하여 안정적으로 작업을 수행할 수 있다.
- 스파크 스트리밍은 작업이 여러 번 실행되더라도 동일한 결과를 유지할 수 있으며, 결과적으로 전체 스트림 처리 과정에서 각 레코드가 정확히 한 번 처리되는 exactly-once 처리 보장을 제공할 수 있다.
스파크 스트리밍의 초기 구현은 DStream API를 기반으로 만들어졌다.
이 API는 스파크의 배치 처리 구조인 RDD API 위에서 동작하도록 설계되었다. 따라서 DStream은 내부적으로 시간 순서대로 생성되는 여러 개의 RDD 집합으로 표현된다.
즉 스트림 데이터는 연속적인 데이터 흐름이지만, 스파크 내부에서는 이를 일정 시간 간격으로 생성되는 RDD들의 시퀀스로 변환하여 처리한다. 이러한 구조 덕분에 스파크는 배치 처리, 스트리밍 처리, 그리고 인터랙티브 분석을 하나의 통합된 처리 엔진 위에서 안정적으로 수행할 수 있다.
DStream 방식의 문제점
1. 배치와 스트림을 위한 단일 API가 없었다.
Spark 초기에는 배치 처리 → RDD / DataFrame , 스트림 처리 → DStream처럼 API가 분리되어 있었다.
처리방식을 바꾸려면 개발자가 같은 로직을 두 번 구현해야 하는 경우가 많았다.
2. 논리 계획과 물리 계획 구분이 부족
데이터 처리 시스템에는 보통 두 단계가 있다. 물리계획과 논리계획.
물리계획에서 shuffle 수행, partition 나누기, executor에서 task 실행 등 최적화를 현재는 해주지만, DStream은 개발자가 작성한 순서가 곧 실행 순서여서 직접 튜닝해야했다.
3. Event Time 윈도우 지원 부족
Event Time은 데이터가 실제로 발생한 시간이고 Processing Time은 데이터가 시스템에 도착한 시간이다.
DStream은 기본적으로 Processing Time 기준으로 윈도우 연산을 했다. 하지만 많은 분석은 실제 발생 시간(Event Time) 기준이 필요하다.
그래서 등장한 것: Structured Streaming
이 문제들을 해결하기 위해 Spark는 Structured Streaming을 설계했다.
정형화 스트리밍의 철학
정형화 스트리밍에서는 스트림 데이터를 무한히 계속 커지는 테이블(infinite table) 로 본다.
Event1 → table row 추가
Event2 → table row 추가
Event3 → table row 추가
개발자는 스트리밍을 특별하게 생각할 필요 없이 일반 SQL처럼 작성한다.
Structured Streaming은 내부적으로 아래와 같이 동작한다.
1. 새로운 데이터 도착
2. 입력 테이블에 row 추가
3. 변경된 결과 계산
4. 결과 업데이트
이 때, 전체 데이터를 다시 계산하지 않고 새로 들어온 데이터만 반영(증분 처리)해서 결과를 갱신한다. -> incrementalization
결과를 언제 업데이트할지는 Trigger 정책으로 정한다.
1초마다 결과 업데이트
5초마다 업데이트....
| DStream | Structured Streaming |
| RDD 기반 | Table / DataFrame 기반 |
| 스트림을 RDD sequence로 봄 | 스트림을 테이블로 봄 |
| API 복잡 | SQL 기반 |
스트리밍 데이터는 실시간으로 계속 들어온다.
이때마다 계산 결과가 나오는데, 이 결과를 파일이나 데이터베이스에 어떤 방식으로 업데이트할지 결정하는 것이 '결과 모드'다.
종류
| 모드 | 동작 방식 | 상황 |
| 추가(Append) | 새 데이터만 추가 | 단순 로그 기록, 필터링 작업 |
| 갱신(Update) | 바뀐 데이터만 수정 | 실시간 상태 업데이트 |
| 전체(Complete) | 테이블 전체를 새로 작성 | 실시간 순위, 전체 합계 계산 |
정형화 스트리밍 쿼리 만들기 (1~5단계)
실시간 데이터도 일반 데이터(배치)와 똑같은 방식(API)으로 처리할 수 있다.
1단계: 입력 소스(Input Source) 지정
데이터를 어디서 가져올지 정하는 단계
- 도구: spark.readStream (배치 처리의 spark.read 대신 사용)
- 내용: 소켓(Socket), 카프카(Kafka), 파일(JSON, Parquet 등) 중에서 데이터를 읽어올 곳을 지정
- 중요: 이 코드를 실행한다고 바로 데이터를 읽는 게 아니라, "어떻게 읽을지" 정의만 해두는 것
2단계: 데이터 변형(Transformation)
가져온 데이터를 원하는 형태로 가공하는 단계
- 상태 정보가 없는(Stateless) 변형: select, filter처럼 이전 데이터와 상관없이 현재 줄만 보고 처리하는 작업
- 상태 정보 유지(Stateful) 변형: count, groupBy처럼 이전까지 들어온 데이터를 기억해야 하는 작업(집계)
3단계: 출력 싱크(Sink)와 모드(Mode) 결정
가공된 데이터를 "어디에(Sink)", "어떤 방식(Mode)으로" 내보낼지
- 출력 싱크: 콘솔(console), 파일, 데이터베이스 등 결과가 저장될 장소를 정함
- 결과 모드(중요): 앞서 정리한 3가지 모드 중 하나를 선택
- 추가(Append): 새로 계산된 결과만 뒤에 붙임. (집계 쿼리에는 사용 불가)
- 전체(Complete): 매번 전체 결과 테이블을 다시 씀. (단어 세기 같은 집계에 필수)
- 업데이트(Update): 변경된 부분만 수정함.
4단계: 처리 세부사항 지정 (언제, 어떻게?)
데이터를 얼마나 자주 처리할지(트리거)와 사고에 어떻게 대비할지(체크포인트)를 설정
- 트리거(Trigger): "언제 실행할까?"
- 기본(Default): 이전 작업이 끝나자마자 바로 다음 작업 시작.
- 처리 시간 간격: 예) 1 second. 1초마다 새 데이터가 있는지 확인해서 처리.
- 일회 실행(Once): 딱 한 번만 전체 데이터를 처리하고 종료 (비용 절감용).
- 연속(Continuous): 아주 짧은 지연 시간(밀리초 단위)으로 끊임없이 처리 (실시간성이 매우 중요할 때).
- 체크포인트(Checkpoint): "장애가 나면?"
- 중간 진행 상황을 저장해두는 '저장점'. 시스템이 멈춰도 처음부터 다시 할 필요 없이 마지막으로 성공한 지점부터 재시작할 수 있게 해준다.
5단계: 쿼리 시작
모든 설정이 끝났으면 .start()를 호출해 엔진을 가동
- 이 작업은 백그라운드에서 실행된다.
- awaitTermination()을 붙여주어야 프로그램이 바로 종료되지 않고 스트리밍이 끝날 때까지 기다린다.
실행 중인 스트리밍 쿼리의 내부
코드를 실행하면 스파크 엔진은 아래와 같은 루프(반복 작업)를 수행한다.
- 논리 계획 분석: 사용자가 짠 코드를 최적의 실행 경로로 바꿈
- 반복 실행(마이크로 배치):
- 설정한 트리거 간격마다 새 데이터가 왔는지 확인
- 데이터가 있다면 그만큼만 가져와서 계산
- 결과를 외부 저장소에 쓰고, 현재 어디까지 했는지 체크포인트에 기록
- 종료 조건: 에러가 나거나, 사용자가 수동으로 멈추거나, '일회 실행' 모드일 때까지 이 루프를 무한 반복
정확한 일회 실행을 위한 장애 복구
1. 체크포인트
스트리밍 쿼리가 도중에 멈추거나 에러가 나더라도, 처음부터 다시 시작할 필요가 없다.
- 재시작의 공식: 새로운 SparkSession을 만들고, 이전에 썼던 것과 똑같은 '체크포인트 위치'를 지정해서 쿼리를 실행
- 스파크는 체크포인트에 저장된 정보를 보고 실패한 지점부터 정확히 다시 시작
'정확히 한 번(Exactly-once)' 처리를 위한 조건
데이터가 중복되거나 누락되지 않으려면 다음 3가지가 만족되어야 한다.
- 재실행 가능한 소스: 실패한 구간의 데이터를 다시 읽어올 수 있어야 함 (예: 카프카).
- 결정론적 연산: 같은 입력이 들어오면 항상 같은 결과가 나와야 함.
- 멱등성(Idempotent) 싱크: 같은 데이터를 여러 번 써도 결과가 중복되지 않고 동일해야 함.
소켓 소스와 콘솔 싱크는 이 조건을 만족하지 못해 '정확히 한 번'이 보장되지 않는다.
2. 운영 중 쿼리 수정하기
쿼리가 돌아가는 도중에 로직을 살짝 바꾸고 싶을 때?
- 허용되는 수정: 데이터 필터링(filter) 조건을 추가하거나 일부 연산을 바꾸는 것은 가능 (예: 깨진 데이터를 걸러내는 필터 추가)
- 주의사항: 체크포인트 위치는 절대 바꾸면 안된다. 또한 트리거 간격 같은 세부 설정은 바꿀 수 있지만, 소스나 싱크의 근본적인 종류를 바꾸는 것은 제한될 수 있다.
3. 동작 상태 모니터링하기
쿼리가 잘 돌고 있는지 숫자로 확인하는 방법. StreamingQuery 객체를 사용.
- lastProgress(): 가장 최근에 완료된 처리에 대한 상세 통계를 보여준다.
- numInputRows: 새로 들어온 데이터 개수
- inputRowsPerSecond: 초당 데이터 유입 속도
- processedRowsPerSecond: 초당 데이터 처리 속도
- durationMs: 각 단계(데이터 가져오기, 계산하기, 저장하기 등)에서 시간이 얼마나 걸렸는지 밀리초 단위로 보여줌
- source/sink: 지난 배치에서 처리된 데이터 소스와 싱크에 대한 정보
만약 유입 속도(input)보다 처리 속도(processed)가 계속 낮다면, 시스템이 부하를 견디지 못하고 있다는 신호
매번 수동으로 숫자를 확인할 수 없으니, 스파크는 두 가지 자동화 방법을 제공한다.
Dropwizard Metrics
- 설명: 스파크의 수치들을 Ganglia, Graphite 같은 전문 모니터링 대시보드로 보내주는 기능
- 방법: 설정에서 spark.sql.streaming.metricsEnabled를 true로 켜주기
StreamingQueryListener
- 설명: 쿼리가 시작될 때, 처리 중일 때, 종료될 때 특정 동작을 하도록 코드를 짜는 것
- 활용: "에러가 나서 쿼리가 종료되면 나에게 이메일을 보내라" 같은 로직 (주로 스칼라나 자바에서 사용)
스트리밍 데이터 소스와 싱크
스파크는 다양한 곳에서 실시간 데이터를 읽어올 수 있다.
| 소스 종류 | 특징 | 용도 |
| 파일(File) | 디렉토리에 새로 생기는 파일을 감지 | 로그 분석, 주기적 데이터 업로드 처리 |
| 카프카(Kafka) | 가장 널리 쓰이는 메시지 큐 시스템 | 실무 표준, 대규모 실시간 데이터 스트림 |
| 소켓(Socket) | 특정 포트를 통해 들어오는 텍스트 읽기 | 테스트 및 학습용 (운영용으로는 부적합) |
1. 파일 소스 (File Source)
특정 폴더를 지켜보고 있다가, 새로운 파일이 들어오면 즉시 읽어서 처리한다.
- 지원 형식: JSON, CSV, Parquet, Text 등 스파크가 읽을 수 있는 모든 형식
- 중요 설정: path 폴더 경로.
- maxFilesPerTrigger: 한 번에 최대 몇 개의 파일을 처리할지 결정 (너무 많은 파일이 한꺼번에 들어올 때 과부하 방지)
- 주의: 파일은 한 번 쓰여지면 수정되지 않는(Immutable) 상태여야 스파크가 안전하게 읽을 수 있다.
2. 아파치 카프카 소스 (Kafka Source)
현대적인 스트리밍 아키텍처에서 가장 중요한 소스. 스파크와 궁합이 매우 좋다.
- 주요 옵션:
- kafka.bootstrap.servers: 카프카 서버 주소.
- subscribe: 읽어올 주제(Topic) 이름.
- startingOffsets: 처음 시작할 때 어디서부터 읽을지 결정 (earliest는 처음부터, latest는 지금부터).
- 데이터 형태: 카프카에서 읽어온 데이터는 기본적으로 key와 value가 바이너리(binary) 형태이다. 그래서 반드시 cast("string") 등을 통해 우리가 읽을 수 있는 데이터로 변환해주는 과정이 필요하다.
데이터 소스를 사용할 때 데이터 스키마(Schema) 정의하기!!
파일 소스를 사용할 때 특히 중요한 부분.
- 스키마 추론 불가: 스트리밍 데이터는 데이터가 들어오기 전까지 구조를 알 수 없기 때문에, 스파크가 자동으로 스키마를 알아내기 어렵다.
- 명시적 정의: 따라서 사용자가 직접 StructType을 사용해 "이 데이터는 어떤 컬럼이 있고 타입은 무엇이다"라고 미리 알려주는 것이 원칙이다.
자체 제작 스트리밍 소스와 싱크
스파크가 지원하지 않는 특수한 곳에서 데이터를 가져와야 할 때 사용한다.
자체 제작 스트리밍 소스 예시) TextSocketSource
- 동작 원리: 사용자가 Source 인터페이스를 상속받아 직접 클래스를 만든다.
- 핵심 역할:
- 어디까지 읽었는지 관리: getOffset()을 통해 현재 데이터의 위치를 파악
- 데이터 가져오기: getBatch()를 통해 특정 구간의 데이터를 스파크로 불러옴
직접 만들면 복구 로직(장애 시 재시작)도 직접 구현해야 하므로 난이도가 높다.
스트리밍 출력 싱크
가공된 데이터를 어디로 보낼지 결정한다.
| 싱크 종류 | 특징 | 용도 |
| 파일(File) 싱크 | 특정 디렉토리에 결과 저장 | 데이터 백업, 정적 분석용 데이터 생성 |
| 카프카(Kafka) 싱크 | 다시 카프카로 데이터를 쏨 | 실시간 데이터 파이프라인 연결 |
| 포이치(foreach) 싱크 | 각 행마다 사용자 정의 함수 실행 | 외부 DB 저장, 외부 API 호출 |
| 콘솔(Console) 싱크 | 화면에 출력 | 디버깅 및 테스트용 |
| 메모리(Memory) 싱크 | 메모리에 테이블 형태로 저장 | 짧은 시간 내에 대화형 쿼리로 확인용 |
임의의 위치에 저장하기: foreach와 foreachBatch
1. foreach 싱크
- 특징: 데이터의 모든 행(row)에 대해 지정한 로직을 실행합니다.
- 구현: open(), process(), close() 세 단계를 작성
- open: 연결 생성 (예: DB 접속)
- process: 데이터 한 줄 저장
- close: 연결 닫기
2. foreachBatch 싱크 (더 권장됨)
- 특징: 행 단위가 아니라, 마이크로 배치(덩어리) 단위로 로직을 실행
- 장점:
- 기존의 배치용 라이브러리(예: JDBC, 외부 저장소 커넥터)를 그대로 쓸 수 있다.
- 한 번에 여러 줄을 쓰기 때문에 foreach보다 훨씬 빠르고 효율적
간단한 확인은 console이나 memory 싱크
안정적인 저장이 필요하면 file이나 kafka 싱크
MySQL, MongoDB 같은 외부 DB에 저장하고 싶다면 foreachBatch를 써서 배치용 API로 저장
데이터 트랜스포메이션
스파크는 스트리밍 데이터를 처리할 때, 이 작업이 "과거의 데이터를 기억해야 하는가?"에 따라 두 가지로 나뉜다.
1. 상태 정보가 없는(Stateless) 변형
- 개념: 각 행(Row)을 독립적으로 처리. 이전 줄에 뭐가 왔는지 몰라도 지금 줄만 보고 바로 계산할 수 있는 작업
- 주요 연산: select(), filter(), map()
- 예시: "데이터에서 '에러'라는 단어가 포함된 줄만 골라내!"
- 특징: 메모리를 적게 쓰고 처리가 매우 빠름. 배치 처리와 완전히 동일하게 동작.
2. 상태 정보 유지(Stateful) 변형
- 개념: 현재 들어온 데이터를 처리하기 위해 과거의 데이터나 중간 합계를 기억(상태 유지)해야 하는 작업
- 주요 연산: count(), groupBy(), join()
- 예시: "지금까지 들어온 '사과'가 총 몇 개지?" (방금 사과 1개가 들어왔다면, 이전에 99개였다는 사실을 기억하고 있어야 100개라고 답할 수 있다.)
- 특징: 스파크가 내부적으로 '상태(State)'를 관리하며, 체크포인트에 이 상태를 저장. 메모리 관리가 중요.
'스파크(Spark)' 카테고리의 다른 글
| 스파크(Spark) - 정형화 스트리밍 Part.2 (1) | 2026.04.03 |
|---|---|
| 스파크 애플리케이션의 최적화 및 튜닝 (1) | 2026.03.04 |
| 스파크 SQL과 데이터세트 (0) | 2026.02.27 |
| Spark SQL (0) | 2026.02.20 |
| 스파크 SQL과 데이터 프레임: 내장 데이터 소스 소개 (0) | 2026.02.06 |