imhamburger 님의 블로그
스파크(Spark) - JSON파일을 읽어와 아파치스파크에서 파싱하기 본문
JSON파일을 읽어와 스파크에서 파싱하기
1. JSON파일 불러오기
jdf = spark.read.option("multiline","true").json('/home/data/movies')
- json파일 안에 데이터 형식 단일이 아닌 여러줄로 구성되어 있는 경우, multiline 값을 true로 줘야한다.
2. JSON파일 schema 확인하기
jdf.printSchema()
root
|-- companys: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyCd: string (nullable = true)
| | |-- companyNm: string (nullable = true)
|-- directors: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- peopleNm: string (nullable = true)
|-- genreAlt: string (nullable = true)
|-- movieCd: string (nullable = true)
|-- movieNm: string (nullable = true)
|-- movieNmEn: string (nullable = true)
|-- nationAlt: string (nullable = true)
|-- openDt: string (nullable = true)
|-- prdtStatNm: string (nullable = true)
|-- prdtYear: string (nullable = true)
|-- repGenreNm: string (nullable = true)
|-- repNationNm: string (nullable = true)
|-- typeNm: string (nullable = true)
|-- year: integer (nullable = true)
- printSchema()를 통해 데이터의 형식과 타입을 확인할 수 있다.
- companys와 directors는 array타입이기 때문에 행과 열로 이루어진 테이블로 만들 때 분리하거나 필요한 것만 선택하여 사용할 수 있다.
jdf를 z.show(jdf)하면 다음과 같다.
위 테이블에서 마지막 영화는 Array타입인 companys 컬럼에 영화사가 2개이다.
이를 2개로 펼쳐서 테이블을 만드려고 한다면, explode를 사용하면 된다.
3. explode를 이용하여 Array 펼치기
from pyspark.sql.functions import explode
edf = jdf.withColumn("company", explode("companys"))
- withColumn을 이용하여 "company"라는 새로운 컬럼을 만들고 그 컬럼안에 value는 companys를 펼친 값이 들어간다.
이렇게 진행하였더니 빈테이블이 나왔다...!
이유를 찾아보니, explode 함수를 사용했을 때 빈 테이블이 나오는 문제는 보통 다음과 같은 이유로 발생할 수 있다고 한다.
컬럼에 null 값 또는 빈 배열이 있을 때
explode 함수는 배열이나 리스트를 행으로 분해할 때, 해당 컬럼에 null 또는 빈 배열이 있으면 해당 행을 제거해 버린다.
이로 인해 결과 데이터프레임이 비어 있을 수 있다.
데이터프레임의 구조 문제
JSON 데이터의 구조가 예상과 다를 경우, explode를 사용하는 위치나 방법에 문제가 있을 수 있다.
예를 들어, 데이터가 중첩된 구조로 되어 있을 때, explode를 올바르게 적용하지 않으면 원하는 결과를 얻지 못할 수 있다.
해결방법
explode_outer 사용
explode_outer 함수는 null 또는 빈 배열에 대해서도 행을 유지하면서 explode를 수행한다. 이 함수는 null 값을 유지하고, 대신 해당 행에 null을 넣어준다.
explode와 explode_outer는 모두 값을 그대로 확장하여 개별 행으로 나눈다. 그러나 두 함수 모두 중첩된 리스트를 평탄화하지 않고, 리스트 형태로 남긴다. 중첩된 리스트를 처리하려면 추가적인 평탄화 작업이 필요하다.
(어쨋든 두 함수 모두 중첩된 데이터는 개별 행으로 나눌 수 없어 별도의 작업이 필요하다... 이건 마지막에 설명하겠다!!)
from pyspark.sql.functions import explode_outer
edf = jdf.withColumn("company", explode_outer("companys"))
eedf = edf.withColumn("director", explode_outer("directors"))
결과: 만들어진 "company"와 "director" 에 잘 펼쳐져서 값이 들어갔다.
(나는 directors 컬럼에도 값이 여러개여서 펼쳐주었다.)
근데 여기서 또 문제는 StructType.elementType을 잘 펴주었지만 아직도 중첩된데이터가 있다. printSchema()를 하면 다음과 같이 나온다.
(위에랑 비교했을 시 element 레벨이 사라진 것을 알 수있다.)
사실 중첩된 데이터를 펼쳐주는 작업이 있긴하지만 우선 나는 select를 이용하여 중첩된 데이터 중 하나만 고르는 방식을 이용하였다.
eedf.printSchema()
|-- repNationNm: string (nullable = true)
|-- typeNm: string (nullable = true)
|-- year: integer (nullable = true)
|-- company: struct (nullable = true)
| |-- companyCd: string (nullable = true)
| |-- companyNm: string (nullable = true)
|-- director: struct (nullable = true)
| |-- peopleNm: string (nullable = true)
내가 필요한 건 이름이기에 companyNm과 peopleNm만 있으면 된다.
4. company의 하위항목인 companyNm을 직접적으로 선택해주기
sdf = eedf.select("movieCd", "movieNm", "genreAlt", "typeNm", "director.peopleNm", "company.companyNm")
결과
sdf.printSchema()
root
|-- movieCd: string (nullable = true)
|-- movieNm: string (nullable = true)
|-- genreAlt: string (nullable = true)
|-- typeNm: string (nullable = true)
|-- peopleNm: string (nullable = true)
|-- companyNm: string (nullable = true)
만약, 원본 컬럼도 유지하고 싶다면??
아예 새로운 컬럼으로 만들어주면 된다.
sdf = eedf.withColumn("directorNm", col("director.peopleNm"))
결과
만약, Array타입이 아닌 String타입인데 데이터가 1개가 아닌 2개라면??
5. Split을 이용하여 String타입 펼치기
예시
|-- nationAlt: string (nullable = true)
위 사진을 보면 nationAlt 는 string타입인데 2개의 국가가 표시되어있는 영화도 있다.
이것도 위에 했던것처럼 펼치려면??
split을 이용하면 된다!
eedf = edf.withColumn("nation", explode(split(edf["nationAlt"], ",")))
- edf의 "nationAlt"라는 컬럼을 "," 기준으로 분리하고 explode하여 펼친다.
- 그리고 그값을 "nation"이라는 컬럼 안에 넣는다.
결과
+ 번외
중첩된 리스트는 어떻게 처리할까?
먼저 중첩되어 있다는 건...!
예를들어, 아래와 같은 테이블과 같은 모양이다.
name | address | contacts |
hamburger | New York, 10001 | email, hamburger@example.com |
address컬럼과 contacts컬럼 안에는 두 개의 데이터가 각각 들어가 있는 것을 볼 수 있다.
이를 JSON 구조로 생각해보자. 이 JSON파일의 이름은 double이라 하겠다.
{
"name": "hamburger",
"address": {
"city": "New York",
"zipcode": "10001"
},
"contacts": [
{
"type": "email",
"value": "hamburger@example.com"
}
]
}
위 데이터를 스키마로 나타내면 다음과 같다.
schema = StructType([
StructField("name", StringType()),
StructField("address", StructType([
StructField("city", StringType()),
StructField("zipcode", StringType())
])),
StructField("contacts", ArrayType(StructType([
StructField("type", StringType()),
StructField("value", StringType())
])))
])
StructType 안에 StringType과 StructType이 있고 또 StructType안에 StringType이 있고.. 이런 구조이다.
이를 중첩 데이터 구조라 할 수 있다.
중첩된 데이터 구조를 컬럼으로 모두 펼쳐?내려면 재귀적으로 접근하면 된다.
def get_json_keys(schema, prefix):
keys = []
for field in schema.fields:
if isinstance(field.dataType, StructType):
if prefix:
new_prefix = f"{prefix}.{field.name}"
else:
new_prefix = field.name
keys += get_json_keys(field.dataType, new_prefix)
elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
if prefix:
new_prefix = f"{prefix}.{field.name}"
else:
new_prefix = field.name
keys += get_json_keys(field.dataType.elementType, new_prefix)
else:
if prefix:
keys.append(f"{prefix}.{field.name}")
else:
keys.append(field.name)
return keys
1. def get_json_keys(schema, prefix):
- schema는 PySpark의 StructType, ArrayType 등 스키마를 나타내고, prefix는 현재 탐색 중인 키의 경로를 나타내는 문자열
- prefix는 함수가 재귀적으로 호출될 때 중첩된 구조체의 키 경로를 유지
2. keys = []:
- 함수 내에서 사용할 빈 리스트를 생성. 이 리스트는 모든 키를 저장하는 역할
3. for field in schema.fields:
- schema.fields는 StructType의 각 필드를 나타내는 리스트
- for문을 이용해 LOOP
4. if isinstance(field.dataType, StructType):
- if prefix: prefix는 현재 탐색 중인 키의 경로를 나타내는 문자열로 존재한다면 현재 경로가 최상위 레벨이 아니라는 것
- new_prefix = f"{prefix}.{field.name}": prefix가 존재하면, 현재 필드 이름을 접두사에 붙여 새로운 경로 new_prefix를 생성. 예를들어, prefix가 "address"이고 field.name이 "city"라면, new_prefix는 "address.city"가 된다.
- else: new_prefix = field.name: prefix가 없으면, 현재 필드 이름만으로 new_prefix를 생성
5. keys += get_json_keys(field.dataType, new_prefix)
- 재귀적으로 get_json_keys 함수를 호출하여, 현재 필드(즉, 구조체)의 내부 필드를 탐색
- 호출 결과를 keys 리스트에 추가
- 중첩된 구조체가 존재할 때 내부 구조체의 모든 키를 가져오는 핵심
6. elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
- 필드의 데이터 타입이 배열(ArrayType)이고, 그 배열의 요소가 구조체(StructType)인지 확인
- if prefix: new_prefix = f"{prefix}.{field.name}" 위에서 설명한 StructType 필드에 대한 new_prefix를 만드는 과정과 동일
7. else: if prefix:
- 위의 조건들에 해당하지 않는 경우, 즉 필드가 기본 데이터 타입인 경우를 처리
- 이 경우에는 더 이상 구조체나 배열이 아니므로, 현재 필드 이름을 최종 키로 간주하고 이를 keys에 추가 keys.append(field.name)
실행결과
flat = get_json_keys(double.schema, "")
print(flat)
#출력결과
["name", "address.city", "address.zipcode", "contacts.type", "contacts.value"]
모든 컬럼이 펼쳐진 것을 확인할 수 있다.
'스파크(Spark)' 카테고리의 다른 글
스파크(Spark) - 제플린으로 만든 그래프에 select box 구현하기 (0) | 2024.08.21 |
---|---|
스파크(Spark) - 분산 처리 시스템 구조 이해하기 (0) | 2024.08.14 |
Apache Zeppelin process died 오류, Zeppelin 포트번호 변경하기 (0) | 2024.08.09 |