From ff33f167bce37f7d37117625e54449b51990c83a Mon Sep 17 00:00:00 2001 From: Bowen Lan <62091230+bowenlan-amzn@users.noreply.github.com> Date: Thu, 7 Jan 2021 09:18:32 -0800 Subject: [PATCH] Snapshot Deny List (#366) --- .../indexmanagement/IndexManagementPlugin.kt | 1 + .../settings/ManagedIndexSettings.kt | 9 ++++ .../step/snapshot/AttemptSnapshotStep.kt | 47 +++++++++++++------ .../util/ManagedIndexUtils.kt | 4 ++ .../action/SnapshotActionIT.kt | 42 +++++++++++++++++ .../step/AttemptSnapshotStepTests.kt | 9 ++++ 6 files changed, 97 insertions(+), 15 deletions(-) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt index 636971841..ae31e6055 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt @@ -268,6 +268,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT, ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS, ManagedIndexSettings.ALLOW_LIST, + ManagedIndexSettings.SNAPSHOT_DENY_LIST, RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT, RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS, RollupSettings.ROLLUP_SEARCH_BACKOFF_COUNT, diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt index 1034f3a00..a3c9f8621 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt @@ -27,6 +27,7 @@ class ManagedIndexSettings { const val DEFAULT_JOB_INTERVAL = 5 private val ALLOW_LIST_ALL = ActionConfig.ActionType.values().toList().map { it.type } val ALLOW_LIST_NONE = emptyList() + val SNAPSHOT_DENY_LIST_NONE = emptyList() val INDEX_STATE_MANAGEMENT_ENABLED: Setting = Setting.boolSetting( "opendistro.index_state_management.enabled", @@ -121,5 +122,13 @@ class ManagedIndexSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val SNAPSHOT_DENY_LIST: Setting> = Setting.listSetting( + "opendistro.index_state_management.snapshot.deny_list", + SNAPSHOT_DENY_LIST_NONE, + Function.identity(), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ) } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt index 2c54b34c4..d3d5ede7d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt @@ -20,8 +20,10 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.common.regex.Regex import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse @@ -46,26 +48,35 @@ class AttemptSnapshotStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null private var snapshotName: String? = null + private var denyList: List = clusterService.clusterSettings.get(SNAPSHOT_DENY_LIST) override fun isIdempotent() = false @Suppress("TooGenericExceptionCaught", "ComplexMethod") override suspend fun execute(): AttemptSnapshotStep { try { - snapshotName = config - .snapshot - .plus("-") - .plus(LocalDateTime - .now(ZoneId.of("UTC")) - .format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))) val mutableInfo = mutableMapOf() + if (isDenied(denyList, config.repository)) { + stepStatus = StepStatus.FAILED + mutableInfo["message"] = getBlockedMessage(denyList, config.repository, indexName) + info = mutableInfo.toMap() + return this + } + + snapshotName = config + .snapshot + .plus("-") + .plus(LocalDateTime + .now(ZoneId.of("UTC")) + .format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))) + val createSnapshotRequest = CreateSnapshotRequest() - .userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management")) - .indices(indexName) - .snapshot(snapshotName) - .repository(config.repository) - .waitForCompletion(false) + .userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management")) + .indices(indexName) + .snapshot(snapshotName) + .repository(config.repository) + .waitForCompletion(false) val response: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(createSnapshotRequest, it) } when (response.status()) { @@ -102,6 +113,11 @@ class AttemptSnapshotStep( return this } + private fun isDenied(denyList: List, repoName: String): Boolean { + val predicate = { pattern: String -> Regex.simpleMatch(pattern, repoName) } + return denyList.stream().anyMatch(predicate) + } + private fun handleSnapshotException(e: ConcurrentSnapshotExecutionException) { val message = getFailedConcurrentSnapshotMessage(indexName) logger.debug(message, e) @@ -122,15 +138,16 @@ class AttemptSnapshotStep( override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { val currentActionMetaData = currentMetaData.actionMetaData return currentMetaData.copy( - actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)), - stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), - transitionTo = null, - info = info + actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)), + stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), + transitionTo = null, + info = info ) } companion object { const val name = "attempt_snapshot" + fun getBlockedMessage(denyList: List, repoName: String, index: String) = "Snapshot repository [$repoName] is blocked in $denyList [index=$index]" fun getFailedMessage(index: String) = "Failed to create snapshot [index=$index]" fun getFailedConcurrentSnapshotMessage(index: String) = "Concurrent snapshot in progress, retrying next execution [index=$index]" fun getSuccessMessage(index: String) = "Successfully started snapshot [index=$index]" diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 2ef274737..6a1a37a76 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -35,10 +35,12 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.delete.AttemptDeleteStep import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule +import org.apache.logging.log4j.LogManager import org.elasticsearch.action.DocWriteRequest import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.index.IndexRequest @@ -57,6 +59,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder import java.time.Instant import java.time.temporal.ChronoUnit +private val log = LogManager.getLogger("ManagedIndexUtils") + fun managedIndexConfigIndexRequest(index: String, uuid: String, policyID: String, jobInterval: Int): IndexRequest { val managedIndexConfig = ManagedIndexConfig( jobName = index, diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt index 7c9762fff..4d2e7fe74 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.State import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor @@ -168,4 +169,45 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() { assertEquals("[$repository:$snapshotName] is missing", getExplainManagedIndexMetaData(indexName).info?.get("cause")) } } + + fun `test snapshot repository blocked`() { + val denyList = listOf("hello-*") + updateClusterSetting(SNAPSHOT_DENY_LIST.key, "hello-*") + + val indexName = "${testIndexName}_index_blocked" + val policyID = "${testIndexName}_policy_basic" + val repository = "hello-world" + val snapshot = "snapshot" + val actionConfig = SnapshotActionConfig(repository, snapshot, 0) + val states = listOf( + State("Snapshot", listOf(actionConfig), listOf()) + ) + + createRepository(repository) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { + assertEquals(AttemptSnapshotStep.getBlockedMessage(denyList, repository, indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) + } + } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt index 52a411818..8e49ca9ff 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt @@ -4,6 +4,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer @@ -17,10 +18,13 @@ import org.elasticsearch.client.AdminClient import org.elasticsearch.client.Client import org.elasticsearch.client.ClusterAdminClient import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.settings.ClusterSettings +import org.elasticsearch.common.settings.Settings import org.elasticsearch.rest.RestStatus import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException import org.elasticsearch.test.ESTestCase import org.elasticsearch.transport.RemoteTransportException +import org.junit.Before class AttemptSnapshotStepTests : ESTestCase() { @@ -28,6 +32,11 @@ class AttemptSnapshotStepTests : ESTestCase() { private val config = SnapshotActionConfig("repo", "snapshot-name", 0) private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + @Before + fun settings() { + whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(SNAPSHOT_DENY_LIST))) + } + fun `test snapshot response when block`() { val response: CreateSnapshotResponse = mock() val client = getClient(getAdminClient(getClusterAdminClient(response, null)))