From c464b262b0fc5cec014f39c9f55b319d7e18951d Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 16 Sep 2014 00:06:25 -0700
Subject: [PATCH] Refactors dynamic partitioning support

Conflicts:
	sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
---
 .../{ => sql/hive}/SparkHadoopWriter.scala    | 197 ++++++--------
 .../hive/execution/InsertIntoHiveTable.scala  | 254 ++++++------------
 2 files changed, 172 insertions(+), 279 deletions(-)
 rename sql/hive/src/main/scala/org/apache/spark/{ => sql/hive}/SparkHadoopWriter.scala (50%)

diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala
similarity index 50%
rename from sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala
index bbb66ae6005bd..6e07b51f49230 100644
--- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala
@@ -21,20 +21,23 @@ import java.io.IOException
 import java.text.NumberFormat
 import java.util.Date
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc
-import org.apache.hadoop.mapred._
 import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred._
 
+import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
  * It is based on [[SparkHadoopWriter]].
  */
-private[hive] class SparkHiveHadoopWriter(
+private[hive] class SparkHiveWriterContainer(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc)
   extends Logging
@@ -42,7 +45,7 @@ private[hive] class SparkHiveHadoopWriter(
   with Serializable {
 
   private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
+  protected val conf = new SerializableWritable(jobConf)
 
   private var jobID = 0
   private var splitID = 0
@@ -51,152 +54,75 @@ private[hive] class SparkHiveHadoopWriter(
   private var taID: SerializableWritable[TaskAttemptID] = null
 
   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
-  @transient private var committer: OutputCommitter = null
-  @transient private var jobContext: JobContext = null
-  @transient private var taskContext: TaskAttemptContext = null
+  @transient private lazy val committer = conf.value.getOutputCommitter
+  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+  @transient private lazy val outputFormat =
+    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
 
-  def preSetup() {
+  def driverSideSetup() {
     setIDs(0, 0, 0)
     setConfParams()
-
-    val jCtxt = getJobContext()
-    getOutputCommitter().setupJob(jCtxt)
+    committer.setupJob(jobContext)
   }
-
-  def setup(jobid: Int, splitid: Int, attemptid: Int) {
-    setIDs(jobid, splitid, attemptid)
+  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
+    setIDs(jobId, splitId, attemptId)
     setConfParams()
-  }
-
-  def open() {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val extension = Utilities.getFileExtension(
-      conf.value,
-      fileSinkConf.getCompressed,
-      getOutputFormat())
-
-    val outputName = "part-" + numfmt.format(splitID) + extension
-    val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
-
-    getOutputCommitter().setupTask(getTaskContext())
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      path,
-      null)
+    committer.setupTask(taskContext)
   }
 
   /**
-   * create an HiveRecordWriter. imitate the above function open()
-   * @param dynamicPartPath the relative path for dynamic partition
-   *
-   * since this function is used to create different writer for
-   * different dynamic partition.So we need a parameter dynamicPartPath
-   * and use it we can calculate a new path and pass the new path to
-   * the function HiveFileFormatUtils.getHiveRecordWriter
+   * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
+   * for writing data to a dynamic partition.
   */
-  def open(dynamicPartPath: String) {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val extension = Utilities.getFileExtension(
-      conf.value,
-      fileSinkConf.getCompressed,
-      getOutputFormat())
-
-    val outputName = "part-" + numfmt.format(splitID) + extension
-    val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
-    if (outputPath == null) {
-      throw new IOException("Undefined job output-path")
-    }
-    val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) // remove "/"
-    val path = new Path(workPath, outputName)
-    getOutputCommitter().setupTask(getTaskContext())
+  def open() {
     writer = HiveFileFormatUtils.getHiveRecordWriter(
       conf.value,
       fileSinkConf.getTableInfo,
       conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
       fileSinkConf,
-      path,
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
       Reporter.NULL)
   }
 
-  def write(value: Writable) {
-    if (writer != null) {
-      writer.write(value)
-    } else {
-      throw new IOException("Writer is null, open() has not been called")
-    }
+  protected def getOutputName: String = {
+    val numberFormat = NumberFormat.getInstance()
+    numberFormat.setMinimumIntegerDigits(5)
+    numberFormat.setGroupingUsed(false)
+    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
+    "part-" + numberFormat.format(splitID) + extension
   }
 
+  def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
+
   def close() {
     // Seems the boolean value passed into close does not matter.
     writer.close(false)
   }
 
   def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
-    if (cmtr.needsTaskCommit(taCtxt)) {
+    if (committer.needsTaskCommit(taskContext)) {
       try {
-        cmtr.commitTask(taCtxt)
+        committer.commitTask(taskContext)
        logInfo (taID + ": Committed")
       } catch {
         case e: IOException =>
           logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
+          committer.abortTask(taskContext)
           throw e
       }
     } else {
-      logWarning ("No need to commit output of task: " + taID.value)
+      logInfo("No need to commit output of task: " + taID.value)
     }
   }
 
   def commitJob() {
-    // always ? Or if cmtr.needsTaskCommit ?
-    val cmtr = getOutputCommitter()
-    cmtr.commitJob(getJobContext())
+    committer.commitJob(jobContext)
   }
 
   // ********* Private Functions *********
 
-  private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
-    if (format == null) {
-      format = conf.value.getOutputFormat()
-        .asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
-    }
-    format
-  }
-
-  private def getOutputCommitter(): OutputCommitter = {
-    if (committer == null) {
-      committer = conf.value.getOutputCommitter
-    }
-    committer
-  }
-
-  private def getJobContext(): JobContext = {
-    if (jobContext == null) {
-      jobContext = newJobContext(conf.value, jID.value)
-    }
-    jobContext
-  }
-
-  private def getTaskContext(): TaskAttemptContext = {
-    if (taskContext == null) {
-      taskContext = newTaskAttemptContext(conf.value, taID.value)
-    }
-    taskContext
-  }
-
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
@@ -216,7 +142,7 @@ private[hive] class SparkHiveHadoopWriter(
   }
 }
 
-private[hive] object SparkHiveHadoopWriter {
+private[hive] object SparkHiveWriterContainer {
   def createPathFromString(path: String, conf: JobConf): Path = {
     if (path == null) {
       throw new IllegalArgumentException("Output path is null")
@@ -226,6 +152,59 @@ private[hive] object SparkHiveHadoopWriter {
     if (outputPath == null || fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")
     }
-    outputPath.makeQualified(fs)
+    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+}
+
+private[spark] class SparkHiveDynamicPartitionWriterContainer(
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc,
+    dynamicPartColNames: Array[String],
+    defaultPartName: String)
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
+
+  @transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
+
+  override def open(): Unit = {
+    writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
+  }
+
+  override def close(): Unit = {
+    writers.values.foreach(_.close(false))
+  }
+
+  override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
+    val dynamicPartPath = dynamicPartColNames
+      .zip(row.takeRight(dynamicPartColNames.length))
+      .map { case (col, rawVal) =>
+        val string = String.valueOf(rawVal)
+        s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
+      }
+      .mkString
+
+    val path = {
+      val outputPath = FileOutputFormat.getOutputPath(conf.value)
+      assert(outputPath != null, "Undefined job output-path")
+      val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+      new Path(workPath, getOutputName)
+    }
+
+    def newWriter = {
+      val newFileSinkDesc = new FileSinkDesc(
+        fileSinkConf.getDirName + dynamicPartPath,
+        fileSinkConf.getTableInfo,
+        fileSinkConf.getCompressed)
+      newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
+      newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
+      HiveFileFormatUtils.getHiveRecordWriter(
+        conf.value,
+        fileSinkConf.getTableInfo,
+        conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+        newFileSinkDesc,
+        path,
+        Reporter.NULL)
+    }
+
+    writers.getOrElseUpdate(dynamicPartPath, newWriter)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 12b4c3eec6ab0..c88ae70063b4b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -18,30 +18,29 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConversions._
-
-import java.util.{HashMap => JHashMap}
+import scala.collection.mutable
 
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
 
-import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
+import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
-import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.spark.sql.hive._
+import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
 
 /**
  * :: DeveloperApi ::
@@ -102,30 +101,23 @@ case class InsertIntoHiveTable(
     obj
   }
 
-  /**
-   * since we should get directory of dynamic partition from upstream RDD
-   * reference the code "serializer.serialize(outputData, standardOI) -> dynamicPartPath"
-   * So The type of the elment in RDD is (Writable, String)
-   */
   def saveAsHiveFile(
-      rdd: RDD[(Writable, String)],
+      rdd: RDD[Row],
      valueClass: Class[_],
      fileSinkConf: FileSinkDesc,
      conf: SerializableWritable[JobConf],
      isCompressed: Boolean,
-      dynamicPartNum: Int) {
-    if (valueClass == null) {
-      throw new SparkException("Output value class not set")
-    }
+      writerContainer: SparkHiveWriterContainer) {
+    assert(valueClass != null, "Output value class not set")
     conf.value.setOutputValueClass(valueClass)
-    if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
-      throw new SparkException("Output format class not set")
-    }
+
+    assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null)
 
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.value.set("mapred.output.format.class",
-      fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+    conf.value.set(
+      "mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
@@ -139,101 +131,44 @@ case class InsertIntoHiveTable(
     FileOutputFormat.setOutputPath(
       conf.value,
-      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value))
+      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
 
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
 
-    var writer: SparkHiveHadoopWriter = null
-    // Map restore writesr for Dynamic Partition
-    var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null
-    if (dynamicPartNum == 0) {
-      writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf)
-      writer.preSetup()
-      sc.sparkContext.runJob(rdd, writeToFile _)
-      writer.commitJob()
-    } else {
-      writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
-      sc.sparkContext.runJob(rdd, writeToFile _)
-      for ((k,v) <- writerMap) {
-        v.commitJob()
-      }
-      writerMap.clear()
-    }
-    def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) {
+    writerContainer.driverSideSetup()
+    sc.sparkContext.runJob(rdd, writeToFile _)
+    writerContainer.commitJob()
+
+    // Note that this function is executed on executor side
+    def writeToFile(context: TaskContext, iterator: Iterator[Row]) {
+      val serializer = newSerializer(fileSinkConf.getTableInfo)
+      val standardOI = ObjectInspectorUtils
+        .getStandardObjectInspector(
+          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+          ObjectInspectorCopyOption.JAVA)
+        .asInstanceOf[StructObjectInspector]
+
+      val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+      val outputData = new Array[Any](fieldOIs.length)
+
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
+      writerContainer.open()
 
-      if (dynamicPartNum == 0) { // for All static partition
-        writer.setup(context.stageId, context.partitionId, attemptNumber)
-        writer.open()
-        // writer for Dynamic Partition
-        while(iter.hasNext) {
-          val record = iter.next()
-          writer.write(record._1)
-        }
-        writer.close()
-        writer.commit()
-      } else { // if there is dynamic Partition
-        while(iter.hasNext) {
-          val record = iter.next()
-          val location = fileSinkConf.getDirName
-          val partLocation = location + record._2 // different writer related with different file
-          def createNewWriter(): SparkHiveHadoopWriter = {
-            val tempWriter = new SparkHiveHadoopWriter(conf.value,
-              new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
-            tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
-            tempWriter.open(record._2)
-            writerMap += (record._2 -> tempWriter)
-            tempWriter
-          }
-          val writer2 = writerMap.getOrElseUpdate(record._2, createNewWriter)
-          writer2.write(record._1)
-        }
-        for ((k,v) <- writerMap) {
-          v.close()
-          v.commit()
+      iterator.foreach { row =>
+        var i = 0
+        while (i < fieldOIs.length) {
+          outputData(i) = wrap(row(i), fieldOIs(i))
+          i += 1
         }
-      }
-    }
-  }
-  /**
-   * Returns the Dynamic partition directory for the given row.
-   * @param partCols an array containing the string names of the partition columns
-   *
-   * we get the last dynamicPartNum elements from partCols and
-   * last dynamicPartNum elements from the current row,
-   * then we can construct a String for dynamic partition directory
-   * For example:
-   * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ...
-   * return: /part1=val1/part2=val2
-   * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ...
-   * return: /part2=val2
-   * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ...
-   * return: /part2=val2/part3=val3
-   */
-  private def getDynamicPartDir(partCols: Array[String],
-      row: Row,
-      dynamicPartNum: Int,
-      defaultPartName: String): String = {
-    assert(dynamicPartNum > 0)
-    // TODO needs optimization
-    partCols
-      .takeRight(dynamicPartNum)
-      .zip(row.takeRight(dynamicPartNum))
-      .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
-      .mkString
-  }
+        val writer = writerContainer.getLocalFileWriter(row)
+        writer.write(serializer.serialize(outputData, standardOI))
+      }
-  /**
-   * Returns `rowVal` as a String.
-   * If `rowVal` is null or equal to "", returns the default partition name.
-   */
-  private def handleNull(rowVal: Any, defaultPartName: String): String = {
-    if (rowVal == null ||String.valueOf(rowVal).length == 0) {
-      defaultPartName
-    } else {
-      String.valueOf(rowVal)
+      writerContainer.close()
+      writerContainer.commit()
     }
   }
 
@@ -247,9 +182,6 @@ case class InsertIntoHiveTable(
    * Note: this is run once and then kept to avoid double insertions.
    */
   private lazy val result: RDD[Row] = {
-    val childRdd = child.execute()
-    assert(childRdd != null)
-
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
@@ -257,80 +189,62 @@ case class InsertIntoHiveTable(
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
 
-    val numDynamicPartitions = partition.values.filter(_.isEmpty).size
-    val numStaticPartitions = partition.values.filter(_.isDefined).size
+    val numDynamicPartitions = partition.values.count(_.isEmpty)
+    val numStaticPartitions = partition.values.count(_.nonEmpty)
     val partitionSpec = partition.map {
-      case (key, Some(value)) =>
-        key -> value
-      case (key, None) =>
-        key -> ""
+      case (key, Some(value)) => key -> value
+      case (key, None) => key -> ""
     }
 
-    val jobConf = new JobConf(sc.hiveconf)
-    val jobConfSer = new SerializableWritable(jobConf)
-    // check if the partition spec is valid
+    // All partition column names in the format of "<column name 1>/<column name 2>/..."
+    val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
+    val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
+
+    // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
+      // Report error if dynamic partitioning is not enabled
       if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
      }
+
+      // Report error if dynamic partition strict mode is on but no static partition is found
       if (numStaticPartitions == 0 &&
        sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
      }
-      // check if static partition appear after dynamic partitions
-      var tmpNumStaticPartitions = numStaticPartitions
-      for ((k,v) <- partitionSpec) {
-        if (partitionSpec(k) == "") {
-          if (tmpNumStaticPartitions > 0) { // found a DP, but there exists ST as subpartition
-            throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg())
-          }
-        } else {
-          tmpNumStaticPartitions -= 1
-        }
+
+      // Report error if any static partition appears after a dynamic partition
+      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
+      isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ =>
+        throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+      }
    }
 
-    val rdd = childRdd.mapPartitions { iter =>
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
-      val standardOI = ObjectInspectorUtils
-        .getStandardObjectInspector(
-          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-          ObjectInspectorCopyOption.JAVA)
-        .asInstanceOf[StructObjectInspector]
+    val jobConf = new JobConf(sc.hiveconf)
+    val jobConfSer = new SerializableWritable(jobConf)
 
-      val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
-      val outputData = new Array[Any](fieldOIs.length)
-      val defaultPartName = jobConfSer.value.get(
-        HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
-        HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultVal)
-
-      val partitionColumns = fileSinkConf.getTableInfo.
-        getProperties.getProperty("partition_columns") // a String like "colname1/colname2"
-      val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
-
-      iter.map { row =>
-        var dynamicPartPath: String = null
-        if (numDynamicPartitions > 0) {
-          dynamicPartPath = getDynamicPartDir(partitionColumnNames, row,
-            numDynamicPartitions, defaultPartName)
-        }
-        var i = 0
-        while (i < fieldOIs.length) {
-          // Casts Strings to HiveVarchars when necessary.
-          outputData(i) = wrap(row(i), fieldOIs(i))
-          i += 1
-        }
-        // pass the dynamicPartPath to downStream RDD
-        serializer.serialize(outputData, standardOI) -> dynamicPartPath
-      }
+    val defaultPartName = jobConf.get(
+      ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
+    val writerContainer = if (numDynamicPartitions > 0) {
+      new SparkHiveDynamicPartitionWriterContainer(
+        jobConf,
+        fileSinkConf,
+        partitionColumnNames.takeRight(numDynamicPartitions),
+        defaultPartName)
+    } else {
+      new SparkHiveWriterContainer(jobConf, fileSinkConf)
     }
+
+    val isCompressed = jobConf.getBoolean(
+      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+
     saveAsHiveFile(
-      rdd,
+      child.execute(),
      outputClass,
      fileSinkConf,
      jobConfSer,
-      sc.hiveconf.getBoolean("hive.exec.compress.output", false),
-      numDynamicPartitions)
+      isCompressed,
+      writerContainer)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.