imhamburger 님의 블로그

데이터엔지니어 부트캠프 - 에어플로우와 s3 연결하기 + mongodb 연결하기 (11월의 기록) 본문

데이터엔지니어 부트캠프

데이터엔지니어 부트캠프 - 에어플로우와 s3 연결하기 + mongodb 연결하기 (11월의 기록)

imhamburger 2024. 12. 1. 17:08

이번엔 에어플로우를 통해 크롤링한 RAW DATA를 s3에 적재하는 방법에 대해 공유하고자 한다.

(사실 너무 간단해서 의아했다.....ㅎ)

 

나의 파이프라인은 다음과 같다.

 

s3에 적재에 성공하면 라인을 통해 success 메세지를 받는다.

 

 

먼저, dag 파일을 작성하기 전에 에어플로우 창에서 s3 관련 정보들을 입력해줘야 한다.

 

1. Admin - Connections 들어가기

 

 

 

2. 하나 만들어 다음과 같이 입력하고 save 하기

 

 

그런다음 다시 DAG파일로 돌아오자.

에어플로우와 s3를 연결하기 위해서는 airflow.providers.amazon.aws.hooks.s3 이 필요하다.

에어플로우에서 제공하는 모듈이니 import를 하면된다.

 

 

나의 DAG

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

for item in crawling_list:
        try:
            # 데이터를 문자열로 가정하고 io.StringIO로 처리
            soup = item["HTML"]  # 크롤링 데이터의 HTML 내용

            # BeautifulSoup 객체를 HTML 문자열로 변환
            if hasattr(soup, "prettify"):
                html_content = soup.prettify()  # 예쁘게 정리된 HTML
            else:
                html_content = str(soup)  # 일반 문자열로 변환
                
            # S3Hook을 사용하여 AWS S3에 파일 업로드
            hook = S3Hook(aws_conn_id='{위에서 지정한 conn id 입력}')
            
            file_key = f"{item['ID']}.html"  # 파일 키 정의
            s3_key = f"{폴더명 입력}/{file_key}"
            file_obj = io.BytesIO(html_content.encode('utf-8'))
            
            # 인코딩된 HTML 콘텐츠를 S3에 업로드
            hook.get_conn().put_object(
                Bucket='{버켓이름}',
                Key=s3_key, #이건 그냥 내가 지정해준 폴더명과 파일명
                Body=file_obj  # HTML을 그대로 업로드
            )
            print(f"S3에 파일 업로드 완료: {s3_key}")

 

이렇게만 해주면 끝이다. 여기서 중요한 점은 conn id를 정확하게 입력해야 한다.

hook = S3Hook(aws_conn_id='{위에서 지정한 conn id 입력}')

 

위에서 지정한 id와 다를 경우, 에러가 난다.

 

 

실행결과

 

 

다음은 MongoDB와 연결을 해보자.

나의 경우 MongoDB Atlas를 사용하였으니 참고!!

 

 

먼저, MongoDB Atlas를 사용하려면 가입을 해야한다. (여기서!!)

나의 경우 파이썬을 이용하기 때문에 MongoDB와 연결하려면 pymongo 라는 모듈 설치가 필요하다.

 

pymongo 설치하기

pip install pymongo

 

그럼 모든 준비 끝이다.

 

mongodb.py (참고문서)

from pymongo import MongoClient

#mongodb
mongopassword = os.getenv("MONGOPASS")
url = f"mongodb+srv://{내계정ID}:{mongopassword}@cluster0.c1zjv.mongodb.net/"
client = MongoClient(url, tlsCAFile=certifi.where())
db = client.{DB이름}

db.{컬렉션명}.insert_one({
                        "title": title,
                        "category": category,
                        "area": area,
                        "location": performance_place,
                        "price": price,
                        "open_date": None,
                        "pre_open_date": None,
                        "start_date": start_date,
                        "end_date": end_date,
                        "show_time": show_time,
                        "running_time": running_time,
                        "rating": age_rating,
                        "description": final_description,
                        "poster_url": poster_img,
                        "hosts": [{"site_id": 2, "url":ticket_url}]
                    })

 

여기서 mongopassword는 가입하면 발급받는다. 그 비밀번호를 입력해주면 된다.

저렇게만 설정해주면 끝이라... 매우 간단하다. 편하긴하다...!!

 

 

결과

 

사실 파이널프로젝트에서 MongoDB말고 MariaDB를 쓰려고 계획했었다.

근데 멘토님이 MongoDB를 추천하셨다. MongoDB는 NoSQL이며 JSON 형식의 도큐먼트를 저장하는 시스템이다.

나는 MySQL 쿼리만 작성해보았어서 MongoDB 쿼리가 매우 낯설었다. 그리고 사용법을 익히는 것도...새로웠다.

 

그래도 MongoDB만의 장점이 있어 사용하게 되었다.

우선 JSON 형태로 저장하기 때문에 JSON 형식과 매우 유사하여 웹 애플리케이션 및 API와의 통합이 용이하다. 우리 프로젝트의 경우, 검색 기능이 메인인지라 백엔드에서 API를 처리할 때 용이할 것 같았다.

 

그리고 무엇보다 읽고 쓰기가 빠르다!

 

관계형 데이터베이스와 비교하자면 다음과 같다.

특징 관계형 데이터베이스 MongoDB
데이터 모델 테이블(행과 열) 기반 문서 기반 (JSON/BSON 형식)
스키마 고정된 스키마 유연한 스키마 (동적 스키마 가능)
확장성 수직적 확장 (서버 성능 향상) 수평적 확장 (샤딩을 통한 분산 저장)
조인(Join) 여러 테이블 간 복잡한 JOIN 연산 가능 JOIN이 없거나 제한적, 문서 내에 관련 데이터 포함
성능 작은 데이터셋에 유리, 복잡한 쿼리에서 느릴 수 있음 대규모 데이터에 빠르고, JOIN이 없어서 성능 최적화 가능
복제 및 고가용성 복제본 세트(주로 마스터-슬레이브 구조) 복제본 세트(복제본 간 읽기 분산 가능)
수정 및 확장 스키마 변경 시 데이터 마이그레이션 필요 스키마 변경이 용이하고, 데이터 구조 유연함

 

관계형 데이터베이스는 고정된 스키마와 강력한 트랜잭션, SQL 쿼리 언어를 사용해 데이터 무결성을 중시하는 시스템에 적합하다. 

그러나, MongoDB는 유연한 스키마와 수평적 확장성을 통해 대규모 데이터를 빠르게 처리할 수 있으며, 복잡한 JOIN이 없는 구조에서 성능이 좋다.

 

 

 

일주일을 보내면서...


파이널 프로젝트를 시작한 지 2주가 지났다. 처음에는 큰 그림을 그리며 기분 좋게 시작했지만, 실제로 매일 마주치는 에러와 싸우다 보니 시간이 너무 빨리 흘렀다. 특히 에어플로우와 카프카 통합 작업이 예상보다 훨씬 더 힘들었다. 카프카와 연결은 잘 되는데, 컨슈머가 메시지를 못 받는 문제는 여전히 해결되지 않는다. 처음에는 설정을 잘못했나 싶어 이것저것 바꿔봤지만, 여전히 답이 보이지 않는다. 구글링을 해보면 에어플로우와 카프카를 동시에 사용하는 정보가 많지 않아서 더욱 답답했다. 이런 상황에서 계속 시간만 잡아먹는 것 같아 초조하기도 하고 스트레스를 많이 받았다.

하지만, 카프카를 바로 해결하려 하기보다는, 일단 S3에 데이터를 저장하는 작업을 진행해야 다음 단계로 나아갈 수 있다는 것을 깨달았다. 카프카는 이후에 다시 돌아와서 테스트할 수 있으니, 현재는 우선 S3 적재 작업에 집중하기로 했다. 이런 결정이 맞았던 것 같고, 일단 진행하고 나니 마음이 조금 가벼워졌다. 카프카 문제는 후에 다시 돌아와서 차근차근 해결해야겠다.

이 프로젝트에서 느낀 점은 기술적인 문제 외에도, 구체적인 정보가 부족할 때 얼마나 답답한지, 그리고 예상보다 시간이 많이 소요될 때 얼마나 정신적으로 힘든지를 잘 알게 된 것이다. 그래도 계속해서 새로운 도전을 하고 있다는 생각에 보람을 느끼기도 한다. 프로젝트가 순조롭지 않더라도, 그 안에서 배울 점이 많으니까. 이 경험을 통해 앞으로 비슷한 문제를 만났을 때 더 쉽게 해결할 수 있을 거라 생각한다.

 


앞으로 나의 방향

 

11월이 끝나고 이제 남은 시간은 한 달밖에 없다. 더는 미룰 수 없다는 생각이 든다. 그동안 많이 뒤로 미뤄왔던 이력서 작성을 본격적으로 시작하려고 한다. 솔직히 아직 이력서에 들어갈 내용이 많지 않아서 감이 안 잡히지만, 일단 초안을 작성해보려고 한다. 어떻게 보면, 이력서를 처음부터 완벽하게 쓰는 것보다는 작성하고 피드백을 받으며 수정해가는 과정이 중요하다는 생각이 든다.

그리고 프로젝트를 진행하면서 너무 나 자신에게 스트레스를 주었던 것 같다. 평일에는 프로젝트에 집중하되, 주말에는 쉼이 필요하다는 것을 느낀 주였다...ㅎ 앞으로 한 달 남았으니 밸런스를 잘 잡아야겠다!!