Skip to content

Commit

Permalink
Properly override Shims for int96Rebase [databricks] (#3643)
Browse files Browse the repository at this point in the history
* Proper overriding of Shims for int96

Signed-off-by: Raza Jafri <[email protected]>

* removed the duplicate int96 rebase conf

Signed-off-by: Raza Jafri <[email protected]>

Co-authored-by: Raza Jafri <[email protected]>
  • Loading branch information
razajafri and razajafri authored Sep 24, 2021
1 parent ba16a19 commit 9cbb0b3
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,6 @@ class Spark311Shims extends SparkBaseShims {
classOf[RapidsShuffleManager].getCanonicalName
}

/**
 * Reads the INT96 rebase mode used for Parquet reads from the legacy
 * INT96-specific entry of the supplied `SQLConf`.
 */
override def int96ParquetRebaseRead(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)

/**
 * Reads the INT96 rebase mode used for Parquet writes from the legacy
 * INT96-specific entry of the supplied `SQLConf`.
 */
override def int96ParquetRebaseWrite(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)

/** Key of the conf entry consulted by `int96ParquetRebaseRead`. */
override def int96ParquetRebaseReadKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key

/** Key of the conf entry consulted by `int96ParquetRebaseWrite`. */
override def int96ParquetRebaseWriteKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key

/** Constant flag: this shim always answers `false`. */
override def hasCastFloatTimestampUpcast: Boolean = false

override def getParquetFilters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,6 @@ class Spark311CDHShims extends SparkBaseShims {
sessionCatalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
}

/**
 * Looks up the INT96 rebase mode for Parquet reads in `conf`, using the
 * legacy INT96-specific read entry.
 */
override def int96ParquetRebaseRead(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)

/**
 * Looks up the INT96 rebase mode for Parquet writes in `conf`, using the
 * legacy INT96-specific write entry.
 */
override def int96ParquetRebaseWrite(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)

/** Name of the conf key behind the INT96 read rebase mode. */
override def int96ParquetRebaseReadKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key

/** Name of the conf key behind the INT96 write rebase mode. */
override def int96ParquetRebaseWriteKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key

/** Constant flag: always `false` in this shim. */
override def hasCastFloatTimestampUpcast: Boolean = false

override def getParquetFilters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,4 @@ class Spark311dbShims extends SparkBaseShims {
new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith,
pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode)

/**
 * Returns the value of the legacy INT96 read rebase-mode entry held by
 * the given `SQLConf`.
 */
override def int96ParquetRebaseRead(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)

/**
 * Returns the value of the legacy INT96 write rebase-mode entry held by
 * the given `SQLConf`.
 */
override def int96ParquetRebaseWrite(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)

/** Key string of the legacy INT96 read rebase-mode entry. */
override def int96ParquetRebaseReadKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key

/** Key string of the legacy INT96 write rebase-mode entry. */
override def int96ParquetRebaseWriteKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,6 @@ class Spark312Shims extends SparkBaseShims {

/** Constant flag: this shim always answers `true`. */
override def hasCastFloatTimestampUpcast: Boolean = true

/**
 * Fetches the INT96 rebase mode for Parquet reads from the legacy
 * INT96-specific read entry of `conf`.
 */
override def int96ParquetRebaseRead(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)

/**
 * Fetches the INT96 rebase mode for Parquet writes from the legacy
 * INT96-specific write entry of `conf`.
 */
override def int96ParquetRebaseWrite(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)

/** Conf key that `int96ParquetRebaseRead` reads. */
override def int96ParquetRebaseReadKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key

/** Conf key that `int96ParquetRebaseWrite` reads. */
override def int96ParquetRebaseWriteKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key

override def getParquetFilters(
schema: MessageType,
pushDownDate: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,4 @@ class Spark313Shims extends SparkBaseShims {

/** Constant flag: always `true` in this shim. */
override def hasCastFloatTimestampUpcast: Boolean = true

/**
 * INT96 rebase mode for Parquet reads, taken from the legacy
 * INT96-specific read entry of the supplied conf.
 */
override def int96ParquetRebaseRead(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)

/**
 * INT96 rebase mode for Parquet writes, taken from the legacy
 * INT96-specific write entry of the supplied conf.
 */
override def int96ParquetRebaseWrite(conf: SQLConf): String =
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)

/** Key of the legacy INT96 read rebase-mode conf entry. */
override def int96ParquetRebaseReadKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key

/** Key of the legacy INT96 write rebase-mode conf entry. */
override def int96ParquetRebaseWriteKey: String =
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ trait Spark30XShims extends SparkShims {
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)
/** Reads the legacy Parquet write rebase mode from the given conf. */
override def parquetRebaseWrite(conf: SQLConf): String = {
  conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)
}

// The INT96 members below forward to the general Parquet rebase members,
// and hasSeparateINT96RebaseConf reports false accordingly.

/** Forwards to `parquetRebaseRead`. */
override def int96ParquetRebaseRead(conf: SQLConf): String = parquetRebaseRead(conf)

/** Forwards to `parquetRebaseWrite`. */
override def int96ParquetRebaseWrite(conf: SQLConf): String = parquetRebaseWrite(conf)

/** Same key as `parquetRebaseReadKey`. */
override def int96ParquetRebaseReadKey: String = parquetRebaseReadKey

/** Same key as `parquetRebaseWriteKey`. */
override def int96ParquetRebaseWriteKey: String = parquetRebaseWriteKey

/** Constant flag: always `false` here. */
override def hasSeparateINT96RebaseConf: Boolean = false

override def sessionFromPlan(plan: SparkPlan): SparkSession = {
plan.sqlContext.sparkSession
Expand Down Expand Up @@ -113,8 +122,6 @@ trait Spark30XShims extends SparkShims {

/** Constant flag: always `false` here. */
override def shouldFailDivOverflow(): Boolean = false

/** Constant flag: always `false` here. */
override def hasSeparateINT96RebaseConf: Boolean = false

/** Returns the `defaultParallelism` of the session's `SparkContext`. */
override def leafNodeDefaultParallelism(ss: SparkSession): Int =
  ss.sparkContext.defaultParallelism
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ trait Spark30XShims extends SparkShims {
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)
/** Looks up the legacy Parquet write rebase mode in `conf`. */
override def parquetRebaseWrite(conf: SQLConf): String = {
  conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)
}

// Each INT96 member delegates to its general Parquet rebase counterpart.

/** Delegates to `parquetRebaseRead(conf)`. */
override def int96ParquetRebaseRead(conf: SQLConf): String = parquetRebaseRead(conf)

/** Delegates to `parquetRebaseWrite(conf)`. */
override def int96ParquetRebaseWrite(conf: SQLConf): String = parquetRebaseWrite(conf)

/** Delegates to `parquetRebaseReadKey`. */
override def int96ParquetRebaseReadKey: String = parquetRebaseReadKey

/** Delegates to `parquetRebaseWriteKey`. */
override def int96ParquetRebaseWriteKey: String = parquetRebaseWriteKey

/** Constant flag: always `false` here. */
override def hasSeparateINT96RebaseConf: Boolean = false

override def sessionFromPlan(plan: SparkPlan): SparkSession = {
plan.sqlContext.sparkSession
Expand Down Expand Up @@ -126,7 +135,5 @@ trait Spark30XShims extends SparkShims {
ss.sparkContext.defaultParallelism
}

// Constant flag: this shim always reports false.
override def hasSeparateINT96RebaseConf: Boolean = false

// Constant flag: this shim always reports false.
override def shouldFallbackOnAnsiTimestamp(): Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ trait Spark30XShims extends SparkShims {
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)
/** Reads the legacy Parquet write rebase mode from the given conf. */
override def parquetRebaseWrite(conf: SQLConf): String = {
  conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)
}

/** Reads the legacy INT96-specific read rebase mode from the given conf. */
override def int96ParquetRebaseRead(conf: SQLConf): String = {
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
}

/** Reads the legacy INT96-specific write rebase mode from the given conf. */
override def int96ParquetRebaseWrite(conf: SQLConf): String = {
  conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
}

/** Key of the legacy INT96-specific read rebase-mode entry. */
override def int96ParquetRebaseReadKey: String = {
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
}

/** Key of the legacy INT96-specific write rebase-mode entry. */
override def int96ParquetRebaseWriteKey: String = {
  SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}

/** Constant flag: always `true` here. */
override def hasSeparateINT96RebaseConf: Boolean = true

override def sessionFromPlan(plan: SparkPlan): SparkSession = {
plan.sqlContext.sparkSession
Expand Down Expand Up @@ -111,8 +120,6 @@ trait Spark30XShims extends SparkShims {

// Constant flag: always false, whatever plan is passed in.
override def skipAssertIsOnTheGpu(plan: SparkPlan): Boolean = false

// Constant flag: this shim always reports true.
override def hasSeparateINT96RebaseConf: Boolean = true

// Constant flag: this shim always reports false.
override def shouldFailDivOverflow(): Boolean = false

override def leafNodeDefaultParallelism(ss: SparkSession): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@

package com.nvidia.spark.rapids.shims.v2

import org.apache.spark.sql.internal.SQLConf

/**
 * Shim layer that extends [[Spark30XShims]] and overrides its INT96 rebase
 * members so that they read the legacy INT96-specific `SQLConf` entries.
 */
trait Spark31XShims extends Spark30XShims {

  /** Reads the legacy INT96-specific read rebase mode from `conf`. */
  override def int96ParquetRebaseRead(conf: SQLConf): String = {
    conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
  }

  /** Reads the legacy INT96-specific write rebase mode from `conf`. */
  override def int96ParquetRebaseWrite(conf: SQLConf): String = {
    conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
  }

  /** Key of the legacy INT96-specific read rebase-mode entry. */
  override def int96ParquetRebaseReadKey: String = {
    SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
  }

  /** Key of the legacy INT96-specific write rebase-mode entry. */
  override def int96ParquetRebaseWriteKey: String = {
    SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
  }

  /** Constant flag: always `true` for this trait. */
  override def hasSeparateINT96RebaseConf: Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ trait Spark32XShims extends SparkShims {
conf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ)
/** Reads the Parquet write rebase mode from the given conf. */
override final def parquetRebaseWrite(conf: SQLConf): String = {
  conf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE)
}

/** Reads the INT96-specific Parquet read rebase mode from the given conf. */
override def int96ParquetRebaseRead(conf: SQLConf): String = {
  conf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ)
}

/** Reads the INT96-specific Parquet write rebase mode from the given conf. */
override def int96ParquetRebaseWrite(conf: SQLConf): String = {
  conf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE)
}

/** Key of the INT96-specific read rebase-mode entry. */
override def int96ParquetRebaseReadKey: String = {
  SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key
}

/** Key of the INT96-specific write rebase-mode entry. */
override def int96ParquetRebaseWriteKey: String = {
  SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}

/** Constant flag: always `true` here. */
override def hasSeparateINT96RebaseConf: Boolean = true

override final def aqeShuffleReaderExec: ExecRule[_ <: SparkPlan] = exec[AQEShuffleReadExec](
Expand Down
20 changes: 4 additions & 16 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,10 @@ trait SparkShims {
// Abstract: returns a rebase-mode string for Parquet writes, derived from `conf`.
def parquetRebaseWrite(conf: SQLConf): String
// Abstract: yields a RunnableCommand for the given table identifier.
// NOTE(review): presumably the V1 "repair table" command — confirm against the shim implementations.
def v1RepairTableCommand(tableName: TableIdentifier): RunnableCommand
// Abstract Boolean flag supplied by each shim.
// NOTE(review): presumably true when the Spark version has INT96-specific rebase confs — confirm.
def hasSeparateINT96RebaseConf: Boolean

/** Default: the INT96 read rebase mode is whatever `parquetRebaseRead` returns. */
def int96ParquetRebaseRead(conf: SQLConf): String = parquetRebaseRead(conf)

/** Default: the INT96 write rebase mode is whatever `parquetRebaseWrite` returns. */
def int96ParquetRebaseWrite(conf: SQLConf): String = parquetRebaseWrite(conf)

/** Default: the INT96 read key is `parquetRebaseReadKey`. */
def int96ParquetRebaseReadKey: String = parquetRebaseReadKey

/** Default: the INT96 write key is `parquetRebaseWriteKey`. */
def int96ParquetRebaseWriteKey: String = parquetRebaseWriteKey
// Abstract (no default body): each shim must supply the INT96 rebase-mode
// string for Parquet reads, derived from `conf`.
def int96ParquetRebaseRead(conf: SQLConf): String
// Abstract: INT96 rebase-mode string for Parquet writes, derived from `conf`.
def int96ParquetRebaseWrite(conf: SQLConf): String
// Abstract: a String key for the INT96 read rebase setting.
// NOTE(review): presumably the SQLConf key name — confirm against the shim implementations.
def int96ParquetRebaseReadKey: String
// Abstract: a String key for the INT96 write rebase setting.
// NOTE(review): presumably the SQLConf key name — confirm against the shim implementations.
def int96ParquetRebaseWriteKey: String

def getParquetFilters(
schema: MessageType,
Expand Down

0 comments on commit 9cbb0b3

Please sign in to comment.