imhamburger 님의 블로그
에어플로우(Airflow) - dags 에서 파이썬 하나의 함수로 여러 오퍼레이터에서 받기 본문
1. max_active_runs 와 max_active_tasks
에어플로우에선 다양한 옵션을 제공하는 데 그중 max_active 옵션이 있다.
max_active_runs : 최대 몇개의 실행을 활성화할지
max_active_tasks : 최대 몇개의 task를 활성화할지
지정해줄 수 있다.
with DAG(
'movie',
default_args={
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(seconds=3),
},
max_active_runs=1,
max_active_tasks=3,
description='hello world DAG',
schedule="10 2 * * *",
start_date=datetime(2024, 7, 24),
catchup=True,
tags=['movie'],
) as dag:
위 코드를 에어플로우 Details에서 확인할 수 있다. (아래사진 참고)
2. 파이프라인 정리
파이프라인에 Task를 추가하다보면 복잡한 모양의 파이프라인이 만들어 질 수 있다.
Task 1~2개를 추가하여 복잡한 모양의 파이프라인을 깔끔하게 정리해보자.
get.start와 get.end 라는 Task 2개를 추가하였다.
3. op_args 와 op_kwargs
에어플로우 dags 에서 파이썬 하나의 함수로 여러 오퍼레이터에 받는 방법
방법 1 : 함수 인자에 *args와 **kwargs 로 설정하지 않고 인자명을 명시하여 사용하기
def func_multi(a, b):
from mov.api.call import save2df
df = save2df(load_dt = a, url_param = b)
for key, value in b.items():
df[key] = value
p_cols = ['a'] + list(b.keys())
df.to_parquet('~/tmp/test_parquet/load_dt', partition_cols=p_cols)
print(df.head(5))
*args와 **kwargs를 직접적으로 넣지 않고 인자명을 내가 지정해도 사용할 수 있다.
내가 만든 save2df 함수를 import로 불러와 save2df 의 인자를 받아 op_kwargs를 사용하려 한다.
save2df 함수는 다음과 같다.
def save2df(load_dt='20120101', url_param={}):
df = list2df(load_dt, url_param)
df['load_dt'] = load_dt
return df
load_dt='20120101'로 디폴트 파라미터로 받고 있지만, 에어플로우에선 Jinja 템플릿을 이용해 다른 날짜로 바꿔줄 예정이다.
그리고 url_param이라는 파라미터가 존재한다. url_param의 type은 딕셔너리 형태이다.
다시 돌아와서...
그리고 func_multi 함수에서 나는 인자를 a와 b를 넣었고, 각각
load_dt = a
url_param = b 으로 save2df의 파라미터를 받았다.
save2df에서 url_param은 딕셔너리 형태였고, b를 for문을 이용하여 key와 value가 있다면 넣어주게끔 하였다. (없으면 없는대로 실행)
그리고 나는 save2df에서 받아온 데이터를 partition_cols로 이용하여 parquet 파일로 저장해줄 것이다.
partition_cols 이 뭔지 잘 모르겠다면 이전글 참고!
p_cols = ['a'] + list(b.keys())
p_cols 을 이렇게 한 이유는 partiton_cols을 사용할 때 'a' 별 즉 날짜별 + b의 키값별로 분리하기 위해서 이다.
참고로 b의 키값은 총 네개로 저장하려 하는데 multiMovieYn=N / multiMovieYn=Y / repNationCd=F / repNationCd=K 가 있다.
결과를 먼저 보여주자면, 아래와 같다. (일자별 + 키값별 분리)
func_multi 함수 작업이 끝났다면 Operator에서 어떻게 load_dt = a 와 url_param = b
a, b인자를 설정하여 실행시키도록 할 수 있는지! 살펴보자.
2개의 Task로 예시로 든다면, 다음과 같다.
multi_y = PythonOperator(
task_id='multi.y',
python_callable=func_multi,
op_args=["{{ds_nodash}}"],
op_kwargs={'url_param': {"multiMovieYn": "Y"}}
)
multi_n = PythonOperator(
task_id='multi.n',
python_callable=func_multi,
op_args=["{{ds_nodash}}"],
op_kwargs={'url_param': {"multiMovieYn": "N"}}
)
위에서 설정한 함수의 인자를 받기 위해서는 2개만 추가하면 된다.
op_args= {입력}
op_kwargs= {입력}
위에서 load_dt = a로 설정하였다. save2df에서 load_dt는 '20120101' 같은 형태였다. 이 형태는 에어플로우 Jinja 템플릿에서 ds_nodash를 이용하면 간단하게 해결할 수 있다. 그러면 날짜별로 출력할 수 있다.
참고로 args는 리스트 형태로 받기 때문에 op_args = [ {입력} ] 형태로 적어줘야 한다.
그리고 나머지 파라미터 url_param = b 두번째 인자는 op_kwargs을 쓰면 된다. kwargs는 딕셔너리 형태이므로 딕셔너리로 받는다.
save2df에서 url_param = {} 즉 딕셔너리 형태였으므로 "url_param"이 Key가 되고, {} 이 부분이 Value가 된다.
따라서 op_kwargs = { "Key 입력" : Value입력 } 이런 형태로 와야한다.
op_kwargs={'url_param': {"multiMovieYn": "N"}} #{'Key': Value}
내가 넣고자 하는 Value는 {"multiMovieYn": "N"}이므로 Value 자리에 넣어줬다.
위 작업을 에어플로우에서 정상적으로 들어갔는지 확인을 위해서는 에어플로우를 실행한 후, Logs > Rendered Template에서 내가 넣은 값을 확인할 수 있다.
방법 2 : 함수 인자에 *args와 **kwargs 로 사용하기
사실 방법 1과 별다를건 없다. 인자를 명시안해주고 *args와 **kwargs를 그 자리에 대신한 것뿐이다.
def func_multi(load_dt, **kwargs):
from mov.api.call import save2df
df = save2df(load_dt=load_dt, url_param=kwargs['url_param'])
for k, v in kwargs['url_param'].items():
df[k] = v
p_cols = ['load_dt'] + list(kwargs['url_param'].keys())
df.to_parquet('~/tmp/test_parquet/load_dt', partition_cols=p_cols)
print(df.head(5))
아까 다뤘던 함수와 똑같지만 인자에 **kwargs로 설정하였다.
그리고 df =save2df 에서 url_param을 kwargs['url_param']만 언팩킹한다.
아래 for문과 p_cols 부분도 kwargs['url_param'] 로 설정해줘야 kwargs가 'url_param'만 선택하여 꺼내온다.
그리고 2개의 Task는 다음과 같다.
multi_y = PythonOperator(
task_id='multi.y',
python_callable=func_multi,
op_args=["{{ds_nodash}}"],
op_kwargs={'url_param': {"multiMovieYn": "Y"}}
)
multi_n = PythonOperator(
task_id='multi.n',
python_callable=func_multi,
op_args=["{{ds_nodash}}"],
op_kwargs={'url_param': {"multiMovieYn": "N"}}
)
사실 위에서 보여줬던 Task와 동일하니 설명은 생략하겠다.
**kwargs 사용 시 주의사항
def test1(**params):
print(params['task_name'])
def test2(task_name, **params):
print(params['task_name'])
위와 같이 함수가 2개가 있을 때, test1은 정상적으로 출력을 하지만 test2는 오류가 난다.
task_name을 이미 **params으로 부터 분리되어 인자를 받았는데 params['task_name'] 으로 출력을 했기 때문이다.
test2처럼 분리해서 사용할 시 직접적으로 명시한 인자는 **params으로부터 제외되었으니 이점을 주의해야 한다.