Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate notifications backend #129

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion reports-scheduler/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ integTest.dependsOn(bundle)
integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))}

testClusters.integTest {
testDistribution = "ARCHIVE"
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
plugin(provider(new Callable<RegularFile>(){
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
package org.opensearch.reportsscheduler

import org.opensearch.OpenSearchStatusException
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -58,6 +59,7 @@ import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
Expand All @@ -72,9 +74,11 @@ import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestStatus
import org.opensearch.script.ScriptService
import org.opensearch.threadpool.ThreadPool
import org.opensearch.watcher.ResourceWatcherService
Expand Down Expand Up @@ -122,6 +126,12 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
PluginSettings.addSettingsUpdateConsumer(clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
(client as? NodeClient)?.let { NotificationsActions.initialize(it) } ?: run {
throw OpenSearchStatusException(
"Unable to cast client to NodeClient for Notifications call",
RestStatus.INTERNAL_SERVER_ERROR
)
}
return emptyList()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package org.opensearch.reportsscheduler.action

import org.opensearch.OpenSearchStatusException
import org.opensearch.commons.authuser.User
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.index.ReportDefinitionsIndex
Expand All @@ -45,10 +46,10 @@ import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.model.ReportInstance.Status
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusRequest
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusResponse
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.security.UserAccessManager
import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.OpenSearchStatusException
import org.opensearch.rest.RestStatus
import java.time.Instant
import kotlin.random.Random
Expand Down Expand Up @@ -125,6 +126,8 @@ internal object ReportInstanceActions {
Metrics.REPORT_FROM_DEFINITION_ID_SYSTEM_ERROR.counter.increment()
throw OpenSearchStatusException("Report Instance Creation failed", RestStatus.INTERNAL_SERVER_ERROR)
}
if (reportDefinitionDetails.reportDefinition.delivery != null)
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, docId)
val reportInstanceCopy = reportInstance.copy(id = docId)
return OnDemandReportCreateResponse(reportInstanceCopy, UserAccessManager.hasAllInfoAccess(user))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.reportsscheduler.notifications

import org.opensearch.action.ActionListener
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.commons.notifications.model.Feature
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.model.CreateReportDefinitionResponse
import org.opensearch.reportsscheduler.model.ReportDefinition
import org.opensearch.reportsscheduler.util.logger

/**
* Report definitions index operation actions.
*/
internal object NotificationsActions {
private val log by logger(NotificationsActions::class.java)

private lateinit var client: NodeClient

/**
* Initialize the class
* @param client NodeClient for transport call
*/
fun initialize(client: NodeClient) {
this.client = client
}

/**
* Send notifications based on delivery parameter
* @param delivery [ReportDefinition.Delivery] object
* @return [CreateReportDefinitionResponse]
*/
fun send(delivery: ReportDefinition.Delivery, referenceId: String) {
log.info("$LOG_PREFIX:NotificationsActions-send")
NotificationsPluginInterface.sendNotification(
client,
EventSource(delivery.title, referenceId, Feature.REPORTS, SeverityType.INFO),
ChannelMessage(delivery.textDescription, delivery.htmlDescription, null),
delivery.configIds,
object : ActionListener<SendNotificationResponse> {
override fun onResponse(p0: SendNotificationResponse) {
log.info("$LOG_PREFIX:NotificationsActions-send:${p0.notificationId}")
}

override fun onFailure(p0: java.lang.Exception) {
log.error("$LOG_PREFIX:NotificationsActions-send Error:$p0")
}
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREF
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.util.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -68,6 +69,8 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")
if (reportDefinitionDetails.reportDefinition.delivery != null)
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, id)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,13 @@ fun constructReportDefinitionRequest(
"origin":"localhost:5601",
"id":"id"
},
$trigger
"format":{
"duration":"PT1H",
"fileFormat":"Pdf",
"limit":1000,
"header":"optional header",
"footer":"optional footer"
},
$trigger
joshuali925 marked this conversation as resolved.
Show resolved Hide resolved
"delivery":{
"recipients":["[email protected]"],
"deliveryFormat":"LinkOnly",
"title":"title",
"textDescription":"textDescription",
"htmlDescription":"optional htmlDescription",
"configIds":["optional_configIds"]
}
}
}
Expand Down