Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification integration with IM #338

Merged
merged 6 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,31 @@ buildscript {
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
job_scheduler_no_snapshot = opensearch_build
notifications_no_snapshot = opensearch_build
if (buildVersionQualifier) {
opensearch_build += "-${buildVersionQualifier}"
job_scheduler_no_snapshot += "-${buildVersionQualifier}"
notifications_no_snapshot += "-${buildVersionQualifier}"
}
if (isSnapshot) {
opensearch_build += "-SNAPSHOT"
}
opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
job_scheduler_resource_folder = "src/test/resources/job-scheduler"

notifications_resource_folder = "src/test/resources/notifications"
notifications_core_resource_folder = "src/test/resources/notifications-core"
// notification_version = System.getProperty("notification.version", opensearch_build)
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler_version.version", opensearch_build)
job_scheduler_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-job-scheduler-' + job_scheduler_no_snapshot + '.zip'
notifications_version = System.getProperty("notifications.version", opensearch_build)
notifications_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-notifications-' + notifications_no_snapshot + '.zip'
notifications_core_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-notifications-core-' + notifications_no_snapshot + '.zip'

kotlin_version = System.getProperty("kotlin.version", "1.6.10")
}

Expand Down Expand Up @@ -160,10 +171,9 @@ dependencies {
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
implementation "org.jetbrains:annotations:13.0"
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
// implementation "org.opensearch:notification:${notification_version}"
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "com.github.seancfoley:ipaddress:5.3.3"
implementation "commons-codec:commons-codec:1.13"
implementation "commons-codec:commons-codec:${versions.commonscodec}"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down Expand Up @@ -286,6 +296,44 @@ testClusters.integTest {
}
}))

plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
if (new File("$project.rootDir/$notifications_core_resource_folder").exists()) {
project.delete(files("$project.rootDir/$notifications_core_resource_folder"))
}
project.mkdir notifications_core_resource_folder
ant.get(src: notifications_core_build_download,
dest: notifications_core_resource_folder,
httpusecaches: false)
return fileTree(notifications_core_resource_folder).getSingleFile()
}
}
}
}))

plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
if (new File("$project.rootDir/$notifications_resource_folder").exists()) {
project.delete(files("$project.rootDir/$notifications_resource_folder"))
}
project.mkdir notifications_resource_folder
ant.get(src: notifications_build_download,
dest: notifications_resource_folder,
httpusecaches: false)
return fileTree(notifications_resource_folder).getSingleFile()
}
}
}
}))

if (securityEnabled) {
plugin(provider({
new RegularFile() {
Expand Down Expand Up @@ -591,6 +639,20 @@ testClusters.mixedCluster {
}
}))

node.plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree(notifications_core_resource_folder).getSingleFile() }
}
}))

node.plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree(notifications_resource_folder).getSingleFile() }
}
}))

if (mixedClusterFlag && node.name == "mixedCluster-1") {
node.plugin(provider({
new RegularFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction
import org.opensearch.indexmanagement.indexstatemanagement.model.ErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata
Expand All @@ -69,6 +70,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange
import org.opensearch.indexmanagement.indexstatemanagement.util.isSuccessfulDelete
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.publishLegacyNotification
import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldBackoff
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.util.updateDisableManagedIndexRequest
Expand Down Expand Up @@ -778,9 +781,9 @@ object ManagedIndexRunner :
private suspend fun publishErrorNotification(policy: Policy, managedIndexMetaData: ManagedIndexMetaData) {
policy.errorNotification?.run {
errorNotificationRetryPolicy.retry(logger) {
withContext(Dispatchers.IO) {
// destination.publish(null, compileTemplate(messageTemplate, managedIndexMetaData), hostDenyList)
}
val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData)
destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client)
channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.indexmanagement.indexstatemanagement.step.notification.AttemptNotificationStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
Expand All @@ -16,12 +17,15 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.script.Script

class NotificationAction(
val destination: Destination,
val destination: Destination?,
val channel: Channel?,
val messageTemplate: Script,
index: Int
) : Action(name, index) {

init {
require(destination != null || channel != null) { "Notification must contain a destination or channel" }
require(destination == null || channel == null) { "Notification can only contain a single destination or channel" }
require(messageTemplate.lang == MUSTACHE) { "Notification message template must be a mustache script" }
}

Expand All @@ -36,20 +40,23 @@ class NotificationAction(

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(DESTINATION_FIELD, destination)
if (destination != null) builder.field(DESTINATION_FIELD, destination)
if (channel != null) builder.field(CHANNEL_FIELD, channel)
builder.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
destination.writeTo(out)
out.writeOptionalWriteable(destination)
out.writeOptionalWriteable(channel)
messageTemplate.writeTo(out)
out.writeInt(actionIndex)
}

companion object {
const val name = "notification"
const val DESTINATION_FIELD = "destination"
const val CHANNEL_FIELD = "channel"
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val MUSTACHE = "mustache"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,28 @@ import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.CHANNEL_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.DESTINATION_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.MESSAGE_TEMPLATE_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.script.Script

class NotificationActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val destination = Destination(sin)
val destination = sin.readOptionalWriteable(::Destination)
val channel = sin.readOptionalWriteable(::Channel)
val messageTemplate = Script(sin)
val index = sin.readInt()

return NotificationAction(destination, messageTemplate, index)
return NotificationAction(destination, channel, messageTemplate, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var destination: Destination? = null
var channel: Channel? = null
var messageTemplate: Script? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -36,13 +40,15 @@ class NotificationActionParser : ActionParser() {

when (fieldName) {
DESTINATION_FIELD -> destination = Destination.parse(xcp)
CHANNEL_FIELD -> channel = Channel.parse(xcp)
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in NotificationAction.")
}
}

return NotificationAction(
destination = requireNotNull(destination) { "NotificationAction destination is null" },
destination = destination,
channel = channel,
messageTemplate = requireNotNull(messageTemplate) { "NotificationAction message template is null" },
index = index
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,58 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.script.Script
import java.io.IOException

data class ErrorNotification(
val destination: Destination,
val destination: Destination?,
val channel: Channel?,
val messageTemplate: Script
) : ToXContentObject, Writeable {

init {
require(destination != null || channel != null) { "ErrorNotification must contain a destination or channel" }
require(destination == null || channel == null) { "ErrorNotification can only contain a single destination or channel" }
require(messageTemplate.lang == MUSTACHE) { "ErrorNotification message template must be a mustache script" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(DESTINATION_FIELD, destination)
builder.startObject()
if (destination != null) builder.field(DESTINATION_FIELD, destination)
if (channel != null) builder.field(CHANNEL_FIELD, channel)
return builder
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.endObject()
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Destination(sin),
sin.readOptionalWriteable(::Destination),
sin.readOptionalWriteable(::Channel),
Script(sin)
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
destination.writeTo(out)
out.writeOptionalWriteable(destination)
out.writeOptionalWriteable(channel)
messageTemplate.writeTo(out)
}

companion object {
const val DESTINATION_FIELD = "destination"
const val CHANNEL_FIELD = "channel"
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val MUSTACHE = "mustache"
const val CHANNEL_TITLE = "Index Management-ISM-Error Notification"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ErrorNotification {
var destination: Destination? = null
var channel: Channel? = null
var messageTemplate: Script? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -63,14 +74,16 @@ data class ErrorNotification(
xcp.nextToken()

when (fieldName) {
DESTINATION_FIELD -> destination = Destination.parse(xcp)
DESTINATION_FIELD -> destination = if (xcp.currentToken() == Token.VALUE_NULL) null else Destination.parse(xcp)
CHANNEL_FIELD -> channel = if (xcp.currentToken() == Token.VALUE_NULL) null else Channel.parse(xcp)
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ErrorNotification.")
}
}

return ErrorNotification(
destination = requireNotNull(destination) { "ErrorNotification destination is null" },
destination = destination,
channel = channel,
messageTemplate = requireNotNull(messageTemplate) { "ErrorNotification message template is null" }
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.model.destination

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

data class Channel(val id: String) : ToXContent, Writeable {

init {
require(id.isNotEmpty()) { "Channel ID cannot be empty" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(ID, id)
.endObject()
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
}

companion object {
const val ID = "id"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): Channel {
var id: String? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ID -> id = xcp.text()
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing Channel destination")
}
}
}

return Channel(requireNotNull(id) { "Channel ID is null" })
}
}
}
Loading