imhamburger 님의 블로그

데이터 엔지니어링 - 삭제된 데이터를 어떻게 DW에 반영할까? 본문

데이터 엔지니어링

데이터 엔지니어링 - 삭제된 데이터를 어떻게 DW에 반영할까?

imhamburger 2025. 3. 5. 16:42

먼저, 데이터베이스에서 데이터를 S3로 저장할 때, 삭제된 데이터를 추적할 수 있도록 is_deleted 컬럼을 추가해야 한다.

그런 다음 데이터베이스에서 변경된 데이터(삭제 포함) 추출한다.

SELECT 
    id, 
    name, 
    updated_at, 
    CASE 
        WHEN deleted_at IS NOT NULL THEN TRUE 
        ELSE FALSE 
    END AS is_deleted
FROM my_table
WHERE updated_at >= (SELECT MAX(updated_at) FROM my_s3_staging_table);

 

  • deleted_at 값이 존재하면 is_deleted = TRUE로 표시 (Soft Delete)
  • updated_at 기준으로 변경된 데이터만 추출 (Incremental 방식)
  • 이 데이터를 S3에 저장 (CSV, Parquet 등으로 저장 가능)

다음과 같이 S3에 저장된다.

id name updated_at is_deleted
1 John 2024-03-04 FALSE
2 Alice 2024-03-05 FALSE
3 Bob 2024-03-06 TRUE

 

 

 

S3 → 데이터 웨어하우스 (DW)로 적재

이제, S3에서 데이터를 가져와 DW (BigQuery, Redshift 등)에 적재한다.

 

  • BigQuery → LOAD DATA 대신 EXTERNAL TABLE 또는 bq load 명령어 사용
  • Snowflake → COPY INTO 사용
  • Redshift → COPY FROM 사용

 

CREATE OR REPLACE TABLE my_dwh_table (
    id INT64,
    name STRING,
    updated_at TIMESTAMP,
    is_deleted BOOL
);

 

GCS에서 데이터를 가져와 DW에 로드

LOAD DATA INTO my_dataset.my_dwh_table
FROM FILES (
    format = 'PARQUET',
    uris = ['gs://my-bucket/data.parquet']
);

 

CREATE TABLE ? EXTERNAL TABLE?

EXTERNAL TABLE (GCS에서 직접 조회)

CREATE OR REPLACE EXTERNAL TABLE my_dataset.my_external_table
OPTIONS (
    format = 'PARQUET',
    uris = ['gs://my-bucket/data.parquet']
);

-> 이 방식은 BigQuery 내부에 데이터를 저장하지 않고, GCS에 있는 데이터를 직접 조회
-> 쿼리 실행 시마다 GCS에서 데이터를 읽기 때문에 비용이 다르게 계산됨

 

데이터를 BigQuery 내부에 저장해야 한다면?  CREATE TABLE + LOAD DATA
GCS에서 직접 데이터 조회하고 싶다면? EXTERNAL TABLE

 

 

dbt 모델에서 삭제 반영 (MERGE 활용)

dbt 모델 (models/my_table.sql)

{{ config(
    materialized='incremental',
    unique_key='id',
    on_schema_change='merge'
) }}

SELECT 
    id, 
    name, 
    updated_at, 
    is_deleted
FROM my_dwh_table

{% if is_incremental() %}
    -- 변경된 데이터만 가져오기
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

 

  • incremental 모델로 설정하여, 변경된 데이터만 반영
  • unique_key='id' → ID 기준으로 데이터를 업데이트
  • is_deleted = TRUE인 데이터도 포함됨

삭제 처리 (DELETE)를 post-hook으로 실행

models/my_table.yml (삭제 처리 추가)

version: 2

models:
  - name: my_table
    description: "Incremental table with delete handling"
    config:
      post-hook: 
        - "DELETE FROM {{ this }} WHERE id IN (SELECT id FROM my_dwh_table WHERE is_deleted = TRUE)"

 

  • post-hook을 사용해 DELETE를 실행!
  • dbt가 SELECT로 데이터를 업데이트한 후 자동으로 삭제를 실행함

post-hook이랑 pre-hook은 무슨 차이??

pre-hook은 모델 실행 전에 임시 테이블 데이터 삭제 (주로 테이블 초기화, 임시 데이터 정리, 로그 기록 등에 사용됨.)

models:
  - name: my_table
    config:
      pre-hook: 
        - "DELETE FROM my_temp_table WHERE updated_at < CURRENT_DATE - INTERVAL 30 DAY"

모델 실행 전에 30일 이상된 데이터를 삭제!

 

post-hook은 모델 실행 후 삭제된 데이터 반영

models:
  - name: my_table
    config:
      post-hook: 
        - "DELETE FROM {{ this }} WHERE id IN (SELECT id FROM my_dwh_table WHERE is_deleted = TRUE)"

모델 실행 후에 is_deleted = TRUE인 데이터를 삭제!