Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Snapshot Deny List #366

Merged
merged 5 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
ManagedIndexSettings.ALLOW_LIST,
ManagedIndexSettings.SNAPSHOT_DENY_LIST,
RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT,
RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS,
RollupSettings.ROLLUP_SEARCH_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ManagedIndexSettings {
const val DEFAULT_JOB_INTERVAL = 5
private val ALLOW_LIST_ALL = ActionConfig.ActionType.values().toList().map { it.type }
val ALLOW_LIST_NONE = emptyList<String>()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()

val INDEX_STATE_MANAGEMENT_ENABLED: Setting<Boolean> = Setting.boolSetting(
"opendistro.index_state_management.enabled",
Expand Down Expand Up @@ -121,5 +122,13 @@ class ManagedIndexSettings {
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val SNAPSHOT_DENY_LIST: Setting<List<String>> = Setting.listSetting(
"opendistro.index_state_management.snapshot.deny_list",
SNAPSHOT_DENY_LIST_NONE,
Function.identity(),
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.common.regex.Regex
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse
Expand All @@ -46,26 +48,35 @@ class AttemptSnapshotStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var snapshotName: String? = null
private var denyList: List<String> = clusterService.clusterSettings.get(SNAPSHOT_DENY_LIST)

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught", "ComplexMethod")
override suspend fun execute(): AttemptSnapshotStep {
try {
snapshotName = config
.snapshot
.plus("-")
.plus(LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT)))
val mutableInfo = mutableMapOf<String, String>()

if (isDenied(denyList, config.repository)) {
stepStatus = StepStatus.FAILED
mutableInfo["message"] = getBlockedMessage(denyList, config.repository, indexName)
info = mutableInfo.toMap()
return this
}

snapshotName = config
.snapshot
.plus("-")
.plus(LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT)))

val createSnapshotRequest = CreateSnapshotRequest()
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
.indices(indexName)
.snapshot(snapshotName)
.repository(config.repository)
.waitForCompletion(false)
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
.indices(indexName)
.snapshot(snapshotName)
.repository(config.repository)
.waitForCompletion(false)

val response: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(createSnapshotRequest, it) }
when (response.status()) {
Expand Down Expand Up @@ -102,6 +113,11 @@ class AttemptSnapshotStep(
return this
}

private fun isDenied(denyList: List<String>, repoName: String): Boolean {
val predicate = { pattern: String -> Regex.simpleMatch(pattern, repoName) }
return denyList.stream().anyMatch(predicate)
}

private fun handleSnapshotException(e: ConcurrentSnapshotExecutionException) {
val message = getFailedConcurrentSnapshotMessage(indexName)
logger.debug(message, e)
Expand All @@ -122,15 +138,16 @@ class AttemptSnapshotStep(
override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetaData.actionMetaData
return currentMetaData.copy(
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

companion object {
const val name = "attempt_snapshot"
fun getBlockedMessage(denyList: List<String>, repoName: String, index: String) = "Snapshot repository [$repoName] is blocked in $denyList [index=$index]"
fun getFailedMessage(index: String) = "Failed to create snapshot [index=$index]"
fun getFailedConcurrentSnapshotMessage(index: String) = "Concurrent snapshot in progress, retrying next execution [index=$index]"
fun getSuccessMessage(index: String) = "Successfully started snapshot [index=$index]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.delete.AttemptDeleteStep
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest
Expand All @@ -57,6 +59,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.time.temporal.ChronoUnit

private val log = LogManager.getLogger("ManagedIndexUtils")

fun managedIndexConfigIndexRequest(index: String, uuid: String, policyID: String, jobInterval: Int): IndexRequest {
val managedIndexConfig = ManagedIndexConfig(
jobName = index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.State
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomErrorNotification
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor
Expand Down Expand Up @@ -168,4 +169,45 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
assertEquals("[$repository:$snapshotName] is missing", getExplainManagedIndexMetaData(indexName).info?.get("cause"))
}
}

fun `test snapshot repository blocked`() {
val denyList = listOf("hello-*")
updateClusterSetting(SNAPSHOT_DENY_LIST.key, "hello-*")

val indexName = "${testIndexName}_index_blocked"
val policyID = "${testIndexName}_policy_basic"
val repository = "hello-world"
val snapshot = "snapshot"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will trigger in 2 seconds.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
assertEquals(AttemptSnapshotStep.getBlockedMessage(denyList, repository, indexName), getExplainManagedIndexMetaData(indexName).info?.get("message"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
Expand All @@ -17,17 +18,25 @@ import org.elasticsearch.client.AdminClient
import org.elasticsearch.client.Client
import org.elasticsearch.client.ClusterAdminClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.ClusterSettings
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException
import org.elasticsearch.test.ESTestCase
import org.elasticsearch.transport.RemoteTransportException
import org.junit.Before

class AttemptSnapshotStepTests : ESTestCase() {

private val clusterService: ClusterService = mock()
private val config = SnapshotActionConfig("repo", "snapshot-name", 0)
private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)

@Before
fun settings() {
whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(SNAPSHOT_DENY_LIST)))
}

fun `test snapshot response when block`() {
val response: CreateSnapshotResponse = mock()
val client = getClient(getAdminClient(getClusterAdminClient(response, null)))
Expand Down