imhamburger 님의 블로그

데이터엔지니어 부트캠프 - 에어플로우 dag, Operator 다뤄보기 (3주차) 본문

데이터엔지니어 부트캠프

데이터엔지니어 부트캠프 - 에어플로우 dag, Operator 다뤄보기 (3주차)

imhamburger 2024. 7. 28. 00:38

지난주에 에어플로우 Operator에서 csv파일을 만드는 것까지 진행하였다. (지난글 바로가기)

이번주엔 csv파일로 만든 것을 sql을 이용해 데이터베이스에 저장하고 판다스를 이용해 parquet 파일형태로 다시 저장해보는 것을 하였다. 약간 과정이 복잡하기는한데... 차근차근 알아보자.

 

4. task_create_tbl(table)

task_create_tbl = BashOperator(
                 task_id="create.table",
                 bash_command="""
                    SQL={{var.value.SQL_PATH}}/create_db_table.sql
                    MYSQL_PWD='{{var.value.DB_PASSWD}}' mysql -u root < $SQL
                 """
                 )

위 코드는 dags 에 있는 코드이다. 위 코드를 해석하자면,

SQL이라는 변수를 지정해 해당 변수는 {{var.value.SQL_PATH}}에 create_db_table.sql 파일을 의미한다.

{{var.value.SQL_PATH}}는 에어플로우에 저장해놓은 건데 ~/airflow/sql 폴더를 의미한다.

 

SQL은 어쨋든 최종적으로 create_db_table.sql이고 아래 MYSQL_PWD는 mysql을 시작할 때 매번 비밀번호를 입력해야하는데 그걸 자동적으로 입력되게 해놓은 것이다. 그리고 바로 위에서 변수로 설정한 $SQL 을 실행시킨다. 

 

내가 짠 SQL 코드는 다음과 같다.

CREATE DATABASE IF NOT EXISTS history_db;

USE history_db;

CREATE TABLE IF NOT EXISTS history_db.tmp_cmd_usage (
    dt VARCHAR(500),
    command VARCHAR(500),
    cnt VARCHAR(500)
);


CREATE TABLE IF NOT EXISTS history_db.cmd_usage (
    dt DATE,
    command VARCHAR(500),
    cnt INT,
    tmp_dt VARCHAR(500)
);
  • history_db가 존재하지 않는다면, history_db 라는 데이터베이스를 생성한다.
  • history_db를 이용한다.(=접근한다.)
  • tmp_cmd_usage라는 테이블을 없으면 생성한다. 컬럼은 dt / command / cnt 총 3개를 만들고 문자열이다.
  • cmd_usage라는 테이블을 없으면 생성한다. 컬럼은 dt / command / cnt / tmp_dt 총 4개를 만들고 dt는 DATE타입, cnt는 숫자타입 나머지는 문자열로 지정한다.

그럼 이제 history_db라는 데이터베이스를 만들었다. 그리고 빈 테이블들을 만들었다. 이제 csv파일을 열어 history_db안에 있는 빈 테이블에 채워넣으면 된다.

 

5. task_tmp

task_tmp = BashOperator(
            task_id="to.tmp",
            bash_command="""
                echo "to tmp"
                CSV_FILE=~/data/csv/{{logical_date.strftime("%y%m%d")}}/csv.csv
                bash {{var.value.SH_HOME}}/csv2mysql.sh $CSV_FILE {{ds}}

            """
            )

CSV_FILE이라는 변수에 csv파일들이 있는 경로를 지정하였다. 그리고 아래 명령어를 실행한다.

csv2mysql.sh 실행파일을 실행시키는데 $1은 $CSV_FILE을 가리키고 $2는 어떠한 날짜를 가리킨다. 그럼 csv2mysql.sh 실행파일이 어떤일을 하는지 확인해보자.

 

다음은 csv2mysql.sh 코드를 보여준다.

#!/bin/bash

CSV_FILE=$1
DEL_DT=$2

user="root"
database="{데이터베이스명}"

MYSQL_PWD="{비밀번호}" mysql --local-infile=1 -u"$user" "$database"<<EOF
DELETE FROM history_db.tmp_cmd_usage WHERE dt='${DEL_DT}';

LOAD DATA LOCAL INFILE '$CSV_FILE'
INTO TABLE {데이터베이스}.tmp_cmd_usage
CHARACTER SET latin1
FIELDS TERMINATED BY ',' ESCAPED BY '' ENCLOSED BY '^'
LINES TERMINATED BY '\n';
EOF
  • $1은 CSV_FILE을 가리킨다. 아까 위 코드에 CSV_FILE은 csv파일이 있는 경로를 가리킨다.
  • $2는 DEL_DT를 받는데 아래 DELETE FROM~ 이 부분에 해당 변수가 들어가있다. DELETE는 채워진 데이터를 삭제하는 sql 명령어?이다.

여기서 잠깐, 근데 왜 데이터를 삭제하지?

DELETE FROM history_db.tmp_cmd_usage WHERE dt='${DEL_DT}';

위 DELETE 코드는 멱등성때문에 넣었다.

예를들어, 현재 7월을 기준으로 6월 데이터를 추출한다고 했을 때랑 나중에 12월에도 6월 데이터를 추출했을 때 6월 데이터는 변하지 않아야 한다. 근데 에어플로우는 실행할 때마다 데이터가 누적되어 쌓인다. 그래서 날짜가 같으면 데이터를 삭제하고 다시 데이터를 LOAD 하는 방식으로 코드를 짠 것이다.

 

이렇게하면, 데이터는 멱등성을 유지할 수 있다. 

 

다시돌아와서...

  • mysql 유저명과 데이터베이스를 변수로 지정하고 아래 코드에 $user, $database로 받았다. 유저와 데이터베이스를 지정을 안하면 bash는 어떤 mysql 유저인 어떤 데이터베이스인지 인지를 못해 에러를 낸다.
  • 그리고 데이터베이스를 지정해주고 비밀번호를 읽고 그제서야 mysql에 접근한다.
  • --local-infile=1은 local 서버로 데이터를 불러오기위해 적어줘야 한다. 이건 이전글에서 다뤄서 에러메세지4를 참고~
  • <<EOF는 End Of File의 약자이며 EOF 사이에 적은 내용을 입력받아 명령을 수행한다. 사용 시작(EOF)을 알리고 끝날 때도 동일하게 EOF를 입력해야 종료된다.
  • 다음은 $CSV_FILE을 LOCAL에서 가져와서 tmp_cmd_usage에 담는다. 라는 내용이며 필드를 ',' 기준으로 나눈다.
  • ESCAPED BY와 ENCLOSED BY는 에러해결하기 에서 다루었기 때문에 해당글을 참고하면 된다.
  • 라인은 \n (줄바꿈) 기준으로 나눈다.

 

tmp_cmd_usage 테이블에 멱등성을 유지하면서 csv 데이터를 잘 넣었다.

사실 tmp_cmd_usage는 cmd_usage 테이블에 넣기전에 임시로 넣기위해 만든 테이블이다. 가공되지 않은 데이터를 먼저 넣어놓고 cmd_usage에서 가공할 예정이다. 만들어도되고 안만들어도되는데 데이터가 크면 클수록 잘게 단계를 쪼개서 데이터를 관리하는 것이 좋다. 그래야 어느 단계에서 에러가 나는지 명확하기 파악하기 쉽다.

 

이제 tmp_cmd_usage에 있는 데이터를 다시 cmd_usage로 옮기자.

 

6. task_base

task_base = BashOperator(
            task_id="to.base",
            bash_command="""
                echo "to base"
                bash {{var.value.SH_HOME}}/tmp2base.sh {{ds}}
            """
            )

이미 tmp_cmd_usage에서 데이터를 잘 넣어놨기 때문에 옮겨주기만 하면 된다.

위 코드는 tmp2base.sh 실행파일을 실행하며 {{ds}}이 $1로 들어가있다. tmp2base.sh 는 아래코드로 짜여져 있다.

#!/bin/bash

DT=$1

user="root"
database="history_db"

MYSQL_PWD="qwer123" mysql --local-infile=1 -u"$user" "$database"<<EOF
DELETE FROM history_db.cmd_usage WHERE dt='${DT}';

INSERT INTO cmd_usage
SELECT
    CASE WHEN dt LIKE '%-%-%'
            THEN str_to_date(dt, '%Y-%m-%d')
            ELSE str_to_date('1970-01-01', '%Y-%m-%d')
    END as dt,
    command,
    CASE WHEN cnt REGEXP '[0-9]+$'
            THEN cast(cnt as unsigned)
            ELSE -1
    END as cnt,
    '${DT}' as tmp_dt
FROM tmp_cmd_usage
WHERE dt = '${DT}';
EOF

사실, INSERT 전까지는 위에 sql 코드와 차이는 없다. 어쨋든 cmd_usage도 멱등성을 유지해야 하니 DELETE를 해줘야한다.

 

cmd_usage에서는 데이터를 불러올 필요없이 이미 있는 tmp_cmd_usage 테이블의 데이터를 넣어주기만 하면되서 INSERT문을 사용하였다. 그치만 이제 로우데이터를 가공해줘야 한다.

  • '%-%-%'형태라면 문자열을 날짜타입으로 '%Y-%m-%d'으로 지정한다. %Y-%m-%d는 '2024-07-10' 이런 형태이다. 그 외는 '1970-01-01'로 지정한다.
  •  command는 그대로 지정한다.
  • cnt가 0~9999로 이루어져있다면 숫자로 형변환하여 저장한다. cast는 형변환하는 명령어로 as unsigned는 숫자타입을 의미한다. 그 외는 '-1'로 지정한다.

여기서, REGEXP는 정규표현식으로 [0-9]는 0부터 9까지의 숫자들을 가리키고 +$는 $는 문자열의 종료를 의미한다. 앞에 [0-9]로 종료된다는 의미라 0부터 9까지 어느숫자든 숫자로 종료된다는 뜻이다. (참고글)

  • $DT를 불러온다. $DT는 $1 이며,  아까 dags파일에 {{ds}}이 $1로 들어가있다. {{ds}}는 에어플로우에서 쓰이는 날짜타입이다. (이 컬럼을 넣은 이유는 데이터가 어디서 왔는지 확인하기 위해 넣었다. 왜냐하면 형식에 안맞는 데이터를 1970-01-01과 -1로 주었기 때문이다. 이 컬럼을 넣어야 헷갈리지 않고 명확히 할 수 있다.)
  • FROM tmp_cmd_usage tmp_cmd_usage 테이블에서 데이터를 가져온다.
  • WHERE dt = $DT 조건은 tmp_cmd_usage 의 dt 컬럼이 $DT와 같아야 한다. 그리고 명령을 종료한다. EOF

위 코드를 출력하면 아래와 같다.

 

이제 csv파일을 데이터베이스에 저장완료하였다. 해당 dag에서 task는 이제 딱 하나 남았다. 바로 END 이다. 모든 데이터가 잘 수행되었다면 어떤 DONE 파일을 만들고 task를 마무리하는 것이다.

 

7. task_done

task_done = BashOperator(
            task_id="make.done",
            bash_command="""
                figlet "make.done.start"

                DONE_PATH={{var.value.IMPORT_DONE_PATH}}/{{logical_date.strftime("%y%m%d")}}
                mkdir -p $DONE_PATH
                echo "IMPORT_DONE_PATH=$DONE_PATH"
                touch $DONE_PATH/_DONE

                figlet "make.done.end"
            """
            )
  • DONE_PATH라는 변수를 아래 경로로 지정하고 해당 경로에 폴더를 생성한다.

  • IMPORT_DONE_PATH=%y%m%d 를 출력한다. (예시 IMPORT_DONE_PATH=240710)
  • DONE_PATH 경로에 _DONE을 생성한다.
  • figlet 부분은 그냥 추가한건데, 오류없이 잘 수행되었다면 에어플로우에서 출력을 확인할 수 있다. 

 

이제 끝인가?

싶겠지만, csv파일들을 parquet 파일형식으로 만들어 저장할 것이다... (아직 끝난게 아니다...)

 

Parquet(파케이)

parquet 파일이 생소할 수 있다. 나는 처음들어봤다. parquet는 프랑스어로 마룻바닥을 의미한다.

마룻바닥 모양이 뭔가 차곡차곡 붙여져있는 모양이라 그렇게 이름을 지었나보다.

 

어쨋든,

parquet은 하둡에서 컬럼방식으로 저장하는 저장 포맷이다. parquet은 열 기반 압축인데 열 기반으로 압축하면,

  • 행 기반보다 데이터 압축률이 높다.
  • 필요한 열 데이터만 읽어서 처리하는 것이 가능하기 때문에 데이터 처리하는데 자원을 절약할 수 있다.

뭔가 잘 안와닿을 수 있지만, 아래 그림을 보면 이해하기 조금 더 수월하다.

parquet은 컬럼방식으로 저장하는 포맷으로 열기반 압축이다. 열기반이면 위 그림처럼 데이터 타입이 같기때문에 압축할 때 속도가 더 빠르고 동일한 데이터 타입이 저장되기 때문에 컬럼별로 적합한 인코딩을 사용할 수 있다.

그러니 압축속도도 빠르고 용량도 작다. 그리고 특정언어에 종속되지 않는다.

parquet과 유사한 형식으로 ORC라는 파일 형식도 있는데 예전에 ORC는 Hive에서만 사용할 수 있었다고 한다.

 

반면, 행기반 압축은 데이터 타입이 각기 다르기 때문에 압축 속도가 parquet에 비해 느리며 압축용량도 더 크다.

 

그리하여...

csv파일을 parquet 파일 형식으로 바꿔 저장해보려 한다.

 

make_parquet.py 라는 파일을 dags 에 하나 더 만들었다.

 

make_parquet 파이프라인

 

1. task_check

task_check = BashOperator(
            task_id="check.done",
            bash_command="""
                DONE_FILE={{var.value.IMPORT_DONE_PATH}}/{{logical_date.strftime("%y%m%d")}}/_DONE
                bash {{var.value.CHECK_SH}} $DONE_FILE
            """
            )

위에서 DONE 이 만들어진 것이 확인되면 실행되는 task이다. 그러니까 2개의 dags가 하나로 이어져 있는 것이다.

위 코드는 CHECK_SH 를 실행한다. 그리고 $1 은 $DONE_FILE을 가리킨다.

 

CHECK_SH 파일은 다음과 같다.

#!/bin/bash


DONE_PATH_FILE=$1

if [ -e "${DONE_PATH_FILE}" ]; then
    figlet "DONE"
    exit 0
else
    echo "I'll be back => ${DONE_PATH_FILE}"
    exit 1
fi

위 코드를 해석하면, 만약 DONE_PATH_FILE 즉 DONE_FILE이 존재한다면 "DONE"을 출력한다.

없다면, "I'll be back => DONE_FILE"을 출력한다.

DONE파일이 전에 만들어졌으니 DONE을 출력했다.

 

이제 데이터를 체크했으니 csv 파일을 parquet으로 변환하자.

2. task_parquet

  task_parquet = BashOperator(
            task_id="to.parquet",
            bash_command="""
                echo "to parquet"

                READ_PATH=~/data/csv/{{logical_date.strftime("%y%m%d")}}/csv.csv
                SAVE_PATH=~/data/parquet

                #mkdir -p $SAVE_PATH

                python ~/airflow/dags/to_parquet.py $READ_PATH $SAVE_PATH
            """
            )

우선 data 폴더 안에 parquet이라는 폴더를 생성하고 to.parquet.py 라는 파일을 실행시킨다.

(근데 아래에서 partition_cols를 이용해서 폴더를 생성을 안해도된다. partition_cols을 쓰면 경로만 지정해주면 알아서 폴더를 만들고 저장된다.)

$1 = $READ_PATH , $2 = $SAVE_PATH를 가리킨다.

 

to.parquet.py 을 살펴보면, 다음과 같다. 판다스를 이용하여 parquet 파일로 변환한다.

import pandas as pd
import subprocess
import sys

READ = sys.argv[1]
SAVE = sys.argv[2]

df = pd.read_csv(READ,on_bad_lines='skip',names=['dt', 'cmd', 'cnt'], encoding = "latin")

df['dt'] = df['dt'].str.replace('^', '')
df['cmd'] = df['cmd'].str.replace('^', '')
df['cnt'] = df['cnt'].str.replace('^', '')

#'coerce'는 변환할 수 없는 데이터를 만나면 그 값을 강제로 NaN으로 바꾼다.
df['cnt'] = pd.to_numeric(df['cnt'],errors='coerce')
#NaN값을 원하는 방식으로 처리한다.(예: 0으로 채우기)
df['cnt'] = df['cnt'].fillna(0).astype(int)

df.to_parquet(f'{SAVE}', partition_cols=['dt'])
  • READ를 $1로 받고 SAVE를 $2로 받는다.
  • pd.read_csv > csv 파일을 읽어온다. 컬럼명을 dt / cmd / cnt로 지정한다. (오류없이 가져오기 위해 문제가 있는 라인은 스킵하는 옵션을 넣었다.)
  • 내 csv파일 문자들이 모두 '^'로 감싸져있기 때문에 그것을 없애기 위해 replace를 썼다.
  • cnt는 숫자로 변환하고 변환이 불가능한 데이터는 NaN으로 반환하게끔 errors='coerce' 를 추가하였다.
  • 그런다음 NaN값을 0으로 채우고 그 값을 다시 숫자로 변환한다.
  • df.to_parquet : 데이터를 parquet으로 변환한다. 어디에 저장하나? $SAVE에(~/data/parquet)! 그리고 추가적으로 partition_cols을 썼다. partition_cols에 대한 자세한 기능은 여기를 참고!

 

이제 csv파일을 parquet으로 변환하는 것까지 완료하였다. 그리고 다 완료하였으니 또 DONE파일을 만들어 완전하게 끝내도록 하자.

 task_make = BashOperator(
            task_id="make.done",
            bash_command="""
                echo "make done"
                MAKE_DONE_PATH=~/data/done/make_done
                MAKE_DONE_FILE=$MAKE_DONE_PATH/_DONE

                mkdir -p ~/data/done/make_done
                touch $MAKE_DONE_PATH/_DONE

                bash ~/airflow/dags/make_done.sh ~/data/parquet

            """
             )
  • ~/data/done/make_done 폴더를 생성한다.
  • 만든 폴더 안에 _DONE 파일을 생성한다.
  • make_done.sh 실행파일을 실행한다. $1는 ~/data/parquet을 가리킨다.

make_done.sh 파일을 살펴보면 다음과 같다.

#!/bin/bash


DONE_PATH_FILE=$1

if [ -d "${DONE_PATH_FILE}" ]; then
    figlet "DONE"
    exit 0
else
    echo "I'll be back => ${DONE_PATH_FILE}"
    exit 1
fi

-d는 디렉토리를 의미한다. 파일은 -e 였다.

$1, 즉 ~/data/parquet 디렉토리가 존재하면 "DONE"을 출력하고 없으면 "I'll be back => ~/data/parquet"을 출력한다.

 

다행히 잘 출력되었다!

 

 

일주일을 보내면서...

이번 일주일은 정말 초고속으로 지나간 것 같다. 아마 배운게 이것저것 많아서 그런거같다.

공부할 것이 갑자기 엄청 많아졌다. 파이썬, 판다스 등

이제 진짜 코드를 짜는 방법을 익혀야 하는데 쉽지가 않다. 아직 문법도 잘 이해가 되지않는다구...

그래도 나에게 칭찬해주고 싶은건 배운 것들을 차근차근 정리하고 있는 것이다. 수업을 들을 땐 사실 이것저것 정신없이 배워서.. '뭐지?' 싶은데 이렇게 정리를 하면 내가 지금 어느 단계를 진행하고 있는지 명확히 파악할 수 있다. 실제로 까먹었던 것이 있으면 내가 정리한 글을 찾아보기도 한다. 그래서 더 잘 정리해야겠다라는 마음이 더 크다. 그러면서도 너무 정리만 하면 안되는데... 다른 것도 해야하는데...! 참 어렵다. 

 

앞으로 나의 방향

앞으로는 수업에서 배우는 것도 배우는 것이지만 별도로 내가 공부해야 할 것이 생겼다. 지금 수업도 따라가기에 급급하지만... 

그래도 수업시간에 초집중해서 바로바로 이해할 수 있게 만들어야겠다.

 

이번에 데이터를 불러오고 가공하고 데이터베이스 저장하고...를 반복하면서 데이터 유실없이 가져와야 한다는 것이 매우 중요한 작업이라는 것을 알 수 있었다. 나는 항상 이미 깨끗하고 잘 정리된 데이터만 봐왔어서 이렇게 복잡할줄이야..^^ 그렇지만 이렇게 처음부터 데이터를 깨끗하게 가져와야 데이터 분석가나 데이터 사이언티스트가 업무를 하는데 더...수월하겠지? 데이터쪽 뿐만 아니라 다른 부서 또한 마찬가지일 것이다. 사실 회사에서 데이터는 모두가 봐야하는 것이니까. 언젠가 부서마다 데이터를 분석할 수 있는 사람이 있어야 할지도?!