imhamburger 님의 블로그

스파크 애플리케이션의 최적화 및 튜닝 본문

스파크(Spark)

스파크 애플리케이션의 최적화 및 튜닝

imhamburger 2026. 3. 4. 12:39

아파치 스파크 설정 확인 및 세팅

스파크 설정을 확인하고 설정하는 데는 세가지 방법이 있다.

 

1. 설정 파일들을 통한 방법

배포한 $SPARK_HOME 디렉터리에는 여러 설정 파일들이 있는데 이 파일들에 있는 기본값을 변경한 뒤 .template 부분을 지우고 저장하면 스파크에 새로운 값을 사용한다고 알려주게 된다.

 

2. 명령 행에서 애플리케이션을 spark-submit으로 제출할 때 --conf 옵션을 직접 써서 설정하는 방법

3. 스파크 셸에서 프로그래밍 인터페이스를 통한 방법 

 

-> 스파크 설정값을 결정하는 여러 가지 방법들 간에도 어떤 값이 우선하는지 결정하는 우선순위가 존재한다.

spark-default.conf에 정의된 값이나 플래그가 가장 먼저 읽힌다. 그 다음 spark-submit, 마지막으로 스파크 애플리케이션에서 SparkSession을 통해 설정된 값.

 

대규모 워크로드를 위한 스파크 규모 확장

자원부족이나 점진적인 성능 저하에 의한 작업 실패를 피하기 위해 사용해볼 수 있는 스파크 설정들이 여럿있다.

스파크 드라이버, 이그제큐터, 이그제큐터에서 실행되는 셔플 서비스 등..

 

  • 스파크 드라이버: 클러스터 매니저와 함께 클러스터에 이그제큐터들을 띄우고 그 위에서 돌아갈 스파크 태스크드을 스케줄링하는 역할을 한다. 대규모 워크로드에서는 수백 개의 태스크가 돌아가게 될 것이다.

정적/동적 자원 할당

spark-submit 에 명령 행 인자로 자원량을 할당한다면 정적 자원 할당.

동적 자원 할당 설정을 이용한다면 요청에 맞춰 컴퓨팅 자원을 더 할당하거나 줄이도록 요청할 수 있다.

 

동적 할당 활성화 설정값들

spark.dynamicAllocation.enabled true #기본값은 false
spark.dynamicAllocation.minExecutors 2 #클러스터 매니저가 최소 이그제큐터 개수인 2개로 시작
spark.dynamicAllocation.schedulerBacklogTimeout 1m #태스크 큐 백로그가 늘어나면 매번 백로그 타임아웃시간이 될 때마다 새로운 이그제큐터 요청
spark.dynamicAllocation.maxExecutors 20 #최대 20개까지 이그제큐터 요청
spark.dynamicAllocation.executorIdleTimeout 2min #이그제큐터가 태스크를 완료하고 2분동안 놀고 있으면 중지

 

스파크 이그제큐터의 메모리와 셔플 서비스 설정

단순히 동적 자원 할당을 활성화하는 것만으로는 충분하지 않다.

메모리 부족 혹은 jvm 가비지 컬렉션으로 문제를 겪지 않게 하려면 이그제큐터 메모리가 어떤 식으로 구성되고 스파크가 어떻게 사용하는지도 알아두어야 한다.

 

각 이그제큐터에서 사용 가능한 메모리의 양은 spark.executor.memory 에 의해 제어된다.

실행 메모리, 저장 메모리, 예비 메모리 세부분으로 나뉜다.

 

  • 실행 메모리는 스파크의 셔플, 조인, 정렬, 집계 등에 사용된다.
  • 저장 메모리는 사용자 데이터 구조를 캐싱하고 데이터 프레임에서 온 파티션들을 저장하는데 쓰인다.
  • 맵이나 셔플 작업이 이루어지는 동안 스파크는 로컬 디스크의 셔플 파일에 데이터를 쓰고 읽으므로 이때 I/O 작업이 발생한다.
  • 기본 설정들은 최적화되어있지 않으므르로 여기서 병목 현상이 발생할 수도 있다.

맵과 셔플 작업 중의 I/O를 조절할 수 있는 스파크 설정들은 생략하기로!

 

스파크 병렬성 최대화

  • 스파크 잡은 여러 스테이지를 거치게 되고 각 스테이지에서 많은 태스크를 처리하게 된다.
  • 스파크는 최대한 각 코어에 태스크를 할당하고 각 태스크에 또 스레드를 스케줄링하고 각 태스크는 개별 파티션을 처리할 것이다.
  • 자원 사용을 최적화하고 병렬성을 최대로 끌어올리려면 이그제큐터에 할당된 코어 개수만큼 파티션들이 최소한으로 할당되는 것이다.
  • 각 이그제큐터의 코어들보다 더 많은 파티션들이 있다면 모든 코어가 바쁘게 돌아갈 것이다.
  • 즉, 파티션이 가장 기본적인 병렬성의 한 단위라고 생각할 수 있고, 하나의 코어에서 돌아가는 하나의 스레드는 하나의 파티션을 처리할 수 있다.

출처: 러닝스파크

 

파티션은 어떻게 만들어지는가

스파크의 태스크들은 데이터를 디스크에서 읽어 메모리로 올리면서 파티션 단위로 처리한다.

스파크에서 한 파티션의 크기는 spark.sql.files.maxPartitionBytes에 따라 결정된다.

기본값은 128MB 이다. 이 크기를 줄이게되면 작은 파티션 파일이 많아지면서 디스크 I/O양이 급증하고 성능 저하를 일으킨다.

 

파티션들은 또한 데이터 프레임의 API의 특정 함수들을 사용하면 만들어지기도 한다.

예를 들어 큰 데이터 프레임을 생성하거나 큰 파일을 디스크에서 읽으면서 명시적으로 스파크에게 특정 개수의 파티션을 만들도록 지시할 수 있다.

 

마지막으로 셔플 단계에서 만들어지는 셔플 파티션이 있다.

기본적으로 셔플 파티션 개수는 spark.sql.shuflle.partitions에 200으로 지정되어 있다. 데이터 사이즈 크기에 따라 이 숫자를 조정하여 너무 작은 파티션들이 이그제큐터들에게 할당되지 않게 할 수 있다.

 

데이터 캐싱과 영속화

캐싱과 영속화의 차이? 스파크에서는 두 단어가 동의어라고 볼 수 있다.

cache()와 persist()의 두가지 API 호출이 이 기능들을 제공한다.

 

DataFrame.cache()

cache()는 허용하는 메모리 수준만큼 스파크 이그제큐터들의 메모리에 읽은 파티션을 최대한 저장하려고 할 것이다.

모든 파티션이 캐시된 것이 아니라면 데이터에 다시 접근을 시도할 때 캐시되지 않은 파티션은 재계산되어야 하고 이는 스파크 잡을 느리게 만들 것이다.

val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.cache() // 데이터 캐싱
df.count() // 캐시 수행

res3: Long = 10000000
Command took 5.11 sec

df.count() //캐시 사용
res4: Long = 10000000
Command took 0.44 sec

 

DataFrame.persist()

persist(StorageLevel.LEVEL)의 함수 호출 방식은 직관적으로 StorageLevel 을 통해 데이터가 어떤 방식으로 캐시될 것인지 제어할 수 있다는 느낌을 준다.

 

StorageLevel 목록

StorageLevel 설명
MEMORY_ONLY 데이터가 곧바로 객체 형태로 메모리에 저장된다.
MEMORY_ONLY_SER 데이터가 직렬화되어 용량이 최소화된 바이트 배열 형태로 메모리에 저장된다.
사용 시에 역직렬화를 위한 비용이 소모된다.
MEMORY_AND_DISK 데이터가 곧바로 객체 형태로 메모리에 저장되지만 부족한 경우 직렬화되어 디스크에 저장된다.
DISK_ONLY 데이터가 직렬화되어 디스크에 저장된다.
OFF_HEAP 데이터가 오프힙 메모리에 저장된다. 오프힙 메모리는 스파크에서 저장 및 쿼리 실행에 사용된다.
MEMORY_AND_DISK_SER MEMORY_AND_DISK와 비슷하지만 메모리에 저장되는 데이터가 직렬화된다. (디스크에 저장되는 데이터는 항상 직렬화된다)

 

StorageLevel은 동일한 기능을 하는 레벨_이름_2 형태의 옵션들이 존재하는데 이는 서로 다른 스파크 이그제큐터에 복제해서 두벌이 저장된다는 것을 의미한다.

import org.apache.spark.storage.StorageLevel

// 천만 개의 레코드를 가지는 데이터 프레임 생성
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.persist(StorageLevel.DISK_ONLY) // 데이터를 직렬화해서 디스크에 저장
df.count() //캐시 수행

 

마지막으로 캐시는 데이터 프레임뿐만 아니라 데이터 프레임에서 파생된 테이블이나 뷰도 캐시할 수 있다.

df.createOrReplaceTempView("dfTable")
spark.sql("CACHE TABLE dfTable")
spark.sql("SELECT count(*) FROM dfTable").show()

 

캐시나 영속화는 언제 사용해야 하는가

  • 반복적인 머신러닝 학습을 위해 계속 접근해야 하는 데이터 프레임들
  • ETL이나 데이터 파이프라인 구축 시 빈도 높은 트랜스포메이션 연산으로 자주 접근해야 하는 데이터 프레임들

캐시나 영속화는 언제 쓰면 안되는가

  • 데이터프레임이 메모리에 들어가기엔 너무 크다.
  • 크기에 상관없이 자주 쓰지 않는 데이터 프레임에 대해 비용이 크지 않은 트랜스포메이션 수행

스파크 조인의 종류

조인타입

조인타입 설명 특징
Inner 양측에 공통된 키가 있는 경우만 결합 매칭되지 않는 행은 삭제
Left (Outer) 왼쪽 모든 행 유지 + 오른쪽 매칭 데이터 오른쪽 데이터 부재 시 null
Right (Outer) 오른쪽 모든 행 유지 + 왼쪽 매칭 데이터 왼쪽 데이터 부재 시 null
Full Outer 양쪽 테이블의 모든 데이터 유지 어느 한쪽이라도 없으면 null
Left Semi 오른쪽 키가 존재하는 왼쪽 행만 반환 오른쪽 컬럼은 결과에 포함안됨
Left Anti 오른쪽 키가 존재하지 않는 왼쪽 행만 반환 필터링 용도로 사용
Cross 두 테이블의 모든 조합 (데카르트 곱) 결과 크기가 급격히 커짐 (n*m)

 

조인전략

물리적으로 데이터를 어떻게 이동(셔플)시키고 처리할지 결정하는 실행 알고리즘

조인전략 특징 권장상황
Broadcast Hash Join 작은 테이블을 모든 워커 노드에 복제 한쪽 테이블이 10MB 이하일 때 (가장 빠름)
Shuffle Sort Merge 키 기준 셔플 후 정렬하여 병합 대용량 vs 대용량 조인 (가장 일반적)
Shuffle Hash Join 셔플 후 각 파티션에서 해시테이블 생성 데이터가 균등하고 정렬이 불필요할 때
Broadcast Nested Loop 한쪽을 브로드캐스트 후 전체 순회 동등(=) 조건이 아닌 조인이나 아주 작은 테이블
Cartesian Product 모든 파티션을 서로 조인 조인 조건이 없는 Cross Join시 사용

 

성능을 최적화하려면 spark.sql.autoBroadcastJoinThreshold 설정을 조정하거나, 쿼리에 직접 Join Hint를 사용하여 특정 전략을 강제할 수 있다.