Skip to content

Commit

Permalink
Added session, statement, emrjob metrics to sql stats api
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Oct 31, 2023
1 parent d3ce049 commit ca4a23a
Show file tree
Hide file tree
Showing 22 changed files with 202 additions and 6 deletions.
1 change: 1 addition & 0 deletions datasources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
implementation project(':core')
implementation project(':protocol')
implementation project(':opensearch')
implementation project(':legacy')
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.opensearch', name: 'opensearch-x-content', version: "${opensearch_version}"
implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.opensearch.sql.datasources.transport.*;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.datasources.utils.XContentParserUtils;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;

public class RestDataSourceQueryAction extends BaseRestHandler {

Expand Down Expand Up @@ -133,7 +135,6 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient

private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {

DataSourceMetadata dataSourceMetadata =
XContentParserUtils.toDataSourceMetadata(restRequest.contentParser());
return restChannel ->
Expand Down Expand Up @@ -282,8 +283,14 @@ private void handleException(Exception e, RestChannel restChannel) {
} else {
LOG.error("Error happened during request handling", e);
if (isClientError(e)) {
Metrics.getInstance()
.getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS)
.increment();
reportError(restChannel, e, BAD_REQUEST);
} else {
Metrics.getInstance()
.getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS)
.increment();
reportError(restChannel, e, SERVICE_UNAVAILABLE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected Object buildJsonObject(String response) {
}.format("Created DataSource with name " + dataSourceMetadata.getName());
actionListener.onResponse(new CreateDataSourceActionResponse(responseContent));
} catch (Exception e) {

actionListener.onFailure(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -53,6 +55,7 @@ protected void doExecute(
Task task,
DeleteDataSourceActionRequest request,
ActionListener<DeleteDataSourceActionResponse> actionListener) {
Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_DELETE_REQ_COUNT).increment();
try {
dataSourceService.deleteDataSource(request.getDataSourceName());
actionListener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -56,6 +58,7 @@ protected void doExecute(
Task task,
GetDataSourceActionRequest request,
ActionListener<GetDataSourceActionResponse> actionListener) {
Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_GET_REQ_COUNT).increment();
try {
String responseContent;
if (request.getDataSourceName() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -57,6 +59,7 @@ protected void doExecute(
Task task,
PatchDataSourceActionRequest request,
ActionListener<PatchDataSourceActionResponse> actionListener) {
Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_PATCH_REQ_COUNT).increment();
try {
dataSourceService.patchDataSource(request.getDataSourceData());
String responseContent =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -56,6 +58,7 @@ protected void doExecute(
Task task,
UpdateDataSourceActionRequest request,
ActionListener<UpdateDataSourceActionResponse> actionListener) {
Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_PUT_REQ_COUNT).increment();
try {
dataSourceService.updateDataSource(request.getDataSourceMetadata());
String responseContent =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.opensearch.sql.datasources.transport;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -21,11 +23,14 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -56,6 +61,12 @@ public void setUp() {
transportService, new ActionFilters(new HashSet<>()), dataSourceService, settings);
when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1);
when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(20);
// Required for metrics initialization
doReturn(emptyList()).when(settings).getSettings();
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L);
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L);
LocalClusterState.state().setPluginSettings(settings);
Metrics.getInstance().registerDefaultMetrics();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.opensearch.sql.datasources.transport;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.HashSet;
import org.junit.jupiter.api.Assertions;
Expand All @@ -16,9 +19,13 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -31,6 +38,8 @@ public class TransportDeleteDataSourceActionTest {
@Mock private Task task;
@Mock private ActionListener<DeleteDataSourceActionResponse> actionListener;

@Mock private OpenSearchSettings settings;

@Captor
private ArgumentCaptor<DeleteDataSourceActionResponse>
deleteDataSourceActionResponseArgumentCaptor;
Expand All @@ -42,6 +51,12 @@ public void setUp() {
action =
new TransportDeleteDataSourceAction(
transportService, new ActionFilters(new HashSet<>()), dataSourceService);
// Required for metrics initialization
doReturn(emptyList()).when(settings).getSettings();
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L);
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L);
LocalClusterState.state().setPluginSettings(settings);
Metrics.getInstance().registerDefaultMetrics();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.sql.datasources.transport;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -22,11 +24,15 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand All @@ -45,11 +51,19 @@ public class TransportGetDataSourceActionTest {

@Captor private ArgumentCaptor<Exception> exceptionArgumentCaptor;

@Mock private OpenSearchSettings settings;

@BeforeEach
public void setUp() {
action =
new TransportGetDataSourceAction(
transportService, new ActionFilters(new HashSet<>()), dataSourceService);
// Required for metrics initialization
doReturn(emptyList()).when(settings).getSettings();
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L);
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L);
LocalClusterState.state().setPluginSettings(settings);
Metrics.getInstance().registerDefaultMetrics();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.sql.datasources.transport;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.*;

Expand All @@ -17,9 +18,13 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -36,12 +41,19 @@ public class TransportPatchDataSourceActionTest {
private ArgumentCaptor<PatchDataSourceActionResponse> patchDataSourceActionResponseArgumentCaptor;

@Captor private ArgumentCaptor<Exception> exceptionArgumentCaptor;
@Mock private OpenSearchSettings settings;

@BeforeEach
public void setUp() {
action =
new TransportPatchDataSourceAction(
transportService, new ActionFilters(new HashSet<>()), dataSourceService);
// Required for metrics initialization
doReturn(emptyList()).when(settings).getSettings();
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L);
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L);
LocalClusterState.state().setPluginSettings(settings);
Metrics.getInstance().registerDefaultMetrics();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.opensearch.sql.datasources.transport;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.HashSet;
import org.junit.jupiter.api.Assertions;
Expand All @@ -16,11 +19,15 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -32,6 +39,7 @@ public class TransportUpdateDataSourceActionTest {
@Mock private DataSourceServiceImpl dataSourceService;
@Mock private Task task;
@Mock private ActionListener<UpdateDataSourceActionResponse> actionListener;
@Mock private OpenSearchSettings settings;

@Captor
private ArgumentCaptor<UpdateDataSourceActionResponse>
Expand All @@ -44,6 +52,12 @@ public void setUp() {
action =
new TransportUpdateDataSourceAction(
transportService, new ActionFilters(new HashSet<>()), dataSourceService);
// Required for metrics initialization
doReturn(emptyList()).when(settings).getSettings();
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L);
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L);
LocalClusterState.state().setPluginSettings(settings);
Metrics.getInstance().registerDefaultMetrics();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ public static Metric createMetric(MetricName name) {
case PPL_REQ_COUNT_TOTAL:
case PPL_FAILED_REQ_COUNT_CUS:
case PPL_FAILED_REQ_COUNT_SYS:
case DATASOURCE_CREATION_REQ_COUNT:
case DATASOURCE_GET_REQ_COUNT:
case DATASOURCE_PUT_REQ_COUNT:
case DATASOURCE_PATCH_REQ_COUNT:
case DATASOURCE_DELETE_REQ_COUNT:
case DATASOURCE_FAILED_REQ_COUNT_SYS:
case DATASOURCE_FAILED_REQ_COUNT_CUS:
case EMR_GET_JOB_RESULT_FAILURE_COUNT:
case EMR_START_JOB_REQUEST_FAILURE_COUNT:
case EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT:
case EMR_BATCH_QUERY_JOBS_CREATION_COUNT:
case EMR_STREAMING_QUERY_JOBS_CREATION_COUNT:
case EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT:
return new NumericMetric<>(name.getName(), new RollingCounter());
default:
return new NumericMetric<>(name.getName(), new BasicCounter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,21 @@ public enum MetricName {
PPL_REQ_COUNT_TOTAL("ppl_request_count"),
PPL_FAILED_REQ_COUNT_SYS("ppl_failed_request_count_syserr"),
PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr"),
DATASOURCE_REQ_COUNT("datasource_request_count"),
DATASOURCE_CREATION_REQ_COUNT("datasource_create_request_count"),
DATASOURCE_GET_REQ_COUNT("datasource_get_request_count"),
DATASOURCE_PUT_REQ_COUNT("datasource_put_request_count"),
DATASOURCE_PATCH_REQ_COUNT("datasource_patch_request_count"),
DATASOURCE_DELETE_REQ_COUNT("datasource_delete_request_count"),
DATASOURCE_FAILED_REQ_COUNT_SYS("datasource_failed_request_count_syserr"),
DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr");
DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"),
EMR_START_JOB_REQUEST_FAILURE_COUNT("emr_start_job_request_failure_count"),
EMR_GET_JOB_RESULT_FAILURE_COUNT("emr_start_job_request_failure_count"),
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_start_job_request_failure_count"),
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
ACTIVE_ASYNC_QUERY_SESSION_COUNT("active_async_query_session_count"),
ACTIVE_ASYNC_QUERY_STATEMENT_COUNT("active_async_query_statement_count");

private String name;

Expand Down
Loading

0 comments on commit ca4a23a

Please sign in to comment.