-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add global checkpoint to translog checkpoints #21254
Changes from all commits
8bea450
37a50bb
19d4db0
86ec4d0
6cbed98
d742a68
6c1338c
37af724
90f7b60
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,8 @@ | |
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.io.UnsupportedEncodingException; | ||
|
||
public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest, | ||
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> { | ||
|
@@ -64,10 +66,11 @@ protected ReplicationResponse newResponseInstance() { | |
} | ||
|
||
@Override | ||
protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) { | ||
protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) throws Exception { | ||
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); | ||
IndexShard indexShard = indexService.getShard(request.shardId().id()); | ||
long checkpoint = indexShard.getGlobalCheckpoint(); | ||
syncTranslog(indexShard); | ||
return new PrimaryResult(new ReplicaRequest(request, checkpoint), new ReplicationResponse()); | ||
} | ||
|
||
|
@@ -76,9 +79,19 @@ protected ReplicaResult shardOperationOnReplica(ReplicaRequest request) { | |
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); | ||
IndexShard indexShard = indexService.getShard(request.shardId().id()); | ||
indexShard.updateGlobalCheckpointOnReplica(request.checkpoint); | ||
syncTranslog(indexShard); | ||
return new ReplicaResult(); | ||
} | ||
|
||
private void syncTranslog(final IndexShard indexShard) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we need the conversion to an unchecked exception? shardOperationOnX allows throwing them. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In master both |
||
try { | ||
indexShard.getTranslog().sync(); | ||
} catch (final IOException e) { | ||
// nocommit: no need to wrap this exception after integrating master into feature/seq_no | ||
throw new UncheckedIOException("failed to sync translog after updating global checkpoint for shard " + indexShard.shardId(), e); | ||
} | ||
} | ||
|
||
public void updateCheckpointForShard(ShardId shardId) { | ||
execute(new PrimaryRequest(shardId), new ActionListener<ReplicationResponse>() { | ||
@Override | ||
|
@@ -135,4 +148,5 @@ public long getCheckpoint() { | |
return checkpoint; | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,26 +40,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { | |
final GlobalCheckpointService globalCheckpointService; | ||
|
||
/** | ||
* Initialize the sequence number service. The {@code maxSeqNo} | ||
* should be set to the last sequence number assigned by this | ||
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, | ||
* {@code localCheckpoint} should be set to the last known local | ||
* checkpoint for this shard, or | ||
* {@link SequenceNumbersService#NO_OPS_PERFORMED}, and | ||
* {@code globalCheckpoint} should be set to the last known global | ||
* checkpoint for this shard, or | ||
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. | ||
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OMG what happened here :) |
||
* {@link SequenceNumbersService#NO_OPS_PERFORMED}, {@code localCheckpoint} should be set to the last known local checkpoint for this | ||
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, and {@code globalCheckpoint} should be set to the last known global | ||
* checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. | ||
* | ||
* @param shardId the shard this service is providing tracking | ||
* local checkpoints for | ||
* @param indexSettings the index settings | ||
* @param maxSeqNo the last sequence number assigned by this | ||
* shard, or | ||
* {@link SequenceNumbersService#NO_OPS_PERFORMED} | ||
* @param localCheckpoint the last known local checkpoint for this shard, | ||
* or {@link SequenceNumbersService#NO_OPS_PERFORMED} | ||
* @param globalCheckpoint the last known global checkpoint for this shard, | ||
* or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} | ||
* @param shardId the shard this service is providing tracking local checkpoints for | ||
* @param indexSettings the index settings | ||
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED} | ||
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED} | ||
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} | ||
*/ | ||
public SequenceNumbersService( | ||
final ShardId shardId, | ||
|
@@ -100,8 +90,7 @@ public void markSeqNoAsCompleted(long seqNo) { | |
* Gets sequence number related stats | ||
*/ | ||
public SeqNoStats stats() { | ||
return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint(), | ||
globalCheckpointService.getCheckpoint()); | ||
return new SeqNoStats(getMaxSeqNo(), getLocalCheckpoint(), getGlobalCheckpoint()); | ||
} | ||
|
||
/** | ||
|
@@ -130,6 +119,16 @@ public long getGlobalCheckpoint() { | |
return globalCheckpointService.getCheckpoint(); | ||
} | ||
|
||
/** | ||
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly. | ||
* | ||
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints | ||
* of one of the active allocations is not known. | ||
*/ | ||
public boolean updateGlobalCheckpointOnPrimary() { | ||
return globalCheckpointService.updateCheckpointOnPrimary(); | ||
} | ||
|
||
/** | ||
* updates the global checkpoint on a replica shard (after it has been updated by the primary). | ||
*/ | ||
|
@@ -148,13 +147,4 @@ public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<S | |
globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); | ||
} | ||
|
||
/** | ||
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly. | ||
* | ||
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints | ||
* of one of the active allocations is not known. | ||
*/ | ||
public boolean updateGlobalCheckpointOnPrimary() { | ||
return globalCheckpointService.updateCheckpointOnPrimary(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a little funky, as we read some stuff from the given parameters and some stuff from disk, but then only if we're not supposed to ignore the translog. How about never reading the global checkpoint from the translog on opening, and making it part of the recovery from translog instead? It also means peer recovery will need to pass the global checkpoint as part of the engine opening (but we can do this as a follow-up - please add a nocommit if so).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 19d4db0.