Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always re-run Feature migrations which have encountered errors #83918

Merged
merged 10 commits into from
Feb 17, 2022
6 changes: 6 additions & 0 deletions docs/changelog/83918.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 83918
summary: Always re-run Feature migrations which have encountered errors
area: Infra/Core
type: bug
issues:
- 83917
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -37,6 +39,7 @@
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.upgrades.FeatureMigrationResults;
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

Expand All @@ -50,6 +53,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand Down Expand Up @@ -267,6 +272,67 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
});
}

public void testMigrationWillRunAfterError() throws Exception {
createSystemIndexForDescriptor(INTERNAL_MANAGED);

TestPlugin.preMigrationHook.set((state) -> Collections.emptyMap());
TestPlugin.postMigrationHook.set((state, metadata) -> {});

ensureGreen();

SetOnce<Exception> failure = new SetOnce<>();
CountDownLatch clusterStateUpdated = new CountDownLatch(1);
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.submitStateUpdateTask(this.getTestName(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
FeatureMigrationResults newResults = new FeatureMigrationResults(
Collections.singletonMap(
FEATURE_NAME,
SingleFeatureMigrationResult.failure(INTERNAL_MANAGED_INDEX_NAME, new RuntimeException("it failed :("))
)
);
Metadata newMetadata = Metadata.builder(currentState.metadata())
.putCustom(FeatureMigrationResults.TYPE, newResults)
.build();
return ClusterState.builder(currentState).metadata(newMetadata).build();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
clusterStateUpdated.countDown();
}

@Override
public void onFailure(Exception e) {
failure.set(e);
clusterStateUpdated.countDown();
}
}, ClusterStateTaskExecutor.unbatched());

clusterStateUpdated.await(10, TimeUnit.SECONDS); // Should be basically instantaneous
if (failure.get() != null) {
logger.error("cluster state update to inject migration failure state did not succeed", failure.get());
fail("cluster state update failed, see log for details");
}

PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest();
PostFeatureUpgradeResponse migrationResponse = client().execute(PostFeatureUpgradeAction.INSTANCE, migrationRequest).get();
// Make sure we actually started the migration
assertTrue(
"could not find [" + FEATURE_NAME + "] in response: " + Strings.toString(migrationResponse),
migrationResponse.getFeatures().stream().anyMatch(feature -> feature.getFeatureName().equals(FEATURE_NAME))
);

// Now wait for the migration to finish (otherwise the test infra explodes)
assertBusy(() -> {
GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest();
GetFeatureUpgradeStatusResponse statusResp = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest).get();
logger.info(Strings.toString(statusResp));
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
});
}

public void assertIndexHasCorrectProperties(
Metadata metadata,
String indexName,
Expand Down Expand Up @@ -344,6 +410,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
static final String FEATURE_NAME = "A-test-feature"; // Sorts alphabetically before the feature from MultiFeatureMigrationIT
static final String ORIGIN = FeatureMigrationIT.class.getSimpleName();
static final String FlAG_SETTING_KEY = IndexMetadata.INDEX_PRIORITY_SETTING.getKey();
static final String INTERNAL_MANAGED_INDEX_NAME = ".int-man-old";
static final int INDEX_DOC_COUNT = 100; // arbitrarily chosen
public static final Version NEEDS_UPGRADE_VERSION = Version.V_7_0_0;

Expand All @@ -354,7 +421,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
static final SystemIndexDescriptor INTERNAL_MANAGED = SystemIndexDescriptor.builder()
.setIndexPattern(".int-man-*")
.setAliasName(".internal-managed-alias")
.setPrimaryIndex(".int-man-old")
.setPrimaryIndex(INTERNAL_MANAGED_INDEX_NAME)
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
.setSettings(createSimpleSettings(NEEDS_UPGRADE_VERSION, INTERNAL_MANAGED_FLAG_VALUE))
.setMappings(createSimpleMapping(true, true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.elasticsearch.upgrades.SystemIndexMigrationTaskParams;

import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.getFeatureUpgradeStatus;
Expand Down Expand Up @@ -75,11 +77,15 @@ protected void masterOperation(
ClusterState state,
ActionListener<PostFeatureUpgradeResponse> listener
) throws Exception {
final Set<GetFeatureUpgradeStatusResponse.UpgradeStatus> upgradableStatuses = EnumSet.of(
GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED,
GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR
);
List<PostFeatureUpgradeResponse.Feature> featuresToMigrate = systemIndices.getFeatures()
.values()
.stream()
.map(feature -> getFeatureUpgradeStatus(state, feature))
.filter(status -> status.getUpgradeStatus().equals(GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED))
.filter(status -> upgradableStatuses.contains(status.getUpgradeStatus()))
.map(GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus::getFeatureName)
.map(PostFeatureUpgradeResponse.Feature::new)
.sorted(Comparator.comparing(PostFeatureUpgradeResponse.Feature::getFeatureName)) // consistent ordering to simplify testing
Expand Down