imhamburger 님의 블로그

에어플로우(Airflow) - Celery Executor 사용하기 (13주차) 본문

에어플로우(Airflow)

에어플로우(Airflow) - Celery Executor 사용하기 (13주차)

imhamburger 2024. 10. 2. 23:50

지난글에서는 "ml-worker" 모듈을 자동적으로 실행해줄 cron 세팅하였는데, 이번에는 cron이 아닌 Airflow를 이용해 스케줄러 작업을 해보고자 한다.

그리고 하는김에 CeleryExecutor를 곁들인......(사실 이것이 메인)

 

에어플로우에는 3개의 Executor가 있다.

종류 특징
SequentialExecutor 모든 태스크를 한 번에 하나씩 순차적으로 실행
LocalExecutor 여러 개의 태스크를 병렬로 실행할 수 있지만, 같은 호스트 내에서만 가능
CeleryExecutor 여러 대의 컴퓨터에서 병렬로 태스크를 실행할 수 있으므로 가장 큰 확장성을 제공

 

 

Celery가 뭔데?

 

Celery를 쉽게 설명하자면, Celery는 컴퓨터에게 어떤 "일"(작업)을 나중에 따로 처리하도록 시킬 수 있게 해주는 도구이다.

 

예를들어, 누군가 웹사이트에서 회원가입을 하면 그 사람에게 확인 이메일을 보내야 한다. 하지만 이메일을 보내는 데는 시간이 걸린다.

이때 Celery를 이용하면 이메일 보내는 일을 큐에 넣어두고, 다른 중요한 일을 먼저 할 수 있다.
Celery는 이메일 보내기 같은 일을 백그라운드에서 알아서 처리해주며, 사용자는 빨리 다음 화면으로 넘어갈 수 있다.

 

여기서 잠깐!

Kafka랑 매우 비슷한데? 라고 생각할 수 있다.

 

하지만 조금 다르다!!

Kafka와 Celery는 둘 다 데이터를 처리하는 데 사용되지만, Kafka는 실시간 데이터 처리이고 Celery는 일을 즉시 처리하지 않고, 백그라운드에서 나눠서 처리한다.

특징 Kafka Celery
역할 데이터를 실시간으로 스트리밍하고 전송 작업(태스크)을 대기열에 넣고 비동기로 실행
데이터 흐름 연속적인 데이터 스트림 처리 (실시간) 단발성 작업 처리 (나중에 실행)
예시 이벤트 로그 수집, 센서 데이터, 실시간 분석 이메일 전송, 데이터 처리, 백그라운드 작업
사용 방식 Producer가 데이터를 보내고, Consumer가 데이터를 읽음 작업을 큐에 넣고 Worker가 이를 처리

 

정말 쉽게 설명하자면,

Celery는 우체국과 같다. 우편물을 큐에 넣고, 필요한 작업자(우체부)들이 이를 하나씩 꺼내어 배달한다.

시간이 걸리지만 정확하게 처리가 된다.

 

자 그럼 Celery에 대해 이해하였으니, 에어플로우에 Celery를 세팅해보자.

 

세팅하는 방법은 간단하다. (맞아.....?)

 

 

에어플로우를 설치할 가상환경 만들기

pyenv virtualenv 3.11.9 air2

 

 

나는 위에서 만든 pyenv 가상환경에서 Celery를 설치해줄 것이기 때문에 다음과 같은 명령어를 입력하였다. (설치관련 공식문서)

pip install "apache-airflow[celery]==2.10.2" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt"

 

그리고!

Celery 뿐만 아니라 Redis와 PostgreSQL 도 준비가 되어 있어야한다.

(참고로 나는 Docker 에서 이미지를 Pull 받아 2개를 실행해주었다.)

 

 

Docker Postgres 세팅

 

1. Docker Postgres 이미지 (Docker hub)

docker pull postgres:12

 

 

2. Postgres 실행

docker run -d -e POSTGRES_DB=airflow_db \
-e POSTGRES_USER=airflow_user \
-e POSTGRES_PASSWORD=airflow_pass \
--name airpg -p 15432:5432 \
postgres:12

 

 

3. Postgres db 생성하기 (공식문서)

docker exec -it airpg bash
psql -U airflow_user -d airflow_db #Postgre 접속
\c airflow_db airflow_user #데이터베이스 접속

 

 

결과

 

4. 데이터베이스 초기화

데이터베이스와 사용자 설정이 완료되었다면, Airflow에서 데이터베이스를 초기화해야 한다.

pip install psycopg2 #PostgreSQL 데이터베이스와 파이썬 사이의 연결을 가능하게 해주는 드라이버
airflow db init

 

이 명령어는 airflow_db에 Airflow가 사용하는 모든 메타데이터 테이블을 생성한다. 이 테이블들은 DAG의 정의, 태스크 상태, 로그 등의 데이터를 관리하는 데 사용된다.

 

5. 관리자 사용자 생성

마지막으로, Airflow 웹 UI에 로그인하기 위한 기본 사용자를 생성한다.

airflow users create -u test -p test -f test -l user -e test@user.com -r Admin
  • -u 또는 --username: 사용자 이름
  • -p 또는 --password: 사용자 비밀번호
  • -f 또는 --firstname: 사용자 이름
  • -l 또는 --lastname: 사용자 성
  • -e 또는 --email: 사용자 이메일 주소

 

확인

airflow users list

id | username | email         | first_name | last_name | roles
===+==========+===============+============+===========+======
1  | test     | test@user.com | test       | user      | Admin

 

 

근데 왜 데이터베이스를 세팅해야해? 

 

에어플로우는 여러 개의 작업(DAG와 Task)들을 관리하고 실행하는데,

이때 "어떤 작업이 언제 실행되었는지, 성공했는지 실패했는지, 다음에 뭘 해야 하는지"와 같은 중요한 정보를 기억해야 한다. 

이 정보를 저장할 공간이 필요한데, 그 역할을 데이터베이스가 한다.

 

예를들어,

에어플로우를 사용해서 매일 오전 9시에 데이터 분석 작업을 하도록 설정했다고 가정해보자.
오늘 작업이 성공했는지 실패했는지에 대한 정보를 기억해야 한다. 그래야 내일 또 올바르게 실행할 수 있기 때문!
이 정보를 그냥 컴퓨터 메모리에만 저장하면 컴퓨터가 꺼졌을 때 정보가 사라지는 문제가 있다.
그래서 이런 중요한 정보를 데이터베이스에 저장해서 언제든지 필요할 때 다시 꺼내올 수 있도록 하는 것이다.

 

이러한 작업에 대한 결과를 저장하는 장소를 백엔드 저장소(Result Backend) 라 부른다.

Celery는 실행 결과를 데이터베이스나 다른 백엔드 저장소에 저장해, 에어플로우가 실행 상태를 확인하고 추적할 수 있게 한다.

 

즉, 데이터베이스는 에어플로우가 작업들을 안전하고 효율적으로 관리하고,

문제 발생 시에도 복구할 수 있도록 도와주는 중요한 저장소 역할을 한다!

 

 

그러면 왜 Postgres를 사용해? MySql은 안돼?

아쉽게도 에어플로우에서는 PostgreSQL 을 지원한다........!

 

 

Docker Redis 세팅

docker run -d \
--name airflow_rd \
-p 16379:6379 \
-e REDIS_PASSWORD=airflow \
redis:5.0

 

 

Redis가 뭐야?

 

Redis는 컴퓨터의 메모리를 이용해 데이터를 저장하는 아주 빠른 데이터 저장소이다.

쉽게 말해, 빨리 꺼내서 써야 하는 정보를 저장하는 공간이다.

 

보통 데이터는 하드디스크에 저장되는데, 디스크에 있는 데이터를 찾고 꺼내는 데 시간이 좀 걸리는데,

Redis는 데이터를 메모리에 저장하기 때문에, 훨씬 더 빠르게 정보를 꺼낼 수 있다.

 

예를들어,

웹사이트에 로그인하면 내 정보가 서버에 저장돼야 하잖수? 이 정보를 Redis에 잠깐 저장해 두면, 웹사이트에서 빠르게 내 정보를 확인할 수 있다.
웹사이트를 이동할 때마다 다시 로그인할 필요 없이 한 번 저장해 둔 정보를 Redis에서 빠르게 가져올 수 있게 도와준다.

 

어쨋든 요지는!!

Celery 같은 태스크 처리 시스템에서 작업 큐로 Redis를 사용한다.
태스크가 큐에 추가되고, 작업자가 이를 가져가 실행한다. Celery의 메시지 브로커로 Redis를 사용해 작업을 관리하는 방식

 

 

에어플로우 환경설정 (.zshrc 에 환경변수 추가) 

export AIRFLOW__CELERY__BROKER_URL=redis://:@localhost:16379/0
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow_user:airflow_pass@localhost:15432/airflow_db
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow_user:airflow_pass@localhost:15432/airflow_db

 

 

자 이제 모든 에어플로우 Celery Executor 세팅은 끝났다. 이제 실행을 하면된다.

 

Daemon 실행

airflow scheduler -D --pid scheduler.pid
airflow webserver -D --pid webserver.pid
airflow celery worker -D --pid worker.pid -q q_1 #-q 옵션 사용시 dag의 Pool과 이름이 같아야 함
airflow celery flower -D --pid flower.pid

 

 

 

에러리포트

 

내 컴퓨터에서는 webserver 가 데몬으로 실행되지 않아 nohup을 이용하였다.

nohup airflow webserver --pid webserver.pid &

 

근데 nohup도 잘 실행되는가 싶더니 다시 에어플로우를 실행해보니 off되어 있었다.

별다른 해결방법을 찾지못해 그냥 데몬없이 실행하였다. 그랬더니 오류는 없었다.

 

데몬없이 실행

airflow scheduler #스케줄러 실행
airflow celery worker #워커 실행
airflow webserver #에어플로우 실헹
airflow celery flower #셀러리 플라워 실행

*celery flower는 celery 모니터링을 위한 것

 

 

 

에어플로우 worker 실행 시 다음과 같은 에러가 났다.

 

아래와 같은 명령어를 입력한 다음 PID 를 모두 kill하고 에어플로우 worker와 flower를 재실행하면 된다.

lsof -i :8793

 

 

에러메세지 1

  File "/Users/seon-u/.pyenv/versions/3.11.9/envs/air2/lib/python3.11/site-packages/airflow/operators/python.py", line 588, in _execute_python_callable_in_subprocess
    raise AirflowException(error_msg) from None
airflow.exceptions.AirflowException: Process returned non-zero exit status 1.
[Errno 2] No such file or directory: '/Users/seon-u/code/mnist/img/8644ba9b-eef7-4a05-8bdc-8429aadabcd3.png'
[2024-10-02, 02:46:16 UTC] {local_task_job_runner.py:266} INFO - Task exited with return code 1
[2024-10-02, 02:46:16 UTC] {taskinstance.py:3900} INFO - 0 downstream tasks scheduled from follow-on schedule check

 

API기능을 도커로 띄어서 -v 옵션을 주어 폴더 경로를 이어줘야한다. (-v 옵션을 잊고있었지모야)

 

 

최종결과

 

아래 사진을 보면 3분마다 실행되고 있는 것을 확인할 수 있다. (3분으로 세팅하였다.)

 

 

 

+ 에어플로우 CeleryExecutor 아키텍쳐

 

1. 웹 서버 → Workers (Workers): 웹 서버는 Workers가 실행한 작업의 로그를 가져와서 UI에서 사용자에게 보여준다.

2. 웹 서버 → DAG 파일: 사용자가 DAG가 어떻게 구성되어 있는지 보기 위해 웹 서버에서 DAG 파일을 읽어서 구조를 보여준다.

3. 웹 서버 → 데이터베이스: 웹 서버는 각 작업의 현재 상태(예: 완료, 실패, 실행 중)를 보기 위해 데이터베이스에서 정보를 가져온다.

4. Workers → DAG 파일: Workers는 DAG 파일을 읽어 어떤 작업을 해야 할지 파악하고, 그 내용을 바탕으로 작업을 실행한다.

5. Workers → 데이터베이스: 작업자들은 작업을 실행하면서 필요한 설정 정보나 변수, XCOM(작업 간 데이터 전달) 등을 데이터베이스에서 가져오거나 저장한다.

6. Workers → Celery의 결과 백엔드: 작업이 끝나면, 작업의 상태(성공, 실패 등)를 Celery의 결과 백엔드에 저장한다.

7. Workers → Celery의 브로커: Workers는 Celery의 브로커에 명령을 저장하여 다음에 실행할 작업을 준비한다.

8. 스케줄러 → DAG 파일: 스케줄러도 DAG 파일을 읽어서 어떤 작업들이 들어 있는지 파악하고, 각 작업을 실행한다.

9. 스케줄러 → 데이터베이스: 스케줄러는 새로운 DAG 실행과 관련된 정보를 데이터베이스에 저장하여 추후 작업 상태를 추적할 수 있게 한다.

10. 스케줄러 → Celery의 결과 백엔드: 완료된 작업의 상태를 결과 백엔드에서 가져와서, 다음에 어떤 작업을 해야 할지 결정한다.

11. 스케줄러 → Celery의 브로커: 실행해야 할 명령을 Celery의 브로커에 추가하여 작업자들이 이 작업을 수행할 수 있게 한다.

 

 

 

일주일을 보내며...

이번 주는 Celery라는 분산 작업 큐에 대해 배우는 시간이었는데, 개념이 조금 복잡해서 처음에는 수업에서 바로 이해하기가 쉽지 않았다. 하지만 이렇게 적어보면서 다시 정리하고 나니, 머릿속에서 퍼즐이 맞춰지는 느낌이 든다. 조금씩 감을 잡아가는 과정이 참 의미 있는 것 같다. 그리고 이번 주에는 새로운 팀 프로젝트가 다시 시작되었다. 이전 팀 프로젝트에서 느꼈던 아쉬운 점들을 되돌아보며, 이번에는 개선된 모습으로 프로젝트에 임해보려 한다.


앞으로 나의 방향

이번 주부터 다시 팀 프로젝트가 시작되었으니, 우선은 팀 프로젝트에 온전히 집중할 예정이다. 프로젝트의 목표를 명확히 설정하고, 이를 위해 각자의 역할을 충실히 해나가는 것이 가장 중요하다고 생각한다. 특히 지난번에 아쉬웠던 소통의 문제나 일정 관리에서의 어려움을 이번에는 적극적으로 개선하려고 한다. 팀원들과의 주기적인 점검과 피드백을 통해 더 나은 결과를 만들어 가고 싶다.

그리고 알고리즘 공부도 여전히 함께 해나갈 예정이다. 시간이 촉박해 보이지만, 틈틈이 계획을 세워서 알고리즘 문제를 풀어나가고, 코딩 테스트 준비도 조금씩 해 나가려 한다. 이론적인 이해도 중요하지만, 실전에 활용할 수 있는 기술로 발전시키는 것이 목표다. 꾸준한 연습을 통해, 조금씩이라도 성장해가는 나의 모습을 그려나가고 싶다.