imhamburger 님의 블로그

Pandas join으로 데이터에 이미 존재하는 ID가 있으면 append 하기 본문

판다스(Pandas)

Pandas join으로 데이터에 이미 존재하는 ID가 있으면 append 하기

imhamburger 2025. 11. 16. 16:01

배경

  • 매월 크롤링해오는 데이터를 전처리 후 DynamoDB에 저장시키는 형태
  • 원천 테이블 A에는 companyID가 없는 상태
  • 기준 테이블 B에는 companyID가 존재하며, companyName + addressCode 조합으로 기업 식별이 가능
  • 기준 테이블 B에 적재 시 companyID 값을 만들어 적재해야하는 상황
  • 기존에는 단순히 데이터 삭제 후 새로운 companyID를 생성해서 적재했기 때문에 매번 다른 companyID값이 생성

 

해결방법

테이블 A의 데이터를 테이블 B에 적재할 때 이미 존재하는 회사라면 기존 companyID로 매핑하고,

존재하지 않는 회사만 새로운 companyID를 부여하는 로직을 설계·구현을 해야한다.

import boto3
import pandas as pd

def insert_companyid():
    FORMATTED_ONE_MONTH_AGO = '2025-09'

    dynamodb = boto3.resource('dynamodb')
    client = boto3.client('dynamodb')
    src_table = dynamodb.Table('NPS_temp1_Cleaned')
    target_table = dynamodb.Table('test2_data_cleaned')

    # Source scan (2025-09 데이터)
    df_source = scan_table(src_table)
    df_source['addressCode'] = df_source['addressCode'].astype(int)
    print(df_source)

    # Target scan (기존 companyID)
    df_target = scan_table(target_table)
    df_existing_mapping = df_target[['companyID', 'companyName', 'addressCode']].drop_duplicates()
    df_existing_mapping['addressCode'] = df_existing_mapping['addressCode'].astype(int)
    print(df_existing_mapping)


    # 기존 회사: inner merge → companyID 유지
    df_existing = df_source.merge(
        df_existing_mapping,
        on=['companyName', 'addressCode'],
        how='inner'
    )
    df_existing['registrationDate'] = FORMATTED_ONE_MONTH_AGO
    print(f"기존 회사 merge rows: {len(df_existing)}")
    print(df_existing[['companyName','addressCode','companyID','registrationDate']])

    # 신규 회사: anti-join → max+1 ID
    df_new = df_source.merge(
        df_existing_mapping,
        on=['companyName','addressCode'],
        how='left',
        indicator=True
    )
    
    df_new = df_new[df_new['_merge']=='left_only'].drop(columns=['_merge'])

    if not df_new.empty:
        # 신규 companyID 부여
        max_id = df_existing_mapping['companyID'].astype(int).max() if not df_existing_mapping.empty else 0
        df_new['companyID'] = range(max_id+1, max_id+1 + len(df_new))
        df_new['registrationDate'] = FORMATTED_ONE_MONTH_AGO
        print(f"신규 회사 rows: {len(df_new)}")
        print(df_new[['companyName','addressCode','companyID','registrationDate']])
    else:
        # 빈 DataFrame이라도 컬럼 유지
        df_new = pd.DataFrame(columns=['companyName','addressCode','companyID','registrationDate'])
        print("신규 회사 없음")

    # 최종 insert 대상 (if-else 밖으로 이동)
    df_to_insert = pd.concat([df_existing, df_new], ignore_index=True)
    print(f"최종 insert 대상 rows: {len(df_to_insert)}")
    print(df_to_insert[['companyName','addressCode','companyID','registrationDate']])


    # DynamoDB item 변환
    items = []
    for _, row in df_to_insert.iterrows():
        item = {
            'companyID': int(row['companyID']),                   # Partition Key
            'registrationDate': row['registrationDate'],         # Sort Key
            'companyName': row['companyName'],
            'addressCode': int(row['addressCode']),
            'applyAt': row['applyAt'],
            'companyIndustryCode': int(row['companyIndustryCode']),
            'companyIndustryName': row['companyIndustryName'],
            'companyTypeDivision': int(row['companyTypeDivision']),
            'currentMonthAmount': int(row['currentMonthAmount']),
            'employerIdNumber': int(row['employerIdNumber']),
            'joinStatusCode': int(row['joinStatusCode']),
            'lotNumberAddress': row['lotNumberAddress'],
            'numberLostSubscriber': int(row['numberLostSubscriber']),
            'numberNewSubscriber': int(row['numberNewSubscriber']),
            'numberSubscriber': int(row['numberSubscriber']),
            'reRegisterAt': row['reRegisterAt'],
            'roadNameAddress': row['roadNameAddress'],
            'withdrawalAt': row['withdrawalAt']
        }
        items.append(item)

    print(f"DynamoDB에 넣을 item 수: {len(items)}")

    batch_write(target_table, items)
    print(f"{FORMATTED_ONE_MONTH_AGO} 데이터 {len(df_to_insert)}건 저장 완료")

 

 

STEP.1 기존 회사: inner merge → companyID 유지

  • `df_source`(신규 로드된 원천 데이터)와 `df_existing_mapping`(기존 companyName–addressCode–companyID 매핑 테이블)을 companyName + addressCode 기준으로 inner join
  • 즉, 이미 테이블 B에 존재하는 회사들만 골라내는 단계
  • 기존 회사는 매핑된 `companyID`를 그대로 사용
  • `registrationDate`는 동일 날짜로 업데이트

STEP.2 신규 회사: anti-join → max+1 ID

  • left join 후 `_merge` 컬럼을 확인하여 left_only → 기존 테이블에 없는 신규 회사만 필터링
  • 즉, anti-join(차집합) 로직
  • 이 단계에서 아직 companyID는 없음

STEP.3 신규 companyID 부여

  • 신규 회사가 존재하는 경우, 기존 companyID의 최댓값 `max_id`를 가져옴
  • 신규 row 수만큼 `max_id + 1`부터 연속된 ID 생성
  • 신규 회사도 동일하게 `registrationDate` 부여

STEP.4 빈 DataFrame 컬럼 유지

  • 신규 데이터가 한 개도 없을 때 로직이 실패하지 않도록 필요한 컬럼만 가진 빈 DataFrame 생성
  • 이후 후속 프로세스에서 에러 방지

 

결과

매번 데이터를 삭제하고 companyID 새로 생성할 필요가 없어졌다!