Skip to content

Commit

Permalink
Force using snakeyaml version 1.32 to fix CVE issue (#535)
Browse files Browse the repository at this point in the history
* Force using snakeyaml version 1.32 to fix CVE issue

Signed-off-by: Angie Zhang <[email protected]>

* Force using snakeyaml version 1.32 to fix CVE issue

Signed-off-by: Angie Zhang <[email protected]>

* Force using snakeyaml version 1.32 to fix CVE issue

Signed-off-by: Angie Zhang <[email protected]>

* Force using snakeyaml version 1.32 to fix CVE issue

Signed-off-by: Angie Zhang <[email protected]>

* Kotlin version upgrade compatibility and jackson version upgrade

Signed-off-by: Angie Zhang <[email protected]>

* Kotlin version upgrade compatibility

Signed-off-by: Angie Zhang <[email protected]>

* detekt error fixing

Signed-off-by: Angie Zhang <[email protected]>

* Update detekt setting

Signed-off-by: Angie Zhang <[email protected]>

* Update detekt setting

Signed-off-by: Angie Zhang <[email protected]>

* Update gradle typo

Signed-off-by: Angie Zhang <[email protected]>

* Fix ktlint

Signed-off-by: Angie Zhang <[email protected]>

* Update gradle

Signed-off-by: Angie Zhang <[email protected]>

* Update gradle

Signed-off-by: Angie Zhang <[email protected]>

* Fix build.gradle

Signed-off-by: Angie Zhang <[email protected]>

* Fix jacoco tool version

Signed-off-by: Angie Zhang <[email protected]>

Signed-off-by: Angie Zhang <[email protected]>
  • Loading branch information
Angie Zhang authored Oct 4, 2022
1 parent 73161b5 commit b72eeaa
Show file tree
Hide file tree
Showing 25 changed files with 151 additions and 92 deletions.
20 changes: 12 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ buildscript {
notification_version = System.getProperty("notification.version", opensearch_build)
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler_version.version", opensearch_build)
kotlin_version = System.getProperty("kotlin.version", "1.4.32")
kotlin_version = System.getProperty("kotlin.version", "1.6.10")

opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
job_scheduler_no_snapshot = job_scheduler_version.replace("-SNAPSHOT","")
Expand All @@ -39,16 +39,19 @@ buildscript {
classpath "org.opensearch.gradle:build-tools:${opensearch_version}"
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}"
classpath "org.jetbrains.kotlin:kotlin-allopen:${kotlin_version}"
classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.17.1"
classpath "org.jacoco:org.jacoco.agent:0.8.5"
classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.21.0"
classpath "org.jacoco:org.jacoco.agent:0.8.7"
}
}



plugins {
id 'nebula.ospackage' version "8.3.0"
id "com.dorongold.task-tree" version "1.5"
}


apply plugin: 'java'
apply plugin: 'jacoco'
apply plugin: 'idea'
Expand All @@ -73,8 +76,8 @@ configurations.all {
force 'org.apache.httpcomponents:httpclient-osgi:4.5.13'
force 'org.apache.httpcomponents.client5:httpclient5:5.0.3'
force 'org.apache.httpcomponents.client5:httpclient5-osgi:5.0.3'
force 'com.fasterxml.jackson.core:jackson-databind:2.10.4'
force 'org.yaml:snakeyaml:1.26'
force 'com.fasterxml.jackson.core:jackson-databind:2.13.4'
force 'org.yaml:snakeyaml:1.32'
force 'org.codehaus.plexus:plexus-utils:3.0.24'
}
}
Expand Down Expand Up @@ -127,7 +130,7 @@ detekt {
buildUponDefaultConfig = true
}

configurations.testCompile {
configurations.testImplementation {
exclude module: "securemock"
}

Expand All @@ -144,6 +147,7 @@ allprojects {
if (isSnapshot) {
version += "-SNAPSHOT"
}
jacoco.toolVersion = "0.8.7"
}

dependencies {
Expand All @@ -164,7 +168,7 @@ dependencies {
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
testCompile "org.mockito:mockito-core:3.12.4"

add("ktlint", "com.pinterest:ktlint:0.41.0") {
add("ktlint", "com.pinterest:ktlint:0.45.1") {
attributes {
attribute(Bundling.BUNDLING_ATTRIBUTE, objects.named(Bundling, Bundling.EXTERNAL))
}
Expand Down Expand Up @@ -602,4 +606,4 @@ task updateVersion {
// String tokenization to support -SNAPSHOT
ant.replaceregexp(file:'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags:'g', byline:true)
}
}
}
2 changes: 1 addition & 1 deletion detekt.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# TODO: Remove this before initial release, only for developmental purposes
build:
maxIssues: 10
maxIssues: 20

exceptions:
TooGenericExceptionCaught:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {

companion object {
fun read(streamInput: StreamInput): StepStatus {
return valueOf(streamInput.readString().toUpperCase(Locale.ROOT))
return valueOf(streamInput.readString().uppercase(Locale.ROOT))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ data class ActionRetry(

when (fieldName) {
COUNT_FIELD -> count = xcp.longValue()
BACKOFF_FIELD -> backoff = Backoff.valueOf(xcp.text().toUpperCase(Locale.ROOT))
BACKOFF_FIELD -> backoff = Backoff.valueOf(xcp.text().uppercase(Locale.ROOT))
DELAY_FIELD -> delay = TimeValue.parseTimeValue(xcp.text(), DELAY_FIELD)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ data class StepMetaData(
when (fieldName) {
NAME -> name = xcp.text()
START_TIME -> startTime = xcp.longValue()
STEP_STATUS -> stepStatus = Step.StepStatus.valueOf(xcp.text().toUpperCase(Locale.ROOT))
STEP_STATUS -> stepStatus = Step.StepStatus.valueOf(xcp.text().uppercase(Locale.ROOT))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/

package org.opensearch.indexmanagement

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
Expand Down Expand Up @@ -38,6 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinat
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction
Expand Down Expand Up @@ -73,7 +73,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retr
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction
Expand Down Expand Up @@ -251,15 +250,15 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
indexManagementExtensions.forEach { extension ->
val extensionName = extension.getExtensionName()
if (extensionName in extensions) {
throw IllegalStateException("Multiple extensions of IndexManagement have same name $extensionName - not supported")
error("Multiple extensions of IndexManagement have same name $extensionName - not supported")
}
extension.getISMActionParsers().forEach { parser ->
ISMActionsParser.instance.addParser(parser, extensionName)
}
indexMetadataServices.add(extension.getIndexMetadataService())
extension.overrideClusterStateIndexUuidSetting()?.let {
if (customIndexUUIDSetting != null) {
throw IllegalStateException(
error(
"Multiple extensions of IndexManagement plugin overriding ClusterStateIndexUUIDSetting - not supported"
)
}
Expand Down Expand Up @@ -359,7 +358,9 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
)

indexMetadataProvider = IndexMetadataProvider(
settings, client, clusterService,
settings,
client,
clusterService,
hashMapOf(
DEFAULT_INDEX_TYPE to DefaultIndexMetadataService(customIndexUUIDSetting)
)
Expand All @@ -386,7 +387,13 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val managedIndexCoordinator = ManagedIndexCoordinator(
environment.settings(),
client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider
client,
clusterService,
threadPool,
indexManagementIndices,
metadataService,
templateService,
indexMetadataProvider
)

return listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
Expand All @@ -73,7 +74,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed
import org.opensearch.indexmanagement.indexstatemanagement.util.isPolicyCompleted
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.updateEnableManagedIndexRequest
import org.opensearch.indexmanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
import org.opensearch.indexmanagement.opensearchapi.contentParser
import org.opensearch.indexmanagement.opensearchapi.parseFromSearchResponse
Expand Down Expand Up @@ -128,17 +128,26 @@ class ManagedIndexCoordinator(
private var scheduledTemplateMigration: Scheduler.Cancellable? = null

@Volatile private var lastFullSweepTimeNano = System.nanoTime()

@Volatile private var indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings)

@Volatile private var metadataServiceEnabled = METADATA_SERVICE_ENABLED.get(settings)

@Volatile private var sweepPeriod = SWEEP_PERIOD.get(settings)

@Volatile private var retryPolicy =
BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings))

@Volatile private var templateMigrationEnabled: Boolean = true

@Volatile private var templateMigrationEnabledSetting = TEMPLATE_MIGRATION_CONTROL.get(settings)

@Volatile private var jobInterval = JOB_INTERVAL.get(settings)

@Volatile private var jobJitter = JITTER.get(settings)

@Volatile private var isMaster = false

@Volatile private var onMasterTimeStamp: Long = 0L

init {
Expand Down Expand Up @@ -168,8 +177,7 @@ class ManagedIndexCoordinator(
if (!templateMigrationEnabled) scheduledTemplateMigration?.cancel()
else initTemplateMigration(it)
}
clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) {
millis, count ->
clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) { millis, count ->
retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
}
Expand Down Expand Up @@ -556,14 +564,15 @@ class ManagedIndexCoordinator(

logger.info("Performing ISM template migration.")
if (enableSetting == 0L) {
if (onMasterTimeStamp != 0L)
if (onMasterTimeStamp != 0L) {
templateService.doMigration(Instant.ofEpochMilli(onMasterTimeStamp))
else {
} else {
logger.error("No valid onMaster time cached, cancel ISM template migration job.")
scheduledTemplateMigration?.cancel()
}
} else
} else {
templateService.doMigration(Instant.ofEpochMilli(enableSetting))
}
} catch (e: Exception) {
logger.error("Failed to migrate ISM template", e)
}
Expand Down Expand Up @@ -596,7 +605,8 @@ class ManagedIndexCoordinator(

// Get the matching policyIds for applicable indices
val updateMatchingIndicesReqs = createManagedIndexRequests(
clusterService.state(), unManagedIndices.map { (indexName, _) -> indexName }
clusterService.state(),
unManagedIndices.map { (indexName, _) -> indexName }
)

// check all managed indices, if the index has already been deleted
Expand Down Expand Up @@ -701,7 +711,10 @@ class ManagedIndexCoordinator(
mRes.forEach {
if (it.response.isExists) {
result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType(
it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse
it.response.id,
it.response.seqNo,
it.response.primaryTerm,
ManagedIndexConfig.Companion::parse
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ object ManagedIndexRunner :
private lateinit var extensionStatusChecker: ExtensionStatusChecker
private lateinit var indexMetadataProvider: IndexMetadataProvider
private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED

@Suppress("MagicNumber")
private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)

@Suppress("MagicNumber")
private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)

@Suppress("MagicNumber")
private val errorNotificationRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)
private var jobInterval: Int = DEFAULT_JOB_INTERVAL
Expand Down Expand Up @@ -345,7 +348,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Previous action was not able to update IndexMetaData.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -359,7 +363,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Failed to execute action=${action?.type} as extension [$actionExtensionName] is not enabled.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -372,7 +377,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -388,7 +394,10 @@ object ManagedIndexRunner :
// Step null check is done in getStartingManagedIndexMetaData
withClosableContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user
managedIndexConfig.id,
settings,
threadPool.threadContext,
managedIndexConfig.policy.user
)
) {
step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger)
Expand Down Expand Up @@ -476,8 +485,10 @@ object ManagedIndexRunner :
// Intellij complains about createParser/parseWithType blocking because it sees they throw IOExceptions
return withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
policySource, XContentType.JSON
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
policySource,
XContentType.JSON
)
xcp.parseWithType(getResponse.id, getResponse.seqNo, getResponse.primaryTerm, Policy.Companion::parse)
}
Expand All @@ -504,8 +515,11 @@ object ManagedIndexRunner :
@Suppress("TooGenericExceptionCaught")
private suspend fun savePolicyToManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, policy: Policy): Boolean {
val updatedManagedIndexConfig = managedIndexConfig.copy(
policyID = policy.id, policy = policy,
policySeqNo = policy.seqNo, policyPrimaryTerm = policy.primaryTerm, changePolicy = null
policyID = policy.id,
policy = policy,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
changePolicy = null
)
val indexRequest = managedIndexConfigIndexRequest(updatedManagedIndexConfig)
var savedPolicy = false
Expand Down Expand Up @@ -605,8 +619,8 @@ object ManagedIndexRunner :
// this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm
// in the metadata, in this case we just want to say we successfully initialized the policy again but we will not
// modify the state, action, etc. so it can resume where it left off
managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm
&& managedIndexMetaData.policyID == policy.id ->
managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm &&
managedIndexMetaData.policyID == policy.id ->
// If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue.
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0),
Expand Down Expand Up @@ -688,7 +702,6 @@ object ManagedIndexRunner :
managedIndexMetaData: ManagedIndexMetaData,
actionToExecute: Action?
) {

// should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null
val changePolicy = managedIndexConfig.changePolicy
if (changePolicy == null) {
Expand All @@ -709,8 +722,13 @@ object ManagedIndexRunner :
// if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are
// in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase
val newTransitionMetaData = ActionMetaData(
TransitionsAction.name, Instant.now().toEpochMilli(), -1,
false, 0, 0, null
TransitionsAction.name,
Instant.now().toEpochMilli(),
-1,
false,
0,
0,
null
)
val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) {
newTransitionMetaData
Expand Down
Loading

0 comments on commit b72eeaa

Please sign in to comment.