imhamburger 님의 블로그

스파크(Spark) - JSON파일을 읽어와 아파치스파크에서 파싱하기 본문

스파크(Spark)

스파크(Spark) - JSON파일을 읽어와 아파치스파크에서 파싱하기

imhamburger 2024. 8. 20. 12:24

JSON파일을 읽어와 스파크에서 파싱하기
 

1. JSON파일 불러오기

jdf = spark.read.option("multiline","true").json('/home/data/movies')
  • json파일 안에 데이터 형식 단일이 아닌 여러줄로 구성되어 있는 경우, multiline 값을 true로 줘야한다.

 
 

2. JSON파일 schema 확인하기

jdf.printSchema()

root
 |-- companys: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- companyCd: string (nullable = true)
 |    |    |-- companyNm: string (nullable = true)
 |-- directors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- peopleNm: string (nullable = true)
 |-- genreAlt: string (nullable = true)
 |-- movieCd: string (nullable = true)
 |-- movieNm: string (nullable = true)
 |-- movieNmEn: string (nullable = true)
 |-- nationAlt: string (nullable = true)
 |-- openDt: string (nullable = true)
 |-- prdtStatNm: string (nullable = true)
 |-- prdtYear: string (nullable = true)
 |-- repGenreNm: string (nullable = true)
 |-- repNationNm: string (nullable = true)
 |-- typeNm: string (nullable = true)
 |-- year: integer (nullable = true)
  • printSchema()를 통해 데이터의 형식과 타입을 확인할 수 있다.
  • companys와 directors는 array타입이기 때문에 행과 열로 이루어진 테이블로 만들 때 분리하거나 필요한 것만 선택하여 사용할 수 있다.

jdf를 z.show(jdf)하면 다음과 같다.

위 테이블에서 마지막 영화는 Array타입인 companys 컬럼에 영화사가 2개이다.
이를 2개로 펼쳐서 테이블을 만드려고 한다면, explode를 사용하면 된다.
 
 
 

3. explode를 이용하여 Array 펼치기

from pyspark.sql.functions import explode

edf = jdf.withColumn("company", explode("companys"))
  • withColumn을 이용하여 "company"라는 새로운 컬럼을 만들고 그 컬럼안에 value는 companys를 펼친 값이 들어간다.

이렇게 진행하였더니 빈테이블이 나왔다...! 
이유를 찾아보니, explode 함수를 사용했을 때 빈 테이블이 나오는 문제는 보통 다음과 같은 이유로 발생할 수 있다고 한다.
 
컬럼에 null 값 또는 빈 배열이 있을 때
explode 함수는 배열이나 리스트를 행으로 분해할 때, 해당 컬럼에 null 또는 빈 배열이 있으면 해당 행을 제거해 버린다.
이로 인해 결과 데이터프레임이 비어 있을 수 있다.

데이터프레임의 구조 문제
JSON 데이터의 구조가 예상과 다를 경우, explode를 사용하는 위치나 방법에 문제가 있을 수 있다.
예를 들어, 데이터가 중첩된 구조로 되어 있을 때, explode를 올바르게 적용하지 않으면 원하는 결과를 얻지 못할 수 있다.
 

해결방법

explode_outer 사용

explode_outer 함수는 null 또는 빈 배열에 대해서도 행을 유지하면서 explode를 수행한다. 이 함수는 null 값을 유지하고, 대신 해당 행에 null을 넣어준다.
 
explode와 explode_outer는 모두 값을 그대로 확장하여 개별 행으로 나눈다. 그러나 두 함수 모두 중첩된 리스트를 평탄화하지 않고, 리스트 형태로 남긴다. 중첩된 리스트를 처리하려면 추가적인 평탄화 작업이 필요하다.

(어쨋든 두 함수 모두 중첩된 데이터는 개별 행으로 나눌 수 없어 별도의 작업이 필요하다... 이건 마지막에 설명하겠다!!)

from pyspark.sql.functions import explode_outer

edf = jdf.withColumn("company", explode_outer("companys"))
eedf = edf.withColumn("director", explode_outer("directors"))

 
결과: 만들어진 "company"와 "director" 에 잘 펼쳐져서 값이 들어갔다.
(나는 directors 컬럼에도 값이 여러개여서 펼쳐주었다.)

 
 
근데 여기서 또 문제는 StructType.elementType을 잘 펴주었지만 아직도 중첩된데이터가 있다. printSchema()를 하면 다음과 같이 나온다.
(위에랑 비교했을 시 element 레벨이 사라진 것을 알 수있다.)
사실 중첩된 데이터를 펼쳐주는 작업이 있긴하지만 우선 나는 select를 이용하여 중첩된 데이터 중 하나만 고르는 방식을 이용하였다.

eedf.printSchema()

|-- repNationNm: string (nullable = true)
 |-- typeNm: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- company: struct (nullable = true)
 |    |-- companyCd: string (nullable = true)
 |    |-- companyNm: string (nullable = true)
 |-- director: struct (nullable = true)
 |    |-- peopleNm: string (nullable = true)

 

내가 필요한 건 이름이기에 companyNm과 peopleNm만 있으면 된다.
 
 
 

4. company의 하위항목인 companyNm을 직접적으로 선택해주기

sdf = eedf.select("movieCd", "movieNm", "genreAlt", "typeNm", "director.peopleNm", "company.companyNm")

 
결과

sdf.printSchema()

root
 |-- movieCd: string (nullable = true)
 |-- movieNm: string (nullable = true)
 |-- genreAlt: string (nullable = true)
 |-- typeNm: string (nullable = true)
 |-- peopleNm: string (nullable = true)
 |-- companyNm: string (nullable = true)

 
 
만약, 원본 컬럼도 유지하고 싶다면??
아예 새로운 컬럼으로 만들어주면 된다.

sdf = eedf.withColumn("directorNm", col("director.peopleNm"))

 
결과

 
 
만약, Array타입이 아닌 String타입인데 데이터가 1개가 아닌 2개라면??
 
 
 

5. Split을 이용하여 String타입 펼치기

 
예시

|-- nationAlt: string (nullable = true)

위 사진을 보면 nationAlt 는 string타입인데 2개의 국가가 표시되어있는 영화도 있다.
 
이것도 위에 했던것처럼 펼치려면??
split을 이용하면 된다!

eedf = edf.withColumn("nation", explode(split(edf["nationAlt"], ",")))
  • edf의 "nationAlt"라는 컬럼을 "," 기준으로 분리하고 explode하여 펼친다. 
  • 그리고 그값을 "nation"이라는 컬럼 안에 넣는다.

결과

 
 
 
 
+ 번외
중첩된 리스트는 어떻게 처리할까?
먼저 중첩되어 있다는 건...!
예를들어, 아래와 같은 테이블과 같은 모양이다.

name address contacts
hamburger New York, 10001 email, hamburger@example.com

address컬럼과 contacts컬럼 안에는 두 개의 데이터가 각각 들어가 있는 것을 볼 수 있다.
 
이를 JSON 구조로 생각해보자. 이 JSON파일의 이름은 double이라 하겠다.

{
    "name": "hamburger",
    "address": {
        "city": "New York",
        "zipcode": "10001"
    },
    "contacts": [
        {
            "type": "email",
            "value": "hamburger@example.com"
        }
    ]
}

 
위 데이터를 스키마로 나타내면 다음과 같다.

schema = StructType([
    StructField("name", StringType()),
    StructField("address", StructType([
        StructField("city", StringType()),
        StructField("zipcode", StringType())
    ])),
    StructField("contacts", ArrayType(StructType([
        StructField("type", StringType()),
        StructField("value", StringType())
    ])))
])

StructType 안에 StringType과 StructType이 있고 또 StructType안에 StringType이 있고.. 이런 구조이다.
이를 중첩 데이터 구조라 할 수 있다.
 
중첩된 데이터 구조를 컬럼으로 모두 펼쳐?내려면 재귀적으로 접근하면 된다.

def get_json_keys(schema, prefix):
    keys = []
    for field in schema.fields:
        if isinstance(field.dataType, StructType):
            if prefix:
                new_prefix = f"{prefix}.{field.name}"
            else:
                new_prefix = field.name
            keys += get_json_keys(field.dataType, new_prefix)
        elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
            if prefix:
                new_prefix = f"{prefix}.{field.name}"
            else:
                new_prefix = field.name
            keys += get_json_keys(field.dataType.elementType, new_prefix)
        else:
            if prefix:
                keys.append(f"{prefix}.{field.name}")
            else:
                keys.append(field.name)
    return keys

 
1. def get_json_keys(schema, prefix):

  • schema는 PySpark의 StructType, ArrayType 등 스키마를 나타내고, prefix는 현재 탐색 중인 키의 경로를 나타내는 문자열
  • prefix는 함수가 재귀적으로 호출될 때 중첩된 구조체의 키 경로를 유지

 
2. keys = []:

  • 함수 내에서 사용할 빈 리스트를 생성. 이 리스트는 모든 키를 저장하는 역할

 
3. for field in schema.fields:

  • schema.fields는 StructType의 각 필드를 나타내는 리스트
  • for문을 이용해 LOOP

 
4. if isinstance(field.dataType, StructType):

  • if prefix: prefix는 현재 탐색 중인 키의 경로를 나타내는 문자열로 존재한다면 현재 경로가 최상위 레벨이 아니라는 것
  • new_prefix = f"{prefix}.{field.name}": prefix가 존재하면, 현재 필드 이름을 접두사에 붙여 새로운 경로 new_prefix를 생성. 예를들어, prefix가 "address"이고 field.name이 "city"라면, new_prefix는 "address.city"가 된다.
  • else: new_prefix = field.name: prefix가 없으면, 현재 필드 이름만으로 new_prefix를 생성

 
5.  keys += get_json_keys(field.dataType, new_prefix)

  • 재귀적으로 get_json_keys 함수를 호출하여, 현재 필드(즉, 구조체)의 내부 필드를 탐색
  • 호출 결과를 keys 리스트에 추가
  • 중첩된 구조체가 존재할 때 내부 구조체의 모든 키를 가져오는 핵심

 
6.  elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):

  • 필드의 데이터 타입이 배열(ArrayType)이고, 그 배열의 요소가 구조체(StructType)인지 확인
  •  if prefix: new_prefix = f"{prefix}.{field.name}" 위에서 설명한 StructType 필드에 대한 new_prefix를 만드는 과정과 동일

 
7. else: if prefix:

  • 위의 조건들에 해당하지 않는 경우, 즉 필드가 기본 데이터 타입인 경우를 처리
  • 이 경우에는 더 이상 구조체나 배열이 아니므로, 현재 필드 이름을 최종 키로 간주하고 이를 keys에 추가 keys.append(field.name)

 
실행결과

flat = get_json_keys(double.schema, "")
print(flat)

#출력결과
["name", "address.city", "address.zipcode", "contacts.type", "contacts.value"]

모든 컬럼이 펼쳐진 것을 확인할 수 있다.