diff --git a/.gitignore b/.gitignore
index 8c03270c..d954410c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,6 +159,7 @@ $RECYCLE.BIN/
.idea/modules.xml
.idea/*.iml
.idea/modules
+.idea/*.xml
*.iml
*.ipr
diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml
index 0dd4b354..951989e5 100644
--- a/.idea/kotlinc.xml
+++ b/.idea/kotlinc.xml
@@ -3,4 +3,7 @@
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 08bcbb84..6d5ded36 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -7,7 +7,7 @@
-
+
\ No newline at end of file
diff --git a/.project b/.project
index 3f751dd9..6746ef3a 100644
--- a/.project
+++ b/.project
@@ -1,6 +1,6 @@
- opensearch-reports-scheduler
+ wazuh-indexer-reports-scheduler
Project reports-scheduler created by Buildship.
diff --git a/build.gradle b/build.gradle
index 57442cc3..78358cae 100644
--- a/build.gradle
+++ b/build.gradle
@@ -168,6 +168,9 @@ repositories {
}
dependencies {
+ // Needed for integ tests
+ zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}"
+ zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
@@ -297,14 +300,35 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty(
testClusters.integTest {
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
- plugin(provider(new Callable(){
- @Override
- RegularFile call() throws Exception {
- return new RegularFile() {
- @Override
- File getAsFile() {
- return configurations.zipArchive.asFileTree.getSingleFile()
- }
+ plugin(provider({
+ new RegularFile() {
+ @Override
+ File getAsFile() {
+ return configurations.zipArchive.asFileTree.matching {
+ include '**/opensearch-job-scheduler*'
+ }.singleFile
+ }
+ }
+ }))
+
+ plugin(provider({
+ new RegularFile() {
+ @Override
+ File getAsFile() {
+ return configurations.zipArchive.asFileTree.matching {
+ include '**/opensearch-notifications-core*'
+ }.singleFile
+ }
+ }
+ }))
+
+ plugin(provider({
+ new RegularFile() {
+ @Override
+ File getAsFile() {
+ return configurations.zipArchive.asFileTree.matching {
+ include '**/notifications*'
+ }.singleFile
}
}
}))
diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt
index e97135f2..7a1fd80d 100644
--- a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt
+++ b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt
@@ -104,6 +104,7 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch
repositoriesServiceSupplier: Supplier
): Collection {
PluginSettings.addSettingsUpdateConsumer(clusterService)
+ ReportDefinitionJobRunner.initialize(client, clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
return emptyList()
diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt
index 35af1b78..b1f6624d 100644
--- a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt
+++ b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt
@@ -8,6 +8,13 @@ package org.opensearch.reportsscheduler.scheduler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
+import org.opensearch.action.search.SearchRequest
+import org.opensearch.action.search.SearchResponse
+import org.opensearch.client.Client
+import org.opensearch.client.node.NodeClient
+import org.opensearch.cluster.service.ClusterService
+import org.opensearch.commons.notifications.model.NotificationConfigInfo
+import org.opensearch.index.query.QueryBuilders
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
@@ -15,13 +22,58 @@ import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREF
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
+import org.opensearch.reportsscheduler.util.NotificationApiUtils.getNotificationConfigInfo
+import org.opensearch.reportsscheduler.util.SecureIndexClient
+import org.opensearch.reportsscheduler.util.buildReportLink
import org.opensearch.reportsscheduler.util.logger
+import org.opensearch.reportsscheduler.util.sendNotificationWithHTML
+import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
internal object ReportDefinitionJobRunner : ScheduledJobRunner {
private val log by logger(ReportDefinitionJobRunner::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
+ private lateinit var client: Client
+ private lateinit var clusterService: ClusterService
+
+ /**
+ * Initialize the class
+ * @param client The ES client
+ * @param clusterService The ES cluster service
+ */
+ fun initialize(client: Client, clusterService: ClusterService) {
+ this.client = SecureIndexClient(client)
+ this.clusterService = clusterService
+ }
+
+ private suspend fun createNotification(
+ configInfo: NotificationConfigInfo,
+ reportDefinitionDetails: ReportDefinitionDetails,
+ id: String,
+ hits: Long?
+ ) {
+ val title: String = reportDefinitionDetails.reportDefinition.delivery!!.title
+ val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription
+ val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription
+
+ val urlDefinition: String =
+ buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id)
+
+ val textWithURL: String =
+ textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString())
+ val htmlWithURL: String? =
+ htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString())
+
+ log.info("esto es el mensaje html $htmlMessage")
+ configInfo.sendNotificationWithHTML(
+ this.client,
+ title,
+ textWithURL,
+ htmlWithURL
+ )
+ }
+
override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is ReportDefinitionDetails) {
log.warn("$LOG_PREFIX:job is not of type ReportDefinitionDetails:${job.javaClass.name}")
@@ -48,6 +100,41 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")
+
+ // Wazuh - Make queries
+ val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder()
+ .query(
+ QueryBuilders.boolQuery()
+ .must(
+ QueryBuilders.rangeQuery("timestamp")
+ .gt(beginTime)
+ .lte(currentTime)
+ )
+ .must(
+ QueryBuilders.matchQuery("agent.id", "001")
+ )
+ )
+ val jobSearchRequest: SearchRequest =
+ SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse)
+ val response: SearchResponse = client.search(jobSearchRequest).actionGet()
+
+ val reportDefinitionId = reportDefinitionDetails.reportDefinition.delivery!!.configIds[0]
+ val configInfo: NotificationConfigInfo? = getNotificationConfigInfo(
+ client as NodeClient,
+ reportDefinitionId
+ )
+
+ if (configInfo != null) {
+ createNotification(
+ configInfo,
+ reportDefinitionDetails,
+ id,
+ response.hits.totalHits?.value
+ )
+ log.info("Notification with id $id was sent.")
+ } else {
+ log.error("NotificationConfigInfo with id $reportDefinitionId was not found.")
+ }
}
}
}
diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt b/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt
new file mode 100644
index 00000000..414db187
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.reportsscheduler.util
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.OpenSearchSecurityException
+import org.opensearch.OpenSearchStatusException
+import org.opensearch.client.Client
+import org.opensearch.client.node.NodeClient
+import org.opensearch.commons.notifications.NotificationsPluginInterface
+import org.opensearch.commons.notifications.action.GetNotificationConfigRequest
+import org.opensearch.commons.notifications.action.GetNotificationConfigResponse
+import org.opensearch.commons.notifications.action.SendNotificationResponse
+import org.opensearch.commons.notifications.model.ChannelMessage
+import org.opensearch.commons.notifications.model.EventSource
+import org.opensearch.commons.notifications.model.NotificationConfigInfo
+import org.opensearch.commons.notifications.model.SeverityType
+import org.opensearch.core.action.ActionListener
+import org.opensearch.core.rest.RestStatus
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.coroutines.suspendCoroutine
+
+object NotificationApiUtils {
+
+ private val logger = LogManager.getLogger(NotificationApiUtils::class)
+
+ /**
+ * Gets a NotificationConfigInfo object by ID if it exists.
+ */
+ suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? {
+ return try {
+ val res: GetNotificationConfigResponse =
+ getNotificationConfig(client, GetNotificationConfigRequest(setOf(id)))
+ res.searchResult.objectList.firstOrNull()
+ } catch (e: OpenSearchSecurityException) {
+ throw e
+ } catch (e: OpenSearchStatusException) {
+ if (e.status() == RestStatus.NOT_FOUND) {
+ logger.debug("Notification config [$id] was not found")
+ }
+ null
+ }
+ }
+
+ private suspend fun getNotificationConfig(
+ client: NodeClient,
+ getNotificationConfigRequest: GetNotificationConfigRequest
+ ): GetNotificationConfigResponse {
+ val getNotificationConfigResponse: GetNotificationConfigResponse =
+ NotificationsPluginInterface.suspendUntil {
+ this.getNotificationConfig(
+ client,
+ getNotificationConfigRequest,
+ it
+ )
+ }
+ return getNotificationConfigResponse
+ }
+}
+
+/**
+ * Extension function for publishing a notification to a channel in the Notification plugin.
+ */
+suspend fun NotificationConfigInfo.sendNotificationWithHTML(
+ client: Client,
+ title: String,
+ compiledMessage: String,
+ compiledMessageHTML: String?
+): String {
+ val config = this
+ val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
+ this.sendNotification(
+ (client as NodeClient),
+ EventSource(title, config.configId, SeverityType.INFO),
+ ChannelMessage(compiledMessage, compiledMessageHTML, null),
+ listOf(config.configId),
+ it
+ )
+ }
+ validateResponseStatus(res.getStatus(), res.notificationEvent.toString())
+ return res.notificationEvent.toString()
+}
+
+/**
+ * Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function.
+ *
+ * @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API.
+ */
+suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T =
+ suspendCoroutine { cont ->
+ block(object : ActionListener {
+ override fun onResponse(response: T) = cont.resume(response)
+
+ override fun onFailure(e: Exception) = cont.resumeWithException(e)
+ })
+ }
+
+/**
+ * All valid response statuses.
+ */
+private val VALID_RESPONSE_STATUS = setOf(
+ RestStatus.OK.status,
+ RestStatus.CREATED.status,
+ RestStatus.ACCEPTED.status,
+ RestStatus.NON_AUTHORITATIVE_INFORMATION.status,
+ RestStatus.NO_CONTENT.status,
+ RestStatus.RESET_CONTENT.status,
+ RestStatus.PARTIAL_CONTENT.status,
+ RestStatus.MULTI_STATUS.status
+)
+
+@Throws(OpenSearchStatusException::class)
+fun validateResponseStatus(restStatus: RestStatus, responseContent: String) {
+ if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) {
+ throw OpenSearchStatusException("Failed: $responseContent", restStatus)
+ }
+}