Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Segment Replication] Compatibility check for differing lucene codec versions #6991

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,13 @@ public boolean isSystem() {
return indexSettings.getIndexMetadata().isSystem();
}

/**
* Returns the name of the default codec in codecService
*/
public String getDefaultCodecName() {
return codecService.codec(CodecService.DEFAULT_CODEC).getName();
}

/**
* USE THIS METHOD WITH CARE!
* Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about
Expand Down Expand Up @@ -1489,7 +1496,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
return null;
}
if (getEngineOrNull() == null) {
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId));
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()));
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
Expand All @@ -1506,13 +1513,14 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
// getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues.
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
getEngine().config().getCodec().getName()
)
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())));
}

/**
Expand Down Expand Up @@ -1582,6 +1590,12 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
);
return false;
}
if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) {
logger.trace(
() -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec())
);
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -147,6 +148,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (copyState.getCheckpoint().getCodec().equals(request.getCheckpoint().getCodec()) == false) {
logger.trace("Requested unsupported codec version {}", request.getCheckpoint().getCodec());
throw new CancellableThreads.ExecutionCancelledException(
new ParameterizedMessage("Requested unsupported codec version {}", request.getCheckpoint().getCodec()).toString()
);
}
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
if (segrepHandler != null) {
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
return;
}
startReplication(
ReplicationCheckpoint.empty(request.getShardId()),
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.indices.replication.checkpoint;

import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -30,37 +31,46 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final long segmentsGen;
private final long segmentInfosVersion;
private final long length;
private final String codec;

public static ReplicationCheckpoint empty(ShardId shardId) {
return new ReplicationCheckpoint(shardId);
public static ReplicationCheckpoint empty(ShardId shardId, String codec) {
return new ReplicationCheckpoint(shardId, codec);
}

private ReplicationCheckpoint(ShardId shardId) {
private ReplicationCheckpoint(ShardId shardId, String codec) {
this.shardId = shardId;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
segmentsGen = SequenceNumbers.NO_OPS_PERFORMED;
segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
length = 0L;
this.codec = codec;
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion) {
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L);
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec);
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length) {
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length, String codec) {
this.shardId = shardId;
this.primaryTerm = primaryTerm;
this.segmentsGen = segmentsGen;
this.segmentInfosVersion = segmentInfosVersion;
this.length = length;
this.codec = codec;
}

public ReplicationCheckpoint(StreamInput in) throws IOException {
shardId = new ShardId(in);
primaryTerm = in.readLong();
segmentsGen = in.readLong();
segmentInfosVersion = in.readLong();
length = in.readLong();
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
length = in.readLong();
codec = in.readString();
} else {
length = 0L;
codec = null;
}
}

/**
Expand Down Expand Up @@ -102,13 +112,25 @@ public long getLength() {
return length;
}

/**
* Latest supported codec version
*
* @return the codec name
*/
public String getCodec() {
return codec;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(primaryTerm);
out.writeLong(segmentsGen);
out.writeLong(segmentInfosVersion);
out.writeLong(length);
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeLong(length);
out.writeString(codec);
}
}

@Override
Expand All @@ -124,7 +146,8 @@ public boolean equals(Object o) {
return primaryTerm == that.primaryTerm
&& segmentsGen == that.segmentsGen
&& segmentInfosVersion == that.segmentInfosVersion
&& Objects.equals(shardId, that.shardId);
&& Objects.equals(shardId, that.shardId)
&& codec.equals(that.codec);
}

@Override
Expand Down Expand Up @@ -155,6 +178,8 @@ public String toString() {
+ segmentInfosVersion
+ ", size="
+ length
+ ", codec="
+ codec
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.gateway;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -220,9 +222,9 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -253,9 +255,9 @@ public void testPreferReplicaWithNullReplicationCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false);
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -319,9 +321,9 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -351,9 +353,9 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
allocId1,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -384,9 +386,9 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -809,11 +811,23 @@ public TestAllocator addData(
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), null);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
null
);
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), storeException);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
storeException
);
}

public TestAllocator addData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.seqno;

import org.apache.lucene.codecs.Codec;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.AllocationId;
Expand Down Expand Up @@ -1800,9 +1801,30 @@ public void testSegmentReplicationCheckpointTracking() {
.filter(id -> tracker.shardAllocationId.equals(id) == false)
.collect(Collectors.toSet());

final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 1, 1, 1L);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 2, 50L);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 3, 100L);
final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
1,
1,
1L,
Codec.getDefault().getName()
);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
2,
50L,
Codec.getDefault().getName()
);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
3,
100L,
Codec.getDefault().getName()
);

tracker.setLatestReplicationCheckpoint(initialCheckpoint);
tracker.setLatestReplicationCheckpoint(secondCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.shard;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.SegmentInfos;
import org.junit.Assert;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -306,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard);
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
Expand Down Expand Up @@ -1020,7 +1021,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
try {
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId), primary);
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand All @@ -1034,7 +1035,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId),
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Loading