Skip to content

Commit

Permalink
Always re-run Feature migrations which have encountered errors (elast…
Browse files Browse the repository at this point in the history
…ic#83918)

This PR addressed the behavior described in elastic#83917, in which Feature migrations
which have encountered errors are not re-run in some cases. As of this PR, Features
which have encountered errors during migration are treated the same as Features 
requiring migration.

This PR also adds a test which artificially replicates elastic#83917.
  • Loading branch information
gwbrown authored and Gordon Brown committed Feb 17, 2022
1 parent c3f34f6 commit 5e0c650
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/83918.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 83918
summary: Always re-run Feature migrations which have encountered errors
area: Infra/Core
type: bug
issues:
- 83917
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -37,6 +39,7 @@
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.upgrades.FeatureMigrationResults;
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand All @@ -51,6 +54,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand Down Expand Up @@ -268,6 +273,67 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
});
}

public void testMigrationWillRunAfterError() throws Exception {
createSystemIndexForDescriptor(INTERNAL_MANAGED);

TestPlugin.preMigrationHook.set((state) -> Collections.emptyMap());
TestPlugin.postMigrationHook.set((state, metadata) -> {});

ensureGreen();

SetOnce<Exception> failure = new SetOnce<>();
CountDownLatch clusterStateUpdated = new CountDownLatch(1);
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.submitStateUpdateTask(this.getTestName(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
FeatureMigrationResults newResults = new FeatureMigrationResults(
Collections.singletonMap(
FEATURE_NAME,
SingleFeatureMigrationResult.failure(INTERNAL_MANAGED_INDEX_NAME, new RuntimeException("it failed :("))
)
);
Metadata newMetadata = Metadata.builder(currentState.metadata())
.putCustom(FeatureMigrationResults.TYPE, newResults)
.build();
return ClusterState.builder(currentState).metadata(newMetadata).build();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
clusterStateUpdated.countDown();
}

@Override
public void onFailure(Exception e) {
failure.set(e);
clusterStateUpdated.countDown();
}
}, ClusterStateTaskExecutor.unbatched());

clusterStateUpdated.await(10, TimeUnit.SECONDS); // Should be basically instantaneous
if (failure.get() != null) {
logger.error("cluster state update to inject migration failure state did not succeed", failure.get());
fail("cluster state update failed, see log for details");
}

PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest();
PostFeatureUpgradeResponse migrationResponse = client().execute(PostFeatureUpgradeAction.INSTANCE, migrationRequest).get();
// Make sure we actually started the migration
assertTrue(
"could not find [" + FEATURE_NAME + "] in response: " + Strings.toString(migrationResponse),
migrationResponse.getFeatures().stream().anyMatch(feature -> feature.getFeatureName().equals(FEATURE_NAME))
);

// Now wait for the migration to finish (otherwise the test infra explodes)
assertBusy(() -> {
GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest();
GetFeatureUpgradeStatusResponse statusResp = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest).get();
logger.info(Strings.toString(statusResp));
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
});
}

public void assertIndexHasCorrectProperties(
Metadata metadata,
String indexName,
Expand Down Expand Up @@ -343,6 +409,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
static final String FEATURE_NAME = "A-test-feature"; // Sorts alphabetically before the feature from MultiFeatureMigrationIT
static final String ORIGIN = FeatureMigrationIT.class.getSimpleName();
static final String FlAG_SETTING_KEY = IndexMetadata.INDEX_PRIORITY_SETTING.getKey();
static final String INTERNAL_MANAGED_INDEX_NAME = ".int-man-old";
static final int INDEX_DOC_COUNT = 100; // arbitrarily chosen
public static final Version NEEDS_UPGRADE_VERSION = Version.V_6_0_0;

Expand All @@ -353,7 +420,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
static final SystemIndexDescriptor INTERNAL_MANAGED = SystemIndexDescriptor.builder()
.setIndexPattern(".int-man-*")
.setAliasName(".internal-managed-alias")
.setPrimaryIndex(".int-man-old")
.setPrimaryIndex(INTERNAL_MANAGED_INDEX_NAME)
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
.setSettings(createSimpleSettings(NEEDS_UPGRADE_VERSION, INTERNAL_MANAGED_FLAG_VALUE))
.setMappings(createSimpleMapping(true, true, false)) // See note below
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.elasticsearch.upgrades.SystemIndexMigrationTaskParams;

import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.getFeatureUpgradeStatus;
Expand Down Expand Up @@ -73,11 +75,15 @@ protected void masterOperation(
ClusterState state,
ActionListener<PostFeatureUpgradeResponse> listener
) throws Exception {
final Set<GetFeatureUpgradeStatusResponse.UpgradeStatus> upgradableStatuses = EnumSet.of(
GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED,
GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR
);
List<PostFeatureUpgradeResponse.Feature> featuresToMigrate = systemIndices.getFeatures()
.values()
.stream()
.map(feature -> getFeatureUpgradeStatus(state, feature))
.filter(status -> status.getUpgradeStatus().equals(GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED))
.filter(status -> upgradableStatuses.contains(status.getUpgradeStatus()))
.map(GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus::getFeatureName)
.map(PostFeatureUpgradeResponse.Feature::new)
.sorted(Comparator.comparing(PostFeatureUpgradeResponse.Feature::getFeatureName)) // consistent ordering to simplify testing
Expand Down

0 comments on commit 5e0c650

Please sign in to comment.