Skip to content

Commit

Permalink
Notification integration with IM (#338)
Browse files Browse the repository at this point in the history
Co-authored-by: Drew Baugher <[email protected]>
Co-authored-by: Ravi Thaluru <[email protected]>
  • Loading branch information
3 people authored Apr 19, 2022
1 parent 21cfcea commit 62858ce
Show file tree
Hide file tree
Showing 21 changed files with 437 additions and 151 deletions.
66 changes: 64 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,31 @@ buildscript {
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
job_scheduler_no_snapshot = opensearch_build
notifications_no_snapshot = opensearch_build
if (buildVersionQualifier) {
opensearch_build += "-${buildVersionQualifier}"
job_scheduler_no_snapshot += "-${buildVersionQualifier}"
notifications_no_snapshot += "-${buildVersionQualifier}"
}
if (isSnapshot) {
opensearch_build += "-SNAPSHOT"
}
opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
job_scheduler_resource_folder = "src/test/resources/job-scheduler"

notifications_resource_folder = "src/test/resources/notifications"
notifications_core_resource_folder = "src/test/resources/notifications-core"
// notification_version = System.getProperty("notification.version", opensearch_build)
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler_version.version", opensearch_build)
job_scheduler_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-job-scheduler-' + job_scheduler_no_snapshot + '.zip'
notifications_version = System.getProperty("notifications.version", opensearch_build)
notifications_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-notifications-' + notifications_no_snapshot + '.zip'
notifications_core_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-notifications-core-' + notifications_no_snapshot + '.zip'

kotlin_version = System.getProperty("kotlin.version", "1.6.10")
}

Expand Down Expand Up @@ -160,10 +171,9 @@ dependencies {
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
implementation "org.jetbrains:annotations:13.0"
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
// implementation "org.opensearch:notification:${notification_version}"
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "com.github.seancfoley:ipaddress:5.3.3"
implementation "commons-codec:commons-codec:1.13"
implementation "commons-codec:commons-codec:${versions.commonscodec}"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down Expand Up @@ -286,6 +296,44 @@ testClusters.integTest {
}
}))

plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
if (new File("$project.rootDir/$notifications_core_resource_folder").exists()) {
project.delete(files("$project.rootDir/$notifications_core_resource_folder"))
}
project.mkdir notifications_core_resource_folder
ant.get(src: notifications_core_build_download,
dest: notifications_core_resource_folder,
httpusecaches: false)
return fileTree(notifications_core_resource_folder).getSingleFile()
}
}
}
}))

plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
if (new File("$project.rootDir/$notifications_resource_folder").exists()) {
project.delete(files("$project.rootDir/$notifications_resource_folder"))
}
project.mkdir notifications_resource_folder
ant.get(src: notifications_build_download,
dest: notifications_resource_folder,
httpusecaches: false)
return fileTree(notifications_resource_folder).getSingleFile()
}
}
}
}))

if (securityEnabled) {
plugin(provider({
new RegularFile() {
Expand Down Expand Up @@ -591,6 +639,20 @@ testClusters.mixedCluster {
}
}))

node.plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree(notifications_core_resource_folder).getSingleFile() }
}
}))

node.plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree(notifications_resource_folder).getSingleFile() }
}
}))

if (mixedClusterFlag && node.name == "mixedCluster-1") {
node.plugin(provider({
new RegularFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction
import org.opensearch.indexmanagement.indexstatemanagement.model.ErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata
Expand All @@ -69,6 +70,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange
import org.opensearch.indexmanagement.indexstatemanagement.util.isSuccessfulDelete
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.publishLegacyNotification
import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldBackoff
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.util.updateDisableManagedIndexRequest
Expand Down Expand Up @@ -778,9 +781,9 @@ object ManagedIndexRunner :
private suspend fun publishErrorNotification(policy: Policy, managedIndexMetaData: ManagedIndexMetaData) {
policy.errorNotification?.run {
errorNotificationRetryPolicy.retry(logger) {
withContext(Dispatchers.IO) {
// destination.publish(null, compileTemplate(messageTemplate, managedIndexMetaData), hostDenyList)
}
val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData)
destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client)
channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.indexmanagement.indexstatemanagement.step.notification.AttemptNotificationStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
Expand All @@ -16,12 +17,15 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.script.Script

class NotificationAction(
val destination: Destination,
val destination: Destination?,
val channel: Channel?,
val messageTemplate: Script,
index: Int
) : Action(name, index) {

init {
require(destination != null || channel != null) { "Notification must contain a destination or channel" }
require(destination == null || channel == null) { "Notification can only contain a single destination or channel" }
require(messageTemplate.lang == MUSTACHE) { "Notification message template must be a mustache script" }
}

Expand All @@ -36,20 +40,23 @@ class NotificationAction(

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(DESTINATION_FIELD, destination)
if (destination != null) builder.field(DESTINATION_FIELD, destination)
if (channel != null) builder.field(CHANNEL_FIELD, channel)
builder.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
destination.writeTo(out)
out.writeOptionalWriteable(destination)
out.writeOptionalWriteable(channel)
messageTemplate.writeTo(out)
out.writeInt(actionIndex)
}

companion object {
const val name = "notification"
const val DESTINATION_FIELD = "destination"
const val CHANNEL_FIELD = "channel"
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val MUSTACHE = "mustache"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,28 @@ import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.CHANNEL_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.DESTINATION_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.MESSAGE_TEMPLATE_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.script.Script

class NotificationActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val destination = Destination(sin)
val destination = sin.readOptionalWriteable(::Destination)
val channel = sin.readOptionalWriteable(::Channel)
val messageTemplate = Script(sin)
val index = sin.readInt()

return NotificationAction(destination, messageTemplate, index)
return NotificationAction(destination, channel, messageTemplate, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var destination: Destination? = null
var channel: Channel? = null
var messageTemplate: Script? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -36,13 +40,15 @@ class NotificationActionParser : ActionParser() {

when (fieldName) {
DESTINATION_FIELD -> destination = Destination.parse(xcp)
CHANNEL_FIELD -> channel = Channel.parse(xcp)
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in NotificationAction.")
}
}

return NotificationAction(
destination = requireNotNull(destination) { "NotificationAction destination is null" },
destination = destination,
channel = channel,
messageTemplate = requireNotNull(messageTemplate) { "NotificationAction message template is null" },
index = index
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,58 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.script.Script
import java.io.IOException

data class ErrorNotification(
val destination: Destination,
val destination: Destination?,
val channel: Channel?,
val messageTemplate: Script
) : ToXContentObject, Writeable {

init {
require(destination != null || channel != null) { "ErrorNotification must contain a destination or channel" }
require(destination == null || channel == null) { "ErrorNotification can only contain a single destination or channel" }
require(messageTemplate.lang == MUSTACHE) { "ErrorNotification message template must be a mustache script" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(DESTINATION_FIELD, destination)
builder.startObject()
if (destination != null) builder.field(DESTINATION_FIELD, destination)
if (channel != null) builder.field(CHANNEL_FIELD, channel)
return builder
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.endObject()
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Destination(sin),
sin.readOptionalWriteable(::Destination),
sin.readOptionalWriteable(::Channel),
Script(sin)
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
destination.writeTo(out)
out.writeOptionalWriteable(destination)
out.writeOptionalWriteable(channel)
messageTemplate.writeTo(out)
}

companion object {
const val DESTINATION_FIELD = "destination"
const val CHANNEL_FIELD = "channel"
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val MUSTACHE = "mustache"
const val CHANNEL_TITLE = "Index Management-ISM-Error Notification"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ErrorNotification {
var destination: Destination? = null
var channel: Channel? = null
var messageTemplate: Script? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -63,14 +74,16 @@ data class ErrorNotification(
xcp.nextToken()

when (fieldName) {
DESTINATION_FIELD -> destination = Destination.parse(xcp)
DESTINATION_FIELD -> destination = if (xcp.currentToken() == Token.VALUE_NULL) null else Destination.parse(xcp)
CHANNEL_FIELD -> channel = if (xcp.currentToken() == Token.VALUE_NULL) null else Channel.parse(xcp)
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ErrorNotification.")
}
}

return ErrorNotification(
destination = requireNotNull(destination) { "ErrorNotification destination is null" },
destination = destination,
channel = channel,
messageTemplate = requireNotNull(messageTemplate) { "ErrorNotification message template is null" }
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.model.destination

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

data class Channel(val id: String) : ToXContent, Writeable {

init {
require(id.isNotEmpty()) { "Channel ID cannot be empty" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(ID, id)
.endObject()
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
}

companion object {
const val ID = "id"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): Channel {
var id: String? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ID -> id = xcp.text()
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing Channel destination")
}
}
}

return Channel(requireNotNull(id) { "Channel ID is null" })
}
}
}
Loading

0 comments on commit 62858ce

Please sign in to comment.