-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initialize primary term for shrunk indices #25307
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,23 +25,21 @@ | |
import org.apache.lucene.search.SortedSetSortField; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; | ||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; | ||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; | ||
import org.elasticsearch.action.admin.indices.segments.IndexSegments; | ||
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; | ||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; | ||
import org.elasticsearch.action.admin.indices.segments.ShardSegments; | ||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; | ||
import org.elasticsearch.action.support.ActiveShardCount; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.ClusterInfoService; | ||
import org.elasticsearch.cluster.InternalClusterInfoService; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.routing.RoutingTable; | ||
import org.elasticsearch.cluster.routing.UnassignedInfo; | ||
import org.elasticsearch.common.Priority; | ||
import org.elasticsearch.common.collect.ImmutableOpenMap; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.index.engine.Segment; | ||
import org.elasticsearch.index.query.TermsQueryBuilder; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
|
@@ -50,10 +48,14 @@ | |
|
||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.stream.IntStream; | ||
|
||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; | ||
import static org.hamcrest.Matchers.containsString; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
|
||
public class ShrinkIndexIT extends ESIntegTestCase { | ||
|
||
|
@@ -135,6 +137,53 @@ public void testCreateShrinkIndexToN() { | |
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); | ||
} | ||
|
||
public void testShrinkIndexPrimaryTerm() throws Exception { | ||
final List<Integer> factors = Arrays.asList(2, 3, 5, 7); | ||
final List<Integer> numberOfShardsFactors = randomSubsetOf(randomIntBetween(1, factors.size()), factors); | ||
final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y); | ||
final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y); | ||
internalCluster().ensureAtLeastNumDataNodes(2); | ||
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get(); | ||
|
||
final ImmutableOpenMap<String, DiscoveryNode> dataNodes = | ||
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); | ||
assertThat(dataNodes.size(), greaterThanOrEqualTo(2)); | ||
final DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); | ||
final String mergeNode = discoveryNodes[0].getName(); | ||
ensureGreen(); | ||
|
||
// restart random data nodes to force the primary term for some shards to increase | ||
for (int i = 0; i < randomIntBetween(0, 16); i++) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need up to 16 restarts? Maybe it's faster to fail shards by getting IndexShard instances from internalCluster? also this if loop calls There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I pushed a change that does this. |
||
internalCluster().restartRandomDataNode(); | ||
ensureGreen(); | ||
} | ||
// relocate all shards to one node such that we can merge it. | ||
final Settings.Builder prepareShrinkSettings = | ||
Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true); | ||
client().admin().indices().prepareUpdateSettings("source").setSettings(prepareShrinkSettings).get(); | ||
ensureGreen(); | ||
|
||
final IndexMetaData indexMetaData = indexMetaData(client(), "source"); | ||
final long beforeShrinkPrimaryTerm = IntStream.range(0, numberOfShards).mapToLong(indexMetaData::primaryTerm).max().getAsLong(); | ||
|
||
// now merge source into target | ||
final Settings shrinkSettings = | ||
Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", numberOfTargetShards).build(); | ||
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target").setSettings(shrinkSettings).get()); | ||
|
||
ensureGreen(); | ||
|
||
final IndexMetaData afterShrinkIndexMetaData = indexMetaData(client(), "target"); | ||
for (int shardId = 0; shardId < numberOfTargetShards; shardId++) { | ||
assertThat(afterShrinkIndexMetaData.primaryTerm(shardId), equalTo(beforeShrinkPrimaryTerm + 1)); | ||
} | ||
} | ||
|
||
private static IndexMetaData indexMetaData(final Client client, final String index) { | ||
final ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet(); | ||
return clusterStateResponse.getState().metaData().index(index); | ||
} | ||
|
||
public void testCreateShrinkIndex() { | ||
internalCluster().ensureAtLeastNumDataNodes(2); | ||
Version version = VersionUtils.randomVersion(random()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please add a comment explaining why we do this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed a comment.