From 490559ea22c39d40b03cbfd52f82841d40eb3c88 Mon Sep 17 00:00:00 2001 From: Federico Rodriguez Date: Wed, 25 Sep 2024 10:28:50 +0200 Subject: [PATCH 1/4] Apply POC email notification --- .classpath | 34 ++-- .idea/kotlinc.xml | 3 + .idea/misc.xml | 2 +- .idea/vcs.xml | 1 + .project | 2 +- build.gradle | 53 +++++- .../ReportsSchedulerPlugin.kt | 2 + .../scheduler/ReportDefinitionJobRunner.kt | 151 ++++++++++++++++++ 8 files changed, 221 insertions(+), 27 deletions(-) diff --git a/.classpath b/.classpath index 9bfe9694..1c3c12f6 100644 --- a/.classpath +++ b/.classpath @@ -1,15 +1,17 @@ - + - - + + + - + - - + + + @@ -18,35 +20,33 @@ - + - + - - - + + - + - - - + + - + - + diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml index 0dd4b354..951989e5 100644 --- a/.idea/kotlinc.xml +++ b/.idea/kotlinc.xml @@ -3,4 +3,7 @@ + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index 08bcbb84..6d81e09f 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -7,7 +7,7 @@ - + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 6c0b8635..288b36b1 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -2,5 +2,6 @@ + \ No newline at end of file diff --git a/.project b/.project index 3f751dd9..321bbac3 100644 --- a/.project +++ b/.project @@ -1,6 +1,6 @@ - opensearch-reports-scheduler + wazuh-indexer-reports-scheduler-wazuh-indexer-reporting Project reports-scheduler created by Buildship. diff --git a/build.gradle b/build.gradle index 57442cc3..0a62a8a3 100644 --- a/build.gradle +++ b/build.gradle @@ -168,6 +168,9 @@ repositories { } dependencies { + // Needed for integ tests + zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}" + zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}" zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" implementation "org.opensearch:opensearch:${opensearch_version}" implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" @@ -297,14 +300,48 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty( testClusters.integTest { testDistribution = "INTEG_TEST" // need to install job-scheduler first, need to assemble job-scheduler first - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.getSingleFile() - } +// plugin(provider(new Callable(){ +// @Override +// RegularFile call() throws Exception { +// return new RegularFile() { +// @Override +// File getAsFile() { +// return configurations.zipArchive.asFileTree.getSingleFile() +// } +// } +// } +// })) + + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/opensearch-job-scheduler*' + }.singleFile + } + } + })) + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/opensearch-notifications-core*' + }.singleFile + } + } + })) + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/notifications*' + }.singleFile } } })) diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt index e97135f2..45158303 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt @@ -7,6 +7,7 @@ package org.opensearch.reportsscheduler import org.opensearch.action.ActionRequest import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes import org.opensearch.cluster.service.ClusterService @@ -103,6 +104,7 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch indexNameExpressionResolver: IndexNameExpressionResolver, repositoriesServiceSupplier: Supplier ): Collection { + ReportDefinitionJobRunner.nodeClient = client as NodeClient PluginSettings.addSettingsUpdateConsumer(clusterService) ReportDefinitionsIndex.initialize(client, clusterService) ReportInstancesIndex.initialize(client, clusterService) diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt index 35af1b78..a66c4629 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt @@ -8,6 +8,23 @@ package org.opensearch.reportsscheduler.scheduler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch +import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.notifications.action.GetNotificationConfigRequest +import org.opensearch.commons.notifications.action.GetNotificationConfigResponse +import org.opensearch.commons.notifications.action.SendNotificationResponse +import org.opensearch.commons.notifications.model.ChannelMessage +import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.commons.notifications.model.NotificationConfigInfo +import org.opensearch.commons.notifications.model.SeverityType +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import org.opensearch.index.query.QueryBuilders import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner @@ -15,12 +32,75 @@ import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREF import org.opensearch.reportsscheduler.index.ReportInstancesIndex import org.opensearch.reportsscheduler.model.ReportDefinitionDetails import org.opensearch.reportsscheduler.model.ReportInstance +import org.opensearch.reportsscheduler.util.buildReportLink import org.opensearch.reportsscheduler.util.logger +import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine internal object ReportDefinitionJobRunner : ScheduledJobRunner { private val log by logger(ReportDefinitionJobRunner::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + lateinit var nodeClient: NodeClient + const val MAX_SIZE: Int = 10 + + /** + * Wazuh - Gets a NotificationConfigInfo object by ID if it exists. + */ + private suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? { + return try { + val res: GetNotificationConfigResponse = getNotificationConfig(client, GetNotificationConfigRequest(setOf(id))) + res.searchResult.objectList.firstOrNull() + } catch (e: OpenSearchSecurityException) { + throw e + } catch (e: OpenSearchStatusException) { + if (e.status() == RestStatus.NOT_FOUND) { + log.debug("Notification config [$id] was not found") + } + null + } + } + + private suspend fun getNotificationConfig( + client: NodeClient, + getNotificationConfigRequest: GetNotificationConfigRequest + ): GetNotificationConfigResponse { + val getNotificationConfigResponse: GetNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.getNotificationConfig( + client, + getNotificationConfigRequest, + it + ) + } + return getNotificationConfigResponse + } + + private suspend fun createNotification( + client: NodeClient, + configInfo: NotificationConfigInfo?, + reportDefinitionDetails: ReportDefinitionDetails, + id: String, + hits: Long? + ) { + val title: String = reportDefinitionDetails.reportDefinition.delivery!!.title + val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription + val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription + + val urlDefinition: String = buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id) + + val textWithURL: String = textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString()) + val htmlWithURL: String? = htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString()) + + log.info("esto es el mensaje html $htmlMessage") + configInfo?.sendNotifications( + client, + title, + textWithURL, + htmlWithURL + ) + } override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { if (job !is ReportDefinitionDetails) { @@ -48,7 +128,78 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner { log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance") } else { log.info("$LOG_PREFIX:runJob-created job:$id") + + // Wazuh - Make queries + val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder() + .query( + QueryBuilders.boolQuery() + .must( + QueryBuilders.rangeQuery("timestamp") + .gt(beginTime) + .lte(currentTime) + ) + .must( + QueryBuilders.matchQuery("agent.id", "001") + ) + ) + val jobSearchRequest: SearchRequest = SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse) + val response: SearchResponse = nodeClient.search(jobSearchRequest).actionGet() + + val configInfo = getNotificationConfigInfo( + nodeClient, + id = reportDefinitionDetails.reportDefinition.delivery!!.configIds.get(0) + ) + createNotification(nodeClient, configInfo, reportDefinitionDetails, id, response.getHits().getTotalHits()?.value) } } } } + + +/** + * Wazuh - Send notification + */ +suspend fun NotificationConfigInfo.sendNotifications(client: Client, title: String, compiledMessage: String, compiledMessageHTML: String?): String { + val config = this + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + EventSource(title, config.configId, SeverityType.INFO), + ChannelMessage(compiledMessage, compiledMessageHTML, null), + listOf(config.configId), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.toString()) + return res.notificationEvent.toString() +} + +suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + +/** + * All valid response statuses. + */ +private val VALID_RESPONSE_STATUS = setOf( + RestStatus.OK.status, + RestStatus.CREATED.status, + RestStatus.ACCEPTED.status, + RestStatus.NON_AUTHORITATIVE_INFORMATION.status, + RestStatus.NO_CONTENT.status, + RestStatus.RESET_CONTENT.status, + RestStatus.PARTIAL_CONTENT.status, + RestStatus.MULTI_STATUS.status +) + +@Throws(OpenSearchStatusException::class) +fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { + if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { + throw OpenSearchStatusException("Failed: $responseContent", restStatus) + } +} From ca1ac391220643da1c16f5bdefbc3dfb22ba2fad Mon Sep 17 00:00:00 2001 From: Alex Ruiz Date: Wed, 25 Sep 2024 12:58:38 +0200 Subject: [PATCH 2/4] Roll back changes to IDEs config files --- .classpath | 36 ++++++++++++++++++------------------ .gitignore | 1 + .idea/kotlinc.xml | 3 --- .idea/misc.xml | 2 +- .idea/vcs.xml | 1 - .project | 4 ++-- 6 files changed, 22 insertions(+), 25 deletions(-) diff --git a/.classpath b/.classpath index 1c3c12f6..554e9498 100644 --- a/.classpath +++ b/.classpath @@ -1,17 +1,15 @@ - + - - - + + - + - - - + + @@ -20,33 +18,35 @@ - + - + - - + + + - + - - + + + - + - + - + \ No newline at end of file diff --git a/.gitignore b/.gitignore index 8c03270c..d954410c 100644 --- a/.gitignore +++ b/.gitignore @@ -159,6 +159,7 @@ $RECYCLE.BIN/ .idea/modules.xml .idea/*.iml .idea/modules +.idea/*.xml *.iml *.ipr diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml index 951989e5..0dd4b354 100644 --- a/.idea/kotlinc.xml +++ b/.idea/kotlinc.xml @@ -3,7 +3,4 @@ - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index 6d81e09f..08bcbb84 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -7,7 +7,7 @@ - + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 288b36b1..6c0b8635 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -2,6 +2,5 @@ - \ No newline at end of file diff --git a/.project b/.project index 321bbac3..27ff4359 100644 --- a/.project +++ b/.project @@ -1,6 +1,6 @@ - wazuh-indexer-reports-scheduler-wazuh-indexer-reporting + wazuh-indexer-reports-scheduler Project reports-scheduler created by Buildship. @@ -31,4 +31,4 @@ - + \ No newline at end of file From c0c9e0616716256b66e2b4d21b773b8af8999aeb Mon Sep 17 00:00:00 2001 From: Alex Ruiz Date: Wed, 25 Sep 2024 12:59:47 +0200 Subject: [PATCH 3/4] Roll back changes to IDEs config files --- .classpath | 2 +- .project | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.classpath b/.classpath index 554e9498..9bfe9694 100644 --- a/.classpath +++ b/.classpath @@ -49,4 +49,4 @@ - \ No newline at end of file + diff --git a/.project b/.project index 27ff4359..6746ef3a 100644 --- a/.project +++ b/.project @@ -31,4 +31,4 @@ - \ No newline at end of file + From 5a73ef1671c10bb35e80e4b647232d2ce9526950 Mon Sep 17 00:00:00 2001 From: Alex Ruiz Date: Wed, 25 Sep 2024 20:27:02 +0200 Subject: [PATCH 4/4] Refactor --- .idea/kotlinc.xml | 3 + .idea/misc.xml | 2 +- build.gradle | 13 -- .../ReportsSchedulerPlugin.kt | 3 +- .../scheduler/ReportDefinitionJobRunner.kt | 150 +++++------------- .../util/NotificationApiUtils.kt | 121 ++++++++++++++ 6 files changed, 169 insertions(+), 123 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml index 0dd4b354..951989e5 100644 --- a/.idea/kotlinc.xml +++ b/.idea/kotlinc.xml @@ -3,4 +3,7 @@ + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index 08bcbb84..6d5ded36 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -7,7 +7,7 @@ - + \ No newline at end of file diff --git a/build.gradle b/build.gradle index 0a62a8a3..78358cae 100644 --- a/build.gradle +++ b/build.gradle @@ -300,19 +300,6 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty( testClusters.integTest { testDistribution = "INTEG_TEST" // need to install job-scheduler first, need to assemble job-scheduler first -// plugin(provider(new Callable(){ -// @Override -// RegularFile call() throws Exception { -// return new RegularFile() { -// @Override -// File getAsFile() { -// return configurations.zipArchive.asFileTree.getSingleFile() -// } -// } -// } -// })) - - plugin(provider({ new RegularFile() { @Override diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt index 45158303..7a1fd80d 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt @@ -7,7 +7,6 @@ package org.opensearch.reportsscheduler import org.opensearch.action.ActionRequest import org.opensearch.client.Client -import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes import org.opensearch.cluster.service.ClusterService @@ -104,8 +103,8 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch indexNameExpressionResolver: IndexNameExpressionResolver, repositoriesServiceSupplier: Supplier ): Collection { - ReportDefinitionJobRunner.nodeClient = client as NodeClient PluginSettings.addSettingsUpdateConsumer(clusterService) + ReportDefinitionJobRunner.initialize(client, clusterService) ReportDefinitionsIndex.initialize(client, clusterService) ReportInstancesIndex.initialize(client, clusterService) return emptyList() diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt index a66c4629..b1f6624d 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt @@ -8,22 +8,12 @@ package org.opensearch.reportsscheduler.scheduler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch -import org.opensearch.OpenSearchSecurityException -import org.opensearch.OpenSearchStatusException import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.client.Client import org.opensearch.client.node.NodeClient -import org.opensearch.commons.notifications.NotificationsPluginInterface -import org.opensearch.commons.notifications.action.GetNotificationConfigRequest -import org.opensearch.commons.notifications.action.GetNotificationConfigResponse -import org.opensearch.commons.notifications.action.SendNotificationResponse -import org.opensearch.commons.notifications.model.ChannelMessage -import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.cluster.service.ClusterService import org.opensearch.commons.notifications.model.NotificationConfigInfo -import org.opensearch.commons.notifications.model.SeverityType -import org.opensearch.core.action.ActionListener -import org.opensearch.core.rest.RestStatus import org.opensearch.index.query.QueryBuilders import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.ScheduledJobParameter @@ -32,54 +22,33 @@ import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREF import org.opensearch.reportsscheduler.index.ReportInstancesIndex import org.opensearch.reportsscheduler.model.ReportDefinitionDetails import org.opensearch.reportsscheduler.model.ReportInstance +import org.opensearch.reportsscheduler.util.NotificationApiUtils.getNotificationConfigInfo +import org.opensearch.reportsscheduler.util.SecureIndexClient import org.opensearch.reportsscheduler.util.buildReportLink import org.opensearch.reportsscheduler.util.logger +import org.opensearch.reportsscheduler.util.sendNotificationWithHTML import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.coroutines.suspendCoroutine internal object ReportDefinitionJobRunner : ScheduledJobRunner { private val log by logger(ReportDefinitionJobRunner::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) - lateinit var nodeClient: NodeClient - const val MAX_SIZE: Int = 10 + + private lateinit var client: Client + private lateinit var clusterService: ClusterService /** - * Wazuh - Gets a NotificationConfigInfo object by ID if it exists. + * Initialize the class + * @param client The ES client + * @param clusterService The ES cluster service */ - private suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? { - return try { - val res: GetNotificationConfigResponse = getNotificationConfig(client, GetNotificationConfigRequest(setOf(id))) - res.searchResult.objectList.firstOrNull() - } catch (e: OpenSearchSecurityException) { - throw e - } catch (e: OpenSearchStatusException) { - if (e.status() == RestStatus.NOT_FOUND) { - log.debug("Notification config [$id] was not found") - } - null - } - } - - private suspend fun getNotificationConfig( - client: NodeClient, - getNotificationConfigRequest: GetNotificationConfigRequest - ): GetNotificationConfigResponse { - val getNotificationConfigResponse: GetNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { - this.getNotificationConfig( - client, - getNotificationConfigRequest, - it - ) - } - return getNotificationConfigResponse + fun initialize(client: Client, clusterService: ClusterService) { + this.client = SecureIndexClient(client) + this.clusterService = clusterService } private suspend fun createNotification( - client: NodeClient, - configInfo: NotificationConfigInfo?, + configInfo: NotificationConfigInfo, reportDefinitionDetails: ReportDefinitionDetails, id: String, hits: Long? @@ -88,14 +57,17 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner { val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription - val urlDefinition: String = buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id) + val urlDefinition: String = + buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id) - val textWithURL: String = textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString()) - val htmlWithURL: String? = htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString()) + val textWithURL: String = + textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString()) + val htmlWithURL: String? = + htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString()) log.info("esto es el mensaje html $htmlMessage") - configInfo?.sendNotifications( - client, + configInfo.sendNotificationWithHTML( + this.client, title, textWithURL, htmlWithURL @@ -128,7 +100,7 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner { log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance") } else { log.info("$LOG_PREFIX:runJob-created job:$id") - + // Wazuh - Make queries val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder() .query( @@ -142,64 +114,28 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner { QueryBuilders.matchQuery("agent.id", "001") ) ) - val jobSearchRequest: SearchRequest = SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse) - val response: SearchResponse = nodeClient.search(jobSearchRequest).actionGet() - - val configInfo = getNotificationConfigInfo( - nodeClient, - id = reportDefinitionDetails.reportDefinition.delivery!!.configIds.get(0) + val jobSearchRequest: SearchRequest = + SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse) + val response: SearchResponse = client.search(jobSearchRequest).actionGet() + + val reportDefinitionId = reportDefinitionDetails.reportDefinition.delivery!!.configIds[0] + val configInfo: NotificationConfigInfo? = getNotificationConfigInfo( + client as NodeClient, + reportDefinitionId ) - createNotification(nodeClient, configInfo, reportDefinitionDetails, id, response.getHits().getTotalHits()?.value) + + if (configInfo != null) { + createNotification( + configInfo, + reportDefinitionDetails, + id, + response.hits.totalHits?.value + ) + log.info("Notification with id $id was sent.") + } else { + log.error("NotificationConfigInfo with id $reportDefinitionId was not found.") + } } } } } - - -/** - * Wazuh - Send notification - */ -suspend fun NotificationConfigInfo.sendNotifications(client: Client, title: String, compiledMessage: String, compiledMessageHTML: String?): String { - val config = this - val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { - this.sendNotification( - (client as NodeClient), - EventSource(title, config.configId, SeverityType.INFO), - ChannelMessage(compiledMessage, compiledMessageHTML, null), - listOf(config.configId), - it - ) - } - validateResponseStatus(res.getStatus(), res.notificationEvent.toString()) - return res.notificationEvent.toString() -} - -suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = - suspendCoroutine { cont -> - block(object : ActionListener { - override fun onResponse(response: T) = cont.resume(response) - - override fun onFailure(e: Exception) = cont.resumeWithException(e) - }) - } - -/** - * All valid response statuses. - */ -private val VALID_RESPONSE_STATUS = setOf( - RestStatus.OK.status, - RestStatus.CREATED.status, - RestStatus.ACCEPTED.status, - RestStatus.NON_AUTHORITATIVE_INFORMATION.status, - RestStatus.NO_CONTENT.status, - RestStatus.RESET_CONTENT.status, - RestStatus.PARTIAL_CONTENT.status, - RestStatus.MULTI_STATUS.status -) - -@Throws(OpenSearchStatusException::class) -fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { - if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { - throw OpenSearchStatusException("Failed: $responseContent", restStatus) - } -} diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt b/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt new file mode 100644 index 00000000..414db187 --- /dev/null +++ b/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt @@ -0,0 +1,121 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.reportsscheduler.util + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.notifications.action.GetNotificationConfigRequest +import org.opensearch.commons.notifications.action.GetNotificationConfigResponse +import org.opensearch.commons.notifications.action.SendNotificationResponse +import org.opensearch.commons.notifications.model.ChannelMessage +import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.commons.notifications.model.NotificationConfigInfo +import org.opensearch.commons.notifications.model.SeverityType +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +object NotificationApiUtils { + + private val logger = LogManager.getLogger(NotificationApiUtils::class) + + /** + * Gets a NotificationConfigInfo object by ID if it exists. + */ + suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? { + return try { + val res: GetNotificationConfigResponse = + getNotificationConfig(client, GetNotificationConfigRequest(setOf(id))) + res.searchResult.objectList.firstOrNull() + } catch (e: OpenSearchSecurityException) { + throw e + } catch (e: OpenSearchStatusException) { + if (e.status() == RestStatus.NOT_FOUND) { + logger.debug("Notification config [$id] was not found") + } + null + } + } + + private suspend fun getNotificationConfig( + client: NodeClient, + getNotificationConfigRequest: GetNotificationConfigRequest + ): GetNotificationConfigResponse { + val getNotificationConfigResponse: GetNotificationConfigResponse = + NotificationsPluginInterface.suspendUntil { + this.getNotificationConfig( + client, + getNotificationConfigRequest, + it + ) + } + return getNotificationConfigResponse + } +} + +/** + * Extension function for publishing a notification to a channel in the Notification plugin. + */ +suspend fun NotificationConfigInfo.sendNotificationWithHTML( + client: Client, + title: String, + compiledMessage: String, + compiledMessageHTML: String? +): String { + val config = this + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + EventSource(title, config.configId, SeverityType.INFO), + ChannelMessage(compiledMessage, compiledMessageHTML, null), + listOf(config.configId), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.toString()) + return res.notificationEvent.toString() +} + +/** + * Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API. + */ +suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + +/** + * All valid response statuses. + */ +private val VALID_RESPONSE_STATUS = setOf( + RestStatus.OK.status, + RestStatus.CREATED.status, + RestStatus.ACCEPTED.status, + RestStatus.NON_AUTHORITATIVE_INFORMATION.status, + RestStatus.NO_CONTENT.status, + RestStatus.RESET_CONTENT.status, + RestStatus.PARTIAL_CONTENT.status, + RestStatus.MULTI_STATUS.status +) + +@Throws(OpenSearchStatusException::class) +fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { + if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { + throw OpenSearchStatusException("Failed: $responseContent", restStatus) + } +}