Skip to content

Commit

Permalink
Fix ktlint formatting issues (#156)
Browse files Browse the repository at this point in the history
Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi authored Aug 27, 2021
1 parent 28f401d commit 0eed799
Show file tree
Hide file tree
Showing 27 changed files with 262 additions and 147 deletions.
81 changes: 54 additions & 27 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ class AlertService(

return monitor.triggers.associateWith { trigger ->
// Default to an empty map if there are no Alerts found for a Trigger to make Alert categorization logic easier
(foundAlerts[trigger.id]?.mapNotNull { alert ->
alert.aggregationResultBucket?.let { it.getBucketKeysHash() to alert }
}?.toMap() ?: mutableMapOf()) as MutableMap<String, Alert>
(
foundAlerts[trigger.id]?.mapNotNull { alert ->
alert.aggregationResultBucket?.let { it.getBucketKeysHash() to alert }
}?.toMap() ?: mutableMapOf()
) as MutableMap<String, Alert>
}
}

Expand All @@ -122,38 +124,52 @@ class AlertService(
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1))
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
)
} else {
updatedActionExecutionResults.addAll(result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
currentAlert?.copy(
state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
currentAlert.copy(state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
currentAlert.copy(
state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
Alert(monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
}
}

Expand All @@ -172,8 +188,11 @@ class AlertService(
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1))
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
Expand Down Expand Up @@ -221,10 +240,12 @@ class AlertService(
} else {
// New Alert
// TODO: Setting lastNotificationTime is deceiving since the actions haven't run yet, maybe it should be null here
val newAlert = Alert(monitor = monitor, trigger = trigger, startTime = currentTime,
val newAlert = Alert(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket)
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket
)
newAlerts.add(newAlert)
}
}
Expand All @@ -238,8 +259,10 @@ class AlertService(
fun convertToCompletedAlerts(currentAlerts: Map<String, Alert>?): List<Alert> {
val currentTime = Instant.now()
return currentAlerts?.map {
it.value.copy(state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
it.value.copy(
state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} ?: listOf()
}

Expand All @@ -251,10 +274,12 @@ class AlertService(
// spend time reloading the alert and writing it back.
when (alert.state) {
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null))
listOf<DocWriteRequest<*>>(
IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
}
Alert.State.ACKNOWLEDGED, Alert.State.DELETED -> {
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
Expand Down Expand Up @@ -354,8 +379,10 @@ class AlertService(
}

private fun contentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
return xcp
}
Expand Down
24 changes: 18 additions & 6 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,18 @@ class InputService(
when (input) {
is SearchInput -> {
// TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient
val searchParams = mapOf("period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli())
val searchParams = mapOf(
"period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli()
)
AggregationQueryRewriter.rewriteQuery(input.query, prevResult, monitor.triggers)
val searchSource = scriptService.compile(Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams), TemplateScript.CONTEXT)
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams
),
TemplateScript.CONTEXT
)
.newInstance(searchParams)
.execute()

Expand Down Expand Up @@ -100,8 +107,13 @@ class InputService(
val input = monitor.inputs[0] as SearchInput

val searchParams = mapOf("period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli())
val searchSource = scriptService.compile(Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams), TemplateScript.CONTEXT)
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams
),
TemplateScript.CONTEXT
)
.newInstance(searchParams)
.execute()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ class TriggerService(val scriptService: ScriptService) {
return try {
val bucketIndices =
((ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)[trigger.id] as HashMap<*, *>)[BUCKET_INDICES] as List<*>
val parentBucketPath = ((ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
.get(trigger.id) as HashMap<*, *>)[PARENT_BUCKET_PATH] as String
val parentBucketPath = (
(ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
.get(trigger.id) as HashMap<*, *>
)[PARENT_BUCKET_PATH] as String
val aggregationPath = AggregationPath.parse(parentBucketPath)
// TODO test this part by passing sub-aggregation path
var parentAgg = (ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ class BucketSelectorExtAggregationBuilder :
if (other == null || javaClass != other.javaClass) return false
if (!super.equals(other)) return false
val otherCast = other as BucketSelectorExtAggregationBuilder
return (bucketsPathsMap == otherCast.bucketsPathsMap &&
script == otherCast.script &&
gapPolicy == otherCast.gapPolicy)
return (
bucketsPathsMap == otherCast.bucketsPathsMap &&
script == otherCast.script &&
gapPolicy == otherCast.gapPolicy
)
}

override fun getWriteableName(): String {
Expand Down Expand Up @@ -224,21 +226,24 @@ class BucketSelectorExtAggregationBuilder :
}
if (bucketsPathsMap == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + PipelineAggregator.Parser.BUCKETS_PATH.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
parser.tokenLocation,
"Missing required field [" + PipelineAggregator.Parser.BUCKETS_PATH.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}
if (script == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + Script.SCRIPT_PARSE_FIELD.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
parser.tokenLocation,
"Missing required field [" + Script.SCRIPT_PARSE_FIELD.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}

if (parentBucketPath == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + PARENT_BUCKET_PATH +
"] for bucket_selector aggregation [" + reducerName + "]"
parser.tokenLocation,
"Missing required field [" + PARENT_BUCKET_PATH +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}
val factory = BucketSelectorExtAggregationBuilder(reducerName, bucketsPathsMap, script, parentBucketPath, filter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ data class AggregationResultBucket(
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)

if (CONFIG_NAME != xcp.currentName()) {
throw ParsingException(xcp.tokenLocation,
throw ParsingException(
xcp.tokenLocation,
String.format(
Locale.ROOT, "Failed to parse object: expecting token with name [%s] but found [%s]",
CONFIG_NAME, xcp.currentName())
CONFIG_NAME, xcp.currentName()
)
)
}
while (xcp.nextToken() != Token.END_OBJECT) {
Expand Down
26 changes: 17 additions & 9 deletions alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import org.opensearch.alerting.elasticapi.instant
import org.opensearch.alerting.elasticapi.optionalTimeField
import org.opensearch.alerting.elasticapi.optionalUserField
import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.commons.authuser.User
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
Expand All @@ -40,6 +39,7 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.commons.authuser.User
import java.io.IOException
import java.time.Instant

Expand Down Expand Up @@ -81,11 +81,13 @@ data class Alert(
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null)
aggregationResultBucket = null
)

constructor(
monitor: Monitor,
Expand All @@ -97,11 +99,13 @@ data class Alert(
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null)
aggregationResultBucket = null
)

constructor(
monitor: Monitor,
Expand All @@ -114,11 +118,13 @@ data class Alert(
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION,
aggregationResultBucket: AggregationResultBucket
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = aggregationResultBucket)
aggregationResultBucket = aggregationResultBucket
)

enum class State {
ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED
Expand Down Expand Up @@ -272,13 +278,15 @@ data class Alert(
}
}

return Alert(id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
return Alert(
id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
monitorName = requireNotNull(monitorName), monitorVersion = monitorVersion, monitorUser = monitorUser,
triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName),
state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime,
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket)
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket
)
}

@JvmStatic
Expand Down
Loading

0 comments on commit 0eed799

Please sign in to comment.