Properly override Shims for int96Rebase [databricks] #3643

Merged 2 commits on Sep 24, 2021

@@ -32,22 +32,6 @@ class Spark311Shims extends SparkBaseShims {
classOf[RapidsShuffleManager].getCanonicalName
}

override def int96ParquetRebaseRead(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
}

override def int96ParquetRebaseWrite(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
}

override def int96ParquetRebaseReadKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
}

override def int96ParquetRebaseWriteKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}

override def hasCastFloatTimestampUpcast: Boolean = false

override def getParquetFilters(

@@ -97,22 +97,6 @@ class Spark311CDHShims extends SparkBaseShims {
sessionCatalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
}

override def int96ParquetRebaseRead(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
}

override def int96ParquetRebaseWrite(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
}

override def int96ParquetRebaseReadKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
}

override def int96ParquetRebaseWriteKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}

override def hasCastFloatTimestampUpcast: Boolean = false

override def getParquetFilters(

@@ -44,19 +44,4 @@ class Spark311dbShims extends SparkBaseShims {
new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith,
pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode)

override def int96ParquetRebaseRead(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
}

override def int96ParquetRebaseWrite(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
}

override def int96ParquetRebaseReadKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
}

override def int96ParquetRebaseWriteKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}
}

@@ -34,22 +34,6 @@ class Spark312Shims extends SparkBaseShims {

override def hasCastFloatTimestampUpcast: Boolean = true

override def int96ParquetRebaseRead(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
}

override def int96ParquetRebaseWrite(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
}

override def int96ParquetRebaseReadKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
}

override def int96ParquetRebaseWriteKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}

override def getParquetFilters(
schema: MessageType,
pushDownDate: Boolean,

@@ -46,19 +46,4 @@ class Spark313Shims extends SparkBaseShims {

override def hasCastFloatTimestampUpcast: Boolean = true

override def int96ParquetRebaseRead(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
}

override def int96ParquetRebaseWrite(conf: SQLConf): String = {
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
}

override def int96ParquetRebaseReadKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
}

override def int96ParquetRebaseWriteKey: String = {
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}
}

@@ -50,6 +50,15 @@ trait Spark30XShims extends SparkShims {
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)
override def parquetRebaseWrite(conf: SQLConf): String =
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)
override def int96ParquetRebaseRead(conf: SQLConf): String =
parquetRebaseRead(conf)
override def int96ParquetRebaseWrite(conf: SQLConf): String =
parquetRebaseWrite(conf)
override def int96ParquetRebaseReadKey: String =
parquetRebaseReadKey
override def int96ParquetRebaseWriteKey: String =
parquetRebaseWriteKey
override def hasSeparateINT96RebaseConf: Boolean = false

override def sessionFromPlan(plan: SparkPlan): SparkSession = {
plan.sqlContext.sparkSession

@@ -113,8 +122,6 @@ trait Spark30XShims extends SparkShims {

override def shouldFailDivOverflow(): Boolean = false

override def hasSeparateINT96RebaseConf: Boolean = false

override def leafNodeDefaultParallelism(ss: SparkSession): Int = {
ss.sparkContext.defaultParallelism
}

@@ -50,6 +50,15 @@ trait Spark30XShims extends SparkShims {
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)
override def parquetRebaseWrite(conf: SQLConf): String =
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)
override def int96ParquetRebaseRead(conf: SQLConf): String =
parquetRebaseRead(conf)
override def int96ParquetRebaseWrite(conf: SQLConf): String =
parquetRebaseWrite(conf)
override def int96ParquetRebaseReadKey: String =
parquetRebaseReadKey
override def int96ParquetRebaseWriteKey: String =
parquetRebaseWriteKey
override def hasSeparateINT96RebaseConf: Boolean = false

override def sessionFromPlan(plan: SparkPlan): SparkSession = {
plan.sqlContext.sparkSession

@@ -126,7 +135,5 @@ trait Spark30XShims extends SparkShims {
ss.sparkContext.defaultParallelism
}

override def hasSeparateINT96RebaseConf: Boolean = false

override def shouldFallbackOnAnsiTimestamp(): Boolean = false
}

@@ -50,6 +50,15 @@ trait Spark30XShims extends SparkShims {
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)
override def parquetRebaseWrite(conf: SQLConf): String =
conf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)
override def int96ParquetRebaseRead(conf: SQLConf): String =
[Review comment from a collaborator on this line: these are still in shims/spark311db/src/main/scala/com/nvidia/spark/rapids/shims/spark311db/Spark311dbShims.scala]

conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
override def int96ParquetRebaseWrite(conf: SQLConf): String =
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
override def int96ParquetRebaseReadKey: String =
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
override def int96ParquetRebaseWriteKey: String =
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
override def hasSeparateINT96RebaseConf: Boolean = true

override def sessionFromPlan(plan: SparkPlan): SparkSession = {
plan.sqlContext.sparkSession

@@ -111,8 +120,6 @@ trait Spark30XShims extends SparkShims {

override def skipAssertIsOnTheGpu(plan: SparkPlan): Boolean = false

override def hasSeparateINT96RebaseConf: Boolean = true

override def shouldFailDivOverflow(): Boolean = false

override def leafNodeDefaultParallelism(ss: SparkSession): Int = {

@@ -16,6 +16,17 @@

package com.nvidia.spark.rapids.shims.v2

import org.apache.spark.sql.internal.SQLConf

trait Spark31XShims extends Spark30XShims {
override def hasSeparateINT96RebaseConf: Boolean = true

override def int96ParquetRebaseRead(conf: SQLConf): String =
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)
override def int96ParquetRebaseWrite(conf: SQLConf): String =
conf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)
override def int96ParquetRebaseReadKey: String =
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
override def int96ParquetRebaseWriteKey: String =
SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
}
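
Before the 3.2 shims below, here is a minimal, self-contained sketch of the trait layering this change settles on: the base trait leaves the INT96 accessors abstract, the 3.0.x trait falls back to the general datetime rebase config, and the 3.1.x trait overrides it with the dedicated INT96 config. The simplified `Conf` type and the literal config-key strings are illustrative assumptions, not the real SQLConf API; the actual shims read the SQLConf entries shown in the diff.

```scala
// Compact sketch of the shim layering, with a hypothetical Conf type standing
// in for Spark's SQLConf. The key strings are assumptions for illustration.
final case class Conf(settings: Map[String, String] = Map.empty) {
  def get(key: String, default: String): String = settings.getOrElse(key, default)
}

trait BaseShims {
  def hasSeparateINT96RebaseConf: Boolean
  def int96ParquetRebaseRead(conf: Conf): String // abstract, as in SparkShims.scala
}

// Spark 3.0.x style: no dedicated INT96 config, so fall back to the general
// datetime rebase mode.
trait Spark30XStyle extends BaseShims {
  override def hasSeparateINT96RebaseConf: Boolean = false
  override def int96ParquetRebaseRead(conf: Conf): String =
    conf.get("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "EXCEPTION")
}

// Spark 3.1.x style: override with the dedicated legacy INT96 config.
trait Spark31XStyle extends Spark30XStyle {
  override def hasSeparateINT96RebaseConf: Boolean = true
  override def int96ParquetRebaseRead(conf: Conf): String =
    conf.get("spark.sql.legacy.parquet.int96RebaseModeInRead", "EXCEPTION")
}

// A concrete per-version shim now only mixes in the right version trait.
object Spark313Style extends Spark31XStyle
```

The last line is the payoff: a concrete per-version shim no longer repeats the four INT96 overrides that this diff removes from Spark311Shims, Spark312Shims, and the other classes above.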

@@ -53,7 +53,14 @@ trait Spark32XShims extends SparkShims {
conf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ)
override final def parquetRebaseWrite(conf: SQLConf): String =
conf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE)

override def int96ParquetRebaseRead(conf: SQLConf): String =
conf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ)
override def int96ParquetRebaseWrite(conf: SQLConf): String =
conf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE)
override def int96ParquetRebaseReadKey: String =
SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key
override def int96ParquetRebaseWriteKey: String =
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key
override def hasSeparateINT96RebaseConf: Boolean = true

override final def aqeShuffleReaderExec: ExecRule[_ <: SparkPlan] = exec[AQEShuffleReadExec](

sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala (4 additions & 16 deletions)
@@ -91,22 +91,10 @@ trait SparkShims {
def parquetRebaseWrite(conf: SQLConf): String
def v1RepairTableCommand(tableName: TableIdentifier): RunnableCommand
def hasSeparateINT96RebaseConf: Boolean

def int96ParquetRebaseRead(conf: SQLConf): String = {
parquetRebaseRead(conf)
}

def int96ParquetRebaseWrite(conf: SQLConf): String = {
parquetRebaseWrite(conf)
}

def int96ParquetRebaseReadKey: String = {
parquetRebaseReadKey
}

def int96ParquetRebaseWriteKey: String = {
parquetRebaseWriteKey
}
def int96ParquetRebaseRead(conf: SQLConf): String
def int96ParquetRebaseWrite(conf: SQLConf): String
def int96ParquetRebaseReadKey: String
def int96ParquetRebaseWriteKey: String

def getParquetFilters(
schema: MessageType,
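
With the defaults removed from SparkShims, every shim layer must now state explicitly how INT96 rebase is configured. As a hedged illustration of how calling code might consume these accessors, here is a hypothetical helper that is not part of this PR; the object and method names and the LEGACY check are assumptions.

```scala
import com.nvidia.spark.rapids.SparkShims
import org.apache.spark.sql.internal.SQLConf

object Int96RebaseCheck {
  // Hypothetical helper, not from this PR: look up the INT96 rebase mode for a
  // read and surface the version-appropriate config key in the error message.
  // `shims` stands for however the version-specific SparkShims is resolved.
  def requireSupportedInt96RebaseRead(shims: SparkShims, conf: SQLConf): Unit = {
    val mode = shims.int96ParquetRebaseRead(conf)
    if (mode == "LEGACY") {
      throw new UnsupportedOperationException(
        s"LEGACY rebase of INT96 timestamps is not handled here; set " +
          s"${shims.int96ParquetRebaseReadKey} to CORRECTED or EXCEPTION")
    }
  }
}
```

Because the key comes from the shim, the message names the legacy config entry on Spark 3.0.x/3.1.x and the non-legacy one on 3.2.x without the caller knowing the Spark version.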