imhamburger 님의 블로그
데이터 엔지니어링 - 초당 수천만개의 로그데이터가 생긴다면? 본문
만약 초당 수천만개의 로그데이터가 생긴다고 상상했을 때, 데이터를 어떻게 처리해야할까?
방법은 다양하겠지만, 내가 공부한 것을 토대로 적어보고자 한다.
우선 어떤 문제가 발생할까?
1. 로그파일이 너무 많아 성능 저하
2. 데이터를 읽어올 때 스캔 비용 증가
3. 배치 처리 속도가 느림
이러한 문제들이 발생할 수 있다. 이에 따른 해결전략은??
1. 로그파일이 너무 많아 성능 저하 -> Parquet 파일을 병합(Compaction) 하여 큰 파일로 변환
2. 데이터를 읽어올 때 스캔 비용 증가 -> Partitioning + Clustering
3. 배치 처리 속도가 느림 -> Apache Spark로 분산처리
수천만개의 로그데이터가 S3 혹은 GCS 아니면 어느 별도의 데이터저장소에 저장되어 있다.
1. Parquet 파일을 병합(Compaction)하기
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ParquetCompaction").getOrCreate()
# S3에서 작은 Parquet 파일 로드
df = spark.read.parquet("s3://my-bucket/raw-logs/")
# 병합하여 S3에 다시 저장 (큰 파일로 변환)
df.repartition(100).write.mode("overwrite").parquet("s3://my-bucket/logs/")
pyspark의 repartition을 이용해 100개의 파티션으로 나눠 저장한다. 그럼 최종적으로 Parquet 파일 100개가 생성된다.
100개는 내가 정한건데, 더 줄이고싶다면 수를 조정하면 된다.
단, 너무 큰 파일을 만들면 병렬 처리 속도가 저하될 수 있으니 → 적절한 개수 선택 필요!
혹은 날짜별 폴더에 저장하고 싶다면?
df = df.withColumn("event_date", df["event_time"].cast("date"))
df.write \
.partitionBy("event_date") \
.mode("overwrite") \
.parquet("gs://my-bucket/logs/")
+-------------------+-----------+
| event_time| event_date|
+-------------------+-----------+
|2025-03-01 12:00:00| 2025-03-01|
|2025-03-02 13:00:00| 2025-03-02|
+-------------------+-----------+
기존 컬럼을 덮어쓰고 싶다면, withColumn()을 사용해 event_time을 다시 할당할 수도 있다!
여기서 coalesce 랑 헷갈릴 수 있다.
함수 | 동작 방식 | 사용 |
repartition(n) | Shuffle 발생 + 균등 분배 | 성능 최적화 + 병렬 처리 |
coalesce(n) | Shuffle 없이 파티션 병합 | 작은 파일 단순 병합 |
데이터가 크고 여러 노드에서 병렬 처리가 필요하면 → repartition(100)
단순히 작은 파일을 합치고 싶으면 → coalesce(100)
2. Partitioning + Clustering
Partitioning? Clustering?
- Partitioning: 날짜(event_date), 로그 유형(log_type) 기준으로 저장
- Clustering: user_id 또는 session_id 기준으로 정렬
클러스터링을 사용해야 하는 이유?
클러스터링은 테이블이 저장되는 방식을 결정하므로 일반적으로 쿼리 성능 향상을 위해 가장 먼저 고려할 만한 옵션이다.
쿼리에서 특정 열을 기준으로 필터링하는 경우가 많다면 쿼리에서 필터와 일치하는 블록만 스캔하므로 클러스터링이 쿼리 속도를 높인다.
따라서 데이터 웨어하우스(Bigquery/Redshift) 적재 전, 컬럼 기준으로 클러스터링하면 조회 성능이 향상됨!
Partition을 설정하면 특정 날짜 데이터만 조회할 때 비용 절감 가능!
-- BigQuery Partitioned Table 생성
CREATE TABLE my_dataset.logs_partitioned
PARTITION BY DATE(event_time)
CLUSTER BY user_id
AS SELECT * FROM my_dataset.optimized_logs;
혹은 빅쿼리에 테이블을 생성하지 않고 GCS에 생성하고싶다면 EXTERNAL TABLE 사용
CREATE EXTERNAL TABLE my_dataset.external_logs
WITH PARTITION COLUMNS (event_time DATE)
OPTIONS (
format = 'PARQUET',
uris = ['gs://my-bucket/logs/*']
);
BigQuery에서는 외부 데이터에 대해 쿼리만 수행하고, 실제 데이터는 Cloud Storage에 남아있다.
3. 에어플로우로 배치작업
S3 로그 데이터 → Spark 처리 → BigQuery 적재 → 데이터모델링
from airflow import DAG
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime
dag = DAG(
"gcs_to_bigquery_dbt",
schedule_interval="0 * * * *", # 매시간 실행
start_date=datetime(2025, 3, 1),
catchup=False
)
# GCS에서 새로운 로그 파일 감지
gcs_sensor = GCSObjectExistenceSensor(
task_id="wait_for_gcs_file",
bucket="my-log-bucket",
object="logs/raw/*.json",
google_cloud_conn_id="google_cloud_default",
timeout=600,
poke_interval=60,
dag=dag
)
# Spark로 로그 데이터 변환
spark_transform_task = SparkSubmitOperator(
task_id="spark_transform_logs",
application="/path/to/spark_log_processing.py",
conn_id="spark_default",
executor_memory="8g",
total_executor_cores=4,
dag=dag
)
# 1. GCS에서 BigQuery의 staging_table로 데이터 로드
gcs_to_bq_task = GCSToBigQueryOperator(
task_id="load_logs_to_staging_table",
bucket="my-processed-bucket", # GCS 버킷
source_objects=["logs/processed/*.parquet"], # GCS의 Parquet 파일들
destination_project_dataset_table="my_project.logs_dataset.staging_table", # BigQuery staging table
write_disposition="WRITE_APPEND", # 기존 데이터에 추가
source_format="PARQUET", # 파일 형식
dag=dag,
)
# 2. BigQuery에서 MERGE 작업 수행 (staging_table과 logs_table 병합)
bq_merge_task = BigQueryExecuteQueryOperator(
task_id='bq_merge_task',
sql="""
MERGE INTO `my_project.logs_dataset.logs_table` AS target
USING `my_project.logs_dataset.staging_table` AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET target.value = source.value
WHEN NOT MATCHED THEN
INSERT (id, value) VALUES (source.id, source.value)
WHERE target.event_date = CURRENT_DATE()
""",
use_legacy_sql=False, # Standard SQL 사용
dag=dag,
)
# dbt 테스트 실행 (데이터 검증을 먼저)
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="cd /path/to/dbt_project && dbt test",
dag=dag
)
# dbt 실행 (스타 스키마 모델링)
dbt_run = BashOperator(
task_id="dbt_run",
bash_command="cd /path/to/dbt_project && dbt run",
dag=dag
)
# 실행 순서 정의
start_task = DummyOperator(task_id="start", dag=dag)
end_task = DummyOperator(task_id="end", dag=dag)
start_task >> gcs_sensor >> spark_transform_task >> gcs_to_bq_task >> dbt_test >> dbt_run >> end_task
write_disposition 옵션 종류
옵션 | 동작 방식 |
WRITE_APPEND | 기존 데이터 유지 + 새로운 데이터 추가 |
WRITE_TRUNCATE | 기존 데이터 삭제 후 새로운 데이터 덮어쓰기 |
WRITE_EMPTY | 테이블이 비어 있을 때만 적재 (기존 데이터가 있으면 실패) |
WRITE_APPEND 주의할 점
1. 중복 데이터 문제: 같은 데이터가 여러 번 추가될 수도 있음 → 해결 방법: MERGE를 사용하여 중복 방지
2. 무분별한 적재로 테이블이 너무 커질 수 있음 → 해결 방법: 파티셔닝 & 클러스터링 활용
파티셔닝된 테이블에서 MERGE 최적화
가장 최근의 데이터 파티션만을 대상으로 MERGE 연산을 수행. 테이블의 일부 파티션만 갱신되고, 전체 테이블을 스캔하는 시간을 절약할 수 있다.
MERGE INTO `my_project.dataset.table` AS target
USING `my_project.dataset.staging_table` AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET target.value = source.value
WHEN NOT MATCHED THEN
INSERT (id, value) VALUES (source.id, source.value)
WHERE target.date = CURRENT_DATE() -- 날짜 파티션만 처리
target.event_date = CURRENT_DATE() → 오늘 날짜 파티션만 갱신
staging_table이란?
staging_table은 보통 원본 데이터를 BigQuery로 로드하기 전에 임시로 저장하거나, 데이터 정제 및 변환을 위한 중간 단계 테이블로 사용된다. 이 테이블은 데이터를 로드하기 전에 정리하거나 변환하여 적재하는 데 유용하다.
예를 들어, 로그 데이터를 BigQuery에 로드하고, 이 데이터를 바로 MERGE 작업에 사용하기 전에, 변환 및 정제된 데이터를 staging_table에 임시로 저장할 수 있다.
데이터 정합성을 유지하고, 병합 전에 데이터를 미리 처리하고 검증하는 데 유용!!
그럼에도 불구하고 배치사이즈가 너무 크다면, 배치사이즈를 조절하여 처리할 수 있다.
# 배치 크기 설정
batch_size = 100000
# 첫 번째 배치 처리 시작
start_id = 1
end_id = start_id + batch_size - 1
# 예시 SQL 쿼리로 동적 범위 처리
bq_merge_task = BigQueryExecuteQueryOperator(
task_id="bq_merge_task",
sql=f"""
MERGE INTO `my_project.dataset.table` AS target
USING `my_project.dataset.staging_table` AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET target.value = source.value
WHEN NOT MATCHED THEN
INSERT (id, value) VALUES (source.id, source.value)
WHERE target.event_date = CURRENT_DATE() -- 특정 날짜 파티션만 처리
AND source.id BETWEEN {start_id} AND {end_id} -- 동적 배치 범위
""",
use_legacy_sql=False,
dag=dag,
)
# 다음 배치 범위 계산
start_id = end_id + 1
end_id = start_id + batch_size - 1
# 배치가 끝날 때마다 범위를 업데이트하여 계속 반복 처리
GCS → 빅쿼리 적재 전 SQL로 미리 테이블 생성
CREATE TABLE my_project.logs_dataset.logs_table (
log_id STRING,
event_time TIMESTAMP,
user_id STRING,
event_type STRING
)
PARTITION BY DATE(event_time) -- 날짜 기준 파티셔닝
CLUSTER BY user_id, event_type; -- user_id와 event_type 기준으로 클러스터링
미리 생성안하고 자동 생성하여 적용하는 방법은?
만약 테이블이 존재하지 않는다면 GCSToBigQueryOperator의 create_disposition="CREATE_IF_NEEDED"를 사용하여 테이블을 자동 생성할 수 있다.
GCSToBigQueryOperator(
task_id="load_logs_to_bq",
bucket="my-processed-bucket",
source_objects=["logs/processed/*.parquet"],
destination_project_dataset_table="my_project.logs_dataset.logs_table",
write_disposition="WRITE_APPEND",
create_disposition="CREATE_IF_NEEDED", # 테이블이 없으면 생성
schema_fields=[
{"name": "log_id", "type": "STRING"},
{"name": "event_time", "type": "TIMESTAMP"},
{"name": "user_id", "type": "STRING"},
{"name": "event_type", "type": "STRING"}
],
time_partitioning={"type": "DAY", "field": "event_time"}, # 파티셔닝 설정
clustering_fields=["user_id", "event_type"], # 클러스터링 설정
source_format="PARQUET",
dag=dag
)
'데이터 엔지니어링' 카테고리의 다른 글
UTM 파라미터 (0) | 2025.04.13 |
---|---|
데이터 엔지니어링 - 삭제된 데이터를 어떻게 DW에 반영할까? (0) | 2025.03.05 |
데이터 엔지니어링 - DBT 연습해보기 (Feat.Bigquery) (0) | 2025.02.28 |
6개월간의 데이터엔지니어 부트캠프를 마치며 (3) | 2025.01.03 |
데이터엔지니어 부트캠프 - 파이널 프로젝트 (12월의 기록) (1) | 2024.12.28 |