imhamburger 님의 블로그

에어플로우(Airflow) - dags 에서 파이썬 하나의 함수로 여러 오퍼레이터에서 받기 본문

카테고리 없음

에어플로우(Airflow) - dags 에서 파이썬 하나의 함수로 여러 오퍼레이터에서 받기

imhamburger 2024. 8. 1. 19:53

1. max_active_runs 와 max_active_tasks

에어플로우에선 다양한 옵션을 제공하는 데 그중 max_active 옵션이 있다.

 

max_active_runs : 최대 몇개의 실행을 활성화할지

max_active_tasks : 최대 몇개의 task를 활성화할지 

지정해줄 수 있다.

with DAG(
     'movie',
    default_args={
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=3),
    },
    max_active_runs=1,
    max_active_tasks=3,
    description='hello world DAG',
    schedule="10 2 * * *",
    start_date=datetime(2024, 7, 24),
    catchup=True,
    tags=['movie'],
) as dag:

 

위 코드를 에어플로우 Details에서 확인할 수 있다. (아래사진 참고)

 

 

 

2. 파이프라인 정리

파이프라인에 Task를 추가하다보면 복잡한 모양의 파이프라인이 만들어 질 수 있다.

 

 

Task 1~2개를 추가하여 복잡한 모양의 파이프라인을 깔끔하게 정리해보자.

get.start와 get.end 라는 Task 2개를 추가하였다.

 

 

3. op_args 와 op_kwargs

에어플로우 dags 에서 파이썬 하나의 함수로 여러 오퍼레이터에 받는 방법

 

방법 1 : 함수 인자에 *args와 **kwargs 로 설정하지 않고 인자명을 명시하여 사용하기

def func_multi(a, b):
        from mov.api.call import save2df
        df = save2df(load_dt = a, url_param = b)

        for key, value in b.items():
            df[key] = value

        p_cols = ['a'] + list(b.keys())
        df.to_parquet('~/tmp/test_parquet/load_dt', partition_cols=p_cols)
        print(df.head(5))

*args와 **kwargs를 직접적으로 넣지 않고 인자명을 내가 지정해도 사용할 수 있다.

내가 만든 save2df 함수를 import로 불러와 save2df 의 인자를 받아 op_kwargs를 사용하려 한다.

 

save2df 함수는 다음과 같다.

def save2df(load_dt='20120101', url_param={}):
    df = list2df(load_dt, url_param)
    df['load_dt'] = load_dt

    return df

load_dt='20120101'로 디폴트 파라미터로 받고 있지만, 에어플로우에선 Jinja 템플릿을 이용해 다른 날짜로 바꿔줄 예정이다.

그리고 url_param이라는 파라미터가 존재한다. url_param의 type은 딕셔너리 형태이다.

 

다시 돌아와서...

 

그리고 func_multi 함수에서 나는 인자를 a와 b를 넣었고, 각각

load_dt = a

url_param = b 으로 save2df의 파라미터를 받았다.

 

save2df에서 url_param은 딕셔너리 형태였고, b를 for문을 이용하여 key와 value가 있다면 넣어주게끔 하였다. (없으면 없는대로 실행)

 

그리고 나는 save2df에서 받아온 데이터를 partition_cols로 이용하여 parquet 파일로 저장해줄 것이다.

partition_cols 이 뭔지 잘 모르겠다면 이전글 참고!

p_cols = ['a'] + list(b.keys())

p_cols 을 이렇게 한 이유는 partiton_cols을 사용할 때 'a' 별 즉 날짜별 + b의 키값별로 분리하기 위해서 이다.

참고로 b의 키값은 총 네개로 저장하려 하는데 multiMovieYn=N / multiMovieYn=Y / repNationCd=F / repNationCd=K 가 있다.

 

결과를 먼저 보여주자면, 아래와 같다. (일자별 + 키값별 분리)

 

func_multi 함수 작업이 끝났다면 Operator에서 어떻게 load_dt = a 와 url_param = b

a, b인자를 설정하여 실행시키도록 할 수 있는지! 살펴보자.

 

2개의 Task로 예시로 든다면, 다음과 같다.

multi_y = PythonOperator(
            task_id='multi.y',
            python_callable=func_multi,
            op_args=["{{ds_nodash}}"],
            op_kwargs={'url_param': {"multiMovieYn": "Y"}}
            )

multi_n = PythonOperator(
            task_id='multi.n',
            python_callable=func_multi,
            op_args=["{{ds_nodash}}"],
            op_kwargs={'url_param': {"multiMovieYn": "N"}}
            )

 

위에서 설정한 함수의 인자를 받기 위해서는 2개만 추가하면 된다.

op_args= {입력}
op_kwargs= {입력}

 

위에서 load_dt = a로 설정하였다. save2df에서 load_dt는 '20120101' 같은 형태였다. 이 형태는 에어플로우 Jinja 템플릿에서 ds_nodash를 이용하면 간단하게 해결할 수 있다. 그러면 날짜별로 출력할 수 있다. 

참고로 args는 리스트 형태로 받기 때문에 op_args = [ {입력} ] 형태로 적어줘야 한다.

 

그리고 나머지 파라미터 url_param = b  두번째 인자는 op_kwargs을 쓰면 된다. kwargs는 딕셔너리 형태이므로 딕셔너리로 받는다.

save2df에서 url_param = {}  즉 딕셔너리 형태였으므로 "url_param"이 Key가 되고, {} 이 부분이 Value가 된다. 

따라서 op_kwargs = { "Key 입력" : Value입력 } 이런 형태로 와야한다.

op_kwargs={'url_param': {"multiMovieYn": "N"}} #{'Key': Value}

내가 넣고자 하는 Value는 {"multiMovieYn": "N"}이므로 Value 자리에 넣어줬다.

 

위 작업을 에어플로우에서 정상적으로 들어갔는지 확인을 위해서는 에어플로우를 실행한 후, Logs > Rendered Template에서 내가 넣은 값을 확인할 수 있다. 

 

방법 2 : 함수 인자에 *args와 **kwargs 로  사용하기

사실 방법 1과 별다를건 없다. 인자를 명시안해주고 *args와 **kwargs를 그 자리에 대신한 것뿐이다.

def func_multi(load_dt, **kwargs):
        from mov.api.call import save2df
        df = save2df(load_dt=load_dt, url_param=kwargs['url_param'])

        for k, v in kwargs['url_param'].items():
            df[k] = v

        p_cols = ['load_dt'] + list(kwargs['url_param'].keys())
        df.to_parquet('~/tmp/test_parquet/load_dt', partition_cols=p_cols)
        print(df.head(5))

아까 다뤘던 함수와 똑같지만 인자에 **kwargs로 설정하였다.

그리고 df =save2df 에서 url_param을 kwargs['url_param']만 언팩킹한다.

아래 for문과 p_cols 부분도 kwargs['url_param'] 로 설정해줘야 kwargs가 'url_param'만 선택하여 꺼내온다.

 

 

그리고 2개의 Task는 다음과 같다.

multi_y = PythonOperator(
            task_id='multi.y',
            python_callable=func_multi,
            op_args=["{{ds_nodash}}"],
            op_kwargs={'url_param': {"multiMovieYn": "Y"}}
            )

    multi_n = PythonOperator(
            task_id='multi.n',
            python_callable=func_multi,
            op_args=["{{ds_nodash}}"],
            op_kwargs={'url_param': {"multiMovieYn": "N"}}
            )

사실 위에서 보여줬던 Task와 동일하니 설명은 생략하겠다.

 

**kwargs 사용 시 주의사항

def test1(**params):
        print(params['task_name'])

def test2(task_name, **params):
		print(params['task_name'])

위와 같이 함수가 2개가 있을 때, test1은 정상적으로 출력을 하지만 test2는 오류가 난다.

task_name을 이미 **params으로 부터 분리되어 인자를 받았는데 params['task_name'] 으로 출력을 했기 때문이다.

test2처럼 분리해서 사용할 시 직접적으로 명시한 인자는 **params으로부터 제외되었으니 이점을 주의해야 한다.