From ef2cef3c01a211586af8a02c56b2e21e25e082f8 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Fri, 21 Jun 2024 14:21:34 -0700 Subject: [PATCH] Abstract metrics to reduce dependency to legacy (#2747) * Abstract metrics to reduce dependency to legacy Signed-off-by: Tomoyuki Morita * Add comment Signed-off-by: Tomoyuki Morita * Fix style Signed-off-by: Tomoyuki Morita --------- Signed-off-by: Tomoyuki Morita --- .../EMRServerlessClientFactoryImpl.java | 4 +- .../spark/client/EmrServerlessClientImpl.java | 22 +++++------ .../spark/dispatcher/BatchQueryHandler.java | 7 ++-- .../dispatcher/InteractiveQueryHandler.java | 7 ++-- .../spark/dispatcher/QueryHandlerFactory.java | 18 +++++++-- .../spark/dispatcher/RefreshQueryHandler.java | 6 ++- .../dispatcher/StreamingQueryHandler.java | 11 +++--- .../sql/spark/metrics/EmrMetrics.java | 15 +++++++ .../sql/spark/metrics/MetricsService.java | 11 ++++++ .../EMRServerlessClientFactoryImplTest.java | 10 +++-- .../client/EmrServerlessClientImplTest.java | 39 ++++++++++++------- .../dispatcher/SparkQueryDispatcherTest.java | 5 ++- .../metrics/OpenSearchMetricsService.java | 32 +++++++++++++++ .../config/AsyncExecutorServiceModule.java | 18 +++++++-- .../AsyncQueryExecutorServiceSpec.java | 4 +- 15 files changed, 156 insertions(+), 53 deletions(-) create mode 100644 async-query-core/src/main/java/org/opensearch/sql/spark/metrics/EmrMetrics.java create mode 100644 async-query-core/src/main/java/org/opensearch/sql/spark/metrics/MetricsService.java create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/metrics/OpenSearchMetricsService.java diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImpl.java b/async-query-core/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImpl.java index 9af9878577..33c0e9fbfa 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImpl.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImpl.java @@ -16,12 +16,14 @@ import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; +import org.opensearch.sql.spark.metrics.MetricsService; /** Implementation of {@link EMRServerlessClientFactory}. */ @RequiredArgsConstructor public class EMRServerlessClientFactoryImpl implements EMRServerlessClientFactory { private final SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier; + private final MetricsService metricsService; private EMRServerlessClient emrServerlessClient; private String region; @@ -68,7 +70,7 @@ private EMRServerlessClient createEMRServerlessClient(String awsRegion) { .withRegion(awsRegion) .withCredentials(new DefaultAWSCredentialsProviderChain()) .build(); - return new EmrServerlessClientImpl(awsemrServerless); + return new EmrServerlessClientImpl(awsemrServerless, metricsService); }); } } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/async-query-core/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index 0ceb269d1d..c785067398 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -7,6 +7,9 @@ import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR; +import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT; +import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_GET_JOB_RESULT_FAILURE_COUNT; +import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_START_JOB_REQUEST_FAILURE_COUNT; import com.amazonaws.services.emrserverless.AWSEMRServerless; import com.amazonaws.services.emrserverless.model.CancelJobRunRequest; @@ -20,25 +23,23 @@ import com.amazonaws.services.emrserverless.model.ValidationException; import java.security.AccessController; import java.security.PrivilegedAction; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; +import org.opensearch.sql.spark.metrics.MetricsService; +@RequiredArgsConstructor public class EmrServerlessClientImpl implements EMRServerlessClient { private final AWSEMRServerless emrServerless; + private final MetricsService metricsService; private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class); private static final int MAX_JOB_NAME_LENGTH = 255; public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error."; - public EmrServerlessClientImpl(AWSEMRServerless emrServerless) { - this.emrServerless = emrServerless; - } - @Override public String startJobRun(StartJobRequest startJobRequest) { String resultIndex = @@ -68,8 +69,7 @@ public String startJobRun(StartJobRequest startJobRequest) { return emrServerless.startJobRun(request); } catch (Throwable t) { logger.error("Error while making start job request to emr:", t); - MetricUtils.incrementNumericalMetric( - MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT); + metricsService.incrementNumericalMetric(EMR_START_JOB_REQUEST_FAILURE_COUNT); if (t instanceof ValidationException) { throw new IllegalArgumentException( "The input fails to satisfy the constraints specified by AWS EMR" @@ -94,8 +94,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { return emrServerless.getJobRun(request); } catch (Throwable t) { logger.error("Error while making get job run request to emr:", t); - MetricUtils.incrementNumericalMetric( - MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT); + metricsService.incrementNumericalMetric(EMR_GET_JOB_RESULT_FAILURE_COUNT); throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); } }); @@ -119,8 +118,7 @@ public CancelJobRunResult cancelJobRun( throw t; } else { logger.error("Error while making cancel job request to emr:", t); - MetricUtils.incrementNumericalMetric( - MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT); + metricsService.incrementNumericalMetric(EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT); throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); } } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index 09d2dbd6c6..8014cf935f 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -8,14 +8,13 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY; +import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_BATCH_QUERY_JOBS_CREATION_COUNT; import com.amazonaws.services.emrserverless.model.GetJobRunResult; import java.util.Map; import lombok.RequiredArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -25,6 +24,7 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.leasemanager.LeaseManager; +import org.opensearch.sql.spark.metrics.MetricsService; import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** @@ -36,6 +36,7 @@ public class BatchQueryHandler extends AsyncQueryHandler { protected final EMRServerlessClient emrServerlessClient; protected final JobExecutionResponseReader jobExecutionResponseReader; protected final LeaseManager leaseManager; + protected final MetricsService metricsService; @Override protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { @@ -90,7 +91,7 @@ public DispatchQueryResponse submit( false, dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); - MetricUtils.incrementNumericalMetric(MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT); + metricsService.incrementNumericalMetric(EMR_BATCH_QUERY_JOBS_CREATION_COUNT); return DispatchQueryResponse.builder() .queryId(context.getQueryId()) .jobId(jobId) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index e47f439d9d..266d5db978 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -15,8 +15,6 @@ import lombok.RequiredArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; @@ -32,6 +30,8 @@ import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; +import org.opensearch.sql.spark.metrics.EmrMetrics; +import org.opensearch.sql.spark.metrics.MetricsService; import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** @@ -45,6 +45,7 @@ public class InteractiveQueryHandler extends AsyncQueryHandler { private final SessionManager sessionManager; private final JobExecutionResponseReader jobExecutionResponseReader; private final LeaseManager leaseManager; + private final MetricsService metricsService; @Override protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { @@ -121,7 +122,7 @@ public DispatchQueryResponse submit( dataSourceMetadata.getResultIndex(), dataSourceMetadata.getName()), context.getAsyncQueryRequestContext()); - MetricUtils.incrementNumericalMetric(MetricName.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT); + metricsService.incrementNumericalMetric(EmrMetrics.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT); } session.submit( new QueryRequest( diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java index f994d9c728..9951edc5a9 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java @@ -12,6 +12,7 @@ import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.LeaseManager; +import org.opensearch.sql.spark.metrics.MetricsService; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @RequiredArgsConstructor @@ -24,6 +25,7 @@ public class QueryHandlerFactory { private final IndexDMLResultStorageService indexDMLResultStorageService; private final FlintIndexOpFactory flintIndexOpFactory; private final EMRServerlessClientFactory emrServerlessClientFactory; + private final MetricsService metricsService; public RefreshQueryHandler getRefreshQueryHandler() { return new RefreshQueryHandler( @@ -31,21 +33,29 @@ public RefreshQueryHandler getRefreshQueryHandler() { jobExecutionResponseReader, flintIndexMetadataService, leaseManager, - flintIndexOpFactory); + flintIndexOpFactory, + metricsService); } public StreamingQueryHandler getStreamingQueryHandler() { return new StreamingQueryHandler( - emrServerlessClientFactory.getClient(), jobExecutionResponseReader, leaseManager); + emrServerlessClientFactory.getClient(), + jobExecutionResponseReader, + leaseManager, + metricsService); } public BatchQueryHandler getBatchQueryHandler() { return new BatchQueryHandler( - emrServerlessClientFactory.getClient(), jobExecutionResponseReader, leaseManager); + emrServerlessClientFactory.getClient(), + jobExecutionResponseReader, + leaseManager, + metricsService); } public InteractiveQueryHandler getInteractiveQueryHandler() { - return new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager); + return new InteractiveQueryHandler( + sessionManager, jobExecutionResponseReader, leaseManager, metricsService); } public IndexDMLHandler getIndexDMLHandler() { diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index 78a2651317..634dfa49f6 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -19,6 +19,7 @@ import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; +import org.opensearch.sql.spark.metrics.MetricsService; import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** @@ -35,8 +36,9 @@ public RefreshQueryHandler( JobExecutionResponseReader jobExecutionResponseReader, FlintIndexMetadataService flintIndexMetadataService, LeaseManager leaseManager, - FlintIndexOpFactory flintIndexOpFactory) { - super(emrServerlessClient, jobExecutionResponseReader, leaseManager); + FlintIndexOpFactory flintIndexOpFactory, + MetricsService metricsService) { + super(emrServerlessClient, jobExecutionResponseReader, leaseManager, metricsService); this.flintIndexMetadataService = flintIndexMetadataService; this.flintIndexOpFactory = flintIndexOpFactory; } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 7b317d2218..7291637e5b 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -7,11 +7,10 @@ import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.INDEX_TAG_KEY; import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY; +import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT; import java.util.Map; import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -23,6 +22,7 @@ import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; +import org.opensearch.sql.spark.metrics.MetricsService; import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** @@ -34,8 +34,9 @@ public class StreamingQueryHandler extends BatchQueryHandler { public StreamingQueryHandler( EMRServerlessClient emrServerlessClient, JobExecutionResponseReader jobExecutionResponseReader, - LeaseManager leaseManager) { - super(emrServerlessClient, jobExecutionResponseReader, leaseManager); + LeaseManager leaseManager, + MetricsService metricsService) { + super(emrServerlessClient, jobExecutionResponseReader, leaseManager, metricsService); } @Override @@ -81,7 +82,7 @@ public DispatchQueryResponse submit( indexQueryDetails.getFlintIndexOptions().autoRefresh(), dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); - MetricUtils.incrementNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT); + metricsService.incrementNumericalMetric(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT); return DispatchQueryResponse.builder() .queryId(context.getQueryId()) .jobId(jobId) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/metrics/EmrMetrics.java b/async-query-core/src/main/java/org/opensearch/sql/spark/metrics/EmrMetrics.java new file mode 100644 index 0000000000..2ec587bcc7 --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/metrics/EmrMetrics.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.metrics; + +public enum EmrMetrics { + EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT, + EMR_GET_JOB_RESULT_FAILURE_COUNT, + EMR_START_JOB_REQUEST_FAILURE_COUNT, + EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT, + EMR_STREAMING_QUERY_JOBS_CREATION_COUNT, + EMR_BATCH_QUERY_JOBS_CREATION_COUNT; +} diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/metrics/MetricsService.java b/async-query-core/src/main/java/org/opensearch/sql/spark/metrics/MetricsService.java new file mode 100644 index 0000000000..ca9cb9db4e --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/metrics/MetricsService.java @@ -0,0 +1,11 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.metrics; + +/** Interface to abstract the emit of metrics */ +public interface MetricsService { + void incrementNumericalMetric(EmrMetrics emrMetrics); +} diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImplTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImplTest.java index 562fc84eca..a27363a153 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImplTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/client/EMRServerlessClientFactoryImplTest.java @@ -16,18 +16,20 @@ import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; import org.opensearch.sql.spark.constants.TestConstants; +import org.opensearch.sql.spark.metrics.MetricsService; @ExtendWith(MockitoExtension.class) public class EMRServerlessClientFactoryImplTest { @Mock private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier; + @Mock private MetricsService metricsService; @Test public void testGetClient() { when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any())) .thenReturn(createSparkExecutionEngineConfig()); EMRServerlessClientFactory emrServerlessClientFactory = - new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier); + new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService); EMRServerlessClient emrserverlessClient = emrServerlessClientFactory.getClient(); Assertions.assertNotNull(emrserverlessClient); } @@ -38,7 +40,7 @@ public void testGetClientWithChangeInSetting() { when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any())) .thenReturn(sparkExecutionEngineConfig); EMRServerlessClientFactory emrServerlessClientFactory = - new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier); + new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService); EMRServerlessClient emrserverlessClient = emrServerlessClientFactory.getClient(); Assertions.assertNotNull(emrserverlessClient); @@ -57,7 +59,7 @@ public void testGetClientWithChangeInSetting() { public void testGetClientWithException() { when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any())).thenReturn(null); EMRServerlessClientFactory emrServerlessClientFactory = - new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier); + new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService); IllegalArgumentException illegalArgumentException = Assertions.assertThrows( IllegalArgumentException.class, emrServerlessClientFactory::getClient); @@ -74,7 +76,7 @@ public void testGetClientWithExceptionWithNullRegion() { when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any())) .thenReturn(sparkExecutionEngineConfig); EMRServerlessClientFactory emrServerlessClientFactory = - new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier); + new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService); IllegalArgumentException illegalArgumentException = Assertions.assertThrows( IllegalArgumentException.class, emrServerlessClientFactory::getClient); diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index 9ea7e91c54..35b42ccaaf 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -44,12 +44,13 @@ import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; +import org.opensearch.sql.spark.metrics.MetricsService; @ExtendWith(MockitoExtension.class) public class EmrServerlessClientImplTest { @Mock private AWSEMRServerless emrServerless; - @Mock private OpenSearchSettings settings; + @Mock private MetricsService metricsService; @Captor private ArgumentCaptor startJobRunRequestArgumentCaptor; @@ -67,7 +68,8 @@ void testStartJobRun() { StartJobRunResult response = new StartJobRunResult(); when(emrServerless.startJobRun(any())).thenReturn(response); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); String parameters = SparkSubmitParameters.builder().query(QUERY).build().toString(); emrServerlessClient.startJobRun( @@ -102,7 +104,8 @@ void testStartJobRunWithErrorMetric() { doThrow(new AWSEMRServerlessException("Couldn't start job")) .when(emrServerless) .startJobRun(any()); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); RuntimeException runtimeException = Assertions.assertThrows( RuntimeException.class, @@ -125,7 +128,8 @@ void testStartJobRunResultIndex() { StartJobRunResult response = new StartJobRunResult(); when(emrServerless.startJobRun(any())).thenReturn(response); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); emrServerlessClient.startJobRun( new StartJobRequest( EMRS_JOB_NAME, @@ -145,14 +149,16 @@ void testGetJobRunState() { GetJobRunResult response = new GetJobRunResult(); response.setJobRun(jobRun); when(emrServerless.getJobRun(any())).thenReturn(response); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123"); } @Test void testGetJobRunStateWithErrorMetric() { doThrow(new ValidationException("Not a good job")).when(emrServerless).getJobRun(any()); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); RuntimeException runtimeException = Assertions.assertThrows( RuntimeException.class, @@ -164,7 +170,8 @@ void testGetJobRunStateWithErrorMetric() { void testCancelJobRun() { when(emrServerless.cancelJobRun(any())) .thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID)); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); CancelJobRunResult cancelJobRunResult = emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false); Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId()); @@ -173,7 +180,8 @@ void testCancelJobRun() { @Test void testCancelJobRunWithErrorMetric() { doThrow(new RuntimeException()).when(emrServerless).cancelJobRun(any()); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); Assertions.assertThrows( RuntimeException.class, () -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, "123", false)); @@ -182,7 +190,8 @@ void testCancelJobRunWithErrorMetric() { @Test void testCancelJobRunWithValidationException() { doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any()); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); RuntimeException runtimeException = Assertions.assertThrows( RuntimeException.class, @@ -193,7 +202,8 @@ void testCancelJobRunWithValidationException() { @Test void testCancelJobRunWithNativeEMRExceptionWithValidationException() { doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any()); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); ValidationException validationException = Assertions.assertThrows( ValidationException.class, @@ -205,7 +215,8 @@ void testCancelJobRunWithNativeEMRExceptionWithValidationException() { void testCancelJobRunWithNativeEMRException() { when(emrServerless.cancelJobRun(any())) .thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID)); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); CancelJobRunResult cancelJobRunResult = emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, true); Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId()); @@ -216,7 +227,8 @@ void testStartJobRunWithLongJobName() { StartJobRunResult response = new StartJobRunResult(); when(emrServerless.startJobRun(any())).thenReturn(response); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); emrServerlessClient.startJobRun( new StartJobRequest( RandomStringUtils.random(300), @@ -235,7 +247,8 @@ void testStartJobRunWithLongJobName() { @Test void testStartJobRunThrowsValidationException() { when(emrServerless.startJobRun(any())).thenThrow(new ValidationException("Unmatched quote")); - EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + EmrServerlessClientImpl emrServerlessClient = + new EmrServerlessClientImpl(emrServerless, metricsService); IllegalArgumentException exception = Assertions.assertThrows( diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 199582dde7..d57284b9ca 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -75,6 +75,7 @@ import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.LeaseManager; +import org.opensearch.sql.spark.metrics.MetricsService; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; @@ -94,6 +95,7 @@ public class SparkQueryDispatcherTest { @Mock private SparkSubmitParameterModifier sparkSubmitParameterModifier; @Mock private QueryIdProvider queryIdProvider; @Mock private AsyncQueryRequestContext asyncQueryRequestContext; + @Mock private MetricsService metricsService; @Mock(answer = RETURNS_DEEP_STUBS) private Session session; @@ -117,7 +119,8 @@ void setUp() { leaseManager, indexDMLResultStorageService, flintIndexOpFactory, - emrServerlessClientFactory); + emrServerlessClientFactory, + metricsService); sparkQueryDispatcher = new SparkQueryDispatcher( dataSourceService, sessionManager, queryHandlerFactory, queryIdProvider); diff --git a/async-query/src/main/java/org/opensearch/sql/spark/metrics/OpenSearchMetricsService.java b/async-query/src/main/java/org/opensearch/sql/spark/metrics/OpenSearchMetricsService.java new file mode 100644 index 0000000000..316ab536bc --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/metrics/OpenSearchMetricsService.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.metrics; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; + +public class OpenSearchMetricsService implements MetricsService { + private static final Map mapping = + ImmutableMap.of( + EmrMetrics.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT, + MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT, + EmrMetrics.EMR_GET_JOB_RESULT_FAILURE_COUNT, MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT, + EmrMetrics.EMR_START_JOB_REQUEST_FAILURE_COUNT, + MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT, + EmrMetrics.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT, + MetricName.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT, + EmrMetrics.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT, + MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT, + EmrMetrics.EMR_BATCH_QUERY_JOBS_CREATION_COUNT, + MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT); + + @Override + public void incrementNumericalMetric(EmrMetrics metricName) { + MetricUtils.incrementNumericalMetric(mapping.get(metricName)); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index c4eaceb937..7287dc0201 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -49,6 +49,8 @@ import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; +import org.opensearch.sql.spark.metrics.MetricsService; +import org.opensearch.sql.spark.metrics.OpenSearchMetricsService; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader; @@ -106,7 +108,8 @@ public QueryHandlerFactory queryhandlerFactory( DefaultLeaseManager defaultLeaseManager, IndexDMLResultStorageService indexDMLResultStorageService, FlintIndexOpFactory flintIndexOpFactory, - EMRServerlessClientFactory emrServerlessClientFactory) { + EMRServerlessClientFactory emrServerlessClientFactory, + MetricsService metricsService) { return new QueryHandlerFactory( openSearchJobExecutionResponseReader, flintIndexMetadataReader, @@ -114,7 +117,8 @@ public QueryHandlerFactory queryhandlerFactory( defaultLeaseManager, indexDMLResultStorageService, flintIndexOpFactory, - emrServerlessClientFactory); + emrServerlessClientFactory, + metricsService); } @Provides @@ -172,8 +176,14 @@ public DefaultLeaseManager defaultLeaseManager(Settings settings, StateStore sta @Provides public EMRServerlessClientFactory createEMRServerlessClientFactory( - SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier) { - return new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier); + SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier, + MetricsService metricsService) { + return new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService); + } + + @Provides + public MetricsService metricsService() { + return new OpenSearchMetricsService(); } @Provides diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 9a94accd7d..f69a3ff44e 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -85,6 +85,7 @@ import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; +import org.opensearch.sql.spark.metrics.OpenSearchMetricsService; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader; import org.opensearch.sql.storage.DataSourceFactory; @@ -262,7 +263,8 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( client, new FlintIndexMetadataServiceImpl(client), emrServerlessClientFactory), - emrServerlessClientFactory); + emrServerlessClientFactory, + new OpenSearchMetricsService()); SparkQueryDispatcher sparkQueryDispatcher = new SparkQueryDispatcher( this.dataSourceService,