Skip to content

Commit

Permalink
Moved RecoveryState.Index from an inner class to a top-level class
Browse files Browse the repository at this point in the history
This class is currently across both recovery and segment replication. It has two dependent inner classes - RecoveryFilesDetails and FileDetail - which have been moved from RecoveryState to be inner classes in RecoveryIndex

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Mar 14, 2022
1 parent 7f20128 commit a5b5199
Show file tree
Hide file tree
Showing 17 changed files with 578 additions and 548 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryIndex;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -555,7 +556,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception {
final Set<String> files = new HashSet<>();
for (final RecoveryState recoveryState : initialRecoveryReponse.shardRecoveryStates().get("test")) {
if (recoveryState.getTargetNode().getName().equals(replicaNode)) {
for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (final RecoveryIndex.FileDetail file : recoveryState.getIndex().fileDetails()) {
files.add(file.name());
}
break;
Expand Down Expand Up @@ -615,7 +616,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
long reused = 0;
int filesRecovered = 0;
int filesReused = 0;
for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (final RecoveryIndex.FileDetail file : recoveryState.getIndex().fileDetails()) {
if (files.contains(file.name()) == false) {
recovered += file.length();
filesRecovered++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
return client().admin().indices().prepareStats(name).execute().actionGet();
}

private void validateIndexRecoveryState(RecoveryState.Index indexState) {
private void validateIndexRecoveryState(RecoveryIndex indexState) {
assertThat(indexState.time(), greaterThanOrEqualTo(0L));
assertThat(indexState.recoveredFilesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.recoveredFilesPercent(), lessThanOrEqualTo(100.0f));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.RecoveryIndex;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -177,7 +178,7 @@ void recoverFromLocalShards(
}

void addIndices(
final RecoveryState.Index indexRecoveryStats,
final RecoveryIndex indexRecoveryStats,
final Directory target,
final Sort indexSort,
final Directory[] sources,
Expand Down Expand Up @@ -232,9 +233,9 @@ void addIndices(
* Directory wrapper that records copy process for recovery statistics
*/
static final class StatsDirectoryWrapper extends FilterDirectory {
private final RecoveryState.Index index;
private final RecoveryIndex index;

StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) {
StatsDirectoryWrapper(Directory in, RecoveryIndex indexRecoveryStats) {
super(in);
this.index = indexRecoveryStats;
}
Expand Down Expand Up @@ -355,7 +356,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
+ "]";

if (logger.isTraceEnabled()) {
RecoveryState.Index index = recoveryState.getIndex();
RecoveryIndex index = recoveryState.getIndex();
StringBuilder sb = new StringBuilder();
sb.append(" index : files [")
.append(index.totalFileCount())
Expand Down Expand Up @@ -472,7 +473,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
final RecoveryIndex index = recoveryState.getIndex();
try {
if (si != null) {
addRecoveredFileDetails(si, store, index);
Expand Down Expand Up @@ -510,7 +511,7 @@ private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws
assert indexShard.loadRetentionLeases().leases().isEmpty();
}

private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryIndex index) throws IOException {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

public class MultiFileWriter extends AbstractRefCounted implements Releasable {

public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
public MultiFileWriter(Store store, RecoveryIndex indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
super("multi_file_writer");
this.store = store;
this.indexState = indexState;
Expand All @@ -71,7 +71,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Logger logger;
private final Store store;
private final RecoveryState.Index indexState;
private final RecoveryIndex indexState;
private final String tempFilePrefix;

private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,8 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
return;
}

final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
final RecoveryIndex indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryIndex.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}

Expand Down
Loading

0 comments on commit a5b5199

Please sign in to comment.