Skip to content

Commit

Permalink
Revert "[Segment Replication] Remove codec name string match check fo…
Browse files Browse the repository at this point in the history
…r checkpoints (opensearch-project#7741)"

This reverts commit 89edd55.
  • Loading branch information
dreamer-89 committed Jun 2, 2023
1 parent 36ea4b2 commit c0be2bc
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,12 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
);
return false;
}
if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) {
logger.trace(
() -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec())
);
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -147,6 +148,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (copyState.getCheckpoint().getCodec().equals(request.getCheckpoint().getCodec()) == false) {
logger.trace("Requested unsupported codec version {}", request.getCheckpoint().getCodec());
throw new CancellableThreads.ExecutionCancelledException(
new ParameterizedMessage("Requested unsupported codec version {}", request.getCheckpoint().getCodec()).toString()
);
}
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
if (segrepHandler != null) {
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
public class OngoingSegmentReplicationsTests extends IndexShardTestCase {

private final IndicesService mockIndicesService = mock(IndicesService.class);
private ReplicationCheckpoint testCheckpoint;
private ReplicationCheckpoint testCheckpoint, olderCodecTestCheckpoint;
private DiscoveryNode primaryDiscoveryNode;
private DiscoveryNode replicaDiscoveryNode;
private IndexShard primary;
Expand Down Expand Up @@ -79,6 +79,7 @@ public void setUp() throws Exception {

// This mirrors the creation of the ReplicationCheckpoint inside CopyState
testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, defaultCodecName);
olderCodecTestCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, "Lucene94");
IndexService mockIndexService = mock(IndexService.class);
when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService);
when(mockIndexService.getShard(testShardId.id())).thenReturn(primary);
Expand All @@ -93,6 +94,44 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testSuccessfulCodecCompatibilityCheck() throws Exception {
indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
primary.refresh("Test");
OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
// replica checkpoint is on same/higher lucene codec than primary
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
testCheckpoint
);
final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
listener.onResponse(null);
};
final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
}

public void testFailCodecCompatibilityCheck() throws Exception {
indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
primary.refresh("Test");
OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
// replica checkpoint is on lower/older lucene codec than primary
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
olderCodecTestCheckpoint
);
final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
listener.onResponse(null);
};
try {
final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
} catch (CancellableThreads.ExecutionCancelledException ex) {
Assert.assertTrue(ex.getMessage().contains("Requested unsupported codec version"));
}
}

public void testPrepareAndSendSegments() throws IOException {
indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
primary.refresh("Test");
Expand Down

0 comments on commit c0be2bc

Please sign in to comment.