From 169cfc4333050947549f2df2be52af46b5297d61 Mon Sep 17 00:00:00 2001
From: Marc Handalian
Date: Thu, 24 Mar 2022 13:38:38 -0700
Subject: [PATCH] More PR review changes.

- Updated TrackShardRequestHandler to send failures back to replicas.
- Renamed additionalFiles to pendingDeleteFiles in TransportCheckpointInfoResponse.
- Refactored Store.cleanupAndVerify methods to remove duplication.

Signed-off-by: Marc Handalian
---
 .../replication/SegmentReplicationIT.java     |  1 +
 .../opensearch/index/shard/IndexShard.java    |  3 +
 .../org/opensearch/index/store/Store.java     | 74 ++++++++-----------
 .../TransportCheckpointInfoResponse.java      | 14 ++--
 .../SegmentReplicationPrimaryService.java     | 16 +++-
 .../copy/SegmentReplicationTarget.java        |  6 +-
 6 files changed, 61 insertions(+), 53 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 16d42a8264d7d..f7d0de08ed65c 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -75,6 +75,7 @@ public void testReplicationAfterForceMerge() throws Exception {
             waitForDocs(initialDocCount, indexer);
         }
         flush(INDEX_NAME);
+        // wait a short amount of time to give replication a chance to complete.
         Thread.sleep(1000);
         assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
         assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
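A note on the test change above: the fixed Thread.sleep(1000) gives replication a fixed window and can flake on slow machines. A condition-based wait is sturdier. Below is a minimal sketch, assuming the test inherits assertBusy from OpenSearchTestCase and statically imports assertHitCount from OpenSearchAssertions; the helper name waitForReplicatedHits is hypothetical.

    import java.util.concurrent.TimeUnit;

    // Hypothetical alternative to the fixed sleep: retry the hit-count
    // assertion until it passes or the 30s timeout elapses.
    private void waitForReplicatedHits(String node, long expectedDocs) throws Exception {
        assertBusy(
            () -> assertHitCount(
                client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
                expectedDocs
            ),
            30,
            TimeUnit.SECONDS
        );
    }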
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index b0f46029ccfaf..b1258fe3f1769 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1349,6 +1349,7 @@ public void rollTranslogGeneration() {
 
     public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
         if (indexSettings.isSegrepEnabled() && shardRouting.primary() == false) {
+            // With segment replication enabled, replicas do not perform this operation.
             return;
         }
         verifyActive();
@@ -3016,6 +3017,8 @@ public void startRecovery(
         try {
             markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
             if (indexSettings.isSegrepEnabled()) {
+                // Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary
+                // and started with the latest set of segments.
                 segmentReplicationReplicaService.startRecovery(
                     this,
                     recoveryState.getTargetNode(),
diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 2dea831be5d7b..e89abab2d1c49 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -64,6 +64,7 @@
 import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.lucene.util.Version;
 import org.opensearch.ExceptionsHelper;
+import org.opensearch.common.Nullable;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.bytes.BytesReference;
 import org.opensearch.common.io.Streams;
@@ -672,26 +673,7 @@ private static void failIfCorrupted(Directory directory) throws IOException {
     public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) throws IOException {
         metadataLock.writeLock().lock();
         try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-            for (String existingFile : directory.listAll()) {
-                if (Store.isAutogenerated(existingFile) || sourceMetadata.contains(existingFile)) {
-                    // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
-                    // checksum)
-                    continue;
-                }
-                try {
-                    directory.deleteFile(reason, existingFile);
-                    // FNF should not happen since we hold a write lock?
-                } catch (IOException ex) {
-                    if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
-                        // TODO do we need to also fail this if we can't delete the pending commit file?
-                        // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
-                        // point around?
-                        throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
-                    }
-                    logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
-                    // ignore, we don't really care, will get deleted later on
-                }
-            }
+            cleanupFiles(reason, sourceMetadata, null);
             directory.syncMetaData();
             final Store.MetadataSnapshot metadataOrEmpty = getMetadata((IndexCommit) null);
             verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
@@ -712,37 +694,43 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
     public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException {
         // fetch a snapshot from the latest on disk Segments_N file. This can be behind
         // the passed in local in memory snapshot, so we want to ensure files it references are not removed.
-        final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null);
         metadataLock.writeLock().lock();
         try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-            for (String existingFile : directory.listAll()) {
-                if (Store.isAutogenerated(existingFile)
-                    || remoteSnapshot.contains(existingFile)
-                    || latestCommitPointMetadata.contains(existingFile)) {
-                    // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
-                    // checksum)
-                    continue;
-                }
-                try {
-                    directory.deleteFile(reason, existingFile);
-                    // FNF should not happen since we hold a write lock?
-                } catch (IOException ex) {
-                    if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
-                        // TODO do we need to also fail this if we can't delete the pending commit file?
-                        // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
-                        // point around?
-                        throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
-                    }
-                    logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
-                    // ignore, we don't really care, will get deleted later on
-                }
-            }
+            final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null);
+            cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata);
             verifyAfterCleanup(remoteSnapshot, localSnapshot);
         } finally {
             metadataLock.writeLock().unlock();
         }
     }
 
+    private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
+        throws IOException {
+        assert metadataLock.isWriteLockedByCurrentThread();
+        for (String existingFile : directory.listAll()) {
+            if (Store.isAutogenerated(existingFile)
+                || remoteSnapshot.contains(existingFile)
+                || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
+                // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
+                // checksum)
+                continue;
+            }
+            try {
+                directory.deleteFile(reason, existingFile);
+                // FNF should not happen since we hold a write lock?
+            } catch (IOException ex) {
+                if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
+                    // TODO do we need to also fail this if we can't delete the pending commit file?
+                    // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
+                    // point around?
+                    throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
+                }
+                logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
+                // ignore, we don't really care, will get deleted later on
+            }
+        }
+    }
+
     // pkg private for testing
     final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
         final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
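To see the consolidated retention rule in isolation: the sketch below reproduces the keep-if-referenced-by-either-snapshot loop against a plain Lucene Directory. The class, method, and file names are illustrative stand-ins, not the Store API; it only demonstrates the pattern that cleanupFiles now centralizes for both cleanupAndVerify overloads.

    import java.io.IOException;
    import java.util.Set;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.IOContext;
    import org.apache.lucene.store.IndexOutput;

    public class CleanupSketch {
        // Delete everything in the directory not referenced by either set;
        // the second set is optional, mirroring the @Nullable parameter above.
        static void cleanupFiles(Directory dir, Set<String> keep, Set<String> alsoKeep) throws IOException {
            for (String file : dir.listAll()) {
                if (keep.contains(file) || (alsoKeep != null && alsoKeep.contains(file))) {
                    continue; // retained by one of the snapshots
                }
                dir.deleteFile(file);
            }
        }

        public static void main(String[] args) throws IOException {
            Directory dir = new ByteBuffersDirectory();
            for (String name : new String[] { "_0.cfs", "_1.cfs", "segments_2" }) {
                try (IndexOutput out = dir.createOutput(name, IOContext.DEFAULT)) {
                    out.writeByte((byte) 0);
                }
            }
            cleanupFiles(dir, Set.of("segments_2"), Set.of("_1.cfs"));
            // prints [_1.cfs, segments_2]: only the unreferenced _0.cfs is gone
            System.out.println(java.util.Arrays.toString(dir.listAll()));
        }
    }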
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java
index 29b4b3c5707c5..77f8643a5dce1 100644
--- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java
+++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java
@@ -23,7 +23,9 @@ public class TransportCheckpointInfoResponse extends TransportResponse {
     private final ReplicationCheckpoint checkpoint;
     private final Store.MetadataSnapshot snapshot;
     private final byte[] infosBytes;
-    private final Set<StoreFileMetadata> additionalFiles;
+    // pendingDeleteFiles are segments that have been merged away in the latest in-memory SegmentInfos
+    // but are still referenced by the latest commit point (Segments_N).
+    private final Set<StoreFileMetadata> pendingDeleteFiles;
 
     public TransportCheckpointInfoResponse(
         final ReplicationCheckpoint checkpoint,
@@ -34,14 +36,14 @@ public TransportCheckpointInfoResponse(
         this.checkpoint = checkpoint;
         this.snapshot = snapshot;
         this.infosBytes = infosBytes;
-        this.additionalFiles = additionalFiles;
+        this.pendingDeleteFiles = additionalFiles;
     }
 
     public TransportCheckpointInfoResponse(StreamInput in) throws IOException {
         this.checkpoint = new ReplicationCheckpoint(in);
         this.snapshot = new Store.MetadataSnapshot(in);
         this.infosBytes = in.readByteArray();
-        this.additionalFiles = in.readSet(StoreFileMetadata::new);
+        this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new);
     }
 
     @Override
@@ -49,7 +51,7 @@ public void writeTo(StreamOutput out) throws IOException {
         checkpoint.writeTo(out);
         snapshot.writeTo(out);
         out.writeByteArray(infosBytes);
-        out.writeCollection(additionalFiles);
+        out.writeCollection(pendingDeleteFiles);
     }
 
     public ReplicationCheckpoint getCheckpoint() {
@@ -64,7 +66,7 @@ public byte[] getInfosBytes() {
         return infosBytes;
     }
 
-    public Set<StoreFileMetadata> getAdditionalFiles() {
-        return additionalFiles;
+    public Set<StoreFileMetadata> getPendingDeleteFiles() {
+        return pendingDeleteFiles;
     }
 }
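writeTo and the StreamInput constructor above must read and write fields in the same order (checkpoint, snapshot, infosBytes, pendingDeleteFiles), or every later field deserializes garbage. A minimal round-trip sketch of that invariant, assuming OpenSearch's BytesStreamOutput and omitting the surrounding test class:

    import java.io.IOException;
    import org.opensearch.common.io.stream.BytesStreamOutput;
    import org.opensearch.common.io.stream.StreamInput;

    // Serialize and immediately deserialize a response; a field-order mismatch
    // between writeTo and the StreamInput constructor surfaces here.
    static TransportCheckpointInfoResponse roundTrip(TransportCheckpointInfoResponse original) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                return new TransportCheckpointInfoResponse(in);
            }
        }
    }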
diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java
index 58ca0c0745c56..4f0818a71bbc3 100644
--- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java
@@ -34,6 +34,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.StepListener;
 import org.opensearch.action.support.ChannelActionListener;
@@ -237,7 +238,20 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel,
         final StepListener<ReplicationResponse> addRetentionLeaseStep = new StepListener<>();
         final Consumer<Exception> onFailure = e -> {
             assert Transports.assertNotTransportThread(this + "[onFailure]");
-            logger.error("Failure", e);
+            logger.error(
+                new ParameterizedMessage(
+                    "Error marking shard {} as tracked for allocation ID {}",
+                    shardId,
+                    request.getTargetAllocationId()
+                ),
+                e
+            );
+            try {
+                channel.sendResponse(e);
+            } catch (Exception inner) {
+                inner.addSuppressed(e);
+                logger.warn("failed to send back failure on track shard request", inner);
+            }
         };
         PrimaryShardReplicationHandler.runUnderPrimaryPermit(
             () -> shard.cloneLocalPeerRecoveryRetentionLease(
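On the logging change: the Message-plus-Throwable overload of Logger.error keeps the formatted message and the attached exception unambiguous, instead of letting the throwable compete with the varargs placeholders. A self-contained sketch (logger name and argument values are illustrative):

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.message.ParameterizedMessage;

    public class LoggingSketch {
        private static final Logger logger = LogManager.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            Exception cause = new IllegalStateException("boom");
            // logger.error(Message, Throwable): the {} placeholders are filled
            // from the message arguments, and the throwable is attached
            // explicitly rather than consumed as a formatting argument.
            logger.error(
                new ParameterizedMessage("Error marking shard {} as tracked for allocation ID {}", "[index][0]", "xyz"),
                cause
            );
        }
    }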
diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java
index b34207c5fd355..2c8d1e19c8c92 100644
--- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java
@@ -152,12 +152,12 @@ private void getFiles(TransportCheckpointInfoResponse checkpointInfo, StepListen
             .collect(Collectors.toList());
 
         Set<String> storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll()));
-        final Set<StoreFileMetadata> additionalFiles = checkpointInfo.getAdditionalFiles()
+        final Set<StoreFileMetadata> pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles()
             .stream()
             .filter(f -> storeFiles.contains(f.name()) == false)
             .collect(Collectors.toSet());
 
-        filesToFetch.addAll(additionalFiles);
+        filesToFetch.addAll(pendingDeleteFiles);
 
         for (StoreFileMetadata file : filesToFetch) {
             state.getIndex().addFileDetail(file.name(), file.length(), false);
@@ -185,7 +185,7 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR
             assert indexShard.assertRetentionLeasesPersisted();
         }
 
-        // Deserialize the new SegmentInfos object sent primary the primary.
+        // Deserialize the new SegmentInfos object sent from the primary.
         final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
         SegmentInfos infos = SegmentInfos.readCommit(
             store.directory(),
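The patch excerpt ends mid-call, so the remaining readCommit arguments are not shown here. For orientation only, a sketch of reading a SegmentInfos out of transported bytes at a given generation; the helper below is an assumption about the shape of that code, not the PR's exact implementation:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.lucene.index.SegmentInfos;
    import org.apache.lucene.store.BufferedChecksumIndexInput;
    import org.apache.lucene.store.ByteBuffersDataInput;
    import org.apache.lucene.store.ByteBuffersIndexInput;
    import org.apache.lucene.store.ChecksumIndexInput;
    import org.apache.lucene.store.Directory;

    // Wrap the transported bytes in a checksummed IndexInput and read the
    // commit at the checkpoint's generation against the local store directory.
    static SegmentInfos readInfosFromBytes(Directory directory, byte[] infosBytes, long generation) throws IOException {
        try (
            ChecksumIndexInput input = new BufferedChecksumIndexInput(
                new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(infosBytes))), "SegmentInfos")
            )
        ) {
            return SegmentInfos.readCommit(directory, input, generation);
        }
    }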