Cherry picking from Yelp changes
sungjuly committed Jan 31, 2019
1 parent 184b442 commit 3b6a318
Showing 13 changed files with 69 additions and 63 deletions.
18 changes: 0 additions & 18 deletions .travis.yml
@@ -25,24 +25,6 @@ matrix:
- jdk: openjdk7
scala: 2.11.7
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.7.4"
env:
global:
# AWS_REDSHIFT_JDBC_URL
- secure: "RNkxdKcaKEYuJqxli8naazp42qO5/pgueIzs+J5rHwl39jcBvJMgW3DX8kT7duzdoBb/qrolj/ttbQ3l/30P45+djn0BEwcJMX7G/FGpZYD23yd03qeq7sOKPQl2Ni/OBttYHJMah5rI6aPmAysBZMQO7Wijdenb/RUiU2YcZp0="
# AWS_REDSHIFT_PASSWORD
- secure: "g5li3gLejD+/2BIqIm+qHiqBUvCc5l0qnftVaVlLtL7SffErp/twDiFP4gW8eqnFqi2GEC1c9Shf7Z9cOIUunNSBQZdYIVG0f38UfBeDP14nOoIuwZ974O5yggbgZhX0cKvJzINcENGoRNk0FzRwgOdCCiF05IMnRqQxI3C24fE="
# AWS_REDSHIFT_USER
- secure: "LIkY/ZpBXK3vSFsdpBSRXEsgfD2wDF52X8OZOlyBJOiZpS4y1/obj8b3VQABDPyPH95bGX/LOpM0vVM137rYgF0pskgVEzLMyZOPpwYqNGPf/d4BtQhBRc8f7+jmr6D4Hrox4jCl0cCKaeiTazun2+Y9E+zgCUDvQ8y9qGctR2k="
# TEST_AWS_ACCESS_KEY_ID
- secure: "bsB6YwkscUxtzcZOKja4Y69IR3JqvCP3W/4vFftW/v33/hOC3EBz7TVNKS+ZIomBUQYJnzsMfM59bj7YEc3KZe8WxIcUdLI40hg0X5O1RhJDNPW+0oGbWshmzyua+hY1y7nRja+8/17tYTbAi1+MhscRu+O/2aWaXolA9BicuX0="
# TEST_AWS_SECRET_ACCESS_KEY
- secure: "cGxnZh4be9XiPBOMxe9wHYwEfrWNw4zSjmvGFEC9UUV11ydHLo5wrXtcTVFmY7qxUxYeb0NB2N+CQXE0GcyUKoTviKG9sOS3cxR1q30FsdOVcWDKAzpBUmzDTMwDLAUMysziyOtMorDlNVydqYdYLMpiUN0O+eDKA+iOHlJp7fo="
# STS_ROLE_ARN
- secure: "cuyemI1bqPkWBD5B1FqIKDJb5g/SX5x8lrzkO0J/jkyGY0VLbHxrl5j/9PrKFuvraBK3HC56HEP1Zg+IMvh+uv0D+p5y14C97fAzE33uNgR2aVkamOo92zHvxvXe7zBtqc8rztWsJb1pgkrY7SdgSXgQc88ohey+XecDh4TahTY="
# AWS_S3_SCRATCH_SPACE
- secure: "LvndQIW6dHs6nyaMHtblGI/oL+s460lOezFs2BoD0Isenb/O/IM+nY5K9HepTXjJIcq8qvUYnojZX1FCrxxOXX2/+/Iihiq7GzJYdmdMC6hLg9bJYeAFk0dWYT88/AwadrJCBOa3ockRLhiO3dkai7Ki5+M1erfaFiAHHMpJxYQ="
# AWS_S3_CROSS_REGION_SCRATCH_SPACE
- secure: "esYmBqt256Dc77HT68zoaE/vtsFGk2N+Kt+52RlR0cjHPY1q5801vxLbeOlpYb2On3x8YckE++HadjL40gwSBsca0ffoogq6zTlfbJYDSQkQG1evxXWJZLcafB0igfBs/UbEUo7EaxoAJQcLgiWWwUdO0a0iU1ciSVyogZPagL0="

script:
- ./dev/run-tests-travis.sh
2 changes: 1 addition & 1 deletion project/SparkRedshiftBuild.scala
@@ -75,7 +75,7 @@ object SparkRedshiftBuild extends Build {
// A Redshift-compatible JDBC driver must be present on the classpath for spark-redshift to work.
// For testing, we use an Amazon driver, which is available from
// http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
"com.amazon.redshift" % "jdbc4" % "1.1.7.1007" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC4-1.1.7.1007.jar",
"com.amazon.redshift" % "jdbc41" % "1.2.12.1017" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.12.1017/RedshiftJDBC41-1.2.12.1017.jar",
// Although support for the postgres driver is lower priority than support for Amazon's
// official Redshift driver, we still run basic tests with it.
"postgresql" % "postgresql" % "8.3-606.jdbc4" % "test",
@@ -44,7 +44,6 @@ class AWSCredentialsInUriIntegrationSuite extends IntegrationSuiteBase {
// Override this method so that we do not set the credentials in sc.hadoopConf.
override def beforeAll(): Unit = {
assert(tempDir.contains("AKIA"), "tempdir did not contain AWS credentials")
assert(!AWS_SECRET_ACCESS_KEY.contains("/"), "AWS secret key should not contain slash")
sc = new SparkContext("local", getClass.getSimpleName)
conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
}
@@ -54,16 +54,19 @@ trait IntegrationSuiteBase
protected val AWS_REDSHIFT_JDBC_URL: String = loadConfigFromEnv("AWS_REDSHIFT_JDBC_URL")
protected val AWS_REDSHIFT_USER: String = loadConfigFromEnv("AWS_REDSHIFT_USER")
protected val AWS_REDSHIFT_PASSWORD: String = loadConfigFromEnv("AWS_REDSHIFT_PASSWORD")
protected val AWS_ACCESS_KEY_ID: String = loadConfigFromEnv("TEST_AWS_ACCESS_KEY_ID")
protected val AWS_SECRET_ACCESS_KEY: String = loadConfigFromEnv("TEST_AWS_SECRET_ACCESS_KEY")
protected val AWS_ACCESS_KEY_ID: String = loadConfigFromEnv("AWS_ACCESS_KEY_ID")
protected val AWS_SECRET_ACCESS_KEY: String = loadConfigFromEnv("AWS_SECRET_ACCESS_KEY")
// Path to a directory in S3 (e.g. 's3n://bucket-name/path/to/scratch/space').
protected val AWS_S3_SCRATCH_SPACE: String = loadConfigFromEnv("AWS_S3_SCRATCH_SPACE")
require(AWS_S3_SCRATCH_SPACE.contains("s3n"), "must use s3n:// URL")

protected def jdbcUrl: String = {
s"$AWS_REDSHIFT_JDBC_URL?user=$AWS_REDSHIFT_USER&password=$AWS_REDSHIFT_PASSWORD"
s"$AWS_REDSHIFT_JDBC_URL?user=$AWS_REDSHIFT_USER&password=$AWS_REDSHIFT_PASSWORD&ssl=true"
}

protected def jdbcUrlNoUserPassword: String = {
s"$AWS_REDSHIFT_JDBC_URL?ssl=true"
}
/**
* Random suffix appended to table and directory names in order to avoid collisions
* between separate Travis builds.
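The net effect of this hunk is that the integration suites now read the standard AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY variables, always connect over SSL, and gain a URL variant without embedded credentials for tests that pass user and password as options. A minimal sketch of how the two helpers compose, using placeholder endpoint and credential values that are not taken from the commit:

object JdbcUrlSketch {
  // Placeholder values for illustration only; the real values come from the environment.
  val AWS_REDSHIFT_JDBC_URL = "jdbc:redshift://examplecluster.abc123.us-west-2.redshift.amazonaws.com:5439/dev"
  val AWS_REDSHIFT_USER = "integration_user"
  val AWS_REDSHIFT_PASSWORD = "integration_password"

  // Mirrors the updated helper: credentials embedded in the URL, SSL always on.
  def jdbcUrl: String =
    s"$AWS_REDSHIFT_JDBC_URL?user=$AWS_REDSHIFT_USER&password=$AWS_REDSHIFT_PASSWORD&ssl=true"

  // New credential-free variant for tests that supply "user"/"password" as data source options.
  def jdbcUrlNoUserPassword: String =
    s"$AWS_REDSHIFT_JDBC_URL?ssl=true"

  def main(args: Array[String]): Unit = {
    println(jdbcUrl)
    println(jdbcUrlNoUserPassword)
  }
}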
@@ -31,14 +31,14 @@ class RedshiftCredentialsInConfIntegrationSuite extends IntegrationSuiteBase {
val tableName = s"roundtrip_save_and_load_$randomSuffix"
try {
write(df)
.option("url", AWS_REDSHIFT_JDBC_URL)
.option("url", jdbcUrlNoUserPassword)
.option("user", AWS_REDSHIFT_USER)
.option("password", AWS_REDSHIFT_PASSWORD)
.option("dbtable", tableName)
.save()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = read
.option("url", AWS_REDSHIFT_JDBC_URL)
.option("url", jdbcUrlNoUserPassword)
.option("user", AWS_REDSHIFT_USER)
.option("password", AWS_REDSHIFT_PASSWORD)
.option("dbtable", tableName)
18 changes: 16 additions & 2 deletions src/it/scala/com/databricks/spark/redshift/RedshiftReadSuite.scala
@@ -197,16 +197,30 @@ class RedshiftReadSuite extends IntegrationSuiteBase {
s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')")
conn.commit()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
// Due to #98, we use Double here instead of float:
checkAnswer(
read.option("dbtable", tableName).load(),
Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x)))
Seq(Float.NaN, Float.PositiveInfinity, Float.NegativeInfinity).map(x => Row.apply(x)))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("test empty string and null") {
withTempRedshiftTable("records_with_empty_and_null_characters") { tableName =>
conn.createStatement().executeUpdate(
s"CREATE TABLE $tableName (x varchar(256))")
conn.createStatement().executeUpdate(
s"INSERT INTO $tableName VALUES ('null'), (''), (null)")
conn.commit()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
checkAnswer(
read.option("dbtable", tableName).load(),
Seq("null", "", null).map(x => Row.apply(x)))
}
}


test("read special double values (regression test for #261)") {
val tableName = s"roundtrip_special_double_values_$randomSuffix"
try {
13 changes: 11 additions & 2 deletions src/main/scala/com/databricks/spark/redshift/Conversions.scala
@@ -78,7 +78,7 @@ private[redshift] object Conversions {
*
* Note that instances of this function are NOT thread-safe.
*/
def createRowConverter(schema: StructType): Array[String] => InternalRow = {
def createRowConverter(schema: StructType, nullString: String): Array[String] => InternalRow = {
val dateFormat = createRedshiftDateFormat()
val decimalFormat = createRedshiftDecimalFormat()
val conversionFunctions: Array[String => Any] = schema.fields.map { field =>
@@ -116,7 +116,16 @@
var i = 0
while (i < schema.length) {
val data = inputRow(i)
converted(i) = if (data == null || data.isEmpty) null else conversionFunctions(i)(data)
converted(i) = if ((data == null || data == nullString) ||
(data.isEmpty && schema.fields(i).dataType != StringType)) {
null
}
else if (data.isEmpty) {
""
}
else {
conversionFunctions(i)(data)
}
i += 1
}
encoder.toRow(externalRow)
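The reworked converter is what lets Redshift values round-trip correctly: a cell equal to the configured null sentinel (or an empty cell in a non-string column) becomes SQL NULL, while a genuinely empty string column value is kept as "". A self-contained sketch of that decision, assuming a hypothetical convertField helper and the '@NULL@' sentinel used by the test fixtures below:

import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

object NullVsEmptySketch {
  // Hypothetical standalone version of the per-field branch added above.
  def convertField(data: String, dataType: DataType, nullString: String)
                  (convert: String => Any): Any = {
    if (data == null || data == nullString) {
      null                                  // explicit NULL sentinel written by UNLOAD ... NULL AS
    } else if (data.isEmpty && dataType != StringType) {
      null                                  // empty cell in a non-string column is still NULL
    } else if (data.isEmpty) {
      ""                                    // empty string survives as an empty string
    } else {
      convert(data)
    }
  }

  def main(args: Array[String]): Unit = {
    println(convertField("@NULL@", StringType, "@NULL@")(identity)) // null
    println(convertField("", StringType, "@NULL@")(identity))       // "" (preserved)
    println(convertField("", IntegerType, "@NULL@")(_.toInt))       // null
  }
}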
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}

/**
* Internal data source used for reading Redshift UNLOAD files.
@@ -95,8 +95,11 @@ private[redshift] class RedshiftFileFormat extends FileFormat {
// be closed once it is completely iterated, but this is necessary to guard against
// resource leaks in case the task fails or is interrupted.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
val converter = Conversions.createRowConverter(requiredSchema)
val converter = Conversions.createRowConverter(requiredSchema,
options.getOrElse("nullString", Parameters.DEFAULT_PARAMETERS("csvnullstring")))
iter.map(converter)
}
}

override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
}
@@ -300,46 +300,39 @@ private[redshift] class JDBCWrapper {
// TODO: cleanup types which are irrelevant for Redshift.
val answer = sqlType match {
// scalastyle:off
case java.sql.Types.ARRAY => null
case java.sql.Types.BIGINT => if (signed) { LongType } else { DecimalType(20,0) }
case java.sql.Types.BINARY => BinaryType
case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
case java.sql.Types.BLOB => BinaryType
case java.sql.Types.BOOLEAN => BooleanType
// Null Type
case java.sql.Types.NULL => null

// Character Types
case java.sql.Types.CHAR => StringType
case java.sql.Types.CLOB => StringType
case java.sql.Types.DATALINK => null
case java.sql.Types.NCHAR => StringType
case java.sql.Types.NVARCHAR => StringType
case java.sql.Types.VARCHAR => StringType

// Datetime Types
case java.sql.Types.DATE => DateType
case java.sql.Types.TIME => TimestampType
case java.sql.Types.TIMESTAMP => TimestampType

// Boolean Type
case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
case java.sql.Types.BOOLEAN => BooleanType

// Numeric Types
case java.sql.Types.BIGINT => if (signed) { LongType } else { DecimalType(20,0) }
case java.sql.Types.DECIMAL
if precision != 0 || scale != 0 => DecimalType(precision, scale)
case java.sql.Types.DECIMAL => DecimalType(38, 18) // Spark 1.5.0 default
case java.sql.Types.DISTINCT => null
case java.sql.Types.DOUBLE => DoubleType
case java.sql.Types.FLOAT => FloatType
case java.sql.Types.INTEGER => if (signed) { IntegerType } else { LongType }
case java.sql.Types.JAVA_OBJECT => null
case java.sql.Types.LONGNVARCHAR => StringType
case java.sql.Types.LONGVARBINARY => BinaryType
case java.sql.Types.LONGVARCHAR => StringType
case java.sql.Types.NCHAR => StringType
case java.sql.Types.NCLOB => StringType
case java.sql.Types.NULL => null
case java.sql.Types.NUMERIC
if precision != 0 || scale != 0 => DecimalType(precision, scale)
case java.sql.Types.NUMERIC => DecimalType(38, 18) // Spark 1.5.0 default
case java.sql.Types.NVARCHAR => StringType
case java.sql.Types.OTHER => null
case java.sql.Types.REAL => DoubleType
case java.sql.Types.REF => StringType
case java.sql.Types.ROWID => LongType
// Redshift REAL is represented as a 4-byte IEEE float. https://docs.aws.amazon.com/redshift/latest/dg/r_Numeric_types201.html
case java.sql.Types.REAL => FloatType
case java.sql.Types.SMALLINT => IntegerType
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
case java.sql.Types.TIME => TimestampType
case java.sql.Types.TIMESTAMP => TimestampType
case java.sql.Types.TINYINT => IntegerType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case _ => null
// scalastyle:on
}
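Beyond regrouping the cases by category, the substantive change in this hunk is that java.sql.Types.REAL now maps to FloatType instead of DoubleType, matching Redshift's 4-byte REAL. A condensed sketch of the Redshift-relevant part of the mapping (the real method also consults precision, scale and signedness, which this sketch omits):

import java.sql.Types
import org.apache.spark.sql.types._

object RedshiftTypeMappingSketch {
  // Simplified mapping for illustration only; not the full getCatalystType logic.
  def toCatalyst(sqlType: Int): DataType = sqlType match {
    case Types.BIT | Types.BOOLEAN      => BooleanType
    case Types.SMALLINT | Types.INTEGER => IntegerType
    case Types.BIGINT                   => LongType
    case Types.REAL                     => FloatType   // 4-byte IEEE float (previously DoubleType)
    case Types.FLOAT                    => FloatType
    case Types.DOUBLE                   => DoubleType
    case Types.DECIMAL | Types.NUMERIC  => DecimalType(38, 18) // default when precision/scale are 0
    case Types.CHAR | Types.VARCHAR     => StringType
    case Types.DATE                     => DateType
    case Types.TIME | Types.TIMESTAMP   => TimestampType
    case _                              => null
  }

  def main(args: Array[String]): Unit = {
    println(toCatalyst(Types.REAL))   // FloatType
    println(toCatalyst(Types.DOUBLE)) // DoubleType
  }
}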
@@ -131,7 +131,6 @@ private[redshift] case class RedshiftRelation(
// Unload data from Redshift into a temporary directory in S3:
val tempDir = params.createPerQueryTempDir()
val unloadSql = buildUnloadStmt(requiredColumns, filters, tempDir, creds)
log.info(unloadSql)
val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl, params.credentials)
try {
jdbcWrapper.executeInterruptibly(conn.prepareStatement(unloadSql))
@@ -165,6 +164,7 @@
sqlContext.read
.format(classOf[RedshiftFileFormat].getName)
.schema(prunedSchema)
.option("nullString", params.nullString)
.load(filesToRead: _*)
.queryExecution.executedPlan.execute().asInstanceOf[RDD[Row]]
}
@@ -189,11 +189,13 @@
val escapedTableNameOrSubqury = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'")
s"SELECT $columnList FROM $escapedTableNameOrSubqury $whereClause"
}
log.info(query)
// We need to remove S3 credentials from the unload path URI because they will conflict with
// the credentials passed via `credsString`.
val fixedUrl = Utils.fixS3Url(Utils.removeCredentialsFromURI(new URI(tempDir)).toString)

s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST"
s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString'" +
s" ESCAPE MANIFEST NULL AS '${params.nullString}'"
}

private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
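With the nullString wired through, the UNLOAD statement executed on Redshift now carries an explicit NULL AS clause, so SQL NULLs arrive in S3 as a sentinel token rather than as empty fields. A sketch of the statement text this would produce, using a made-up query, scratch location and credentials string, plus the '@NULL@' sentinel from the test fixtures:

object UnloadStatementSketch {
  // Hypothetical stand-in for the private buildUnloadStmt; all inputs below are illustrative only.
  def buildUnloadStmt(query: String, fixedUrl: String, credsString: String, nullString: String): String =
    s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString'" +
      s" ESCAPE MANIFEST NULL AS '$nullString'"

  def main(args: Array[String]): Unit = {
    println(buildUnloadStmt(
      "SELECT \"x\", \"y\" FROM example_table",
      "s3://example-bucket/spark-redshift-scratch/query-1/",
      "aws_access_key_id=<redacted>;aws_secret_access_key=<redacted>",
      "@NULL@"))
  }
}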
@@ -31,7 +31,8 @@ import org.apache.spark.sql.types._
class ConversionsSuite extends FunSuite {

private def createRowConverter(schema: StructType) = {
Conversions.createRowConverter(schema).andThen(RowEncoder(schema).resolveAndBind().fromRow)
Conversions.createRowConverter(schema, Parameters.DEFAULT_PARAMETERS("csvnullstring"))
.andThen(RowEncoder(schema).resolveAndBind().fromRow)
}

test("Data should be correctly converted") {
@@ -150,7 +150,7 @@ class RedshiftSourceSuite
|1|f|2015-07-02|0|0.0|42|1239012341823719|-13|asdf|2015-07-02 00:00:00.0
|0||2015-07-03|0.0|-1.0|4141214|1239012341823719||f|2015-07-03 00:00:00
|0|f||-1234152.12312498|100000.0||1239012341823719|24|___\|_123|
||||||||||
|||||||||@NULL@|
""".stripMargin.trim
// scalastyle:on
val expectedQuery = (
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "3.0.0-SNAPSHOT"
version in ThisBuild := "3.0.0"
