그동안은 현업에서 Spark 버전을 최신보다 약간 낮은 버전을 사용 중이었으나 최근 A 프로젝트부터는 Spark 최신 버전인 3.0.1을 사용 중에 있다.
그런데 Spark 2.4에서는 잘 작동하던 foreachBatch
가 3.0.1에서는 잘 작동을 하지 않았다.
IntelliJ에서 코딩할 때는 오류 메시지가 없었지만 컴파일을 하면 아래와 같은 메시지가 출력되었다.
Error:(34, 25) overloaded method foreachBatch with alternatives:
(function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])
org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)
org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] cannot be applied to
((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame)
askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>
(에러 메시지 출처: https://stackoverflow.com/q/63137538/2930152)
분명히 Spark 3.0.1 공식 문서에도 foreachBatch
에 대해서 아래와 같이 예제 코드가 있는데 작동하지 않는 원인은 확실히 모르겠다.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// Transform and write batchDF
}.start()
암튼 이것도 구글링해보니 역시 Stackoverflow에 답변이 있었다. 아래처럼 함수를 만들어서 foreachBatch
에 전달하면 잘 돌아간다.
def myFunc(askDF: DataFrame, batchID: Long): Unit = {
askDF.write.parquet("/src/main/scala/file.json")
}
askDF
.writeStream
.foreachBatch(myFunc _)
.start()
(출처: https://stackoverflow.com/a/63176091/2930152)
본 카테고리의 추천 글
- Kafka Unit Test with EmbeddedKafka
- Spark Structured Streaming에서의 Unit Test
- spark memoryOverhead 설정에 대한 이해
- Spark 기능 확장하기
- Spark DataFrame vs Dataset (부제: typed API 사용하기)
- Spark UI 확장하기
- Custom Spark Stream Source 개발하기
- Spark에서 Kafka를 batch 방식으로 읽기
- SparkSession의 implicit에 대한 이해
- spark-submit의 –files로 upload한 파일 읽기
- Scala case class를 Spark의 StructType으로 변환하기
- Spark on Kubernetes 사용법 및 secure HDFS에 접근하기
- Spark의 Locality와 getPreferredLocations() Method
- Spark Streaming의 History