From c5b6ccc6d9e460afa687960604aa1d5348bbb123 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 28 Aug 2018 06:06:22 -0400 Subject: [PATCH] Only fetch mapping updates when necessary (#33182) Today we fetch the mapping from the leader and apply it as a mapping update whenever the index metadata version on the leader changes. Yet, the index metadata can change for many reasons other than a mapping update (e.g., settings updates, adding an alias, or a replica being promoted to a primary among many other reasons). This commit builds on the addition of a mapping version to the index metadata to only fetch mapping updates when the mapping version increases. This reduces the number of these fetches and application of mappings on the follower to the bare minimum. --- .../xpack/ccr/action/ShardChangesAction.java | 22 +++---- .../xpack/ccr/action/ShardFollowNodeTask.java | 57 +++++++++---------- .../ccr/action/ShardFollowTasksExecutor.java | 2 +- .../ccr/action/ShardChangesResponseTests.java | 4 +- .../ShardFollowNodeTaskRandomTests.java | 48 ++++++++-------- .../ShardFollowNodeTaskStatusTests.java | 2 +- .../ccr/action/ShardFollowNodeTaskTests.java | 36 ++++++------ .../ShardFollowTaskReplicationTests.java | 2 +- .../rest-api-spec/test/ccr/stats.yml | 2 +- 9 files changed, 87 insertions(+), 88 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index a20d55d93bc77..e1975fc46d10b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -168,10 +168,10 @@ public String toString() { public static final class Response extends ActionResponse { - private long indexMetadataVersion; + private long mappingVersion; - public long getIndexMetadataVersion() { - return indexMetadataVersion; + public long getMappingVersion() { + return mappingVersion; } private long globalCheckpoint; @@ -195,8 +195,8 @@ public Translog.Operation[] getOperations() { Response() { } - Response(final long indexMetadataVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) { - this.indexMetadataVersion = indexMetadataVersion; + Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) { + this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; this.operations = operations; @@ -205,7 +205,7 @@ public Translog.Operation[] getOperations() { @Override public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); - indexMetadataVersion = in.readVLong(); + mappingVersion = in.readVLong(); globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); @@ -214,7 +214,7 @@ public void readFrom(final StreamInput in) throws IOException { @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - out.writeVLong(indexMetadataVersion); + out.writeVLong(mappingVersion); out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); out.writeArray(Translog.Operation::writeOperation, operations); @@ -225,7 +225,7 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final Response that = (Response) o; - return indexMetadataVersion == that.indexMetadataVersion && + return mappingVersion == that.mappingVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && Arrays.equals(operations, that.operations); @@ -233,7 +233,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(indexMetadataVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations)); + return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations)); } } @@ -266,7 +266,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); - final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion(); + final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); final Translog.Operation[] operations = getOperations( indexShard, @@ -274,7 +274,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.fromSeqNo, request.maxOperationCount, request.maxOperationSizeInBytes); - return new Response(indexMetaDataVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index f2b5b7b3772d2..6854a9f5741b7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -80,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long followerMaxSeqNo = 0; private int numConcurrentReads = 0; private int numConcurrentWrites = 0; - private long currentIndexMetadataVersion = 0; + private long currentMappingVersion = 0; private long totalFetchTimeMillis = 0; private long numberOfSuccessfulFetches = 0; private long numberOfFailedFetches = 0; @@ -131,14 +131,13 @@ void start( this.lastRequestedSeqNo = followerGlobalCheckpoint; } - // Forcefully updates follower mapping, this gets us the leader imd version and - // makes sure that leader and follower mapping are identical. - updateMapping(imdVersion -> { + // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical + updateMapping(mappingVersion -> { synchronized (ShardFollowNodeTask.this) { - currentIndexMetadataVersion = imdVersion; + currentMappingVersion = mappingVersion; } - LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, indexMetaDataVersion={}", - params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, imdVersion); + LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, mappingVersion={}", + params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, mappingVersion); coordinateReads(); }); } @@ -258,7 +257,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR } void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { - maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response)); + maybeUpdateMapping(response.getMappingVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response)); } /** Called when some operations are fetched from the leading */ @@ -344,16 +343,16 @@ private synchronized void handleWriteResponse(final BulkShardOperationsResponse coordinateReads(); } - private synchronized void maybeUpdateMapping(Long minimumRequiredIndexMetadataVersion, Runnable task) { - if (currentIndexMetadataVersion >= minimumRequiredIndexMetadataVersion) { - LOGGER.trace("{} index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", - params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); + private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) { + if (currentMappingVersion >= minimumRequiredMappingVersion) { + LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]", + params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); task.run(); } else { - LOGGER.trace("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]", - params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); - updateMapping(imdVersion -> { - currentIndexMetadataVersion = imdVersion; + LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]", + params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); + updateMapping(mappingVersion -> { + currentMappingVersion = mappingVersion; task.run(); }); } @@ -422,7 +421,7 @@ public synchronized Status getStatus() { numConcurrentReads, numConcurrentWrites, buffer.size(), - currentIndexMetadataVersion, + currentMappingVersion, totalFetchTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, @@ -448,7 +447,7 @@ public static class Status implements Task.Status { static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads"); static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes"); static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes"); - static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField("index_metadata_version"); + static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version"); static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis"); static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches"); static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches"); @@ -504,7 +503,7 @@ public static class Status implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); - STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); @@ -582,10 +581,10 @@ public int numberOfQueuedWrites() { return numberOfQueuedWrites; } - private final long indexMetadataVersion; + private final long mappingVersion; - public long indexMetadataVersion() { - return indexMetadataVersion; + public long mappingVersion() { + return mappingVersion; } private final long totalFetchTimeMillis; @@ -658,7 +657,7 @@ public NavigableMap fetchExceptions() { final int numberOfConcurrentReads, final int numberOfConcurrentWrites, final int numberOfQueuedWrites, - final long indexMetadataVersion, + final long mappingVersion, final long totalFetchTimeMillis, final long numberOfSuccessfulFetches, final long numberOfFailedFetches, @@ -678,7 +677,7 @@ public NavigableMap fetchExceptions() { this.numberOfConcurrentReads = numberOfConcurrentReads; this.numberOfConcurrentWrites = numberOfConcurrentWrites; this.numberOfQueuedWrites = numberOfQueuedWrites; - this.indexMetadataVersion = indexMetadataVersion; + this.mappingVersion = mappingVersion; this.totalFetchTimeMillis = totalFetchTimeMillis; this.numberOfSuccessfulFetches = numberOfSuccessfulFetches; this.numberOfFailedFetches = numberOfFailedFetches; @@ -701,7 +700,7 @@ public Status(final StreamInput in) throws IOException { this.numberOfConcurrentReads = in.readVInt(); this.numberOfConcurrentWrites = in.readVInt(); this.numberOfQueuedWrites = in.readVInt(); - this.indexMetadataVersion = in.readVLong(); + this.mappingVersion = in.readVLong(); this.totalFetchTimeMillis = in.readVLong(); this.numberOfSuccessfulFetches = in.readVLong(); this.numberOfFailedFetches = in.readVLong(); @@ -730,7 +729,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(numberOfConcurrentReads); out.writeVInt(numberOfConcurrentWrites); out.writeVInt(numberOfQueuedWrites); - out.writeVLong(indexMetadataVersion); + out.writeVLong(mappingVersion); out.writeVLong(totalFetchTimeMillis); out.writeVLong(numberOfSuccessfulFetches); out.writeVLong(numberOfFailedFetches); @@ -756,7 +755,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites); - builder.field(INDEX_METADATA_VERSION_FIELD.getPreferredName(), indexMetadataVersion); + builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion); builder.humanReadableField( TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(), "total_fetch_time", @@ -815,7 +814,7 @@ public boolean equals(final Object o) { numberOfConcurrentReads == that.numberOfConcurrentReads && numberOfConcurrentWrites == that.numberOfConcurrentWrites && numberOfQueuedWrites == that.numberOfQueuedWrites && - indexMetadataVersion == that.indexMetadataVersion && + mappingVersion == that.mappingVersion && totalFetchTimeMillis == that.totalFetchTimeMillis && numberOfSuccessfulFetches == that.numberOfSuccessfulFetches && numberOfFailedFetches == that.numberOfFailedFetches && @@ -837,7 +836,7 @@ public int hashCode() { numberOfConcurrentReads, numberOfConcurrentWrites, numberOfQueuedWrites, - indexMetadataVersion, + mappingVersion, totalFetchTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index dbb9547b99bf8..0d709ba0e85d6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -116,7 +116,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro putMappingRequest.type(mappingMetaData.type()); putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( - putMappingResponse -> handler.accept(indexMetaData.getVersion()), + putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), errorHandler)); }, errorHandler)); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index 8e150b8f934e4..e9c67097d72b2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -12,7 +12,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase fromToSlot = new HashMap<>(); @Override protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { - handler.accept(indexMetadataVersion); + handler.accept(mappingVersion); } @Override @@ -134,7 +134,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co fromToSlot.put(from, ++slot); // if too many invocations occur with the same from then AOBE occurs, this ok and then something is wrong. } - indexMetadataVersion = testResponse.indexMetadataVersion; + mappingVersion = testResponse.mappingVersion; if (testResponse.exception != null) { errorHandler.accept(testResponse.exception); } else { @@ -187,15 +187,15 @@ private void tearDown() { }; } - private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVersion, int maxOperationCount) { + private static TestRun createTestRun(long startSeqNo, long startMappingVersion, int maxOperationCount) { long prevGlobalCheckpoint = startSeqNo; - long indexMetaDataVersion = startIndexMetadataVersion; + long mappingVersion = startMappingVersion; int numResponses = randomIntBetween(16, 256); Map> responses = new HashMap<>(numResponses); for (int i = 0; i < numResponses; i++) { long nextGlobalCheckPoint = prevGlobalCheckpoint + maxOperationCount; if (sometimes()) { - indexMetaDataVersion++; + mappingVersion++; } if (sometimes()) { @@ -203,7 +203,7 @@ private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVer // Sometimes add a random retryable error if (sometimes()) { Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); - item.add(new TestResponse(error, indexMetaDataVersion, null)); + item.add(new TestResponse(error, mappingVersion, null)); } List ops = new ArrayList<>(); for (long seqNo = prevGlobalCheckpoint; seqNo <= nextGlobalCheckPoint; seqNo++) { @@ -211,8 +211,8 @@ private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVer byte[] source = "{}".getBytes(StandardCharsets.UTF_8); ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } - item.add(new TestResponse(null, indexMetaDataVersion, - new ShardChangesAction.Response(indexMetaDataVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY)))); + item.add(new TestResponse(null, mappingVersion, + new ShardChangesAction.Response(mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY)))); responses.put(prevGlobalCheckpoint, item); } else { // Simulates a leader shard copy not having all the operations the shard follow task thinks it has by @@ -224,13 +224,13 @@ private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVer // Sometimes add a random retryable error if (sometimes()) { Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); - item.add(new TestResponse(error, indexMetaDataVersion, null)); + item.add(new TestResponse(error, mappingVersion, null)); } // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind if (sometimes()) { ShardChangesAction.Response response = - new ShardChangesAction.Response(indexMetaDataVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY); - item.add(new TestResponse(null, indexMetaDataVersion, response)); + new ShardChangesAction.Response(mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY); + item.add(new TestResponse(null, mappingVersion, response)); } List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -241,14 +241,14 @@ private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVer // Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind: long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo; ShardChangesAction.Response response = - new ShardChangesAction.Response(indexMetaDataVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY)); - item.add(new TestResponse(null, indexMetaDataVersion, response)); + new ShardChangesAction.Response(mappingVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY)); + item.add(new TestResponse(null, mappingVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); } } prevGlobalCheckpoint = nextGlobalCheckPoint + 1; } - return new TestRun(maxOperationCount, startSeqNo, startIndexMetadataVersion, indexMetaDataVersion, + return new TestRun(maxOperationCount, startSeqNo, startMappingVersion, mappingVersion, prevGlobalCheckpoint - 1, responses); } @@ -261,18 +261,18 @@ private static class TestRun { final int maxOperationCount; final long startSeqNo; - final long startIndexMetadataVersion; + final long startMappingVersion; - final long finalIndexMetaDataVerion; + final long finalMappingVersion; final long finalExpectedGlobalCheckpoint; final Map> responses; - private TestRun(int maxOperationCount, long startSeqNo, long startIndexMetadataVersion, long finalIndexMetaDataVerion, + private TestRun(int maxOperationCount, long startSeqNo, long startMappingVersion, long finalMappingVersion, long finalExpectedGlobalCheckpoint, Map> responses) { this.maxOperationCount = maxOperationCount; this.startSeqNo = startSeqNo; - this.startIndexMetadataVersion = startIndexMetadataVersion; - this.finalIndexMetaDataVerion = finalIndexMetaDataVerion; + this.startMappingVersion = startMappingVersion; + this.finalMappingVersion = finalMappingVersion; this.finalExpectedGlobalCheckpoint = finalExpectedGlobalCheckpoint; this.responses = Collections.unmodifiableMap(responses); } @@ -281,12 +281,12 @@ private TestRun(int maxOperationCount, long startSeqNo, long startIndexMetadataV private static class TestResponse { final Exception exception; - final long indexMetadataVersion; + final long mappingVersion; final ShardChangesAction.Response response; - private TestResponse(Exception exception, long indexMetadataVersion, ShardChangesAction.Response response) { + private TestResponse(Exception exception, long mappingVersion, ShardChangesAction.Response response) { this.exception = exception; - this.indexMetadataVersion = indexMetadataVersion; + this.mappingVersion = mappingVersion; this.response = response; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 4eb4283091959..234b7334e64fb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -65,7 +65,7 @@ protected void assertEqualInstances(final ShardFollowNodeTask.Status expectedIns assertThat(newInstance.numberOfConcurrentReads(), equalTo(expectedInstance.numberOfConcurrentReads())); assertThat(newInstance.numberOfConcurrentWrites(), equalTo(expectedInstance.numberOfConcurrentWrites())); assertThat(newInstance.numberOfQueuedWrites(), equalTo(expectedInstance.numberOfQueuedWrites())); - assertThat(newInstance.indexMetadataVersion(), equalTo(expectedInstance.indexMetadataVersion())); + assertThat(newInstance.mappingVersion(), equalTo(expectedInstance.mappingVersion())); assertThat(newInstance.totalFetchTimeMillis(), equalTo(expectedInstance.totalFetchTimeMillis())); assertThat(newInstance.numberOfSuccessfulFetches(), equalTo(expectedInstance.numberOfSuccessfulFetches())); assertThat(newInstance.numberOfFailedFetches(), equalTo(expectedInstance.numberOfFailedFetches())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 54aef6bd3d116..4f7c0bf16645c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -51,7 +51,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue readFailures; private Queue writeFailures; private Queue mappingUpdateFailures; - private Queue imdVersions; + private Queue mappingVersions; private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; private Queue maxSeqNos; @@ -180,7 +180,7 @@ public void testReceiveRetryableError() { for (int i = 0; i < max; i++) { readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } - imdVersions.add(1L); + mappingVersions.add(1L); leaderGlobalCheckpoints.add(63L); maxSeqNos.add(63L); simulateResponse.set(true); @@ -327,7 +327,7 @@ public void testHandleReadResponse() { assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(0L)); + assertThat(status.mappingVersion(), equalTo(0L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(1)); @@ -433,7 +433,7 @@ public void testMappingUpdate() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); - imdVersions.add(1L); + mappingVersions.add(1L); task.coordinateReads(); ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); task.handleReadResponse(0L, 63L, response); @@ -442,7 +442,7 @@ public void testMappingUpdate() { assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(1L)); + assertThat(status.mappingVersion(), equalTo(1L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(1)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -458,7 +458,7 @@ public void testMappingUpdateRetryableError() { for (int i = 0; i < max; i++) { mappingUpdateFailures.add(new ConnectException()); } - imdVersions.add(1L); + mappingVersions.add(1L); task.coordinateReads(); ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); task.handleReadResponse(0L, 63L, response); @@ -467,7 +467,7 @@ public void testMappingUpdateRetryableError() { assertThat(bulkShardOperationRequests.size(), equalTo(1)); assertThat(task.isStopped(), equalTo(false)); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(1L)); + assertThat(status.mappingVersion(), equalTo(1L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(1)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -483,17 +483,17 @@ public void testMappingUpdateRetryableErrorRetriedTooManyTimes() { for (int i = 0; i < max; i++) { mappingUpdateFailures.add(new ConnectException()); } - imdVersions.add(1L); + mappingVersions.add(1L); task.coordinateReads(); ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); task.handleReadResponse(0L, 64L, response); assertThat(mappingUpdateFailures.size(), equalTo(max - 11)); - assertThat(imdVersions.size(), equalTo(1)); + assertThat(mappingVersions.size(), equalTo(1)); assertThat(bulkShardOperationRequests.size(), equalTo(0)); assertThat(task.isStopped(), equalTo(true)); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(0L)); + assertThat(status.mappingVersion(), equalTo(0L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -512,7 +512,7 @@ public void testMappingUpdateNonRetryableError() { assertThat(bulkShardOperationRequests.size(), equalTo(0)); assertThat(task.isStopped(), equalTo(true)); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(0L)); + assertThat(status.mappingVersion(), equalTo(0L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -723,7 +723,7 @@ ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxCon readFailures = new LinkedList<>(); writeFailures = new LinkedList<>(); mappingUpdateFailures = new LinkedList<>(); - imdVersions = new LinkedList<>(); + mappingVersions = new LinkedList<>(); leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); maxSeqNos = new LinkedList<>(); @@ -738,9 +738,9 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro return; } - Long imdVersion = imdVersions.poll(); - if (imdVersion != null) { - handler.accept(imdVersion); + final Long mappingVersion = mappingVersions.poll(); + if (mappingVersion != null) { + handler.accept(mappingVersion); } } @@ -779,7 +779,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con } final ShardChangesAction.Response response = new ShardChangesAction.Response( - imdVersions.poll(), + mappingVersions.poll(), leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), operations); @@ -805,7 +805,7 @@ public void markAsFailed(Exception e) { }; } - private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long imdVersion, + private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long mappingVersion, long leaderGlobalCheckPoint) { List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -814,7 +814,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } return new ShardChangesAction.Response( - imdVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0])); + mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0])); } void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 32cc876125701..ec180943a3b5b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -209,7 +209,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co try { Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, maxOperationCount, params.getMaxBatchSizeInBytes()); - // Hard code index metadata version, this is ok, as mapping updates are not tested here. + // hard code mapping version; this is ok, as mapping updates are not tested here final ShardChangesAction.Response response = new ShardChangesAction.Response(1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), ops); handler.accept(response); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml index 34019455b8005..abb64e8f2bab8 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml @@ -34,7 +34,7 @@ - gte: { bar.0.number_of_concurrent_reads: 0 } - match: { bar.0.number_of_concurrent_writes: 0 } - match: { bar.0.number_of_queued_writes: 0 } - - gte: { bar.0.index_metadata_version: 0 } + - gte: { bar.0.mapping_version: 0 } - gte: { bar.0.total_fetch_time_millis: 0 } - gte: { bar.0.number_of_successful_fetches: 0 } - gte: { bar.0.number_of_failed_fetches: 0 }