Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Segment Replication] Added source-side classes for orchestrating replication events. #4128

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ protected SegmentInfos getLatestSegmentInfos() {
return null;
};

/**
* In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos}
* object directly, this method returns a {@link GatedCloseable} reference to the same object.
* This allows the engine to include a clean-up {@link org.opensearch.common.CheckedRunnable}
* which is run when the reference is closed. The default implementation of the clean-up
* procedure is a no-op.
*
* @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that
* must be closed for segment files to be deleted.
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// default implementation
return new GatedCloseable<>(getLatestSegmentInfos(), () -> {});
}

public MergeStats getMergeStats() {
return new MergeStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2301,6 +2301,22 @@ public SegmentInfos getLatestSegmentInfos() {
}
}

/**
* Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()}
* but also increment the ref-count to ensure that these segment files are retained
* until the reference is closed. On close, the ref-count is decremented.
*/
@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
}

@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.engine.RefreshFailedEngineException;
import org.opensearch.index.engine.SafeCommitInfo;
Expand Down Expand Up @@ -2650,6 +2652,28 @@ public long getLocalCheckpoint() {
return getEngine().getPersistedLocalCheckpoint();
}

/**
* Fetch the latest checkpoint that has been processed but not necessarily persisted. This should be used only when Segment Replication is enabled.
* Also see {@link #getLocalCheckpoint()}.
*/
public long getProcessedLocalCheckpoint() {
assert indexSettings.isSegRepEnabled();
// Returns checkpoint only if the current engine is an instance of NRTReplicationEngine or InternalEngine
return getReplicationEngine().map(NRTReplicationEngine::getProcessedLocalCheckpoint).orElseGet(() -> {
final Engine engine = getEngine();
assert engine instanceof InternalEngine;
return ((InternalEngine) engine).getProcessedLocalCheckpoint();
});
}

private Optional<NRTReplicationEngine> getReplicationEngine() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not part of actual PR. But to fix the breaking change and write getProcessedLocalCheckpoint() in Indexshard.java in a cleaner way. This method is picked from another Segment Replication PR, which will be backported later. While backporting with PR, we should not include getReplicationEngine() in IndexShard.java

if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
} else {
return Optional.empty();
}
}

/**
* Returns the global checkpoint for the shard.
*
Expand Down Expand Up @@ -4017,4 +4041,14 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException {
RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}

/**
* Fetch the latest SegmentInfos held by the shard's underlying Engine, wrapped
* by a a {@link GatedCloseable} to ensure files are not deleted/merged away.
*
* @throws EngineException - When segment infos cannot be safely retrieved
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}
}
97 changes: 62 additions & 35 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
return getMetadata(commit, false);
}

/**
* Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input.
*/
public MetadataSnapshot getMetadata() throws IOException {
return getMetadata(null, false);
}

/**
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
* the latest commit point is used.
Expand Down Expand Up @@ -315,6 +322,16 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t
}
}

/**
* Returns a new {@link MetadataSnapshot} for the given {@link SegmentInfos} object.
* In contrast to {@link #getMetadata(IndexCommit)}, this method is useful for scenarios
* where we need to construct a MetadataSnapshot from an in-memory SegmentInfos object that
* may not have a IndexCommit associated with it, such as with segment replication.
*/
public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException {
return new MetadataSnapshot(segmentInfos, directory, logger);
}

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
Expand Down Expand Up @@ -477,7 +494,7 @@ public static MetadataSnapshot readMetadataSnapshot(
Directory dir = new NIOFSDirectory(indexLocation)
) {
failIfCorrupted(dir);
return new MetadataSnapshot(null, dir, logger);
return new MetadataSnapshot((IndexCommit) null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
Expand Down Expand Up @@ -682,7 +699,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
} finally {
metadataLock.writeLock().unlock();
Expand Down Expand Up @@ -822,7 +839,14 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
}

MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
this(loadMetadata(commit, directory, logger));
}

MetadataSnapshot(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
this(loadMetadata(segmentInfos, directory, logger));
}

private MetadataSnapshot(LoadedMetadata loadedMetadata) {
metadata = loadedMetadata.fileMetadata;
commitUserData = loadedMetadata.userData;
numDocs = loadedMetadata.numDocs;
Expand Down Expand Up @@ -890,40 +914,9 @@ static class LoadedMetadata {
}

static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
long numDocs;
Map<String, StoreFileMetadata> builder = new HashMap<>();
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentCommitInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return loadMetadata(segmentCommitInfos, directory, logger);
} catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
Expand All @@ -949,6 +942,40 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}
throw ex;
}
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Map<String, StoreFileMetadata> builder = new HashMap<>();
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;

/**
* Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from
* a {@link SegmentReplicationSource}. This object is created by the target node and sent
* to the source node.
*
* @opensearch.internal
*/
public class CheckpointInfoRequest extends SegmentReplicationTransportRequest {

private final ReplicationCheckpoint checkpoint;

public CheckpointInfoRequest(StreamInput in) throws IOException {
super(in);
checkpoint = new ReplicationCheckpoint(in);
}

public CheckpointInfoRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;
import java.util.List;

/**
* Request object for fetching a list of segment files metadata from a {@link SegmentReplicationSource}.
* This object is created by the target node and sent to the source node.
*
* @opensearch.internal
*/
public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest {

private final List<StoreFileMetadata> filesToFetch;
private final ReplicationCheckpoint checkpoint;

public GetSegmentFilesRequest(StreamInput in) throws IOException {
super(in);
this.filesToFetch = in.readList(StoreFileMetadata::new);
this.checkpoint = new ReplicationCheckpoint(in);
}

public GetSegmentFilesRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
List<StoreFileMetadata> filesToFetch,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.filesToFetch = filesToFetch;
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(filesToFetch);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
Loading