Skip to content

Commit

Permalink
small changes
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
Petar Dzepina committed Nov 2, 2022
1 parent efead8e commit dfc5f7b
Showing 1 changed file with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,24 @@ class RollupMapperService(

private val logger = LogManager.getLogger(javaClass)

// If the index already exists we need to verify it's a rollup index,
// confirm it does not conflict with existing jobs and is a valid job
/**
* If the index already exists we need to verify it's a rollup index,
* confirm it does not conflict with existing jobs and is a valid job.
* If
*
* @param rollup Rollup job we're currently executing
* @param targetIndexResolvedName concrete index name
* @param hasLegacyPlugin flag to indicate if we're running legacy plugin
* @return RollupJobValidationResult indicating success or failure with appropriate error message included.
*/
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(
rollup: Rollup,
targetIndexResolvedName: String,
hasLegacyPlugin: Boolean
): RollupJobValidationResult {
/**
* Target Index is valid alias if either all backing indices have this job in _meta
* or there isn't any rollup job present in _meta
*/
val aliasValidationResult = validateTargetIndexAlias(rollup, targetIndexResolvedName)
if (rollup.isTargetIndexAlias()) {
val aliasValidationResult = validateTargetIndexAlias(rollup, targetIndexResolvedName)
if (aliasValidationResult !is RollupJobValidationResult.Valid) {
return aliasValidationResult
} else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
Expand All @@ -84,20 +88,25 @@ class RollupMapperService(
}
}

/**
* Target Index is valid alias if either all backing indices have this job in _meta
* or there isn't any rollup job present in _meta
*/
@Suppress("ReturnCount")
suspend fun validateTargetIndexAlias(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {

var errorMessage: String

if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) {
logger.error("[${rollup.targetIndex}] is not an alias!")
return RollupJobValidationResult.Failure("[${rollup.targetIndex}] is not an alias!")
}

val rollupJobs = clusterService.state().metadata.index(targetIndexResolvedName).getRollupJobs()
if (rollupJobs != null &&
(rollupJobs.size > 1 || rollupJobs[0].id != rollup.id)
) {
errorMessage = "If target_index is alias, write backing index must be used only by this rollup job: [$targetIndexResolvedName]"
errorMessage = "More than one rollup job present on the backing index, cannot add alias for target index: [$targetIndexResolvedName]"
logger.error(errorMessage)
return RollupJobValidationResult.Failure(errorMessage)
}
Expand All @@ -106,8 +115,8 @@ class RollupMapperService(
val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(rollup.targetIndex)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
val rollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
val validationResult = validateNonWriteBackingIndex(it.index.name, rollup, rollupJobs)
val allRollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
val validationResult = validateNonWriteBackingIndex(it.index.name, rollup, allRollupJobs)
if (validationResult !is RollupJobValidationResult.Valid) {
return validationResult
}
Expand Down Expand Up @@ -139,7 +148,12 @@ class RollupMapperService(
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
if (indexExists(targetIndexResolvedName)) {
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
val validationResult = validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
when (validationResult) {
is RollupJobValidationResult.Failure -> logger.error(validationResult.message)
is RollupJobValidationResult.Invalid -> logger.error(validationResult.reason)
}
return validationResult
} else {
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
Expand Down Expand Up @@ -180,10 +194,11 @@ class RollupMapperService(
try {
// 1. First we need to add index.plugins.rollup_index setting to index
if (addRollupSettingToIndex(targetIndexResolvedName, hasLegacyPlugin) == false) {
logger.error("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
}

// 2. Put rollup mappings
// 2. Put rollup target_index mappings
val putMappingRequest: PutMappingRequest =
PutMappingRequest(targetIndexResolvedName).source(IndexManagementIndices.rollupTargetMappings, XContentType.JSON)
val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil {
Expand All @@ -192,7 +207,7 @@ class RollupMapperService(
if (!respMappings.isAcknowledged) {
return RollupJobValidationResult.Invalid("Failed to put initial rollup mappings for target index [$targetIndexResolvedName]")
}
// 3.
// 3. Add this rollup job to target_index's _meta
errorMessage = "Failed to update mappings for target index [$targetIndexResolvedName]"
updateRollupIndexMappings(rollup, targetIndexResolvedName)
} catch (e: RemoteTransportException) {
Expand Down

0 comments on commit dfc5f7b

Please sign in to comment.