imhamburger 님의 블로그
에어플로우(Airflow) - PythonOperator, PythonVirtualenvOperator, BranchPythonOperator 다뤄보기 본문
에어플로우(Airflow) - PythonOperator, PythonVirtualenvOperator, BranchPythonOperator 다뤄보기
imhamburger 2024. 7. 31. 19:24에어플로우에는 다양한 오퍼레이터를 제공한다. 이번엔 지난글에서 다룬 BashOperator말고 PythonOperator를 다뤄보자.
1. PythonOperator
from airflow.operators.python import PythonOperator
def print_context(ds=None, **kwargs):
print("::group::All kwargs")
task = PythonOperator(
task_id="print_the_context",
python_callable=print_context
)
PythonOperator는 Dags에 Python 함수(내가 만든 기능)를 입력하고 task 에서 python_callable={함수명} 만 입력하면 에어플로우에서 명령을 받고 실행한다.
python_callable은 BashOperator에서 bash_command와 같은 기능이다.
내가 만든 Python 기능은 다른 환경에서 만들었는데 어떻게 dags에서도 이용하지?
상관없다. 전에 배포된 기능을 pip install을 이용해 설치해서 썼던 것처럼 동일한 방법으로 진행하면 된다.
예를들어, A의 가상환경에 있는 소스코드를 B의 가상환경으로 가져와 쓸 때는 B의 가상환경에서 A의 소스코드를 설치해주면 된다.
설치하는 방법은 기능을 설치받을때랑 동일하다. 단지 환경이 다를뿐.
pip install git+{깃허브 HTTPS 주소} #메인에 있는 기능일 때
pip install git+{깃허브 HTTPS 주소}@{브랜치명} #브랜치에 있는 기능일 때
pip list를 입력하면 설치된 모듈, 기능을 확인할 수 있다.
그러면 B의 가상환경에서도 A의 가상환경에 있는 기능을 import하여 동일하게 쓸 수 있다.
위처럼 사용할 수 있지만, 패키지 버전을 독립적으로 관리하기 어렵다.
예를들어, 여러 개의 소스코드가 필요해 에어플로우가 있는 환경에 그냥 몽땅 설치한다면, dependencies 충돌이 일어날 수 있다.
따라서 에어플로우에서는 이러한 충돌을 방지하기 위해 PythonVirtualenvOperator라는 오퍼레이터를 별도로 제공하고 있다.
2. PythonVirtualenvOperator
PythonVirtualenvOperator는 Python 코드를 가상 환경 내에서 실행하기 위해 사용하는 오퍼레이터이다. 이 오퍼레이터는 특정 의존성 패키지가 필요한 파이썬 코드를 실행할 때 유용하다. 이를 통해 작업 간의 의존성 충돌을 피하고, 필요한 라이브러리나 패키지 버전을 독립적으로 관리할 수 있다.
정리하자면, PythonVirtualenvOperator를 이용하면 각 오퍼레이터마다 가상 환경 내에서 패키지를 독립적으로 사용할 수 있다.
PythonVirtualenvOperator 특징
- 가상 환경 생성: 주어진 작업을 실행하기 전에, 필요한 패키지와 의존성을 포함한 가상 환경 생성
- 패키지 설치: 필요한 패키지를 지정하면, 가상 환경에 설치
- 파이썬 함수 실행: 가상 환경 내에서 지정된 파이썬 함수를 실행
PythonVirtualenvOperator를 사용하기 위해선 Virtualenv를 설치해줘야 한다.
pip install virtualenv
해당 task에서 requirements 에 설치할 모듈의 깃헙 주소를 pip install 했던 것처럼 동일하게 넣어주면 된다. 이 부분은 필요한 패키지를 지정하는 과정이다.
from airflow.operators.python import PythonVirtualenvOperator
task_get_data = PythonVirtualenvOperator(
task_id="get_data",
python_callable=get_data,
requirements=["git+{깃허브 HTTPS 주소}@{브랜치명}"],
system_site_packages=False,
trigger_rule='all_done'
)
system_site_packages를 True로 설정하면, 가상 환경이 시스템의 site-packages 디렉토리를 참조한다. (기본값은 False)
그렇지만, 에어플로우에서 실행하면 속도가 느려진 것을 알 수 있다. 왜냐하면 실행될 때마다 PythonVirtualenvOperator가 가상환경을 만들어 사용하고 다시 완료될 때 삭제하기 때문이다.
따라서, 가상환경 캐시경로를 지정해주면 만들었다가 다시 삭제하지 않고 만들어둔 것을 다시 재사용하기 때문에 에어플로우에서 빠른속도로 실행할 수 있다.
캐시경로를 지정해주는 건 간단하다. Task에 아래 코드를 추가해주면 된다.
venv_cache_path="{저장할 경로}"
#예시
venv_cache_path="tmp2/airflow_venv/get_data"
캐시경로를 추가한 코드
task_get_data = PythonVirtualenvOperator(
task_id="get.data",
python_callable=get_data,
requirements=["git+https://github.com/ham/movie.git@0.2/api"],
system_site_packages=False,
trigger_rule='all_done',
venv_cache_path="tmp2/airflow_venv/get_data"
)
캐시경로를 추가 후 에어플로우를 실행한 결과
venv_cache_path 주의사항
만약, requirements가 가리키는 원본 소스코드가 업데이트가 되었을 땐 venv_cache_path에는 반영이 안되어 있다. venv_cache_path는 단순히 캐시를 저장한 것이기 때문에 원본 소스코드가 업데이트가 되었을 때 해당 cache_path도 삭제하고 다시 업데이트 해줘야 한다. venv_cache_path를 자세히 들여다 살펴보면, 원본 소스코드가 저장되어 있다. 하지만 그건 이전 버전의 원본 소스코드로 확인될 것이다. 따라서 생성되어 있던 cache_path를 다시 삭제 후 실행해줘야 업데이트된 버전의 원본 소스코드가 cache_path에 업데이트되어 있다.
3. BranchPythonOperator
선행 작업의 결과에 따라 다음에 이어지는 Task가 달라야 할 때 BranchPythonOperator를 이용하여 분리하여 작업할 수 있다.
예를들어, 다음과 같은 조건으로 에어플로우에서 실행하고자 한다.
- path가 있다면 "rm_dir" 이라는 Task로 보내고,
- path가 없다면 "task_get_data" 이라는 Task로 보낸다.
- 그리고 "rm_dir" 이 수행을 완료하면 다시 "task_get_data"로 보낸다.
func이라는 파이썬 함수를 보면 다음과 같다. 여기서 조건에 따라 Return값이 달라진다.
def func(ds_nodash):
home_dir = os.path.expanduser("~")
path = os.path.join(home_dir, f"tmp/test_parquet/load_dt/load_dt={ds_nodash}")
if os.path.exists(path):
return rm_dir.task_id
else:
return "get.data" #task_id
그럼 "rm.dir"과 "get.data"는 서로 다른 작업을 수행해야 한다.
따라서, BranchPythonOperator를 만들어 각각 분리해주는 작업을 하자.
branch_op = BranchPythonOperator(
task_id="branch.op",
python_callable=func,
trigger_rule='all_success'
)
"branch_op" 는 위 파이썬 함수인 func을 수행한다.
rm.dir : func에서 path에 파일이 있다면 파일 삭제
rm_dir = BashOperator(
task_id='rm.dir',
bash_command='rm -rf ~/tmp/test_parquet/load_dt/load_dt={{ ds_nodash }}'
)
get.data : func에서 path에 파일이 없다면 파일 생성
task_get_data = PythonVirtualenvOperator(
task_id="get.data",
python_callable=get_data, #함수이름
requirements=["git+https://github.com/ham/movie.git@0.3/api"],
system_site_packages=False,
trigger_rule='all_done',
venv_cache_path="tmp2/airflow_venv/get_data"
)
위 Operator를 에어플로우에서 그래프로 보자면 다음과 같다.
'에어플로우(Airflow)' 카테고리의 다른 글
에어플로우(Airflow) - Celery Executor 사용하기 (13주차) (0) | 2024.10.02 |
---|---|
에어플로우(Airflow) - 에어플로우 이해하기 (1) | 2024.07.19 |