Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented support for ClusterMetrics monitors (#221) #342

Merged
merged 1 commit into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting

import org.opensearch.action.ActionRequest
Expand Down Expand Up @@ -30,6 +31,7 @@ import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand Down Expand Up @@ -117,7 +119,7 @@ import java.util.function.Supplier
* Entry point of the OpenDistro for Elasticsearch alerting plugin
* This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers.
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY],
* [BucketLevelTrigger.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
* [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {

Expand Down Expand Up @@ -209,7 +211,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.elasticapi.convertToMap
import org.opensearch.alerting.elasticapi.suspendUntil
Expand All @@ -16,6 +17,8 @@ import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.executeTransportAction
import org.opensearch.alerting.util.toMap
import org.opensearch.client.Client
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
Expand Down Expand Up @@ -85,6 +88,11 @@ class InputService(
)
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}")
val response = executeTransportAction(input, client)
results += response.toMap()
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.alerting.model

import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.CronSchedule
import org.opensearch.alerting.core.model.Input
import org.opensearch.alerting.core.model.Schedule
Expand All @@ -15,6 +16,7 @@ import org.opensearch.alerting.elasticapi.optionalTimeField
import org.opensearch.alerting.elasticapi.optionalUserField
import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS
import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.alerting.util._ID
import org.opensearch.alerting.util._VERSION
Expand Down Expand Up @@ -69,6 +71,8 @@ data class Monitor(
require(trigger is QueryLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" }
MonitorType.BUCKET_LEVEL_MONITOR ->
require(trigger is BucketLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" }
MonitorType.CLUSTER_METRICS_MONITOR ->
require(trigger is QueryLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" }
}
}
if (enabled) {
Expand Down Expand Up @@ -113,7 +117,8 @@ data class Monitor(
// This is different from 'type' which denotes the Scheduled Job type
enum class MonitorType(val value: String) {
QUERY_LEVEL_MONITOR("query_level_monitor"),
BUCKET_LEVEL_MONITOR("bucket_level_monitor");
BUCKET_LEVEL_MONITOR("bucket_level_monitor"),
CLUSTER_METRICS_MONITOR("cluster_metrics_monitor");

override fun toString(): String {
return value
Expand Down Expand Up @@ -250,7 +255,10 @@ data class Monitor(
INPUTS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
inputs.add(Input.parse(xcp))
val input = Input.parse(xcp)
if (input is ClusterMetricsInput)
SupportedClusterMetricsSettings.validateApiType(input)
inputs.add(input)
}
}
TRIGGERS_FIELD -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.settings

import org.opensearch.action.ActionRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.ClusterMetricsInput.ClusterMetricType
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.json.JsonXContent

/**
* A class that supports storing a unique set of API paths that can be accessed by general users.
*/
class SupportedClusterMetricsSettings {
companion object {
const val RESOURCE_FILE = "supported_json_payloads.json"

/**
* The key in this map represents the path to call an API.
*
* NOTE: Paths should conform to the following pattern:
* "/_cluster/stats"
*
* The value in these maps represents a path root mapped to a list of paths to field values.
* If the value mapped to an API is an empty map, no fields will be redacted from the API response.
*
* NOTE: Keys in this map should consist of root components of the response body; e.g.,:
* "indices"
*
* Values in these maps should consist of the remaining fields in the path
* to the supported value separated by periods; e.g.,:
* "shards.total",
* "shards.index.shards.min"
*
* In this example for ClusterStats, the response will only include
* the values at the end of these two paths:
* "/_cluster/stats": {
* "indices": [
* "shards.total",
* "shards.index.shards.min"
* ]
* }
*/
private var supportedApiList = HashMap<String, Map<String, ArrayList<String>>>()

init {
val supportedJsonPayloads = SupportedClusterMetricsSettings::class.java.getResource(RESOURCE_FILE)

@Suppress("UNCHECKED_CAST")
if (supportedJsonPayloads != null)
supportedApiList = XContentHelper.convertToMap(JsonXContent.jsonXContent, supportedJsonPayloads.readText(), false) as HashMap<String, Map<String, ArrayList<String>>>
}

/**
* Returns the map of all supported json payload associated with the provided path from supportedApiList.
* @param path The path for the requested API.
* @return The map of the supported json payload for the requested API.
* @throws IllegalArgumentException When supportedApiList does not contain a value for the provided key.
*/
fun getSupportedJsonPayload(path: String): Map<String, ArrayList<String>> {
return supportedApiList[path] ?: throw IllegalArgumentException("API path not in supportedApiList.")
}

/**
* Will return an [ActionRequest] for the API associated with that path.
* Will otherwise throw an exception.
* @param clusterMetricsInput The [ClusterMetricsInput] to resolve.
* @throws IllegalArgumentException when the requested API is not supported.
* @return The [ActionRequest] for the API associated with the provided [ClusterMetricsInput].
*/
fun resolveToActionRequest(clusterMetricsInput: ClusterMetricsInput): ActionRequest {
val pathParams = clusterMetricsInput.parsePathParams()
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricType.CAT_PENDING_TASKS -> PendingClusterTasksRequest()
ClusterMetricType.CAT_RECOVERY -> {
if (pathParams.isEmpty()) return RecoveryRequest()
val pathParamsArray = pathParams.split(",").toTypedArray()
return RecoveryRequest(*pathParamsArray)
}
ClusterMetricType.CAT_SNAPSHOTS -> {
return GetSnapshotsRequest(pathParams, arrayOf(GetSnapshotsRequest.ALL_SNAPSHOTS))
}
ClusterMetricType.CAT_TASKS -> ListTasksRequest()
ClusterMetricType.CLUSTER_HEALTH -> {
if (pathParams.isEmpty()) return ClusterHealthRequest()
val pathParamsArray = pathParams.split(",").toTypedArray()
return ClusterHealthRequest(*pathParamsArray)
}
ClusterMetricType.CLUSTER_SETTINGS -> ClusterStateRequest().routingTable(false).nodes(false)
ClusterMetricType.CLUSTER_STATS -> {
if (pathParams.isEmpty()) return ClusterStatsRequest()
val pathParamsArray = pathParams.split(",").toTypedArray()
return ClusterStatsRequest(*pathParamsArray)
}
ClusterMetricType.NODES_STATS -> NodesStatsRequest().addMetrics(
"os",
"process",
"jvm",
"thread_pool",
"fs",
"transport",
"http",
"breaker",
"script",
"discovery",
"ingest",
"adaptive_selection",
"script_cache",
"indexing_pressure",
"shard_indexing_pressure"
)
else -> throw IllegalArgumentException("Unsupported API.")
}
}

/**
* Confirms whether the provided path is in [supportedApiList].
* Throws an exception if the provided path is not on the list; otherwise performs no action.
* @param clusterMetricsInput The [ClusterMetricsInput] to validate.
* @throws IllegalArgumentException when supportedApiList does not contain the provided path.
*/
fun validateApiType(clusterMetricsInput: ClusterMetricsInput) {
if (!supportedApiList.keys.contains(clusterMetricsInput.clusterMetricType.defaultPath))
throw IllegalArgumentException("API path not in supportedApiList.")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util

import org.opensearch.action.ActionResponse
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse
import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.ClusterMetricsInput.ClusterMetricType
import org.opensearch.alerting.elasticapi.convertToMap
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.support.XContentMapValues

/**
* Calls the appropriate transport action for the API requested in the [clusterMetricsInput].
* @param clusterMetricsInput The [ClusterMetricsInput] to resolve.
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricType.CAT_PENDING_TASKS -> client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get()
ClusterMetricType.CAT_RECOVERY -> client.admin().indices().recoveries(request as RecoveryRequest).get()
ClusterMetricType.CAT_SNAPSHOTS -> client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
ClusterMetricType.CAT_TASKS -> client.admin().cluster().listTasks(request as ListTasksRequest).get()
ClusterMetricType.CLUSTER_HEALTH -> client.admin().cluster().health(request as ClusterHealthRequest).get()
ClusterMetricType.CLUSTER_SETTINGS -> {
val metadata = client.admin().cluster().state(request as ClusterStateRequest).get().state.metadata
return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
}
ClusterMetricType.CLUSTER_STATS -> client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
ClusterMetricType.NODES_STATS -> client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}")
}
}

/**
* Populates a [HashMap] with the values in the [ActionResponse].
* @return The [ActionResponse] values formatted in a [HashMap].
* @throws IllegalArgumentException when the [ActionResponse] is not supported by this feature.
*/
fun ActionResponse.toMap(): Map<String, Any> {
return when (this) {
is ClusterHealthResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CLUSTER_HEALTH.defaultPath)
)
is ClusterStatsResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CLUSTER_STATS.defaultPath)
)
is ClusterGetSettingsResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CLUSTER_SETTINGS.defaultPath)
)
is NodesStatsResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.NODES_STATS.defaultPath)
)
is PendingClusterTasksResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CAT_PENDING_TASKS.defaultPath)
)
is RecoveryResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CAT_RECOVERY.defaultPath)
)
is GetSnapshotsResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CAT_SNAPSHOTS.defaultPath)
)
is ListTasksResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CAT_TASKS.defaultPath)
)
else -> throw IllegalArgumentException("Unsupported ActionResponse type: ${this.javaClass.name}")
}
}

/**
* Populates a [HashMap] with only the values that support being exposed to users.
* @param mappedActionResponse The response from the [ClusterMetricsInput] API call.
* @param supportedJsonPayload The JSON payload as configured in [SupportedClusterMetricsSettings.RESOURCE_FILE].
* @return The response values [HashMap] without the redacted fields.
*/
@Suppress("UNCHECKED_CAST")
fun redactFieldsFromResponse(
mappedActionResponse: Map<String, Any>,
supportedJsonPayload: Map<String, ArrayList<String>>
): Map<String, Any> {
return when {
supportedJsonPayload.isEmpty() -> mappedActionResponse
else -> {
val output = hashMapOf<String, Any>()
for ((key, value) in supportedJsonPayload) {
when (val mappedValue = mappedActionResponse[key]) {
is Map<*, *> -> output[key] = XContentMapValues.filter(
mappedActionResponse[key] as MutableMap<String, *>?,
value.toTypedArray(), arrayOf()
)
else -> output[key] = mappedValue ?: hashMapOf<String, Any>()
}
}
output
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"/_cat/pending_tasks": {},
"/_cat/recovery": {},
"/_cat/snapshots": {},
"/_cat/tasks": {},
"/_cluster/health": {},
"/_cluster/settings": {},
"/_cluster/stats": {},
"/_nodes/stats": {}
}
Loading