Record statement execution error (#940)
noCharger authored Nov 21, 2024
1 parent 8efe1ec commit a14013c
Showing 2 changed files with 37 additions and 21 deletions.
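The change itself is small: FlintJobExecutor now records the processed error JSON on its throwableHandler, and the integration test asserts on that value through job.throwableHandler.error. The handler type is not part of this diff, so the following is only a minimal stand-in sketch. It assumes error is a plain String, so that the test's .contains call performs a substring match; the real ThrowableHandler in the repository may carry more state, such as the original Throwable.

// Hypothetical stand-in for the ThrowableHandler referenced in the diff
// below; only the two members this commit exercises are sketched.
class ThrowableHandler {
  // Recorded error message. Modeled as a plain String so that
  // `throwableHandler.error.contains("...")` has substring semantics,
  // matching the test assertion added in this commit.
  var error: String = ""

  // Called by FlintJobExecutor after the error details are serialized to JSON.
  def setError(errorJson: String): Unit = {
    error = errorJson
  }
}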
@@ -81,36 +81,42 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     }
   }
 
+  def createJobOperator(query: String, jobRunId: String): JobOperator = {
+    val streamingRunningCount = new AtomicInteger(0)
+
+    /*
+     * Because we cannot test from FlintJob.main() for the reason below, we have to configure
+     * all Spark conf required by Flint code underlying manually.
+     */
+    spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
+    spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)
+
+    val job = JobOperator(
+      appId,
+      jobRunId,
+      spark,
+      query,
+      queryId,
+      dataSourceName,
+      resultIndex,
+      FlintJobType.STREAMING,
+      streamingRunningCount)
+    job.terminateJVM = false
+    job
+  }
+
   def startJob(query: String, jobRunId: String): Future[Unit] = {
     val prefix = "flint-job-test"
     val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1)
     implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
-    val streamingRunningCount = new AtomicInteger(0)
 
     val futureResult = Future {
-      /*
-       * Because we cannot test from FlintJob.main() for the reason below, we have to configure
-       * all Spark conf required by Flint code underlying manually.
-       */
-      spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
-      spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)
-
-      /**
-       * FlintJob.main() is not called because we need to manually set these variables within a
-       * JobOperator instance to accommodate specific runtime requirements.
-       */
-      val job =
-        JobOperator(
-          appId,
-          jobRunId,
-          spark,
-          query,
-          queryId,
-          dataSourceName,
-          resultIndex,
-          FlintJobType.STREAMING,
-          streamingRunningCount)
-      job.terminateJVM = false
+      val job = createJobOperator(query, jobRunId)
       job.start()
     }
     futureResult.onComplete {
@@ -291,6 +297,10 @@
   }
 
   test("create skipping index with non-existent table") {
+    val prefix = "flint-job-test"
+    val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1)
+    implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
+
     val query =
       s"""
         | CREATE SKIPPING INDEX ON testTable
@@ -303,7 +313,9 @@
         | """.stripMargin
     val queryStartTime = System.currentTimeMillis()
     val jobRunId = "00ff4o3b5091080r"
-    threadLocalFuture.set(startJob(query, jobRunId))
+
+    val job = createJobOperator(query, jobRunId)
+    threadLocalFuture.set(Future(job.start()))
 
     val validation: REPLResult => Boolean = result => {
       assert(
@@ -315,6 +327,9 @@

       assert(result.status == "FAILED", s"expected status is FAILED, but got ${result.status}")
       assert(!result.error.isEmpty, s"we expect error, but got ${result.error}")
+      assert(
+        job.throwableHandler.error.contains("Table spark_catalog.default.testTable is not found"),
+        "Expected error message to mention 'spark_catalog.default.testTable is not found'")
       commonAssert(result, jobRunId, query, queryStartTime)
       true
     }
@@ -450,7 +450,8 @@ trait FlintJobExecutor {
     statusCode.foreach(code => errorDetails.put("StatusCode", code.toString))
 
     val errorJson = mapper.writeValueAsString(errorDetails)
-
+    // Record the processed error message
+    throwableHandler.setError(errorJson)
     // CustomLogging will call log4j logger.error() underneath
     statusCode match {
       case Some(code) =>
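Read together with the test changes above, this is the flow the commit enables: the executor serializes the error details to JSON, records them on the handler, and the test inspects the recorded value after the job runs. A compact sketch of that round trip, reusing the hypothetical ThrowableHandler stand-in from above; the JSON payload is illustrative, not the executor's exact format.

object RecordedErrorFlowSketch {
  def main(args: Array[String]): Unit = {
    val throwableHandler = new ThrowableHandler

    // Executor side (FlintJobExecutor): build the error JSON and record it
    // before logging, as the diff above now does.
    val errorJson =
      """{"Message":"Table spark_catalog.default.testTable is not found"}"""
    throwableHandler.setError(errorJson)

    // Test side (FlintJobITSuite): assert on the recorded error in addition
    // to the result document written to the result index.
    assert(
      throwableHandler.error.contains("Table spark_catalog.default.testTable is not found"),
      "Expected error message to mention 'spark_catalog.default.testTable is not found'")
  }
}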
