imhamburger 님의 블로그

데이터엔지니어 부트캠프 - 아파치 스파크(Apache Spark) 이해하기, 에어플로우에 적용시키기 (5주차) 본문

데이터엔지니어 부트캠프

데이터엔지니어 부트캠프 - 아파치 스파크(Apache Spark) 이해하기, 에어플로우에 적용시키기 (5주차)

imhamburger 2024. 8. 11. 13:53

팀프로젝트가 끝나고 아파치 스파크(Apache Spark)를 배웠다.

스파크는 빅데이터 처리를 위한 오픈 소스 분산 처리 시스템이다. 스파크 이전에 하둡이라는 기존 분산 처리 시스템이 있긴한데, 하둡보다 분산 처리 시스템이 빠르고 메모리 내에서 데이터를 처리하기 때문에 효율적인 실시간 데이터 처리도 가능하다.

 

분산 처리를 왜 써야하지?

사실 데이터가 적다면 굳이 쓸 필요가 없을 것이다. 그런데 빅데이터의 경우 분산처리를 안한다면 방대한 양의 데이터를 처리하는데 시간이 엄~청 오래 걸릴 것이다.

 

예를들어,

카페에서 커피를 100잔 만들어야된다고 가정해보자. 그리고 커피 1잔을 만드는 데 드는 시간을 1분이라고 할 때, 일하는 사람이 한 사람뿐이라면 100분을 투자해야 100잔을 만들 수 있다.

그렇지만, 일하는 사람이 2명이라면 50잔씩 나누어(분산하여) 만들면되니 50분만 투자하면 커피 100잔을 만들 수 있다.

 

그렇기 때문에 빅데이터 분산 처리는 엄청난 효율을 가져다 주기때문에 중요하다.

 

사실 분산 처리 시스템은 스파크와 하둡만 있는 건 아니다. 용도에 따라 최적화된 시스템들이 있는데, 간단하게 비교해보자.

시스템 장점 단점 주요 사용 사례
Apache Hadoop 1. 대용량 데이터 처리에 강점
2. 높은 내구성과 안정성
3. 오랜 시간 검증된 기술
1. 느린 처리 속도(디스크 I/O 기반)
2. 복잡한 코드 및 유지보수
3. 실시가 처리에 부적합
배치 처리, 로그 분석
Apache Spark 1. 빠른 처리 속도(메모리 내 처리)
2. 다양한 언어 지원(Scala, Python, Java 등)
3. 풍부한 라이브러리(Spark SQL, MLlib 등)
4. 배치 및 실시간 처리 모두 가능
1. 메모리 사용량이 많음
2. 복잡한 클러스터 관리 및 튜닝 필요
3. 작은 데이터 처리에는 비효율적
실시간 데이터 분석,
머신러닝
Apache Flink 1. 실시간 스트리밍 데이터 처리 최적화
2. 낮은 지연 시간
3. 정확한 상태 관리 및 이벤트 처리
1. 비교적 복잡한 아키텍처
2. 배치 작업은 Spark보다 덜 최적화
실시간 스트리밍 분석,
이벤트 기반 처리
Apache Storm 1. 매우 낮은 지연 시간
2. 이벤트 기반 실시간 처리에 강점
3. 간단한 구조
1. 복잡한 작업에서 성능 저하
2. 라이브러리 부족
3. 배치 처리에 부적합
실시간 이벤트 처리

 

따라서, Spark의 경우 다양한 용도로 사용될 수 있는 반면, 하둡은 대규모 배치 처리에 Flink는 실시간 스트리밍 처리에 적합하다.

참고로 실시간 스트리밍 처리 시스템으로 Kafka 라는 것도 있다!

 

배치처리라는 게 뭔지 잘 안와닿을 수 있다. 배치처리는 대량의 데이터를 모아서 한꺼번에 처리하는 방식이다.

데이터가 일정 시간 동안 축적된 후, 특정한 주기나 트리거에 의해 일괄적으로 처리된다. 따라서 실시간성이 필요없는 작업에 적합하고 데이터 크기가 크고 복잡한 연산이 요구되는 경우에 사용된다고 한다. 그냥 단순하게 실시간 처리와 반대되는 개념이라고 생각하면 된다.

 

 

실습과제

 

팀프로젝트에서 저장한 영화데이터를 다시 repartitioning 하고 sparksql을 이용하여 일별로 독립/상업영화별 매출액 합계와 일별로 국내/해외영화 매출액 합계를 구해 다시 저장하기. 

 

 

제 1 장: 영화데이터를 repartitioning 하는 기능 구현

패키지로써 쓸 기능이였기에 pdm init을 하고 기능을 구현하였다. (+pytest) 내가 작성한 파이썬 코드는 다음과 같다.

import pandas as pd

def re_partition(load_dt='20150101', base_path='/Users/seon-u/data/movie/data/extract'):
    df = pd.read_parquet(f'{base_path}/load_dt={load_dt}')
    df['load_dt'] = load_dt #별도 추가 필요
    df.to_parquet('~/data/movie/repartition', partition_cols=['load_dt','multiMovieYn', 'repNationCd'])

 

우선, 팀프로젝트에서 저장한 영화데이터 경로를 base_path로 지정해주고 load_dt는 에어플로우에서 일자별로 처리해야하기 때문에 넣어야 한다. 이미 일자별로 partition이 된 데이터여서 다시 df['load_dt'] = load_dt를 추가해줘야 일자별로 다시 repartition 할 수 있다.

 

 

제 2 장: 에어플로우로 데이터 파이프라인 구축하기

중간에 branch를 넣어준 이유는 데이터 멱등성을 위해 넣어준 것이다. 실행할때마다 데이터가 누적되어 쌓이면 안되니까!

전체적인 틀은 이렇게 잡고 각 플로우마다 수행하는 기능을 만들어야 한다.

 

 

제 3 장: 에어플로우 dag 파일 date 지정해주기

dag의 start_date와 end_date를 지정해줘야 한다. 내가 가지고 있는 영화데이터는 2015년도 영화이기 때문에 아래처럼 지정해주었다.

with DAG(
     'pyspark_movie',
    default_args={
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=3),
    },
    max_active_runs=1,
    max_active_tasks=3,
    description='hello world DAG',
    schedule="10 2 * * *",
    start_date=datetime(2015, 1, 1),
    end_date=datetime(2015, 12, 31),
    catchup=True,
    tags=['pyspark', 'spark'],
) as dag:

 

 

제 4 장: branch.op 기능 만들기 

start 다음에 오는 branch.op 기능은 데이터를 저장할 디렉토리에 파일이 있다면 rm.dir로 보내고 없다면 re.partition으로 보낸다.

 def branch_fun(ds_nodash):
        import os
        home_dir = os.path.expanduser("~")
        path = os.path.join(home_dir, f"data/movie/repartition/load_dt={ds_nodash}")
        print('*' * 30)
        print(path)
        print('*' * 30)

        if os.path.exists(path):
            print('존재')
            return "rm.dir" #rmdir.task_id
        else:
            print('존재x')
            return "re.partition"
            
 
  branch_op = BranchPythonOperator(
            task_id='branch.op',
            python_callable=branch_fun
            )

 

 

제 5 장: rm.dir & re.partition Task 만들기

branch.op에서 보낸 명령을 수행하는 두 개의 Task는 각각 삭제하고 생성하는 기능을 수행한다.

re_par라는 함수는 내가 1장에서 만든 repartitioning 하는 기능을 불러와 그것을 수행하는 함수이다. 그리고 이 함수를 re_partition Task가 읽어서 처리한다. 나는 PythonVirtualenvOperator를 이용하였다. 다른사람도 언제든지 사용할 수 있으니까! 내 로컬에만 수행할 기능이었다면 PythonOperator를 써도 된다.

그리고 rm.dir은 단순히 삭제하는 기능이다.

def re_par(load_dt):
        from spark_py.re_partition import re_partition
        re_partition(load_dt=load_dt)
        print("load")
        
re_partition = PythonVirtualenvOperator(
            task_id='re.partition',
            python_callable=re_par,
            trigger_rule="all_done",
            requirements=["git+https://github.com/hamsunwoo/spark_partitioning.git"],
            op_args=["{{ds_nodash}}"],
            system_site_packages=False
             )
                     
rm_dir = BashOperator(
            task_id='rm.dir',
            bash_command="""
                rm -rf ~/data/movie/repartition/load_dt={{ds_nodash}}
            """
            )

 

 

제 6 장: 데이터 집계를 위한 sparksql 기능 만들기

나는 movie_join_df.py 파이썬 파일을 하나 더 만들어 기능을 구현하였다.

spark에서 제공하는 틀?이 있는데 공식문서를 참고하여 작성하면 된다.

from pyspark.sql import SparkSession
import sys

LOAD_DT = sys.argv[1] #일자별로 기능을 적용해야 하기 때문에 추가

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

logFile = "/Users/seon-u/app/spark-3.5.1-bin-hadoop3/README.md"  # Should be some file on your system
logData = spark.read.text(logFile).cache()

df1 = spark.read.parquet(f"/Users/seon-u/data/movie/repartition/load_dt={LOAD_DT}")
df1.createOrReplaceTempView("one_day")

df2 = spark.sql(f"""
SELECT
    movieCd, -- 영화의 대표코드
    movieNm,
    salesAmt, -- 매출액
    audiCnt, -- 관객수
    showCnt, --- 사영횟수
    -- multiMovieYn, -- 다양성 영화/상업영화를 구분지어 조회할 수 있습니다. “Y” : 다양성 영화 “N”
    repNationCd, -- 한국/외국 영화별로 조회할 수 있습니다. “K: : 한국영화 “F” : 외국영화
    '{LOAD_DT}' AS load_dt
FROM one_day
WHERE multiMovieYn IS NULL
""")
df2.createOrReplaceTempView("multi_null")

df3 = spark.sql(f"""
SELECT
    movieCd, 
    movieNm,
    salesAmt,
    audiCnt, 
    showCnt, 
    multiMovieYn,
    -- repNationCd, 
    '{LOAD_DT}' AS load_dt
FROM one_day
WHERE repNationCd IS NULL
""")

df3.createOrReplaceTempView("nation_null")

df_j = spark.sql(f"""
SELECT
    COALESCE(m.movieCd, n.movieCd) AS movieCd,
    COALESCE(m.salesAmt, n.salesAmt) AS salesAmt,
    COALESCE(m.audiCnt, n.audiCnt) AS audiCnt, 
    COALESCE(m.showCnt, n.showCnt) AS showCnt, 
    multiMovieYn, 
    repNationCd, 
    '{LOAD_DT}' AS load_dt
FROM multi_null m FULL OUTER JOIN nation_null n
ON m.movieCd = n.movieCd""")

df_j.createOrReplaceTempView("join_df")

df_j.write.mode('append').partitionBy("load_dt", "multiMovieYn", "repNationCd").parquet("/Users/seon-u/data/movie/hive")

df_j.show()
spark.stop()

 

1. 영화데이터를 불러온다: df1 = spark.read.parquet(f"/Users/seon-u/data/movie/repartition/load_dt={LOAD_DT}")

2. 불러온 데이터를 "one_day"라는 테이블로 저장한다: df1.createOrReplaceTempView("one_day")

3. one_day라는 테이블을 이용해 쿼리를 작성한다: df2 = spark.sql(f"""~~

  • 이 테이블은 독립/상업영화 컬럼에서 모두 null값을 가진 테이블이다.
  • 위에서 repartition을 할 때 일자별/독립상업영화별/국내해외영화별로 저장했기 때문에 일자별로 읽으면 데이터가 중복되어 있기 때문에 각각 null 값을 가지는 컬럼들만 따로 테이블을 만들어 붙여줄 것이다.

4. 방금 작성한 퀴리를 "multi_null이라는 테이블로 저장한다: df2.createOrReplaceTempView("multi_null")

5. 이번엔 국내해외영화 컬럼이 null 값인 테이블을 쿼리로 작성한다: df3 = spark.sql(f"""

6. "nation_null"이라는 이름을 가진 테이블로 저장한다: df3.createOrReplaceTempView("nation_null")

7. 두 개의 테이블 "multi_null"과 "nation_null"을 합친다: df_j = spark.sql(f"""~~

  • 합칠 때 FULL OUTER JOIN 함수를 이용하였으며 COALESCE 함수로 원래는 분리 되어있는 컬럼을 하나로 합쳤다.

8. 방금 만든 쿼리를 "join_df"라는 테이블로 저장한다: df_j.createOrReplaceTempView("join_df")

9.  "join_df" 테이블을 어딘가에 저장하는데 "load_dt", "multiMovieYn", "repNationCd"로 파티셔닝하여 저장한다.

  • df_j.write.mode('append')는 파일을 기존 파일에 계속 붙여서 저장하는 것이다. overwrite는 덮어써서 저장하는 것!

10. 위의 명령들을 수행하면 spark를 멈춘다:  spark.stop()

 

 

제 7 장: 방금 만든 spark기능을 에어플로우에 적용하기

에어플로우에 적용하는건 간단하다. bash 명령어를 쓰면 된다. spark를 실행시키는 명령어는 공식문서에 나와있다.

$SPARK_HOME/bin/spark-submit 이다.

join_df = BashOperator(
            task_id='join.df',
            bash_command="""
                echo "join"
                SPARK_HOME=/Users/seon-u/app/spark-3.5.1-bin-hadoop3 #SPARK홈경로
                FILE_PATH=~/airflow_spark/py #기능을 구현한 파일 경로
                $SPARK_HOME/bin/spark-submit $FILE_PATH/movie_join_df.py {{ds_nodash}}
            """
            )

 

 

제 8 장: 합친 테이블을 다시 어딘가에 저장하기 전에 branch operator로 데이터 멱등성 유지하기

이건 위에 branch.op와 동일하다. 파일이 있다면 rm.hive로 보내고 파일이 없으면 agg.df로 보낸다.

def branch_hive(ds_nodash):
        import os
        home_dir = os.path.expanduser("~")
        path = os.path.join(home_dir, f"data/movie/hive/load_dt={ds_nodash}")
        print('*' * 30)
        print(path)
        print('*' * 30)

        if os.path.exists(path):
            print('존재')
            return "rm.hive"
        else:
            print('존재x')
            return "agg.df"
            
branch_hive = BranchPythonOperator(
            task_id='branch.hive',
            python_callable=branch_hive
            )

 

 

제 9 장: rm.hive & agg.df Task 만들기

이건 위와 동일하기 때문에 설명은 생략! 

agg_df = BashOperator(
            task_id='agg.df',
            bash_command="""
                echo "agg"
                SPARK_HOME=/Users/seon-u/app/spark-3.5.1-bin-hadoop3
                FILE_PATH=~/airflow_spark/py
                $SPARK_HOME/bin/spark-submit $FILE_PATH/movie_agg_df.py
            """,
            trigger_rule='all_done'
            )


    rm_hive = BashOperator(
            task_id='rm.hive',
            bash_command="""
                rm -rf ~/data/movie/hive/load_dt={{ds_nodash}}
            """
            )

 

 

제 10 장: 매출액 합계를 테이블을 만들기 위한 sparksql 기능 만들기

나는 movie_agg_df.py 파이썬 파일을 하나 더 만들어 집계 테이블을 만드는 기능을 추가하였다.

이 파이썬 파일을 위에 에어플로우 agg.df에서 수행한다.

from pyspark.sql import SparkSession
import sys

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

logFile = "/Users/seon-u/app/spark-3.5.1-bin-hadoop3/README.md"  # Should be some file on your system
logData = spark.read.text(logFile).cache()

df1 = spark.read.parquet(f"/Users/seon-u/data/movie/hive")
df1.createOrReplaceTempView("join_df")

#국내/해외영화별 매출액, 관객수 집계
df2 = spark.sql(f"""
                SELECT load_dt as load_dt,
                        repNationCd,
                        sum(salesAmt) as salesAmt,
                        sum(audiCnt) as audiCnt,
                        sum(showCnt) as showCnt
                FROM join_df
                GROUP BY load_dt, repNationCd
                ORDER BY load_dt
                """
                )

df2.createOrReplaceTempView("nation")

#독립/상업영화별 매출액, 관객수 집계
df3 = spark.sql(f"""
                SELECT load_dt as load_dt,
                        multiMovieYn,
                        sum(salesAmt) as salesAmt,
                        sum(audiCnt) as audiCnt,
                        sum(showCnt) as showCnt
                FROM join_df
                GROUP BY load_dt, multiMovieYn
                ORDER BY load_dt
                """
                )

df3.createOrReplaceTempView("multi")

df2.write.mode('overwrite').partitionBy("load_dt", "repNationCd").parquet("/Users/seon-u/data/movie/sum_nation")
df3.write.mode('overwrite').partitionBy("load_dt", "multiMovieYn").parquet("/Users/seon-u/data/movie/sum_multi")

spark.stop()

 

다시 에어플로우 파이프라인을 살펴보았을 때,

agg.df 에서 end로 가기 전에도 멱등성을 위해 branchOperator를 생성해줘야 하는데 처음 파이프라인을 만들 때 고려하지 못한 부분이다.

저장해줄 때 데이터 멱등성을 잊지말자!

 

 

참고로 spark.sql을 작성할 때 수시로 테이블을 확인하고 싶다면 zeppelin에서 확인하면 된다. 게다가 스파크는 데이터 시각화도 간단하게 구현할 수 있다. 아래처럼 컬럼들을 올바른 위치에 옮겨주면 된다. 판다스보다 훨씬 간편해서 좋았다.(팀프로젝트에서 판다스로 데이터 시각화하는데 어려움을 겪은 경험이 있기에....)

그리고 Google Looker Studio처럼 대시보드로 만들 수 있다.

 

 

일주일을 보내면서...

이번 일주일은 새로운 기술을 배우는 데 많은 시간을 할애했는데, 특히 아파치 스파크(Spark)가 주된 내용이었다. 처음에는 분산 처리 시스템이라는 개념 자체가 조금 낯설고 어려워 보였지만, 실습을 통해 점점 익숙해진 것 같다. 특히, 데이터 시각화 작업을 하면서 그 재미가 더해진 느낌이다. 지난주 팀프로젝트에서 파이썬의 판다스(Pandas)를 사용해 데이터 시각화를 시도했을 때는, 복잡한 코드와 다양한 설정에 조금 어려웠던 기억이 있다. 그때는 원하는 그래프 하나 그리는 것도 어려워서 고생했던 것 같다. 근데 스파크는 그런 면에서 훨씬 간단하고 직관적이어서, 이번에는 비교적 수월하게 작업을 진행할 수 있었다.

 

앞으로 나의 방향

지난 주에는 그동안 배운 것들을 정리하는 데 시간을 많이 썼다. 정리는 중요한 작업이지만, 한편으로는 다른 분야의 공부도 함께 병행해야 한다는 생각이 들었다. 그래서 이번 주부터는 정리에만 집중하지 않고, 새로운 주제에 대한 공부도 함께 시작하기로 결심했다. 하지만 또... 실습으로 인해... 결국 다른 공부를 깊이 있게 진행하지 못한 것 같다. 앞으로는 조금 더 균형 잡힌 학습 계획을 세워야겠다는 반성을 하게 되었다.

 

특히 다음 주부터는 코딩 테스트 준비를 본격적으로 시작할 계획이다. 조금씩이라도 매일 꾸준히 문제를 풀고, 부족한 부분을 채워 나가면서 진행해야겠다.