Skip to content

Commit

Permalink
Improve error messages for Redshift load errors
Browse files Browse the repository at this point in the history
This patch improves our error reporting for Redshift LOAD errors. When a load error occurs, we will now try to automatically fetch more detailed error information from Redshift's [STL_LOAD_ERRORS](http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html ) table.

As an example of the improved error messages:

Old:

```
java.sql.SQLException: [Amazon](500310) Invalid operation: Load into table 'error_message_when_string_too_long_3596907251636891354' failed.  Check 'stl_load_errors' system table for details.;
```

New:

```
java.sql.SQLException: Error #1204 while loading data into Redshift: "String length exceeds DDL length".
Table name: the_table_name
Column name: a
Column type: varchar(256)
Raw line: [...]
Raw field value: [...]
```

Author: Josh Rosen <[email protected]>

Closes #53 from JoshRosen/load-error-reporting.
  • Loading branch information
JoshRosen committed Aug 26, 2015
1 parent acec336 commit 9f19e1c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.databricks.spark.redshift

import java.net.URI
import java.sql.Connection
import java.sql.{SQLException, Connection}
import java.util.Properties

import scala.util.Random
Expand Down Expand Up @@ -373,6 +373,27 @@ class RedshiftIntegrationSuite
}
}

test("informative error message when saving a table with string that is longer than max length") {
val tableName = s"error_message_when_string_too_long_$randomSuffix"
try {
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))),
StructType(StructField("A", StringType) :: Nil))
val e = intercept[SQLException] {
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.mode(SaveMode.ErrorIfExists)
.save()
}
assert(e.getMessage.contains("while loading data into Redshift"))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("SaveMode.Overwrite with non-existent table") {
val tableName = s"overwrite_non_existent_table$randomSuffix"
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.sql.{Connection, Date, SQLException, Timestamp}
import java.util.Properties

import scala.util.Random
import scala.util.control.NonFatal

import com.databricks.spark.redshift.Parameters.MergedParameters

Expand Down Expand Up @@ -118,7 +119,49 @@ class RedshiftWriter(jdbcWrapper: JDBCWrapper) extends Logging {
// Load the temporary data into the new file
val copyStatement = copySql(data.sqlContext, params)
val copyData = conn.prepareStatement(copyStatement)
copyData.execute()
try {
copyData.execute()
} catch {
case e: SQLException =>
// Try to query Redshift's STL_LOAD_ERRORS table to figure out why the load failed.
// See http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html for details.
val errorLookupQuery =
"""
| SELECT *
| FROM stl_load_errors
| WHERE query = pg_last_query_id()
""".stripMargin
val detailedException: Option[SQLException] = try {
val results = conn.prepareStatement(errorLookupQuery).executeQuery()
if (results.next()) {
val errCode = results.getInt("err_code")
val errReason = results.getString("err_reason").trim
val columnLength: String =
Option(results.getString("col_length"))
.map(_.trim)
.filter(_.nonEmpty)
.map(n => s"($n)")
.getOrElse("")
val exceptionMessage =
s"""
|Error (code $errCode) while loading data into Redshift: "$errReason"
|Table name: ${params.table.get}
|Column name: ${results.getString("colname").trim}
|Column type: ${results.getString("type").trim}$columnLength
|Raw line: ${results.getString("raw_line")}
|Raw field value: ${results.getString("raw_field_value")}
""".stripMargin
Some(new SQLException(exceptionMessage, e))
} else {
None
}
} catch {
case NonFatal(e2) =>
logError("Error occurred while querying STL_LOAD_ERRORS", e2)
None
}
throw detailedException.getOrElse(e)
}

// Execute postActions
params.postActions.foreach { action =>
Expand Down

0 comments on commit 9f19e1c

Please sign in to comment.