diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 6fe7b0fd75c01..07a4dc44cd20f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; @@ -25,9 +26,11 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -38,6 +41,8 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -76,12 +81,15 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; @@ -90,6 +98,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; public abstract class CcrIntegTestCase extends ESTestCase { @@ -240,10 +249,12 @@ protected final ClusterHealthStatus ensureLeaderYellow(String... indices) { } protected final ClusterHealthStatus ensureLeaderGreen(String... indices) { + logger.info("ensure green leader indices {}", Arrays.toString(indices)); return ensureColor(clusterGroup.leaderCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), false, indices); } protected final ClusterHealthStatus ensureFollowerGreen(String... indices) { + logger.info("ensure green follower indices {}", Arrays.toString(indices)); return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), false, indices); } @@ -391,20 +402,54 @@ public static ResumeFollowAction.Request resumeFollow(String followerIndex) { return request; } - protected void assertSameDocCount(String leaderIndex, String followerIndex) throws Exception { - refresh(leaderClient(), leaderIndex); - SearchRequest request1 = new SearchRequest(leaderIndex); - request1.source(new SearchSourceBuilder().size(0)); - SearchResponse response1 = leaderClient().search(request1).actionGet(); + /** + * This asserts the index is fully replicated from the leader index to the follower index. It first verifies that the seq_no_stats + * on the follower equal the leader's; then verifies the existing pairs of (docId, seqNo) on the follower also equal the leader. + */ + protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String followerIndex) throws Exception { + logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex); assertBusy(() -> { - refresh(followerClient(), followerIndex); - SearchRequest request2 = new SearchRequest(followerIndex); - request2.source(new SearchSourceBuilder().size(0)); - SearchResponse response2 = followerClient().search(request2).actionGet(); - assertThat(response2.getHits().getTotalHits(), equalTo(response1.getHits().getTotalHits())); + Map leaderStats = new HashMap<>(); + for (ShardStats shardStat : leaderClient().admin().indices().prepareStats(leaderIndex).clear().get().getShards()) { + if (shardStat.getSeqNoStats() == null) { + throw new AssertionError("leader seq_no_stats is not available [" + Strings.toString(shardStat) + "]"); + } + leaderStats.put(shardStat.getShardRouting().shardId().id(), shardStat.getSeqNoStats()); + } + Map followerStats = new HashMap<>(); + for (ShardStats shardStat : followerClient().admin().indices().prepareStats(followerIndex).clear().get().getShards()) { + if (shardStat.getSeqNoStats() == null) { + throw new AssertionError("follower seq_no_stats is not available [" + Strings.toString(shardStat) + "]"); + } + followerStats.put(shardStat.getShardRouting().shardId().id(), shardStat.getSeqNoStats()); + } + assertThat(leaderStats, equalTo(followerStats)); + }, 60, TimeUnit.SECONDS); + logger.info("--> asserting <> between {} and {}", leaderIndex, followerIndex); + assertBusy(() -> { + assertThat(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex), + equalTo(getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex))); }, 60, TimeUnit.SECONDS); } + private Map> getDocIdAndSeqNos(InternalTestCluster cluster, String index) throws IOException { + final ClusterState state = cluster.client().admin().cluster().prepareState().get().getState(); + List shardRoutings = state.routingTable().allShards(index); + Randomness.shuffle(shardRoutings); + final Map> docs = new HashMap<>(); + for (ShardRouting shardRouting : shardRoutings) { + if (shardRouting == null || shardRouting.assignedToNode() == false || docs.containsKey(shardRouting.shardId().id())) { + continue; + } + IndexShard indexShard = cluster.getInstance(IndicesService.class, state.nodes().get(shardRouting.currentNodeId()).getName()) + .indexServiceSafe(shardRouting.index()).getShard(shardRouting.id()); + docs.put(shardRouting.shardId().id(), IndexShardTestCase.getDocIdAndSeqNos(indexShard).stream() + .map(d -> new DocIdSeqNoAndTerm(d.getId(), d.getSeqNo(), 1L)) // normalize primary term as the follower use its own term + .collect(Collectors.toList())); + } + return docs; + } + protected void atLeastDocsIndexed(Client client, String index, long numDocsReplicated) throws InterruptedException { logger.info("waiting for at least [{}] documents to be indexed into index [{}]", numDocsReplicated, index); awaitBusy(() -> { @@ -416,6 +461,20 @@ protected void atLeastDocsIndexed(Client client, String index, long numDocsRepli }, 60, TimeUnit.SECONDS); } + protected void awaitGlobalCheckpointAtLeast(Client client, ShardId shardId, long minimumGlobalCheckpoint) throws Exception { + logger.info("waiting for the global checkpoint on [{}] at least [{}]", shardId, minimumGlobalCheckpoint); + assertBusy(() -> { + ShardStats stats = client.admin().indices().prepareStats(shardId.getIndexName()).clear().get() + .asMap().entrySet().stream().filter(e -> e.getKey().shardId().equals(shardId)) + .map(Map.Entry::getValue).findFirst().orElse(null); + if (stats == null || stats.getSeqNoStats() == null) { + throw new AssertionError("seq_no_stats for shard [" + shardId + "] is not found"); // causes assertBusy to retry + } + assertThat(Strings.toString(stats.getSeqNoStats()), + stats.getSeqNoStats().getGlobalCheckpoint(), greaterThanOrEqualTo(minimumGlobalCheckpoint)); + }, 60, TimeUnit.SECONDS); + } + protected void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index followerIndex, int numberOfShards) throws Exception { assertBusy(() -> { long[] msuOnLeader = new long[numberOfShards]; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index e2318e149b4eb..6685776e9805d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -19,11 +19,14 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import java.util.Locale; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -47,9 +50,17 @@ public void testFailOverOnFollower() throws Exception { AtomicBoolean stopped = new AtomicBoolean(); Thread[] threads = new Thread[between(1, 8)]; AtomicInteger docID = new AtomicInteger(); + Semaphore availableDocs = new Semaphore(0); for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(() -> { while (stopped.get() == false) { + try { + if (availableDocs.tryAcquire(10, TimeUnit.MILLISECONDS) == false) { + continue; + } + } catch (InterruptedException e) { + throw new AssertionError(e); + } if (frequently()) { String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update IndexResponse indexResponse = leaderClient().prepareIndex("leader-index", "doc", id) @@ -64,6 +75,7 @@ public void testFailOverOnFollower() throws Exception { }); threads[i].start(); } + availableDocs.release(between(100, 200)); PutFollowAction.Request follow = putFollow("leader-index", "follower-index"); follow.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048)); follow.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); @@ -74,7 +86,7 @@ public void testFailOverOnFollower() throws Exception { logger.info("--> follow params {}", Strings.toString(follow.getFollowRequest())); followerClient().execute(PutFollowAction.INSTANCE, follow).get(); ensureFollowerGreen("follower-index"); - atLeastDocsIndexed(followerClient(), "follower-index", between(20, 60)); + awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(30, 80)); final ClusterState clusterState = getFollowerCluster().clusterService().state(); for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) { if (shardRouting.primary()) { @@ -83,17 +95,18 @@ public void testFailOverOnFollower() throws Exception { break; } } + availableDocs.release(between(50, 200)); ensureFollowerGreen("follower-index"); - atLeastDocsIndexed(followerClient(), "follower-index", between(80, 150)); + availableDocs.release(between(50, 200)); + awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(100, 150)); stopped.set(true); for (Thread thread : threads) { thread.join(); } - assertSameDocCount("leader-index", "follower-index"); + assertIndexFullyReplicatedToFollower("leader-index", "follower-index"); pauseFollow("follower-index"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337") public void testFollowIndexAndCloseNode() throws Exception { getFollowerCluster().ensureAtLeastNumDataNodes(3); String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); @@ -101,18 +114,23 @@ public void testFollowIndexAndCloseNode() throws Exception { ensureLeaderGreen("index1"); AtomicBoolean run = new AtomicBoolean(true); + Semaphore availableDocs = new Semaphore(0); Thread thread = new Thread(() -> { int counter = 0; while (run.get()) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); try { - leaderClient().prepareIndex("index1", "doc") - .setSource(source, XContentType.JSON) - .setTimeout(TimeValue.timeValueSeconds(1)) - .get(); - } catch (Exception e) { - logger.error("Error while indexing into leader index", e); + if (availableDocs.tryAcquire(10, TimeUnit.MILLISECONDS) == false) { + continue; + } + } catch (InterruptedException e) { + throw new AssertionError(e); } + final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); + IndexResponse indexResp = leaderClient().prepareIndex("index1", "doc") + .setSource(source, XContentType.JSON) + .setTimeout(TimeValue.timeValueSeconds(1)) + .get(); + logger.info("--> index id={} seq_no={}", indexResp.getId(), indexResp.getSeqNo()); } }); thread.start(); @@ -125,19 +143,19 @@ public void testFollowIndexAndCloseNode() throws Exception { followRequest.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + logger.info("--> follow params {}", Strings.toString(followRequest.getFollowRequest())); - long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getFollowRequest().getMaxReadRequestOperationCount(), - followRequest.getFollowRequest().getMaxReadRequestOperationCount() * 10)); - long minNumDocsReplicated = maxNumDocsReplicated / 3L; - logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated); - atLeastDocsIndexed(followerClient(), "index2", minNumDocsReplicated); + int maxOpsPerRead = followRequest.getFollowRequest().getMaxReadRequestOperationCount(); + int maxNumDocsReplicated = Math.min(between(50, 500), between(maxOpsPerRead, maxOpsPerRead * 10)); + availableDocs.release(maxNumDocsReplicated / 2 + 1); + atLeastDocsIndexed(followerClient(), "index2", maxNumDocsReplicated / 3); getFollowerCluster().stopRandomNonMasterNode(); - logger.info("waiting for at least [{}] documents to be indexed", maxNumDocsReplicated); - atLeastDocsIndexed(followerClient(), "index2", maxNumDocsReplicated); + availableDocs.release(maxNumDocsReplicated / 2 + 1); + atLeastDocsIndexed(followerClient(), "index2", maxNumDocsReplicated * 2 / 3); run.set(false); thread.join(); - assertSameDocCount("index1", "index2"); + assertIndexFullyReplicatedToFollower("index1", "index2"); pauseFollow("index2"); assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 3); } @@ -188,15 +206,15 @@ public void testAddNewReplicasOnFollower() throws Exception { } }); flushingOnFollower.start(); - atLeastDocsIndexed(followerClient(), "follower-index", 50); + awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), 50); followerClient().admin().indices().prepareUpdateSettings("follower-index") .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get(); ensureFollowerGreen("follower-index"); - atLeastDocsIndexed(followerClient(), "follower-index", 100); + awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), 100); stopped.set(true); flushingOnFollower.join(); indexingOnLeader.join(); - assertSameDocCount("leader-index", "follower-index"); + assertIndexFullyReplicatedToFollower("leader-index", "follower-index"); pauseFollow("follower-index"); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index eff849bca7671..ffc76b5c58e31 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -58,6 +58,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -211,14 +212,23 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} }; + int bulkSize = between(1, 20); BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient(), listener) - .setBulkActions(100) + .setBulkActions(bulkSize) .setConcurrentRequests(4) .build(); AtomicBoolean run = new AtomicBoolean(true); + Semaphore availableDocs = new Semaphore(0); Thread thread = new Thread(() -> { int counter = 0; while (run.get()) { + try { + if (availableDocs.tryAcquire(10, TimeUnit.MILLISECONDS) == false) { + continue; + } + } catch (InterruptedException e) { + throw new AssertionError(e); + } final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); IndexRequest indexRequest = new IndexRequest("index1", "doc") .source(source, XContentType.JSON) @@ -229,26 +239,28 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) thread.start(); // Waiting for some document being index before following the index: - int maxReadSize = randomIntBetween(128, 2048); - long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10)); + int maxOpsPerRead = randomIntBetween(10, 100); + int numDocsIndexed = Math.min(between(20, 300), between(maxOpsPerRead, maxOpsPerRead * 10)); + availableDocs.release(numDocsIndexed / 2 + bulkSize); atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed / 3); PutFollowAction.Request followRequest = putFollow("index1", "index2"); - followRequest.getFollowRequest().setMaxReadRequestOperationCount(maxReadSize); - followRequest.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(2, 10)); - followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(2, 10)); + followRequest.getFollowRequest().setMaxReadRequestOperationCount(maxOpsPerRead); + followRequest.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10)); + followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); followRequest.getFollowRequest().setMaxWriteBufferCount(randomIntBetween(1024, 10240)); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - + availableDocs.release(numDocsIndexed * 2 + bulkSize); atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed); run.set(false); thread.join(); assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true)); - assertSameDocCount("index1", "index2"); + assertIndexFullyReplicatedToFollower("index1", "index2"); + pauseFollow("index2"); + leaderClient().admin().indices().prepareRefresh("index1").get(); assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfShards, leaderClient().prepareSearch("index1").get().getHits().totalHits); - pauseFollow("index2"); assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards); }