search:

Spark에서 csv file을 읽는 방법과 다양한 옵션들 소개

14 Jul 2019

목차

1. 들어가며

이제는 좀 없어졌으면 하면서도 아직도 많이 사용되는 CSV (Comma Seperated Value) 파일 포맷. value 안에 콤마가 포함되면 복잡한 처리가 필요하기 때문에 TAB을 seperator로 사용하기도 하면 이때는 TSV라고 불린다.

CSV의 문제는 표준 포맷이 없다는 점. 예를 들어서, 앞에 잠시 언급된 value 안에 구분자가 포함된 경우를 보자. ,를 escape 처리를 해야할지 혹은 큰 따옴표를 enclose를 해야할지에 대한 표준이 없다. 또한 new line에 대한 처리도 제각각이다.

이외에도 필드가 많아지고 String이 아닌 데이터 타입이 섞이면 너무 복잡해서 입에서 “What the hack”이 자연스레 나온다. BigData 특성상 전달 파일의 포맷이 잘못되는 경우가 많은데 필드가 많고 data type이 섞이게 되면 에러 찾는데 골치 아프다.

row oriented 이기 때문에 analytic한 질의처리도 느리고 압축도 안되어 있다.

하지만 어쪄랴, 계속 쓰이고 있으니 최대한 CSV의 특성과 사용 중인 framework에서 제공되는 csv parser 특성을 잘 알고 사용해야겠다.

그래서 이번엔 Spark에서 csv 읽는 방법과 다양한 옵션들을 알아볼까 한다.

이따가 나오겠지만, Spark 2.2에서는 columnNameOfCorruptRecord 옵션에 버르가 있기 때문에 가급적 Spark 2.3을 사용하는 것이 좋다. 아래 코드들은 특별한 언급이 없는 한 Spark 2.4로 작성되었다.

그리고 주로 CSV를 읽는 옵션에 대해 설명한다. CSV write에 관한 옵션도 많지만 우리 이제 파일 저장은 가급적 CSV가 아닌 parquet 같은 포맷으로 저장하자.

2. Spark CSV Data Source의 역사

Spark에서 csv를 사용하는 방법은 Spark 1.x와 Spark 2.x가 나뉜다. Spark 1.x에서는 csv를 읽기 위해서 별도 제공되는 패키지를 사용해야했다. Spark을 개발한 Databricks에서 제공하는 패키지로서 소스코드는 아직도 https://github.com/databricks/spark-csv 에서 볼 수 있다.

여담으로 한국의 Spark 커미터인 권혁진님이 참여한 코드를 볼 수 있다. commit history를 보니 첫 번재 커밋은 2015년 10월 22일인 듯.

Spark에서 기본 제공되지 않기 때문에 --packages 옵션으로 패키지를 지정해야 했다.

$ SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

이후 파일을 읽을 땐 format("com.databricks.spark.csv") 처럼 약간 긴 형식의 포맷을 지정해야 해서 코딩하기 불편했다.

예)

val df = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .load("cars.csv")

(위의 github에서 copy해봤음)

Spark 2.0부터는 csv 읽기 기능이 Spark에 기본 내장되었다. 따라서 더 이상 --packages 옵션을 지정할 필요가 없고, csv를 읽는 코드도 쉽게 작성 가능하다. (기존 코드도 어려웠던 건 아니지만 패키지 이름을 기억하고 매번 입력하는 게 번거로웠다)

아래는 Spark 2.x에서 csv 파일을 읽는 예이다.

val df = spark.
  read.
  csv("file:///tmp/test.csv")

(참고: 함수 호출 시 .을 어디에 입력했느냐는 csv나 Spark 버전과 아무 상관이 없다. code를 spark-shell에 copy&paste하기 쉽도록 라인이 끝나기 전에 .를 붙였을 뿐)

3. Spark의 csv 관련 옵션들

Spark 2.x에서는 다음과 같은 csv 관련 옵션을 제공한다.

출처는 Scala 공식 API 문서 이며, 소스 코드가 궁금한 분들은 여기를 보면 된다.

이해하기 어렵거나 중요한 옵션에만 설명을 달았다.

  • sep: 구분자
    • default: ,
    • Spark 1.6 방식에서는 delimiter를 사용해야 한다
  • encoding
    • default: UTF-8
  • quote: value가 큰 따옴표로 묶인 경우 "를 지정
    • defualt: "
  • escape: 구분자가 value안에 사용된 경우 escape를 처리할 문자
    • default: \
  • charToEscapeQuoteEscaping
    • default: escape or \0
  • comment: 코멘트의 시작을 알리는 문자
    • default: #
  • header: 첫 줄이 data가 아닌 헤더인 경우 "true"로 설정
    • default: false
  • enforceSchema:
    • defalut: true
  • inferSchema: schema를 Spark이 자동으로 알아내는 경우 사용
    • default: schema
  • samplingRatio: schema inferring 시에 data의 얼마를 샘플링할지
    • default: 1.0
  • ignoreLeadingWhiteSpace: value의 앞에 있는 공백을 제거할지 여부
    • default: false
  • ignoreTrailingWhiteSpace: value의 뒤에 있는 공백을 제거할지 여부
    • default: false
  • nullValue: null을 표현하는 문자열
    • default: empty string
  • emptyValue: 공백을 표현하는 문자열
    • default: empty string
  • nanValue: non-number를 표현하는 문자열
    • default: NaN
  • positiveInf: “양의 무한대”를 표현하는 문자열
    • default: Inf
  • negativeInf: “음의 무한대”를 표현하는 문자열
    • default: -Inf
  • dateFormat: 날자 포맷을 지정하는 문자열. java.text.SimpleDateFormat에서 사용하는한 포맷을 지정할 수 있다. date 타입인 필드에서 사용된다
    • default: yyyy-MM-dd
  • timestampFormat: dateFormat과 유사하며, timestamp 필드에서 사용된다
    • default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX
  • maxColumns: 최대 필드 개수
    • default: 20480
  • maxCharsPerColumn: 1개 필드에서 최대 문자열의 길이
    • default: -1 (즉, 제한없음)
  • mode: 에러 처리에 관련된 옵션. 개인적으로 csv를 다를 때 가장 중요한 옵션이라 생각한다
    • PERMISSIVE (default): 잘못된 포맷의 line을 만나면 해당 문자열을 columnNameOfCorruptRecord에서 지정한 필드에 저장하고, 나머지 필드는 모두 null로 설정한다
    • DROPMALFORMED: 잘못된 문자열의 레코드는 무시한다
    • FAILFAST: 잘못된 문자열을 레코드를 만나면 에러를 출력한다
  • columnNameOfCorruptRecord
  • multiLine: 1개 레코드가 여러 line에 걸쳐있는지 지정하는 옵션

4. 읽기 기초

csv 파일에 필드 이름을 알려주는 헤더가 존재하는 경우 schema 지정 없이도 쉽게 파일을 읽을 수 있다.

  • 파일 내용
      $ cat /tmp/test.csv
      name,age,weight
      Kim,10,70.0
      Park,20,69.5
    
  • csv 읽기
      val df = spark.
        read.
        option("header", "true").
        csv("file:///tmp/test.csv")
    
  • 읽은 결과
      df.show
      +----+---+------+
      |name|age|weight|
      +----+---+------+
      | Kim| 10|  70.0|
      |Park| 20|  69.5|
      +----+---+------+
    

필드 이름까지 정확히 출력되었다.

그런데, schema를 보면 3개 필드 모두 string으로 정의된 것을 볼 수 있다.

df.printSchema
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- weight: string (nullable = true)

이 경우 명시적으로 schema를 지정하면 된다. 아래 예부터는 원본 csv에는 header가 없는 것을 가정하였다.

  • 파일 내용
      $ cat test.csv
      Kim,10,70.0
      Park,20,69.5
    
  • csv 읽기
      case class Person(name: String, age: Integer, weight: Double)
    
      val df = spark.
        read.
        schema(org.apache.spark.sql.Encoders.product[Person].schema).
        csv("file:///tmp/test.csv")
    
  • 읽혀진 schema
      df.printSchema
      root
       |-- name: string (nullable = true)
       |-- age: integer (nullable = true)
       |-- weight: double (nullable = true)
    

ageweight가 string에서 각각 integer과 double로 변경된 것을 볼 수 있다.

다음은 enclosednullValue 옵션 두 개를 한번에 보고 가자

  • 파일 내용
      $ cat /tmp/test.csv
      Kim,10,70.0,'Korea, South',test@test.com
      Park,20,69.5,USA,N/A
    
  • csv 읽기
      val df = spark.
        read.
        option("quote", "'").
        option("nullValue", "N/A").
        schema(org.apache.spark.sql.Encoders.product[Person].schema).
        csv("file:///tmp/test.csv")
    
  • 읽은 결과
      df.show
      +----+---+------+------------+-------------+
      |name|age|weight|     country|        email|
      +----+---+------+------------+-------------+
      | Kim| 10|  70.0|Korea, South|test@test.com|
      |Park| 20|  69.5|         USA|         null|
      +----+---+------+------------+-------------+
    

Korea, South가 1개의 필드에 잘 저장되었고, 원본에 N/A로 입력되었던 값이 null로 잘 변환되었다.

5. 읽기 고급 - 에러 처리

사실 “에러 처리”에 관한 내용이 본 글을 포스팅하게 된 계기다. Data를 전달받을 때는 그 포맷이 올바른지 검사를 하는 것이 중요한데, CSV의 특성상 포맷이 잘못 가능성이 높고 잘못된 포맷을 검사하기가 불편한다.

하지만, Spark csv의 mode 옵션을 잘 사용하는 경우 에러를 쉽게 찾을 수 있다.

test를 쉽게 하기 위해 필드는 name, age만 만들었다.

입력 파일의 내용은 다음과 같다.

$ cat /tmp/test.csv
Kim,10
Park,Female
Gu,10,
Lee,20,Male
,20
,,
,

첫 번재 line을 제외하고는 모두 잘못된 포맷이다 (일부 line은 요구 사항에 따라 정상일 수 있음)

Kim,10      <= 정상
Park,Female <= 나이가 나와야할 곳에 문자열이 자장되어 있다
Gu,10,      <= 나이 옆에 콤마가 하나 더 있음
Lee,20,Male <= (이름, 나이)만 있어야 하지만 성별까지 저장되어 있음
,20         <= 이름이 비어 있다
,           <= 필드가 2개인데, 모두 empty string
,,          <= 필드가 3개이며 모두 empty string

5-1) PERMISSIVE mode

기본 모드인 PERMISSIVE 모드이다. permissive는 “허용된”이라는 단어이다. (사실 나도 이번에 알게된 단어고, 흔히 사용되는 permission의 형용사이다.)

  • csv 읽기
      case class Person(name: String, age: Integer)
    
      val df = spark.
        read.
        option("mode", "PERMISSIVE").
        schema(org.apache.spark.sql.Encoders.product[Person].schema).
        csv("file:///tmp/test.csv")
    
  • 읽은 결과
      df.show
      +----+----+
      |name| age|
      +----+----+
      | Kim|  10|
      |null|null| <=  부분이 중요
      |  Gu|  10|
      | Lee|  20|
      |null|  20|
      |null|null|
      |null|null|
      +----+----+
    

두 번째 레코드가 중요하다. 원본 CSV에는 ageFemale이 저장되어 있으나 null로 바뀌었고, name 조차도 null로 바뀌었다.

이것이 PERMISSIVE 모드의 특징이다. Integer가 나와야할 위치에 문자열 값이 나오다보니 Spark은 원본 Data가 잘못되었다고 판단하여 null로 변경해버렸다.

그렇다면 이번엔 PERMISSIVE와 함께 사용되는 columnNameOfCorruptRecord을 지정해보자.

  • csv 읽기
      case class Person(name: String,
                        age: Integer,
                        error_string: String) // 필드가 추가되었음
    
      val df = spark.
        read.
        option("mode", "PERMISSIVE").
        option("columnNameOfCorruptRecord", "error_string"). // 추가된 옵션
        schema(org.apache.spark.sql.Encoders.product[Person].schema).
        csv("file:///tmp/test.csv")
    
  • 읽은 결과
       df.show
      +----+----+------------+
      |name| age|error_string|
      +----+----+------------+
      | Kim|  10|        null|
      |null|null| Park,Female|
      |  Gu|  10|      Gu,10,|
      | Lee|  20| Lee,20,Male|
      |null|  20|        null|
      |null|null|          ,,|
      |null|null|        null|
      +----+----+------------+
    

columnNameOfCorruptRecord 옵션에 지정된 필드에 원본 레코드의 문자열이 저장되어 있다.

아래와 같이 error_string을 필터링하면 잘못된 레코드를 걸러낼 수 있다.

df.filter("error_string IS NULL").show
+----+----+------------+
|name| age|error_string|
+----+----+------------+
| Kim|  10|        null|
|null|  20|        null|
|null|null|        null|
+----+----+------------+

두 번째, 세 번째 레코드가 정상인지 비정상인지는 nullable 요구 사항에 따라 다르므로 본인 요구 사항에 맞게 직접 필터링을 하면 된다.

5-2) DROPMALFORMED

DROPMALFORMED는 잘못된 포맷의 레코드는 Spark이 알아서 없애버리는 옵션이다. DROPMALFORMED 사용 시 출력 결과가 어떻게 되는지 보자.

  • csv 읽기
      case class Person(name: String, age: Integer, error_string: String)
    
      val df = spark.
        read.
        option("mode", "DROPMALFORMED").
        option("columnNameOfCorruptRecord", "error_string").
        schema(org.apache.spark.sql.Encoders.product[Person].schema).
        csv("file:///tmp/test.csv")
    
  • 읽은 결과
      df.show
      +----+----+------------+
      |name| age|error_string|
      +----+----+------------+
      | Kim|  10|        null|
      |null|  20|        null|
      |null|null|        null|
      +----+----+------------+
    

5-3) FAILFAST

FAILFAST 지정 시, 잘못된 레코드를 만나면 Exception이 발생한다.

case class Person(name: String, age: Integer, error_string: String)

val df = spark.
  read.
  option("mode", "FAILFAST").
  option("columnNameOfCorruptRecord", "error_string").
  schema(org.apache.spark.sql.Encoders.product[Person].schema).
  csv("file:///tmp/test.csv")

df.show

19/07/14 22:32:58 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.

6. Spark 2.2에서 columnNameOfCorruptRecord의 버그

columnNameOfCorruptRecord는 이처럼 에러 처리 시에 중요한 옵션이지만, Spark 2.2까지 버그가 존재했고 Spark 2.3에서 수정되었다.

버그에 내용은 https://issues.apache.org/jira/browse/SPARK-22580 를 참고하길 바란다.

CSV 파일의 필드 개수가 적고, data type도 단순한 경우에는 columnNameOfCorruptRecord의 오작동이 Spark의 버그라고 인지하기 쉽겠지만, 복잡한 경우에는 현상을 이해하고 버그를 인지하는데 몇 시간이 걸릴 수도 있다.

일반적인 상황에서도 발생하기 쉬운 버그이기 때문에 Spark 2.2 이하에서는 columnNameOfCorruptRecord를 사용하지 않거나, Spark 2.3으로 업그레이드를 하는 것이 좋겠다.

7. 참고 - cleanframes

Data Engineering Weekly News letter issue #315에서 소개한 cleanframes를 사용하여 잘못된 데이터를 없애는 방법도 있는 것 같은데 사용해보지 않았다.

본 카테고리의 추천 글