diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 612abee7dbf5b..4c791d1fb3a2d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -62,6 +62,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -555,7 +556,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception { final Set files = new HashSet<>(); for (final RecoveryState recoveryState : initialRecoveryReponse.shardRecoveryStates().get("test")) { if (recoveryState.getTargetNode().getName().equals(replicaNode)) { - for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) { + for (final RecoveryIndex.FileDetail file : recoveryState.getIndex().fileDetails()) { files.add(file.name()); } break; @@ -615,7 +616,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { long reused = 0; int filesRecovered = 0; int filesReused = 0; - for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) { + for (final RecoveryIndex.FileDetail file : recoveryState.getIndex().fileDetails()) { if (files.contains(file.name()) == false) { recovered += file.length(); filesRecovered++; diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index a7dc77e024d5c..c8ff3ad7dc8c6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -836,7 +836,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, return client().admin().indices().prepareStats(name).execute().actionGet(); } - private void validateIndexRecoveryState(RecoveryState.Index indexState) { + private void validateIndexRecoveryState(RecoveryIndex indexState) { assertThat(indexState.time(), greaterThanOrEqualTo(0L)); assertThat(indexState.recoveredFilesPercent(), greaterThanOrEqualTo(0.0f)); assertThat(indexState.recoveredFilesPercent(), lessThanOrEqualTo(100.0f)); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 6cf6ad645ca00..97b40a5e746c5 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -62,6 +62,7 @@ import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; @@ -177,7 +178,7 @@ void recoverFromLocalShards( } void addIndices( - final RecoveryState.Index indexRecoveryStats, + final RecoveryIndex indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources, @@ -232,9 +233,9 @@ void addIndices( * Directory wrapper that records copy process for recovery statistics */ static final class StatsDirectoryWrapper extends FilterDirectory { - private final RecoveryState.Index index; + private final RecoveryIndex index; - StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) { + StatsDirectoryWrapper(Directory in, RecoveryIndex indexRecoveryStats) { super(in); this.index = indexRecoveryStats; } @@ -355,7 +356,7 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi + "]"; if (logger.isTraceEnabled()) { - RecoveryState.Index index = recoveryState.getIndex(); + RecoveryIndex index = recoveryState.getIndex(); StringBuilder sb = new StringBuilder(); sb.append(" index : files [") .append(index.totalFileCount()) @@ -472,7 +473,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe writeEmptyRetentionLeasesFile(indexShard); } // since we recover from local, just fill the files and size - final RecoveryState.Index index = recoveryState.getIndex(); + final RecoveryIndex index = recoveryState.getIndex(); try { if (si != null) { addRecoveredFileDetails(si, store, index); @@ -510,7 +511,7 @@ private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws assert indexShard.loadRetentionLeases().leases().isEmpty(); } - private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException { + private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryIndex index) throws IOException { final Directory directory = store.directory(); for (String name : Lucene.files(si)) { long length = directory.fileLength(name); diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index e88d123f50679..415847dec1af4 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -58,7 +58,7 @@ public class MultiFileWriter extends AbstractRefCounted implements Releasable { - public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) { + public MultiFileWriter(Store store, RecoveryIndex indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) { super("multi_file_writer"); this.store = store; this.indexState = indexState; @@ -71,7 +71,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF private final AtomicBoolean closed = new AtomicBoolean(false); private final Logger logger; private final Store store; - private final RecoveryState.Index indexState; + private final RecoveryIndex indexState; private final String tempFilePrefix; private final ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 59246d03beda5..03ba6cf2aeb47 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -544,8 +544,8 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha return; } - final RecoveryState.Index indexState = recoveryTarget.state().getIndex(); - if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) { + final RecoveryIndex indexState = recoveryTarget.state().getIndex(); + if (request.sourceThrottleTimeInNanos() != RecoveryIndex.UNKNOWN) { indexState.addSourceThrottling(request.sourceThrottleTimeInNanos()); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryIndex.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryIndex.java new file mode 100644 index 0000000000000..3a790d9170cc5 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryIndex.java @@ -0,0 +1,509 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.index.store.StoreStats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +public class RecoveryIndex extends Timer implements ToXContentFragment, Writeable { + private final RecoveryFilesDetails fileDetails; + + public static final long UNKNOWN = -1L; + + private long sourceThrottlingInNanos = UNKNOWN; + private long targetThrottleTimeInNanos = UNKNOWN; + + public RecoveryIndex() { + this(new RecoveryFilesDetails()); + } + + public RecoveryIndex(RecoveryFilesDetails recoveryFilesDetails) { + this.fileDetails = recoveryFilesDetails; + } + + public RecoveryIndex(StreamInput in) throws IOException { + super(in); + fileDetails = new RecoveryFilesDetails(in); + sourceThrottlingInNanos = in.readLong(); + targetThrottleTimeInNanos = in.readLong(); + } + + @Override + public synchronized void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + fileDetails.writeTo(out); + out.writeLong(sourceThrottlingInNanos); + out.writeLong(targetThrottleTimeInNanos); + } + + public synchronized List fileDetails() { + return Collections.unmodifiableList(new ArrayList<>(fileDetails.values())); + } + + public synchronized void reset() { + super.reset(); + fileDetails.clear(); + sourceThrottlingInNanos = UNKNOWN; + targetThrottleTimeInNanos = UNKNOWN; + } + + public synchronized void addFileDetail(String name, long length, boolean reused) { + fileDetails.addFileDetails(name, length, reused); + } + + public synchronized void setFileDetailsComplete() { + fileDetails.setComplete(); + } + + public synchronized void addRecoveredBytesToFile(String name, long bytes) { + fileDetails.addRecoveredBytesToFile(name, bytes); + } + + public synchronized void addSourceThrottling(long timeInNanos) { + if (sourceThrottlingInNanos == UNKNOWN) { + sourceThrottlingInNanos = timeInNanos; + } else { + sourceThrottlingInNanos += timeInNanos; + } + } + + public synchronized void addTargetThrottling(long timeInNanos) { + if (targetThrottleTimeInNanos == UNKNOWN) { + targetThrottleTimeInNanos = timeInNanos; + } else { + targetThrottleTimeInNanos += timeInNanos; + } + } + + public synchronized TimeValue sourceThrottling() { + return TimeValue.timeValueNanos(sourceThrottlingInNanos); + } + + public synchronized TimeValue targetThrottling() { + return TimeValue.timeValueNanos(targetThrottleTimeInNanos); + } + + /** + * total number of files that are part of this recovery, both re-used and recovered + */ + public synchronized int totalFileCount() { + return fileDetails.size(); + } + + /** + * total number of files to be recovered (potentially not yet done) + */ + public synchronized int totalRecoverFiles() { + int total = 0; + for (FileDetail file : fileDetails.values()) { + if (file.reused() == false) { + total++; + } + } + return total; + } + + /** + * number of file that were recovered (excluding on ongoing files) + */ + public synchronized int recoveredFileCount() { + int count = 0; + for (FileDetail file : fileDetails.values()) { + if (file.fullyRecovered()) { + count++; + } + } + return count; + } + + /** + * percent of recovered (i.e., not reused) files out of the total files to be recovered + */ + public synchronized float recoveredFilesPercent() { + int total = 0; + int recovered = 0; + for (FileDetail file : fileDetails.values()) { + if (file.reused() == false) { + total++; + if (file.fullyRecovered()) { + recovered++; + } + } + } + if (total == 0 && fileDetails.size() == 0) { // indicates we are still in init phase + return 0.0f; + } + if (total == recovered) { + return 100.0f; + } else { + float result = 100.0f * (recovered / (float) total); + return result; + } + } + + /** + * total number of bytes in th shard + */ + public synchronized long totalBytes() { + long total = 0; + for (FileDetail file : fileDetails.values()) { + total += file.length(); + } + return total; + } + + /** + * total number of bytes recovered so far, including both existing and reused + */ + public synchronized long recoveredBytes() { + long recovered = 0; + for (FileDetail file : fileDetails.values()) { + recovered += file.recovered(); + } + return recovered; + } + + /** + * total bytes of files to be recovered (potentially not yet done) + */ + public synchronized long totalRecoverBytes() { + long total = 0; + for (FileDetail file : fileDetails.values()) { + if (file.reused() == false) { + total += file.length(); + } + } + return total; + } + + /** + * @return number of bytes still to recover, i.e. {@link RecoveryIndex#totalRecoverBytes()} minus {@link RecoveryIndex#recoveredBytes()}, or + * {@code -1} if the full set of files to recover is not yet known + */ + public synchronized long bytesStillToRecover() { + if (fileDetails.isComplete() == false) { + return -1L; + } + long total = 0L; + for (FileDetail file : fileDetails.values()) { + if (file.reused() == false) { + total += file.length() - file.recovered(); + } + } + return total; + } + + /** + * percent of bytes recovered out of total files bytes *to be* recovered + */ + public synchronized float recoveredBytesPercent() { + long total = 0; + long recovered = 0; + for (FileDetail file : fileDetails.values()) { + if (file.reused() == false) { + total += file.length(); + recovered += file.recovered(); + } + } + if (total == 0 && fileDetails.size() == 0) { + // indicates we are still in init phase + return 0.0f; + } + if (total == recovered) { + return 100.0f; + } else { + return 100.0f * recovered / total; + } + } + + public synchronized int reusedFileCount() { + int reused = 0; + for (FileDetail file : fileDetails.values()) { + if (file.reused()) { + reused++; + } + } + return reused; + } + + public synchronized long reusedBytes() { + long reused = 0; + for (FileDetail file : fileDetails.values()) { + if (file.reused()) { + reused += file.length(); + } + } + return reused; + } + + @Override + public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // stream size first, as it matters more and the files section can be long + builder.startObject(RecoveryState.Fields.SIZE); + builder.humanReadableField(RecoveryState.Fields.TOTAL_IN_BYTES, RecoveryState.Fields.TOTAL, new ByteSizeValue(totalBytes())); + builder.humanReadableField(RecoveryState.Fields.REUSED_IN_BYTES, RecoveryState.Fields.REUSED, new ByteSizeValue(reusedBytes())); + builder.humanReadableField( + RecoveryState.Fields.RECOVERED_IN_BYTES, + RecoveryState.Fields.RECOVERED, + new ByteSizeValue(recoveredBytes()) + ); + builder.field(RecoveryState.Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredBytesPercent())); + builder.endObject(); + + builder.startObject(RecoveryState.Fields.FILES); + builder.field(RecoveryState.Fields.TOTAL, totalFileCount()); + builder.field(RecoveryState.Fields.REUSED, reusedFileCount()); + builder.field(RecoveryState.Fields.RECOVERED, recoveredFileCount()); + builder.field(RecoveryState.Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredFilesPercent())); + fileDetails.toXContent(builder, params); + builder.endObject(); + builder.humanReadableField(RecoveryState.Fields.TOTAL_TIME_IN_MILLIS, RecoveryState.Fields.TOTAL_TIME, new TimeValue(time())); + builder.humanReadableField( + RecoveryState.Fields.SOURCE_THROTTLE_TIME_IN_MILLIS, + RecoveryState.Fields.SOURCE_THROTTLE_TIME, + sourceThrottling() + ); + builder.humanReadableField( + RecoveryState.Fields.TARGET_THROTTLE_TIME_IN_MILLIS, + RecoveryState.Fields.TARGET_THROTTLE_TIME, + targetThrottling() + ); + return builder; + } + + @Override + public synchronized String toString() { + try { + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return Strings.toString(builder); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } + + public synchronized FileDetail getFileDetails(String dest) { + return fileDetails.get(dest); + } + + public static class FileDetail implements ToXContentObject, Writeable { + private String name; + private long length; + private long recovered; + private boolean reused; + + public FileDetail(String name, long length, boolean reused) { + assert name != null; + this.name = name; + this.length = length; + this.reused = reused; + } + + public FileDetail(StreamInput in) throws IOException { + name = in.readString(); + length = in.readVLong(); + recovered = in.readVLong(); + reused = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeVLong(length); + out.writeVLong(recovered); + out.writeBoolean(reused); + } + + void addRecoveredBytes(long bytes) { + assert reused == false : "file is marked as reused, can't update recovered bytes"; + assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; + recovered += bytes; + } + + /** + * file name * + */ + public String name() { + return name; + } + + /** + * file length * + */ + public long length() { + return length; + } + + /** + * number of bytes recovered for this file (so far). 0 if the file is reused * + */ + public long recovered() { + return recovered; + } + + /** + * returns true if the file is reused from a local copy + */ + public boolean reused() { + return reused; + } + + boolean fullyRecovered() { + return reused == false && length == recovered; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(RecoveryState.Fields.NAME, name); + builder.humanReadableField(RecoveryState.Fields.LENGTH_IN_BYTES, RecoveryState.Fields.LENGTH, new ByteSizeValue(length)); + builder.field(RecoveryState.Fields.REUSED, reused); + builder.humanReadableField( + RecoveryState.Fields.RECOVERED_IN_BYTES, + RecoveryState.Fields.RECOVERED, + new ByteSizeValue(recovered) + ); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FileDetail) { + FileDetail other = (FileDetail) obj; + return name.equals(other.name) && length == other.length() && reused == other.reused() && recovered == other.recovered(); + } + return false; + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + Long.hashCode(length); + result = 31 * result + Long.hashCode(recovered); + result = 31 * result + (reused ? 1 : 0); + return result; + } + + @Override + public String toString() { + return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], recovered [" + recovered + "])"; + } + } + + public static class RecoveryFilesDetails implements ToXContentFragment, Writeable { + protected final Map fileDetails = new HashMap<>(); + protected boolean complete; + + public RecoveryFilesDetails() {} + + RecoveryFilesDetails(StreamInput in) throws IOException { + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + FileDetail file = new FileDetail(in); + fileDetails.put(file.name, file); + } + if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { + complete = in.readBoolean(); + } else { + // This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not + // then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete + // so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path + // anyway since they always use IndexShard#getRecoveryState which is never transported over the wire. + complete = fileDetails.isEmpty() == false; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + final FileDetail[] files = values().toArray(new FileDetail[0]); + out.writeVInt(files.length); + for (FileDetail file : files) { + file.writeTo(out); + } + if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { + out.writeBoolean(complete); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (params.paramAsBoolean("detailed", false)) { + builder.startArray(RecoveryState.Fields.DETAILS); + for (FileDetail file : values()) { + file.toXContent(builder, params); + } + builder.endArray(); + } + + return builder; + } + + public void addFileDetails(String name, long length, boolean reused) { + assert complete == false : "addFileDetail for [" + name + "] when file details are already complete"; + FileDetail existing = fileDetails.put(name, new FileDetail(name, length, reused)); + assert existing == null : "file [" + name + "] is already reported"; + } + + public void addRecoveredBytesToFile(String name, long bytes) { + FileDetail file = fileDetails.get(name); + assert file != null : "file [" + name + "] hasn't been reported"; + file.addRecoveredBytes(bytes); + } + + public FileDetail get(String name) { + return fileDetails.get(name); + } + + public void setComplete() { + complete = true; + } + + public int size() { + return fileDetails.size(); + } + + public boolean isEmpty() { + return fileDetails.isEmpty(); + } + + public void clear() { + fileDetails.clear(); + complete = false; + } + + public Collection values() { + return fileDetails.values(); + } + + public boolean isComplete() { + return complete; + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java index 802deb577eff7..19d3bb24be96b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java @@ -37,29 +37,18 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Nullable; -import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.ToXContentFragment; -import org.opensearch.common.xcontent.ToXContentObject; import org.opensearch.common.xcontent.XContentBuilder; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.StoreStats; import org.opensearch.indices.replication.common.RState; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; import java.util.Locale; -import java.util.Map; /** * Keeps track of state related to shard recovery. @@ -120,7 +109,7 @@ public static Stage fromId(byte id) { private Stage stage; - private final Index index; + private final RecoveryIndex index; private final Translog translog; private final VerifyIndex verifyIndex; private final Timer timer; @@ -133,10 +122,10 @@ public static Stage fromId(byte id) { private boolean primary; public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) { - this(shardRouting, targetNode, sourceNode, new Index()); + this(shardRouting, targetNode, sourceNode, new RecoveryIndex()); } - public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) { + public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, RecoveryIndex index) { assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting; RecoverySource recoverySource = shardRouting.recoverySource(); assert (recoverySource.getType() == RecoverySource.Type.PEER) == (sourceNode != null) @@ -161,7 +150,7 @@ public RecoveryState(StreamInput in) throws IOException { recoverySource = RecoverySource.readFrom(in); targetNode = new DiscoveryNode(in); sourceNode = in.readOptionalWriteable(DiscoveryNode::new); - index = new Index(in); + index = new RecoveryIndex(in); translog = new Translog(in); verifyIndex = new VerifyIndex(in); primary = in.readBoolean(); @@ -245,7 +234,7 @@ public synchronized RecoveryState setStage(Stage stage) { return this; } - public Index getIndex() { + public RecoveryIndex getIndex() { return index; } @@ -558,466 +547,4 @@ public synchronized XContentBuilder toXContent(XContentBuilder builder, Params p } } - public static class FileDetail implements ToXContentObject, Writeable { - private String name; - private long length; - private long recovered; - private boolean reused; - - public FileDetail(String name, long length, boolean reused) { - assert name != null; - this.name = name; - this.length = length; - this.reused = reused; - } - - public FileDetail(StreamInput in) throws IOException { - name = in.readString(); - length = in.readVLong(); - recovered = in.readVLong(); - reused = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(name); - out.writeVLong(length); - out.writeVLong(recovered); - out.writeBoolean(reused); - } - - void addRecoveredBytes(long bytes) { - assert reused == false : "file is marked as reused, can't update recovered bytes"; - assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; - recovered += bytes; - } - - /** - * file name * - */ - public String name() { - return name; - } - - /** - * file length * - */ - public long length() { - return length; - } - - /** - * number of bytes recovered for this file (so far). 0 if the file is reused * - */ - public long recovered() { - return recovered; - } - - /** - * returns true if the file is reused from a local copy - */ - public boolean reused() { - return reused; - } - - boolean fullyRecovered() { - return reused == false && length == recovered; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(Fields.NAME, name); - builder.humanReadableField(Fields.LENGTH_IN_BYTES, Fields.LENGTH, new ByteSizeValue(length)); - builder.field(Fields.REUSED, reused); - builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recovered)); - builder.endObject(); - return builder; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof FileDetail) { - FileDetail other = (FileDetail) obj; - return name.equals(other.name) && length == other.length() && reused == other.reused() && recovered == other.recovered(); - } - return false; - } - - @Override - public int hashCode() { - int result = name.hashCode(); - result = 31 * result + Long.hashCode(length); - result = 31 * result + Long.hashCode(recovered); - result = 31 * result + (reused ? 1 : 0); - return result; - } - - @Override - public String toString() { - return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], recovered [" + recovered + "])"; - } - } - - public static class RecoveryFilesDetails implements ToXContentFragment, Writeable { - protected final Map fileDetails = new HashMap<>(); - protected boolean complete; - - public RecoveryFilesDetails() {} - - RecoveryFilesDetails(StreamInput in) throws IOException { - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - FileDetail file = new FileDetail(in); - fileDetails.put(file.name, file); - } - if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { - complete = in.readBoolean(); - } else { - // This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not - // then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete - // so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path - // anyway since they always use IndexShard#getRecoveryState which is never transported over the wire. - complete = fileDetails.isEmpty() == false; - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - final FileDetail[] files = values().toArray(new FileDetail[0]); - out.writeVInt(files.length); - for (FileDetail file : files) { - file.writeTo(out); - } - if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { - out.writeBoolean(complete); - } - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - if (params.paramAsBoolean("detailed", false)) { - builder.startArray(Fields.DETAILS); - for (FileDetail file : values()) { - file.toXContent(builder, params); - } - builder.endArray(); - } - - return builder; - } - - public void addFileDetails(String name, long length, boolean reused) { - assert complete == false : "addFileDetail for [" + name + "] when file details are already complete"; - FileDetail existing = fileDetails.put(name, new FileDetail(name, length, reused)); - assert existing == null : "file [" + name + "] is already reported"; - } - - public void addRecoveredBytesToFile(String name, long bytes) { - FileDetail file = fileDetails.get(name); - assert file != null : "file [" + name + "] hasn't been reported"; - file.addRecoveredBytes(bytes); - } - - public FileDetail get(String name) { - return fileDetails.get(name); - } - - public void setComplete() { - complete = true; - } - - public int size() { - return fileDetails.size(); - } - - public boolean isEmpty() { - return fileDetails.isEmpty(); - } - - public void clear() { - fileDetails.clear(); - complete = false; - } - - public Collection values() { - return fileDetails.values(); - } - - public boolean isComplete() { - return complete; - } - } - - public static class Index extends Timer implements ToXContentFragment, Writeable { - private final RecoveryFilesDetails fileDetails; - - public static final long UNKNOWN = -1L; - - private long sourceThrottlingInNanos = UNKNOWN; - private long targetThrottleTimeInNanos = UNKNOWN; - - public Index() { - this(new RecoveryFilesDetails()); - } - - public Index(RecoveryFilesDetails recoveryFilesDetails) { - this.fileDetails = recoveryFilesDetails; - } - - public Index(StreamInput in) throws IOException { - super(in); - fileDetails = new RecoveryFilesDetails(in); - sourceThrottlingInNanos = in.readLong(); - targetThrottleTimeInNanos = in.readLong(); - } - - @Override - public synchronized void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - fileDetails.writeTo(out); - out.writeLong(sourceThrottlingInNanos); - out.writeLong(targetThrottleTimeInNanos); - } - - public synchronized List fileDetails() { - return Collections.unmodifiableList(new ArrayList<>(fileDetails.values())); - } - - public synchronized void reset() { - super.reset(); - fileDetails.clear(); - sourceThrottlingInNanos = UNKNOWN; - targetThrottleTimeInNanos = UNKNOWN; - } - - public synchronized void addFileDetail(String name, long length, boolean reused) { - fileDetails.addFileDetails(name, length, reused); - } - - public synchronized void setFileDetailsComplete() { - fileDetails.setComplete(); - } - - public synchronized void addRecoveredBytesToFile(String name, long bytes) { - fileDetails.addRecoveredBytesToFile(name, bytes); - } - - public synchronized void addSourceThrottling(long timeInNanos) { - if (sourceThrottlingInNanos == UNKNOWN) { - sourceThrottlingInNanos = timeInNanos; - } else { - sourceThrottlingInNanos += timeInNanos; - } - } - - public synchronized void addTargetThrottling(long timeInNanos) { - if (targetThrottleTimeInNanos == UNKNOWN) { - targetThrottleTimeInNanos = timeInNanos; - } else { - targetThrottleTimeInNanos += timeInNanos; - } - } - - public synchronized TimeValue sourceThrottling() { - return TimeValue.timeValueNanos(sourceThrottlingInNanos); - } - - public synchronized TimeValue targetThrottling() { - return TimeValue.timeValueNanos(targetThrottleTimeInNanos); - } - - /** - * total number of files that are part of this recovery, both re-used and recovered - */ - public synchronized int totalFileCount() { - return fileDetails.size(); - } - - /** - * total number of files to be recovered (potentially not yet done) - */ - public synchronized int totalRecoverFiles() { - int total = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total++; - } - } - return total; - } - - /** - * number of file that were recovered (excluding on ongoing files) - */ - public synchronized int recoveredFileCount() { - int count = 0; - for (FileDetail file : fileDetails.values()) { - if (file.fullyRecovered()) { - count++; - } - } - return count; - } - - /** - * percent of recovered (i.e., not reused) files out of the total files to be recovered - */ - public synchronized float recoveredFilesPercent() { - int total = 0; - int recovered = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total++; - if (file.fullyRecovered()) { - recovered++; - } - } - } - if (total == 0 && fileDetails.size() == 0) { // indicates we are still in init phase - return 0.0f; - } - if (total == recovered) { - return 100.0f; - } else { - float result = 100.0f * (recovered / (float) total); - return result; - } - } - - /** - * total number of bytes in th shard - */ - public synchronized long totalBytes() { - long total = 0; - for (FileDetail file : fileDetails.values()) { - total += file.length(); - } - return total; - } - - /** - * total number of bytes recovered so far, including both existing and reused - */ - public synchronized long recoveredBytes() { - long recovered = 0; - for (FileDetail file : fileDetails.values()) { - recovered += file.recovered(); - } - return recovered; - } - - /** - * total bytes of files to be recovered (potentially not yet done) - */ - public synchronized long totalRecoverBytes() { - long total = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total += file.length(); - } - } - return total; - } - - /** - * @return number of bytes still to recover, i.e. {@link Index#totalRecoverBytes()} minus {@link Index#recoveredBytes()}, or - * {@code -1} if the full set of files to recover is not yet known - */ - public synchronized long bytesStillToRecover() { - if (fileDetails.isComplete() == false) { - return -1L; - } - long total = 0L; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total += file.length() - file.recovered(); - } - } - return total; - } - - /** - * percent of bytes recovered out of total files bytes *to be* recovered - */ - public synchronized float recoveredBytesPercent() { - long total = 0; - long recovered = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total += file.length(); - recovered += file.recovered(); - } - } - if (total == 0 && fileDetails.size() == 0) { - // indicates we are still in init phase - return 0.0f; - } - if (total == recovered) { - return 100.0f; - } else { - return 100.0f * recovered / total; - } - } - - public synchronized int reusedFileCount() { - int reused = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused()) { - reused++; - } - } - return reused; - } - - public synchronized long reusedBytes() { - long reused = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused()) { - reused += file.length(); - } - } - return reused; - } - - @Override - public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - // stream size first, as it matters more and the files section can be long - builder.startObject(Fields.SIZE); - builder.humanReadableField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, new ByteSizeValue(totalBytes())); - builder.humanReadableField(Fields.REUSED_IN_BYTES, Fields.REUSED, new ByteSizeValue(reusedBytes())); - builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recoveredBytes())); - builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredBytesPercent())); - builder.endObject(); - - builder.startObject(Fields.FILES); - builder.field(Fields.TOTAL, totalFileCount()); - builder.field(Fields.REUSED, reusedFileCount()); - builder.field(Fields.RECOVERED, recoveredFileCount()); - builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredFilesPercent())); - fileDetails.toXContent(builder, params); - builder.endObject(); - builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(time())); - builder.humanReadableField(Fields.SOURCE_THROTTLE_TIME_IN_MILLIS, Fields.SOURCE_THROTTLE_TIME, sourceThrottling()); - builder.humanReadableField(Fields.TARGET_THROTTLE_TIME_IN_MILLIS, Fields.TARGET_THROTTLE_TIME, targetThrottling()); - return builder; - } - - @Override - public synchronized String toString() { - try { - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); - builder.startObject(); - toXContent(builder, EMPTY_PARAMS); - builder.endObject(); - return Strings.toString(builder); - } catch (IOException e) { - return "{ \"error\" : \"" + e.getMessage() + "\"}"; - } - } - - public synchronized FileDetail getFileDetails(String dest) { - return fileDetails.get(dest); - } - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 79a9f5da8a69f..ebdead1fcf250 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -312,7 +312,7 @@ public void receiveFileInfo( ActionListener.completeWith(listener, () -> { indexShard.resetRecoveryStage(); indexShard.prepareForIndexRecovery(); - final RecoveryState.Index index = state().getIndex(); + final RecoveryIndex index = state().getIndex(); for (int i = 0; i < phase1ExistingFileNames.size(); i++) { index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 9a4b26300d16c..151b4a4011f5e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -19,8 +19,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.MultiFileWriter; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoveryRequestTracker; -import org.opensearch.indices.recovery.RecoveryState; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -41,7 +41,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted { protected final ReplicationListener listener; protected final MultiFileWriter multiFileWriter; protected final Logger logger; - protected final RecoveryState.Index recoveryStateIndex; + protected final RecoveryIndex recoveryStateIndex; protected abstract String getPrefix(); @@ -53,7 +53,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted { public abstract RState state(); - public ReplicationTarget(String name, IndexShard indexShard, RecoveryState.Index recoveryStateIndex, ReplicationListener listener) { + public ReplicationTarget(String name, IndexShard indexShard, RecoveryIndex recoveryStateIndex, ReplicationListener listener) { super(name); this.logger = Loggers.getLogger(getClass(), indexShard.shardId()); this.listener = listener; diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationState.java index 0856e5bad5dc6..0721756b7ea8f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationState.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.copy; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.Timer; import org.opensearch.indices.replication.common.RState; @@ -15,10 +16,10 @@ public class ReplicationState implements RState { private Timer timer; - private RecoveryState.Index index; + private RecoveryIndex index; private Stage stage; - public ReplicationState(RecoveryState.Index index) { + public ReplicationState(RecoveryIndex index) { this.index = index; this.timer = new Timer(); stage = Stage.INACTIVE; @@ -33,41 +34,29 @@ public Timer getTimer() { return timer; } - public RecoveryState.Index getIndex() { + public RecoveryIndex getIndex() { return index; } + /** + * THis class duplicates the purpose/functionality of {@link RecoveryState.Stage} + * so this temporary implementation simply aliases the enums from the other class. + */ public enum Stage { // TODO: Add more steps here. - INACTIVE((byte) 0), + INACTIVE(RecoveryState.Stage.INIT), - ACTIVE((byte) 1); - - private static final ReplicationState.Stage[] STAGES = new ReplicationState.Stage[ReplicationState.Stage.values().length]; - - static { - for (ReplicationState.Stage stage : ReplicationState.Stage.values()) { - assert stage.id() < STAGES.length && stage.id() >= 0; - STAGES[stage.id] = stage; - } - } + ACTIVE(RecoveryState.Stage.INDEX); private final byte id; - Stage(byte id) { - this.id = id; + Stage(RecoveryState.Stage recoveryStage) { + this.id = recoveryStage.id(); } public byte id() { return id; } - - public static ReplicationState.Stage fromId(byte id) { - if (id < 0 || id >= STAGES.length) { - throw new IllegalArgumentException("No mapping for id [" + id + "]"); - } - return STAGES[id]; - } } public synchronized Stage getStage() { diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java index 613125b3aee1a..e4729166654e8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java @@ -44,6 +44,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.TransportCheckpointInfoResponse; @@ -79,7 +80,7 @@ public SegmentReplicationTarget( PrimaryShardReplicationSource source, SegmentReplicationReplicaService.SegmentReplicationListener listener ) { - super("replication_status", indexShard, new RecoveryState.Index(), listener); + super("replication_status", indexShard, new RecoveryIndex(), listener); this.checkpoint = checkpoint; this.source = source; state = new ReplicationState(recoveryStateIndex); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index d73f3f81c8138..ef172f325bc68 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -134,6 +134,7 @@ import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.repositories.IndexId; @@ -3150,7 +3151,7 @@ public void testRecoverFromLocalShard() throws IOException { RecoveryState recoveryState = targetShard.recoveryState(); assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage()); assertTrue(recoveryState.getIndex().fileDetails().size() > 0); - for (RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) { + for (RecoveryIndex.FileDetail file : recoveryState.getIndex().fileDetails()) { if (file.reused()) { assertEquals(file.recovered(), 0); } else { diff --git a/server/src/test/java/org/opensearch/index/shard/StoreRecoveryTests.java b/server/src/test/java/org/opensearch/index/shard/StoreRecoveryTests.java index c325ac6bc754e..06fecdd0d22ce 100644 --- a/server/src/test/java/org/opensearch/index/shard/StoreRecoveryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/StoreRecoveryTests.java @@ -61,7 +61,7 @@ import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.Uid; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -108,7 +108,7 @@ public void testAddIndices() throws IOException { writer.close(); } StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger); - RecoveryState.Index indexStats = new RecoveryState.Index(); + RecoveryIndex indexStats = new RecoveryIndex(); Directory target = newFSDirectory(createTempDir()); final long maxSeqNo = randomNonNegativeLong(); final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong(); @@ -174,7 +174,7 @@ public void testSplitShard() throws IOException { writer.commit(); writer.close(); StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger); - RecoveryState.Index indexStats = new RecoveryState.Index(); + RecoveryIndex indexStats = new RecoveryIndex(); Directory target = newFSDirectory(createTempDir()); final long maxSeqNo = randomNonNegativeLong(); final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong(); @@ -250,7 +250,7 @@ public void testSplitShard() throws IOException { public void testStatsDirWrapper() throws IOException { Directory dir = newDirectory(); Directory target = newDirectory(); - RecoveryState.Index indexStats = new RecoveryState.Index(); + RecoveryIndex indexStats = new RecoveryIndex(); StoreRecovery.StatsDirectoryWrapper wrapper = new StoreRecovery.StatsDirectoryWrapper(target, indexStats); try (IndexOutput output = dir.createOutput("foo.bar", IOContext.DEFAULT)) { CodecUtil.writeHeader(output, "foo", 0); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index b195984de64b5..b005df315aad6 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -198,7 +198,7 @@ public void testSendFiles() throws Throwable { metas.add(md); } Store targetStore = newStore(createTempDir()); - MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {}); + MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryIndex.class), "", logger, () -> {}); RecoveryTargetHandler target = new TestRecoveryTargetHandler() { @Override public void writeFileChunk( @@ -528,7 +528,7 @@ public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { ) ); Store targetStore = newStore(createTempDir(), false); - MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {}); + MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryIndex.class), "", logger, () -> {}); RecoveryTargetHandler target = new TestRecoveryTargetHandler() { @Override public void writeFileChunk( diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java index 57bea083cfd2b..f65bf98c5e916 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java @@ -41,8 +41,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.index.shard.ShardId; -import org.opensearch.indices.recovery.RecoveryState.FileDetail; -import org.opensearch.indices.recovery.RecoveryState.Index; +import org.opensearch.indices.recovery.RecoveryIndex.FileDetail; import org.opensearch.indices.recovery.RecoveryState.Stage; import org.opensearch.indices.recovery.RecoveryState.Translog; import org.opensearch.indices.recovery.RecoveryState.VerifyIndex; @@ -136,11 +135,11 @@ Timer createObj(StreamInput in) throws IOException { } }; } else if (randomBoolean()) { - timer = new Index(); + timer = new RecoveryIndex(); streamer = new Streamer(stop, timer) { @Override Timer createObj(StreamInput in) throws IOException { - return new Index(in); + return new RecoveryIndex(in); } }; } else if (randomBoolean()) { @@ -211,7 +210,7 @@ public void testIndex() throws Throwable { } Collections.shuffle(Arrays.asList(files), random()); - final RecoveryState.Index index = new RecoveryState.Index(); + final RecoveryIndex index = new RecoveryIndex(); assertThat(index.bytesStillToRecover(), equalTo(-1L)); if (randomBoolean()) { @@ -238,8 +237,8 @@ public void testIndex() throws Throwable { // before we start we must report 0 assertThat(index.recoveredFilesPercent(), equalTo((float) 0.0)); assertThat(index.recoveredBytesPercent(), equalTo((float) 0.0)); - assertThat(index.sourceThrottling().nanos(), equalTo(Index.UNKNOWN)); - assertThat(index.targetThrottling().nanos(), equalTo(Index.UNKNOWN)); + assertThat(index.sourceThrottling().nanos(), equalTo(RecoveryIndex.UNKNOWN)); + assertThat(index.targetThrottling().nanos(), equalTo(RecoveryIndex.UNKNOWN)); index.start(); for (FileDetail file : files) { @@ -270,24 +269,24 @@ public void testIndex() throws Throwable { } AtomicBoolean streamShouldStop = new AtomicBoolean(); - Streamer backgroundReader = new Streamer(streamShouldStop, index) { + Streamer backgroundReader = new Streamer(streamShouldStop, index) { @Override - Index createObj(StreamInput in) throws IOException { - return new Index(in); + RecoveryIndex createObj(StreamInput in) throws IOException { + return new RecoveryIndex(in); } }; backgroundReader.start(); long recoveredBytes = 0; - long sourceThrottling = Index.UNKNOWN; - long targetThrottling = Index.UNKNOWN; + long sourceThrottling = RecoveryIndex.UNKNOWN; + long targetThrottling = RecoveryIndex.UNKNOWN; while (bytesToRecover > 0) { FileDetail file = randomFrom(filesToRecover); final long toRecover = Math.min(bytesToRecover, randomIntBetween(1, (int) (file.length() - file.recovered()))); final long throttledOnSource = rarely() ? randomIntBetween(10, 200) : 0; index.addSourceThrottling(throttledOnSource); - if (sourceThrottling == Index.UNKNOWN) { + if (sourceThrottling == RecoveryIndex.UNKNOWN) { sourceThrottling = throttledOnSource; } else { sourceThrottling += throttledOnSource; @@ -295,7 +294,7 @@ Index createObj(StreamInput in) throws IOException { index.addRecoveredBytesToFile(file.name(), toRecover); file.addRecoveredBytes(toRecover); final long throttledOnTarget = rarely() ? randomIntBetween(10, 200) : 0; - if (targetThrottling == Index.UNKNOWN) { + if (targetThrottling == RecoveryIndex.UNKNOWN) { targetThrottling = throttledOnTarget; } else { targetThrottling += throttledOnTarget; @@ -317,7 +316,7 @@ Index createObj(StreamInput in) throws IOException { logger.info("testing serialized information"); streamShouldStop.set(true); backgroundReader.join(); - final Index lastRead = backgroundReader.lastRead(); + final RecoveryIndex lastRead = backgroundReader.lastRead(); assertThat(lastRead.fileDetails().toArray(), arrayContainingInAnyOrder(index.fileDetails().toArray())); assertThat(lastRead.startTime(), equalTo(index.startTime())); if (completeRecovery) { @@ -535,12 +534,12 @@ VerifyIndex createObj(StreamInput in) throws IOException { } public void testConcurrentModificationIndexFileDetailsMap() throws InterruptedException { - final Index index = new Index(); + final RecoveryIndex index = new RecoveryIndex(); final AtomicBoolean stop = new AtomicBoolean(false); - Streamer readWriteIndex = new Streamer(stop, index) { + Streamer readWriteIndex = new Streamer(stop, index) { @Override - Index createObj(StreamInput in) throws IOException { - return new Index(in); + RecoveryIndex createObj(StreamInput in) throws IOException { + return new RecoveryIndex(in); } }; Thread modifyThread = new Thread() { diff --git a/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java index f2c6a13b92597..3420963bdd3e6 100644 --- a/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java @@ -66,6 +66,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.IndexId; @@ -203,12 +204,12 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { futureC.actionGet(); assertEquals(secondState.getIndex().reusedFileCount(), commitFileNames.size() - 2); assertEquals(secondState.getIndex().recoveredFileCount(), 2); - List recoveredFiles = secondState.getIndex() + List recoveredFiles = secondState.getIndex() .fileDetails() .stream() .filter(f -> f.reused() == false) .collect(Collectors.toList()); - Collections.sort(recoveredFiles, Comparator.comparing(RecoveryState.FileDetail::name)); + Collections.sort(recoveredFiles, Comparator.comparing(RecoveryIndex.FileDetail::name)); assertTrue(recoveredFiles.get(0).name(), recoveredFiles.get(0).name().endsWith(".liv")); assertTrue(recoveredFiles.get(1).name(), recoveredFiles.get(1).name().endsWith("segments_" + incIndexCommit.getGeneration())); } finally { diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java index 4407c3483a1b3..26fae393365b3 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java @@ -44,6 +44,7 @@ import org.opensearch.common.xcontent.XContentOpenSearchExtension; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.Timer; import org.opensearch.test.OpenSearchTestCase; @@ -91,7 +92,7 @@ public void testRestRecoveryAction() { when(targetNode.getHostName()).thenReturn(randomAlphaOfLength(8)); when(state.getTargetNode()).thenReturn(targetNode); - RecoveryState.Index index = mock(RecoveryState.Index.class); + RecoveryIndex index = mock(RecoveryIndex.class); final int totalRecoveredFiles = randomIntBetween(1, 64); when(index.totalRecoverFiles()).thenReturn(totalRecoveredFiles);