imhamburger 님의 블로그

데이터엔지니어 부트캠프 - Redis Cache로 에어플로우 배치돌렸을 때 마지막 번호부터 실행하게 하기 (20주차) 본문

데이터엔지니어 부트캠프

데이터엔지니어 부트캠프 - Redis Cache로 에어플로우 배치돌렸을 때 마지막 번호부터 실행하게 하기 (20주차)

imhamburger 2024. 12. 1. 15:48

지난주에는 에어플로우 스케줄러를 이용해 정기적으로 크롤링해오는 기능을 만들었다.

 

크롤링 기능을 별개의 가상환경에서 실행하도록 설계했으며, 배치 작업과 크롤링 기능을 분리한 이유는 추후 크롤링에서 에러가 발생했을 때 크롤링 기능만 수정하면 되기 때문에 유지보수가 용이하다는 점이었다.

 

하지만 문제는 배치 작업이 실행될 때마다 크롤링을 처음부터 다시 시작해야 할지에 대한 것인가? 이다.

 

만약 그렇다면 효율적이지 않다.

그래서 offset.ini 파일을 사용해 마지막으로 크롤링한 페이지 번호를 저장하고, 크롤링이 끝난 후 마지막 페이지 번호를 offset에 저장했다.

 

하지만 이 방법을 에어플로우에서 사용하려니 문제가 생겼다.

크롤링 기능은 가상환경에서 실행되기 때문에, 그곳에서 저장된 offset.ini 파일을 에어플로우가 읽을 수 없는 상황이 된 것이다.

 

구글링 해본 결과, Redis Cache를 이용하면 해결할 수 있을 것 같았다.

 

Redis를 사용하면 크롤링 상태를 중앙화된 캐시로 관리할 수 있어, 에어플로우와 다른 시스템들이 공유할 수 있는 방식으로 데이터를 저장하고 읽어올 수 있다.

 

즉, offset 값을 Redis에 저장하고, 크롤링 기능은 이 값을 Redis에서 가져와서 시작 위치를 알 수 있게 된다.

에어플로우는 Redis를 통해 상태 정보를 쉽게 조회하고, 크롤링 작업을 실행할 수 있다.

Redis를 사용하면 여러 작업에서 이 데이터를 공유할 수 있기 때문에, 에어플로우가 실행될 때마다 크롤링을 처음부터 시작하는 문제를 해결할 수 있다.

  • Redis 서버 설정: Redis를 설치하고 설정. 에어플로우와 크롤링 기능이 서로 접근할 수 있는 공통의 저장소 역할. (나는 yaml 에 Redis를 추가하였다.)
  • offset 정보 Redis에 저장: 크롤링 기능이 끝날 때마다, 크롤링한 마지막 페이지 번호를 Redis에 저장. 예를 들어, r.set('crawl_offset', 마지막페이지번호)와 같은 방식으로 저장.
  • 에어플로우에서 Redis 접근: 배치 작업이 실행될 때마다, 에어플로우가 Redis에 저장된 offset 값을 읽어와서 그 값을 기반으로 크롤링을 시작. 이를 통해 매번 크롤링을 처음부터 시작하는 비효율적인 문제를 방지할 수 있다.
  • 크롤링 상태 관리: 크롤링이 끝날 때마다 Redis에서 현재 상태를 업데이트.

이렇게 하면 에어플로우와 크롤링 기능이 서로 독립적으로 유지되면서도, 크롤링이 진행된 상태를 공유하고, 배치 작업에 필요한 효율성을 유지할 수 있다. Redis를 사용하는 것은 상태를 실시간으로 업데이트하고 여러 시스템 간에 공유할 수 있다.

 

 

Docker-compose.yaml 에 다음과 같이 Redis를 추가하였다.

redis:
    image: redis:7
    container_name: redis
    ports:
      - "6379:6379"
    restart: always
    networks:
      - airflow_network

 

 

그리고 크롤링기능에 utils.py를 만들어 다음과 같이 설정해주었다.

import redis

def connect_to_redis():
    try:
        return redis.StrictRedis(
            host='redis',  # Redis 컨테이너의 호스트 이름
            port=6379,
            decode_responses=True
        )
    except redis.ConnectionError as e:
        print(f"Redis 연결 오류: {e}")
        raise

def get_last_id_from_redis(crawler_name, default_id=58410):
    r = connect_to_redis()
    key = f'{crawler_name}_last_processed_id'  # 크롤러 이름에 맞는 키 생성
    last_id = r.get(key)
    if last_id is None:
        r.set(key, default_id)  # Redis에 기본값 설정
        return default_id
    return int(last_id)

def update_last_id_in_redis(crawler_name, new_id):
    r = connect_to_redis()
    key = f'{crawler_name}_last_processed_id'  # 크롤러 이름에 맞는 키 생성
    r.set(key, new_id)
    print(f"마지막 처리된 ID를 Redis에 업데이트했습니다. {crawler_name}: {new_id}")

 

여기서 default_id를 1로 하면 1부터 실행을 한다. 나의 경우 58410번부터 설정하여 크롤링을 실행하였다.

크롤링이 끝나면 new_id에 마지막번호가 저장되고, 다음에 실행될 땐 new_id 부터 실행된다.

 

 

dag파일에 추가

def sending_to_s3():
    print('*' * 30)
    print(f"Processing Task")
    print('*' * 30)

    from crawling.utils import get_last_id_from_redis, update_last_id_in_redis
    from crawling.links import get_link
    
    # 링크 가져오기
    last_id = get_last_id_from_redis()
    links = get_link(start_id=last_id)
    if links:
        #URL에서 마지막 ID 추출
        last_url = links[-1]  # 마지막 URL
        last_processed_id = int(last_url.split('/')[-1])
        update_last_id_in_redis(last_processed_id) #Redis에 저장
        print(f"마지막 ID: {last_processed_id}")
    else:
        print("링크 리스트가 비어 있습니다.")
        return

 

이렇게 설정해주면, 끝!

 

근데 기능을 구현하기는 했는데 Redis Cache가 정확히 뭐지?

 

Redis Cache는 데이터베이스와 애플리케이션 사이에서 데이터를 빠르게 읽고 쓸 수 있도록 도와주는 인메모리 데이터 저장소다.

Redis는 데이터를 메모리에 저장하기 때문에 디스크 기반 데이터베이스보다 매우 빠르게 데이터를 처리할 수 있다.

주로 자주 사용되는 데이터나 계산 결과를 임시로 저장하여 시스템 성능을 향상시키는 데 사용된다.

 

  • 인메모리 데이터 저장소: Redis는 데이터를 디스크가 아니라 메모리에 저장하기 때문에, 디스크 기반의 데이터베이스보다 훨씬 빠르게 데이터를 읽고 쓸 수 있다.
  • 키-값 저장: Redis는 간단한 키-값 형태로 데이터를 저장한다. 예를 들어, r.set('username', 'ham')처럼 'username'이라는 키에 'ham'이라는 값을 저장할 수 있다.
  • 빠른 속도: Redis는 읽기 및 쓰기 속도가 매우 빠르며, 초당 수백만 건의 작업을 처리할 수 있다. 이 때문에 캐시 시스템이나 세션 관리 등 빠른 데이터 처리 성능이 필요한 곳에서 널리 사용된다.
  • 다양한 데이터 구조 지원: Redis는 문자열(String), 해시(Hash), 리스트(List), 셋(Set), 정렬된 셋(Sorted Set), 비트맵(Bitmap) 등 다양한 데이터 구조를 지원한다. 이 덕분에 복잡한 데이터를 효율적으로 저장하고 처리할 수 있다.
  • 영속성 지원: Redis는 기본적으로 인메모리지만, 데이터를 디스크에 백업하거나 복제하는 기능도 제공하여 데이터를 영속적으로 저장할 수 있다. 이 기능을 사용하면 서버가 재시작되더라도 데이터를 복원할 수 있다.

어쨋든 Redis는 빠르고 사용하기 간편하다!!

 

나는 테스트를 위해 마지막 번호를 지정하여 에어플로우를 실행하고 싶었다.

Redis CLI를 사용해 수동으로 설정하는 방법은 다음과 같다.

 

Redis CLI 접속

redis-cli

 

Redis 값 설정

SET last_processed_id {설정할 값}

 

설정된 값 확인

GET last_processed_id

 

테스트를 할 때 이렇게해주면 처음부터 기다릴 필요가 없어 편했다.

 

 

나의 에러 리포트

 

에러 1) redis connection refused

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3005, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3159, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3183, in _execute_task
    return _execute_task(self, context, task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 417, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 505, in execute
    return super().execute(context=serializable_context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 417, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 870, in execute_callable
    result = self._execute_python_callable_in_subprocess(python_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 588, in _execute_python_callable_in_subprocess
    raise AirflowException(error_msg) from None
airflow.exceptions.AirflowException: Process returned non-zero exit status 1.
Error 111 connecting to localhost:6379. Connection refused.

 

 

발생이유

connect_to_redis 함수에서는 host가 로컬로 되어 있는데 docker-compose.yaml에서는 container_name을 redis 로 명시하였기 때문에 host를 'redis'로 명시해줘야 서로 연결할 수 있다.

 

 

해결방법

변경 전 utils.py

import redis

def connect_to_redis():
    return redis.StrictRedis(
        host='localhost',  
        port=6379,
        decode_responses=True
    )

 

변경 후 utils.py

import redis

def connect_to_redis():
    return redis.StrictRedis(
        host='redis',  
        port=6379,
        decode_responses=True
    )