imhamburger 님의 블로그
Pandas join으로 데이터에 이미 존재하는 ID가 있으면 append 하기 본문
배경
- 매월 크롤링해오는 데이터를 전처리 후 DynamoDB에 저장시키는 형태
- 원천 테이블 A에는 companyID가 없는 상태
- 기준 테이블 B에는 companyID가 존재하며, companyName + addressCode 조합으로 기업 식별이 가능
- 기준 테이블 B에 적재 시 companyID 값을 만들어 적재해야하는 상황
- 기존에는 단순히 데이터 삭제 후 새로운 companyID를 생성해서 적재했기 때문에 매번 다른 companyID값이 생성
해결방법
테이블 A의 데이터를 테이블 B에 적재할 때 이미 존재하는 회사라면 기존 companyID로 매핑하고,
존재하지 않는 회사만 새로운 companyID를 부여하는 로직을 설계·구현을 해야한다.
import boto3
import pandas as pd
def insert_companyid():
FORMATTED_ONE_MONTH_AGO = '2025-09'
dynamodb = boto3.resource('dynamodb')
client = boto3.client('dynamodb')
src_table = dynamodb.Table('NPS_temp1_Cleaned')
target_table = dynamodb.Table('test2_data_cleaned')
# Source scan (2025-09 데이터)
df_source = scan_table(src_table)
df_source['addressCode'] = df_source['addressCode'].astype(int)
print(df_source)
# Target scan (기존 companyID)
df_target = scan_table(target_table)
df_existing_mapping = df_target[['companyID', 'companyName', 'addressCode']].drop_duplicates()
df_existing_mapping['addressCode'] = df_existing_mapping['addressCode'].astype(int)
print(df_existing_mapping)
# 기존 회사: inner merge → companyID 유지
df_existing = df_source.merge(
df_existing_mapping,
on=['companyName', 'addressCode'],
how='inner'
)
df_existing['registrationDate'] = FORMATTED_ONE_MONTH_AGO
print(f"기존 회사 merge rows: {len(df_existing)}")
print(df_existing[['companyName','addressCode','companyID','registrationDate']])
# 신규 회사: anti-join → max+1 ID
df_new = df_source.merge(
df_existing_mapping,
on=['companyName','addressCode'],
how='left',
indicator=True
)
df_new = df_new[df_new['_merge']=='left_only'].drop(columns=['_merge'])
if not df_new.empty:
# 신규 companyID 부여
max_id = df_existing_mapping['companyID'].astype(int).max() if not df_existing_mapping.empty else 0
df_new['companyID'] = range(max_id+1, max_id+1 + len(df_new))
df_new['registrationDate'] = FORMATTED_ONE_MONTH_AGO
print(f"신규 회사 rows: {len(df_new)}")
print(df_new[['companyName','addressCode','companyID','registrationDate']])
else:
# 빈 DataFrame이라도 컬럼 유지
df_new = pd.DataFrame(columns=['companyName','addressCode','companyID','registrationDate'])
print("신규 회사 없음")
# 최종 insert 대상 (if-else 밖으로 이동)
df_to_insert = pd.concat([df_existing, df_new], ignore_index=True)
print(f"최종 insert 대상 rows: {len(df_to_insert)}")
print(df_to_insert[['companyName','addressCode','companyID','registrationDate']])
# DynamoDB item 변환
items = []
for _, row in df_to_insert.iterrows():
item = {
'companyID': int(row['companyID']), # Partition Key
'registrationDate': row['registrationDate'], # Sort Key
'companyName': row['companyName'],
'addressCode': int(row['addressCode']),
'applyAt': row['applyAt'],
'companyIndustryCode': int(row['companyIndustryCode']),
'companyIndustryName': row['companyIndustryName'],
'companyTypeDivision': int(row['companyTypeDivision']),
'currentMonthAmount': int(row['currentMonthAmount']),
'employerIdNumber': int(row['employerIdNumber']),
'joinStatusCode': int(row['joinStatusCode']),
'lotNumberAddress': row['lotNumberAddress'],
'numberLostSubscriber': int(row['numberLostSubscriber']),
'numberNewSubscriber': int(row['numberNewSubscriber']),
'numberSubscriber': int(row['numberSubscriber']),
'reRegisterAt': row['reRegisterAt'],
'roadNameAddress': row['roadNameAddress'],
'withdrawalAt': row['withdrawalAt']
}
items.append(item)
print(f"DynamoDB에 넣을 item 수: {len(items)}")
batch_write(target_table, items)
print(f"{FORMATTED_ONE_MONTH_AGO} 데이터 {len(df_to_insert)}건 저장 완료")
STEP.1 기존 회사: inner merge → companyID 유지
- `df_source`(신규 로드된 원천 데이터)와 `df_existing_mapping`(기존 companyName–addressCode–companyID 매핑 테이블)을 companyName + addressCode 기준으로 inner join
- 즉, 이미 테이블 B에 존재하는 회사들만 골라내는 단계
- 기존 회사는 매핑된 `companyID`를 그대로 사용
- `registrationDate`는 동일 날짜로 업데이트
STEP.2 신규 회사: anti-join → max+1 ID
- left join 후 `_merge` 컬럼을 확인하여 left_only → 기존 테이블에 없는 신규 회사만 필터링
- 즉, anti-join(차집합) 로직
- 이 단계에서 아직 companyID는 없음
STEP.3 신규 companyID 부여
- 신규 회사가 존재하는 경우, 기존 companyID의 최댓값 `max_id`를 가져옴
- 신규 row 수만큼 `max_id + 1`부터 연속된 ID 생성
- 신규 회사도 동일하게 `registrationDate` 부여
STEP.4 빈 DataFrame 컬럼 유지
- 신규 데이터가 한 개도 없을 때 로직이 실패하지 않도록 필요한 컬럼만 가진 빈 DataFrame 생성
- 이후 후속 프로세스에서 에러 방지
결과
매번 데이터를 삭제하고 companyID 새로 생성할 필요가 없어졌다!
'판다스(Pandas)' 카테고리의 다른 글
| PostgreSQL의 JSON 타입 컬럼을 Pandas로 읽을 때 (0) | 2025.10.19 |
|---|---|
| 데이터엔지니어 부트캠프 - 영화 박스오피스 데이터 ETL(Extraction / Transform / Load) (7월의 기록) (1) | 2024.08.04 |
| 판다스(Pandas) - Partition_cols 이해하기 (0) | 2024.07.27 |
| 판다스(Pandas) - csv파일 불러오기, unicode 에러 해결하기 (0) | 2024.07.25 |