Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ktlint formatting issues #156

Merged
merged 1 commit into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 54 additions & 27 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ class AlertService(

return monitor.triggers.associateWith { trigger ->
// Default to an empty map if there are no Alerts found for a Trigger to make Alert categorization logic easier
(foundAlerts[trigger.id]?.mapNotNull { alert ->
alert.aggregationResultBucket?.let { it.getBucketKeysHash() to alert }
}?.toMap() ?: mutableMapOf()) as MutableMap<String, Alert>
(
foundAlerts[trigger.id]?.mapNotNull { alert ->
alert.aggregationResultBucket?.let { it.getBucketKeysHash() to alert }
}?.toMap() ?: mutableMapOf()
) as MutableMap<String, Alert>
}
}

Expand All @@ -122,38 +124,52 @@ class AlertService(
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1))
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
)
} else {
updatedActionExecutionResults.addAll(result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
currentAlert?.copy(
state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
currentAlert.copy(state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
currentAlert.copy(
state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
Alert(monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
}
}

Expand All @@ -172,8 +188,11 @@ class AlertService(
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1))
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
Expand Down Expand Up @@ -221,10 +240,12 @@ class AlertService(
} else {
// New Alert
// TODO: Setting lastNotificationTime is deceiving since the actions haven't run yet, maybe it should be null here
val newAlert = Alert(monitor = monitor, trigger = trigger, startTime = currentTime,
val newAlert = Alert(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket)
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket
)
newAlerts.add(newAlert)
}
}
Expand All @@ -238,8 +259,10 @@ class AlertService(
fun convertToCompletedAlerts(currentAlerts: Map<String, Alert>?): List<Alert> {
val currentTime = Instant.now()
return currentAlerts?.map {
it.value.copy(state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
it.value.copy(
state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} ?: listOf()
}

Expand All @@ -251,10 +274,12 @@ class AlertService(
// spend time reloading the alert and writing it back.
when (alert.state) {
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null))
listOf<DocWriteRequest<*>>(
IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
}
Alert.State.ACKNOWLEDGED, Alert.State.DELETED -> {
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
Expand Down Expand Up @@ -354,8 +379,10 @@ class AlertService(
}

private fun contentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
return xcp
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,18 @@ class InputService(
when (input) {
is SearchInput -> {
// TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient
val searchParams = mapOf("period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli())
val searchParams = mapOf(
"period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli()
)
AggregationQueryRewriter.rewriteQuery(input.query, prevResult, monitor.triggers)
val searchSource = scriptService.compile(Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams), TemplateScript.CONTEXT)
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams
),
TemplateScript.CONTEXT
)
.newInstance(searchParams)
.execute()

Expand Down Expand Up @@ -100,8 +107,13 @@ class InputService(
val input = monitor.inputs[0] as SearchInput

val searchParams = mapOf("period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli())
val searchSource = scriptService.compile(Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams), TemplateScript.CONTEXT)
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams
),
TemplateScript.CONTEXT
)
.newInstance(searchParams)
.execute()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ class TriggerService(val scriptService: ScriptService) {
return try {
val bucketIndices =
((ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)[trigger.id] as HashMap<*, *>)[BUCKET_INDICES] as List<*>
val parentBucketPath = ((ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
.get(trigger.id) as HashMap<*, *>)[PARENT_BUCKET_PATH] as String
val parentBucketPath = (
(ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
.get(trigger.id) as HashMap<*, *>
)[PARENT_BUCKET_PATH] as String
val aggregationPath = AggregationPath.parse(parentBucketPath)
// TODO test this part by passing sub-aggregation path
var parentAgg = (ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ class BucketSelectorExtAggregationBuilder :
if (other == null || javaClass != other.javaClass) return false
if (!super.equals(other)) return false
val otherCast = other as BucketSelectorExtAggregationBuilder
return (bucketsPathsMap == otherCast.bucketsPathsMap &&
script == otherCast.script &&
gapPolicy == otherCast.gapPolicy)
return (
bucketsPathsMap == otherCast.bucketsPathsMap &&
script == otherCast.script &&
gapPolicy == otherCast.gapPolicy
)
}

override fun getWriteableName(): String {
Expand Down Expand Up @@ -224,21 +226,24 @@ class BucketSelectorExtAggregationBuilder :
}
if (bucketsPathsMap == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + PipelineAggregator.Parser.BUCKETS_PATH.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
parser.tokenLocation,
"Missing required field [" + PipelineAggregator.Parser.BUCKETS_PATH.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}
if (script == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + Script.SCRIPT_PARSE_FIELD.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
parser.tokenLocation,
"Missing required field [" + Script.SCRIPT_PARSE_FIELD.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}

if (parentBucketPath == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + PARENT_BUCKET_PATH +
"] for bucket_selector aggregation [" + reducerName + "]"
parser.tokenLocation,
"Missing required field [" + PARENT_BUCKET_PATH +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}
val factory = BucketSelectorExtAggregationBuilder(reducerName, bucketsPathsMap, script, parentBucketPath, filter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ data class AggregationResultBucket(
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)

if (CONFIG_NAME != xcp.currentName()) {
throw ParsingException(xcp.tokenLocation,
throw ParsingException(
xcp.tokenLocation,
String.format(
Locale.ROOT, "Failed to parse object: expecting token with name [%s] but found [%s]",
CONFIG_NAME, xcp.currentName())
CONFIG_NAME, xcp.currentName()
)
)
}
while (xcp.nextToken() != Token.END_OBJECT) {
Expand Down
26 changes: 17 additions & 9 deletions alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import org.opensearch.alerting.elasticapi.instant
import org.opensearch.alerting.elasticapi.optionalTimeField
import org.opensearch.alerting.elasticapi.optionalUserField
import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.commons.authuser.User
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
Expand All @@ -40,6 +39,7 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.commons.authuser.User
import java.io.IOException
import java.time.Instant

Expand Down Expand Up @@ -81,11 +81,13 @@ data class Alert(
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null)
aggregationResultBucket = null
)

constructor(
monitor: Monitor,
Expand All @@ -97,11 +99,13 @@ data class Alert(
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null)
aggregationResultBucket = null
)

constructor(
monitor: Monitor,
Expand All @@ -114,11 +118,13 @@ data class Alert(
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION,
aggregationResultBucket: AggregationResultBucket
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = aggregationResultBucket)
aggregationResultBucket = aggregationResultBucket
)

enum class State {
ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED
Expand Down Expand Up @@ -272,13 +278,15 @@ data class Alert(
}
}

return Alert(id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
return Alert(
id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
monitorName = requireNotNull(monitorName), monitorVersion = monitorVersion, monitorUser = monitorUser,
triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName),
state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime,
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket)
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket
)
}

@JvmStatic
Expand Down
Loading