Skip to content

Commit

Permalink
Apply POC email notification (#6)
Browse files Browse the repository at this point in the history
* Apply POC email notification

* Roll back changes to IDEs config files

* Roll back changes to IDEs config files

* Refactor

---------

Co-authored-by: Alex Ruiz <[email protected]>
  • Loading branch information
asteriscos and AlexRuiz7 committed Nov 15, 2024
1 parent 65a6360 commit eddffc3
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ $RECYCLE.BIN/
.idea/modules.xml
.idea/*.iml
.idea/modules
.idea/*.xml
*.iml
*.ipr

Expand Down
3 changes: 3 additions & 0 deletions .idea/kotlinc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .project
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>opensearch-reports-scheduler</name>
<name>wazuh-indexer-reports-scheduler</name>
<comment>Project reports-scheduler created by Buildship.</comment>
<projects>
</projects>
Expand Down
40 changes: 32 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ repositories {
}

dependencies {
// Needed for integ tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
Expand Down Expand Up @@ -301,14 +304,35 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty(
testClusters.integTest {
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
}
plugin(provider({
new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/opensearch-job-scheduler*'
}.singleFile
}
}
}))

plugin(provider({
new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/opensearch-notifications-core*'
}.singleFile
}
}
}))

plugin(provider({
new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/notifications*'
}.singleFile
}
}
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch
repositoriesServiceSupplier: Supplier<RepositoriesService>
): Collection<Any> {
PluginSettings.addSettingsUpdateConsumer(clusterService)
ReportDefinitionJobRunner.initialize(client, clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
return emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,72 @@ package org.opensearch.reportsscheduler.scheduler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.index.query.QueryBuilders
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.util.NotificationApiUtils.getNotificationConfigInfo
import org.opensearch.reportsscheduler.util.SecureIndexClient
import org.opensearch.reportsscheduler.util.buildReportLink
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.reportsscheduler.util.sendNotificationWithHTML
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

internal object ReportDefinitionJobRunner : ScheduledJobRunner {
private val log by logger(ReportDefinitionJobRunner::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

private lateinit var client: Client
private lateinit var clusterService: ClusterService

/**
* Initialize the class
* @param client The ES client
* @param clusterService The ES cluster service
*/
fun initialize(client: Client, clusterService: ClusterService) {
this.client = SecureIndexClient(client)
this.clusterService = clusterService
}

private suspend fun createNotification(
configInfo: NotificationConfigInfo,
reportDefinitionDetails: ReportDefinitionDetails,
id: String,
hits: Long?
) {
val title: String = reportDefinitionDetails.reportDefinition.delivery!!.title
val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription
val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription

val urlDefinition: String =
buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id)

val textWithURL: String =
textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString())
val htmlWithURL: String? =
htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString())

log.info("esto es el mensaje html $htmlMessage")
configInfo.sendNotificationWithHTML(
this.client,
title,
textWithURL,
htmlWithURL
)
}

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is ReportDefinitionDetails) {
log.warn("$LOG_PREFIX:job is not of type ReportDefinitionDetails:${job.javaClass.name}")
Expand All @@ -48,6 +100,41 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")

// Wazuh - Make queries
val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder()
.query(
QueryBuilders.boolQuery()
.must(
QueryBuilders.rangeQuery("timestamp")
.gt(beginTime)
.lte(currentTime)
)
.must(
QueryBuilders.matchQuery("agent.id", "001")
)
)
val jobSearchRequest: SearchRequest =
SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse)
val response: SearchResponse = client.search(jobSearchRequest).actionGet()

val reportDefinitionId = reportDefinitionDetails.reportDefinition.delivery!!.configIds[0]
val configInfo: NotificationConfigInfo? = getNotificationConfigInfo(
client as NodeClient,
reportDefinitionId
)

if (configInfo != null) {
createNotification(
configInfo,
reportDefinitionDetails,
id,
response.hits.totalHits?.value
)
log.info("Notification with id $id was sent.")
} else {
log.error("NotificationConfigInfo with id $reportDefinitionId was not found.")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.reportsscheduler.util

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.GetNotificationConfigRequest
import org.opensearch.commons.notifications.action.GetNotificationConfigResponse
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

object NotificationApiUtils {

private val logger = LogManager.getLogger(NotificationApiUtils::class)

/**
* Gets a NotificationConfigInfo object by ID if it exists.
*/
suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? {
return try {
val res: GetNotificationConfigResponse =
getNotificationConfig(client, GetNotificationConfigRequest(setOf(id)))
res.searchResult.objectList.firstOrNull()
} catch (e: OpenSearchSecurityException) {
throw e
} catch (e: OpenSearchStatusException) {
if (e.status() == RestStatus.NOT_FOUND) {
logger.debug("Notification config [$id] was not found")
}
null
}
}

private suspend fun getNotificationConfig(
client: NodeClient,
getNotificationConfigRequest: GetNotificationConfigRequest
): GetNotificationConfigResponse {
val getNotificationConfigResponse: GetNotificationConfigResponse =
NotificationsPluginInterface.suspendUntil {
this.getNotificationConfig(
client,
getNotificationConfigRequest,
it
)
}
return getNotificationConfigResponse
}
}

/**
* Extension function for publishing a notification to a channel in the Notification plugin.
*/
suspend fun NotificationConfigInfo.sendNotificationWithHTML(
client: Client,
title: String,
compiledMessage: String,
compiledMessageHTML: String?
): String {
val config = this
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
EventSource(title, config.configId, SeverityType.INFO),
ChannelMessage(compiledMessage, compiledMessageHTML, null),
listOf(config.configId),
it
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.toString())
return res.notificationEvent.toString()
}

/**
* Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function.
*
* @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API.
*/
suspend fun <T> NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

/**
* All valid response statuses.
*/
private val VALID_RESPONSE_STATUS = setOf(
RestStatus.OK.status,
RestStatus.CREATED.status,
RestStatus.ACCEPTED.status,
RestStatus.NON_AUTHORITATIVE_INFORMATION.status,
RestStatus.NO_CONTENT.status,
RestStatus.RESET_CONTENT.status,
RestStatus.PARTIAL_CONTENT.status,
RestStatus.MULTI_STATUS.status
)

@Throws(OpenSearchStatusException::class)
fun validateResponseStatus(restStatus: RestStatus, responseContent: String) {
if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) {
throw OpenSearchStatusException("Failed: $responseContent", restStatus)
}
}

0 comments on commit eddffc3

Please sign in to comment.