From c7e3b023a8f359c0d23be4cfdfa96d9106448bdd Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 6 Apr 2023 10:03:38 -0700 Subject: [PATCH] [Segment Replication] Compatibility check for differing lucene codec versions (#6730) (#6991) This change aims to fail segment replications between the primary and replica if they are utilizing differing lucene codec versions. This is to avoid the current behavior of failing the replica shard in such situations. (cherry picked from commit c334bbde881c61e66b7b4768e2d43455f1c827d9) Signed-off-by: Poojita Raj --- .../opensearch/index/shard/IndexShard.java | 20 ++++++-- .../OngoingSegmentReplications.java | 7 +++ .../SegmentReplicationTargetService.java | 2 +- .../checkpoint/ReplicationCheckpoint.java | 43 +++++++++++++---- .../gateway/PrimaryShardAllocatorTests.java | 46 +++++++++++------- .../index/seqno/ReplicationTrackerTests.java | 28 +++++++++-- .../SegmentReplicationIndexShardTests.java | 7 +-- .../OngoingSegmentReplicationsTests.java | 47 ++++++++++++++++++- .../PrimaryShardReplicationSourceTests.java | 33 +++++++++++-- .../SegmentReplicationSourceServiceTests.java | 9 +++- .../SegmentReplicationTargetServiceTests.java | 11 +++-- .../SegmentReplicationTargetTests.java | 4 +- .../PublishCheckpointActionTests.java | 5 +- .../replication/common/CopyStateTests.java | 15 +++++- .../index/shard/IndexShardTestCase.java | 7 ++- 15 files changed, 232 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9d06ce7c6a391..82a080e7dc512 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -496,6 +496,13 @@ public boolean isSystem() { return indexSettings.getIndexMetadata().isSystem(); } + /** + * Returns the name of the default codec in codecService + */ + public String getDefaultCodecName() { + return codecService.codec(CodecService.DEFAULT_CODEC).getName(); + } + /** * USE THIS METHOD WITH CARE! * Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about @@ -1489,7 +1496,7 @@ public Tuple, ReplicationCheckpoint> getLatestSegme return null; } if (getEngineOrNull() == null) { - return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)); + return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())); } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); @@ -1506,13 +1513,14 @@ public Tuple, ReplicationCheckpoint> getLatestSegme // getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues. shardRouting.primary() ? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum() - : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes() + : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(), + getEngine().config().getCodec().getName() ) ); } catch (IOException e) { throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e); } - }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()))); } /** @@ -1582,6 +1590,12 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp ); return false; } + if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) { + logger.trace( + () -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec()) + ); + return false; + } return true; } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 6f04c6cf6f665..3ab0a7539fb06 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; @@ -147,6 +148,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener { if (segrepHandler != null) { logger.warn("Override handler for allocation id {}", request.getTargetAllocationId()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index bf626ff93760c..1858449e13ae8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -401,7 +401,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha return; } startReplication( - ReplicationCheckpoint.empty(request.getShardId()), + ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()), indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 57e667b06a223..32521fb0cd944 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.checkpoint; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -30,29 +31,32 @@ public class ReplicationCheckpoint implements Writeable, Comparable tracker.shardAllocationId.equals(id) == false) .collect(Collectors.toSet()); - final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 1, 1, 1L); - final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 2, 50L); - final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 3, 100L); + final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 1, + 1, + 1L, + Codec.getDefault().getName() + ); + final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 2, + 2, + 50L, + Codec.getDefault().getName() + ); + final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 2, + 3, + 100L, + Codec.getDefault().getName() + ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); tracker.setLatestReplicationCheckpoint(secondCheckpoint); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 014a37249612b..c4db88782638f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.shard; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; import org.opensearch.ExceptionsHelper; @@ -306,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode()); assertEquals(true, primaryShard.routingEntry().primary()); - spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard); + spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard); // Verify that checkpoint is not processed as shard routing is primary. verify(spy, times(0)).startReplication(any(), any(), any()); @@ -1020,7 +1021,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun private void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { try { - final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId), primary); + final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); @@ -1034,7 +1035,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); final SegmentReplicationTarget target = targetService.startReplication( - ReplicationCheckpoint.empty(replica.shardId), + ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()), replica, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 78767ee1dcf8c..6e27a4db6afec 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -17,6 +17,7 @@ import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexService; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -47,7 +48,7 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase { private final IndicesService mockIndicesService = mock(IndicesService.class); - private ReplicationCheckpoint testCheckpoint; + private ReplicationCheckpoint testCheckpoint, olderCodecTestCheckpoint; private DiscoveryNode primaryDiscoveryNode; private DiscoveryNode replicaDiscoveryNode; private IndexShard primary; @@ -73,8 +74,12 @@ public void setUp() throws Exception { ShardId testShardId = primary.shardId(); + CodecService codecService = new CodecService(null, null); + String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName(); + // This mirrors the creation of the ReplicationCheckpoint inside CopyState - testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L); + testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, defaultCodecName); + olderCodecTestCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, "Lucene94"); IndexService mockIndexService = mock(IndexService.class); when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService); when(mockIndexService.getShard(testShardId.id())).thenReturn(primary); @@ -89,6 +94,44 @@ public void tearDown() throws Exception { super.tearDown(); } + public void testSuccessfulCodecCompatibilityCheck() throws Exception { + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + // replica checkpoint is on same/higher lucene codec than primary + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + } + + public void testFailCodecCompatibilityCheck() throws Exception { + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + // replica checkpoint is on lower/older lucene codec than primary + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + olderCodecTestCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + try { + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + } catch (CancellableThreads.ExecutionCancelledException ex) { + Assert.assertTrue(ex.getMessage().contains("Requested unsupported codec version")); + } + } + public void testPrepareAndSendSegments() throws IOException { indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); primary.refresh("Test"); diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index d925956bd95ef..995f38087297e 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.util.Version; import org.junit.Assert; import org.opensearch.action.ActionListener; @@ -93,7 +94,13 @@ public void tearDown() throws Exception { } public void testGetCheckpointMetadata() { - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, mock(ActionListener.class)); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); assertEquals(1, requestList.length); @@ -104,7 +111,13 @@ public void testGetCheckpointMetadata() { } public void testGetSegmentFiles() { - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST); replicationSource.getSegmentFiles( REPLICATION_ID, @@ -126,7 +139,13 @@ public void testGetSegmentFiles() { */ public void testTransportTimeoutForGetSegmentFilesAction() { long fileSize = (long) (Math.pow(10, 9)); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", fileSize, "checksum", Version.LATEST); replicationSource.getSegmentFiles( REPLICATION_ID, @@ -145,7 +164,13 @@ public void testTransportTimeoutForGetSegmentFilesAction() { public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST); replicationSource.getSegmentFiles( REPLICATION_ID, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 0d05b1ec8679e..41022b77b46e1 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import org.apache.lucene.codecs.Codec; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; @@ -55,7 +56,13 @@ public void setUp() throws Exception { when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard); // This mirrors the creation of the ReplicationCheckpoint inside CopyState - testCheckpoint = new ReplicationCheckpoint(testShardId, mockIndexShard.getOperationPrimaryTerm(), 0L, 0L); + testCheckpoint = new ReplicationCheckpoint( + testShardId, + mockIndexShard.getOperationPrimaryTerm(), + 0L, + 0L, + Codec.getDefault().getName() + ); testThreadPool = new TestThreadPool("test", Settings.EMPTY); CapturingTransport transport = new CapturingTransport(); localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index bae0afb5bcc3b..357a88c27fc46 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; @@ -62,10 +63,12 @@ public void setUp() throws Exception { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); + CodecService codecService = new CodecService(null, null); + String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName(); primaryShard = newStartedShard(true, settings); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard)); - checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L); + checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, defaultCodecName); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource); @@ -76,13 +79,15 @@ public void setUp() throws Exception { initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm(), initialCheckpoint.getSegmentsGen(), - initialCheckpoint.getSegmentInfosVersion() + 1 + initialCheckpoint.getSegmentInfosVersion() + 1, + defaultCodecName ); newPrimaryCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm() + 1, initialCheckpoint.getSegmentsGen(), - initialCheckpoint.getSegmentInfosVersion() + 1 + initialCheckpoint.getSegmentInfosVersion() + 1, + defaultCodecName ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 599e73b548ddb..b36dbdba40be8 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; @@ -106,7 +107,8 @@ public void setUp() throws Exception { spyIndexShard.shardId(), spyIndexShard.getPendingPrimaryTerm(), testSegmentInfos.getGeneration(), - testSegmentInfos.version + testSegmentInfos.version, + Codec.getDefault().getName() ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 2c05fbc9328e5..bc597fd39539f 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.checkpoint; +import org.apache.lucene.codecs.Codec; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; @@ -104,7 +105,7 @@ public void testPublishCheckpointActionOnPrimary() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1, Codec.getDefault().getName()); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { @@ -139,7 +140,7 @@ public void testPublishCheckpointActionOnReplica() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1, Codec.getDefault().getName()); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index a87a8de206a39..e3b48302ae6ef 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -8,12 +8,14 @@ package org.opensearch.indices.replication.common; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardId; @@ -49,7 +51,10 @@ public class CopyStateTests extends IndexShardTestCase { public void testCopyStateCreation() throws IOException { final IndexShard mockIndexShard = createMockIndexShard(); - CopyState copyState = new CopyState(ReplicationCheckpoint.empty(mockIndexShard.shardId()), mockIndexShard); + CopyState copyState = new CopyState( + ReplicationCheckpoint.empty(mockIndexShard.shardId(), new CodecService(null, null).codec("default").getName()), + mockIndexShard + ); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero @@ -67,7 +72,13 @@ public static IndexShard createMockIndexShard() throws IOException { when(mockShard.store()).thenReturn(mockStore); SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); - ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(mockShard.shardId(), mockShard.getOperationPrimaryTerm(), 0L, 0L); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( + mockShard.shardId(), + mockShard.getOperationPrimaryTerm(), + 0L, + 0L, + Codec.getDefault().getName() + ); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), testCheckpoint diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ab0cf38f77c7d..f2dd0a7e5def0 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1299,7 +1299,10 @@ public void getCheckpointMetadata( ActionListener listener ) { try { - final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); + final CopyState copyState = new CopyState( + ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getDefaultCodecName()), + primaryShard + ); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); @@ -1353,7 +1356,7 @@ public final List replicateSegments(IndexShard primary for (IndexShard replica : replicaShards) { final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica); final SegmentReplicationTarget target = targetService.startReplication( - ReplicationCheckpoint.empty(replica.shardId), + ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()), replica, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override