From 35215296e48b16760f86fbe472e218af1940b2f4 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 2 Aug 2023 16:42:57 +0000 Subject: [PATCH] Update actionGet to SuspendUntil for ClusterMetrics (#1067) Signed-off-by: Ashish Agrawal (cherry picked from commit 3baf51a60cf8643e57252f979055d07be77faf18) Signed-off-by: github-actions[bot] --- ...pportedClusterMetricsSettingsExtensions.kt | 34 ++++++++++--------- .../CatIndicesWrappersIT.kt | 4 +-- .../CatShardsWrappersIT.kt | 4 +-- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt index bcfb1e94f..3be5c1f78 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/SupportedClusterMetricsSettingsExtensions.kt @@ -26,6 +26,7 @@ import org.opensearch.action.admin.indices.recovery.RecoveryResponse import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.alerting.opensearchapi.convertToMap +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest import org.opensearch.client.Client @@ -42,49 +43,50 @@ import kotlin.collections.HashMap * @param client The [Client] used to call the respective transport action. * @throws IllegalArgumentException When the requested API is not supported by this feature. */ -fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { +suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { val request = resolveToActionRequest(clusterMetricsInput) return when (clusterMetricsInput.clusterMetricType) { ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> { request as CatIndicesRequestWrapper - val healthResponse: ClusterHealthResponse = - client.admin().cluster().health(request.clusterHealthRequest).get() + val healthResponse: ClusterHealthResponse = client.suspendUntil { admin().cluster().health(request.clusterHealthRequest, it) } val indexSettingsResponse: GetSettingsResponse = - client.admin().indices().getSettings(request.indexSettingsRequest).get() + client.suspendUntil { admin().indices().getSettings(request.indexSettingsRequest, it) } val indicesResponse: IndicesStatsResponse = - client.admin().indices().stats(request.indicesStatsRequest).get() + client.suspendUntil { admin().indices().stats(request.indicesStatsRequest, it) } val stateResponse: ClusterStateResponse = - client.admin().cluster().state(request.clusterStateRequest).get() + client.suspendUntil { admin().cluster().state(request.clusterStateRequest, it) } return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse) } ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> - client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get() + client.suspendUntil { + admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest, it) + } ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> - client.admin().indices().recoveries(request as RecoveryRequest).get() + client.suspendUntil { admin().indices().recoveries(request as RecoveryRequest, it) } ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> { request as CatShardsRequestWrapper val stateResponse: ClusterStateResponse = - client.admin().cluster().state(request.clusterStateRequest).get() + client.suspendUntil { admin().cluster().state(request.clusterStateRequest, it) } val indicesResponse: IndicesStatsResponse = - client.admin().indices().stats(request.indicesStatsRequest).get() + client.suspendUntil { admin().indices().stats(request.indicesStatsRequest, it) } return CatShardsResponseWrapper(stateResponse, indicesResponse) } ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> - client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get() + client.suspendUntil { admin().cluster().getSnapshots(request as GetSnapshotsRequest, it) } ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> - client.admin().cluster().listTasks(request as ListTasksRequest).get() + client.suspendUntil { admin().cluster().listTasks(request as ListTasksRequest, it) } ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> - client.admin().cluster().health(request as ClusterHealthRequest).get() + client.suspendUntil { admin().cluster().health(request as ClusterHealthRequest, it) } ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> { val stateResponse: ClusterStateResponse = - client.admin().cluster().state(request as ClusterStateRequest).get() + client.suspendUntil { admin().cluster().state(request as ClusterStateRequest, it) } val metadata: Metadata = stateResponse.state.metadata return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY) } ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS -> - client.admin().cluster().clusterStats(request as ClusterStatsRequest).get() + client.suspendUntil { admin().cluster().clusterStats(request as ClusterStatsRequest, it) } ClusterMetricsInput.ClusterMetricType.NODES_STATS -> - client.admin().cluster().nodesStats(request as NodesStatsRequest).get() + client.suspendUntil { admin().cluster().nodesStats(request as NodesStatsRequest, it) } else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}") } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt index c63ebb51f..9712b4213 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesWrappersIT.kt @@ -77,7 +77,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) } } - fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { + suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) @@ -125,7 +125,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() { } } - fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { + suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt index d945e9b2b..c8b5db561 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsWrappersIT.kt @@ -69,7 +69,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) } } - fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { + suspend fun `test CatShardsResponseWrapper returns with only indices in pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10) @@ -117,7 +117,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() { } } - fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { + suspend fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() { // GIVEN val testIndices = (1..5).map { "test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)