-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[segment replication]Introducing common Replication interfaces for segment replication and recovery code paths #3234
Merged
Poojita-Raj
merged 10 commits into
opensearch-project:main
from
Poojita-Raj:recovery-replication-target
May 23, 2022
Merged
Changes from 4 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
9d381c0
RecoveryState inherits from ReplicationState + RecoveryTarget inherit…
Poojita-Raj ca0106a
Refactoring: mixedClusterVersion error fix + move Stage to Replicatio…
Poojita-Raj ba51c17
pull ReplicationListener into a top level class + add javadocs + addr…
Poojita-Raj d7fc756
fix javadoc
Poojita-Raj b84c184
review changes
Poojita-Raj 8447624
Refactoring the hierarchy relationship between repl and recovery
Poojita-Raj 5edeae3
style fix
Poojita-Raj 47379df
move package common under replication
Poojita-Raj 233acbc
rename to replication
Poojita-Raj 50c4e6e
rename and doc changes
Poojita-Raj File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -43,6 +43,7 @@ | |||||
import org.opensearch.index.shard.IndexShard; | ||||||
import org.opensearch.index.shard.IndexShardClosedException; | ||||||
import org.opensearch.index.shard.ShardId; | ||||||
import org.opensearch.indices.replication.common.ReplicationListener; | ||||||
import org.opensearch.threadpool.ThreadPool; | ||||||
|
||||||
import java.util.ArrayList; | ||||||
|
@@ -76,28 +77,23 @@ public RecoveriesCollection(Logger logger, ThreadPool threadPool) { | |||||
* | ||||||
* @return the id of the new recovery. | ||||||
*/ | ||||||
public long startRecovery( | ||||||
IndexShard indexShard, | ||||||
DiscoveryNode sourceNode, | ||||||
PeerRecoveryTargetService.RecoveryListener listener, | ||||||
TimeValue activityTimeout | ||||||
) { | ||||||
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener, TimeValue activityTimeout) { | ||||||
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener); | ||||||
startRecoveryInternal(recoveryTarget, activityTimeout); | ||||||
return recoveryTarget.recoveryId(); | ||||||
return recoveryTarget.getId(); | ||||||
} | ||||||
|
||||||
private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue activityTimeout) { | ||||||
RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.recoveryId(), recoveryTarget); | ||||||
RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.getId(), recoveryTarget); | ||||||
assert existingTarget == null : "found two RecoveryStatus instances with the same id"; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
logger.trace( | ||||||
"{} started recovery from {}, id [{}]", | ||||||
recoveryTarget.shardId(), | ||||||
recoveryTarget.indexShard().shardId(), | ||||||
recoveryTarget.sourceNode(), | ||||||
recoveryTarget.recoveryId() | ||||||
recoveryTarget.getId() | ||||||
); | ||||||
threadPool.schedule( | ||||||
new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout), | ||||||
new RecoveryMonitor(recoveryTarget.getId(), recoveryTarget.lastAccessTime(), activityTimeout), | ||||||
activityTimeout, | ||||||
ThreadPool.Names.GENERIC | ||||||
); | ||||||
|
@@ -131,21 +127,21 @@ public RecoveryTarget resetRecovery(final long recoveryId, final TimeValue activ | |||||
if (successfulReset) { | ||||||
logger.trace( | ||||||
"{} restarted recovery from {}, id [{}], previous id [{}]", | ||||||
newRecoveryTarget.shardId(), | ||||||
newRecoveryTarget.indexShard().shardId(), | ||||||
newRecoveryTarget.sourceNode(), | ||||||
newRecoveryTarget.recoveryId(), | ||||||
oldRecoveryTarget.recoveryId() | ||||||
newRecoveryTarget.getId(), | ||||||
oldRecoveryTarget.getId() | ||||||
); | ||||||
return newRecoveryTarget; | ||||||
} else { | ||||||
logger.trace( | ||||||
"{} recovery could not be reset as it is already cancelled, recovery from {}, id [{}], previous id [{}]", | ||||||
newRecoveryTarget.shardId(), | ||||||
newRecoveryTarget.indexShard().shardId(), | ||||||
newRecoveryTarget.sourceNode(), | ||||||
newRecoveryTarget.recoveryId(), | ||||||
oldRecoveryTarget.recoveryId() | ||||||
newRecoveryTarget.getId(), | ||||||
oldRecoveryTarget.getId() | ||||||
); | ||||||
cancelRecovery(newRecoveryTarget.recoveryId(), "recovery cancelled during reset"); | ||||||
cancelRecovery(newRecoveryTarget.getId(), "recovery cancelled during reset"); | ||||||
return null; | ||||||
} | ||||||
} catch (Exception e) { | ||||||
|
@@ -180,7 +176,7 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) { | |||||
if (recoveryRef == null) { | ||||||
throw new IndexShardClosedException(shardId); | ||||||
} | ||||||
assert recoveryRef.get().shardId().equals(shardId); | ||||||
assert recoveryRef.get().indexShard().shardId().equals(shardId); | ||||||
return recoveryRef; | ||||||
} | ||||||
|
||||||
|
@@ -191,9 +187,9 @@ public boolean cancelRecovery(long id, String reason) { | |||||
if (removed != null) { | ||||||
logger.trace( | ||||||
"{} canceled recovery from {}, id [{}] (reason [{}])", | ||||||
removed.shardId(), | ||||||
removed.indexShard().shardId(), | ||||||
removed.sourceNode(), | ||||||
removed.recoveryId(), | ||||||
removed.getId(), | ||||||
reason | ||||||
); | ||||||
removed.cancel(reason); | ||||||
|
@@ -214,9 +210,9 @@ public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFa | |||||
if (removed != null) { | ||||||
logger.trace( | ||||||
"{} failing recovery from {}, id [{}]. Send shard failure: [{}]", | ||||||
removed.shardId(), | ||||||
removed.indexShard().shardId(), | ||||||
removed.sourceNode(), | ||||||
removed.recoveryId(), | ||||||
removed.getId(), | ||||||
sendShardFailure | ||||||
); | ||||||
removed.fail(e, sendShardFailure); | ||||||
|
@@ -227,7 +223,12 @@ public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFa | |||||
public void markRecoveryAsDone(long id) { | ||||||
RecoveryTarget removed = onGoingRecoveries.remove(id); | ||||||
if (removed != null) { | ||||||
logger.trace("{} marking recovery from {} as done, id [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId()); | ||||||
logger.trace( | ||||||
"{} marking recovery from {} as done, id [{}]", | ||||||
removed.indexShard().shardId(), | ||||||
removed.sourceNode(), | ||||||
removed.getId() | ||||||
); | ||||||
removed.markAsDone(); | ||||||
} | ||||||
} | ||||||
|
@@ -250,7 +251,7 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { | |||||
synchronized (onGoingRecoveries) { | ||||||
for (Iterator<RecoveryTarget> it = onGoingRecoveries.values().iterator(); it.hasNext();) { | ||||||
RecoveryTarget status = it.next(); | ||||||
if (status.shardId().equals(shardId)) { | ||||||
if (status.indexShard().shardId().equals(shardId)) { | ||||||
matchedRecoveries.add(status); | ||||||
it.remove(); | ||||||
} | ||||||
|
@@ -259,9 +260,9 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { | |||||
for (RecoveryTarget removed : matchedRecoveries) { | ||||||
logger.trace( | ||||||
"{} canceled recovery from {}, id [{}] (reason [{}])", | ||||||
removed.shardId(), | ||||||
removed.indexShard().shardId(), | ||||||
removed.sourceNode(), | ||||||
removed.recoveryId(), | ||||||
removed.getId(), | ||||||
reason | ||||||
); | ||||||
removed.cancel(reason); | ||||||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used, please remove