Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Snapshot Deny List (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenlan-amzn authored Jan 7, 2021
1 parent ae97e5a commit ff33f16
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
ManagedIndexSettings.ALLOW_LIST,
ManagedIndexSettings.SNAPSHOT_DENY_LIST,
RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT,
RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS,
RollupSettings.ROLLUP_SEARCH_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ManagedIndexSettings {
const val DEFAULT_JOB_INTERVAL = 5
private val ALLOW_LIST_ALL = ActionConfig.ActionType.values().toList().map { it.type }
val ALLOW_LIST_NONE = emptyList<String>()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()

val INDEX_STATE_MANAGEMENT_ENABLED: Setting<Boolean> = Setting.boolSetting(
"opendistro.index_state_management.enabled",
Expand Down Expand Up @@ -121,5 +122,13 @@ class ManagedIndexSettings {
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val SNAPSHOT_DENY_LIST: Setting<List<String>> = Setting.listSetting(
"opendistro.index_state_management.snapshot.deny_list",
SNAPSHOT_DENY_LIST_NONE,
Function.identity(),
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.common.regex.Regex
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse
Expand All @@ -46,26 +48,35 @@ class AttemptSnapshotStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var snapshotName: String? = null
private var denyList: List<String> = clusterService.clusterSettings.get(SNAPSHOT_DENY_LIST)

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught", "ComplexMethod")
override suspend fun execute(): AttemptSnapshotStep {
try {
snapshotName = config
.snapshot
.plus("-")
.plus(LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT)))
val mutableInfo = mutableMapOf<String, String>()

if (isDenied(denyList, config.repository)) {
stepStatus = StepStatus.FAILED
mutableInfo["message"] = getBlockedMessage(denyList, config.repository, indexName)
info = mutableInfo.toMap()
return this
}

snapshotName = config
.snapshot
.plus("-")
.plus(LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT)))

val createSnapshotRequest = CreateSnapshotRequest()
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
.indices(indexName)
.snapshot(snapshotName)
.repository(config.repository)
.waitForCompletion(false)
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
.indices(indexName)
.snapshot(snapshotName)
.repository(config.repository)
.waitForCompletion(false)

val response: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(createSnapshotRequest, it) }
when (response.status()) {
Expand Down Expand Up @@ -102,6 +113,11 @@ class AttemptSnapshotStep(
return this
}

private fun isDenied(denyList: List<String>, repoName: String): Boolean {
val predicate = { pattern: String -> Regex.simpleMatch(pattern, repoName) }
return denyList.stream().anyMatch(predicate)
}

private fun handleSnapshotException(e: ConcurrentSnapshotExecutionException) {
val message = getFailedConcurrentSnapshotMessage(indexName)
logger.debug(message, e)
Expand All @@ -122,15 +138,16 @@ class AttemptSnapshotStep(
override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetaData.actionMetaData
return currentMetaData.copy(
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

companion object {
const val name = "attempt_snapshot"
fun getBlockedMessage(denyList: List<String>, repoName: String, index: String) = "Snapshot repository [$repoName] is blocked in $denyList [index=$index]"
fun getFailedMessage(index: String) = "Failed to create snapshot [index=$index]"
fun getFailedConcurrentSnapshotMessage(index: String) = "Concurrent snapshot in progress, retrying next execution [index=$index]"
fun getSuccessMessage(index: String) = "Successfully started snapshot [index=$index]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.delete.AttemptDeleteStep
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest
Expand All @@ -57,6 +59,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.time.temporal.ChronoUnit

private val log = LogManager.getLogger("ManagedIndexUtils")

fun managedIndexConfigIndexRequest(index: String, uuid: String, policyID: String, jobInterval: Int): IndexRequest {
val managedIndexConfig = ManagedIndexConfig(
jobName = index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.State
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomErrorNotification
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor
Expand Down Expand Up @@ -168,4 +169,45 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
assertEquals("[$repository:$snapshotName] is missing", getExplainManagedIndexMetaData(indexName).info?.get("cause"))
}
}

fun `test snapshot repository blocked`() {
val denyList = listOf("hello-*")
updateClusterSetting(SNAPSHOT_DENY_LIST.key, "hello-*")

val indexName = "${testIndexName}_index_blocked"
val policyID = "${testIndexName}_policy_basic"
val repository = "hello-world"
val snapshot = "snapshot"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will trigger in 2 seconds.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
assertEquals(AttemptSnapshotStep.getBlockedMessage(denyList, repository, indexName), getExplainManagedIndexMetaData(indexName).info?.get("message"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
Expand All @@ -17,17 +18,25 @@ import org.elasticsearch.client.AdminClient
import org.elasticsearch.client.Client
import org.elasticsearch.client.ClusterAdminClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.ClusterSettings
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException
import org.elasticsearch.test.ESTestCase
import org.elasticsearch.transport.RemoteTransportException
import org.junit.Before

class AttemptSnapshotStepTests : ESTestCase() {

private val clusterService: ClusterService = mock()
private val config = SnapshotActionConfig("repo", "snapshot-name", 0)
private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)

@Before
fun settings() {
whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(SNAPSHOT_DENY_LIST)))
}

fun `test snapshot response when block`() {
val response: CreateSnapshotResponse = mock()
val client = getClient(getAdminClient(getClusterAdminClient(response, null)))
Expand Down

0 comments on commit ff33f16

Please sign in to comment.