Merge branch 'master' into SPARK-50263
changgyoopark-db committed Jan 14, 2025
2 parents 9aaec76 + 2d498d5 commit f270603
Showing 4 changed files with 6 additions and 46 deletions.
File 1 of 4:

@@ -47,9 +47,6 @@ private[sql] class ProtobufSerializer(
   }
 
   private val converter: Any => Any = {
-    assert(
-      rootCatalystType.isInstanceOf[StructType],
-      "ProtobufSerializer's root catalyst type must be a struct type")
     val baseConverter =
       try {
         rootCatalystType match {
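The deleted assertion failed fast for any non-struct root type before a converter was ever built; with it gone, non-struct roots flow into the rootCatalystType match below, which either has a converter case for them or raises its own error. A minimal stand-alone sketch of that guard-versus-dispatch difference (the SqlType hierarchy and converters are illustrative stand-ins, not Spark's serializer internals):

```scala
// Illustrative stand-ins; not Spark's actual types or converters.
sealed trait SqlType
case object StructT extends SqlType
case object IntT extends SqlType

// Guard style (the removed assert): reject non-structs up front.
def guardedConverter(t: SqlType): Any => Any = {
  assert(t == StructT, "root catalyst type must be a struct type")
  identity
}

// Dispatch style (what remains): each root type gets its own case.
def dispatchedConverter(t: SqlType): Any => Any = t match {
  case StructT => identity // stand-in for the struct converter
  case IntT    => identity // a non-struct root is now reachable
}
```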
File 2 of 4:

@@ -1721,33 +1721,6 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
-  test("non-struct SQL type") {
-    val dfWithInt = spark
-      .range(1)
-      .select(
-        lit(9999).as("int_col")
-      )
-
-    val parseError = intercept[AnalysisException] {
-      dfWithInt.select(
-        to_protobuf_wrapper($"int_col", "SimpleMessageEnum", Some(testFileDesc))).collect()
-    }
-    val descMsg = testFileDesc.map("%02X".format(_)).mkString("")
-    checkError(
-      exception = parseError,
-      condition = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
-      parameters = Map(
-        "sqlExpr" ->
-          s"""\"to_protobuf(int_col, SimpleMessageEnum, X'$descMsg', NULL)\"""",
-        "msg" -> ("The first argument of the TO_PROTOBUF SQL function must be a struct type"),
-        "hint" -> ""
-      ),
-      queryContext = Array(ExpectedContext(
-        fragment = "fn",
-        callSitePattern = ".*"))
-    )
-  }
-
   test("test unsigned integer types") {
     // Test that we correctly handle unsigned integer parsing.
     // We're using Integer/Long's `MIN_VALUE` as it has a 1 in the sign bit.
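The removed test pinned down the old analyzer behavior: a non-struct first argument to to_protobuf raised DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT. With that check gone (see file 3 below), a call like the following hedged sketch is no longer rejected at analysis time; whether serialization actually succeeds still depends on the message definition, and the message name and descriptor path here are placeholders:

```scala
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.protobuf.functions.to_protobuf

// Previously failed with an AnalysisException during analysis; the
// message name and descriptor path below are placeholders.
val df = spark.range(1).select(lit(9999).as("int_col"))
val encoded = df.select(
  to_protobuf(col("int_col"), "SimpleMessageEnum", "/tmp/messages.desc"))
```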
File 3 of 4:

@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.{BinaryType, MapType, NullType, StringType, StructType}
+import org.apache.spark.sql.types.{BinaryType, MapType, NullType, StringType}
 import org.apache.spark.sql.util.ProtobufUtils
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -238,15 +238,6 @@ case class ToProtobuf(
   }
 
   override def checkInputDataTypes(): TypeCheckResult = {
-    val colTypeCheck = first.dataType match {
-      case _: StructType => None
-      case _ =>
-        Some(
-          TypeCheckResult.TypeCheckFailure(
-            "The first argument of the TO_PROTOBUF SQL function must be a struct type")
-        )
-    }
-
     val messageNameCheck = messageName.dataType match {
      case _: StringType if messageName.foldable => None
      case _ =>
@@ -271,11 +262,10 @@
"strings to strings containing the options to use for converting the value to " +
"Protobuf format"))
}
colTypeCheck.getOrElse(
messageNameCheck.getOrElse(
descFilePathCheck.getOrElse(
optionsCheck.getOrElse(TypeCheckResult.TypeCheckSuccess)
)

messageNameCheck.getOrElse(
descFilePathCheck.getOrElse(
optionsCheck.getOrElse(TypeCheckResult.TypeCheckSuccess)
)
)
}
Expand Down
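The surviving checks keep the same first-failure-wins shape: each validation produces an Option holding a failure, and the chained getOrElse calls surface the first failure or fall through to success. A self-contained sketch of the pattern (the result types and messages are illustrative, not Spark's TypeCheckResult API):

```scala
// Illustrative stand-ins for the success/failure result types.
sealed trait CheckResult
case object CheckSuccess extends CheckResult
final case class CheckFailure(msg: String) extends CheckResult

def checkInputs(messageNameOk: Boolean, descOk: Boolean, optionsOk: Boolean): CheckResult = {
  val messageNameCheck =
    if (messageNameOk) None else Some(CheckFailure("messageName must be a foldable string"))
  val descFilePathCheck =
    if (descOk) None else Some(CheckFailure("descFilePath must be a binary descriptor set"))
  val optionsCheck =
    if (optionsOk) None else Some(CheckFailure("options must map strings to strings"))

  // The first Some(failure) wins; all None means the inputs pass.
  messageNameCheck.getOrElse(
    descFilePathCheck.getOrElse(
      optionsCheck.getOrElse(CheckSuccess)))
}
```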
File 4 of 4:

@@ -259,7 +259,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
           .getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
         // If the progress feature is disabled, wait for the deadline.
         if (progressTimeout > 0L) {
-          timeoutNs = Math.min(progressTimeout * NANOS_PER_MILLIS, timeoutNS)
+          timeoutNs = Math.min(progressTimeout * NANOS_PER_MILLIS, timeoutNs)
         }
         logTrace(s"Wait for response to become available with timeout=$timeoutNs ns.")
         executionObserver.responseLock.wait(timeoutNs / NANOS_PER_MILLIS)
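The one-character fix is behavioral, not cosmetic: Scala identifiers are case-sensitive, so the mis-cased timeoutNS presumably resolved to a different in-scope value than the freshly computed timeoutNs, letting the progress-report clamp pick the wrong wait bound. A hedged sketch of the intended clamping, assuming deadlineTimeNs marks the overall stream deadline and NANOS_PER_MILLIS is 1,000,000 (the real sender waits on a lock in a loop; this shows only the interval math):

```scala
val NANOS_PER_MILLIS = 1000000L

// Sketch only: deadlineTimeNs and progressTimeout are stand-ins for the
// real ExecuteGrpcResponseSender state.
def nextWaitMillis(deadlineTimeNs: Long, progressTimeout: Long): Long = {
  // Remaining time until the hard deadline.
  var timeoutNs = math.max(0L, deadlineTimeNs - System.nanoTime())
  if (progressTimeout > 0L) {
    // Progress reports enabled: wake at least every progressTimeout ms,
    // but never sleep past the deadline.
    timeoutNs = math.min(progressTimeout * NANOS_PER_MILLIS, timeoutNs)
  }
  timeoutNs / NANOS_PER_MILLIS
}
```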
