More PR review changes.
- Updated TrackShardRequestHandler to send error responses back to replicas.
- Renamed additionalFiles to pendingDeleteFiles in TransportCheckpointInfoResponse.
- Refactored Store.cleanupAndVerify methods to remove duplication.

Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Mar 25, 2022
1 parent 7d5ab35 commit 169cfc4
Showing 6 changed files with 61 additions and 53 deletions.
@@ -75,6 +75,7 @@ public void testReplicationAfterForceMerge() throws Exception {
waitForDocs(initialDocCount, indexer);
}
flush(INDEX_NAME);
// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
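The fixed one-second sleep above is a best-effort wait and can be flaky on slow machines. A minimal alternative sketch, assuming the test class extends OpenSearchTestCase (whose assertBusy polls until the assertion passes or the timeout elapses; a java.util.concurrent.TimeUnit import is assumed):

assertBusy(() -> {
    // poll until both nodes report the expected document count
    assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
    assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
}, 10, TimeUnit.SECONDS);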
@@ -1349,6 +1349,7 @@ public void rollTranslogGeneration() {

public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
if (indexSettings.isSegrepEnabled() && shardRouting.primary() == false) {
// With segment replication enabled, replicas do not perform this operation.
return;
}
verifyActive();
@@ -3016,6 +3017,8 @@ public void startRecovery(
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
if (indexSettings.isSegrepEnabled()) {
// Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary
// and started with the latest set of segments.
segmentReplicationReplicaService.startRecovery(
this,
recoveryState.getTargetNode(),
74 changes: 31 additions & 43 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -64,6 +64,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.Streams;
@@ -672,26 +673,7 @@ private static void failIfCorrupted(Directory directory) throws IOException {
public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) throws IOException {
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile) || sourceMetadata.contains(existingFile)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
cleanupFiles(reason, sourceMetadata, null);
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata((IndexCommit) null);
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
@@ -712,37 +694,43 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException {
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null);
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| remoteSnapshot.contains(existingFile)
|| latestCommitPointMetadata.contains(existingFile)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null);
cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata);
verifyAfterCleanup(remoteSnapshot, localSnapshot);
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| remoteSnapshot.contains(existingFile)
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
}

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
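After this refactor, both public cleanupAndVerify overloads reduce to a single call into the shared helper; the nullable third argument is what distinguishes them. As a sketch of the two call shapes taken from the diff above:

// peer-recovery path: keep only files contained in the source snapshot
cleanupFiles(reason, sourceMetadata, null);
// segment-replication path: additionally protect files referenced by the
// latest on-disk commit point, which may lag the in-memory snapshot
cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata);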
@@ -23,7 +23,9 @@ public class TransportCheckpointInfoResponse extends TransportResponse {
private final ReplicationCheckpoint checkpoint;
private final Store.MetadataSnapshot snapshot;
private final byte[] infosBytes;
private final Set<StoreFileMetadata> additionalFiles;
// pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos
// but are still referenced by the latest commit point (Segments_N).
private final Set<StoreFileMetadata> pendingDeleteFiles;

public TransportCheckpointInfoResponse(
final ReplicationCheckpoint checkpoint,
@@ -34,22 +36,22 @@ public TransportCheckpointInfoResponse(
this.checkpoint = checkpoint;
this.snapshot = snapshot;
this.infosBytes = infosBytes;
this.additionalFiles = additionalFiles;
this.pendingDeleteFiles = additionalFiles;
}

public TransportCheckpointInfoResponse(StreamInput in) throws IOException {
this.checkpoint = new ReplicationCheckpoint(in);
this.snapshot = new Store.MetadataSnapshot(in);
this.infosBytes = in.readByteArray();
this.additionalFiles = in.readSet(StoreFileMetadata::new);
this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
checkpoint.writeTo(out);
snapshot.writeTo(out);
out.writeByteArray(infosBytes);
out.writeCollection(additionalFiles);
out.writeCollection(pendingDeleteFiles);
}

public ReplicationCheckpoint getCheckpoint() {
@@ -64,7 +66,7 @@ public byte[] getInfosBytes() {
return infosBytes;
}

public Set<StoreFileMetadata> getAdditionalFiles() {
return additionalFiles;
public Set<StoreFileMetadata> getPendingDeleteFiles() {
return pendingDeleteFiles;
}
}
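The comment on pendingDeleteFiles describes files that the in-memory SegmentInfos has merged away while the latest commit point still references them. A hypothetical sketch of how a primary could compute that set, assuming segmentInfos is the current in-memory org.apache.lucene.index.SegmentInfos and commitSnapshot is a Store.MetadataSnapshot of the latest commit (names here are illustrative, not this commit's code):

// files referenced by the live, in-memory view of the index (true = include segments_N)
Set<String> liveFiles = new HashSet<>(segmentInfos.files(true));
Set<StoreFileMetadata> pendingDeleteFiles = new HashSet<>();
for (StoreFileMetadata md : commitSnapshot) { // MetadataSnapshot iterates over its file metadata
    if (liveFiles.contains(md.name()) == false) {
        // still on disk via the commit point, but gone from the in-memory infos
        pendingDeleteFiles.add(md);
    }
}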
@@ -34,6 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ChannelActionListener;
@@ -237,7 +238,20 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel,
final StepListener<ReplicationResponse> addRetentionLeaseStep = new StepListener<>();
final Consumer<Exception> onFailure = e -> {
assert Transports.assertNotTransportThread(this + "[onFailure]");
logger.error("Failure", e);
logger.error(
new ParameterizedMessage(
"Error marking shard {} as tracked for allocation ID {}",
shardId,
request.getTargetAllocationId()
),
e
);
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send back failure on track shard request", inner);
}
};
PrimaryShardReplicationHandler.runUnderPrimaryPermit(
() -> shard.cloneLocalPeerRecoveryRetentionLease(
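Replying with channel.sendResponse(e) matters because the replica is blocked waiting on the track-shard request; swallowing the failure would leave it hanging until a timeout. ChannelActionListener (imported in this file) bundles the same success/failure plumbing; a minimal sketch, with the action name and response type assumed for illustration:

// onResponse(...) sends the reply; onFailure(e) ships the exception back to the caller
final ActionListener<TransportResponse> listener =
    new ChannelActionListener<>(channel, "internal:index/shard/track_shard", request);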
@@ -152,12 +152,12 @@ private void getFiles(TransportCheckpointInfoResponse checkpointInfo, StepListen
.collect(Collectors.toList());

Set<String> storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll()));
final Set<StoreFileMetadata> additionalFiles = checkpointInfo.getAdditionalFiles()
final Set<StoreFileMetadata> pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles()
.stream()
.filter(f -> storeFiles.contains(f.name()) == false)
.collect(Collectors.toSet());

filesToFetch.addAll(additionalFiles);
filesToFetch.addAll(pendingDeleteFiles);

for (StoreFileMetadata file : filesToFetch) {
state.getIndex().addFileDetail(file.name(), file.length(), false);
@@ -185,7 +185,7 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR
assert indexShard.assertRetentionLeasesPersisted();
}

// Deserialize the new SegmentInfos object sent primary the primary.
// Deserialize the new SegmentInfos object sent from the primary.
final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
SegmentInfos infos = SegmentInfos.readCommit(
store.directory(),
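For context on the readCommit call truncated above: the replica rebuilds a SegmentInfos from the raw bytes shipped by the primary. A hypothetical sketch with plain Lucene APIs, assuming a getSegmentsGen() accessor on the checkpoint and java.util.List / java.nio.ByteBuffer imports (wrapper and accessor names are assumptions, not necessarily this commit's code):

// wrap the serialized bytes as a checksummed Lucene input and replay the commit point
ByteBuffersDataInput buffersInput =
    new ByteBuffersDataInput(List.of(ByteBuffer.wrap(checkpointInfoResponse.getInfosBytes())));
ChecksumIndexInput input =
    new BufferedChecksumIndexInput(new ByteBuffersIndexInput(buffersInput, "SegmentInfos"));
SegmentInfos infos = SegmentInfos.readCommit(store.directory(), input, responseCheckpoint.getSegmentsGen());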
