Use maxlength metadata to configure VARCHAR column lengths
This patch lets users specify a `maxlength` column metadata entry for string columns in order to control the width of `VARCHAR` columns in generated Redshift table schemas. This is necessary to support string columns wider than 256 characters. This configuration can also be used as an optimization to achieve space savings in Redshift. For more background on the motivation for this feature, see #29.

See also: #53 to improve error reporting when LOAD fails.

Author: Josh Rosen <[email protected]>

Closes #54 from JoshRosen/max-length.
JoshRosen committed Aug 27, 2015
1 parent 1cf3fb0 commit 08f9419
Showing 5 changed files with 90 additions and 4 deletions.
18 changes: 18 additions & 0 deletions README.md
@@ -303,6 +303,24 @@ table, the changes will be reverted and the backup table restored if post action
</tr>
</table>

## Additional configuration options

### Configuring the maximum size of string columns

When creating Redshift tables, `spark-redshift`'s default behavior is to create `TEXT` columns for string columns. Redshift stores `TEXT` columns as `VARCHAR(256)`, so these columns have a maximum size of 256 characters ([source](http://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html)).

To support larger columns, you can use the `maxlength` column metadata field to specify the maximum length of individual string columns. This can also be used as a space-saving performance optimization, declaring columns with a smaller maximum length than the default.

Here is an example of updating a column's metadata field in Scala:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

val metadata = new MetadataBuilder().putLong("maxlength", 10).build()
df.withColumn("colName", col("colName").as("colName", metadata))
```
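
A complete write using this metadata might then look like the following minimal sketch (not part of this commit; the DataFrame `df`, the column name `comment`, the table name, and the connection settings are placeholders, while the data source options are the same ones used elsewhere in this documentation):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

// Assumes an existing DataFrame `df` with a string column named "comment" (hypothetical).
// Declare that "comment" may hold up to 1024 characters.
val metadata = new MetadataBuilder().putLong("maxlength", 1024).build()
val widened = df.withColumn("comment", col("comment").as("comment", metadata))

// Write through the Redshift data source; the generated table will declare
// the column as VARCHAR(1024) instead of the default TEXT (VARCHAR(256)).
widened.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<host>:5439/<database>?user=<user>&password=<password>") // placeholder
  .option("dbtable", "my_table")            // hypothetical table name
  .option("tempdir", "s3n://<bucket>/tmp/") // placeholder S3 staging path
  .mode(SaveMode.ErrorIfExists)
  .save()
```

With `maxlength` set to 1024, the generated `CREATE TABLE` statement declares the column as `VARCHAR(1024)` rather than the default `TEXT`.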

Column metadata modification is unsupported in the Python, SQL, and R language APIs.

## AWS Credentials

Note that you can provide AWS credentials in the parameters above, with Hadoop `fs.*` configuration settings,
@@ -393,6 +393,49 @@ class RedshiftIntegrationSuite
    }
  }

  test("configuring maxlength on string columns") {
    val tableName = s"configuring_maxlength_on_string_column_$randomSuffix"
    try {
      val metadata = new MetadataBuilder().putLong("maxlength", 512).build()
      val schema = StructType(
        StructField("x", StringType, metadata = metadata) :: Nil)
      sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema).write
        .format("com.databricks.spark.redshift")
        .option("url", jdbcUrl)
        .option("dbtable", tableName)
        .option("tempdir", tempDir)
        .option("aws_access_key_id", AWS_ACCESS_KEY_ID)
        .option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
        .mode(SaveMode.ErrorIfExists)
        .save()
      assert(DefaultJDBCWrapper.tableExists(conn, tableName))
      val loadedDf = sqlContext.read
        .format("com.databricks.spark.redshift")
        .option("url", jdbcUrl)
        .option("dbtable", tableName)
        .option("tempdir", tempDir)
        .option("aws_access_key_id", AWS_ACCESS_KEY_ID)
        .option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
        .load()
      checkAnswer(loadedDf, Seq(Row("a" * 512)))
      // This append should fail due to the string being longer than the maxlength
      intercept[SQLException] {
        sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 513))), schema).write
          .format("com.databricks.spark.redshift")
          .option("url", jdbcUrl)
          .option("dbtable", tableName)
          .option("tempdir", tempDir)
          .option("aws_access_key_id", AWS_ACCESS_KEY_ID)
          .option("aws_secret_access_key", AWS_SECRET_ACCESS_KEY)
          .mode(SaveMode.Append)
          .save()
      }
    } finally {
      conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
      conn.commit()
    }
  }

  test("informative error message when saving a table with string that is longer than max length") {
    val tableName = s"error_message_when_string_too_long_$randomSuffix"
    try {
Expand Down
@@ -141,15 +141,20 @@ private[redshift] class JDBCWrapper extends Logging {
        case ShortType => "INTEGER"
        case ByteType => "SMALLINT" // Redshift does not support the BYTE type.
        case BooleanType => "BOOLEAN"
-       case StringType => "TEXT"
+       case StringType =>
+         if (field.metadata.contains("maxlength")) {
+           s"VARCHAR(${field.metadata.getLong("maxlength")})"
+         } else {
+           "TEXT"
+         }
        case BinaryType => "BLOB"
        case TimestampType => "TIMESTAMP"
        case DateType => "DATE"
        case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})"
        case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
      }
      val nullable = if (field.nullable) "" else "NOT NULL"
-     sb.append(s", $name $typ $nullable")
+     sb.append(s", $name $typ $nullable".trim)
    }}
    if (sb.length < 2) "" else sb.substring(2)
  }
@@ -32,12 +32,13 @@ import org.apache.spark.sql.types._
/**
 * Functions to write data to Redshift with intermediate Avro serialisation into S3.
 */
-class RedshiftWriter(jdbcWrapper: JDBCWrapper) extends Logging {
+private[redshift] class RedshiftWriter(jdbcWrapper: JDBCWrapper) extends Logging {

  /**
   * Generate CREATE TABLE statement for Redshift
   */
-  private def createTableSql(data: DataFrame, params: MergedParameters): String = {
+  // Visible for testing.
+  private[redshift] def createTableSql(data: DataFrame, params: MergedParameters): String = {
    val schemaSql = jdbcWrapper.schemaString(data.schema)
    val distStyleDef = params.distStyle match {
      case Some(style) => s"DISTSTYLE $style"
@@ -33,6 +33,8 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

import com.databricks.spark.redshift.Parameters.MergedParameters

private class TestContext extends SparkContext("local", "RedshiftSourceSuite") {

/**
@@ -441,6 +443,23 @@ class RedshiftSourceSuite
    checkAnswer(written, TestUtils.expectedDataWithConvertedTimesAndDates)
  }

  test("configuring maxlength on string columns") {
    val longStrMetadata = new MetadataBuilder().putLong("maxlength", 512).build()
    val shortStrMetadata = new MetadataBuilder().putLong("maxlength", 10).build()
    val schema = StructType(
      StructField("long_str", StringType, metadata = longStrMetadata) ::
      StructField("short_str", StringType, metadata = shortStrMetadata) ::
      StructField("default_str", StringType) ::
      Nil)
    val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
    val createTableCommand =
      DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim
    val expectedCreateTableCommand =
      "CREATE TABLE IF NOT EXISTS test_table (long_str VARCHAR(512), short_str VARCHAR(10), " +
        "default_str TEXT)"
    assert(createTableCommand === expectedCreateTableCommand)
  }

  test("Respect SaveMode.ErrorIfExists when table exists") {
    val errIfExistsWrapper = mockJdbcWrapper(defaultParams("url"), Seq.empty[Regex])

