imhamburger 님의 블로그
데이터엔지니어 부트캠프 - 영화 박스오피스 데이터 ETL(Extraction / Transform / Load) (7월의 기록) 본문
데이터엔지니어 부트캠프 - 영화 박스오피스 데이터 ETL(Extraction / Transform / Load) (7월의 기록)
imhamburger 2024. 8. 4. 14:331. requests로 영화데이터 가져오기
사전에 영화진흥위원회에 가입하여 Key를 발급받아야 한다.
import requests
import os
import pandas as pd
#영화진흥위원회에서 정해진 REST 방식을 이용하여 url 요청형식 맞추기
def gen_url(dt="20120101", url_param={}):
base_url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
key = get_key()
url = f"{base_url}?key={key}&targetDt={dt}"
for key, value in url_param.items():
url = url + f"&{key}={value}"
return url
#영화데이터를 가져오기 위해서 부여받은 Key값 입력이 필요하다.
#Key값은 영화진흥위원회 가입 및 API 키 생성 후 환경변수 선언이 필요하다.
def get_key():
key = os.getenv('MOVIE_API_KEY')
return key
#영화데이터 JSON 파일가져오기
def req(load_dt="20120101", url_param={}):
url = gen_url(load_dt, url_param)
r = requests.get(url)
data = r.json()
code = r.status_code
print(data)
return code, data
JSON파일로 가져올 것이 때문에 JSON 응답 예시를 참고하여 gen_url 함수를 완성시켰다.
2. 가져온 영화데이터에서 가져올 컬럼만 추출하기
def req2list(load_dt='20120101', url_param={}):
_, data = req(load_dt, url_param)
l = data['boxOfficeResult']['dailyBoxOfficeList'] #가져올 컬럼만 입력
return l
1번에서 code = r.status_code는 확인차 return 한 것이기 때문에 2번에선 가져오지 않아도 되어 _로 대체하여 생략하였다.
영화 JSON파일 예시
3. 컬럼만 추출한 데이터를 DataFrame으로 변환
def list2df(load_dt='20120101', url_param={}):
l = req2list(load_dt, url_param)
df = pd.DataFrame(l)
df['load_dt'] = load_dt #load_dt 라는 컬럼 생성
return df
4. 에어플로우 dags에서 위 모듈을 불러와 데이터를 parquet 파일형식으로 저장해주기
- 에어플로우 데이터 파이프라인
task_start >> branch_op
task_start >> throw_err >> save_data
branch_op >> rm_dir >> get_start
branch_op >> get_start
branch_op >> echo_task
get_start >> [multi_y, multi_n, nation_k, nation_f] >> get_end
get_end >> save_data >> task_end
- task_start와 task_end는 EmptyOperator 이용
task_start = EmptyOperator(task_id='start')
task_end = EmptyOperator(task_id='end')
- task_start > branch_op
데이터 멱등성을 위해 BranchPythonOperator를 start 뒤에 넣었다. (에어플로우를 실행할때마다 같은 데이터가 누적되어 쌓이면 안되니까 조건을 넣어준 것이다.) BranchPythonOperator 에 대해 잘 모르겠다면 이전글 참고
지정한 path에 파일이 있다면 rm_dir.task_id로 이동, 없다면 get.start 와 echo.task로 이동
def branch_func(ds_nodash):
import os
home_dir = os.path.expanduser("~")
path = os.path.join(home_dir, f"tmp/test_parquet/load_dt/load_dt={ds_nodash}")
if os.path.exists(path):
return rm_dir.task_id #task_id 입력
else:
return "get.start", "echo.task" #task_id 입력
branch_op = BranchPythonOperator(
task_id="branch.op",
python_callable=branch_func
)
- branch_op > rm_dir : func에서 path에 파일이 있다면 파일 삭제 후, 파일 get_start로 보내기 (데이터 파이프라인 확인)
rm_dir = BashOperator(
task_id='rm.dir',
bash_command='rm -rf ~/tmp/test_parquet/load_dt/load_dt={{ ds_nodash }}'
)
- branch_op > get_start : func에서 path에 파일이 없다면 파일을 get_start로 불러온다.
get_start = EmptyOperator(task_id='get.start', trigger_rule='all_done')
- branch_op > echo_task : echo_task는 데이터가 잘 들어오는지 확인하기 위해 만들어 놓은 것이다.
echo_task = BashOperator(
task_id='echo.task',
bash_command="echo 'task'",
trigger_rule='all_success'
)
- task_start > throw_err > save_data
task_start에서부터 데이터가 잘 들어오지 않는다면 오류 리포트를 보내는 task인 throw_err도 추가하였다.
throw_err = BashOperator(
task_id='throw.err',
bash_command="exit 1",
trigger_rule="all_done"
)
- get_start > [multi_y, multi_n, nation_k, nation_f]
위 Task들은 영화데이터에서 한국/외국영화 분류, 상업/독립영화를 분류하여 저장하기 위해 partition_cols을 이용
def func_multi(load_dt, **kwargs):
from mov.api.call import list2df
df = list2df(load_dt=load_dt, url_param=kwargs['url_param'])
for k, v in kwargs['url_param'].items():
df[k] = v
p_cols = ['load_dt'] + list(kwargs['url_param'].keys())
df.to_parquet('~/tmp/test_parquet/load_dt', partition_cols=p_cols)
위에 코드가 잘 이해가 가지않는다면 이전글 참고!
위 함수를 실행하기 위한 에어플로우 Task들. PythonVirtualenvOperator가 생소하다면 여기 참고
#상업독립영화
multi_y = PythonVirtualenvOperator(
task_id='multi.y',
python_callable=func_multi,
op_args=["{{ds_nodash}}"],
op_kwargs={
'url_param': {"multiMovieYn": "Y"}
},
requirements=["git+https://github.com/hamsunwoo/movie.git@0.3/api"],
system_site_packages=False
)
multi_n = PythonVirtualenvOperator(
task_id='multi.n',
python_callable=func_multi,
op_args=["{{ds_nodash}}"],
op_kwargs={
'url_param': {"multiMovieYn": "N"}
},
requirements=["git+https://github.com/hamsunwoo/movie.git@0.3/api"],
system_site_packages=False
)
#한국외국영화
nation_k = PythonVirtualenvOperator(
task_id='nation.k',
python_callable=func_multi,
op_args=["{{ds_nodash}}"],
op_kwargs={
'url_param': {"repNationCd": "K"}
},
requirements=["git+https://github.com/hamsunwoo/movie.git@0.3/api"],
system_site_packages=False
)
nation_f = PythonVirtualenvOperator(
task_id='nation.f',
python_callable=func_multi,
op_args=["{{ds_nodash}}"],
op_kwargs={
'url_param': {"repNationCd": "F"}
},
requirements=["git+https://github.com/hamsunwoo/movie.git@0.3/api"],
system_site_packages=False
)
이를 실행하면 아래와 같이 저장이 된다.
- [multi_y, multi_n, nation_k, nation_f] > get_end
get_end는 [multi_y, multi_n, nation_k, nation_f]에서 잘 저장되었는지 확인차 추가해주었다.
get_end = EmptyOperator(task_id='get.end')
5. parquet 파일형식으로 저장된 데이터를 가공해주기
새로운 utils.py이라는 파이썬 파일을 만들어 데이터를 Transform해줄 기능을 만든다.
import pandas as pd
def merge(load_dt="20240727"):
read_df = pd.read_parquet('~/tmp/test_parquet/load_dt')
cols = ['movieCd', #영화의 대표코드를 출력합니다.
'movieNm', #영화명(국문)을 출력합니다.
'openDt', #영화의 개봉일을 출력합니다.
'audiCnt', #해당일의 관객수를 출력합니다.
'load_dt', # 입수일자
'multiMovieYn', #다양성영화 유무
'repNationCd', #한국외국영화 유무
]
df = read_df[cols]
#날짜조건 load_dt int로 데이터타입 변환
dw = df[['load_dt'] == int(load_dt)].copy()
#카테고리 타입 -> Object
dw['load_dt'] = dw['load_dt'].astype('object')
dw['multiMovieYn'] = dw['multiMovieYn'].astype('object')
dw['repNationCd'] = dw['repNationCd'].astype('object')
근데 여기서 문제가 있다. 해당 데이터를 출력해보면 다음과 같다.
같은 영화가 2개씩 중복인데 multiMovieYn값이 들어간 것과 repNationCd가 들어간 것. 각각 나눠져있기 때문이다. 원래는 데드풀과 울버린같은 경우 N(상업영화)이면서 F(외국영화)이니 이 두개를 하나로 만들어줘야 한다. 아래와 같이.
movieCd | movieNm | load_dt | multiMovieYn | repNationCd |
20247781 | 데드풀과 울버린 | 20240727 | N | F |
위 같이 데이터를 가공해주도록 하자.
방법 1 : merge 이용
import pandas as pd
def merge(load_dt="20240727"):
read_df = pd.read_parquet('~/tmp/test_parquet/load_dt')
cols = ['movieCd', #영화의 대표코드를 출력합니다.
'movieNm', #영화명(국문)을 출력합니다.
'openDt', #영화의 개봉일을 출력합니다.
'audiCnt', #해당일의 관객수를 출력합니다.
'load_dt', # 입수일자
'multiMovieYn', #다양성영화 유무
'repNationCd', #한국외국영화 유무
]
df = read_df[cols]
#날짜조건 load_dt int로 데이터타입 변환
dw = df[['load_dt'] == int(load_dt)].copy()
#카테고리 타입 -> Object
dw['load_dt'] = dw['load_dt'].astype('object')
dw['multiMovieYn'] = dw['multiMovieYn'].astype('object')
dw['repNationCd'] = dw['repNationCd'].astype('object')
#NaN값 unknown으로 변경
dw['multiMovieYn'] = dw['multiMovieYn'].fillna('unknown')
dw['repNationCd'] = dw['repNationCd'].fillna('unknown')
#merge
u_multi = dw[dw['multiMovieYn'] == 'unknown']
u_nation = dw[dw['repNationCd'] == 'unknown']
m_df = pd.merge(u_multi, u_nation, on="movieCd", suffixes=('_m', '_n'))
위 코드까지 작업을 해주면 아래와 같이 출력된다. NaN값이 unknown으로 변경하였고 unknown 값으로 하나의 테이블을 2개로 나누었다. 그리고 두개의 테이블을 다시 하나로 merge하였다.
여기서 버릴 열들을 제거해주고 중복값이 있을 수 있으니 그것도 제거해주는 작업을 해야한다.
따라서 아래 코드를 추가하였다. + 컬럼명도 rename 함수를 이용하여 변경하였다.
filtered = m_df.iloc[:,[0,1,6,3,4]]
filtered.drop_duplicates()
filtered.rename(columns={"movieNm_m": "movieNm",
"load_dt_n": "load_dt",
"multiMovieYn_m": "multiMovieYn",
"repNationCd_m": "repNationCd"})
결과
방법 2 : 피벗테이블 이용
read_df = pd.read_parquet('~/tmp/test_parquet/load_dt')
cols = ['movieCd', #영화의 대표코드를 출력합니다.
'movieNm', #영화명(국문)을 출력합니다.
'openDt', #영화의 개봉일을 출력합니다.
'audiCnt', #해당일의 관객수를 출력합니다.
'load_dt', # 입수일자
'multiMovieYn', #다양성영화 유무
'repNationCd', #한국외국영화 유무
]
df = read_df[cols]
#카테고리 타입 변경
df['load_dt'] = df['load_dt'].astype('object')
df['audiCnt'] = df['audiCnt'].astype('int')
df['multiMovieYn'] = df['multiMovieYn'].astype('object')
df['repNationCd'] = df['repNationCd'].astype('object')
#열에 모든 경우의 수 카테고리 설정
df['multiMovieYn'] = pd.Categorical(df['multiMovieYn'], categories=['N', 'Y','NaN' ])
df['repNationCd'] = pd.Categorical(df['repNationCd'], categories=['F', 'K', 'NaN'])
def table(load_dt='20120101'):
# 주어진 load_dt 값으로 필터링
filtered_df = df[df['load_dt'] == load_dt]
# 피벗 테이블 생성
pivot_table = filtered_df.pivot_table(
index=['movieCd', 'movieNm', 'openDt', 'audiCnt', 'load_dt'],
values=['multiMovieYn', 'repNationCd'],
aggfunc=lambda x: ', '.join(x.dropna().unique()),
fill_value='NaN'
).reset_index()
#누적 관객수 내림차순 정렬 (어떤 영화를 제일 많이 보았는지 확인)
return pivot_table.sort_values(by='audiCnt', ascending=False)
table(20240724)
결과
방법 1도 데이터를 잘게잘게 쪼개어? 진행하는 것도 단계별로 결과를 확인하고 진행할 수 있어 좋지만, 피벗테이블을 이용하면 훨씬 간단하게 사용할 수 있었다. 피벗테이블 함수는 사용할줄만 알면 아주 유용한 것 같다! aggfunc은 특히 데이터를 가공할 때 매우 편리한 기능이다.
6. 가공한 데이터 에어플로우에 불러와 저장하기
def save_data(ds_nodash):
from mov_agg.utils import table
transform_data = table(load_dt=ds_nodash)
transform_data.to_parquet('~/tmp/transformed_parquet', partition_cols=['ds_nodash']
#task 설정
save_data = PythonVirtualenvOperator(
task_id="save.data",
python_callable=save_data,
requirements=["git+https://github.com/hamsunwoo/mov_agg.git@0.5/mov_agg"],
system_site_packages=False,
trigger_rule='one_success'
)
이렇게 가공한 데이터를 ~/tmp/transformed_parquet' 경로에 저장이 되어 있을 것인데...
생각해보니 멱등성을 고려하지 않아 branch오퍼레이터도 추가해줘야할 것 같다..
일주일을 보내면서...
이번 일주일도 엄청 빠르게 지나간 느낌이다. 에어플로우로 데이터를 불러오고 가공하고 저장하는 것까지 모든 플로우를 배우니 복잡하기는 해도 데이터의 흐름을 알 수 있어 신기했다. 에어플로우의 다양한 기능들을 익히는 과정에서 여러 가지 도전 과제들이 있었지만, 이를 해결하면서 많은 것을 배울 수 있었다. 실무에 적용할 수 있는 스킬들을 습득하는 데 많은 도움이 되었다. 여러 번의 테스트와 실험을 통해 각 단계를 성공적으로 수행할 수 있었다. 에어플로우를 통해 데이터 파이프라인을 구축하는 경험은 매우 유익했다. 앞으로도 더 많은 것을 배우고 싶다는 생각이 들었다.
앞으로 나의 방향
우선 화요일까지는 팀 프로젝트 때문에 정신이 없을 것 같다. 프로젝트 기간이 짧아서 더 그런 것 같다.
그래도 팀프로젝트를 통해 하루밖에 안지났지만 여러 에러들을 경험하면서 많은 것을 배울 수 있었다.
팀프로젝트가 무사히 끝나길..!
'판다스(Pandas)' 카테고리의 다른 글
판다스(Pandas) - Partition_cols 이해하기 (0) | 2024.07.27 |
---|---|
판다스(Pandas) - csv파일 불러오기, unicode 에러 해결하기 (0) | 2024.07.25 |