diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt index 9193fd39e..b69fcac5c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt @@ -56,7 +56,8 @@ class MetadataService( private var failedToCleanIndices = mutableSetOf() private var counter = 0 - private var runTimeCounter = 0 + final var runTimeCounter = 1 + private set private val maxRunTime = 10 // used in coordinator sweep to cancel scheduled process @@ -84,12 +85,18 @@ class MetadataService( return } - if (runTimeCounter >= maxRunTime) { + if (!imIndices.indexManagementIndexExists()) { + logger.info("ISM config index not exist, so we cancel the metadata migration job.") + finishFlag = true; runningLock = false; runTimeCounter = 0 + return + } + + if (runTimeCounter > maxRunTime) { updateStatusSetting(-1) finishFlag = true; runningLock = false; runTimeCounter = 0 return } - logger.info("Doing metadata migration ${++runTimeCounter} time.") + logger.info("Doing metadata migration $runTimeCounter time.") val indicesMetadata = clusterService.state().metadata.indices var clusterStateManagedIndexMetadata = indicesMetadata.map { @@ -117,18 +124,18 @@ class MetadataService( } if (clusterStateManagedIndexMetadata.isEmpty()) { - if (failedToCleanIndices.isNotEmpty()) { - logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.") - cleanMetadatas(failedToCleanIndices.toList()) - finishFlag = false; runningLock = false - return - } if (counter++ > 2 && corruptManagedIndices.isEmpty()) { logger.info("Move Metadata succeed, set finish flag to true. Indices failed to get indexed: $failedToIndexIndices") updateStatusSetting(1) finishFlag = true; runningLock = false; runTimeCounter = 0 return } + if (failedToCleanIndices.isNotEmpty()) { + logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.") + cleanMetadatas(failedToCleanIndices.toList()) + finishFlag = false; runningLock = false + return + } } else { counter = 0; finishFlag = false // index metadata for indices which metadata hasn't been indexed val bulkIndexReq = @@ -160,6 +167,8 @@ class MetadataService( if (failedToCleanIndices.isNotEmpty()) { logger.info("Failed to clean cluster metadata for: ${failedToCleanIndices.map { it.name }}") } + + runTimeCounter++ } finally { runningLock = false } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataServiceTests.kt new file mode 100644 index 000000000..fdd223560 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataServiceTests.kt @@ -0,0 +1,111 @@ +package org.opensearch.indexmanagement.indexstatemanagement + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.never +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse +import org.opensearch.client.AdminClient +import org.opensearch.client.Client +import org.opensearch.client.ClusterAdminClient +import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.metadata.Metadata +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.collect.ImmutableOpenMap +import org.opensearch.indexmanagement.IndexManagementIndices +import org.opensearch.test.OpenSearchTestCase +import kotlin.test.assertFailsWith + +class MetadataServiceTests : OpenSearchTestCase() { + + private val clusterService: ClusterService = mock() + private val clusterState: ClusterState = mock() + private val metadata: Metadata = mock() + private val imIndices: IndexManagementIndices = mock() + + private val ex = Exception() + + @Before + fun setup() { + whenever(clusterService.state()).doReturn(clusterState) + whenever(clusterState.metadata).doReturn(metadata) + whenever(metadata.indices).doReturn(ImmutableOpenMap.of()) + } + + fun `test config index not exists`() = runBlocking { + whenever(imIndices.indexManagementIndexExists()).doReturn(false) + + val client = getClient( + getAdminClient( + getClusterAdminClient( + updateSettingResponse = null, + updateSettingException = ex + ) + ) + ) + val skipFlag = SkipExecution(client, clusterService) + val metadataService = MetadataService(client, clusterService, skipFlag, imIndices) + metadataService.moveMetadata() + + verify(client.admin().cluster(), never()).updateSettings(any(), any()) + assertEquals(metadataService.finishFlag, true) + } + + // If update setting to 1 failed with some exception, runTimeCounter shouldn't be increased + fun `test failed to update setting to 1`() = runBlocking { + whenever(imIndices.indexManagementIndexExists()).doReturn(true) + + val client = getClient( + getAdminClient( + getClusterAdminClient( + updateSettingResponse = null, + updateSettingException = ex + ) + ) + ) + + val skipFlag = SkipExecution(client, clusterService) + val metadataService = MetadataService(client, clusterService, skipFlag, imIndices) + metadataService.moveMetadata() + assertEquals(metadataService.runTimeCounter, 2) + metadataService.moveMetadata() + assertEquals(metadataService.runTimeCounter, 3) + metadataService.moveMetadata() + assertEquals(metadataService.runTimeCounter, 4) + assertFailsWith(Exception::class) { + runBlocking { + metadataService.moveMetadata() + } + } + assertEquals(metadataService.runTimeCounter, 4) + assertEquals(metadataService.finishFlag, false) + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + + private fun getAdminClient(clusterAdminClient: ClusterAdminClient): AdminClient = mock { on { cluster() } doReturn clusterAdminClient } + + private fun getClusterAdminClient( + updateSettingResponse: ClusterUpdateSettingsResponse?, + updateSettingException: Exception? + ): ClusterAdminClient { + assertTrue( + "Must provide either a getMappingsResponse or getMappingsException", + (updateSettingResponse != null).xor(updateSettingException != null) + ) + + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (updateSettingResponse != null) listener.onResponse(updateSettingResponse) + else listener.onFailure(updateSettingException) + }.whenever(this.mock).updateSettings(any(), any()) + } + } +}