diff --git a/.gitignore b/.gitignore index 8c03270c..d954410c 100644 --- a/.gitignore +++ b/.gitignore @@ -159,6 +159,7 @@ $RECYCLE.BIN/ .idea/modules.xml .idea/*.iml .idea/modules +.idea/*.xml *.iml *.ipr diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml index 0dd4b354..951989e5 100644 --- a/.idea/kotlinc.xml +++ b/.idea/kotlinc.xml @@ -3,4 +3,7 @@ + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index 08bcbb84..6d5ded36 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -7,7 +7,7 @@ - + \ No newline at end of file diff --git a/.project b/.project index 3f751dd9..6746ef3a 100644 --- a/.project +++ b/.project @@ -1,6 +1,6 @@ - opensearch-reports-scheduler + wazuh-indexer-reports-scheduler Project reports-scheduler created by Buildship. diff --git a/build.gradle b/build.gradle index d2c2ce54..fc219a07 100644 --- a/build.gradle +++ b/build.gradle @@ -172,6 +172,9 @@ repositories { } dependencies { + // Needed for integ tests + zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}" + zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}" zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" implementation "org.opensearch:opensearch:${opensearch_version}" implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" @@ -301,14 +304,35 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty( testClusters.integTest { testDistribution = "INTEG_TEST" // need to install job-scheduler first, need to assemble job-scheduler first - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.getSingleFile() - } + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/opensearch-job-scheduler*' + }.singleFile + } + } + })) + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/opensearch-notifications-core*' + }.singleFile + } + } + })) + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/notifications*' + }.singleFile } } })) diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt index e97135f2..7a1fd80d 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt @@ -104,6 +104,7 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch repositoriesServiceSupplier: Supplier ): Collection { PluginSettings.addSettingsUpdateConsumer(clusterService) + ReportDefinitionJobRunner.initialize(client, clusterService) ReportDefinitionsIndex.initialize(client, clusterService) ReportInstancesIndex.initialize(client, clusterService) return emptyList() diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt index 35af1b78..b1f6624d 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt @@ -8,6 +8,13 @@ package org.opensearch.reportsscheduler.scheduler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.commons.notifications.model.NotificationConfigInfo +import org.opensearch.index.query.QueryBuilders import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner @@ -15,13 +22,58 @@ import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREF import org.opensearch.reportsscheduler.index.ReportInstancesIndex import org.opensearch.reportsscheduler.model.ReportDefinitionDetails import org.opensearch.reportsscheduler.model.ReportInstance +import org.opensearch.reportsscheduler.util.NotificationApiUtils.getNotificationConfigInfo +import org.opensearch.reportsscheduler.util.SecureIndexClient +import org.opensearch.reportsscheduler.util.buildReportLink import org.opensearch.reportsscheduler.util.logger +import org.opensearch.reportsscheduler.util.sendNotificationWithHTML +import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant internal object ReportDefinitionJobRunner : ScheduledJobRunner { private val log by logger(ReportDefinitionJobRunner::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + private lateinit var client: Client + private lateinit var clusterService: ClusterService + + /** + * Initialize the class + * @param client The ES client + * @param clusterService The ES cluster service + */ + fun initialize(client: Client, clusterService: ClusterService) { + this.client = SecureIndexClient(client) + this.clusterService = clusterService + } + + private suspend fun createNotification( + configInfo: NotificationConfigInfo, + reportDefinitionDetails: ReportDefinitionDetails, + id: String, + hits: Long? + ) { + val title: String = reportDefinitionDetails.reportDefinition.delivery!!.title + val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription + val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription + + val urlDefinition: String = + buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id) + + val textWithURL: String = + textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString()) + val htmlWithURL: String? = + htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString()) + + log.info("esto es el mensaje html $htmlMessage") + configInfo.sendNotificationWithHTML( + this.client, + title, + textWithURL, + htmlWithURL + ) + } + override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { if (job !is ReportDefinitionDetails) { log.warn("$LOG_PREFIX:job is not of type ReportDefinitionDetails:${job.javaClass.name}") @@ -48,6 +100,41 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner { log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance") } else { log.info("$LOG_PREFIX:runJob-created job:$id") + + // Wazuh - Make queries + val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder() + .query( + QueryBuilders.boolQuery() + .must( + QueryBuilders.rangeQuery("timestamp") + .gt(beginTime) + .lte(currentTime) + ) + .must( + QueryBuilders.matchQuery("agent.id", "001") + ) + ) + val jobSearchRequest: SearchRequest = + SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse) + val response: SearchResponse = client.search(jobSearchRequest).actionGet() + + val reportDefinitionId = reportDefinitionDetails.reportDefinition.delivery!!.configIds[0] + val configInfo: NotificationConfigInfo? = getNotificationConfigInfo( + client as NodeClient, + reportDefinitionId + ) + + if (configInfo != null) { + createNotification( + configInfo, + reportDefinitionDetails, + id, + response.hits.totalHits?.value + ) + log.info("Notification with id $id was sent.") + } else { + log.error("NotificationConfigInfo with id $reportDefinitionId was not found.") + } } } } diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt b/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt new file mode 100644 index 00000000..414db187 --- /dev/null +++ b/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt @@ -0,0 +1,121 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.reportsscheduler.util + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.notifications.action.GetNotificationConfigRequest +import org.opensearch.commons.notifications.action.GetNotificationConfigResponse +import org.opensearch.commons.notifications.action.SendNotificationResponse +import org.opensearch.commons.notifications.model.ChannelMessage +import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.commons.notifications.model.NotificationConfigInfo +import org.opensearch.commons.notifications.model.SeverityType +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +object NotificationApiUtils { + + private val logger = LogManager.getLogger(NotificationApiUtils::class) + + /** + * Gets a NotificationConfigInfo object by ID if it exists. + */ + suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? { + return try { + val res: GetNotificationConfigResponse = + getNotificationConfig(client, GetNotificationConfigRequest(setOf(id))) + res.searchResult.objectList.firstOrNull() + } catch (e: OpenSearchSecurityException) { + throw e + } catch (e: OpenSearchStatusException) { + if (e.status() == RestStatus.NOT_FOUND) { + logger.debug("Notification config [$id] was not found") + } + null + } + } + + private suspend fun getNotificationConfig( + client: NodeClient, + getNotificationConfigRequest: GetNotificationConfigRequest + ): GetNotificationConfigResponse { + val getNotificationConfigResponse: GetNotificationConfigResponse = + NotificationsPluginInterface.suspendUntil { + this.getNotificationConfig( + client, + getNotificationConfigRequest, + it + ) + } + return getNotificationConfigResponse + } +} + +/** + * Extension function for publishing a notification to a channel in the Notification plugin. + */ +suspend fun NotificationConfigInfo.sendNotificationWithHTML( + client: Client, + title: String, + compiledMessage: String, + compiledMessageHTML: String? +): String { + val config = this + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + EventSource(title, config.configId, SeverityType.INFO), + ChannelMessage(compiledMessage, compiledMessageHTML, null), + listOf(config.configId), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.toString()) + return res.notificationEvent.toString() +} + +/** + * Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API. + */ +suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + +/** + * All valid response statuses. + */ +private val VALID_RESPONSE_STATUS = setOf( + RestStatus.OK.status, + RestStatus.CREATED.status, + RestStatus.ACCEPTED.status, + RestStatus.NON_AUTHORITATIVE_INFORMATION.status, + RestStatus.NO_CONTENT.status, + RestStatus.RESET_CONTENT.status, + RestStatus.PARTIAL_CONTENT.status, + RestStatus.MULTI_STATUS.status +) + +@Throws(OpenSearchStatusException::class) +fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { + if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { + throw OpenSearchStatusException("Failed: $responseContent", restStatus) + } +}