From c0b1b594f71a7b355d7f282c1365ac7f3ad11ebb Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Fri, 22 Jul 2022 16:37:45 +0000 Subject: [PATCH] [Backport 2.x] [Segment Replication] Moving RecoveryState.Index to a top-level class and renaming (#3971) * Backport https://github.com/opensearch-project/OpenSearch/pull/3075 to 2.x branch and resolve conflict in RecoveryState class Signed-off-by: Rishikesh1159 Co-authored-by: Kartik Ganesh --- .../gateway/RecoveryFromGatewayIT.java | 5 +- .../indices/recovery/IndexRecoveryIT.java | 3 +- .../opensearch/index/shard/StoreRecovery.java | 13 +- .../indices/recovery/MultiFileWriter.java | 5 +- .../recovery/PeerRecoveryTargetService.java | 5 +- .../indices/recovery/RecoveryState.java | 517 +---------------- .../indices/recovery/RecoveryTarget.java | 3 +- .../common/ReplicationLuceneIndex.java | 530 ++++++++++++++++++ .../index/shard/IndexShardTests.java | 3 +- .../index/shard/StoreRecoveryTests.java | 8 +- .../recovery/RecoverySourceHandlerTests.java | 9 +- .../indices/recovery/RecoveryTargetTests.java | 60 +- .../repositories/fs/FsRepositoryTests.java | 5 +- .../action/cat/RestRecoveryActionTests.java | 47 +- 14 files changed, 639 insertions(+), 574 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/replication/common/ReplicationLuceneIndex.java diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index e088a1236fa3e..11af1fb3cbfab 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -62,6 +62,7 @@ import org.opensearch.index.shard.ShardPath; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; @@ -547,7 +548,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception { final Set files = new HashSet<>(); for (final RecoveryState recoveryState : initialRecoveryReponse.shardRecoveryStates().get("test")) { if (recoveryState.getTargetNode().getName().equals(replicaNode)) { - for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) { + for (final ReplicationLuceneIndex.FileMetadata file : recoveryState.getIndex().fileDetails()) { files.add(file.name()); } break; @@ -607,7 +608,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { long reused = 0; int filesRecovered = 0; int filesReused = 0; - for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) { + for (final ReplicationLuceneIndex.FileMetadata file : recoveryState.getIndex().fileDetails()) { if (files.contains(file.name()) == false) { recovered += file.length(); filesRecovered++; diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index e33560b9bbd4e..43059774af649 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -102,6 +102,7 @@ import org.opensearch.indices.NodeIndicesStats; import org.opensearch.indices.analysis.AnalysisModule; import org.opensearch.indices.recovery.RecoveryState.Stage; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.node.NodeClosedException; import org.opensearch.node.RecoverySettingsChunkSizePlugin; import org.opensearch.plugins.AnalysisPlugin; @@ -836,7 +837,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, return client().admin().indices().prepareStats(name).execute().actionGet(); } - private void validateIndexRecoveryState(RecoveryState.Index indexState) { + private void validateIndexRecoveryState(ReplicationLuceneIndex indexState) { assertThat(indexState.time(), greaterThanOrEqualTo(0L)); assertThat(indexState.recoveredFilesPercent(), greaterThanOrEqualTo(0.0f)); assertThat(indexState.recoveredFilesPercent(), lessThanOrEqualTo(100.0f)); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index df501d24265c3..0499140237dbd 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -62,6 +62,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; @@ -178,7 +179,7 @@ void recoverFromLocalShards( } void addIndices( - final RecoveryState.Index indexRecoveryStats, + final ReplicationLuceneIndex indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources, @@ -235,9 +236,9 @@ void addIndices( * @opensearch.internal */ static final class StatsDirectoryWrapper extends FilterDirectory { - private final RecoveryState.Index index; + private final ReplicationLuceneIndex index; - StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) { + StatsDirectoryWrapper(Directory in, ReplicationLuceneIndex indexRecoveryStats) { super(in); this.index = indexRecoveryStats; } @@ -358,7 +359,7 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi + "]"; if (logger.isTraceEnabled()) { - RecoveryState.Index index = recoveryState.getIndex(); + ReplicationLuceneIndex index = recoveryState.getIndex(); StringBuilder sb = new StringBuilder(); sb.append(" index : files [") .append(index.totalFileCount()) @@ -475,7 +476,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe writeEmptyRetentionLeasesFile(indexShard); } // since we recover from local, just fill the files and size - final RecoveryState.Index index = recoveryState.getIndex(); + final ReplicationLuceneIndex index = recoveryState.getIndex(); try { if (si != null) { addRecoveredFileDetails(si, store, index); @@ -513,7 +514,7 @@ private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws assert indexShard.loadRetentionLeases().leases().isEmpty(); } - private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException { + private void addRecoveredFileDetails(SegmentInfos si, Store store, ReplicationLuceneIndex index) throws IOException { final Directory directory = store.directory(); for (String name : Lucene.files(si)) { long length = directory.fileLength(name); diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index 4d054e7bcf29c..3509615052707 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -44,6 +44,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.transport.Transports; import java.io.IOException; @@ -63,7 +64,7 @@ */ public class MultiFileWriter extends AbstractRefCounted implements Releasable { - public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) { + public MultiFileWriter(Store store, ReplicationLuceneIndex indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) { super("multi_file_writer"); this.store = store; this.indexState = indexState; @@ -76,7 +77,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF private final AtomicBoolean closed = new AtomicBoolean(false); private final Logger logger; private final Store store; - private final RecoveryState.Index indexState; + private final ReplicationLuceneIndex indexState; private final String tempFilePrefix; private final ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 70d751a3e16d2..4ae188abe5896 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -70,6 +70,7 @@ import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogCorruptedException; import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -533,8 +534,8 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha return; } - final RecoveryState.Index indexState = recoveryTarget.state().getIndex(); - if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) { + final ReplicationLuceneIndex indexState = recoveryTarget.state().getIndex(); + if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) { indexState.addSourceThrottling(request.sourceThrottleTimeInNanos()); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java index 250a408008178..35ac5cbc12bde 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java @@ -37,29 +37,19 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Nullable; -import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.ToXContentFragment; -import org.opensearch.common.xcontent.ToXContentObject; import org.opensearch.common.xcontent.XContentBuilder; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.StoreStats; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; import java.util.Locale; -import java.util.Map; /** * Keeps track of state related to shard recovery. @@ -127,7 +117,7 @@ public static Stage fromId(byte id) { private Stage stage; - private final Index index; + private final ReplicationLuceneIndex index; private final Translog translog; private final VerifyIndex verifyIndex; private final ReplicationTimer timer; @@ -140,10 +130,15 @@ public static Stage fromId(byte id) { private boolean primary; public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) { - this(shardRouting, targetNode, sourceNode, new Index()); + this(shardRouting, targetNode, sourceNode, new ReplicationLuceneIndex()); } - public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) { + public RecoveryState( + ShardRouting shardRouting, + DiscoveryNode targetNode, + @Nullable DiscoveryNode sourceNode, + ReplicationLuceneIndex index + ) { assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting; RecoverySource recoverySource = shardRouting.recoverySource(); assert (recoverySource.getType() == RecoverySource.Type.PEER) == (sourceNode != null) @@ -168,7 +163,7 @@ public RecoveryState(StreamInput in) throws IOException { recoverySource = RecoverySource.readFrom(in); targetNode = new DiscoveryNode(in); sourceNode = in.readOptionalWriteable(DiscoveryNode::new); - index = new Index(in); + index = new ReplicationLuceneIndex(in); translog = new Translog(in); verifyIndex = new VerifyIndex(in); primary = in.readBoolean(); @@ -252,7 +247,7 @@ public synchronized RecoveryState setStage(Stage stage) { return this; } - public Index getIndex() { + public ReplicationLuceneIndex getIndex() { return index; } @@ -365,23 +360,10 @@ static final class Fields { static final String TOTAL_ON_START = "total_on_start"; static final String VERIFY_INDEX = "verify_index"; static final String RECOVERED = "recovered"; - static final String RECOVERED_IN_BYTES = "recovered_in_bytes"; static final String CHECK_INDEX_TIME = "check_index_time"; static final String CHECK_INDEX_TIME_IN_MILLIS = "check_index_time_in_millis"; - static final String LENGTH = "length"; - static final String LENGTH_IN_BYTES = "length_in_bytes"; - static final String FILES = "files"; static final String TOTAL = "total"; - static final String TOTAL_IN_BYTES = "total_in_bytes"; - static final String REUSED = "reused"; - static final String REUSED_IN_BYTES = "reused_in_bytes"; static final String PERCENT = "percent"; - static final String DETAILS = "details"; - static final String SIZE = "size"; - static final String SOURCE_THROTTLE_TIME = "source_throttle_time"; - static final String SOURCE_THROTTLE_TIME_IN_MILLIS = "source_throttle_time_in_millis"; - static final String TARGET_THROTTLE_TIME = "target_throttle_time"; - static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis"; } /** @@ -576,481 +558,4 @@ public synchronized XContentBuilder toXContent(XContentBuilder builder, Params p } } - /** - * Details about the file - * - * @opensearch.internal - */ - public static class FileDetail implements ToXContentObject, Writeable { - private String name; - private long length; - private long recovered; - private boolean reused; - - public FileDetail(String name, long length, boolean reused) { - assert name != null; - this.name = name; - this.length = length; - this.reused = reused; - } - - public FileDetail(StreamInput in) throws IOException { - name = in.readString(); - length = in.readVLong(); - recovered = in.readVLong(); - reused = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(name); - out.writeVLong(length); - out.writeVLong(recovered); - out.writeBoolean(reused); - } - - void addRecoveredBytes(long bytes) { - assert reused == false : "file is marked as reused, can't update recovered bytes"; - assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; - recovered += bytes; - } - - /** - * file name * - */ - public String name() { - return name; - } - - /** - * file length * - */ - public long length() { - return length; - } - - /** - * number of bytes recovered for this file (so far). 0 if the file is reused * - */ - public long recovered() { - return recovered; - } - - /** - * returns true if the file is reused from a local copy - */ - public boolean reused() { - return reused; - } - - boolean fullyRecovered() { - return reused == false && length == recovered; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(Fields.NAME, name); - builder.humanReadableField(Fields.LENGTH_IN_BYTES, Fields.LENGTH, new ByteSizeValue(length)); - builder.field(Fields.REUSED, reused); - builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recovered)); - builder.endObject(); - return builder; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof FileDetail) { - FileDetail other = (FileDetail) obj; - return name.equals(other.name) && length == other.length() && reused == other.reused() && recovered == other.recovered(); - } - return false; - } - - @Override - public int hashCode() { - int result = name.hashCode(); - result = 31 * result + Long.hashCode(length); - result = 31 * result + Long.hashCode(recovered); - result = 31 * result + (reused ? 1 : 0); - return result; - } - - @Override - public String toString() { - return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], recovered [" + recovered + "])"; - } - } - - /** - * Details about the Recovery Files - * - * @opensearch.internal - */ - public static class RecoveryFilesDetails implements ToXContentFragment, Writeable { - protected final Map fileDetails = new HashMap<>(); - protected boolean complete; - - public RecoveryFilesDetails() {} - - RecoveryFilesDetails(StreamInput in) throws IOException { - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - FileDetail file = new FileDetail(in); - fileDetails.put(file.name, file); - } - if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { - complete = in.readBoolean(); - } else { - // This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not - // then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete - // so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path - // anyway since they always use IndexShard#getRecoveryState which is never transported over the wire. - complete = fileDetails.isEmpty() == false; - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - final FileDetail[] files = values().toArray(new FileDetail[0]); - out.writeVInt(files.length); - for (FileDetail file : files) { - file.writeTo(out); - } - if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { - out.writeBoolean(complete); - } - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - if (params.paramAsBoolean("detailed", false)) { - builder.startArray(Fields.DETAILS); - for (FileDetail file : values()) { - file.toXContent(builder, params); - } - builder.endArray(); - } - - return builder; - } - - public void addFileDetails(String name, long length, boolean reused) { - assert complete == false : "addFileDetail for [" + name + "] when file details are already complete"; - FileDetail existing = fileDetails.put(name, new FileDetail(name, length, reused)); - assert existing == null : "file [" + name + "] is already reported"; - } - - public void addRecoveredBytesToFile(String name, long bytes) { - FileDetail file = fileDetails.get(name); - assert file != null : "file [" + name + "] hasn't been reported"; - file.addRecoveredBytes(bytes); - } - - public FileDetail get(String name) { - return fileDetails.get(name); - } - - public void setComplete() { - complete = true; - } - - public int size() { - return fileDetails.size(); - } - - public boolean isEmpty() { - return fileDetails.isEmpty(); - } - - public void clear() { - fileDetails.clear(); - complete = false; - } - - public Collection values() { - return fileDetails.values(); - } - - public boolean isComplete() { - return complete; - } - } - - /** - * File details per Index - * - * @opensearch.internal - */ - public static class Index extends ReplicationTimer implements ToXContentFragment, Writeable { - private final RecoveryFilesDetails fileDetails; - - public static final long UNKNOWN = -1L; - - private long sourceThrottlingInNanos = UNKNOWN; - private long targetThrottleTimeInNanos = UNKNOWN; - - public Index() { - this(new RecoveryFilesDetails()); - } - - public Index(RecoveryFilesDetails recoveryFilesDetails) { - this.fileDetails = recoveryFilesDetails; - } - - public Index(StreamInput in) throws IOException { - super(in); - fileDetails = new RecoveryFilesDetails(in); - sourceThrottlingInNanos = in.readLong(); - targetThrottleTimeInNanos = in.readLong(); - } - - @Override - public synchronized void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - fileDetails.writeTo(out); - out.writeLong(sourceThrottlingInNanos); - out.writeLong(targetThrottleTimeInNanos); - } - - public synchronized List fileDetails() { - return Collections.unmodifiableList(new ArrayList<>(fileDetails.values())); - } - - public synchronized void reset() { - super.reset(); - fileDetails.clear(); - sourceThrottlingInNanos = UNKNOWN; - targetThrottleTimeInNanos = UNKNOWN; - } - - public synchronized void addFileDetail(String name, long length, boolean reused) { - fileDetails.addFileDetails(name, length, reused); - } - - public synchronized void setFileDetailsComplete() { - fileDetails.setComplete(); - } - - public synchronized void addRecoveredBytesToFile(String name, long bytes) { - fileDetails.addRecoveredBytesToFile(name, bytes); - } - - public synchronized void addSourceThrottling(long timeInNanos) { - if (sourceThrottlingInNanos == UNKNOWN) { - sourceThrottlingInNanos = timeInNanos; - } else { - sourceThrottlingInNanos += timeInNanos; - } - } - - public synchronized void addTargetThrottling(long timeInNanos) { - if (targetThrottleTimeInNanos == UNKNOWN) { - targetThrottleTimeInNanos = timeInNanos; - } else { - targetThrottleTimeInNanos += timeInNanos; - } - } - - public synchronized TimeValue sourceThrottling() { - return TimeValue.timeValueNanos(sourceThrottlingInNanos); - } - - public synchronized TimeValue targetThrottling() { - return TimeValue.timeValueNanos(targetThrottleTimeInNanos); - } - - /** - * total number of files that are part of this recovery, both re-used and recovered - */ - public synchronized int totalFileCount() { - return fileDetails.size(); - } - - /** - * total number of files to be recovered (potentially not yet done) - */ - public synchronized int totalRecoverFiles() { - int total = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total++; - } - } - return total; - } - - /** - * number of file that were recovered (excluding on ongoing files) - */ - public synchronized int recoveredFileCount() { - int count = 0; - for (FileDetail file : fileDetails.values()) { - if (file.fullyRecovered()) { - count++; - } - } - return count; - } - - /** - * percent of recovered (i.e., not reused) files out of the total files to be recovered - */ - public synchronized float recoveredFilesPercent() { - int total = 0; - int recovered = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total++; - if (file.fullyRecovered()) { - recovered++; - } - } - } - if (total == 0 && fileDetails.size() == 0) { // indicates we are still in init phase - return 0.0f; - } - if (total == recovered) { - return 100.0f; - } else { - float result = 100.0f * (recovered / (float) total); - return result; - } - } - - /** - * total number of bytes in th shard - */ - public synchronized long totalBytes() { - long total = 0; - for (FileDetail file : fileDetails.values()) { - total += file.length(); - } - return total; - } - - /** - * total number of bytes recovered so far, including both existing and reused - */ - public synchronized long recoveredBytes() { - long recovered = 0; - for (FileDetail file : fileDetails.values()) { - recovered += file.recovered(); - } - return recovered; - } - - /** - * total bytes of files to be recovered (potentially not yet done) - */ - public synchronized long totalRecoverBytes() { - long total = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total += file.length(); - } - } - return total; - } - - /** - * @return number of bytes still to recover, i.e. {@link Index#totalRecoverBytes()} minus {@link Index#recoveredBytes()}, or - * {@code -1} if the full set of files to recover is not yet known - */ - public synchronized long bytesStillToRecover() { - if (fileDetails.isComplete() == false) { - return -1L; - } - long total = 0L; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total += file.length() - file.recovered(); - } - } - return total; - } - - /** - * percent of bytes recovered out of total files bytes *to be* recovered - */ - public synchronized float recoveredBytesPercent() { - long total = 0; - long recovered = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused() == false) { - total += file.length(); - recovered += file.recovered(); - } - } - if (total == 0 && fileDetails.size() == 0) { - // indicates we are still in init phase - return 0.0f; - } - if (total == recovered) { - return 100.0f; - } else { - return 100.0f * recovered / total; - } - } - - public synchronized int reusedFileCount() { - int reused = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused()) { - reused++; - } - } - return reused; - } - - public synchronized long reusedBytes() { - long reused = 0; - for (FileDetail file : fileDetails.values()) { - if (file.reused()) { - reused += file.length(); - } - } - return reused; - } - - @Override - public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - // stream size first, as it matters more and the files section can be long - builder.startObject(Fields.SIZE); - builder.humanReadableField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, new ByteSizeValue(totalBytes())); - builder.humanReadableField(Fields.REUSED_IN_BYTES, Fields.REUSED, new ByteSizeValue(reusedBytes())); - builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recoveredBytes())); - builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredBytesPercent())); - builder.endObject(); - - builder.startObject(Fields.FILES); - builder.field(Fields.TOTAL, totalFileCount()); - builder.field(Fields.REUSED, reusedFileCount()); - builder.field(Fields.RECOVERED, recoveredFileCount()); - builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredFilesPercent())); - fileDetails.toXContent(builder, params); - builder.endObject(); - builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(time())); - builder.humanReadableField(Fields.SOURCE_THROTTLE_TIME_IN_MILLIS, Fields.SOURCE_THROTTLE_TIME, sourceThrottling()); - builder.humanReadableField(Fields.TARGET_THROTTLE_TIME_IN_MILLIS, Fields.TARGET_THROTTLE_TIME, targetThrottling()); - return builder; - } - - @Override - public synchronized String toString() { - try { - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); - builder.startObject(); - toXContent(builder, EMPTY_PARAMS); - builder.endObject(); - return Strings.toString(builder); - } catch (IOException e) { - return "{ \"error\" : \"" + e.getMessage() + "\"}"; - } - } - - public synchronized FileDetail getFileDetails(String dest) { - return fileDetails.get(dest); - } - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 83c2b03fa19e0..bb557cc6837ab 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -60,6 +60,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import java.io.IOException; import java.nio.file.Path; @@ -425,7 +426,7 @@ public void receiveFileInfo( ActionListener.completeWith(listener, () -> { indexShard.resetRecoveryStage(); indexShard.prepareForIndexRecovery(); - final RecoveryState.Index index = state().getIndex(); + final ReplicationLuceneIndex index = state().getIndex(); for (int i = 0; i < phase1ExistingFileNames.size(); i++) { index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationLuceneIndex.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationLuceneIndex.java new file mode 100644 index 0000000000000..3813c55f5aa49 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationLuceneIndex.java @@ -0,0 +1,530 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.index.store.StoreStats; +import org.opensearch.indices.recovery.RecoveryState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * Represents the Lucene Index (set of files on a single shard) involved + * in the replication process. + * + * @opensearch.internal + */ +public final class ReplicationLuceneIndex extends ReplicationTimer implements ToXContentFragment, Writeable { + private final FilesDetails filesDetails; + + public static final long UNKNOWN = -1L; + + private long sourceThrottlingInNanos = UNKNOWN; + private long targetThrottleTimeInNanos = UNKNOWN; + + public ReplicationLuceneIndex() { + this(new FilesDetails()); + } + + public ReplicationLuceneIndex(FilesDetails filesDetails) { + this.filesDetails = filesDetails; + } + + public ReplicationLuceneIndex(StreamInput in) throws IOException { + super(in); + filesDetails = new FilesDetails(in); + sourceThrottlingInNanos = in.readLong(); + targetThrottleTimeInNanos = in.readLong(); + } + + @Override + public synchronized void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + filesDetails.writeTo(out); + out.writeLong(sourceThrottlingInNanos); + out.writeLong(targetThrottleTimeInNanos); + } + + public synchronized List fileDetails() { + return Collections.unmodifiableList(new ArrayList<>(filesDetails.values())); + } + + public synchronized void reset() { + super.reset(); + filesDetails.clear(); + sourceThrottlingInNanos = UNKNOWN; + targetThrottleTimeInNanos = UNKNOWN; + } + + public synchronized void addFileDetail(String name, long length, boolean reused) { + filesDetails.addFileDetails(name, length, reused); + } + + public synchronized void setFileDetailsComplete() { + filesDetails.setComplete(); + } + + public synchronized void addRecoveredBytesToFile(String name, long bytes) { + filesDetails.addRecoveredBytesToFile(name, bytes); + } + + public synchronized void addSourceThrottling(long timeInNanos) { + if (sourceThrottlingInNanos == UNKNOWN) { + sourceThrottlingInNanos = timeInNanos; + } else { + sourceThrottlingInNanos += timeInNanos; + } + } + + public synchronized void addTargetThrottling(long timeInNanos) { + if (targetThrottleTimeInNanos == UNKNOWN) { + targetThrottleTimeInNanos = timeInNanos; + } else { + targetThrottleTimeInNanos += timeInNanos; + } + } + + public synchronized TimeValue sourceThrottling() { + return TimeValue.timeValueNanos(sourceThrottlingInNanos); + } + + public synchronized TimeValue targetThrottling() { + return TimeValue.timeValueNanos(targetThrottleTimeInNanos); + } + + /** + * total number of files that are part of this recovery, both re-used and recovered + */ + public synchronized int totalFileCount() { + return filesDetails.size(); + } + + /** + * total number of files to be recovered (potentially not yet done) + */ + public synchronized int totalRecoverFiles() { + int total = 0; + for (FileMetadata file : filesDetails.values()) { + if (file.reused() == false) { + total++; + } + } + return total; + } + + /** + * number of file that were recovered (excluding on ongoing files) + */ + public synchronized int recoveredFileCount() { + int count = 0; + for (FileMetadata file : filesDetails.values()) { + if (file.fullyRecovered()) { + count++; + } + } + return count; + } + + /** + * percent of recovered (i.e., not reused) files out of the total files to be recovered + */ + public synchronized float recoveredFilesPercent() { + int total = 0; + int recovered = 0; + for (FileMetadata file : filesDetails.values()) { + if (file.reused() == false) { + total++; + if (file.fullyRecovered()) { + recovered++; + } + } + } + if (total == 0 && filesDetails.size() == 0) { // indicates we are still in init phase + return 0.0f; + } + if (total == recovered) { + return 100.0f; + } else { + float result = 100.0f * (recovered / (float) total); + return result; + } + } + + /** + * total number of bytes in th shard + */ + public synchronized long totalBytes() { + long total = 0; + for (FileMetadata file : filesDetails.values()) { + total += file.length(); + } + return total; + } + + /** + * total number of bytes recovered so far, including both existing and reused + */ + public synchronized long recoveredBytes() { + long recovered = 0; + for (FileMetadata file : filesDetails.values()) { + recovered += file.recovered(); + } + return recovered; + } + + /** + * total bytes of files to be recovered (potentially not yet done) + */ + public synchronized long totalRecoverBytes() { + long total = 0; + for (FileMetadata file : filesDetails.values()) { + if (file.reused() == false) { + total += file.length(); + } + } + return total; + } + + /** + * @return number of bytes still to recover, i.e. {@link ReplicationLuceneIndex#totalRecoverBytes()} minus {@link ReplicationLuceneIndex#recoveredBytes()}, or + * {@code -1} if the full set of files to recover is not yet known + */ + public synchronized long bytesStillToRecover() { + if (filesDetails.isComplete() == false) { + return -1L; + } + long total = 0L; + for (FileMetadata file : filesDetails.values()) { + if (file.reused() == false) { + total += file.length() - file.recovered(); + } + } + return total; + } + + /** + * percent of bytes recovered out of total files bytes *to be* recovered + */ + public synchronized float recoveredBytesPercent() { + long total = 0; + long recovered = 0; + for (FileMetadata file : filesDetails.values()) { + if (file.reused() == false) { + total += file.length(); + recovered += file.recovered(); + } + } + if (total == 0 && filesDetails.size() == 0) { + // indicates we are still in init phase + return 0.0f; + } + if (total == recovered) { + return 100.0f; + } else { + return 100.0f * recovered / total; + } + } + + public synchronized int reusedFileCount() { + int reused = 0; + for (FileMetadata file : filesDetails.values()) { + if (file.reused()) { + reused++; + } + } + return reused; + } + + public synchronized long reusedBytes() { + long reused = 0; + for (FileMetadata file : filesDetails.values()) { + if (file.reused()) { + reused += file.length(); + } + } + return reused; + } + + @Override + public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // stream size first, as it matters more and the files section can be long + builder.startObject(Fields.SIZE); + builder.humanReadableField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, new ByteSizeValue(totalBytes())); + builder.humanReadableField(Fields.REUSED_IN_BYTES, Fields.REUSED, new ByteSizeValue(reusedBytes())); + builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recoveredBytes())); + builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredBytesPercent())); + builder.endObject(); + + builder.startObject(Fields.FILES); + builder.field(Fields.TOTAL, totalFileCount()); + builder.field(Fields.REUSED, reusedFileCount()); + builder.field(Fields.RECOVERED, recoveredFileCount()); + builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredFilesPercent())); + filesDetails.toXContent(builder, params); + builder.endObject(); + builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(time())); + builder.humanReadableField(Fields.SOURCE_THROTTLE_TIME_IN_MILLIS, Fields.SOURCE_THROTTLE_TIME, sourceThrottling()); + builder.humanReadableField(Fields.TARGET_THROTTLE_TIME_IN_MILLIS, Fields.TARGET_THROTTLE_TIME, targetThrottling()); + return builder; + } + + @Override + public synchronized String toString() { + try { + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return Strings.toString(builder); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } + + public synchronized FileMetadata getFileDetails(String dest) { + return filesDetails.get(dest); + } + + private static final class FilesDetails implements ToXContentFragment, Writeable { + protected final Map fileMetadataMap = new HashMap<>(); + protected boolean complete; + + public FilesDetails() {} + + FilesDetails(StreamInput in) throws IOException { + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + FileMetadata file = new FileMetadata(in); + fileMetadataMap.put(file.name, file); + } + if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { + complete = in.readBoolean(); + } else { + // This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not + // then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete + // so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path + // anyway since they always use IndexShard#getRecoveryState which is never transported over the wire. + complete = fileMetadataMap.isEmpty() == false; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + final FileMetadata[] files = values().toArray(new FileMetadata[0]); + out.writeVInt(files.length); + for (FileMetadata file : files) { + file.writeTo(out); + } + if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { + out.writeBoolean(complete); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (params.paramAsBoolean("detailed", false)) { + builder.startArray(Fields.DETAILS); + for (FileMetadata file : values()) { + file.toXContent(builder, params); + } + builder.endArray(); + } + + return builder; + } + + public void addFileDetails(String name, long length, boolean reused) { + assert complete == false : "addFileDetail for [" + name + "] when file details are already complete"; + FileMetadata existing = fileMetadataMap.put(name, new FileMetadata(name, length, reused)); + assert existing == null : "file [" + name + "] is already reported"; + } + + public void addRecoveredBytesToFile(String name, long bytes) { + FileMetadata file = fileMetadataMap.get(name); + assert file != null : "file [" + name + "] hasn't been reported"; + file.addRecoveredBytes(bytes); + } + + public FileMetadata get(String name) { + return fileMetadataMap.get(name); + } + + public void setComplete() { + complete = true; + } + + public int size() { + return fileMetadataMap.size(); + } + + public boolean isEmpty() { + return fileMetadataMap.isEmpty(); + } + + public void clear() { + fileMetadataMap.clear(); + complete = false; + } + + public Collection values() { + return fileMetadataMap.values(); + } + + public boolean isComplete() { + return complete; + } + } + + /** + * Metadata about a file + * + * @opensearch.internal + */ + public static final class FileMetadata implements ToXContentObject, Writeable { + private String name; + private long length; + private long recovered; + private boolean reused; + + public FileMetadata(String name, long length, boolean reused) { + assert name != null; + this.name = name; + this.length = length; + this.reused = reused; + } + + public FileMetadata(StreamInput in) throws IOException { + name = in.readString(); + length = in.readVLong(); + recovered = in.readVLong(); + reused = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeVLong(length); + out.writeVLong(recovered); + out.writeBoolean(reused); + } + + public void addRecoveredBytes(long bytes) { + assert reused == false : "file is marked as reused, can't update recovered bytes"; + assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; + recovered += bytes; + } + + /** + * file name + */ + public String name() { + return name; + } + + /** + * file length + */ + public long length() { + return length; + } + + /** + * number of bytes recovered for this file (so far). 0 if the file is reused + */ + public long recovered() { + return recovered; + } + + /** + * returns true if the file is reused from a local copy + */ + public boolean reused() { + return reused; + } + + public boolean fullyRecovered() { + return reused == false && length == recovered; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Fields.NAME, name); + builder.humanReadableField(Fields.LENGTH_IN_BYTES, Fields.LENGTH, new ByteSizeValue(length)); + builder.field(Fields.REUSED, reused); + builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recovered)); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FileMetadata) { + FileMetadata other = (FileMetadata) obj; + return name.equals(other.name) && length == other.length() && reused == other.reused() && recovered == other.recovered(); + } + return false; + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + Long.hashCode(length); + result = 31 * result + Long.hashCode(recovered); + result = 31 * result + (reused ? 1 : 0); + return result; + } + + @Override + public String toString() { + return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], recovered [" + recovered + "])"; + } + } + + /** + * Duplicates many of Field names in {@link RecoveryState} + */ + static final class Fields { + static final String TOTAL_TIME = "total_time"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String NAME = "name"; + static final String RECOVERED = "recovered"; + static final String RECOVERED_IN_BYTES = "recovered_in_bytes"; + static final String LENGTH = "length"; + static final String LENGTH_IN_BYTES = "length_in_bytes"; + static final String FILES = "files"; + static final String TOTAL = "total"; + static final String TOTAL_IN_BYTES = "total_in_bytes"; + static final String REUSED = "reused"; + static final String REUSED_IN_BYTES = "reused_in_bytes"; + static final String PERCENT = "percent"; + static final String DETAILS = "details"; + static final String SIZE = "size"; + static final String SOURCE_THROTTLE_TIME = "source_throttle_time"; + static final String SOURCE_THROTTLE_TIME_IN_MILLIS = "source_throttle_time_in_millis"; + static final String TARGET_THROTTLE_TIME = "target_throttle_time"; + static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis"; + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 3bc5218e2f61f..e54d30c626812 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -133,6 +133,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -3139,7 +3140,7 @@ public void testRecoverFromLocalShard() throws IOException { RecoveryState recoveryState = targetShard.recoveryState(); assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage()); assertTrue(recoveryState.getIndex().fileDetails().size() > 0); - for (RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) { + for (ReplicationLuceneIndex.FileMetadata file : recoveryState.getIndex().fileDetails()) { if (file.reused()) { assertEquals(file.recovered(), 0); } else { diff --git a/server/src/test/java/org/opensearch/index/shard/StoreRecoveryTests.java b/server/src/test/java/org/opensearch/index/shard/StoreRecoveryTests.java index c325ac6bc754e..64b6490ace1c7 100644 --- a/server/src/test/java/org/opensearch/index/shard/StoreRecoveryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/StoreRecoveryTests.java @@ -61,7 +61,7 @@ import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.Uid; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -108,7 +108,7 @@ public void testAddIndices() throws IOException { writer.close(); } StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger); - RecoveryState.Index indexStats = new RecoveryState.Index(); + ReplicationLuceneIndex indexStats = new ReplicationLuceneIndex(); Directory target = newFSDirectory(createTempDir()); final long maxSeqNo = randomNonNegativeLong(); final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong(); @@ -174,7 +174,7 @@ public void testSplitShard() throws IOException { writer.commit(); writer.close(); StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger); - RecoveryState.Index indexStats = new RecoveryState.Index(); + ReplicationLuceneIndex indexStats = new ReplicationLuceneIndex(); Directory target = newFSDirectory(createTempDir()); final long maxSeqNo = randomNonNegativeLong(); final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong(); @@ -250,7 +250,7 @@ public void testSplitShard() throws IOException { public void testStatsDirWrapper() throws IOException { Directory dir = newDirectory(); Directory target = newDirectory(); - RecoveryState.Index indexStats = new RecoveryState.Index(); + ReplicationLuceneIndex indexStats = new ReplicationLuceneIndex(); StoreRecovery.StatsDirectoryWrapper wrapper = new StoreRecovery.StatsDirectoryWrapper(target, indexStats); try (IndexOutput output = dir.createOutput("foo.bar", IOContext.DEFAULT)) { CodecUtil.writeHeader(output, "foo", 0); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index 80326fbf2de6b..1739f546150d9 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -94,6 +94,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.DummyShardLock; import org.opensearch.test.IndexSettingsModule; @@ -189,12 +190,14 @@ public void testSendFiles() throws Throwable { writer.close(); Store.MetadataSnapshot metadata = store.getMetadata(null); + ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex(); List metas = new ArrayList<>(); for (StoreFileMetadata md : metadata) { metas.add(md); + luceneIndex.addFileDetail(md.name(), md.length(), false); } Store targetStore = newStore(createTempDir()); - MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {}); + MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, luceneIndex, "", logger, () -> {}); RecoveryTargetHandler target = new TestRecoveryTargetHandler() { @Override public void writeFileChunk( @@ -508,10 +511,12 @@ public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { writer.commit(); writer.close(); + ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex(); Store.MetadataSnapshot metadata = store.getMetadata(null); List metas = new ArrayList<>(); for (StoreFileMetadata md : metadata) { metas.add(md); + luceneIndex.addFileDetail(md.name(), md.length(), false); } CorruptionUtils.corruptFile( @@ -522,7 +527,7 @@ public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { ) ); Store targetStore = newStore(createTempDir(), false); - MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {}); + MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, luceneIndex, "", logger, () -> {}); RecoveryTargetHandler target = new TestRecoveryTargetHandler() { @Override public void writeFileChunk( diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java index dd4b17fbac5de..bb296f32ca8bd 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java @@ -41,8 +41,8 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.index.shard.ShardId; -import org.opensearch.indices.recovery.RecoveryState.FileDetail; -import org.opensearch.indices.recovery.RecoveryState.Index; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex.FileMetadata; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.recovery.RecoveryState.Stage; import org.opensearch.indices.recovery.RecoveryState.Translog; import org.opensearch.indices.recovery.RecoveryState.VerifyIndex; @@ -136,11 +136,11 @@ ReplicationTimer createObj(StreamInput in) throws IOException { public void testIndexTimer() throws Throwable { AtomicBoolean stop = new AtomicBoolean(); - Index index = new Index(); - Streamer streamer = new Streamer<>(stop, index) { + ReplicationLuceneIndex index = new ReplicationLuceneIndex(); + Streamer streamer = new Streamer<>(stop, index) { @Override - Index createObj(StreamInput in) throws IOException { - return new Index(in); + ReplicationLuceneIndex createObj(StreamInput in) throws IOException { + return new ReplicationLuceneIndex(in); } }; doTimerTest(index, streamer); @@ -200,8 +200,8 @@ private void doTimerTest(ReplicationTimer timer, Streamer filesToRecover = new ArrayList<>(); + FileMetadata[] files = new FileMetadata[randomIntBetween(1, 20)]; + ArrayList filesToRecover = new ArrayList<>(); long totalFileBytes = 0; long totalReusedBytes = 0; int totalReused = 0; @@ -209,7 +209,7 @@ public void testIndex() throws Throwable { final int fileLength = randomIntBetween(1, 1000); final boolean reused = randomBoolean(); totalFileBytes += fileLength; - files[i] = new FileDetail("f_" + i, fileLength, reused); + files[i] = new FileMetadata("f_" + i, fileLength, reused); if (reused) { totalReused++; totalReusedBytes += fileLength; @@ -219,7 +219,7 @@ public void testIndex() throws Throwable { } Collections.shuffle(Arrays.asList(files), random()); - final RecoveryState.Index index = new RecoveryState.Index(); + final ReplicationLuceneIndex index = new ReplicationLuceneIndex(); assertThat(index.bytesStillToRecover(), equalTo(-1L)); if (randomBoolean()) { @@ -246,11 +246,11 @@ public void testIndex() throws Throwable { // before we start we must report 0 assertThat(index.recoveredFilesPercent(), equalTo((float) 0.0)); assertThat(index.recoveredBytesPercent(), equalTo((float) 0.0)); - assertThat(index.sourceThrottling().nanos(), equalTo(Index.UNKNOWN)); - assertThat(index.targetThrottling().nanos(), equalTo(Index.UNKNOWN)); + assertThat(index.sourceThrottling().nanos(), equalTo(ReplicationLuceneIndex.UNKNOWN)); + assertThat(index.targetThrottling().nanos(), equalTo(ReplicationLuceneIndex.UNKNOWN)); index.start(); - for (FileDetail file : files) { + for (FileMetadata file : files) { index.addFileDetail(file.name(), file.length(), file.reused()); } @@ -278,24 +278,24 @@ public void testIndex() throws Throwable { } AtomicBoolean streamShouldStop = new AtomicBoolean(); - Streamer backgroundReader = new Streamer(streamShouldStop, index) { + Streamer backgroundReader = new Streamer(streamShouldStop, index) { @Override - Index createObj(StreamInput in) throws IOException { - return new Index(in); + ReplicationLuceneIndex createObj(StreamInput in) throws IOException { + return new ReplicationLuceneIndex(in); } }; backgroundReader.start(); long recoveredBytes = 0; - long sourceThrottling = Index.UNKNOWN; - long targetThrottling = Index.UNKNOWN; + long sourceThrottling = ReplicationLuceneIndex.UNKNOWN; + long targetThrottling = ReplicationLuceneIndex.UNKNOWN; while (bytesToRecover > 0) { - FileDetail file = randomFrom(filesToRecover); + FileMetadata file = randomFrom(filesToRecover); final long toRecover = Math.min(bytesToRecover, randomIntBetween(1, (int) (file.length() - file.recovered()))); final long throttledOnSource = rarely() ? randomIntBetween(10, 200) : 0; index.addSourceThrottling(throttledOnSource); - if (sourceThrottling == Index.UNKNOWN) { + if (sourceThrottling == ReplicationLuceneIndex.UNKNOWN) { sourceThrottling = throttledOnSource; } else { sourceThrottling += throttledOnSource; @@ -303,7 +303,7 @@ Index createObj(StreamInput in) throws IOException { index.addRecoveredBytesToFile(file.name(), toRecover); file.addRecoveredBytes(toRecover); final long throttledOnTarget = rarely() ? randomIntBetween(10, 200) : 0; - if (targetThrottling == Index.UNKNOWN) { + if (targetThrottling == ReplicationLuceneIndex.UNKNOWN) { targetThrottling = throttledOnTarget; } else { targetThrottling += throttledOnTarget; @@ -325,7 +325,7 @@ Index createObj(StreamInput in) throws IOException { logger.info("testing serialized information"); streamShouldStop.set(true); backgroundReader.join(); - final Index lastRead = backgroundReader.lastRead(); + final ReplicationLuceneIndex lastRead = backgroundReader.lastRead(); assertThat(lastRead.fileDetails().toArray(), arrayContainingInAnyOrder(index.fileDetails().toArray())); assertThat(lastRead.startTime(), equalTo(index.startTime())); if (completeRecovery) { @@ -543,12 +543,12 @@ VerifyIndex createObj(StreamInput in) throws IOException { } public void testConcurrentModificationIndexFileDetailsMap() throws InterruptedException { - final Index index = new Index(); + final ReplicationLuceneIndex index = new ReplicationLuceneIndex(); final AtomicBoolean stop = new AtomicBoolean(false); - Streamer readWriteIndex = new Streamer(stop, index) { + Streamer readWriteIndex = new Streamer(stop, index) { @Override - Index createObj(StreamInput in) throws IOException { - return new Index(in); + ReplicationLuceneIndex createObj(StreamInput in) throws IOException { + return new ReplicationLuceneIndex(in); } }; Thread modifyThread = new Thread() { @@ -568,14 +568,14 @@ public void run() { } public void testFileHashCodeAndEquals() { - FileDetail f = new FileDetail("foo", randomIntBetween(0, 100), randomBoolean()); - FileDetail anotherFile = new FileDetail(f.name(), f.length(), f.reused()); + FileMetadata f = new FileMetadata("foo", randomIntBetween(0, 100), randomBoolean()); + FileMetadata anotherFile = new FileMetadata(f.name(), f.length(), f.reused()); assertEquals(f, anotherFile); assertEquals(f.hashCode(), anotherFile.hashCode()); int iters = randomIntBetween(10, 100); for (int i = 0; i < iters; i++) { - f = new FileDetail("foo", randomIntBetween(0, 100), randomBoolean()); - anotherFile = new FileDetail(f.name(), randomIntBetween(0, 100), randomBoolean()); + f = new FileMetadata("foo", randomIntBetween(0, 100), randomBoolean()); + anotherFile = new FileMetadata(f.name(), randomIntBetween(0, 100), randomBoolean()); if (f.equals(anotherFile)) { assertEquals(f.hashCode(), anotherFile.hashCode()); } else if (f.hashCode() != anotherFile.hashCode()) { diff --git a/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java index 2bfcec6e75ffc..53f124a91f0ac 100644 --- a/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java @@ -68,6 +68,7 @@ import org.opensearch.index.store.Store; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.blobstore.BlobStoreTestUtil; import org.opensearch.snapshots.Snapshot; @@ -203,12 +204,12 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { futureC.actionGet(); assertEquals(secondState.getIndex().reusedFileCount(), commitFileNames.size() - 2); assertEquals(secondState.getIndex().recoveredFileCount(), 2); - List recoveredFiles = secondState.getIndex() + List recoveredFiles = secondState.getIndex() .fileDetails() .stream() .filter(f -> f.reused() == false) .collect(Collectors.toList()); - Collections.sort(recoveredFiles, Comparator.comparing(RecoveryState.FileDetail::name)); + Collections.sort(recoveredFiles, Comparator.comparing(ReplicationLuceneIndex.FileMetadata::name)); assertTrue(recoveredFiles.get(0).name(), recoveredFiles.get(0).name().endsWith(".liv")); assertTrue(recoveredFiles.get(1).name(), recoveredFiles.get(1).name().endsWith("segments_" + incIndexCommit.getGeneration())); } finally { diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java index e7eb9cbf24015..efd31dda92d09 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java @@ -45,6 +45,7 @@ import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.test.OpenSearchTestCase; @@ -91,21 +92,7 @@ public void testRestRecoveryAction() { when(targetNode.getHostName()).thenReturn(randomAlphaOfLength(8)); when(state.getTargetNode()).thenReturn(targetNode); - RecoveryState.Index index = mock(RecoveryState.Index.class); - - final int totalRecoveredFiles = randomIntBetween(1, 64); - when(index.totalRecoverFiles()).thenReturn(totalRecoveredFiles); - final int recoveredFileCount = randomIntBetween(0, totalRecoveredFiles); - when(index.recoveredFileCount()).thenReturn(recoveredFileCount); - when(index.recoveredFilesPercent()).thenReturn((100f * recoveredFileCount) / totalRecoveredFiles); - when(index.totalFileCount()).thenReturn(randomIntBetween(totalRecoveredFiles, 2 * totalRecoveredFiles)); - - final int totalRecoveredBytes = randomIntBetween(1, 1 << 24); - when(index.totalRecoverBytes()).thenReturn((long) totalRecoveredBytes); - final int recoveredBytes = randomIntBetween(0, totalRecoveredBytes); - when(index.recoveredBytes()).thenReturn((long) recoveredBytes); - when(index.recoveredBytesPercent()).thenReturn((100f * recoveredBytes) / totalRecoveredBytes); - when(index.totalRecoverBytes()).thenReturn((long) randomIntBetween(totalRecoveredBytes, 2 * totalRecoveredBytes)); + ReplicationLuceneIndex index = createTestIndex(); when(state.getIndex()).thenReturn(index); final RecoveryState.Translog translog = mock(RecoveryState.Translog.class); @@ -214,6 +201,36 @@ public void testRestRecoveryAction() { } } + private ReplicationLuceneIndex createTestIndex() { + ReplicationLuceneIndex index = new ReplicationLuceneIndex(); + final int filesToRecoverCount = randomIntBetween(1, 64); + final int recoveredFilesCount = randomIntBetween(0, filesToRecoverCount); + addTestFileMetadata(index, 0, recoveredFilesCount, false, true); + addTestFileMetadata(index, recoveredFilesCount, filesToRecoverCount, false, false); + + final int totalFilesCount = randomIntBetween(filesToRecoverCount, 2 * filesToRecoverCount); + addTestFileMetadata(index, filesToRecoverCount, totalFilesCount, true, false); + return index; + } + + private void addTestFileMetadata(ReplicationLuceneIndex index, int startIndex, int endIndex, boolean reused, boolean isFullyRecovered) { + for (int i = startIndex; i < endIndex; i++) { + final int completeFileSize = randomIntBetween(1, 1024); + index.addFileDetail(String.valueOf(i), completeFileSize, reused); + + if (!reused) { + final int recoveredFileSize; + if (isFullyRecovered) { + recoveredFileSize = completeFileSize; + + } else { + recoveredFileSize = randomIntBetween(0, completeFileSize); + } + index.addRecoveredBytesToFile(String.valueOf(i), recoveredFileSize); + } + } + } + private static String percent(float percent) { return String.format(Locale.ROOT, "%1.1f%%", percent); }