Skip to content

Commit

Permalink
[Backport 2.x] Implemented support for configuring a cluster metrics …
Browse files Browse the repository at this point in the history
…monitor to call cat/indices, and cat/shards. #992 (#1009)

* Implemented support for configuring a cluster metrics monitor to call cat/indices, and cat/shards. (#992)

* Implemented support for configuring a cluster metrics monitor to call cat/indices, and cat/shards.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored executeTransportAction to use suspendUntil() instead of get() to receive responses.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>

* Resolved merge conflicts.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored API calls from suspendUntil() to get().

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt authored Jul 12, 2023
1 parent d2d03c6 commit 84e8b00
Show file tree
Hide file tree
Showing 10 changed files with 1,759 additions and 21 deletions.
16 changes: 6 additions & 10 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.toMap
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -91,10 +91,8 @@ class InputService(

val searchSource = scriptService.compile(
Script(
ScriptType.INLINE,
Script.DEFAULT_TEMPLATE_LANG,
rewrittenQuery.toString(),
searchParams
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
rewrittenQuery.toString(), searchParams
),
TemplateScript.CONTEXT
)
Expand Down Expand Up @@ -185,10 +183,8 @@ class InputService(
val searchParams = mapOf("period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli())
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE,
Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(),
searchParams
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams
),
TemplateScript.CONTEXT
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesRequestWrapper
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatShardsRequestWrapper
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.commons.alerting.model.ClusterMetricsInput
Expand Down Expand Up @@ -85,12 +87,14 @@ class SupportedClusterMetricsSettings : org.opensearch.commons.alerting.settings
fun resolveToActionRequest(clusterMetricsInput: ClusterMetricsInput): ActionRequest {
val pathParams = clusterMetricsInput.parsePathParams()
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricType.CAT_INDICES -> CatIndicesRequestWrapper(pathParams)
ClusterMetricType.CAT_PENDING_TASKS -> PendingClusterTasksRequest()
ClusterMetricType.CAT_RECOVERY -> {
if (pathParams.isEmpty()) return RecoveryRequest()
val pathParamsArray = pathParams.split(",").toTypedArray()
return RecoveryRequest(*pathParamsArray)
}
ClusterMetricType.CAT_SHARDS -> CatShardsRequestWrapper(pathParams)
ClusterMetricType.CAT_SNAPSHOTS -> {
return GetSnapshotsRequest(pathParams, arrayOf(GetSnapshotsRequest.ALL_SNAPSHOTS))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.opensearch.index.IndexNotFoundException
class IndexUtils {

companion object {
val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""")

const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"

Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util
package org.opensearch.alerting.util.clusterMetricsMonitorHelpers

import org.opensearch.action.ActionResponse
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
Expand All @@ -16,19 +16,25 @@ import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.support.XContentMapValues
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import kotlin.collections.ArrayList
import kotlin.collections.HashMap

/**
* Calls the appropriate transport action for the API requested in the [clusterMetricsInput].
Expand All @@ -39,18 +45,46 @@ import org.opensearch.commons.alerting.model.ClusterMetricsInput
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> client.admin().cluster()
.pendingClusterTasks(request as PendingClusterTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> client.admin().indices().recoveries(request as RecoveryRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> client.admin().cluster().listTasks(request as ListTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> client.admin().cluster().health(request as ClusterHealthRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> {
request as CatIndicesRequestWrapper
val healthResponse: ClusterHealthResponse =
client.admin().cluster().health(request.clusterHealthRequest).get()
val indexSettingsResponse: GetSettingsResponse =
client.admin().indices().getSettings(request.indexSettingsRequest).get()
val indicesResponse: IndicesStatsResponse =
client.admin().indices().stats(request.indicesStatsRequest).get()
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request.clusterStateRequest).get()
return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS ->
client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY ->
client.admin().indices().recoveries(request as RecoveryRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> {
request as CatShardsRequestWrapper
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request.clusterStateRequest).get()
val indicesResponse: IndicesStatsResponse =
client.admin().indices().stats(request.indicesStatsRequest).get()
return CatShardsResponseWrapper(stateResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS ->
client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_TASKS ->
client.admin().cluster().listTasks(request as ListTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH ->
client.admin().cluster().health(request as ClusterHealthRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> {
val metadata = client.admin().cluster().state(request as ClusterStateRequest).get().state.metadata
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request as ClusterStateRequest).get()
val metadata: Metadata = stateResponse.state.metadata
return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
}
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS -> client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.NODES_STATS -> client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS ->
client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.NODES_STATS ->
client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}")
}
}
Expand All @@ -74,6 +108,14 @@ fun ActionResponse.toMap(): Map<String, Any> {
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS.defaultPath)
)
is CatIndicesResponseWrapper -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath)
)
is CatShardsResponseWrapper -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_SHARDS.defaultPath)
)
is NodesStatsResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.NODES_STATS.defaultPath)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"/_cat/indices": {},
"/_cat/pending_tasks": {},
"/_cat/recovery": {},
"/_cat/shards": {},
"/_cat/snapshots": {},
"/_cat/tasks": {},
"/_cluster/health": {},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util.clusterMetricsMonitorHelpers

import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.randomClusterMetricsInput
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesResponseWrapper.Companion.WRAPPER_FIELD
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.core.common.Strings
import org.opensearch.test.OpenSearchSingleNodeTestCase

class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
private val path = ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath

fun `test CatIndicesRequestWrapper validate valid pathParams`() {
// GIVEN
val pathParams = "index1,index-name-2,index-3"

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertEquals(3, requestWrapper.clusterHealthRequest.indices().size)
assertEquals(3, requestWrapper.clusterStateRequest.indices().size)
assertEquals(3, requestWrapper.indexSettingsRequest.indices().size)
assertEquals(3, requestWrapper.indicesStatsRequest.indices().size)
}

fun `test CatIndicesRequestWrapper validate without providing pathParams`() {
// GIVEN & WHEN
val requestWrapper = CatIndicesRequestWrapper()

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate blank pathParams`() {
// GIVEN
val pathParams = " "

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate empty pathParams`() {
// GIVEN
val pathParams = ""

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate invalid pathParams`() {
// GIVEN
val pathParams = "_index1,index^2"

// WHEN & THEN
assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) }
}

fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
}.toMap()

testIndices.forEach { (indexName, docCount) ->
repeat(docCount) {
val docId = (it + 1).toString()
val docMessage = """
{
"message": "$indexName doc num $docId"
}
""".trimIndent()
indexDoc(indexName, docId, docMessage)
}
}

/*
Creating a subset of indices to use for the pathParams to test that all indices on the cluster ARE NOT returned.
*/
val pathParamsIndices = testIndices.keys.toList().subList(1, testIndices.size - 1)
val pathParams = pathParamsIndices.joinToString(",")
val input = randomClusterMetricsInput(path = path, pathParams = pathParams)

// WHEN
val responseMap = (executeTransportAction(input, client())).toMap()

// THEN
val shards = responseMap[WRAPPER_FIELD] as List<HashMap<String, String>>
val returnedIndices =
shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap()

assertEquals(pathParamsIndices.size, returnedIndices.keys.size)
testIndices.forEach { (indexName, docCount) ->
if (pathParamsIndices.contains(indexName)) {
assertEquals(
indexName,
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String
)
assertEquals(
docCount.toString(),
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String
)
}
}
}

fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
}.toMap()

testIndices.forEach { (indexName, docCount) ->
repeat(docCount) {
val docId = (it + 1).toString()
val docMessage = """
{
"message": "$indexName doc num $docId"
}
""".trimIndent()
indexDoc(indexName, docId, docMessage)
}
}

val input = randomClusterMetricsInput(path = path)

// WHEN
val responseMap = (executeTransportAction(input, client())).toMap()

// THEN
val shards = responseMap[WRAPPER_FIELD] as List<HashMap<String, String>>
val returnedIndices =
shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap()

assertEquals(testIndices.size, returnedIndices.keys.size)
testIndices.forEach { (indexName, docCount) ->
assertEquals(
indexName,
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String
)
assertEquals(
docCount.toString(),
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String
)
}
}

private fun indexDoc(index: String, id: String, doc: String) {
client().prepareIndex(index).setId(id)
.setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get()
}
}
Loading

0 comments on commit 84e8b00

Please sign in to comment.