Spark Data Source API의 PrunedFilterScan
을 상속받아 구현하는 경우 Spark SQL에서 WHERE
절에 입력한 조건을 전달받을 수 있기 때문에, Data Source에서 Spark으로 전달하는 Data량을 줄일 수 있다. 이를 통해 질의 처리 속도도 빨라진다.
하지만 한 가지 아쉬운 점이 있는데, 이렇게 Filter 조건을 만족하는 Record들만 Spark으로 전달된다 하더라도, Spark에서 다시 한번 Filter 조건에 맞는지 검사를 한다는 점이다. 이 때문에 질의처리 시간에 손해를 볼 수 밖에 없다.
다음과 같은 SQL을 실행하다고 생각하자.
SELECT a, b
FROM logs
WHERE c = 'cc'
이때의 EXPLAIN 결과를 보면 다음과 같다.
Scan ParquetRelation[a#4,b#3,c#10], PushedFilters: [EqualTo(c,cc)]
EqualTo(c,cc)
를 보면 알 수 있듯이 Filter는 잘 Push되었다. 그런데, Scan 부분을 보면 a, b, c
처럼 SELECT
에 없는 c
가 포함된 것을 볼 수 있다. 즉, 사용자 의도와 상관없이 Data Source에서 읽어서 Spark에 전달된다는 것을 볼 수 있다. Spark 내부적으로 다시 한번 정말로 c
필드의 값이 cc
였는지 검사를 하고, SQL의 output에서는 c
를 제거하고 a, b
만 출력하게 된다.
정말로 그러한지 테스트를 해보자. 아래 코드에서 Relation을 만들고, DataFrame을 생성하는 것은 이곳의 code를 참고했다.
설명 및 SQL 수행 결과를 주석으로 적어두었으니, 자세한 설명은 생략한다
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StructType}
case class MyRelation(start: Int,
end: Int)(
@transient val spark: SparkSession)
extends BaseRelation with PrunedFilteredScan {
import spark.implicits._
def sqlContext: org.apache.spark.sql.SQLContext = null
def schema = StructType('age.int :: Nil)
/**
* filter를 전달받지만, 사용하진 않는다
* 무조건 start ~ to 까지 범위의 age를 갖는 record를 생성한다
*/
def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = {
spark.sparkContext.parallelize(start to end).map(Row(_))
}
}
object DsAPITest {
def test(spark: SparkSession): Unit = {
/**
* 100개의 Record를 생성한다
* age의 값은 1부터 100까지이다
*/
spark.baseRelationToDataFrame(MyRelation(1, 100)(spark)).createOrReplaceTempView("tab")
spark.sql("SELECT age FROM tab").show(false)
/**
* SQL 수행 결과 (어떤 값이 return되는지 확인)
+---+
|age|
+---+
|1 |
|2 |
|3 |
|4 |
|5 |
|6 |
|7 |
|8 |
...
|100|
+---+
*/
spark.sql("EXPLAIN SELECT COUNT(1) FROM tab WHERE age = 10").show(false)
/**
* EXPLAIN 결과 - COUNT(1)만 했을 뿐인데, age field를 Scan 요청하고 있다.
+--------------------------------------------------------------------------------------------------------------------
|plan
+--------------------------------------------------------------------------------------------------------------------
|== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_count(1)])
+- *Project
+- *Filter (isnotnull(age#0) && (age#0 = 10))
+- *Scan MyRelation(1,100) [age#0] PushedFilters: [IsNotNull(age), EqualTo(age,10)], ReadSchema: struct<>|
+---------------------------------------------------------------------------------------------------------------------
*/
spark.sql("SELECT COUNT(1) FROM tab WHERE age = 10").show(false)
/**
* SQL 수행 결과 - MyRelation에서는 age와 상관없이 항상 100개를 return했는데,
* SQL의 결과는 age=1 조건을 만족하는 것만 출력되었다.
+--------+
|count(1)|
+--------+
|1 |
+--------+
*/
}
}
본 카테고리의 추천 글
- Kafka Unit Test with EmbeddedKafka
- Spark Structured Streaming에서의 Unit Test
- spark memoryOverhead 설정에 대한 이해
- Spark 기능 확장하기
- Spark DataFrame vs Dataset (부제: typed API 사용하기)
- Spark UI 확장하기
- Custom Spark Stream Source 개발하기
- Spark에서 Kafka를 batch 방식으로 읽기
- SparkSession의 implicit에 대한 이해
- spark-submit의 –files로 upload한 파일 읽기
- Scala case class를 Spark의 StructType으로 변환하기
- Spark on Kubernetes 사용법 및 secure HDFS에 접근하기
- Spark의 Locality와 getPreferredLocations() Method
- Spark Streaming의 History