Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temp refactor cluster metrics monitor suspendUntil() to get(). #1015

Merged
merged 6 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ class CatIndicesRequestWrapper(val pathParams: String = "") : ActionRequest() {
if (pathParams.isNotBlank()) {
indicesList = pathParams.split(",").toTypedArray()

require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." }
require(validate() == null) {
"The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases."
}

clusterHealthRequest = clusterHealthRequest.indices(*indicesList)
clusterStateRequest = clusterStateRequest.indices(*indicesList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class CatShardsRequestWrapper(val pathParams: String = "") : ActionRequest() {
if (pathParams.isNotBlank()) {
indicesList = pathParams.split(",").toTypedArray()

require(validate() == null) { "The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases." }
require(validate() == null) {
"The path parameters do not form a valid, comma-separated list of data streams, indices, or index aliases."
}

clusterStateRequest = clusterStateRequest.indices(*indicesList)
indicesStatsRequest = indicesStatsRequest.indices(*indicesList)
Expand Down Expand Up @@ -173,7 +175,8 @@ class CatShardsResponseWrapper(
searchScrollTotal = getOrNull(commonStats, CommonStats::getSearch, { it.total.scrollCount })?.toString(),
segmentsCount = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getCount)?.toString(),
segmentsMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getZeroMemory)?.toString(),
segmentsIndexWriterMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(),
segmentsIndexWriterMemory =
getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)?.toString(),
segmentsVersionMapMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getVersionMapMemory)?.toString(),
fixedBitsetMemory = getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getBitsetMemory)?.toString(),
globalCheckpoint = getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getGlobalCheckpoint)?.toString(),
Expand Down Expand Up @@ -211,7 +214,8 @@ class CatShardsResponseWrapper(
shardInfo = shardInfo.copy(
unassignedReason = shard.unassignedInfo().reason.name,
unassignedAt = UnassignedInfo.DATE_TIME_FORMATTER.format(unassignedTime),
unassignedFor = TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep,
unassignedFor =
TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().unassignedTimeInMillis).stringRep,
unassignedDetails = shard.unassignedInfo().details
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
Expand All @@ -43,49 +42,49 @@ import kotlin.collections.HashMap
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> {
request as CatIndicesRequestWrapper
val healthResponse: ClusterHealthResponse =
client.suspendUntil { admin().cluster().health(request.clusterHealthRequest) }
client.admin().cluster().health(request.clusterHealthRequest).get()
val indexSettingsResponse: GetSettingsResponse =
client.suspendUntil { client.admin().indices().getSettings(request.indexSettingsRequest) }
client.admin().indices().getSettings(request.indexSettingsRequest).get()
val indicesResponse: IndicesStatsResponse =
client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) }
client.admin().indices().stats(request.indicesStatsRequest).get()
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) }
client.admin().cluster().state(request.clusterStateRequest).get()
return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS ->
client.suspendUntil { client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest) }
client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY ->
client.suspendUntil { client.admin().indices().recoveries(request as RecoveryRequest) }
client.admin().indices().recoveries(request as RecoveryRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> {
request as CatShardsRequestWrapper
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) }
client.admin().cluster().state(request.clusterStateRequest).get()
val indicesResponse: IndicesStatsResponse =
client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) }
client.admin().indices().stats(request.indicesStatsRequest).get()
return CatShardsResponseWrapper(stateResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS ->
client.suspendUntil { client.admin().cluster().getSnapshots(request as GetSnapshotsRequest) }
client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_TASKS ->
client.suspendUntil { client.admin().cluster().listTasks(request as ListTasksRequest) }
client.admin().cluster().listTasks(request as ListTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH ->
client.suspendUntil { client.admin().cluster().health(request as ClusterHealthRequest) }
client.admin().cluster().health(request as ClusterHealthRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> {
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request as ClusterStateRequest) }
client.admin().cluster().state(request as ClusterStateRequest).get()
val metadata: Metadata = stateResponse.state.metadata
return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
}
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS ->
client.suspendUntil { client.admin().cluster().clusterStats(request as ClusterStatsRequest) }
client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.NODES_STATS ->
client.suspendUntil { client.admin().cluster().nodesStats(request as NodesStatsRequest) }
client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) }
}

suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down Expand Up @@ -125,7 +125,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
}
}

suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() {
assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) }
}

suspend fun `test CatShardsResponseWrapper returns with only indices in pathParams`() {
fun `test CatShardsResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down Expand Up @@ -117,7 +117,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() {
}
}

suspend fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() {
fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down