From 9829e00e25b869ccb97f1351b48cf6ac7752b3c8 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Thu, 21 Nov 2024 15:38:29 -0800 Subject: [PATCH] Rethrow exception in writeData Signed-off-by: Louis Chu --- .../org/opensearch/flint/core/IRestHighLevelClient.java | 8 ++++---- .../scala/org/apache/spark/sql/FlintJobExecutor.scala | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java index 9facd89ef..721685c38 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java @@ -98,15 +98,15 @@ static void recordLatency(String metricNamePrefix, long latencyMilliseconds) { * Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx). * * @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure - * @param e the exception encountered during the operation, used to determine the type of failure + * @param t the exception encountered during the operation, used to determine the type of failure */ - static void recordOperationFailure(String metricNamePrefix, Exception e) { - OpenSearchException openSearchException = extractOpenSearchException(e); + static void recordOperationFailure(String metricNamePrefix, Throwable t) { + OpenSearchException openSearchException = extractOpenSearchException(t); int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500; if (openSearchException != null) { CustomLogging.logError(new OperationMessage("OpenSearch Operation failed.", statusCode), openSearchException); } else { - CustomLogging.logError("OpenSearch Operation failed with an exception.", e); + CustomLogging.logError("OpenSearch Operation failed with an exception.", t); } if (statusCode == 403) { String forbiddenErrorMetricName = metricNamePrefix + ".403.count"; diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index a9bb6f5bb..ad26cf21a 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -168,10 +168,12 @@ trait FlintJobExecutor { IRestHighLevelClient.recordOperationSuccess( MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX) } catch { - case e: Exception => + case t: Throwable => IRestHighLevelClient.recordOperationFailure( MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX, - e) + t) + // Re-throw the exception + throw t } }