Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.x] Force using snakeyaml version 1.32 to fix CVE issue #543

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ buildscript {
notification_version = System.getProperty("notification.version", opensearch_build)
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler_version.version", opensearch_build)
kotlin_version = System.getProperty("kotlin.version", "1.4.0")
kotlin_version = System.getProperty("kotlin.version", "1.6.10")

opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
job_scheduler_no_snapshot = job_scheduler_version.replace("-SNAPSHOT","")
Expand All @@ -39,16 +39,19 @@ buildscript {
classpath "org.opensearch.gradle:build-tools:${opensearch_version}"
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}"
classpath "org.jetbrains.kotlin:kotlin-allopen:${kotlin_version}"
classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.17.1"
classpath "org.jacoco:org.jacoco.agent:0.8.5"
classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.21.0"
classpath "org.jacoco:org.jacoco.agent:0.8.7"
}
}



plugins {
id 'nebula.ospackage' version "8.3.0"
id "com.dorongold.task-tree" version "1.5"
}


apply plugin: 'java'
apply plugin: 'jacoco'
apply plugin: 'idea'
Expand All @@ -73,8 +76,8 @@ configurations.all {
force 'org.apache.httpcomponents:httpclient-osgi:4.5.13'
force 'org.apache.httpcomponents.client5:httpclient5:5.0.3'
force 'org.apache.httpcomponents.client5:httpclient5-osgi:5.0.3'
force 'com.fasterxml.jackson.core:jackson-databind:2.10.4'
force 'org.yaml:snakeyaml:1.26'
force 'com.fasterxml.jackson.core:jackson-databind:2.13.4'
force 'org.yaml:snakeyaml:1.32'
force 'org.codehaus.plexus:plexus-utils:3.0.24'
}
}
Expand Down Expand Up @@ -127,7 +130,7 @@ detekt {
buildUponDefaultConfig = true
}

configurations.testCompile {
configurations.testImplementation {
exclude module: "securemock"
}

Expand All @@ -144,6 +147,7 @@ allprojects {
if (isSnapshot) {
version += "-SNAPSHOT"
}
jacoco.toolVersion = "0.8.7"
}

dependencies {
Expand All @@ -164,7 +168,7 @@ dependencies {
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
testCompile "org.mockito:mockito-core:3.12.4"

add("ktlint", "com.pinterest:ktlint:0.41.0") {
add("ktlint", "com.pinterest:ktlint:0.45.1") {
attributes {
attribute(Bundling.BUNDLING_ATTRIBUTE, objects.named(Bundling, Bundling.EXTERNAL))
}
Expand Down
2 changes: 1 addition & 1 deletion detekt.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# TODO: Remove this before initial release, only for developmental purposes
build:
maxIssues: 10
maxIssues: 20

exceptions:
TooGenericExceptionCaught:
Expand Down
1 change: 1 addition & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ configurations.all {
resolutionStrategy {
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
force "org.yaml:snakeyaml:1.32"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {

companion object {
fun read(streamInput: StreamInput): StepStatus {
return valueOf(streamInput.readString().toUpperCase(Locale.ROOT))
return valueOf(streamInput.readString().uppercase(Locale.ROOT))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ data class ActionRetry(

when (fieldName) {
COUNT_FIELD -> count = xcp.longValue()
BACKOFF_FIELD -> backoff = Backoff.valueOf(xcp.text().toUpperCase(Locale.ROOT))
BACKOFF_FIELD -> backoff = Backoff.valueOf(xcp.text().uppercase(Locale.ROOT))
DELAY_FIELD -> delay = TimeValue.parseTimeValue(xcp.text(), DELAY_FIELD)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ data class StepMetaData(
when (fieldName) {
NAME -> name = xcp.text()
START_TIME -> startTime = xcp.longValue()
STEP_STATUS -> stepStatus = Step.StepStatus.valueOf(xcp.text().toUpperCase(Locale.ROOT))
STEP_STATUS -> stepStatus = Step.StepStatus.valueOf(xcp.text().uppercase(Locale.ROOT))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/

package org.opensearch.indexmanagement

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
Expand Down Expand Up @@ -38,6 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinat
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction
Expand Down Expand Up @@ -73,7 +73,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retr
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction
Expand Down Expand Up @@ -251,15 +250,15 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
indexManagementExtensions.forEach { extension ->
val extensionName = extension.getExtensionName()
if (extensionName in extensions) {
throw IllegalStateException("Multiple extensions of IndexManagement have same name $extensionName - not supported")
error("Multiple extensions of IndexManagement have same name $extensionName - not supported")
}
extension.getISMActionParsers().forEach { parser ->
ISMActionsParser.instance.addParser(parser, extensionName)
}
indexMetadataServices.add(extension.getIndexMetadataService())
extension.overrideClusterStateIndexUuidSetting()?.let {
if (customIndexUUIDSetting != null) {
throw IllegalStateException(
error(
"Multiple extensions of IndexManagement plugin overriding ClusterStateIndexUUIDSetting - not supported"
)
}
Expand Down Expand Up @@ -359,7 +358,9 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
)

indexMetadataProvider = IndexMetadataProvider(
settings, client, clusterService,
settings,
client,
clusterService,
hashMapOf(
DEFAULT_INDEX_TYPE to DefaultIndexMetadataService(customIndexUUIDSetting)
)
Expand All @@ -386,7 +387,13 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val managedIndexCoordinator = ManagedIndexCoordinator(
environment.settings(),
client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider
client,
clusterService,
threadPool,
indexManagementIndices,
metadataService,
templateService,
indexMetadataProvider
)

return listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
Expand All @@ -73,7 +74,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed
import org.opensearch.indexmanagement.indexstatemanagement.util.isPolicyCompleted
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.updateEnableManagedIndexRequest
import org.opensearch.indexmanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
import org.opensearch.indexmanagement.opensearchapi.contentParser
import org.opensearch.indexmanagement.opensearchapi.parseFromSearchResponse
Expand Down Expand Up @@ -128,17 +128,26 @@ class ManagedIndexCoordinator(
private var scheduledTemplateMigration: Scheduler.Cancellable? = null

@Volatile private var lastFullSweepTimeNano = System.nanoTime()

@Volatile private var indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings)

@Volatile private var metadataServiceEnabled = METADATA_SERVICE_ENABLED.get(settings)

@Volatile private var sweepPeriod = SWEEP_PERIOD.get(settings)

@Volatile private var retryPolicy =
BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings))

@Volatile private var templateMigrationEnabled: Boolean = true

@Volatile private var templateMigrationEnabledSetting = TEMPLATE_MIGRATION_CONTROL.get(settings)

@Volatile private var jobInterval = JOB_INTERVAL.get(settings)

@Volatile private var jobJitter = JITTER.get(settings)

@Volatile private var isMaster = false

@Volatile private var onMasterTimeStamp: Long = 0L

init {
Expand Down Expand Up @@ -168,8 +177,7 @@ class ManagedIndexCoordinator(
if (!templateMigrationEnabled) scheduledTemplateMigration?.cancel()
else initTemplateMigration(it)
}
clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) {
millis, count ->
clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) { millis, count ->
retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
}
Expand Down Expand Up @@ -556,14 +564,15 @@ class ManagedIndexCoordinator(

logger.info("Performing ISM template migration.")
if (enableSetting == 0L) {
if (onMasterTimeStamp != 0L)
if (onMasterTimeStamp != 0L) {
templateService.doMigration(Instant.ofEpochMilli(onMasterTimeStamp))
else {
} else {
logger.error("No valid onMaster time cached, cancel ISM template migration job.")
scheduledTemplateMigration?.cancel()
}
} else
} else {
templateService.doMigration(Instant.ofEpochMilli(enableSetting))
}
} catch (e: Exception) {
logger.error("Failed to migrate ISM template", e)
}
Expand Down Expand Up @@ -596,7 +605,8 @@ class ManagedIndexCoordinator(

// Get the matching policyIds for applicable indices
val updateMatchingIndicesReqs = createManagedIndexRequests(
clusterService.state(), unManagedIndices.map { (indexName, _) -> indexName }
clusterService.state(),
unManagedIndices.map { (indexName, _) -> indexName }
)

// check all managed indices, if the index has already been deleted
Expand Down Expand Up @@ -701,7 +711,10 @@ class ManagedIndexCoordinator(
mRes.forEach {
if (it.response.isExists) {
result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType(
it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse
it.response.id,
it.response.seqNo,
it.response.primaryTerm,
ManagedIndexConfig.Companion::parse
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ object ManagedIndexRunner :
private lateinit var extensionStatusChecker: ExtensionStatusChecker
private lateinit var indexMetadataProvider: IndexMetadataProvider
private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED

@Suppress("MagicNumber")
private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)

@Suppress("MagicNumber")
private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)

@Suppress("MagicNumber")
private val errorNotificationRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)
private var jobInterval: Int = DEFAULT_JOB_INTERVAL
Expand Down Expand Up @@ -345,7 +348,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Previous action was not able to update IndexMetaData.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -359,7 +363,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Failed to execute action=${action?.type} as extension [$actionExtensionName] is not enabled.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -372,7 +377,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -388,7 +394,10 @@ object ManagedIndexRunner :
// Step null check is done in getStartingManagedIndexMetaData
withClosableContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user
managedIndexConfig.id,
settings,
threadPool.threadContext,
managedIndexConfig.policy.user
)
) {
step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger)
Expand Down Expand Up @@ -476,8 +485,10 @@ object ManagedIndexRunner :
// Intellij complains about createParser/parseWithType blocking because it sees they throw IOExceptions
return withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
policySource, XContentType.JSON
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
policySource,
XContentType.JSON
)
xcp.parseWithType(getResponse.id, getResponse.seqNo, getResponse.primaryTerm, Policy.Companion::parse)
}
Expand All @@ -504,8 +515,11 @@ object ManagedIndexRunner :
@Suppress("TooGenericExceptionCaught")
private suspend fun savePolicyToManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, policy: Policy): Boolean {
val updatedManagedIndexConfig = managedIndexConfig.copy(
policyID = policy.id, policy = policy,
policySeqNo = policy.seqNo, policyPrimaryTerm = policy.primaryTerm, changePolicy = null
policyID = policy.id,
policy = policy,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
changePolicy = null
)
val indexRequest = managedIndexConfigIndexRequest(updatedManagedIndexConfig)
var savedPolicy = false
Expand Down Expand Up @@ -605,8 +619,8 @@ object ManagedIndexRunner :
// this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm
// in the metadata, in this case we just want to say we successfully initialized the policy again but we will not
// modify the state, action, etc. so it can resume where it left off
managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm
&& managedIndexMetaData.policyID == policy.id ->
managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm &&
managedIndexMetaData.policyID == policy.id ->
// If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue.
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0),
Expand Down Expand Up @@ -688,7 +702,6 @@ object ManagedIndexRunner :
managedIndexMetaData: ManagedIndexMetaData,
actionToExecute: Action?
) {

// should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null
val changePolicy = managedIndexConfig.changePolicy
if (changePolicy == null) {
Expand All @@ -709,8 +722,13 @@ object ManagedIndexRunner :
// if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are
// in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase
val newTransitionMetaData = ActionMetaData(
TransitionsAction.name, Instant.now().toEpochMilli(), -1,
false, 0, 0, null
TransitionsAction.name,
Instant.now().toEpochMilli(),
-1,
false,
0,
0,
null
)
val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) {
newTransitionMetaData
Expand Down
Loading