imhamburger 님의 블로그

데이터엔지니어 부트캠프 - 두번째 팀프로젝트 (8/26~8/28) (8주차) 본문

데이터 엔지니어링

데이터엔지니어 부트캠프 - 두번째 팀프로젝트 (8/26~8/28) (8주차)

imhamburger 2024. 9. 1. 13:38

8월 26일 월요일부터 첫 팀프로젝트를 시작하였다. 프로젝트를 완성하는데 주어진 시간은 3일이다.

 

사실 8월 23일 금요일 오후부터 팀프로젝트에 대해 팀원분들과 계획을 세웠었다. 

팀구성은 4인 1조인데 우리의 팀 이름은 "Asset-No-1" 으로 정했다.

이유는 미래 자산 1조 클럽 가입을 꿈꾸며....ㅋ

 

팀프로젝트 주제

 

업무용 메신저 개발 (사내 채팅 기능 + 영화 정보 검색 챗봇 기능)

  • 업무 채팅 기능
  • 업무 채팅 감사 기능 ( 검색, 대화 주제 통계 ) - zeppelin
  • 영화 챗봇기능 ( @bot 파묘 감독은 누구야? )
  • 시스템 챗봇기능 ( airflow JOB ... 성공 실패 )
  • 일정 챗봇기능 ( 애자일 칸반 미팅 시간 입니다 )

여기에서 주요기능은 실시간으로 업무 대화 기능을 구현하는 것이다. 우리는 주요기능인 업무 채팅을 우선순위로 두고 시간이 남는다면 일정 챗봇기능을 추가하기로 하였다.

 

선정 배경

 

사내 기술 유출 우려

  • 사내 정보 보호를 위한 회사만의 커뮤니케이션 툴의 필요성

부적절한 메시지 감찰

  • 감사팀이 사내 메신저의 대화 내용을 모니터링할 수 있음

 

개발 일정

 

2024년 8월 25일 ~ 2024년 8월 28일 (3일)

 

 

개발목록

 

전체적인 진행계획은 다음과 같다. (크게 보았을때)

  • 영화진흥위원회에서 API 호출하여 영화데이터를 json파일로 저장
  • 영화데이터 json 파일을 에어플로우에서 자동으로 저장하게 스케줄링 (+ 저장 성공 혹은 실패 시 라인알람)
  • 실시간 채팅 기능
  • 채팅방에서 '@'를 이용해 영화제목 입력 시 영화정보 출력 (상대방에게는 검색한 것이 안보이게)
  • 일정을 채팅방에 알려주는 챗봇기능
  • README.md 및 발표자료 작성

 

나는 여기에서 영화 챗봇 기능과 영화데이터를 json 파일로 저장시키는 업무를 맡았다.

 

챗봇 기능을 만들기에 앞서,

당연히 영화 데이터가 있어야하기 때문에 API를 호출하여 json 파일로 저장하는 에어플로우 dag를 만들었다.

원래 내 머릿속 생각으로는 json파일로 저장하지 않고 바로 불러오고 챗봇기능을 넣으려고 하였는데, 팀원분들께서 영화진흥위원회에서 제공하는 API 서비스가 하루 이용량이 제한적이니 차라리 저장시키고 그 안에서 챗봇이 영화정보를 찾아오는게 낫지 않겠냐고 의견을 주셨다.

 

생각해보니, 나도 그게 더 나을 것 같았다.

왜냐하면 하루 정해진 API 서비스를 다쓴다면 당연히 챗봇기능도 못쓰게 되어버리니까...! (+게다가 사용자가 챗봇기능을 10000번 쓴다면....?)

 

 

1. 영화 데이터 Load

 

데이터 파이프라인

 

사실 파이프라인은 특별한 것은 없고, 데이터 멱등성을 위해 BranchPythonOperator를 추가한 것과,

만약, 돌리고있는데 어디 멀~리 나가있을 때를 고려하여 라인알람 기능을 추가하였다.

 

성공하면 "success" 알림을 받고 실패하면 "fail" 알림 메세지를 받는다. (아래는 내 휴대폰 화면을 캡쳐한 것이다.)

 

라인을 사용한 이유... 기능적으로 구현이 매우 간단해서 사용하기 쉽다.

 

 

 

API 호출을 위한 소스코드

import requests
import os
import json

def req(url):
    r = requests.get(url)
    j = r.json()

    return j

def save_movies(year, sleep_time=1):
    #연도별 저장
    home_path = os.path.expanduser("~")
    file_path = f"{home_path}/data/mov_data/year={year}/data.json"

    #위 경로가 있으면 API 호출을 멈추고 프로그램 종료
    if os.path.exists(file_path):
        print(f"파일이 이미 존재합니다. (연도: {year})")
        return []
    else:
        print(f"데이터를 저장합니다. (연도: {year})")

    API_KEY = os.getenv('MOVIE_API_KEY')
    url_base = f"https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieList.json?key={API_KEY}&openStartDt={year}&openEndDt={year}"
    r = req(url_base)
    data = r['movieListResult']['movieList']
    return data

def save_json(data, file_path):
    #파일저장 경로 mkdir
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

    return True

 

나는 개봉 연도별로 영화 데이터를 저장하게끔 하였다. 사실 나중에 챗봇 기능을 쓸 때는 한꺼번에 저장되어 있는 것이 코드를 짜기에 편할 것 같았는데, 그럼 나중에 시간이 지날수록 데이터양은 늘어나고.. 한 파일에 너무 많은 양의 데이터가 들어가면 관리하기에도 어려울 것 같고 무엇보다 용량이...엄청나게 쌓이게 될 것이다. 따라서, 연도별로 구분하는 방법을 택했다.

 

그리고 한국어를 json 파일로 저장할 때 ensure_ascii=False를 추가해줘야 잘 인코딩되어 한국어가 제대로 보인다.

 

 

 

2. 영화 챗봇기능

 

영화 챗봇기능을 구현하기 위해 먼저 세부적으로 무엇부터 할지 고민하였었다.

  • Kafka를 이용하여 실시간 채팅이 가능한 공간을 만들기
  • Kafka Producer로부터 영화제목을 입력을 받으면 Consumer에서 출력받기 위해 영화제목이 Key값으로 들어간 딕셔너리 형태로 만들어주기
  • 데이터에 영화제목이 없으면 "영화정보를 찾을 수 없습니다." 넣어주기 

 

Kafka Producer 소스코드

from kafka import KafkaProducer
import time
import json

producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8')
        

)

while True:
    movie_title = input("MovieName: ")
    if movie_title == 'exit':
        break

    data = {'MovieName': movie_title}
    producer.send('movies', value=data)

    producer.flush()

print("챗봇 종료")

 

사실 Producer 코드는 굉장히 간단하다. 어떠한 입력을 하기만 하면 되기 때문에...!

나는 입력을 "MovieName: ~~~" 이러한 형태로 받는걸로 우선 세팅해두었다.

 

 

Kafka Consumer 소스코드

from kafka import KafkaConsumer, TopicPartition
import os
from json import loads, dumps, load
import glob

# 경로 저장
home_path = os.path.expanduser("~")
FILE_PATH = os.path.join(home_path, 'data', 'mov_data', '*_data.json')
MOVIE_INFO = os.path.join(home_path, 'movie_info.json')

#검색한 영화 정보 저장
def save_movie_info(value):
    with open(MOVIE_INFO, 'a', encoding='utf-8') as f:
        f.write(dumps(value, ensure_ascii=False, indent=4))


# 저장된 영화데이터(JSON파일) 읽기
def read_json():
    all_data = []
    # 모든 *_data.json 파일 경로를 찾기
    file_paths = glob.glob(FILE_PATH)
    
    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = load(f)  # 파일 객체를 사용하여 JSON을 로드
            all_data.extend(data)  # 모든 데이터를 리스트에 추가

    return all_data

#Consumer 설정
consumer = KafkaConsumer(
        "movies",
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: loads(x.decode('utf-8')),
        consumer_timeout_ms=10000,
        auto_offset_reset='latest',
        group_id="movie_data",
        enable_auto_commit=False,
)


#키값을 영화제목을 주기위해 데이터를 담을 빈 딕셔너리 생성
movie_dic = {}
data = read_json()


for i in data:
    movie_dic[i["movieNm"]] = i

#Consumer 실행
try:
    for m in consumer:
        input_data = m.value
        movie_name = input_data.get("MovieName")

        if movie_name in movie_dic:
            value = movie_dic[movie_name]
            save_movie_info(value)
            
            print(f"영화 정보 저장 완료: {movie_name}")
            print(value)
            consumer.commit()

        else:
            print(f"'{movie_name}'에 대한 정보를 찾을 수 없습니다.")
        

except KeyboardInterrupt:
    print("챗봇 종료")


finally:
    consumer.close()

 

영화데이터가 연도별로 저장되어 있어서, 이거를 어떻게 다 읽어올 수 있을까 고민하였다.

검색을 해보니 여러가지 방법이 있었는데, glob 이라는 모듈이 가장 구현이 간단해보였고 glob을 사용하였다.

 

사용방법은 간단하다. 우선 아래코드처럼 내 영화데이터의 파일경로를 적으면 된다.

나는 영화데이터가 2021_data.json, 2022_data.json 이런식으로 저장되어 있었고 *을 넣어주면 glob이 _data.json으로 끝나는 파일형식을 다 불러온다.

  • FILE_PATH = os.path.join(home_path, 'data', 'mov_data', '*_data.json') 

 

여기서 주의해야할 건, glob은 경로를 리스트 형태로 반환하기 때문에, for문을 써서 모든 연도의 json파일을 읽어오게 하여 모든 영화데이터를 all_data 리스트에 담아주었다.

# 저장된 영화데이터(JSON파일) 읽기
def read_json():
    all_data = []
    # 모든 *_data.json 파일 경로를 찾기
    file_paths = glob.glob(FILE_PATH)
    
    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = load(f)  # 파일 객체를 사용하여 JSON을 로드
            all_data.extend(data)  # 모든 데이터를 리스트에 추가

    return all_data

 

 

그리고나서, 영화제목으로 검색할 때 영화정보를 출력되게 하기위해 영화제목을 Key값으로 넣는 작업을 추가하였다.

사실... for문을 최대한 사용하지않고 바로 해시맵을 써서 구현하려고 했는데... 애초에 제공받는 영화데이터 형식으로는 불가능할 것 같았고, 이 작업을 추가할 수 밖에 없었다. 

#키값을 영화제목을 주기위해 데이터를 담을 빈 딕셔너리 생성
movie_dic = {}
data = read_json()


for i in data:
    movie_dic[i["movieNm"]] = i

 

결과

 

 

영화 챗봇 기능은 어느정도 구현하였고, 바로 실시간 채팅 기능을 위한 소스코드에 그대로 옮기는 작업을 하려고 하였다.

그런데, 실시간 채팅 코드가 우리팀에서 만든 것이 아닌 다른사람 코드를 가져와썼던 것이고 이건 우리팀에게 전혀 도움이 안될 것이라고 생각하였다. 그래서 팀원분들께 다른사람 코드 가져와쓰지 말고 우리가 처음부터 만들자고 하였다. 그래야 코드 이해도도 높아지고, 그게 팀프로젝트를 하는 의미라고 생각하였기 때문이다. 감사하게도 모두 동의를 하였고, 그게 이미 27일이었고 우리에겐 하루반밖에 시간이 없었다!!

 

그렇지만, 그거대로 배움이 있는 것이니까!

 

우리팀은 이 기능만큼은 나눠서 하는 것보다 Pair programing 으로 진행하는 것이 훨씬 효율적이라 생각하였고 다같이 모여 한 사람이 코드를 작성하였다.

 

실시간 채팅기능 관련하여 소스코드는 여기에서 확인하면 된다.

 

 

실시간 채팅기능에 영화 챗봇을 넣을 때 몇 가지 추가한 점은...

 

1. 메시지가 '@'로 시작하면 영화 제목을 추출하여 저장된 영화 데이터에서 해당 영화 정보를 찾아 출력하게 만들었으며, 상대방에게는 보이지 않는다. 기본적으로 챗봇은 본인이 검색한 것이라 생각했고, 본인에게만 보이게끔 하였다.

 

2. 원래는 영화제목 그대로 Key값이 들어갔기 때문에 띄어쓰기까지 동일하게 입력해줘야 출력이 가능하였는데, if문을 추가하여 영화제목을 띄어쓰기 주지않아도 영화정보를 불러오게끔 구현하였다.

 

관련 소스코드

if message['message'].startswith('@'):
	# '@' 뒤에 있는 영화 제목을 추출
    movie_title = message['message'][1:].strip().replace(" ","")
    matched_title = None
    
    for title in movie_dic:
    	if title.replace(" ", "") == movie_title:
        	matched_title = title
            break
            
    if matched_title:
    	value = movie_dic[matched_title]
        message_win.addstr(f"사용자: {message['message']} - {value}\n {timestamp}\n")
    
    elif not matched_title:
    	message_win.addstr(f"사용자: 영화 정보를 찾을 수 없습니다. {timestamp}\n")
	else:
		message_win.addstr(f"사용자: {message['message']}  {timestamp}\n")

 

 

결과

 

 

3. 트러블슈팅

  이슈 가설설정 해결방법
채팅버그 1 채팅을 실행 후, 내가 입력하고 있는 동안 다른사람이 엔터를 누르면 내가 입력하고 있던 것이 사라지는 오류가 있었음 같은 서버를 공유하고 있어 터미널창이 비동기가 되지 않다고 생각 curses 사용

터미널 화면에서 별도의 독립적인 영역을 만들어, 그 영역에서 독립적으로 텍스트를 출력하거나 입력을 받을 수 있게 하는 기능
채팅버그 2 채팅창에 있다가 나갔다 들어오면, 나갔다 들어온사람은 본인이 보낸 메세지를 볼 수 없었음. (근데 잘 보내짐) consumer.close()가 제대로 작동하지 않는건가 생각.

consumer의 group_id가 겹쳐서 그런가 생각.
둘 다 수정해보았는데 해결되지 않았음.

영화챗봇을 추가하면서 if문이 추가되었었는데 그곳에서 꼬이면서 발생한 문제였고 

if message['message'] == 'exit': break 에

and message['user'] == username 도 추가하였더니 해결되었음
JSONDecodeError 영화 챗봇 기능 개발 후 테스트를 진행하였는데 잘되다가? 갑자기 해당 에러가 발생 이는 Producer에서 입력시 공백을 주었기때문에 나타나는 오류였음.
왜냐하면 JSON형태로 받는걸로 설정을 해놨기 때문..
value_serializer에 errors='ignore' 추가

curses란? 
터미널 화면을 하나의 "큰 종이"라고 생각했을 때, curses 창을 생성하는 것은 이 종이 위에 작은 "상자"를 그리는 것과 같다. 이 상자 안에서 내용을 쓸 수 있으며, 상자 밖의 영역에는 영향을 주지 않는다. 따라서, 터미널 화면에서 별도의 독립적인 영역을 만들어, 그 영역에서 독립적으로 텍스트를 출력하거나 입력을 받을 수 있게 하는 기능을 제공한다.
curses.newwin() : 새로운 curses 창을 생성하는 함수

 

 

 

4. 업무 채팅 감사 기능 ( 검색, 대화 주제 통계 )

 

우리팀은 채팅방에서 어떤 대화가 오갔고, 누가 욕을 했고, 어떤 특정 단어를 많이 썼는지를 Spark를 이용하여 집계하였고 시각화를 진행하였다.  

 

채팅 로그 데이터 (RAW DATA)는 아래와 같다.

 

 

 

1. 누가 제일 메세지를 많이 보냈나?

 

%spark.pyspark
many = df.groupby(['user']).size().reset_index(name='counts')

 

 

 

2. 유저별 특정단어 사용 빈도

%sql

SELECT user,
	SUM(CASE WHEN message LIKE '%ㅋ%' THEN 1 ELSE 0 END) AS num_k,
        SUM(CASE WHEN message LIKE '' THEN 1 ELSE 0 END) AS num_blank,
        SUM(CASE WHEN message LIKE '%ㅎ%' THEN 1 ELSE 0 END) AS num_h
FROM (SELECT * FROM chatlog WHERE user = 'PM' OR user = 'TL' OR user = 'GM' OR user = 'AC')
GROUP BY user
ORDER BY user

 

FROM절에 서브쿼리로 user를 선택한 이유는 우리가 테스트를 많이 진행하면서 user이름을 아무렇게나 준 값들을 제외하기 위해 서브쿼리로 특정 user만 가져왔다.

 

 

서브쿼리를 사용하지 않고 WHERE 절에 조건을 주어 가져올 수도 있다.

SELECT user,
	SUM(CASE WHEN message LIKE '%ㅋ%' THEN 1 ELSE 0 END) AS num_k,
        SUM(CASE WHEN message LIKE '' THEN 1 ELSE 0 END) AS num_blank,
        SUM(CASE WHEN message LIKE '%ㅎ%' THEN 1 ELSE 0 END) AS num_h
FROM chatlog
WHERE user IN ('PM', 'TL', 'GM', 'AC')
GROUP BY user
ORDER BY user

 

 

 

3. 누가 많이 욕했나

SELECT user,
	SUM(CASE
    		WHEN message LIKE '%시발%' THEN 1
            WHEN message LIKE '%씨발%' THEN 1
            WHEN message LIKE '%개새%' THEN 1
            ELSE 0
        END) AS badword_number
FROM (SELECT * FROM chatlog WHERE user = 'PM' OR user = 'TL' OR user = 'GM' OR user = 'AC')
GROUP BY user,
ORDER BY user

 

참고로 욕은... 진짜로 한게 아니고 테스트를 위해 한 것......

 

 

 

4. 시간대별(09:00 - 18:00) 메세지 수

WITH hours AS (
	SELECT
        EXPLODE(ARRAY(9, 10, 11, 12, 13, 14, 15, 16, 17, 18)) AS hour
)

SELECT
	CONCAT(LPAD(hours.hour, 2, '0'), ':00') AS hour,
    COALESCE(chat_data.message_count, 0) AS message_count
FROM
	hours
    
LEFT JOIN (
	SELECT HOUR(time) AS hour_only,
    		COUNT(*) AS message_count
    FROM chatlog
    WHERE (user = 'PM' OR user = 'TL' OR user = 'GM' OR user = 'AC')
    WHERE
    	HOUR(time) BETWEEN 9 AND 18
    GROUP BY
    	hour_only
   ) AS chat_data
   ON hours.hour = chat_data.hour_only
   ORDER BY hour;

 

with절을 사용한 이유?!

  • 사실 with절을 사용하지 않고 UNION 방식으로 명시적인 테이블 정의가 가능하다.
  • 그렇지만 WITH절로 만든 테이블을 다른 곳에서도 쓰일 수 있기 때문에 WITH절을 사용하는 것이 확장성 측면에서 더 좋은 선택이다.

 

 

5. 일정 챗봇기능(구현X)

 

우리팀은 아쉽게도 이 기능까지 넣을 시간이 없어서 구현은 하지 못했다.

대신, 채팅 데이터를 저장하고, 저장된 데이터를 가지고 위에서 만든 통계보고서를 에어플로우를 돌리면 자동적으로 pdf파일로 만들어주는 기능을 시도해보고 있었는데, 아직 진행중이다.

 

에어플로우 DAG

 

 

새롭게 알게된 건,

에어플로우에서 BashOperator를 이용해 spark-submit 명령어를 실행하였었는데 그렇게되면 문제점은...

다른 컴퓨터에서도 실행할 때 spark라던지.. 실행하기 위한 모든 환경셋팅이 되어있어야 한다는 것이다.

 

근데 PythonVirtualenvOperator로도 실행 명령을 줄 수 있는 방법을 찾았다. 그렇게되면, 다른 컴퓨터에서 별다른 환경셋팅없이 에어플로우를 돌려 같은 결과를 얻을 수 있다.

 

 

 

팀 프로젝트를 진행하면서...

 

좋았던 점

  • 팀원들 간의 활발한 의사소통을 통해 각자의 아이디어와 의견을 공유하며 더 나은 해결책을 도출할 수 있었다. 각자의 강점을 살려 역할을 수행함으로써 지금까지 원활하게 프로젝트를 진행했다.
  • 채팅 기능을 재개발하면서 페어 프로그래밍으로 진행한 것이 매우 좋았다. 페어 프로그래밍을 통해 개발 속도가 향상되었다. 함께 작업함으로써, 각각의 작업이 더 빠르게 진행될 수 있었고, 그로 인해 프로젝트 전반의 속도가 빨라졌다. 코드 리뷰나 추가적인 디버깅 과정도 페어 프로그래밍 동안 실시간으로 이루어지기 때문에, 전체 개발 사이클이 단축할 수 있었다.

 

아쉬운 점

  • GitHub에서 풀리퀘스트(PR)와 릴리즈 관리를 효과적으로 하지 못했다. PR을 통해 코드 리뷰를 진행하고 코드 변경사항을 검토하는 과정이 소홀해지면서 코드 품질 유지에 어려움이 있었다.
  • 또한, 릴리즈 관리 측면에서도 체계적인 버전 관리가 이루어지지 않아, 특정 버전으로의 롤백이나 배포가 필요한 상황에서 혼란이 발생했다. 명확한 릴리즈 노트 작성과 태그 관리를 통해 각 버전의 변경 사항을 명확하게 기록했어야 했지만, 이 부분이 미흡하여 프로젝트의 추적 가능성과 유지 보수성에 부정적인 영향을 미쳤다.
  • 채팅 기능을 예쁘게 꾸며도보고 이것저것 기능도 추가해보고 싶었지만, 우리팀은 채팅 기능을 다시 새로 만들었어서 그러한 부분을 하지 못한 것이 아쉬웠다.

 

개선해야할 점

  • 다음 프로젝트에서는 이점을 개선하기 위해 GitHub 워크플로우를 철저히 따르고, 코드 리뷰 및 릴리즈 관리에 더 많은 신경을 쓸 필요가 있다고 느꼈다.
  • 강사님이 별도로 만드신 걸 보았는데 많이 참고해봐야겠다..ㅎㅎ (강사님이 만든 기능)
  • 다른팀에서도 다른 기능을 추가한 것이 있던데 다른팀 코드도 참고해봐야겠다.

 

 

자산1조 README 바로가기

자산1조 발표자료 바로가기

 

 

 

프로젝트 칸반보드

 

1일차 칸반보드

 

 

2일차 칸반보드

 

 

3일차 칸반보드