diff --git a/datasources/build.gradle b/datasources/build.gradle index be4e12b3bd..c1a0b94b5c 100644 --- a/datasources/build.gradle +++ b/datasources/build.gradle @@ -17,6 +17,7 @@ dependencies { implementation project(':core') implementation project(':protocol') implementation project(':opensearch') + implementation project(':legacy') implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" implementation group: 'org.opensearch', name: 'opensearch-x-content', version: "${opensearch_version}" implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}" diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java index c207f55738..c3b0369bf3 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java @@ -34,6 +34,8 @@ import org.opensearch.sql.datasources.transport.*; import org.opensearch.sql.datasources.utils.Scheduler; import org.opensearch.sql.datasources.utils.XContentParserUtils; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; public class RestDataSourceQueryAction extends BaseRestHandler { @@ -133,7 +135,6 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { - DataSourceMetadata dataSourceMetadata = XContentParserUtils.toDataSourceMetadata(restRequest.contentParser()); return restChannel -> @@ -282,8 +283,14 @@ private void handleException(Exception e, RestChannel restChannel) { } else { LOG.error("Error happened during request handling", e); if (isClientError(e)) { + Metrics.getInstance() + .getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS) + .increment(); reportError(restChannel, e, BAD_REQUEST); } else { + Metrics.getInstance() + .getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS) + .increment(); reportError(restChannel, e, SERVICE_UNAVAILABLE); } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java index 95e6493e05..43557ac1f6 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java @@ -80,6 +80,7 @@ protected Object buildJsonObject(String response) { }.format("Created DataSource with name " + dataSourceMetadata.getName()); actionListener.onResponse(new CreateDataSourceActionResponse(responseContent)); } catch (Exception e) { + actionListener.onFailure(e); } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java index 5578d40651..7da1296286 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java @@ -16,6 +16,8 @@ import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -53,6 +55,7 @@ protected void doExecute( Task task, DeleteDataSourceActionRequest request, ActionListener actionListener) { + Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_DELETE_REQ_COUNT).increment(); try { dataSourceService.deleteDataSource(request.getDataSourceName()); actionListener.onResponse( diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java index 34ad59c80f..92b9495f1e 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java @@ -18,6 +18,8 @@ import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -56,6 +58,7 @@ protected void doExecute( Task task, GetDataSourceActionRequest request, ActionListener actionListener) { + Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_GET_REQ_COUNT).increment(); try { String responseContent; if (request.getDataSourceName() == null) { diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java index 303e905cec..69e04f0e57 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java @@ -19,6 +19,8 @@ import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -57,6 +59,7 @@ protected void doExecute( Task task, PatchDataSourceActionRequest request, ActionListener actionListener) { + Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_PATCH_REQ_COUNT).increment(); try { dataSourceService.patchDataSource(request.getDataSourceData()); String responseContent = diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java index fefd0f3a01..367c0ed124 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java @@ -18,6 +18,8 @@ import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -56,6 +58,7 @@ protected void doExecute( Task task, UpdateDataSourceActionRequest request, ActionListener actionListener) { + Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_PUT_REQ_COUNT).increment(); try { dataSourceService.updateDataSource(request.getDataSourceMetadata()); String responseContent = diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java index 2b9973b31b..9088d3c4ad 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java @@ -1,7 +1,9 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -21,11 +23,14 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -56,6 +61,12 @@ public void setUp() { transportService, new ActionFilters(new HashSet<>()), dataSourceService, settings); when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1); when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(20); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceActionTest.java index ea581de20c..df066654c6 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceActionTest.java @@ -1,8 +1,11 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.HashSet; import org.junit.jupiter.api.Assertions; @@ -16,9 +19,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -31,6 +38,8 @@ public class TransportDeleteDataSourceActionTest { @Mock private Task task; @Mock private ActionListener actionListener; + @Mock private OpenSearchSettings settings; + @Captor private ArgumentCaptor deleteDataSourceActionResponseArgumentCaptor; @@ -42,6 +51,12 @@ public void setUp() { action = new TransportDeleteDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java index 4f04afd667..286f308402 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java @@ -1,5 +1,7 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -22,11 +24,15 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -45,11 +51,19 @@ public class TransportGetDataSourceActionTest { @Captor private ArgumentCaptor exceptionArgumentCaptor; + @Mock private OpenSearchSettings settings; + @BeforeEach public void setUp() { action = new TransportGetDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceActionTest.java index 5e1e7df112..a0f159bfd0 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceActionTest.java @@ -1,5 +1,6 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; import static org.mockito.Mockito.*; import static org.opensearch.sql.datasources.utils.XContentParserUtils.*; @@ -17,9 +18,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -36,12 +41,19 @@ public class TransportPatchDataSourceActionTest { private ArgumentCaptor patchDataSourceActionResponseArgumentCaptor; @Captor private ArgumentCaptor exceptionArgumentCaptor; + @Mock private OpenSearchSettings settings; @BeforeEach public void setUp() { action = new TransportPatchDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java index ffcd526f87..4d42cdb2fa 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java @@ -1,8 +1,11 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.HashSet; import org.junit.jupiter.api.Assertions; @@ -16,11 +19,15 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -32,6 +39,7 @@ public class TransportUpdateDataSourceActionTest { @Mock private DataSourceServiceImpl dataSourceService; @Mock private Task task; @Mock private ActionListener actionListener; + @Mock private OpenSearchSettings settings; @Captor private ArgumentCaptor @@ -44,6 +52,12 @@ public void setUp() { action = new TransportUpdateDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricFactory.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricFactory.java index e4fbd173c9..fc243e1b50 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricFactory.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricFactory.java @@ -27,6 +27,19 @@ public static Metric createMetric(MetricName name) { case PPL_REQ_COUNT_TOTAL: case PPL_FAILED_REQ_COUNT_CUS: case PPL_FAILED_REQ_COUNT_SYS: + case DATASOURCE_CREATION_REQ_COUNT: + case DATASOURCE_GET_REQ_COUNT: + case DATASOURCE_PUT_REQ_COUNT: + case DATASOURCE_PATCH_REQ_COUNT: + case DATASOURCE_DELETE_REQ_COUNT: + case DATASOURCE_FAILED_REQ_COUNT_SYS: + case DATASOURCE_FAILED_REQ_COUNT_CUS: + case EMR_GET_JOB_RESULT_FAILURE_COUNT: + case EMR_START_JOB_REQUEST_FAILURE_COUNT: + case EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT: + case EMR_BATCH_QUERY_JOBS_CREATION_COUNT: + case EMR_STREAMING_QUERY_JOBS_CREATION_COUNT: + case EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT: return new NumericMetric<>(name.getName(), new RollingCounter()); default: return new NumericMetric<>(name.getName(), new BasicCounter()); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java index 1c895f5d69..7b7a6b4993 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java @@ -26,9 +26,21 @@ public enum MetricName { PPL_REQ_COUNT_TOTAL("ppl_request_count"), PPL_FAILED_REQ_COUNT_SYS("ppl_failed_request_count_syserr"), PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr"), - DATASOURCE_REQ_COUNT("datasource_request_count"), + DATASOURCE_CREATION_REQ_COUNT("datasource_create_request_count"), + DATASOURCE_GET_REQ_COUNT("datasource_get_request_count"), + DATASOURCE_PUT_REQ_COUNT("datasource_put_request_count"), + DATASOURCE_PATCH_REQ_COUNT("datasource_patch_request_count"), + DATASOURCE_DELETE_REQ_COUNT("datasource_delete_request_count"), DATASOURCE_FAILED_REQ_COUNT_SYS("datasource_failed_request_count_syserr"), - DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"); + DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"), + EMR_START_JOB_REQUEST_FAILURE_COUNT("emr_start_job_request_failure_count"), + EMR_GET_JOB_RESULT_FAILURE_COUNT("emr_start_job_request_failure_count"), + EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_start_job_request_failure_count"), + EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"), + EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"), + EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"), + ACTIVE_ASYNC_QUERY_SESSION_COUNT("active_async_query_session_count"), + ACTIVE_ASYNC_QUERY_STATEMENT_COUNT("active_async_query_statement_count"); private String name; diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 9d37fe28d0..104e7a058c 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -7,6 +7,7 @@ import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG; import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; +import static org.opensearch.sql.spark.execution.statestore.StateStore.ALL_DATASOURCE; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.emrserverless.AWSEMRServerless; @@ -67,6 +68,8 @@ import org.opensearch.sql.datasources.transport.*; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.AsyncRestExecutor; +import org.opensearch.sql.legacy.metrics.GaugeMetric; +import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.plugin.RestSqlAction; import org.opensearch.sql.legacy.plugin.RestSqlStatsAction; @@ -321,6 +324,7 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService( SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier, SparkExecutionEngineConfig sparkExecutionEngineConfig) { StateStore stateStore = new StateStore(client, clusterService); + registerStateStoreMetrics(stateStore); AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = new OpensearchAsyncQueryJobMetadataStorageService(stateStore); EMRServerlessClient emrServerlessClient = @@ -342,6 +346,19 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService( sparkExecutionEngineConfigSupplier); } + private void registerStateStoreMetrics(StateStore stateStore) { + GaugeMetric activeSessionMetric = + new GaugeMetric<>( + MetricName.ACTIVE_ASYNC_QUERY_SESSION_COUNT.getName(), + StateStore.activeSessionsCount(stateStore, ALL_DATASOURCE)); + GaugeMetric activeStatementMetric = + new GaugeMetric<>( + MetricName.ACTIVE_ASYNC_QUERY_STATEMENT_COUNT.getName(), + StateStore.activeStatementsCount(stateStore, ALL_DATASOURCE)); + Metrics.getInstance().registerMetric(activeSessionMetric); + Metrics.getInstance().registerMetric(activeStatementMetric); + } + private EMRServerlessClient createEMRServerlessClient(String region) { return AccessController.doPrivileged( (PrivilegedAction) diff --git a/spark/build.gradle b/spark/build.gradle index ed91b9820b..0b387f1cdb 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -45,6 +45,7 @@ dependencies { api project(':core') implementation project(':protocol') implementation project(':datasources') + implementation project(':legacy') implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" implementation group: 'org.json', name: 'json', version: '20231013' diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index 0da5ae7211..1354b99c95 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -22,6 +22,8 @@ import java.security.PrivilegedAction; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; public class EmrServerlessClientImpl implements EMRServerlessClient { @@ -52,9 +54,20 @@ public String startJobRun(StartJobRequest startJobRequest) { .withEntryPoint(SPARK_SQL_APPLICATION_JAR) .withEntryPointArguments(startJobRequest.getQuery(), resultIndex) .withSparkSubmitParameters(startJobRequest.getSparkSubmitParams()))); + StartJobRunResult startJobRunResult = AccessController.doPrivileged( - (PrivilegedAction) () -> emrServerless.startJobRun(request)); + (PrivilegedAction) + () -> { + try { + return emrServerless.startJobRun(request); + } catch (Throwable t) { + Metrics.getInstance() + .getNumericalMetric(MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT) + .increment(); + throw t; + } + }); logger.info("Job Run ID: " + startJobRunResult.getJobRunId()); return startJobRunResult.getJobRunId(); } @@ -65,7 +78,17 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobId); GetJobRunResult getJobRunResult = AccessController.doPrivileged( - (PrivilegedAction) () -> emrServerless.getJobRun(request)); + (PrivilegedAction) + () -> { + try { + return emrServerless.getJobRun(request); + } catch (Throwable t) { + Metrics.getInstance() + .getNumericalMetric(MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT) + .increment(); + throw t; + } + }); logger.info("Job Run state: " + getJobRunResult.getJobRun().getState()); return getJobRunResult; } @@ -78,7 +101,16 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { CancelJobRunResult cancelJobRunResult = AccessController.doPrivileged( (PrivilegedAction) - () -> emrServerless.cancelJobRun(cancelJobRunRequest)); + () -> { + try { + return emrServerless.cancelJobRun(cancelJobRunRequest); + } catch (Throwable t) { + Metrics.getInstance() + .getNumericalMetric(MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT) + .increment(); + throw t; + } + }); logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId())); return cancelJobRunResult; } catch (ValidationException e) { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index c6bac9b288..875d7735be 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -14,6 +14,8 @@ import lombok.RequiredArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -80,6 +82,9 @@ public DispatchQueryResponse submit( false, dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); + Metrics.getInstance() + .getNumericalMetric(MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT) + .increment(); return new DispatchQueryResponse( context.getQueryId(), jobId, false, dataSourceMetadata.getResultIndex(), null); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index d75f568275..64b0f570bd 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -15,6 +15,8 @@ import lombok.RequiredArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; @@ -100,6 +102,9 @@ public DispatchQueryResponse submit( tags, dataSourceMetadata.getResultIndex(), dataSourceMetadata.getName())); + Metrics.getInstance() + .getNumericalMetric(MetricName.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT) + .increment(); } session.submit( new QueryRequest( diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 81c3438532..dc523d947c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -10,6 +10,8 @@ import java.util.Map; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -57,6 +59,9 @@ public DispatchQueryResponse submit( indexQueryDetails.isAutoRefresh(), dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); + Metrics.getInstance() + .getNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT) + .increment(); return new DispatchQueryResponse( AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()), jobId, diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index 86d15a7036..b7b8f7fa04 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -303,4 +303,18 @@ public static Supplier activeSessionsCount(StateStore stateStore, String d QueryBuilders.termQuery( SessionModel.SESSION_STATE, SessionState.RUNNING.getSessionState()))); } + + public static Supplier activeStatementsCount(StateStore stateStore, String datasourceName) { + return () -> + stateStore.count( + DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), + QueryBuilders.boolQuery() + .must( + QueryBuilders.termQuery(StatementModel.TYPE, StatementModel.STATEMENT_DOC_TYPE)) + .should( + QueryBuilders.termsQuery( + StatementModel.STATEMENT_STATE, + StatementState.RUNNING.getState(), + StatementState.WAITING.getState()))); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 862da697d1..bd5d4d220f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -53,6 +53,8 @@ import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; import org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -114,6 +116,9 @@ public void setup() { .get(); clusterSettings = clusterService.getClusterSettings(); pluginSettings = new OpenSearchSettings(clusterSettings); + LocalClusterState.state().setClusterService(clusterService); + LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); + Metrics.getInstance().registerDefaultMetrics(); dataSourceService = createDataSourceService(); DataSourceMetadata dm = new DataSourceMetadata(