From 95485068f7fc6fdb2fd28b35a08f283d40c2b36a Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Mon, 30 Oct 2023 17:15:47 -0700 Subject: [PATCH] Added session, statement, emrjob metrics to sql stats api Signed-off-by: Vamsi Manohar --- datasources/build.gradle | 1 + .../rest/RestDataSourceQueryAction.java | 5 +- .../TransportCreateDataSourceAction.java | 3 ++ .../TransportDeleteDataSourceAction.java | 3 ++ .../TransportGetDataSourceAction.java | 3 ++ .../TransportPatchDataSourceAction.java | 3 ++ .../TransportUpdateDataSourceAction.java | 3 ++ .../TransportCreateDataSourceActionTest.java | 11 ++++ .../TransportDeleteDataSourceActionTest.java | 15 ++++++ .../TransportGetDataSourceActionTest.java | 15 ++++++ .../TransportPatchDataSourceActionTest.java | 12 +++++ .../TransportUpdateDataSourceActionTest.java | 14 +++++ .../sql/legacy/metrics/MetricFactory.java | 13 +++++ .../sql/legacy/metrics/MetricName.java | 31 ++++++++++- .../sql/legacy/utils/MetricUtils.java | 22 ++++++++ .../org/opensearch/sql/plugin/SQLPlugin.java | 17 ++++++ spark/build.gradle | 1 + .../spark/client/EmrServerlessClientImpl.java | 35 ++++++++++-- .../spark/dispatcher/BatchQueryHandler.java | 3 ++ .../dispatcher/InteractiveQueryHandler.java | 3 ++ .../dispatcher/StreamingQueryHandler.java | 3 ++ .../execution/statestore/StateStore.java | 14 +++++ .../AsyncQueryExecutorServiceSpec.java | 16 ++++++ .../client/EmrServerlessClientImplTest.java | 54 +++++++++++++++++++ 24 files changed, 294 insertions(+), 6 deletions(-) create mode 100644 legacy/src/main/java/org/opensearch/sql/legacy/utils/MetricUtils.java diff --git a/datasources/build.gradle b/datasources/build.gradle index be4e12b3bd..c1a0b94b5c 100644 --- a/datasources/build.gradle +++ b/datasources/build.gradle @@ -17,6 +17,7 @@ dependencies { implementation project(':core') implementation project(':protocol') implementation project(':opensearch') + implementation project(':legacy') implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" implementation group: 'org.opensearch', name: 'opensearch-x-content', version: "${opensearch_version}" implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}" diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java index c207f55738..5693df3486 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java @@ -34,6 +34,8 @@ import org.opensearch.sql.datasources.transport.*; import org.opensearch.sql.datasources.utils.Scheduler; import org.opensearch.sql.datasources.utils.XContentParserUtils; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; public class RestDataSourceQueryAction extends BaseRestHandler { @@ -133,7 +135,6 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { - DataSourceMetadata dataSourceMetadata = XContentParserUtils.toDataSourceMetadata(restRequest.contentParser()); return restChannel -> @@ -282,8 +283,10 @@ private void handleException(Exception e, RestChannel restChannel) { } else { LOG.error("Error happened during request handling", e); if (isClientError(e)) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_CUS); reportError(restChannel, e, BAD_REQUEST); } else { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS); reportError(restChannel, e, SERVICE_UNAVAILABLE); } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java index 95e6493e05..1b3e678f5d 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java @@ -20,6 +20,8 @@ import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -60,6 +62,7 @@ protected void doExecute( Task task, CreateDataSourceActionRequest request, ActionListener actionListener) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_CREATION_REQ_COUNT); int dataSourceLimit = settings.getSettingValue(DATASOURCES_LIMIT); if (dataSourceService.getDataSourceMetadata(false).size() >= dataSourceLimit) { actionListener.onFailure( diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java index 5578d40651..bcc5ef650f 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java @@ -16,6 +16,8 @@ import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -53,6 +55,7 @@ protected void doExecute( Task task, DeleteDataSourceActionRequest request, ActionListener actionListener) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_DELETE_REQ_COUNT); try { dataSourceService.deleteDataSource(request.getDataSourceName()); actionListener.onResponse( diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java index 34ad59c80f..c8d77dd2e7 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java @@ -18,6 +18,8 @@ import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -56,6 +58,7 @@ protected void doExecute( Task task, GetDataSourceActionRequest request, ActionListener actionListener) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_GET_REQ_COUNT); try { String responseContent; if (request.getDataSourceName() == null) { diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java index 303e905cec..8c9334f3a6 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java @@ -19,6 +19,8 @@ import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -57,6 +59,7 @@ protected void doExecute( Task task, PatchDataSourceActionRequest request, ActionListener actionListener) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PATCH_REQ_COUNT); try { dataSourceService.patchDataSource(request.getDataSourceData()); String responseContent = diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java index fefd0f3a01..32394ab64c 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java @@ -18,6 +18,8 @@ import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -56,6 +58,7 @@ protected void doExecute( Task task, UpdateDataSourceActionRequest request, ActionListener actionListener) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PUT_REQ_COUNT); try { dataSourceService.updateDataSource(request.getDataSourceMetadata()); String responseContent = diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java index 2b9973b31b..9088d3c4ad 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java @@ -1,7 +1,9 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -21,11 +23,14 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -56,6 +61,12 @@ public void setUp() { transportService, new ActionFilters(new HashSet<>()), dataSourceService, settings); when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1); when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(20); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceActionTest.java index ea581de20c..df066654c6 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceActionTest.java @@ -1,8 +1,11 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.HashSet; import org.junit.jupiter.api.Assertions; @@ -16,9 +19,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -31,6 +38,8 @@ public class TransportDeleteDataSourceActionTest { @Mock private Task task; @Mock private ActionListener actionListener; + @Mock private OpenSearchSettings settings; + @Captor private ArgumentCaptor deleteDataSourceActionResponseArgumentCaptor; @@ -42,6 +51,12 @@ public void setUp() { action = new TransportDeleteDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java index 4f04afd667..854829ccc9 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java @@ -1,5 +1,7 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -22,11 +24,16 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -45,11 +52,19 @@ public class TransportGetDataSourceActionTest { @Captor private ArgumentCaptor exceptionArgumentCaptor; + @Mock private OpenSearchSettings settings; + @BeforeEach public void setUp() { action = new TransportGetDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceActionTest.java index 5e1e7df112..a0f159bfd0 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceActionTest.java @@ -1,5 +1,6 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; import static org.mockito.Mockito.*; import static org.opensearch.sql.datasources.utils.XContentParserUtils.*; @@ -17,9 +18,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -36,12 +41,19 @@ public class TransportPatchDataSourceActionTest { private ArgumentCaptor patchDataSourceActionResponseArgumentCaptor; @Captor private ArgumentCaptor exceptionArgumentCaptor; + @Mock private OpenSearchSettings settings; @BeforeEach public void setUp() { action = new TransportPatchDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java index ffcd526f87..4d42cdb2fa 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java @@ -1,8 +1,11 @@ package org.opensearch.sql.datasources.transport; +import static java.util.Collections.emptyList; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.HashSet; import org.junit.jupiter.api.Assertions; @@ -16,11 +19,15 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -32,6 +39,7 @@ public class TransportUpdateDataSourceActionTest { @Mock private DataSourceServiceImpl dataSourceService; @Mock private Task task; @Mock private ActionListener actionListener; + @Mock private OpenSearchSettings settings; @Captor private ArgumentCaptor @@ -44,6 +52,12 @@ public void setUp() { action = new TransportUpdateDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); + // Required for metrics initialization + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); } @Test diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricFactory.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricFactory.java index e4fbd173c9..fc243e1b50 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricFactory.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricFactory.java @@ -27,6 +27,19 @@ public static Metric createMetric(MetricName name) { case PPL_REQ_COUNT_TOTAL: case PPL_FAILED_REQ_COUNT_CUS: case PPL_FAILED_REQ_COUNT_SYS: + case DATASOURCE_CREATION_REQ_COUNT: + case DATASOURCE_GET_REQ_COUNT: + case DATASOURCE_PUT_REQ_COUNT: + case DATASOURCE_PATCH_REQ_COUNT: + case DATASOURCE_DELETE_REQ_COUNT: + case DATASOURCE_FAILED_REQ_COUNT_SYS: + case DATASOURCE_FAILED_REQ_COUNT_CUS: + case EMR_GET_JOB_RESULT_FAILURE_COUNT: + case EMR_START_JOB_REQUEST_FAILURE_COUNT: + case EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT: + case EMR_BATCH_QUERY_JOBS_CREATION_COUNT: + case EMR_STREAMING_QUERY_JOBS_CREATION_COUNT: + case EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT: return new NumericMetric<>(name.getName(), new RollingCounter()); default: return new NumericMetric<>(name.getName(), new BasicCounter()); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java index 1c895f5d69..2f1b7282d5 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java @@ -26,9 +26,21 @@ public enum MetricName { PPL_REQ_COUNT_TOTAL("ppl_request_count"), PPL_FAILED_REQ_COUNT_SYS("ppl_failed_request_count_syserr"), PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr"), - DATASOURCE_REQ_COUNT("datasource_request_count"), + DATASOURCE_CREATION_REQ_COUNT("datasource_create_request_count"), + DATASOURCE_GET_REQ_COUNT("datasource_get_request_count"), + DATASOURCE_PUT_REQ_COUNT("datasource_put_request_count"), + DATASOURCE_PATCH_REQ_COUNT("datasource_patch_request_count"), + DATASOURCE_DELETE_REQ_COUNT("datasource_delete_request_count"), DATASOURCE_FAILED_REQ_COUNT_SYS("datasource_failed_request_count_syserr"), - DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"); + DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"), + EMR_START_JOB_REQUEST_FAILURE_COUNT("emr_start_job_request_failure_count"), + EMR_GET_JOB_RESULT_FAILURE_COUNT("emr_get_job_request_failure_count"), + EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"), + EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"), + EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"), + EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"), + ACTIVE_ASYNC_QUERY_SESSION_COUNT("active_async_query_session_count"), + ACTIVE_ASYNC_QUERY_STATEMENT_COUNT("active_async_query_statement_count"); private String name; @@ -50,6 +62,21 @@ public static List getNames() { .add(PPL_REQ_COUNT_TOTAL) .add(PPL_FAILED_REQ_COUNT_SYS) .add(PPL_FAILED_REQ_COUNT_CUS) + .add(DATASOURCE_CREATION_REQ_COUNT) + .add(DATASOURCE_DELETE_REQ_COUNT) + .add(DATASOURCE_FAILED_REQ_COUNT_CUS) + .add(DATASOURCE_GET_REQ_COUNT) + .add(DATASOURCE_PATCH_REQ_COUNT) + .add(DATASOURCE_FAILED_REQ_COUNT_SYS) + .add(DATASOURCE_PUT_REQ_COUNT) + .add(EMR_GET_JOB_RESULT_FAILURE_COUNT) + .add(EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT) + .add(EMR_START_JOB_REQUEST_FAILURE_COUNT) + .add(EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT) + .add(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT) + .add(EMR_BATCH_QUERY_JOBS_CREATION_COUNT) + .add(ACTIVE_ASYNC_QUERY_SESSION_COUNT) + .add(ACTIVE_ASYNC_QUERY_SESSION_COUNT) .build(); public boolean isNumerical() { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/utils/MetricUtils.java b/legacy/src/main/java/org/opensearch/sql/legacy/utils/MetricUtils.java new file mode 100644 index 0000000000..b7a56bde4f --- /dev/null +++ b/legacy/src/main/java/org/opensearch/sql/legacy/utils/MetricUtils.java @@ -0,0 +1,22 @@ +package org.opensearch.sql.legacy.utils; + +import lombok.experimental.UtilityClass; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; + +/** Utility to add metrics. */ +@UtilityClass +public class MetricUtils { + + private static final Logger LOG = LogManager.getLogger(); + + public static void incrementNumericalMetric(MetricName metricName) { + try { + Metrics.getInstance().getNumericalMetric(metricName).increment(); + } catch (Throwable throwable) { + LOG.error("Error while adding metric: {}", throwable.getMessage()); + } + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 63c07de032..fa65131a2f 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -7,6 +7,7 @@ import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG; import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; +import static org.opensearch.sql.spark.execution.statestore.StateStore.ALL_DATASOURCE; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.emrserverless.AWSEMRServerless; @@ -67,6 +68,8 @@ import org.opensearch.sql.datasources.transport.*; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.AsyncRestExecutor; +import org.opensearch.sql.legacy.metrics.GaugeMetric; +import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.plugin.RestSqlAction; import org.opensearch.sql.legacy.plugin.RestSqlStatsAction; @@ -321,6 +324,7 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService( SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier, SparkExecutionEngineConfig sparkExecutionEngineConfig) { StateStore stateStore = new StateStore(client, clusterService); + registerStateStoreMetrics(stateStore); AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = new OpensearchAsyncQueryJobMetadataStorageService(stateStore); EMRServerlessClient emrServerlessClient = @@ -343,6 +347,19 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService( sparkExecutionEngineConfigSupplier); } + private void registerStateStoreMetrics(StateStore stateStore) { + GaugeMetric activeSessionMetric = + new GaugeMetric<>( + MetricName.ACTIVE_ASYNC_QUERY_SESSION_COUNT.getName(), + StateStore.activeSessionsCount(stateStore, ALL_DATASOURCE)); + GaugeMetric activeStatementMetric = + new GaugeMetric<>( + MetricName.ACTIVE_ASYNC_QUERY_STATEMENT_COUNT.getName(), + StateStore.activeStatementsCount(stateStore, ALL_DATASOURCE)); + Metrics.getInstance().registerMetric(activeSessionMetric); + Metrics.getInstance().registerMetric(activeStatementMetric); + } + private EMRServerlessClient createEMRServerlessClient(String region) { return AccessController.doPrivileged( (PrivilegedAction) diff --git a/spark/build.gradle b/spark/build.gradle index d703f6b24d..bed355b9d2 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -45,6 +45,7 @@ dependencies { api project(':core') implementation project(':protocol') implementation project(':datasources') + implementation project(':legacy') implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" implementation group: 'org.json', name: 'json', version: '20231013' diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index 0da5ae7211..d7f558a020 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -22,6 +22,8 @@ import java.security.PrivilegedAction; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; public class EmrServerlessClientImpl implements EMRServerlessClient { @@ -52,9 +54,19 @@ public String startJobRun(StartJobRequest startJobRequest) { .withEntryPoint(SPARK_SQL_APPLICATION_JAR) .withEntryPointArguments(startJobRequest.getQuery(), resultIndex) .withSparkSubmitParameters(startJobRequest.getSparkSubmitParams()))); + StartJobRunResult startJobRunResult = AccessController.doPrivileged( - (PrivilegedAction) () -> emrServerless.startJobRun(request)); + (PrivilegedAction) + () -> { + try { + return emrServerless.startJobRun(request); + } catch (Throwable t) { + MetricUtils.incrementNumericalMetric( + MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT); + throw t; + } + }); logger.info("Job Run ID: " + startJobRunResult.getJobRunId()); return startJobRunResult.getJobRunId(); } @@ -65,7 +77,16 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobId); GetJobRunResult getJobRunResult = AccessController.doPrivileged( - (PrivilegedAction) () -> emrServerless.getJobRun(request)); + (PrivilegedAction) + () -> { + try { + return emrServerless.getJobRun(request); + } catch (Throwable t) { + MetricUtils.incrementNumericalMetric( + MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT); + throw t; + } + }); logger.info("Job Run state: " + getJobRunResult.getJobRun().getState()); return getJobRunResult; } @@ -78,7 +99,15 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { CancelJobRunResult cancelJobRunResult = AccessController.doPrivileged( (PrivilegedAction) - () -> emrServerless.cancelJobRun(cancelJobRunRequest)); + () -> { + try { + return emrServerless.cancelJobRun(cancelJobRunRequest); + } catch (Throwable t) { + MetricUtils.incrementNumericalMetric( + MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT); + throw t; + } + }); logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId())); return cancelJobRunResult; } catch (ValidationException e) { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index 9e59fb707c..de25f1188c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -14,6 +14,8 @@ import lombok.RequiredArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -85,6 +87,7 @@ public DispatchQueryResponse submit( false, dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); + MetricUtils.incrementNumericalMetric(MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT); return new DispatchQueryResponse( context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index d6ca83e52a..5aa82432bb 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -15,6 +15,8 @@ import lombok.RequiredArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; @@ -100,6 +102,7 @@ public DispatchQueryResponse submit( tags, dataSourceMetadata.getResultIndex(), dataSourceMetadata.getName())); + MetricUtils.incrementNumericalMetric(MetricName.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT); } session.submit( new QueryRequest( diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index ac5c878c28..6a4045b85a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -10,6 +10,8 @@ import java.util.Map; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -63,6 +65,7 @@ public DispatchQueryResponse submit( indexQueryDetails.isAutoRefresh(), dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); + MetricUtils.incrementNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT); return new DispatchQueryResponse( AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()), jobId, diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index f36cbba32c..e99087b24d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -343,4 +343,18 @@ public static Supplier activeRefreshJobCount(StateStore stateStore, String SessionModel.TYPE, FlintIndexStateModel.FLINT_INDEX_DOC_TYPE)) .must(QueryBuilders.termQuery(STATE, FlintIndexState.REFRESHING.getState()))); } + + public static Supplier activeStatementsCount(StateStore stateStore, String datasourceName) { + return () -> + stateStore.count( + DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), + QueryBuilders.boolQuery() + .must( + QueryBuilders.termQuery(StatementModel.TYPE, StatementModel.STATEMENT_DOC_TYPE)) + .should( + QueryBuilders.termsQuery( + StatementModel.STATEMENT_STATE, + StatementState.RUNNING.getState(), + StatementState.WAITING.getState()))); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 35ec778c8e..2222cdf435 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.asyncquery; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.DATASOURCE_URI_HOSTS_DENY_LIST; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; @@ -22,6 +23,7 @@ import java.net.URL; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Optional; import lombok.Getter; @@ -49,6 +51,8 @@ import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; import org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; @@ -92,7 +96,19 @@ public void setup() { clusterService = clusterService(); clusterSettings = clusterService.getClusterSettings(); pluginSettings = new OpenSearchSettings(clusterSettings); + LocalClusterState.state().setClusterService(clusterService); + LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); + Metrics.getInstance().registerDefaultMetrics(); client = (NodeClient) cluster().client(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList()) + .build()) + .get(); dataSourceService = createDataSourceService(); DataSourceMetadata dm = new DataSourceMetadata( diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index f874b351a9..319e887171 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -4,7 +4,9 @@ package org.opensearch.sql.spark.client; +import static java.util.Collections.emptyList; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID; @@ -22,15 +24,31 @@ import com.amazonaws.services.emrserverless.model.ValidationException; import java.util.HashMap; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; @ExtendWith(MockitoExtension.class) public class EmrServerlessClientImplTest { @Mock private AWSEMRServerless emrServerless; + @Mock private OpenSearchSettings settings; + + @BeforeEach + public void setUp() { + doReturn(emptyList()).when(settings).getSettings(); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L); + when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L); + LocalClusterState.state().setPluginSettings(settings); + Metrics.getInstance().registerDefaultMetrics(); + } + @Test void testStartJobRun() { StartJobRunResult response = new StartJobRunResult(); @@ -49,6 +67,25 @@ void testStartJobRun() { null)); } + @Test + void testStartJobRunWithErrorMetric() { + doThrow(new RuntimeException()).when(emrServerless).startJobRun(any()); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + Assertions.assertThrows( + RuntimeException.class, + () -> + emrServerlessClient.startJobRun( + new StartJobRequest( + QUERY, + EMRS_JOB_NAME, + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + SPARK_SUBMIT_PARAMETERS, + new HashMap<>(), + false, + null))); + } + @Test void testStartJobRunResultIndex() { StartJobRunResult response = new StartJobRunResult(); @@ -78,6 +115,15 @@ void testGetJobRunState() { emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123"); } + @Test + void testGetJobRunStateWithErrorMetric() { + doThrow(new RuntimeException()).when(emrServerless).getJobRun(any()); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + Assertions.assertThrows( + RuntimeException.class, + () -> emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123")); + } + @Test void testCancelJobRun() { when(emrServerless.cancelJobRun(any())) @@ -88,6 +134,14 @@ void testCancelJobRun() { Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId()); } + @Test + void testCancelJobRunWithErrorMetric() { + doThrow(new RuntimeException()).when(emrServerless).cancelJobRun(any()); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + Assertions.assertThrows( + RuntimeException.class, () -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, "123")); + } + @Test void testCancelJobRunWithValidationException() { doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any());