imhamburger 님의 블로그
데이터 엔지니어링 - 삭제된 데이터를 어떻게 DW에 반영할까? 본문
먼저, 데이터베이스에서 데이터를 S3로 저장할 때, 삭제된 데이터를 추적할 수 있도록 is_deleted 컬럼을 추가해야 한다.
그런 다음 데이터베이스에서 변경된 데이터(삭제 포함) 추출한다.
SELECT
id,
name,
updated_at,
CASE
WHEN deleted_at IS NOT NULL THEN TRUE
ELSE FALSE
END AS is_deleted
FROM my_table
WHERE updated_at >= (SELECT MAX(updated_at) FROM my_s3_staging_table);
- deleted_at 값이 존재하면 is_deleted = TRUE로 표시 (Soft Delete)
- updated_at 기준으로 변경된 데이터만 추출 (Incremental 방식)
- 이 데이터를 S3에 저장 (CSV, Parquet 등으로 저장 가능)
다음과 같이 S3에 저장된다.
id | name | updated_at | is_deleted |
1 | John | 2024-03-04 | FALSE |
2 | Alice | 2024-03-05 | FALSE |
3 | Bob | 2024-03-06 | TRUE |
S3 → 데이터 웨어하우스 (DW)로 적재
이제, S3에서 데이터를 가져와 DW (BigQuery, Redshift 등)에 적재한다.
- BigQuery → LOAD DATA 대신 EXTERNAL TABLE 또는 bq load 명령어 사용
- Snowflake → COPY INTO 사용
- Redshift → COPY FROM 사용
CREATE OR REPLACE TABLE my_dwh_table (
id INT64,
name STRING,
updated_at TIMESTAMP,
is_deleted BOOL
);
GCS에서 데이터를 가져와 DW에 로드
LOAD DATA INTO my_dataset.my_dwh_table
FROM FILES (
format = 'PARQUET',
uris = ['gs://my-bucket/data.parquet']
);
CREATE TABLE ? EXTERNAL TABLE?
EXTERNAL TABLE (GCS에서 직접 조회)
CREATE OR REPLACE EXTERNAL TABLE my_dataset.my_external_table
OPTIONS (
format = 'PARQUET',
uris = ['gs://my-bucket/data.parquet']
);
-> 이 방식은 BigQuery 내부에 데이터를 저장하지 않고, GCS에 있는 데이터를 직접 조회
-> 쿼리 실행 시마다 GCS에서 데이터를 읽기 때문에 비용이 다르게 계산됨
데이터를 BigQuery 내부에 저장해야 한다면? CREATE TABLE + LOAD DATA
GCS에서 직접 데이터 조회하고 싶다면? EXTERNAL TABLE
dbt 모델에서 삭제 반영 (MERGE 활용)
dbt 모델 (models/my_table.sql)
{{ config(
materialized='incremental',
unique_key='id',
on_schema_change='merge'
) }}
SELECT
id,
name,
updated_at,
is_deleted
FROM my_dwh_table
{% if is_incremental() %}
-- 변경된 데이터만 가져오기
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
- incremental 모델로 설정하여, 변경된 데이터만 반영
- unique_key='id' → ID 기준으로 데이터를 업데이트
- is_deleted = TRUE인 데이터도 포함됨
삭제 처리 (DELETE)를 post-hook으로 실행
models/my_table.yml (삭제 처리 추가)
version: 2
models:
- name: my_table
description: "Incremental table with delete handling"
config:
post-hook:
- "DELETE FROM {{ this }} WHERE id IN (SELECT id FROM my_dwh_table WHERE is_deleted = TRUE)"
- post-hook을 사용해 DELETE를 실행!
- dbt가 SELECT로 데이터를 업데이트한 후 자동으로 삭제를 실행함
post-hook이랑 pre-hook은 무슨 차이??
pre-hook은 모델 실행 전에 임시 테이블 데이터 삭제 (주로 테이블 초기화, 임시 데이터 정리, 로그 기록 등에 사용됨.)
models:
- name: my_table
config:
pre-hook:
- "DELETE FROM my_temp_table WHERE updated_at < CURRENT_DATE - INTERVAL 30 DAY"
모델 실행 전에 30일 이상된 데이터를 삭제!
post-hook은 모델 실행 후 삭제된 데이터 반영
models:
- name: my_table
config:
post-hook:
- "DELETE FROM {{ this }} WHERE id IN (SELECT id FROM my_dwh_table WHERE is_deleted = TRUE)"
모델 실행 후에 is_deleted = TRUE인 데이터를 삭제!
'데이터 엔지니어링' 카테고리의 다른 글
UTM 파라미터 (0) | 2025.04.13 |
---|---|
데이터 엔지니어링 - 초당 수천만개의 로그데이터가 생긴다면? (0) | 2025.03.03 |
데이터 엔지니어링 - DBT 연습해보기 (Feat.Bigquery) (0) | 2025.02.28 |
6개월간의 데이터엔지니어 부트캠프를 마치며 (3) | 2025.01.03 |
데이터엔지니어 부트캠프 - 파이널 프로젝트 (12월의 기록) (1) | 2024.12.28 |