Skip to content

Commit

Permalink
Implemented support for ClusterMetrics monitors (opensearch-project#221
Browse files Browse the repository at this point in the history
…) (opensearch-project#342)

* Implemented support for LocalUriInput monitors.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored feature naming convention from LocalUriInput to ClusterMetricsInput. Added Cluster Metrics as a new monitor type to align with frontend experience.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <[email protected]>

* Implemented randomClusterMetricsMonitor test helper, and refactored tests accordingly.

Signed-off-by: AWSHurneyt <[email protected]>

* Renamed some assets to align with the new name for this feature.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored cluster metrics feature to remove support for Cat repositories API.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored supported JSON payload to return all response fields.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored nodes stats request object to return all metrics.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored nodes stats request object to return all metrics.

Signed-off-by: AWSHurneyt <[email protected]>

* Removing unused connectionTimeout and socketTimeout params.

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Mar 11, 2022
1 parent 9dd93af commit 247f502
Show file tree
Hide file tree
Showing 12 changed files with 1,296 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting

import org.opensearch.action.ActionRequest
Expand Down Expand Up @@ -30,6 +31,7 @@ import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand Down Expand Up @@ -117,7 +119,7 @@ import java.util.function.Supplier
* Entry point of the OpenSearch alerting plugin
* This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers.
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY],
* [BucketLevelTrigger.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
* [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {

Expand Down Expand Up @@ -209,7 +211,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.elasticapi.convertToMap
import org.opensearch.alerting.elasticapi.suspendUntil
Expand All @@ -16,6 +17,8 @@ import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.executeTransportAction
import org.opensearch.alerting.util.toMap
import org.opensearch.client.Client
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
Expand Down Expand Up @@ -85,6 +88,11 @@ class InputService(
)
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}")
val response = executeTransportAction(input, client)
results += response.toMap()
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
Expand Down
12 changes: 10 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.alerting.model

import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.CronSchedule
import org.opensearch.alerting.core.model.Input
import org.opensearch.alerting.core.model.Schedule
Expand All @@ -15,6 +16,7 @@ import org.opensearch.alerting.elasticapi.optionalTimeField
import org.opensearch.alerting.elasticapi.optionalUserField
import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS
import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.alerting.util._ID
import org.opensearch.alerting.util._VERSION
Expand Down Expand Up @@ -69,6 +71,8 @@ data class Monitor(
require(trigger is QueryLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" }
MonitorType.BUCKET_LEVEL_MONITOR ->
require(trigger is BucketLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" }
MonitorType.CLUSTER_METRICS_MONITOR ->
require(trigger is QueryLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" }
}
}
if (enabled) {
Expand Down Expand Up @@ -113,7 +117,8 @@ data class Monitor(
// This is different from 'type' which denotes the Scheduled Job type
enum class MonitorType(val value: String) {
QUERY_LEVEL_MONITOR("query_level_monitor"),
BUCKET_LEVEL_MONITOR("bucket_level_monitor");
BUCKET_LEVEL_MONITOR("bucket_level_monitor"),
CLUSTER_METRICS_MONITOR("cluster_metrics_monitor");

override fun toString(): String {
return value
Expand Down Expand Up @@ -250,7 +255,10 @@ data class Monitor(
INPUTS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
inputs.add(Input.parse(xcp))
val input = Input.parse(xcp)
if (input is ClusterMetricsInput)
SupportedClusterMetricsSettings.validateApiType(input)
inputs.add(input)
}
}
TRIGGERS_FIELD -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.settings

import org.opensearch.action.ActionRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.ClusterMetricsInput.ClusterMetricType
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.json.JsonXContent

/**
 * Keeps the allow-list of cluster metrics API paths that may be called on behalf of general users,
 * along with the response fields that are exposed for each of those APIs.
 */
class SupportedClusterMetricsSettings {
    companion object {
        const val RESOURCE_FILE = "supported_json_payloads.json"

        /**
         * Allow-list of APIs, keyed by the path used to call the API, e.g. "/_cluster/stats".
         *
         * Each path maps to the response fields that are exposed for that API:
         *  - the inner key is a root component of the response body, e.g. "indices";
         *  - the inner value lists the remaining, period-separated paths below that root whose
         *    values are supported, e.g. "shards.total" or "shards.index.shards.min".
         *
         * An empty inner map means that no fields are redacted from the API response.
         *
         * Example for ClusterStats; only the values at the end of the two listed paths are returned:
         *      "/_cluster/stats": {
         *          "indices": [
         *              "shards.total",
         *              "shards.index.shards.min"
         *          ]
         *      }
         */
        private var supportedApiList = HashMap<String, Map<String, ArrayList<String>>>()

        /** Metric names passed to [NodesStatsRequest.addMetrics] for node stats requests. */
        private val nodesStatsMetrics = arrayOf(
            "os",
            "process",
            "jvm",
            "thread_pool",
            "fs",
            "transport",
            "http",
            "breaker",
            "script",
            "discovery",
            "ingest",
            "adaptive_selection",
            "script_cache",
            "indexing_pressure",
            "shard_indexing_pressure"
        )

        init {
            // Populate the allow-list from RESOURCE_FILE; it stays empty when the resource cannot be found.
            val resource = SupportedClusterMetricsSettings::class.java.getResource(RESOURCE_FILE)
            if (resource != null) {
                @Suppress("UNCHECKED_CAST")
                val parsed = XContentHelper.convertToMap(
                    JsonXContent.jsonXContent, resource.readText(), false
                ) as HashMap<String, Map<String, ArrayList<String>>>
                supportedApiList = parsed
            }
        }

        /**
         * Looks up the supported json payload configured for an API path.
         * @param path The path for the requested API.
         * @return The map of the supported json payload for the requested API.
         * @throws IllegalArgumentException When supportedApiList does not contain a value for the provided key.
         */
        fun getSupportedJsonPayload(path: String): Map<String, ArrayList<String>> =
            requireNotNull(supportedApiList[path]) { "API path not in supportedApiList." }

        /**
         * Builds the [ActionRequest] matching the metric type of the provided input.
         * Comma-separated path parameters, when present, are passed on to the request types that accept them.
         * @param clusterMetricsInput The [ClusterMetricsInput] to resolve.
         * @throws IllegalArgumentException when the requested API is not supported.
         * @return The [ActionRequest] for the API associated with the provided [ClusterMetricsInput].
         */
        fun resolveToActionRequest(clusterMetricsInput: ClusterMetricsInput): ActionRequest {
            val pathParams = clusterMetricsInput.parsePathParams()
            return when (clusterMetricsInput.clusterMetricType) {
                ClusterMetricType.CAT_PENDING_TASKS -> PendingClusterTasksRequest()
                ClusterMetricType.CAT_RECOVERY ->
                    if (pathParams.isEmpty()) RecoveryRequest() else RecoveryRequest(*splitPathParams(pathParams))
                ClusterMetricType.CAT_SNAPSHOTS ->
                    GetSnapshotsRequest(pathParams, arrayOf(GetSnapshotsRequest.ALL_SNAPSHOTS))
                ClusterMetricType.CAT_TASKS -> ListTasksRequest()
                ClusterMetricType.CLUSTER_HEALTH ->
                    if (pathParams.isEmpty()) ClusterHealthRequest() else ClusterHealthRequest(*splitPathParams(pathParams))
                ClusterMetricType.CLUSTER_SETTINGS -> ClusterStateRequest().routingTable(false).nodes(false)
                ClusterMetricType.CLUSTER_STATS ->
                    if (pathParams.isEmpty()) ClusterStatsRequest() else ClusterStatsRequest(*splitPathParams(pathParams))
                ClusterMetricType.NODES_STATS -> NodesStatsRequest().addMetrics(*nodesStatsMetrics)
                else -> throw IllegalArgumentException("Unsupported API.")
            }
        }

        /**
         * Checks that the default path of the input's metric type is present in [supportedApiList].
         * Performs no action when the path is supported.
         * @param clusterMetricsInput The [ClusterMetricsInput] to validate.
         * @throws IllegalArgumentException when supportedApiList does not contain the provided path.
         */
        fun validateApiType(clusterMetricsInput: ClusterMetricsInput) {
            val apiPath = clusterMetricsInput.clusterMetricType.defaultPath
            require(supportedApiList.containsKey(apiPath)) { "API path not in supportedApiList." }
        }

        /** Splits a comma-separated path parameter string into an array suitable for vararg constructors. */
        private fun splitPathParams(pathParams: String): Array<String> = pathParams.split(",").toTypedArray()
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util

import org.opensearch.action.ActionResponse
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse
import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.ClusterMetricsInput.ClusterMetricType
import org.opensearch.alerting.elasticapi.convertToMap
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.support.XContentMapValues

/**
 * Resolves the [clusterMetricsInput] to its request object and runs the matching transport action,
 * waiting for the response before returning it.
 * @param clusterMetricsInput The [ClusterMetricsInput] to resolve.
 * @param client The [Client] used to call the respective transport action.
 * @return The [ActionResponse] produced by the transport action.
 * @throws IllegalArgumentException When the requested API is not supported by this feature.
 */
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
    val actionRequest = resolveToActionRequest(clusterMetricsInput)

    // Looked up inside each branch so the client is only touched for a supported metric type.
    fun clusterAdmin() = client.admin().cluster()

    return when (clusterMetricsInput.clusterMetricType) {
        ClusterMetricType.CAT_PENDING_TASKS ->
            clusterAdmin().pendingClusterTasks(actionRequest as PendingClusterTasksRequest).get()
        ClusterMetricType.CAT_RECOVERY ->
            client.admin().indices().recoveries(actionRequest as RecoveryRequest).get()
        ClusterMetricType.CAT_SNAPSHOTS ->
            clusterAdmin().getSnapshots(actionRequest as GetSnapshotsRequest).get()
        ClusterMetricType.CAT_TASKS ->
            clusterAdmin().listTasks(actionRequest as ListTasksRequest).get()
        ClusterMetricType.CLUSTER_HEALTH ->
            clusterAdmin().health(actionRequest as ClusterHealthRequest).get()
        ClusterMetricType.CLUSTER_SETTINGS -> {
            // The settings response is assembled from the cluster state metadata.
            val metadata = clusterAdmin().state(actionRequest as ClusterStateRequest).get().state.metadata
            ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
        }
        ClusterMetricType.CLUSTER_STATS ->
            clusterAdmin().clusterStats(actionRequest as ClusterStatsRequest).get()
        ClusterMetricType.NODES_STATS ->
            clusterAdmin().nodesStats(actionRequest as NodesStatsRequest).get()
        else -> throw IllegalArgumentException("Unsupported API request type: ${actionRequest.javaClass.name}")
    }
}

/**
 * Converts a supported [ActionResponse] into a map, keeping only the fields configured for its API
 * in [SupportedClusterMetricsSettings].
 * @return The [ActionResponse] values formatted in a [Map].
 * @throws IllegalArgumentException when the [ActionResponse] is not supported by this feature.
 */
fun ActionResponse.toMap(): Map<String, Any> {
    // Pair each supported response type with the metric type whose default path keys its payload config.
    val (responseFields, metricType) = when (this) {
        is ClusterHealthResponse -> this.convertToMap() to ClusterMetricType.CLUSTER_HEALTH
        is ClusterStatsResponse -> this.convertToMap() to ClusterMetricType.CLUSTER_STATS
        is ClusterGetSettingsResponse -> this.convertToMap() to ClusterMetricType.CLUSTER_SETTINGS
        is NodesStatsResponse -> this.convertToMap() to ClusterMetricType.NODES_STATS
        is PendingClusterTasksResponse -> this.convertToMap() to ClusterMetricType.CAT_PENDING_TASKS
        is RecoveryResponse -> this.convertToMap() to ClusterMetricType.CAT_RECOVERY
        is GetSnapshotsResponse -> this.convertToMap() to ClusterMetricType.CAT_SNAPSHOTS
        is ListTasksResponse -> this.convertToMap() to ClusterMetricType.CAT_TASKS
        else -> throw IllegalArgumentException("Unsupported ActionResponse type: ${this.javaClass.name}")
    }
    return redactFieldsFromResponse(
        responseFields,
        SupportedClusterMetricsSettings.getSupportedJsonPayload(metricType.defaultPath)
    )
}

/**
 * Builds a map containing only the response values that are allowed to be exposed to users.
 * An empty [supportedJsonPayload] means nothing is redacted and the response map is returned as-is.
 * @param mappedActionResponse The response from the [ClusterMetricsInput] API call.
 * @param supportedJsonPayload The JSON payload as configured in [SupportedClusterMetricsSettings.RESOURCE_FILE].
 * @return The response values [Map] without the redacted fields.
 */
@Suppress("UNCHECKED_CAST")
fun redactFieldsFromResponse(
    mappedActionResponse: Map<String, Any>,
    supportedJsonPayload: Map<String, ArrayList<String>>
): Map<String, Any> {
    if (supportedJsonPayload.isEmpty()) return mappedActionResponse

    val redacted = hashMapOf<String, Any>()
    supportedJsonPayload.forEach { (rootField, allowedPaths) ->
        val rootValue = mappedActionResponse[rootField]
        redacted[rootField] = if (rootValue is Map<*, *>) {
            // Nested objects are narrowed down to the configured paths.
            XContentMapValues.filter(
                rootValue as MutableMap<String, *>?,
                allowedPaths.toTypedArray(), arrayOf()
            )
        } else {
            // Non-map values are copied through; a root missing from the response becomes an empty map.
            rootValue ?: hashMapOf<String, Any>()
        }
    }
    return redacted
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"/_cat/pending_tasks": {},
"/_cat/recovery": {},
"/_cat/snapshots": {},
"/_cat/tasks": {},
"/_cluster/health": {},
"/_cluster/settings": {},
"/_cluster/stats": {},
"/_nodes/stats": {}
}
Loading

0 comments on commit 247f502

Please sign in to comment.