Skip to content

Commit

Permalink
Fix metadata migration logic error when update setting call failed (#328
Browse files Browse the repository at this point in the history
)

If update setting to 1 failed, runTimeCounter shouldn't be increased.
Otherwise it will increase to be larger than 10, and start to update
setting to -1 which is unwanted.

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored Apr 14, 2022
1 parent 34b480c commit f4a3938
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class MetadataService(
private var failedToCleanIndices = mutableSetOf<Index>()

private var counter = 0
private var runTimeCounter = 0
final var runTimeCounter = 1
private set
private val maxRunTime = 10

// used in coordinator sweep to cancel scheduled process
Expand Down Expand Up @@ -84,12 +85,18 @@ class MetadataService(
return
}

if (runTimeCounter >= maxRunTime) {
if (!imIndices.indexManagementIndexExists()) {
logger.info("ISM config index not exist, so we cancel the metadata migration job.")
finishFlag = true; runningLock = false; runTimeCounter = 0
return
}

if (runTimeCounter > maxRunTime) {
updateStatusSetting(-1)
finishFlag = true; runningLock = false; runTimeCounter = 0
return
}
logger.info("Doing metadata migration ${++runTimeCounter} time.")
logger.info("Doing metadata migration $runTimeCounter time.")

val indicesMetadata = clusterService.state().metadata.indices
var clusterStateManagedIndexMetadata = indicesMetadata.map {
Expand Down Expand Up @@ -117,18 +124,18 @@ class MetadataService(
}

if (clusterStateManagedIndexMetadata.isEmpty()) {
if (failedToCleanIndices.isNotEmpty()) {
logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.")
cleanMetadatas(failedToCleanIndices.toList())
finishFlag = false; runningLock = false
return
}
if (counter++ > 2 && corruptManagedIndices.isEmpty()) {
logger.info("Move Metadata succeed, set finish flag to true. Indices failed to get indexed: $failedToIndexIndices")
updateStatusSetting(1)
finishFlag = true; runningLock = false; runTimeCounter = 0
return
}
if (failedToCleanIndices.isNotEmpty()) {
logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.")
cleanMetadatas(failedToCleanIndices.toList())
finishFlag = false; runningLock = false
return
}
} else {
counter = 0; finishFlag = false // index metadata for indices which metadata hasn't been indexed
val bulkIndexReq =
Expand Down Expand Up @@ -160,6 +167,8 @@ class MetadataService(
if (failedToCleanIndices.isNotEmpty()) {
logger.info("Failed to clean cluster metadata for: ${failedToCleanIndices.map { it.name }}")
}

runTimeCounter++
} finally {
runningLock = false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement

import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.never
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse
import org.opensearch.client.AdminClient
import org.opensearch.client.Client
import org.opensearch.client.ClusterAdminClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.ImmutableOpenMap
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.test.OpenSearchTestCase
import kotlin.test.assertFailsWith

class MetadataServiceTests : OpenSearchTestCase() {

private val clusterService: ClusterService = mock()
private val clusterState: ClusterState = mock()
private val metadata: Metadata = mock()
private val imIndices: IndexManagementIndices = mock()

private val ex = Exception()

@Before
fun setup() {
whenever(clusterService.state()).doReturn(clusterState)
whenever(clusterState.metadata).doReturn(metadata)
whenever(metadata.indices).doReturn(ImmutableOpenMap.of())
}

fun `test config index not exists`() = runBlocking {
whenever(imIndices.indexManagementIndexExists()).doReturn(false)

val client = getClient(
getAdminClient(
getClusterAdminClient(
updateSettingResponse = null,
updateSettingException = ex
)
)
)
val skipFlag = SkipExecution(client, clusterService)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()

verify(client.admin().cluster(), never()).updateSettings(any(), any())
assertEquals(metadataService.finishFlag, true)
}

// If update setting to 1 failed with some exception, runTimeCounter shouldn't be increased
fun `test failed to update setting to 1`() = runBlocking {
whenever(imIndices.indexManagementIndexExists()).doReturn(true)

val client = getClient(
getAdminClient(
getClusterAdminClient(
updateSettingResponse = null,
updateSettingException = ex
)
)
)

val skipFlag = SkipExecution(client, clusterService)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 2)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 3)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 4)
assertFailsWith(Exception::class) {
runBlocking {
metadataService.moveMetadata()
}
}
assertEquals(metadataService.runTimeCounter, 4)
assertEquals(metadataService.finishFlag, false)
}

private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient }

private fun getAdminClient(clusterAdminClient: ClusterAdminClient): AdminClient = mock { on { cluster() } doReturn clusterAdminClient }

private fun getClusterAdminClient(
updateSettingResponse: ClusterUpdateSettingsResponse?,
updateSettingException: Exception?
): ClusterAdminClient {
assertTrue(
"Must provide either a getMappingsResponse or getMappingsException",
(updateSettingResponse != null).xor(updateSettingException != null)
)

return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<ClusterUpdateSettingsResponse>>(1)
if (updateSettingResponse != null) listener.onResponse(updateSettingResponse)
else listener.onFailure(updateSettingException)
}.whenever(this.mock).updateSettings(any(), any())
}
}
}

0 comments on commit f4a3938

Please sign in to comment.