imhamburger 님의 블로그
데이터엔지니어 부트캠프 - 에어플로우 dag, Operator 다뤄보기 (2주차) 본문
저번주에는 내가 가지고 있는 데이터들을 sql로 불러와 데이터베이스, 데이터 테이블을 생성하여 행렬로 이루어진 테이블에 넣는 것을 배웠었다. 이번주는 에어플로우를 다루는 방법을 중점적으로 배웠다. 에어플로우가 뭔지는 이전글에 작성하였으니 참고!
에어플로우에 대해 다시 간단하게 말하자면 데이터 파이프라인을 관리하는 플랫폼이다. 핵심 요소는 dag, operator, task 이렇게 3가지이며 이를 다루기 위해서는 파이썬을 사용해야 한다. dag 를 데이터 파이프라인이라고 생각하면 쉽다. dag가 10개면 10개의 데이터 파이프라인을 가지고 있는 것이다.
dag을 사용하는 방법은 공식문서에 자세히 적혀있다.
기본적으로 dag파일은 ~/airflow/dags 폴더 안에 {DAG명}.py 파일을 만들어 파일 안에 코드를 작성한다.
dag의 기본 설정은 dag 아이디, dag를 돌릴 주기, tag 달기 등이 있는데, 돌릴 주기인 schedule은 cron 방법? 을 이용하는데 특정 시간에 특정 작업을 하는 데몬을 cron(크론)이라 한다. 실행명령은 * * * * * 로 이루어져 있다. 순서대로 분 시 일 월 요일(minute, hour, day, month, day of the week) 을 나타낸다.
DAG
import datetime
from airflow import DAG
with DAG(
'{dag명}',
default_args={
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(seconds=3)
},
description='{dag 설명}',
schedule="10 4 * * *",
start_date=datetime(2024, 7, 10),
catchup=True,
tags=['{dag 태그}'],
) as dag:
depends_on_past 값이 True 일 땐, DAG를 실행했을 때 한 플로우에서 에러가 난다면, 그 다음 단계는 실행되지 않는다.
start > print_date > copy.log > end 라는 데이터 파이프라인이 있다.
depends_on_past = True 일 때,
데이터 파이프라인 : start > check > to.csv > to.tmp > (to.base > make.done) 혹은 (to.base > report.err) > end
반대로, False로 바꾸면 에러가 나도 그 다음 단계로 넘어가 실행된다. check에서부터 에러가 있었지만, 그 다음단계로 넘어가 실행되었다.
depends_on_past = False 일 때,
또한, 에어플로우는 start_date를 지정할 수 있다. 내가 불러오고 싶은 데이터가 과거라면 과거 날짜를 지정해주면 된다. 만약, 2024년 6월 데이터를 불러오고 싶다면 해당 날짜를 지정하면 된다. 2024년 6월 데이터는 현재 7월에 몇번을 실행해도 미래에 12월에 몇번을 실행해도 데이터의 값은 변하지 않는다.
Operator
아래 데이터 파이프라인 모양처럼 만들고자 할 때, 총 8개의 Operator를 만들어야 한다. 그리고 각각 어떤 명령을 줄지 파이썬 코드를 통해 설계하면 된다.
연습삼아 ~/airflow/dags 에 import_db.py라는 파일을 만들었다. 파일명은 아무거나 지정하면 된다.
위의 그림을 코드로 표현하면 다음과 같다.
task_start >> task_check
task_check >> task_csv >> task_tmp
task_tmp >> task_base
task_base >> task_done >> task_end
task_check >> task_err >> task_end
*task_err는 데이터가 들어올 때 에러가 나면 알려주는 용도로 만들었다.
task_start 부터 차근차근 만들어보자!
1. task_start
task_start는 우선 EmptyOperator로 설계할 것이며 task_id 는 'start' 이다. 위에 그림을 보면 'start'라고 보이는데 그게 task_id를 지정해준 것이다. 빈 start task를 만들었다.
task_start = EmptyOperator(task_id='start')
2. task_check
task_check = BashOperator(
task_id="check",
bash_command="""bash {{ var.value.CHECK_SH }} {{logical_date.strftime("%y%m%d")}}""",
trigger_rule="all_success"
)
그 다음 만들어야할 건 check 이다. 데이터가 들어올 때 확인하는 용도로 만들려고 하는데 check는 Empty가 아닌 BashOperator로 만들었다. airflow에서는 Trigger_rule 기능?을 제공하는데 Operator가 실행될 조건을 걸어주는 것이다.
위에서 all_success의 의미는 '이전 Task가 모두 성공적으로 실행되었다면 해당 Task를 실행시킨다.' 라는 의미이다.
Trigger_rule에는 여러가지가 있다. Trigger_rule을 사용하면 Task간의 종속성을 더 유연하게 정의할 수 있다.
all_success | 이전 Task가 모두 성공했을 때 실행한다. Task 중 하나라도 실패하면 실행하지 않는다. |
all_failed | 이전 Task가 모두 실패했을 때 실행한다. Task 중 하나라도 성공하면 실행하지 않는다. |
all_done | 이전 Task가 모두 완료되었을 때 실행한다. Task 중 하나라도 실패하면 실행하지 않는다. |
one_success | 이전 Task 중 하나가 성공하면 실행한다. |
one_failed | 이전 Task 중 하나가 실패하면 실행한다. |
none_failed | 이전 Task 중 하나도 실패하지 않았을 때 실행한다. Task 중 하나라도 실패하면 실행하지 않는다. |
none_failed_or_skipped | 이전 Task 중 하나도 실패하지 않았거나 혹은 실행이 스킵되었을 때 실행한다. |
bash_command에는 내가 어떤 명령을 내릴 것인지 적어주는 곳인데,
bash {{ var.value.CHECK_SH }} {{logical_date.strftime("%y%m%d")}}
맨 앞에 bash는 실행명령어이다. bash 말고도 ./ 도 같은 의미이다.
var.value.CHECK_SH는 에어플로우에서 제공하는 Variables 기능을 사용한 것인데, Admin > Variables 에서 등록하면 된다. (코드를 쓸 때 정해진 규칙이 있으니 참고!)
긴 코드를 반복적으로 적어줘야할 때 유용하게 쓰인다. 내 CHECK_SH는 ~/airflow/dags/check.sh 을 의미한다.
내가 만들어 놓은 check.sh 파일을 살펴보면 다음과 같다.
#!/bin/bash
YYMMDD=$1
echo "check"
DONE_PATH=~/data/done/${YYMMDD}
DONE_PATH_FILE="${DONE_PATH}/_DONE"
if [ -e "${DONE_PATH_FILE}" ]; then
figlet "Let's move on"
exit 0
else
echo "I'll be back => ${DONE_PATH_FILE}"
exit 1
fi
YYMMDD 라는 변수를 $1 로 선언했다.
여기서 $1이 의미하는건?
터미널에선 이전글에 sys.argv를 정리하였을 때 나온 개념과 비슷한 느낌이다. 보통 우리는 터미널에 명령을 할 때 명령어 + {무언가} 를 입력한다. 명령어는 열번호가 없지만 그 다음에 내가 적는 건 열번호가 있다. 위치값이라고 생각하면 쉽다.
예를들어 위와 같이 작성하였을 때 bash는 명령어이고 ham의 자리(열번호)는 0을 가리키고 burger는 1을 가리킨다.
그럼 위에서 $1이 의미하는 건 예시로 보았을 때 burger 자리를 가리킨다.
다시! 내가 아까 task_check에 적은 코드를 보면, {{logical_date.strftime("%y%m%d")}} 가 $1 이 된다.
bash {{ var.value.CHECK_SH }} {{logical_date.strftime("%y%m%d")}}
{{logical_date.strftime("%y%m%d")}} 는 내가 날짜형식으로 된 폴더를 만든 것인데 이름은 240716 이다.
*에어플로우엔 DATE 타입이 여러가지가 있다.
참고로 {{ds_nodash}}는 데이터 타입이 str이며, {{logical_date}}의 데이터 타입은 Pendulum.DateTime 이다. (공식문서 참고)
따라서 strftime을 이용해 date 형태를 바꿔주고 싶을 땐 logical_date를 이용해야 잘 실행된다.
처음에 나는 {{ds_nodash}}와 strftime 을 함께 썼었는데 에러가 났고, 공식문서를 찾아보니 ds_nodash는 str타입이라 strftime 함수가 안먹혔던 것이다..!
그럼 최종적으로 DONE_PATH는 저 변수를 받아 ~/data/done/240716 를 의미하며 DONE_PATH_FILE 은 결국 240716안에 있는 _DONE 파일을 의미한다.
그리고 만약 그 파일이 존재한다면( *-e 의 의미는 "존재하면 True" 라는 의미를 가지고 있다.) "Let's move on"이 출력된다.
그렇지 않으면 "I'll be back => _DONE" 이 출력된다.
여기서 exit 0과 exit 1의 의미는?
exit 0 | 어떠한 에러가 없다. 깨끗하다. |
exit 1 | 어떠한 에러가 있다. 문제가 있다. |
3. task_csv
task_csv는 log타입의 데이터들을 csv 파일로 변환하는 Operator로 설계하려 한다.
task_csv = BashOperator(
task_id="to.csv",
bash_command="""
echo "to csv"
U_PATH=~/data/count/{{logical_date.strftime("%y%m%d")}}/count.log
CSV_PATH=~/data/csv/{{logical_date.strftime("%y%m%d")}}
mkdir -p ${CSV_PATH}
cat ${U_PATH} | awk '{print "{{ds}}," $2 "," $1}' > ${CSV_PATH}/csv.csv
"""
)
bash_command의 코드를 풀어쓰자면,
- "to csv"를 출력한다.
- data/count/{날짜로 표기된 어떤 폴더} 안에 count.log를 U_PATH 라는 변수로 선언한다.
- data/csv/{날짜로 표기된 어떤 폴더} 경로를 CSV_PATH 라는 변수로 선언한다.
- CSV_PATH 경로의 폴더를 생성한다. (mkdir -p ~)
- U_PATH 즉 count.log 안에 내용들을 "{{ds}}," + 2열 + "," + 1열 로 가공하여 그 내용을 CSV_PATH 경로에 있는 csv.csv 에 넣는다.
*{{ds}}는 YYYY-MM-DD 형태의 DATE 형태이다.
근데 awk 가 뭐지?
awk는 오크라고 읽는데 텍스트가 저장되어 있는 파일을 원하는대로 필터링하거나 가공을 통해 나온 결과를 행과 열로 출력해주는 리눅스 명령이다.
예를들어, 아래와 같은 텍스트 파일이 있을 때 2열만 출력하고 싶다면?
이렇게 입력하면 된다.
cat test_file.txt | awk '{print $2}'
아직 만들어야 할 Task가 잔뜩이지만 이번주는 여기까지 완료하였다.
에러를 해결하면서...
처음에 Operator를 만드는 과정에 {{ds_nodash}}와 strftime 을 함께 썼었는데 에러가 났었다. 다른 수강생분들은 애초에 날짜형식이 YYYYMMDD 였기에 {{ds_nodash}}만 써도 됐지만.. 나는 날짜형식이 YYMMDD 였어서 맞춰줘야 했다. strftime을 쓰면 되는데 자꾸 에러가 났다. 문법도 맞고 쓰임새도 맞는데 왜? 라는 의문이 들었고 에어플로우 공식문서를 읽어보았다.
공식문서를 찾아보니 ds_nodash는 str타입이었고 그래서 DATE를 읽어 형태를 바꿔주는 함수인 strftime가 안먹혔던 것이다.
str타입은 string(문자)타입이라 DATE가 아니다. 그렇게 보이지만.. 그래서 ds_nodash가 아닌 DATE타입인 logical_date를 쓰니 에러를 해결할 수 있었다.
모든 해답은 공식문서에 있다...공식문서를 잘 읽어보자.
일주일을 보내면서...
이번 일주일은 저번때보다 뭔가 더 많이 어려워진 느낌이었다. 에어플로우라는 걸 처음 접했을 때 이해가 가지않았고 개념조차 잡히지도 않았다. 그래서 내가 도대체 뭘 하고 있는거지? 라는 의문이 계속 들었던 것 같다. 지금은 에어플로우를 이해하고 코드를 작성하니 내가 지금 무엇을 하고 있고 왜 하고있는지 감이 잡힌 것 같다. (물론, 아직 깊이 한 것도 아니지만...)
메인은 에어플로우였지만, 여러가지 새로운 명령어들도 많이 배웠다. awk 라든지 $ 표시라든지 cron이라던지.
저번에는 sql을 이용해 데이터베이스를 만들고 데이터 파일을 불러와 테이블을 생성하고 가공을 하는 것을 배웠었다. 그리고 이번 한주는 에어플로우를 통해 데이터가 잘 들어오게끔 데이터 파이프라인을 만들어보고 잘 들어오는지 확인하는 시간을 가졌다. (이것만 했는데도 이렇게나 내용이 많다.) 그리고 에어플로우를 이용해 데이터를 가공하는 것도 가능하다. sql로도 가능하지만, 에어플로우를 이용하면 데이터 파이프라인을 직관적으로 확인할 수 있고 데이터가 잘 들어오는지 확인을 할 수 있으며 시간설정만 해놓으면 내가 매일 명령할 필요도 없이 데이터를 불러오는 등 여러가지 이점들이 많다. (쓰는 방법은 복잡한 것 같지만..!)
다음주는 에어플로우에서 sql을 사용하여 데이터를 가공하는 방법을 배운다고 한다..! 그리고 판다스까지.
판다스를 얼핏 읽어보았을 때는 엑셀처럼 행렬로 이루어진 데이터 테이블을 다루는 거 같은데 이것도 파이썬 코드를 이용하는거라 앞으로 공부할 것이 더 많아지는 것 같으다.... 그래도 해야하는 건 해야지.
앞으로 나의 방향
지난주에 마케팅의 도메인 지식을 갖춘 데이터 엔지니어가 되는 것이 목표라고 했었다. 사실 마케터로서 근무하였을 때 불편했던 업무가 여러개 있었다. 불편했다기보단 비효율적이고 시간도 오래걸리고...(+귀찮고..)
그래서 그러한 불편했던 나의 경험을 토대로 만들고 싶은 기능이 있다. 근데 그걸 만들 수 있는 것?인지는 아직 잘 모르겠다. 만들 수 있었다면 누군가가 먼저 만들지 않았을까 하는 생각이 든다. 나중에 시도해보고싶긴 하다.
'데이터엔지니어 부트캠프' 카테고리의 다른 글
데이터엔지니어 부트캠프 - 아파치 스파크(Apache Spark) 이해하기, 에어플로우에 적용시키기 (5주차) (1) | 2024.08.11 |
---|---|
데이터엔지니어 부트캠프 - 첫번째 팀프로젝트 (8/2~8/6) 2-3일차 (0) | 2024.08.05 |
데이터엔지니어 부트캠프 - 첫번째 팀프로젝트 (8/2~8/6) 1일차 (1) | 2024.08.04 |
데이터엔지니어 부트캠프 - 에어플로우 dag, Operator 다뤄보기 (3주차) (1) | 2024.07.28 |
데이터엔지니어 부트캠프 - 맥에서 Secure File Priv 에러 해결과 데이터베이스 생성하기 (1주차) (3) | 2024.07.14 |