imhamburger 님의 블로그
DBT unique_key 값 설정하기 본문
기존에는 GA 데이터를 BigQuery에 단순히 Airflow를 통해 주기적으로 적재하는 방식으로 운영해 왔다. 이때, 최근 3일치 데이터만 조회하여 기존 데이터를 삭제한 뒤 INSERT하는 방식으로 처리해 왔다.
DELETE FROM data_mart.EventAction
WHERE event_date IN (
CAST(PARSE_DATE('%Y%m%d', @target_date) AS DATE),
CAST(PARSE_DATE('%Y%m%d', @target_date_2) AS DATE),
CAST(PARSE_DATE('%Y%m%d', @target_date_3) AS DATE));
하지만 최근 dbt를 도입하면서, DELETE와 INSERT 방식 대신 event_timestamp가 이미 저장된 데이터보다 클 경우에만 데이터를 증분 처리하는 방식으로 개선하였다.
그럼에도 불구하고 여전히 현재 날짜 기준 3일 전부터의 데이터를 읽어오는 방식은 유지하고 있으며, 이때는 unique_key를 설정하여 해당 키가 이미 존재하면 병합(MERGE)하는 방식으로 처리하고 있다.
이는 event_timestamp만을 기준으로 증분 처리할 경우, 사용자 행동 로그 특성상 타임스탬프가 뒤늦게 들어오는 데이터가 누락될 수 있기 때문이다.
특히 GA 로그와 같은 사용자 행동 로그는 전송 지연이나 누락 등으로 인해 타임스탬프가 오래된 데이터가 나중에 수집되는 경우도 있기 때문에, 단순한 타임스탬프 기준 증분보다는 unique_key 기반 병합이 데이터의 정합성을 보다 잘 보장할 수 있다.
unique_key 를 설정할 때는 반드시 해당 컬럼에 null 값이 없어야 한다. 안그럼... 데이터가 이상해질 수 있다.
초기 설정은 unique_key 값에 여러가지를 추가하였다. 다음과 같이 말이다.
unique_key = ['event_timestamp',
'user_pseudo_id',
'event_name',
'user_id',
'ga_session_id',
'page_location',
'screen_name',
'previous_page_location',
],
그렇지만 공식문서를 살펴보면 각 행을 고유하게 식별하기 위해 여러 열을 조합해야 하는 경우
unique_key = ['user_id', 'session_number']문자열 표현식( )이 아닌 목록( ) 으로 이러한 열을 전달하는 것이 좋다고 한다. unique_key = 'concat(user_id, session_number)'.
공식문서
Configure incremental models | dbt Developer Hub
Learn how to configure and optimize incremental models when developing in dbt.
docs.getdbt.com
실제로 내가 처음에 여러 값들로 주었을 때보다 concat으로 조합하여 사용했을 때가 더 정합성이 높았다.
이유는 여러값을 주었을 땐 다음과 같은 에러가 자주 났었다.
Database Error in model return_visitor (models/marts/return_visitor.sql)
UPDATE/MERGE must match at most one source row for each target row
해당 에러는 다음과 같은 이유로 발생한다. MERGE 시 unique_key 기준으로 소스 테이블에서 중복된 row가 2개 이상일 경우, 어떤 하나로 업데이트해야 할지 dbt가 판단하지 못하기 때문에 실패하는 것.
수정한 unique_key
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'unique_key',
target = 'general'
) }}
CONCAT(CAST(event_datetime_KST AS STRING), user_pseudo_id, event_name, page_location, ga_session_id) AS unique_key
{% if is_incremental() %}
WHERE TIMESTAMP_MICROS(event_timestamp) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY)
{% endif %}
그리고 PARTITION BY를 썼다면, ROW_NUMBER()의 PARTITION BY 기준을 unique_key와 정확히 맞춰야 한다.
이렇게 공식문서 TIP에 나와있는 것처럼 설정하니 에러를 해결할 수 있었다.
이제는 삭제-삽입 방식이 아닌 UPDATE 또는 INSERT만 수행하는 구조로 바뀌었기 때문에, 데이터 적재 과정에서 불필요한 연산이 줄어들고, 기존 데이터의 무결성을 유지할 수 있게 되었다.
이 방식은 특히 이벤트 데이터처럼 양이 많고, 타임스탬프가 뒤늦게 도착하는 경우가 잦은 로그성 데이터 처리에 적합하다.
덕분에 분석에 필요한 데이터 정합성이 높아지고, 배치 처리 효율성도 개선할 수 있었다!!

'끄적끄적' 카테고리의 다른 글
| 컬럼형 DB ClickHouse?? (0) | 2025.06.19 |
|---|---|
| A/B 테스트할 때는 어떤걸 쓸까 gtm과 gtag (0) | 2025.05.29 |
| ERROR: Lambda is initializing your function (0) | 2025.05.11 |
| 인스타그램 API 에러... (Impressions를 더이상 제공하지 않는다..!) (0) | 2025.04.27 |
| 3월 회고 - 데이터 엔지니어로서의 시작 (1) | 2025.03.30 |
