Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recover closed indices after a full cluster restart #39249

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
Expand All @@ -41,6 +42,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -59,8 +61,11 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* Tests to run before and after a full cluster restart. This is run twice,
Expand Down Expand Up @@ -951,6 +956,97 @@ public void testSoftDeletes() throws Exception {
}
}

/**
* This test creates an index in the old cluster and then closes it. When the cluster is fully restarted in a newer version,
* it verifies that the index exists and is replicated if the old version supports replication.
*/
public void testClosedIndices() throws Exception {
if (isRunningAgainstOldCluster()) {
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());
ensureGreen(index);

int numDocs = 0;
if (randomBoolean()) {
numDocs = between(1, 100);
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/" + i);
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("field", "v1").endObject()));
assertOK(client().performRequest(request));
if (rarely()) {
refresh();
}
}
refresh();
}

assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
saveInfoDocument(index + "_doc_count", Integer.toString(numDocs));
closeIndex(index);
}

if (getOldClusterVersion().onOrAfter(Version.V_8_0_0)) {
ensureGreenLongWait(index);
assertClosedIndex(index, true);
} else {
assertClosedIndex(index, false);
}

if (isRunningAgainstOldCluster() == false) {
openIndex(index);
ensureGreen(index);

final int expectedNumDocs = Integer.parseInt(loadInfoDocument(index + "_doc_count"));
assertTotalHits(expectedNumDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
}
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
final String numberOfShards = (String) XContentMapValues.extractValue("index.number_of_shards", settings);
assertThat(numberOfShards, notNullValue());
final int nbShards = Integer.parseInt(numberOfShards);
assertThat(nbShards, greaterThanOrEqualTo(1));

for (int i = 0; i < nbShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(2));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}

private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion) throws IOException {
// Check the snapshot metadata, especially the version
Request listSnapshotRequest = new Request("GET", "/_snapshot/repo/" + snapshotName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Predicate;
Expand All @@ -43,7 +48,9 @@
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* In depth testing of the recovery mechanism during a rolling restart.
Expand Down Expand Up @@ -310,4 +317,144 @@ public void testRecoveryWithSoftDeletes() throws Exception {
}
ensureGreen(index);
}

/**
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
* the replication of closed indices) during the rolling upgrade.
*/
public void testRecoveryClosedIndex() throws Exception {
final String indexName = "closed_index_created_on_old";
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node with the replica is the first to be restarted, while a replica is still recovering
// then delayed allocation will kick in. When the node comes back, the master will search for a copy
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

final Version indexVersionCreated = indexVersionCreated(indexName);
if (indexVersionCreated.onOrAfter(Version.V_8_0_0)) {
// index was created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* This test creates and closes a new index at every stage of the rolling upgrade. It then checks that the index
* is effectively closed and potentially replicated if the cluster supports replication of closed indices at the
* time the index was closed.
*/
public void testCloseIndexDuringRollingUpgrade() throws Exception {
final Version minimumNodeVersion = minimumNodeVersion();
final String indexName =
String.join("_", "index", CLUSTER_TYPE.toString(), Integer.toString(minimumNodeVersion.id)).toLowerCase(Locale.ROOT);

if (indexExists(indexName) == false) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

if (minimumNodeVersion.onOrAfter(Version.V_8_0_0)) {
// index is created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* Returns the version in which the given index has been created
*/
private static Version indexVersionCreated(final String indexName) throws IOException {
final Request request = new Request("GET", "/" + indexName + "/_settings");
final String versionCreatedSetting = indexName + ".settings.index.version.created";
request.addParameter("filter_path", versionCreatedSetting);

final Response response = client().performRequest(request);
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
private static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final int numberOfShards = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_shards", settings));
final int numberOfReplicas = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_replicas", settings));

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));

for (int i = 0; i < numberOfShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(numberOfReplicas + 1));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
Expand Down Expand Up @@ -90,6 +92,8 @@ public class MetaDataIndexStateService {
public static final int INDEX_CLOSED_BLOCK_ID = 4;
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false,
false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
public static final Setting<Boolean> VERIFIED_BEFORE_CLOSE_SETTING =
Setting.boolSetting("index.verified_before_close", false, Setting.Property.IndexScope, Setting.Property.PrivateIndex);

private final ClusterService clusterService;
private final AllocationService allocationService;
Expand Down Expand Up @@ -402,15 +406,22 @@ static ClusterState closeRoutingTable(final ClusterState currentState,
continue;
}

logger.debug("closing index {} succeeded", index);
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
final IndexMetaData.Builder updatedMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE);
if (removeRoutingTable) {
metadata.put(updatedMetaData);
routingTable.remove(index.getName());
} else {
metadata.put(updatedMetaData
.settingsVersion(indexMetaData.getSettingsVersion() + 1)
.settings(Settings.builder()
.put(indexMetaData.getSettings())
.put(VERIFIED_BEFORE_CLOSE_SETTING.getKey(), true)));
routingTable.addAsFromOpenToClose(metadata.getSafe(index));
}

logger.debug("closing index {} succeeded", index);
closedIndices.add(index.getName());
} catch (final IndexNotFoundException e) {
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
Expand Down Expand Up @@ -490,7 +501,15 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState)
for (IndexMetaData indexMetaData : indicesToOpen) {
final Index index = indexMetaData.getIndex();
if (indexMetaData.getState() != IndexMetaData.State.OPEN) {
IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.OPEN).build();
final Settings.Builder updatedSettings = Settings.builder().put(indexMetaData.getSettings());
updatedSettings.remove(VERIFIED_BEFORE_CLOSE_SETTING.getKey());

IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData)
.state(IndexMetaData.State.OPEN)
.settingsVersion(indexMetaData.getSettingsVersion() + 1)
.settings(updatedSettings)
.build();

// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
updatedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(updatedIndexMetaData, minIndexCompatibilityVersion);
Expand Down Expand Up @@ -554,4 +573,9 @@ public static ClusterBlock createIndexClosingBlock() {
EnumSet.of(ClusterBlockLevel.WRITE));
}

public static boolean isIndexVerifiedBeforeClosed(final IndexMetaData indexMetaData) {
return indexMetaData.getState() == IndexMetaData.State.CLOSE
&& VERIFIED_BEFORE_CLOSE_SETTING.exists(indexMetaData.getSettings())
&& VERIFIED_BEFORE_CLOSE_SETTING.get(indexMetaData.getSettings());
}
}
Loading