Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metadata migration logic error when update setting call failed #328

Merged
merged 1 commit into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class MetadataService(
private var failedToCleanIndices = mutableSetOf<Index>()

private var counter = 0
private var runTimeCounter = 0
final var runTimeCounter = 1
private set
private val maxRunTime = 10

// used in coordinator sweep to cancel scheduled process
Expand Down Expand Up @@ -84,12 +85,18 @@ class MetadataService(
return
}

if (runTimeCounter >= maxRunTime) {
if (!imIndices.indexManagementIndexExists()) {
logger.info("ISM config index not exist, so we cancel the metadata migration job.")
finishFlag = true; runningLock = false; runTimeCounter = 0
return
}
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved

if (runTimeCounter > maxRunTime) {
updateStatusSetting(-1)
finishFlag = true; runningLock = false; runTimeCounter = 0
return
}
logger.info("Doing metadata migration ${++runTimeCounter} time.")
logger.info("Doing metadata migration $runTimeCounter time.")

val indicesMetadata = clusterService.state().metadata.indices
var clusterStateManagedIndexMetadata = indicesMetadata.map {
Expand Down Expand Up @@ -117,18 +124,18 @@ class MetadataService(
}

if (clusterStateManagedIndexMetadata.isEmpty()) {
if (failedToCleanIndices.isNotEmpty()) {
logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.")
cleanMetadatas(failedToCleanIndices.toList())
finishFlag = false; runningLock = false
return
}
if (counter++ > 2 && corruptManagedIndices.isEmpty()) {
logger.info("Move Metadata succeed, set finish flag to true. Indices failed to get indexed: $failedToIndexIndices")
updateStatusSetting(1)
finishFlag = true; runningLock = false; runTimeCounter = 0
return
}
if (failedToCleanIndices.isNotEmpty()) {
logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.")
cleanMetadatas(failedToCleanIndices.toList())
finishFlag = false; runningLock = false
return
}
} else {
counter = 0; finishFlag = false // index metadata for indices which metadata hasn't been indexed
val bulkIndexReq =
Expand Down Expand Up @@ -160,6 +167,8 @@ class MetadataService(
if (failedToCleanIndices.isNotEmpty()) {
logger.info("Failed to clean cluster metadata for: ${failedToCleanIndices.map { it.name }}")
}

runTimeCounter++
} finally {
runningLock = false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement

import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.never
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse
import org.opensearch.client.AdminClient
import org.opensearch.client.Client
import org.opensearch.client.ClusterAdminClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.ImmutableOpenMap
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.test.OpenSearchTestCase
import kotlin.test.assertFailsWith

class MetadataServiceTests : OpenSearchTestCase() {

private val clusterService: ClusterService = mock()
private val clusterState: ClusterState = mock()
private val metadata: Metadata = mock()
private val imIndices: IndexManagementIndices = mock()

private val ex = Exception()

@Before
fun setup() {
whenever(clusterService.state()).doReturn(clusterState)
whenever(clusterState.metadata).doReturn(metadata)
whenever(metadata.indices).doReturn(ImmutableOpenMap.of())
}

fun `test config index not exists`() = runBlocking {
whenever(imIndices.indexManagementIndexExists()).doReturn(false)

val client = getClient(
getAdminClient(
getClusterAdminClient(
updateSettingResponse = null,
updateSettingException = ex
)
)
)
val skipFlag = SkipExecution(client, clusterService)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()

verify(client.admin().cluster(), never()).updateSettings(any(), any())
assertEquals(metadataService.finishFlag, true)
}

// If update setting to 1 failed with some exception, runTimeCounter shouldn't be increased
fun `test failed to update setting to 1`() = runBlocking {
whenever(imIndices.indexManagementIndexExists()).doReturn(true)

val client = getClient(
getAdminClient(
getClusterAdminClient(
updateSettingResponse = null,
updateSettingException = ex
)
)
)

val skipFlag = SkipExecution(client, clusterService)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 2)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 3)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 4)
assertFailsWith(Exception::class) {
runBlocking {
metadataService.moveMetadata()
}
}
assertEquals(metadataService.runTimeCounter, 4)
assertEquals(metadataService.finishFlag, false)
}

private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient }

private fun getAdminClient(clusterAdminClient: ClusterAdminClient): AdminClient = mock { on { cluster() } doReturn clusterAdminClient }

private fun getClusterAdminClient(
updateSettingResponse: ClusterUpdateSettingsResponse?,
updateSettingException: Exception?
): ClusterAdminClient {
assertTrue(
"Must provide either a getMappingsResponse or getMappingsException",
(updateSettingResponse != null).xor(updateSettingException != null)
)

return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<ClusterUpdateSettingsResponse>>(1)
if (updateSettingResponse != null) listener.onResponse(updateSettingResponse)
else listener.onFailure(updateSettingException)
}.whenever(this.mock).updateSettings(any(), any())
}
}
}