Skip to content
This repository has been archived by the owner on Aug 9, 2022. It is now read-only.

Added JobSchedular integration to index operation APIs #149

Merged
merged 1 commit into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,21 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
* {@inheritDoc}
*/
override fun getJobIndex(): String {
return JOB_INDEX_NAME
return JOB_INDEX_NAME // return ReportDefinitionsIndex.REPORT_DEFINITIONS_INDEX_NAME
}

/**
* {@inheritDoc}
*/
override fun getJobRunner(): ScheduledJobRunner {
return jobRunner
return jobRunner // TODO return ReportDefinitionJobRunner
}

/**
* {@inheritDoc}
*/
override fun getJobParser(): ScheduledJobParser {
return ScheduledReportJobParser()
return ScheduledReportJobParser() // TODO return ReportDefinitionJobParser
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ internal class ReportInstanceAction(
if (state == null || state == State.Scheduled) { // Don't allow changing status to Scheduled
throw IllegalArgumentException("$STATUS_FIELD field not valid")
}
val updatedReportInstance = currentReportInstance.copy(lastUpdatedTime = currentTime,
val updatedReportInstance = currentReportInstance.copy(updatedTime = currentTime,
currentState = state,
currentStateDescription = stateDescription)
val isUpdated = IndexManager.updateReportInstance(updatedReportInstance)
Expand Down Expand Up @@ -274,7 +274,7 @@ internal class ReportInstanceAction(
*/
val lockedJob = reportInstances.subList(0, PluginSettings.maxLockRetries).find {
val updatedInstance = it.copy(reportInstance = it.reportInstance.copy(
lastUpdatedTime = currentTime,
updatedTime = currentTime,
currentState = State.Executing
))
IndexManager.updateReportInstanceDoc(updatedInstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package com.amazon.opendistroforelasticsearch.reportsscheduler.index

import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.ReportDefinitionDetails
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.LAST_UPDATED_TIME_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.OWNER_ID_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.UPDATED_TIME_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.settings.PluginSettings
import com.amazon.opendistroforelasticsearch.reportsscheduler.util.SecureIndexClient
import com.amazon.opendistroforelasticsearch.reportsscheduler.util.logger
Expand Down Expand Up @@ -53,7 +53,7 @@ internal class ReportDefinitionsIndex(client: Client, private val clusterService

companion object {
private val log by logger(ReportDefinitionsIndex::class.java)
private const val REPORT_DEFINITIONS_INDEX_NAME = ".opendistro-reports-definitions"
const val REPORT_DEFINITIONS_INDEX_NAME = ".opendistro-reports-definitions"
private const val REPORT_DEFINITIONS_MAPPING_FILE_NAME = "report-definitions-mapping.yml"
private const val REPORT_DEFINITIONS_SETTINGS_FILE_NAME = "report-definitions-settings.yml"
private const val MAPPING_TYPE = "_doc"
Expand Down Expand Up @@ -139,7 +139,7 @@ internal class ReportDefinitionsIndex(client: Client, private val clusterService
val query = QueryBuilders.matchQuery(OWNER_ID_FIELD, ownerId)
val sourceBuilder = SearchSourceBuilder()
.timeout(TimeValue(PluginSettings.operationTimeoutMs, TimeUnit.MILLISECONDS))
.sort(LAST_UPDATED_TIME_FIELD)
.sort(UPDATED_TIME_FIELD)
.size(MAX_ITEMS_TO_QUERY)
.from(from)
.query(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPl
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.ReportInstance
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.ReportInstance.State
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.ReportInstanceDoc
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.LAST_UPDATED_TIME_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.STATUS_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.UPDATED_TIME_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.USER_ID_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.settings.PluginSettings
import com.amazon.opendistroforelasticsearch.reportsscheduler.util.SecureIndexClient
Expand Down Expand Up @@ -143,7 +143,7 @@ internal class ReportInstancesIndex(client: Client, private val clusterService:
val query = QueryBuilders.matchQuery(USER_ID_FIELD, ownerId)
val sourceBuilder = SearchSourceBuilder()
.timeout(TimeValue(PluginSettings.operationTimeoutMs, TimeUnit.MILLISECONDS))
.sort(LAST_UPDATED_TIME_FIELD)
.sort(UPDATED_TIME_FIELD)
.size(MAX_ITEMS_TO_QUERY)
.from(from)
.query(query)
Expand Down Expand Up @@ -242,7 +242,7 @@ internal class ReportInstancesIndex(client: Client, private val clusterService:
parser.nextToken()
val reportInstance = ReportInstance.parse(parser, it.id)
if (reportInstance.currentState == State.Scheduled || // If still in Scheduled state
reportInstance.lastUpdatedTime.isBefore(refTime)) { // or when timeout happened
reportInstance.updatedTime.isBefore(refTime)) { // or when timeout happened
mutableList.add(ReportInstanceDoc(reportInstance, it.seqNo, it.primaryTerm))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.amazon.opendistroforelasticsearch.reportsscheduler.model

import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.CronSchedule
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.Schedule
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.ScheduleParser
import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import com.amazon.opendistroforelasticsearch.reportsscheduler.util.logger
import com.amazon.opendistroforelasticsearch.reportsscheduler.util.stringList
Expand All @@ -28,9 +28,6 @@ import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import java.time.Duration
import java.time.Instant
import java.time.ZoneId
import java.time.temporal.ChronoUnit

/**
* Report definition main data class.
Expand Down Expand Up @@ -274,22 +271,11 @@ internal data class ReportDefinition(
*/
internal data class Trigger(
val triggerType: TriggerType,
val cronSchedule: CronSchedule?,
val intervalSchedule: IntervalSchedule?
val schedule: Schedule?
) : ToXContentObject {
internal companion object {
private const val TRIGGER_TYPE_TAG = "triggerType"

// keeping it same as CronSchedule class
private const val CRON_FIELD = "cron"
private const val EXPRESSION_FIELD = "expression"
private const val TIMEZONE_FIELD = "timezone"

// keeping it same as IntervalSchedule class
private const val START_TIME_FIELD = "start_time"
private const val INTERVAL_FIELD = "interval"
private const val PERIOD_FIELD = "period"
private const val UNIT_FIELD = "unit"
private const val SCHEDULE_TAG = "schedule"

/**
* Parse the data from parser and create Trigger object
Expand All @@ -298,76 +284,26 @@ internal data class ReportDefinition(
*/
fun parse(parser: XContentParser): Trigger {
var triggerType: TriggerType? = null
var cronSchedule: CronSchedule? = null
var intervalSchedule: IntervalSchedule? = null
var schedule: Schedule? = null
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation)
while (XContentParser.Token.END_OBJECT != parser.nextToken()) {
val fieldName = parser.currentName()
parser.nextToken()
when (fieldName) {
TRIGGER_TYPE_TAG -> triggerType = TriggerType.valueOf(parser.text())
CRON_FIELD -> cronSchedule = parseCron(parser)
INTERVAL_FIELD -> intervalSchedule = parseInterval(parser)
SCHEDULE_TAG -> schedule = ScheduleParser.parse(parser)
else -> log.info("$LOG_PREFIX: Trigger Skipping Unknown field $fieldName")
}
}
triggerType ?: throw IllegalArgumentException("$TRIGGER_TYPE_TAG field absent")
if (triggerType == TriggerType.CronSchedule) {
cronSchedule ?: throw IllegalArgumentException("$CRON_FIELD field absent")
} else if (triggerType == TriggerType.IntervalSchedule) {
intervalSchedule ?: throw IllegalArgumentException("$INTERVAL_FIELD field absent")
if (isScheduleType(triggerType)) {
schedule ?: throw IllegalArgumentException("$SCHEDULE_TAG field absent")
}
return Trigger(triggerType, cronSchedule, intervalSchedule)
return Trigger(triggerType, schedule)
}

/**
* Parse cron expression
* @param parser data referenced at parser
* @return CronSchedule object
*/
private fun parseCron(parser: XContentParser): CronSchedule {
var timezone: ZoneId? = null
var expression: String? = null
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation)
while (XContentParser.Token.END_OBJECT != parser.nextToken()) {
val fieldName = parser.currentName()
parser.nextToken()
when (fieldName) {
EXPRESSION_FIELD -> expression = parser.text()
TIMEZONE_FIELD -> timezone = ZoneId.of(parser.text())
else -> log.info("$LOG_PREFIX:CronSchedule Skipping Unknown field $fieldName")
}
}
timezone ?: throw IllegalArgumentException("$TIMEZONE_FIELD field absent")
expression ?: throw IllegalArgumentException("$EXPRESSION_FIELD field absent")
return CronSchedule(expression, timezone)
}

/**
* Parse interval expression
* @param parser data referenced at parser
* @return IntervalSchedule object
*/
private fun parseInterval(parser: XContentParser): IntervalSchedule {
// Keep same logic as in class [IntervalSchedule]
var startTime: Instant? = null
var interval = 0
var unit: ChronoUnit? = null
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation)
while (XContentParser.Token.END_OBJECT != parser.nextToken()) {
val fieldName = parser.currentName()
parser.nextToken()
when (fieldName) {
START_TIME_FIELD -> startTime = Instant.ofEpochMilli(parser.longValue())
PERIOD_FIELD -> interval = parser.intValue()
UNIT_FIELD -> unit = ChronoUnit.valueOf(parser.text())
else -> log.info("$LOG_PREFIX:IntervalSchedule Skipping Unknown field $fieldName")
}
}
startTime ?: throw IllegalArgumentException("$START_TIME_FIELD field absent")
if (interval <= 0) throw IllegalArgumentException("$PERIOD_FIELD field absent or invalid")
unit ?: throw IllegalArgumentException("$UNIT_FIELD field absent")
return IntervalSchedule(startTime, interval, unit)
fun isScheduleType(triggerType: TriggerType): Boolean {
return (triggerType == TriggerType.CronSchedule || triggerType == TriggerType.IntervalSchedule)
}
}

Expand All @@ -378,13 +314,9 @@ internal data class ReportDefinition(
builder!!
builder.startObject()
.field(TRIGGER_TYPE_TAG, triggerType)
if (triggerType == TriggerType.CronSchedule) {
cronSchedule!!
cronSchedule.toXContent(builder, ToXContent.EMPTY_PARAMS)
}
if (triggerType == TriggerType.IntervalSchedule) {
intervalSchedule!!
intervalSchedule.toXContent(builder, ToXContent.EMPTY_PARAMS)
if (isScheduleType(triggerType)) {
builder.field(SCHEDULE_TAG)
schedule!!.toXContent(builder, ToXContent.EMPTY_PARAMS)
}
builder.endObject()
return builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

package com.amazon.opendistroforelasticsearch.reportsscheduler.model

import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.Schedule
import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.ReportDefinition.TriggerType
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.CREATED_TIME_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.ID_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.LAST_UPDATED_TIME_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.OWNER_ID_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.REPORT_DEFINITION_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.PluginRestHandler.Companion.UPDATED_TIME_FIELD
import com.amazon.opendistroforelasticsearch.reportsscheduler.settings.PluginSettings
import com.amazon.opendistroforelasticsearch.reportsscheduler.util.logger
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.common.xcontent.XContentParser
Expand All @@ -36,11 +39,11 @@ import java.time.Instant
*/
internal data class ReportDefinitionDetails(
val id: String,
val lastUpdatedTime: Instant,
val updatedTime: Instant,
val createdTime: Instant,
val ownerId: String,
val reportDefinition: ReportDefinition
) : ToXContentObject {
) : ScheduledJobParameter {
internal companion object {
private val log by logger(ReportDefinitionDetails::class.java)

Expand All @@ -51,7 +54,7 @@ internal data class ReportDefinitionDetails(
*/
fun parse(parser: XContentParser, useId: String? = null): ReportDefinitionDetails {
var id: String? = useId
var lastUpdatedTime: Instant? = null
var updatedTime: Instant? = null
var createdTime: Instant? = null
var createdBy: String? = null
var reportDefinition: ReportDefinition? = null
Expand All @@ -61,7 +64,7 @@ internal data class ReportDefinitionDetails(
parser.nextToken()
when (fieldName) {
ID_FIELD -> id = parser.text()
LAST_UPDATED_TIME_FIELD -> lastUpdatedTime = Instant.ofEpochMilli(parser.longValue())
UPDATED_TIME_FIELD -> updatedTime = Instant.ofEpochMilli(parser.longValue())
CREATED_TIME_FIELD -> createdTime = Instant.ofEpochMilli(parser.longValue())
OWNER_ID_FIELD -> createdBy = parser.text()
REPORT_DEFINITION_FIELD -> reportDefinition = ReportDefinition.parse(parser)
Expand All @@ -72,12 +75,12 @@ internal data class ReportDefinitionDetails(
}
}
id ?: throw IllegalArgumentException("$ID_FIELD field absent")
lastUpdatedTime ?: throw IllegalArgumentException("$LAST_UPDATED_TIME_FIELD field absent")
updatedTime ?: throw IllegalArgumentException("$UPDATED_TIME_FIELD field absent")
createdTime ?: throw IllegalArgumentException("$CREATED_TIME_FIELD field absent")
createdBy ?: throw IllegalArgumentException("$OWNER_ID_FIELD field absent")
reportDefinition ?: throw IllegalArgumentException("$REPORT_DEFINITION_FIELD field absent")
return ReportDefinitionDetails(id,
lastUpdatedTime,
updatedTime,
createdTime,
createdBy,
reportDefinition)
Expand All @@ -92,13 +95,6 @@ internal data class ReportDefinitionDetails(
return toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS, includeId)
}

/**
* {@inheritDoc}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
return toXContent(builder, params, false)
}

/**
* {ref toXContent}
*/
Expand All @@ -108,12 +104,58 @@ internal data class ReportDefinitionDetails(
if (includeId) {
builder.field(ID_FIELD, id)
}
builder.field(LAST_UPDATED_TIME_FIELD, lastUpdatedTime.toEpochMilli())
builder.field(UPDATED_TIME_FIELD, updatedTime.toEpochMilli())
.field(CREATED_TIME_FIELD, createdTime.toEpochMilli())
.field(OWNER_ID_FIELD, ownerId)
builder.field(REPORT_DEFINITION_FIELD)
reportDefinition.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.endObject()
return builder
}

/**
* {@inheritDoc}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
return toXContent(builder, params, false)
}

/**
* {@inheritDoc}
*/
override fun getName(): String {
return reportDefinition.name
}

/**
* {@inheritDoc}
*/
override fun getLastUpdateTime(): Instant {
return updatedTime
}

/**
* {@inheritDoc}
*/
override fun getEnabledTime(): Instant {
return createdTime
}

/**
* {@inheritDoc}
*/
override fun getLockDurationSeconds(): Long? {
return PluginSettings.jobLockDurationSeconds.toLong()
}

override fun getSchedule(): Schedule {
return reportDefinition.trigger.schedule!!
}

override fun isEnabled(): Boolean {
val trigger = reportDefinition.trigger
return (reportDefinition.isEnabled &&
(reportDefinition.trigger.schedule != null) &&
(trigger.triggerType == TriggerType.IntervalSchedule || trigger.triggerType == TriggerType.CronSchedule))
}
}
Loading