diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 9d1fe5d67df98..70d21feea0640 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -58,7 +58,7 @@ public static class Status implements Task.Status { } public Status(StreamInput in) throws IOException { - this.processedGlobalCheckpoint = in.readVLong(); + this.processedGlobalCheckpoint = in.readZLong(); } public long getProcessedGlobalCheckpoint() { @@ -72,7 +72,7 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(processedGlobalCheckpoint); + out.writeZLong(processedGlobalCheckpoint); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 9f62b014a1826..718a72388a1d5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -93,7 +93,10 @@ protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask param this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client; logger.info("Starting shard following [{}]", params); fetchGlobalCheckpoint(client, params.getFollowShardId(), - followGlobalCheckPoint -> prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed); + followGlobalCheckPoint -> { + shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint); + prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint); + }, task::markAsFailed); } void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) { @@ -107,10 +110,13 @@ void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask para fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> { // TODO: check if both indices have the same history uuid if (leaderGlobalCheckPoint == followGlobalCheckPoint) { + logger.debug("{} no write operations to fetch", followerShard); retry(leaderClient, task, params, followGlobalCheckPoint); } else { assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; + logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard, + leaderGlobalCheckPoint, followGlobalCheckPoint); Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); Consumer handler = e -> { if (e == null) { @@ -151,8 +157,7 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer .findAny(); if (filteredShardStats.isPresent()) { - // Treat -1 as 0. If no indexing has happened in leader shard then global checkpoint is -1. - final long globalCheckPoint = Math.max(0, filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint()); + final long globalCheckPoint = filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint(); handler.accept(globalCheckPoint); } else { errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId)); @@ -284,7 +289,9 @@ static class ChunkProcessor { void start(final long from, final long to, final long maxTranslogsBytes) { ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard); - request.setMinSeqNo(from); + // Treat -1 as 0, because shard changes api min_seq_no is inclusive and therefore it doesn't allow a negative min_seq_no + // (If no indexing has happened in leader shard then global checkpoint is -1.) + request.setMinSeqNo(Math.max(0, from)); request.setMaxSeqNo(to); request.setMaxTranslogsBytes(maxTranslogsBytes); leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener() {