[SPARK-25102][SQL][2.4] Write Spark version to ORC/Parquet file metadata #28142

Closed · wants to merge 2 commits
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/package.scala
@@ -19,6 +19,8 @@ package org.apache

import java.util.Properties

import org.apache.spark.util.VersionUtils

/**
* Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
* Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
@@ -89,6 +91,7 @@ package object spark {
}

val SPARK_VERSION = SparkBuildInfo.spark_version
val SPARK_VERSION_SHORT = VersionUtils.shortVersion(SparkBuildInfo.spark_version)
val SPARK_BRANCH = SparkBuildInfo.spark_branch
val SPARK_REVISION = SparkBuildInfo.spark_revision
val SPARK_BUILD_USER = SparkBuildInfo.spark_build_user
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/VersionUtils.scala
@@ -23,6 +23,7 @@ package org.apache.spark.util
private[spark] object VersionUtils {

private val majorMinorRegex = """^(\d+)\.(\d+)(\..*)?$""".r
private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r

/**
* Given a Spark version string, return the major version number.
@@ -36,6 +37,19 @@ private[spark] object VersionUtils {
*/
def minorVersion(sparkVersion: String): Int = majorMinorVersion(sparkVersion)._2

/**
* Given a Spark version string, return the short version string.
* E.g., for 3.0.0-SNAPSHOT, return '3.0.0'.
[Review comment, Member Author (dongjoon-hyun)]: I didn't change this example in order to minimize the diff between branch-2.4 and master.
*/
def shortVersion(sparkVersion: String): String = {
shortVersionRegex.findFirstMatchIn(sparkVersion) match {
case Some(m) => m.group(1)
case None =>
throw new IllegalArgumentException(s"Spark tried to parse '$sparkVersion' as a Spark" +
s" version string, but it could not find the major/minor/maintenance version numbers.")
}
}

/**
* Given a Spark version string, return the (major version number, minor version number).
* E.g., for 2.0.1-SNAPSHOT, return (2, 0).
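For reference, a minimal usage sketch of the new helper (illustrative only, not part of the diff): the version strings below are made-up examples, and because `VersionUtils` is `private[spark]`, the call site is assumed to live under the `org.apache.spark` package.

    import org.apache.spark.util.VersionUtils

    // Strips any suffix after the major.minor.maintenance triple.
    VersionUtils.shortVersion("2.4.6")           // "2.4.6"
    VersionUtils.shortVersion("2.4.6-SNAPSHOT")  // "2.4.6"
    // Fails fast when the maintenance number is missing.
    VersionUtils.shortVersion("2.4")             // throws IllegalArgumentException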
25 changes: 25 additions & 0 deletions core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala
@@ -73,4 +73,29 @@ class VersionUtilsSuite extends SparkFunSuite {
}
}
}

test("Return short version number") {
assert(shortVersion("3.0.0") === "3.0.0")
assert(shortVersion("3.0.0-SNAPSHOT") === "3.0.0")
[Review comment, Member Author (dongjoon-hyun), Apr 7, 2020]: I didn't change the version 3.0.x in order to minimize the diff between master and branch-2.4.
withClue("shortVersion parsing should fail for missing maintenance version number") {
intercept[IllegalArgumentException] {
shortVersion("3.0")
}
}
withClue("shortVersion parsing should fail for invalid major version number") {
intercept[IllegalArgumentException] {
shortVersion("x.0.0")
}
}
withClue("shortVersion parsing should fail for invalid minor version number") {
intercept[IllegalArgumentException] {
shortVersion("3.x.0")
}
}
withClue("shortVersion parsing should fail for invalid maintenance version number") {
intercept[IllegalArgumentException] {
shortVersion("3.0.x")
}
}
}
}
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.datasources.orc
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.orc.mapred.OrcStruct
import org.apache.orc.mapreduce.OrcOutputFormat
import org.apache.orc.OrcFile
import org.apache.orc.mapred.{OrcOutputFormat => OrcMapRedOutputFormat, OrcStruct}
import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
@@ -36,11 +37,17 @@ private[orc] class OrcOutputWriter(
private[this] val serializer = new OrcSerializer(dataSchema)

  private val recordWriter = {
-   new OrcOutputFormat[OrcStruct]() {
+   val orcOutputFormat = new OrcOutputFormat[OrcStruct]() {
      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        new Path(path)
      }
-   }.getRecordWriter(context)
+   }
+   val filename = orcOutputFormat.getDefaultWorkFile(context, ".orc")
+   val options = OrcMapRedOutputFormat.buildOptions(context.getConfiguration)
+   val writer = OrcFile.createWriter(filename, options)
+   val recordWriter = new OrcMapreduceRecordWriter[OrcStruct](writer)
+   OrcUtils.addSparkVersionMetadata(writer)
+   recordWriter
  }

override def write(row: InternalRow): Unit = {
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -17,18 +17,19 @@

package org.apache.spark.sql.execution.datasources.orc

import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.orc.{OrcFile, Reader, TypeDescription}
import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}

import org.apache.spark.SparkException
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types._
@@ -144,4 +145,11 @@ object OrcUtils extends Logging {
}
}
}

/**
* Add metadata specifying the Spark version.
*/
def addSparkVersionMetadata(writer: Writer): Unit = {
writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, UTF_8.encode(SPARK_VERSION_SHORT))
}
}
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,7 +29,9 @@ import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.{Binary, RecordConsumer}

import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -93,7 +95,10 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]

val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
val metadata = Map(
SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
).asJava

logInfo(
s"""Initialized Parquet WriteSupport with Catalyst schema:
9 changes: 9 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -44,4 +44,13 @@ package object sql {
type Strategy = SparkStrategy

type DataFrame = Dataset[Row]

/**
* Metadata key which is used to record the Spark version in the following:
* - Parquet file metadata
* - ORC file metadata
*
* Note that the Hive table property `spark.sql.create.version` also stores the Spark version.
*/
private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
}
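For illustration, a minimal sketch of how a reader can get the new key back out of the file footers; it mirrors the ORC and Parquet tests added later in this PR, and the local file paths are hypothetical examples.

    import java.nio.charset.StandardCharsets.UTF_8

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.orc.OrcFile
    import org.apache.parquet.HadoopReadOptions
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    val conf = new Configuration()

    // ORC stores the version as user metadata in the file footer.
    val orcReader = OrcFile.createReader(
      new Path("/tmp/example/part-00000.orc"), OrcFile.readerOptions(conf))
    val orcVersion =
      UTF_8.decode(orcReader.getMetadataValue("org.apache.spark.version")).toString

    // Parquet stores the version in the footer's key-value metadata.
    val parquetReader = ParquetFileReader.open(
      HadoopInputFile.fromPath(new Path("/tmp/example/part-00000.parquet"), conf),
      HadoopReadOptions.builder(conf).build())
    val parquetVersion =
      parquetReader.getFileMetaData.getKeyValueMetaData.get("org.apache.spark.version")
    parquetReader.close()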
@@ -93,7 +93,7 @@ Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
Created Time [not included in comparison]
Last Access [not included in comparison]
Partition Statistics 1121 bytes, 3 rows
Partition Statistics 1189 bytes, 3 rows

# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -128,7 +128,7 @@ Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
Created Time [not included in comparison]
Last Access [not included in comparison]
Partition Statistics 1121 bytes, 3 rows
Partition Statistics 1189 bytes, 3 rows

# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -155,7 +155,7 @@ Partition Values [ds=2017-08-01, hr=11]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
Created Time [not included in comparison]
Last Access [not included in comparison]
Partition Statistics 1098 bytes, 4 rows
Partition Statistics 1166 bytes, 4 rows

# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -190,7 +190,7 @@ Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
Created Time [not included in comparison]
Last Access [not included in comparison]
Partition Statistics 1121 bytes, 3 rows
Partition Statistics 1189 bytes, 3 rows

# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -217,7 +217,7 @@ Partition Values [ds=2017-08-01, hr=11]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
Created Time [not included in comparison]
Last Access [not included in comparison]
Partition Statistics 1098 bytes, 4 rows
Partition Statistics 1166 bytes, 4 rows

# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -244,7 +244,7 @@ Partition Values [ds=2017-09-01, hr=5]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5
Created Time [not included in comparison]
Last Access [not included in comparison]
Partition Statistics 1144 bytes, 2 rows
Partition Statistics 1212 bytes, 2 rows

# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -509,7 +509,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
case plan: InMemoryRelation => plan
}.head
// InMemoryRelation's stats is file size before the underlying RDD is materialized
assert(inMemoryRelation.computeStats().sizeInBytes === 800)
assert(inMemoryRelation.computeStats().sizeInBytes === 868)

// InMemoryRelation's stats is updated after materializing RDD
dfFromFile.collect()
@@ -522,7 +522,7 @@

// Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
// is calculated
assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
assert(inMemoryRelation2.computeStats().sizeInBytes === 868)

// InMemoryRelation's stats should be updated after calculating stats of the table
// clear cache to simulate a fresh environment
@@ -45,7 +45,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
import testImplicits._
Seq(1.0, 0.5).foreach { compressionFactor =>
withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
"spark.sql.autoBroadcastJoinThreshold" -> "400") {
"spark.sql.autoBroadcastJoinThreshold" -> "434") {
withTempPath { workDir =>
// the file size is 740 bytes
val workDirPath = workDir.getAbsolutePath
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.orc

import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.sql.Timestamp
import java.util.Locale

@@ -29,7 +30,8 @@ import org.apache.orc.OrcProto.Stream.Kind
import org.apache.orc.impl.RecordReaderImpl
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.Row
import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
@@ -243,6 +245,22 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts))
}
}

[Review comment, Member Author (dongjoon-hyun)]: Please note that the following test case is executed twice, in OrcSourceSuite and HiveOrcSourceSuite.
test("Write Spark version into ORC file metadata") {
withTempPath { path =>
spark.range(1).repartition(1).write.orc(path.getCanonicalPath)

val partFiles = path.listFiles()
.filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
assert(partFiles.length === 1)

val orcFilePath = new Path(partFiles.head.getAbsolutePath)
val readerOptions = OrcFile.readerOptions(new Configuration())
val reader = OrcFile.createReader(orcFilePath, readerOptions)
val version = UTF_8.decode(reader.getMetadataValue(SPARK_VERSION_METADATA_KEY)).toString
assert(version === SPARK_VERSION_SHORT)
}
}
}

class OrcSourceSuite extends OrcSuite with SharedSQLContext {
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -27,17 +27,19 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.SparkException
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -799,6 +801,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
checkAnswer(spark.read.parquet(file.getAbsolutePath), Seq(Row(Row(1, null, "foo"))))
}
}

test("Write Spark version into Parquet metadata") {
withTempPath { dir =>
val path = dir.getAbsolutePath
spark.range(1).repartition(1).write.parquet(path)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)

val conf = new Configuration()
val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
val parquetReadOptions = HadoopReadOptions.builder(conf).build()
val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
val metaData = m.getFileMetaData.getKeyValueMetaData
m.close()

assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
}
}
}

class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)