imhamburger 님의 블로그
데이터 엔지니어링 - DBT 연습해보기 (Feat.Bigquery) 본문
데이터웨어하우스에서 데이터 모델링할 때 쓰이는 DBT를 연습해보았다.
Raw data는 JSON파일인데 먼저 sql로 사용가능하게끔 데이터를 전처리해줘야한다!!
Step 1. JSON을 Pandas DataFrame으로 변환
import json
import pandas as pd
# JSON 데이터 로드
json_data = '''
{
"order_id": "ORD123",
"customer": {
"user_id": 12345,
"name": "John Doe",
"email": "johndoe@example.com"
},
"items": [
{
"product_id": "P001",
"name": "Laptop",
"category": "Electronics",
"price": 1200.50,
"quantity": 1
},
{
"product_id": "P002",
"name": "Mouse",
"category": "Accessories",
"price": 25.75,
"quantity": 2
}
],
"total_price": 1251.25,
"order_date": "2025-02-28T12:00:00Z"
}
'''
# JSON 파싱
data = json.loads(json_data)
# 주문 정보
orders_df = pd.DataFrame([{
"order_id": data["order_id"],
"customer_id": data["customer"]["user_id"],
"total_price": data["total_price"],
"order_date": data["order_date"]
}])
# 고객 정보
customers_df = pd.DataFrame([{
"customer_id": data["customer"]["user_id"],
"name": data["customer"]["name"],
"email": data["customer"]["email"]
}])
# 제품 정보
products_df = pd.DataFrame([
{
"product_id": item["product_id"],
"name": item["name"],
"category": item["category"],
"price": item["price"]
}
for item in data["items"]
])
# 주문 상세 정보
order_items_df = pd.DataFrame([
{
"order_id": data["order_id"],
"product_id": item["product_id"],
"quantity": item["quantity"]
}
for item in data["items"]
])
print(orders_df)
print(customers_df)
print(products_df)
print(order_items_df)
Step 2. DataFrame을 parquet로 변환
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# DataFrame을 Parquet 형식으로 변환하는 함수
def convert_df_to_parquet(df):
"""Pandas DataFrame을 Parquet 형식으로 변환."""
return pa.Table.from_pandas(df)
parquet 파일로 바꾸는 이유는 데이터 압축이 다른 json, csv보다 훨씬 높긴때문에 빅데이터에 유리해서!
Step 3. Parquet파일을 GCS로~
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
import io
# 메모리에서 Parquet 파일을 GCS에 업로드하는 함수
def upload_to_gcs_from_memory(parquet_table, gcs_bucket_name, gcs_blob_name):
"""Parquet 테이블을 메모리에서 GCS로 업로드."""
# BytesIO 객체 생성 (메모리에서 처리)
buf = io.BytesIO()
pq.write_table(parquet_table, buf)
buf.seek(0) # 버퍼의 처음으로 이동
# GCS 클라이언트 생성
storage_client = storage.Client()
bucket = storage_client.get_bucket(gcs_bucket_name)
blob = bucket.blob(gcs_blob_name)
# 메모리에서 읽은 Parquet 데이터를 GCS로 업로드
blob.upload_from_file(buf)
print(f"Uploaded Parquet file to gs://{gcs_bucket_name}/{gcs_blob_name}")
# DataFrame을 Parquet 파일로 변환하고 GCS로 업로드하는 함수
def process_and_upload_df(df, gcs_bucket_name, gcs_blob_name):
"""DataFrame을 Parquet로 변환하고 GCS에 업로드."""
parquet_table = convert_df_to_parquet(df)
upload_to_gcs_from_memory(parquet_table, gcs_bucket_name, gcs_blob_name)
# GCS 버킷 이름과 업로드할 경로
gcs_bucket_name = 'gcs-bucket-name'
# 각 데이터프레임을 Parquet 형식으로 변환하여 GCS로 업로드
process_and_upload_df(orders_df, gcs_bucket_name, 'orders.parquet')
process_and_upload_df(customers_df, gcs_bucket_name, 'customers.parquet')
process_and_upload_df(products_df, gcs_bucket_name, 'products.parquet')
process_and_upload_df(order_items_df, gcs_bucket_name, 'order_items.parquet')
Step 4. GCS에서 Bigquery로~
먼저 필요한 라이브러리 설치
pip install google-cloud-storage google-cloud-bigquery
from google.cloud import bigquery
from google.cloud import storage
# GCS에서 Parquet 파일을 BigQuery로 로드하는 함수
def load_parquet_to_bigquery(gcs_bucket_name, gcs_blob_name, bigquery_dataset, bigquery_table):
# BigQuery 클라이언트 생성
client = bigquery.Client()
# GCS URI (gs://버킷/파일경로)
gcs_uri = f"gs://{gcs_bucket_name}/{gcs_blob_name}"
# 로드할 테이블의 BigQuery 테이블 ID
table_id = f"{client.project}.{bigquery_dataset}.{bigquery_table}"
# BigQuery 로드 작업 설정
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # 기존 테이블 덮어쓰기 (선택사항)
)
# GCS의 Parquet 파일을 BigQuery로 로드
load_job = client.load_table_from_uri(
gcs_uri, table_id, job_config=job_config
)
# 로드 작업 진행 상태 확인
load_job.result() # 작업이 완료될 때까지 기다림
print(f"Loaded {gcs_uri} to {table_id}")
# 예시: GCS 버킷과 파일, BigQuery 데이터셋 및 테이블 이름
gcs_bucket_name = 'your-gcs-bucket-name'
gcs_blob_name = 'orders.parquet' # 업로드된 Parquet 파일 이름
bigquery_dataset = 'your_bigquery_dataset'
bigquery_table = 'orders' # BigQuery에 저장할 테이블 이름
# GCS에서 BigQuery로 Parquet 파일 로드
load_parquet_to_bigquery(gcs_bucket_name, gcs_blob_name, bigquery_dataset, bigquery_table)
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE는 BigQuery 테이블에 데이터를 로드할 때
기존 데이터를 덮어쓰는 옵션이다.
Parquet 파일 자체는 BigQuery에 로드되면 변환되어 테이블로 저장.
Step 5. DBT로 데이터모델링하기
일단.... dbt를 시작하려면 init을 해야한다.
dbt init {project_name}
그럼 막 폴더구조가 생겨날 것이다!
your_project_name/
├── models/
│ ├── example/
│ │ └── my_first_dbt_model.sql
│ └── your_model.sql
├── snapshots/
├── tests/
├── macros/
├── analysis/
├── data/
├── docs/
├── target/
├── logs/
├── dbt_project.yml
└── profiles.yml
이렇게 생김... (복잡해보임..)
profiles.yml에 연결할 데이터웨어하우스 정보를 넣어주면 데이터 연결은 됐고.
project_name:
target: dev
outputs:
dev:
type: bigquery
project: GCP 프로젝트 ID
dataset: BigQuery 데이터셋 이름
keyfile: GCP 서비스 계정 키 파일의 경로 (JSON 파일)
threads: 1 (DBT 실행 시 사용할 쓰레드 수)
timeout_seconds: 300 (DBT 작업의 시간 제한)
location: KR
dbt_project.yml 에선 profile 명만 위에 DBT 프로필 이름이랑 맞춰주면 된다.
다른 옵션은 나중에 살펴보는걸로..?
name: my_dbt_project # 프로젝트 이름
version: '1.0' # 프로젝트 버전
config-version: 2 # DBT 설정 버전
profile: my_bigquery_profile # DBT 프로필 이름 (BigQuery 연결 설정)
연결하면 sources/sources.yml 파일을 내손으로 만들어야 한다.
DBT 프로젝트에서 소스 데이터를 정의하려면 이 파일을 작성해야 함... 귀찮지만 해야한다.
예를 들어, BigQuery에 있는 raw_orders, raw_customers, raw_products 테이블을 소스로 정의해보자.
version: 2
sources:
- name: raw "빅쿼리에 넣은 데이터 원본 이름"
description: "원본 데이터를 포함한 테이블들"
tables:
- name: raw_orders
description: "주문 데이터를 포함한 테이블"
columns:
- name: order_id
description: "주문 ID"
- name: customer_id
description: "고객 ID"
- name: product_id
description: "제품 ID"
- name: quantity
description: "주문 수량"
- name: price
description: "제품 가격"
- name: raw_customers
description: "고객 정보를 포함한 테이블"
columns:
- name: customer_id
description: "고객 ID"
- name: name
description: "고객 이름"
- name: email
description: "고객 이메일"
- name: raw_products
description: "제품 정보를 포함한 테이블"
columns:
- name: product_id
description: "제품 ID"
- name: product_name
description: "제품 이름"
- name: category
description: "카테고리"
이제 진짜 준비끝.
이제 dbt에서 source('raw', 'raw_orders') 형태로 참조할 수 있어요.
모델링을 본격적으로 하면 되는데, 모델링 기법에는 3가지가 있다. 이건 나중에 다른글에서 다루겠다!
(스타 스키마 / 스노우플레이크 / 정규화 모델 이렇게 3가지)
우선, models 폴더 안에 3개의 폴더를 만들자 (staging / intermediate / marts)
models/
├── staging/ # 스테이징 모델을 넣을 폴더
│ ├── stg_orders.sql
│ ├── stg_customers.sql
│ └── stg_products.sql
├── intermediate/ # 인터미디엇 모델을 넣을 폴더
│ ├── int_order_summary.sql
│ └── int_customer_analysis.sql
└── marts/ # 최종 분석 모델을 넣을 폴더 (팩트 및 디멘션 테이블)
├── fct_orders.sql
└── dim_customers.sql
Step 5-1. DBT Staging 단계
dbt의 중간 정제 단계(staging 모델)에서는 데이터 타입을 변환한다. 숫자는 Int로 날짜는 날짜형식에 맞게.
SELECT
order_id,
CAST(customer_id AS INT) AS customer_id,
price,
order_date
FROM {{ source('raw', 'raw_orders') }}
customer_id를 INT로 변환, 불필요한 컬럼 제거
Step 5-2. DBT Intermediate 단계
스테이징 테이블을 활용해서, 여러 테이블을 조합한 중간 모델을 만드는 곳이다.
보통 Fact 테이블에서 필요한 정보를 먼저 조인해서 정리하는 과정이다. JOIN~
-- models/intermediate/int_orders.sql
SELECT
o.order_id,
o.customer_id,
c.name AS customer_name,
c.email AS customer_email,
o.price,
o.order_date
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('stg_customers') }} c
ON o.customer_id = c.customer_id
Intermediate 단계의 주요 역할
1. 여러 스테이징 테이블을 JOIN → 팩트 테이블을 만들 때 필요한 정보 미리 정리
2. 데이터 전처리 → 중복 제거, 날짜 변환 등
3. 팩트 테이블을 가볍게 유지 → 불필요한 계산을 사전에 처리
바로 팩트 테이블 만들면 안돼?
가능하지만, 팩트 테이블에서 여러 개의 스테이징 테이블을 직접 조인하면 유지보수와 성능이 떨어질 수 있음!
그래서 중간 가공을 담당하는 `int_*` 모델을 만들어 분리하는 것이 일반적이랩니다...
Step 5-3. DBT Fact & Dimension 테이블 생성
이제 최종적으로 Fact 테이블과 Dimension 테이블을 분리해서 정형화된 모델을 만든다.
-- models/marts/fact_orders.sql
SELECT
o.order_id,
o.customer_id,
o.customer_name,
o.total_price,
DATE_TRUNC('day', o.order_date) AS order_date, -- 일 단위로 정리
COUNT(oi.product_id) AS product_count
FROM {{ ref('int_orders') }} o
LEFT JOIN {{ ref('stg_order_items') }} oi
ON o.order_id = oi.order_id
GROUP BY 1, 2, 3, 4, 5
stg_order_items을 JOIN해서 상품 개수 집계, order_date를 DATE_TRUNC로 일 단위 변환
📍 최종 정리
✅ Staging (stg_) → 원본 데이터 정제 (데이터 타입 변환, 불필요한 컬럼 제거)
✅ Intermediate (int_) → 여러 테이블을 조합하여 중간 모델 생성
✅ Fact & Dimension (fact_, dim_) → 최종적으로 정형화된 테이블 구축
아니 근데 아까부터 팩트랑 디멘젼테이블 나오는데 그게 뭐야?
팩트 테이블(Fact Table): 측정 가능한 데이터(매출, 주문 수량, 방문 횟수 등) 저장
차원 테이블(Dimension Table): 팩트 테이블과 연결되는 참조 테이블로, 주요 속성 정보(날짜, 고객, 제품 정보 등)를 저장
이제 만들었으니 test부터 진행하고 테이블을 만들자.
테스트하는 이유는 데이터 모델의 품질과 무결성을 검증하기 위해서이다.
STEP 6. dbt test
DBT에서 테스트를 정의하는 방법은 크게 두 가지로 나눌 수 있다:
단위 테스트 (Assertions): DBT의 기본 제공 테스트를 사용하여 특정 열에 대한 유효성 검사를 수행
커스텀 테스트 (Custom Tests): SQL을 작성하여 데이터의 유효성을 검사하는 커스텀 테스트를 정의
기본 테스트 사용 (DBT 기본 제공 테스트)
DBT는 테이블과 열에 대한 기본 테스트를 제공한다.
예를 들어, not_null, unique, accepted_values 등 여러 가지 테스트가 있다.
각 단계마다 yml 파일을 만들어 스키마를 정의하면 된다.
schema.yml은 DBT 프로젝트 내에서 각 모델에 대한 메타데이터와 테스트를 정의하는 파일이다.
models/
├── staging/
│ ├── stg_orders.sql # 데이터 변환 SQL 모델
│ ├── stg_customers.sql
│ └── schema.yml # stg_orders, stg_customers 모델에 대한 메타데이터 및 테스트 정의
└── marts/
├── fct_orders.sql
└── schema.yml # fct_orders 모델에 대한 메타데이터 및 테스트 정의
schema.yml 작성
# models/staging/schema.yml
version: 2
models:
- name: stg_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: order_date
tests:
- not_null
- accepted_values:
values: ["2023-01-01", "2023-02-01", "2023-03-01"]
기본 제공 테스트 종류:
- not_null: 컬럼에 NULL 값이 없어야 함.
- unique: 컬럼의 값이 고유해야 함.
- accepted_values: 컬럼 값이 지정된 값 중 하나여야 함.
- relationships: 다른 테이블과의 관계를 검사하는 테스트.
커스텀 테스트 (Custom Tests)
사용자가 SQL 쿼리를 작성하여 커스텀 테스트를 정의할 수 있다.
커스텀 테스트 예시: 주문 금액이 0보다 큰지 확인하는 테스트
SQL 파일 작성: tests/ 폴더 안에 테스트 SQL 파일을 작성.
예시: tests/order_amount_positive.sql
-- tests/order_amount_positive.sql
SELECT
COUNT(*) AS failures
FROM {{ ref('stg_orders') }}
WHERE order_total_cleaned <= 0
그리고 해당 모델이 있는 경로로 가서 schema.yml에 커스텀 테스트도 추가하면 된다.
version: 2
models:
- name: stg_orders
columns:
- name: order_total_cleaned
tests:
- order_amount_positive # 커스텀 테스트 호출
테스트가 정의된 후에는 DBT CLI에서 dbt test 명령을 실행하여 모든 테스트를 실행할 수 있다.
dbt test
이 명령은 정의된 모든 테스트를 실행하고, 테스트 결과를 출력한다.
테스트가 성공하면 "PASSED"로 표시되고, 실패하면 "FAILED"로 표시됨.
예시
$ dbt test
11:42:03 | Concurrency: 1 threads (target='dev')
11:42:03 |
11:42:03 | 1 of 1 test passing (100.0%)
11:42:03 |
11:42:03 | DONE. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
STEP 7. dbt run
dbt run
dbt run 하면 models/ 폴더 내에서 정의된 모든 모델을 실행하여 테이블 또는 뷰를 생성한다.
models/marts 폴더에 있는 모델만 실행하려면, 다음과 같이 입력
dbt run --select marts.*
DBT 모델은 SQL 파일로 작성되어 있으며, 테이블 또는 뷰로 변환된다.
dbt_project.yml 파일에서 materializations을 설정하여 모델이 테이블, 뷰, 인덱스, 세션 테이블 등으로 생성되도록 할 수 있다.
models:
marts:
materialized: table # marts 폴더의 모든 모델은 테이블로 생성됨
staging:
materialized: view # staging 폴더의 모델은 뷰로 생성됨
모델이 생성되는 방식은 materialized 설정에 따라 다르다.
- table: 모델이 테이블로 생성.
- view: 모델이 뷰로 생성.
- incremental: 데이터가 증분으로 추가.
- ephemeral: 임시 모델이 생성.
'데이터 엔지니어링' 카테고리의 다른 글
데이터 엔지니어링 - 삭제된 데이터를 어떻게 DW에 반영할까? (0) | 2025.03.05 |
---|---|
데이터 엔지니어링 - 초당 수천만개의 로그데이터가 생긴다면? (0) | 2025.03.03 |
6개월간의 데이터엔지니어 부트캠프를 마치며 (3) | 2025.01.03 |
데이터엔지니어 부트캠프 - 파이널 프로젝트 (12월의 기록) (1) | 2024.12.28 |
데이터엔지니어 부트캠프 - 유사 공연 추천시스템 cosine_sim[idx] 수정하기 (24주차) (0) | 2024.12.28 |