From 34efdb0772f33551f28eb46971d96b4337856163 Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Tue, 9 Aug 2022 17:36:07 -0700
Subject: [PATCH 1/2] Added `SQLConfInjectingRDD`; Fixed
 `HoodieSparkUtils.createRDD` to make sure `SQLConf` is properly propagated
 to the executor (required by `AvroSerializer`)

---
 .../org/apache/hudi/HoodieSparkUtils.scala    | 15 ++++++-
 .../sql/execution/SQLConfInjectingRDD.scala   | 43 +++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)
 create mode 100644 hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 980030cc7c00..2b496cfb551e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -26,9 +26,13 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.SQLConfInjectingRDD
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 private[hudi] trait SparkVersionsSupport {
   def getSparkVersion: String
@@ -89,8 +93,12 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
     // serializer is not able to digest it
     val readerAvroSchemaStr = readerAvroSchema.toString
     val writerAvroSchemaStr = writerAvroSchema.toString
+
     // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion
-    df.queryExecution.toRdd.mapPartitions { rows =>
+    //       Additionally, we have to explicitly wrap the resulting [[RDD]] into one
+    //       injecting [[SQLConf]], which by default isn't propagated by Spark to the executor(s).
+    //       [[SQLConf]] is required by [[AvroSerializer]]
+    injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows =>
       if (rows.isEmpty) {
         Iterator.empty
       } else {
@@ -108,10 +116,13 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
         rows.map { ir => transform(convert(ir)) }
       }
-    }
+    }, SQLConf.get)
   }
 
   def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
     sparkAdapter.createSparkRowSerDe(structType)
   }
+
+  private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
+    new SQLConfInjectingRDD(rdd, conf)
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala
new file mode 100644
index 000000000000..477e96ed100b
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala
@@ -0,0 +1,43 @@
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+import scala.reflect.ClassTag
+
+/**
+ * NOTE: This is a generalized version of Spark's [[SQLExecutionRDD]]
+ *
+ * It is just a wrapper over [[sqlRDD]] which sets and makes effective all the configs from the
+ * captured [[SQLConf]]
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLConfInjectingRDD[T: ClassTag](var sqlRDD: RDD[T], @transient conf: SQLConf) extends RDD[T](sqlRDD) {
+  private val sqlConfigs = conf.getAllConfs
+  private lazy val sqlConfExecutorSide = {
+    val newConf = new SQLConf()
+    sqlConfigs.foreach { case (k, v) => newConf.setConfString(k, v) }
+    newConf
+  }
+
+  override val partitioner = firstParent[InternalRow].partitioner
+
+  override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
+    // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
+    // this RDD.
+    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
+      SQLConf.withExistingConf(sqlConfExecutorSide) {
+        firstParent[T].iterator(split, context)
+      }
+    } else {
+      firstParent[T].iterator(split, context)
+    }
+  }
+}
\ No newline at end of file

From 285ed43d8ca7b525c3bd5334a0acd19ec2c7757c Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Tue, 9 Aug 2022 17:36:30 -0700
Subject: [PATCH 2/2] Fixing compilation

---
 .../sql/execution/SQLConfInjectingRDD.scala | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala
index 477e96ed100b..1a44fd1af1e5 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.execution
 
 import org.apache.spark.{Partition, TaskContext}
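
Note: what follows is a minimal, self-contained usage sketch, not part of the patch. The local session, the probe key `spark.sql.session.timeZone`, and the closure body are illustrative stand-ins (per the commit message, the real `createRDD` builds an `AvroSerializer` in that spot). It shows the wrapping pattern `createRDD` now uses: the injecting RDD wraps the result of `mapPartitions`, so the eager part of the partition closure runs on the executor while the captured `SQLConf` is installed as the thread-local conf, even though a plain RDD action sets no SQL execution id.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLConfInjectingRDD
import org.apache.spark.sql.internal.SQLConf

object SQLConfInjectionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sqlconf-injection-sketch")
      .getOrCreate()
    import spark.implicits._

    // Any driver-side SQL conf works as a probe; the session time zone is convenient.
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // Mirrors the createRDD change: wrap the mapPartitions result and capture
    // the driver's SQLConf via SQLConf.get at RDD-construction time.
    val rdd: RDD[String] = new SQLConfInjectingRDD(
      df.queryExecution.toRdd.mapPartitions { rows =>
        // Eager prologue of the closure: executes inside
        // SQLConf.withExistingConf(...) in SQLConfInjectingRDD.compute,
        // so SQLConf.get should see "UTC" here despite this being a plain
        // RDD action with no tracked SQL execution.
        val tz = SQLConf.get.sessionLocalTimeZone
        rows.map(row => s"${row.getInt(0)} -> $tz")
      },
      SQLConf.get)

    rdd.collect().foreach(println)
    spark.stop()
  }
}

Without the wrapper, `SQLConf.get` inside the closure would fall back to a default executor-side conf, which is the failure mode the commit message describes for `AvroSerializer`.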
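
A related subtlety, implied by `compute()` above rather than stated anywhere in the patch: `SQLConf.withExistingConf` only spans the construction of the upstream iterator chain, because `firstParent[T].iterator(split, context)` returns once the iterators are wired up; it does not span their later element-by-element consumption. Conf-dependent state therefore has to be built in the eager part of an upstream closure, and transformations stacked on top of the injecting RDD do not see the captured conf at all. A sketch of both orderings, under those assumptions (the `base` RDD and closure bodies are hypothetical):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.SQLConfInjectingRDD
import org.apache.spark.sql.internal.SQLConf

object InjectionOrderingSketch {
  // Effective ordering: the closure's eager prologue executes while compute()
  // holds the captured SQLConf, so conf-dependent state is built correctly.
  def injected(base: RDD[Int], conf: SQLConf): RDD[String] =
    new SQLConfInjectingRDD(
      base.mapPartitions { it =>
        val tz = SQLConf.get.sessionLocalTimeZone // sees the captured conf
        it.map(i => s"$i@$tz")
      },
      conf)

  // Ineffective ordering: this closure runs only after SQLConfInjectingRDD.compute
  // has returned and the thread-local conf has been restored, so SQLConf.get
  // falls back to the executor-side defaults.
  def notInjected(base: RDD[Int], conf: SQLConf): RDD[String] =
    new SQLConfInjectingRDD(base, conf)
      .mapPartitions { it =>
        val tz = SQLConf.get.sessionLocalTimeZone // captured conf NOT visible
        it.map(i => s"$i@$tz")
      }
}

This matches the contract of Spark's own `SQLExecutionRDD`, and is presumably why `injectSQLConf` is applied as the outermost wrapper in `HoodieSparkUtils.createRDD`.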